In [2]:
import pandas as pd
from faker import Faker
import random
from datetime import datetime, timedelta
from xpinyin import Pinyin
import uuid

# 初始化Faker，设置为中文
fake = Faker('zh_CN')
pinyin = Pinyin()

# 设置随机种子以确保可重复性
random.seed(42)

# 1. 生成招聘数据（保持不变）
def generate_recruitment_data():
    recruitment_data = []
    for i in range(800):
        is_full_time = '是' if i < 75 else '否'  # 前75名为正式员工
        candidate = {
            '候选人ID': str(uuid.uuid4())[:8],
            '姓名': fake.name(),
            '性别': random.choice(['男', '女']),
            '电话': fake.phone_number(),
            '邮箱': fake.email(),
            '应聘职位': random.choice(['研究员', '实验室技术员', '数据分析师', '项目经理', '质量控制专员']),
            '应聘部门': random.choice(['研发部', '临床研究部', '数据科学部', '质量保证部']),
            '面试日期': fake.date_between(start_date='-1y', end_date='today'),
            '是否正式员工': is_full_time
        }
        recruitment_data.append(candidate)
    
    df = pd.DataFrame(recruitment_data)
    df.to_csv('recruitment_data.csv', index=False, encoding='utf-8-sig')
    return df





In [3]:
# 2 generating HRIS data 
def generate_hris_data(recruitment_df):
    # 计算需要的正式员工和劳务派遣员工数量
    total_employees = 500
    regular_employees_count = int(total_employees * 0.7)  # 70% 正式员工
    dispatch_employees_count = total_employees - regular_employees_count  # 30% 劳务派遣
    
    # 尝试从'是否正式员工'为'是'的组中获取足够的正式员工
    full_time_df = recruitment_df[recruitment_df['是否正式员工'] == '是']
    
    # 如果"是"组中的员工不足，需要从"否"组中转换一些
    if len(full_time_df) < regular_employees_count:
        # 计算还需要多少正式员工
        additional_needed = regular_employees_count - len(full_time_df)
        additional_regular = recruitment_df[recruitment_df['是否正式员工'] == '否'].sample(n=additional_needed, random_state=42)
        regular_employees = pd.concat([full_time_df, additional_regular])
    else:
        # 如果有足够的正式员工，只取需要的数量
        regular_employees = full_time_df.sample(n=regular_employees_count, random_state=42)
    
    # 从剩余候选人中获取劳务派遣员工
    remaining_candidates = recruitment_df[~recruitment_df.index.isin(regular_employees.index)]
    dispatch_employees = remaining_candidates.sample(n=dispatch_employees_count, random_state=42)
    
    # 合并数据集
    hris_candidates = pd.concat([regular_employees, dispatch_employees])
    
    hris_data = []
    for _, row in hris_candidates.iterrows():
        # 根据员工在合并后的数据框中的位置确定是否为正式员工
        is_regular = row.name in regular_employees.index
        
        employee = {
            '员工ID': str(uuid.uuid4())[:8],
            '姓名': row['姓名'],
            '拼音姓名': pinyin.get_pinyin(row['姓名'], tone_marks='marks', splitter='').upper(),
            '性别': row['性别'],
            '部门': row['应聘部门'],
            '职位': row['应聘职位'],
            '入职日期': fake.date_between(start_date=row['面试日期'], end_date='today'),
            '员工类型': '正式员工' if is_regular else '劳务派遣',
            '电话': row['电话'],
            '邮箱': row['邮箱']
        }
        hris_data.append(employee)
    
    df = pd.DataFrame(hris_data)
    df.to_csv('hris_data.csv', index=False, encoding='utf-8-sig')
    return df

In [4]:
def generate_performance_data(hris_df):
    performance_data = []
    # 创建评估者列表（假设有5名评估者）
    evaluators = ['E001', 'E002', 'E003', 'E004', 'E005']
    
    for _, row in hris_df.iterrows():
        # 引入缺失值：30%的员工没有绩效评分或评语
        has_missing = random.random() < 0.3
        rating = '' if has_missing else round(random.uniform(2.5, 5.0), 1)
        comment = '' if has_missing else random.choice([
            '工作表现出色，超额完成任务目标。',
            '工作稳定，需进一步提升创新能力。',
            '工作态度积极，需加强专业技能。',
            '需提高工作效率，减少错误率。'
        ])
        # 引入数据不一致：10%的员工绩效评分与评语不匹配
        if random.random() < 0.1 and not has_missing:
            if rating >= 4.5:
                comment = '需提高工作效率，减少错误率。'  # 高分低评
            elif rating <= 3.0:
                comment = '工作表现出色，超额完成任务目标。'  # 低分高评
                
        # 引入过时数据：20%的员工评估是上一年度的
        is_old_data = random.random() < 0.2
        
        # 生成评估日期
        if is_old_data:
            # 2023年的评估
            start_month = random.randint(10, 11)
            start_day = random.randint(1, 28)
            start_date = f"2023-{start_month:02d}-{start_day:02d}"
            
            # 完成日期在开始日期之后1-3周
            completion_days = start_day + random.randint(7, 21)
            # 处理月底溢出
            if completion_days > 30:
                next_month = start_month + 1
                completion_day = completion_days - 30
                completion_date = f"2023-{next_month:02d}-{completion_day:02d}"
            else:
                completion_date = f"2023-{start_month:02d}-{completion_days:02d}"
        else:
            # 2024年的评估
            start_month = random.randint(9, 10)
            start_day = random.randint(1, 28)
            start_date = f"2024-{start_month:02d}-{start_day:02d}"
            
            # 完成日期在开始日期之后1-3周
            completion_days = start_day + random.randint(7, 21)
            # 处理月底溢出
            if completion_days > 30:
                next_month = start_month + 1
                completion_day = completion_days - 30
                completion_date = f"2024-{next_month:02d}-{completion_day:02d}"
            else:
                completion_date = f"2024-{start_month:02d}-{completion_days:02d}"
            
        # 随机选择评估者ID
        evaluator_id = random.choice(evaluators)
        
        # 引入格式问题：10%的员工评分使用不同格式（整数或文字）
        if random.random() < 0.1 and not has_missing:
            rating = random.choice(['优秀', '良好', '中等', '需改进'])  # 文字评分
            
        performance = {
            '员工ID': row['员工ID'],
            '姓名': row['姓名'],
            '部门': row['部门'],
            '职位': row['职位'],
            '评估开始日期': start_date,
            '评估完成日期': completion_date,
            '绩效评分': rating,
            '评语': comment,
            '评估者ID': evaluator_id
        }
        performance_data.append(performance)
    
    df = pd.DataFrame(performance_data)
    df.to_csv('performance_data.csv', index=False, encoding='utf-8-sig')
    return df

In [5]:
# 4. 修改生成学习管理系统数据，引入问题
def generate_learning_data(hris_df):
    learning_data = []
    courses = [
        '药物研发流程培训', '数据分析与可视化', '实验室安全规范', 
        '项目管理基础', '质量控制与合规性', '生物信息学入门'
    ]
    for _, row in hris_df.iterrows():
        num_courses = random.randint(1, 3)  # 每人1-3门课程
        for _ in range(num_courses):
            course = random.choice(courses)
            start_date = fake.date_between(start_date=row['入职日期'], end_date='today')
            
            # 引入缺失值：20%的记录缺少得分
            score = '' if random.random() < 0.2 else random.randint(60, 100)
            
            # 引入过时数据：15%的认证记录显示为过期但仍标记为有效
            completion_status = '已完成'
            if random.random() < 0.15:
                start_date = fake.date_between(start_date='-3y', end_date='-2y')  # 过期课程
                completion_status = '已完成'  # 错误地标记为有效
            
            # 引入数据不一致：10%的课程与员工职位不匹配
            if random.random() < 0.1:
                course = random.choice(['高级财务管理', '市场营销策略'])  # 与职位无关的课程
            
            learning = {
                '员工ID': row['员工ID'],
                '姓名': row['姓名'],
                '部门': row['部门'],
                '课程名称': course,
                '开始日期': start_date,
                '完成状态': completion_status,
                '得分': score
            }
            learning_data.append(learning)
        
        # 引入重复记录：5%的员工有重复的课程记录
        if random.random() < 0.05:
            duplicate = learning_data[-1].copy()  # 复制最后一条记录
            learning_data.append(duplicate)
    
    df = pd.DataFrame(learning_data)
    df.to_csv('learning_data.csv', index=False, encoding='utf-8-sig')
    return df

# 执行生成流程
if __name__ == '__main__':
    print("生成招聘数据...")
    recruitment_df = generate_recruitment_data()
    print("生成HRIS数据...")
    hris_df = generate_hris_data(recruitment_df)
    print("生成绩效管理系统数据...")
    performance_df = generate_performance_data(hris_df)
    print("生成学习管理系统数据...")
    learning_df = generate_learning_data(hris_df)
    print("所有数据生成完成！")

生成招聘数据...
生成HRIS数据...
生成绩效管理系统数据...
生成学习管理系统数据...
所有数据生成完成！


In [6]:
import pandas as pd
import mysql.connector
from mysql.connector import Error

# MySQL 连接配置（根据您的环境修改）
config = {
    'user': 'root',  # 替换为您的 MySQL 用户名
    'password': 'Taylor@1989',  # 替换为您的实际 MySQL 密码
    'host': 'localhost',
    'database': 'hris_database',
    'raise_on_warnings': True
}

try:
    # 连接到 MySQL
    conn = mysql.connector.connect(**config)
    cursor = conn.cursor()
    
    # 读取 CSV 文件
    recruitment_df = pd.read_csv('recruitment_data.csv')
    hris_df = pd.read_csv('hris_data.csv')
    performance_df = pd.read_csv('performance_data.csv')
    learning_df = pd.read_csv('learning_data.csv')
    
    # 导入数据到 MySQL
    def insert_df_to_mysql(df, table_name, conn):
        for _, row in df.iterrows():
            columns = ', '.join(row.index)
            placeholders = ', '.join(['%s'] * len(row))
            sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
            # 将 NaN 转换为 None 以插入 NULL
            values = tuple(None if pd.isna(x) else x for x in row)
            cursor.execute(sql, values)
        conn.commit()
    
    # 注意这些行的缩进应与函数定义同级
    insert_df_to_mysql(recruitment_df, 'recruitment_data', conn)
    insert_df_to_mysql(hris_df, 'hris_data', conn)
    insert_df_to_mysql(performance_df, 'performance_data', conn)
    insert_df_to_mysql(learning_df, 'learning_data', conn)
    
    print("数据已成功导入 MySQL 数据库 'hris_database'")
    
except Error as e:
    print(f"错误: {e}")
    
finally:
    if 'conn' in locals() and conn is not None and conn.is_connected():
        cursor.close()
        conn.close()
        print("MySQL 连接已关闭")

数据已成功导入 MySQL 数据库 'hris_database'
MySQL 连接已关闭
