In [1]:
import pandas as pd
import numpy as np
import random

  from pandas.core import (


In [2]:
# 设置随机种子以确保结果可复现
np.random.seed(42)

In [3]:
def generate_users_and_teachers(n_users=40000, n_teachers=100):
    # 生成老师信息
    teacher_ids = range(1000, 1000 + n_teachers)
    teacher_groups = np.random.choice(["A", "B"], n_teachers, p=[0.5, 0.5])  # 随机分配教师组
    teachers = pd.DataFrame({
        "teacher_id": teacher_ids,
        "teacher_group": teacher_groups,
    })

    # 生成用户信息
    user_ids = range(1, n_users + 1)
    cities = ["一线", "新一线", "二线", "三线", "四线", "五线"]
    city_tiers = np.random.choice(cities, n_users, p=[0.1, 0.2, 0.25, 0.2, 0.15, 0.1])
    enrollment_dates = np.random.choice(["2024-01-06", "2024-01-07"], n_users, p=[0.5, 0.5])
    traffic_sources = np.random.choice(["主流渠道", "次级渠道"], n_users, p=[0.7, 0.3])

    # 分配老师确保每位老师一期只带200名学生
    teacher_assignments = {tid: {"2024-01-06": [], "2024-01-07": []} for tid in teachers['teacher_id']}
    
    for user_id, enrollment_date in zip(user_ids, enrollment_dates):
        available_teachers = [
            tid for tid in teacher_assignments
            if len(teacher_assignments[tid][enrollment_date]) < 200
        ]
        
        # 如果没有可用老师，尝试分配到其他期次
        if not available_teachers:
            # 尝试分配到另一期
            other_date = "2024-01-07" if enrollment_date == "2024-01-06" else "2024-01-06"
            available_teachers = [
                tid for tid in teacher_assignments
                if len(teacher_assignments[tid][other_date]) < 200
            ]
            if not available_teachers:
                raise ValueError(f"No available teachers for either date. "
                                 f"Check the number of students per teacher or the number of teachers.")
            enrollment_date = other_date  # 更新期次

        # 选择当前配额最少的老师
        selected_teacher = min(available_teachers, key=lambda tid: len(teacher_assignments[tid][enrollment_date]))
        teacher_assignments[selected_teacher][enrollment_date].append(user_id)

    # 创建用户数据框
    user_data = []
    for teacher_id, assigned_users in teacher_assignments.items():
        teacher_group = teachers.loc[teachers['teacher_id'] == teacher_id, 'teacher_group'].values[0]
        for enrollment_date, users in assigned_users.items():
            for user_id in users:
                user_data.append({
                    "user_id": user_id,
                    "city": random.choice(cities),
                    "enrollment_date": enrollment_date,
                    "traffic_source": random.choice(["主流渠道", "次级渠道"]),
                    "experience_teacher_id": teacher_id,
                    "teacher_group": teacher_group
                })

    users = pd.DataFrame(user_data)
    
    return users, teachers

In [4]:
# 生成家访电话记录表 (仅限实验组)
def generate_call_logs(users, call_rate=0.8):
    calls = []
    for _, user in users.iterrows():
        if (user["teacher_group"] == "A") & (user["enrollment_date"] == '2024-01-07') :  # 仅实验组的老师进行家访电话
            if random.random() < call_rate:  # 假设 80% 实验组用户都接到家访电话
                calls.append({
                    "call_id": len(calls) + 1,
                    "teacher_id": user["experience_teacher_id"],
                    "user_id": user["user_id"],
                    "call_date": pd.to_datetime(user["enrollment_date"]) + pd.to_timedelta(np.random.randint(1, 10), unit="d"),
                    "call_duration": np.random.randint(3, 20)  # 通话时长 3-20 分钟
                })
    call_logs = pd.DataFrame(calls)
    return call_logs

In [5]:
def generate_renewals(users, teachers):
    # 为每位老师分配两个续报率：第一期（5%-10%）和第二期（10%-20%）
    teacher_renewal_rates_1 = {tid: np.random.uniform(0.05, 0.1) for tid in teachers["teacher_id"]}
    teacher_renewal_rates_2 = {tid: np.random.uniform(0.1, 0.2) for tid in teachers["teacher_id"]}

    renewals = []
    for _, user in users.iterrows():
        finish_status = np.random.choice([1, 0], p=[0.9, 0.1])  # 90% 的用户完成课程
        if finish_status == 1:
            # 根据期次和分组选择续报率
            if user["enrollment_date"] == "2024-01-06":
                renewal_rate = teacher_renewal_rates_1[user["experience_teacher_id"]]
            elif user["enrollment_date"] == "2024-01-07":
                if user["teacher_group"] == "A":
                    renewal_rate = teacher_renewal_rates_2[user["experience_teacher_id"]]
                else:
                    renewal_rate = teacher_renewal_rates_1[user["experience_teacher_id"]]
            else:
                raise ValueError(f"Unexpected enrollment_date: {user['enrollment_date']}")
            
            renewal_status = np.random.choice([1, 0], p=[renewal_rate, 1 - renewal_rate])
        else:
            renewal_status = 0

        # 如果续报成功，生成续报日期
        renewal_date = pd.to_datetime(user["enrollment_date"]) + pd.to_timedelta(np.random.randint(1, 20), unit="d") if renewal_status == 1 else None

        renewals.append({
            "user_id": user["user_id"],
            "finish_status": finish_status,
            "renewal_status": renewal_status,
            "renewal_date": renewal_date
        })

    renewals = pd.DataFrame(renewals)
    return renewals

In [6]:
# 整合实验数据
def construct_detailed_data(users, teachers, call_logs, renewals):

    # 合并用户信息和续费信息
    detailed_data = pd.merge(users, renewals, on="user_id", how="left")
    # 合并家访电话记录
    detailed_data = pd.merge(detailed_data, call_logs, on="user_id", how="left")
    # 合并教师信息
    detailed_data = pd.merge(detailed_data, teachers, left_on="experience_teacher_id", right_on="teacher_id", how="left")
    
    # 删除 detailed_data 表中多余的的 teacher_id
    detailed_data.drop('teacher_id_x',axis = 1, inplace = True)
    detailed_data.drop('teacher_id_y',axis = 1, inplace = True)
    
    detailed_data.rename(columns = {'experience_teacher_id':'teacher_id'}, inplace  = True)

    return detailed_data

In [7]:
# 生成虚拟数据
users,teachers = generate_users_and_teachers()
call_logs = generate_call_logs(users)
renewals = generate_renewals(users,teachers)

In [8]:
# 删除 users 表中的 teacher_group
users.drop('teacher_group',axis = 1, inplace = True)

In [9]:
detailed_data = construct_detailed_data(users, teachers, call_logs, renewals)

In [10]:
# 保存数据到 Excel 文件
detailed_data.to_excel('AB_data.xlsx', index = False)

# 创建一个 ExcelWriter 对象，指定保存的文件名
output_file = "generated_data.xlsx"
with pd.ExcelWriter(output_file, engine="openpyxl") as writer:
    
    users.to_excel(writer, sheet_name='users', index=False)
    call_logs.to_excel(writer, sheet_name='call_logs', index=False)
    renewals.to_excel(writer, sheet_name='renewals', index=False)
    teachers.to_excel(writer, sheet_name='teachers', index=False)