In [2]:
import pandas as pd
import random
import math
import numpy as np
from datetime import datetime, timedelta
from tqdm import tqdm

# 계약 기간 설정 및 분포도 조절
def generate_contract_dates(base_date, end_date):
    """부드러운 분포 조정을 통한 랜덤 계약 날짜 생성."""
    # base_date와 end_date 사이의 날짜 범위 생성
    days = pd.date_range(base_date, end_date)
    date_range = len(days)  # 범위 내의 총 일수

    # 가우시안 함수 기반의 부드러운 분포 생성
    x = np.arange(date_range)
    mean = (date_range / 2) - 30  #계약의 특수성을 고려해 30일 shift
    sigma = date_range / 1.5   # 기울기 조정
    slope_weights = np.exp(-((x - mean) ** 2) / (2 * sigma ** 2))
    slope_weights = (1 / slope_weights)  # 역가우시안 변환으로 양 끝 강조
    slope_weights = slope_weights / slope_weights.sum()  # 정규화

    # 난수에 ±10% 범위 추가
    random_factors = np.array([random.uniform(0.9, 1.1) for _ in range(date_range)])
    slope_weights = slope_weights * random_factors
    slope_weights /= slope_weights.sum()  

    # 주말 가중치 설정
    weekend_weights = np.array([1.30 if day.weekday() >= 5 else 0.70 for day in days])
    weekend_weights /= weekend_weights.sum() 

    # 초기 날짜와 월초의 가중치 설정
    initial_bias_weights = np.ones(date_range)
    initial_bias_weights[:10] = 1.50  # 초기 10일 동안의 가중치
    month_start_weights = np.array([1.20 if day.day <= 7 else 0.80 for day in days])  # 월 초에 대한 조정
    initial_bias_weights *= month_start_weights  
    initial_bias_weights /= initial_bias_weights.sum()  

    # 최종 가중치 계산
    final_weights = slope_weights * weekend_weights * initial_bias_weights
    final_weights /= final_weights.sum()  # 최종 정규화

    # 랜덤하게 날짜 선택하고 Python datetime으로 변환
    selected_day = pd.to_datetime(np.random.choice(days, p=final_weights))

    # 계약 기간을 1~6개월 사이로 설정하며 종료 날짜가 end_date를 초과하지 않도록 조정
    contract_duration = random.randint(30, 180)
    contract_end_date = selected_day + timedelta(days=contract_duration)

    # 종료일이 데이터 종료일을 넘지 않도록 조정
    if contract_end_date > end_date:
        contract_end_date = end_date

    return selected_day, contract_end_date

# 기존 근무 시간을 업데이트하는 함수
def update_schedule(employee_schedule, work_days, work_times):
    """직원의 기존 스케줄에 새로운 근무 시간을 추가."""
    for day, time in zip(work_days, work_times):
        if day not in employee_schedule:
            employee_schedule[day] = []
        employee_schedule[day].append(time)

# 직원별 계약 스케줄 세부 정보 생성 (다중 계약 가능)
def generate_contract_schedule(employee_ids, store_ids, start_month, end_month):
    contract_schedule = {}
    for employee_id in employee_ids:
        num_contracts = random.randint(18, 22)  # 직원 당 계약 생성 개수
        contracts = []
        for _ in range(num_contracts):
            store_id = random.choice(store_ids)
            contract_start, contract_end = generate_contract_dates(start_month, end_month)
            work_days = random.sample(["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"], random.randint(2, 5))
            contracts.append({
                "매장": store_id,
                "시작일": contract_start,
                "종료일": contract_end,
                "근무일": work_days,
            })
        contract_schedule[employee_id] = contracts
    return contract_schedule

# 중복된 시간대 체크 함수
def is_overlapping(new_start, new_end, existing_hours):
    """새로운 근무 시간이 기존 근무 시간과 겹치는지 체크. 1시간 이동 시간 고려."""
    new_start_dt = datetime.strptime(new_start, "%H:%M")
    new_end_dt = datetime.strptime(new_end, "%H:%M")
    for existing_start, existing_end in existing_hours:
        existing_start_dt = datetime.strptime(existing_start, "%H:%M")
        existing_end_dt = datetime.strptime(existing_end, "%H:%M")
        # Ensure a 1-hour transition buffer between shifts
        if not (new_end_dt + timedelta(hours=1) <= existing_start_dt or new_start_dt >= existing_end_dt + timedelta(hours=1)):
            return True
    return False

# 고유 근무 시간 생성 함수
def generate_unique_work_hours(existing_hours, min_duration=3, max_duration=8):
    """기존 시간과 겹치지 않는 랜덤한 시작 및 종료 시간을 생성하는 함수."""
    latest_end = "21:00"
    earliest_start = "09:00"
    
    if existing_hours:
        for existing_start, existing_end in existing_hours:
            existing_start_dt = datetime.strptime(existing_start, "%H:%M")
            latest_end = min(latest_end, (existing_start_dt - timedelta(hours=1)).strftime("%H:%M"))

    end_time = (datetime.strptime(earliest_start, "%H:%M") + timedelta(minutes=random.randint(0, 720))).strftime("%H:%M")
    end_datetime = datetime.strptime(end_time, "%H:%M")
    duration_hours = random.randint(min_duration, max_duration)
    start_datetime = end_datetime - timedelta(hours=duration_hours)

    if start_datetime < datetime.strptime("09:00", "%H:%M"):
        start_datetime = datetime.strptime("09:00", "%H:%M")
        end_datetime = start_datetime + timedelta(hours=duration_hours)
    
    return start_datetime.strftime("%H:%M"), end_datetime.strftime("%H:%M")

# 근무 시간 생성 함수
def generate_work_hours(employee_id, record_date, employee_daily_schedule, employee_ids, max_attempts=100):
    """실질적 스케줄을 생성하는 함수"""
    attempts = 0
    if record_date not in employee_daily_schedule[employee_id]:
        employee_daily_schedule[employee_id][record_date] = []
    
    existing_hours = sorted(employee_daily_schedule[employee_id][record_date], key=lambda x: datetime.strptime(x[0], "%H:%M"))
    
    while attempts < max_attempts:
        start_time, end_time = generate_unique_work_hours(existing_hours)
        if not is_overlapping(start_time, end_time, existing_hours):
            employee_daily_schedule[employee_id][record_date].append((start_time, end_time))
            employee_daily_schedule[employee_id][record_date].sort()
            return start_time, end_time, employee_id, "근무"
        
        attempts += 1

    new_employee_id = random.choice([emp_id for emp_id in employee_ids if emp_id != employee_id])
    if record_date not in employee_daily_schedule[new_employee_id]:
        employee_daily_schedule[new_employee_id][record_date] = []

    attempts = 0
    existing_hours = sorted(employee_daily_schedule[new_employee_id][record_date], key=lambda x: datetime.strptime(x[0], "%H:%M"))
    while attempts < max_attempts:
        start_time, end_time = generate_unique_work_hours(existing_hours)
        if not is_overlapping(start_time, end_time, existing_hours):
            employee_daily_schedule[new_employee_id][record_date].append((start_time, end_time))
            employee_daily_schedule[new_employee_id][record_date].sort()
            return start_time, end_time, new_employee_id, "근무"
        attempts += 1
    
    return None, None, employee_id, "휴식"


# 각 직원의 카테고리별 초기 판매량과 피드백 점수 설정 함수
def assign_initial_feedback_and_sales(employee_info, category_base_feedback):
    """직원별 초기 판매량과 피드백 점수 설정"""
    for employee_id, info in employee_info.items():
        for category in category_base_feedback.keys():
            base_sales = category_base_feedback[category]['기준_판매량']
            base_feedback = category_base_feedback[category]['기준_피드백']
            
            # 기준선에서 ±30% 범위의 난수 발생
            initial_sales = int(base_sales * random.uniform(0.7, 1.3))
            initial_feedback = base_feedback + random.uniform(-0.7, 0.3)
            
            # 직원 정보에 초기 판매량과 피드백 점수 저장
            info['카테고리별_정보'][category]['초기_판매량'] = initial_sales
            info['카테고리별_정보'][category]['초기_피드백_점수'] = round(initial_feedback, 2)


def adjust_sales_based_on_feedback(current_sales, feedback_score, dispatch_count, category):
    """피드백 점수와 파견 횟수를 바탕으로 판매량 조정"""
    # 기본 판매량 범위 설정
    min_sales_base, max_sales_base = category_quantity_ranges[category]
    
    # 파견 횟수에 따른 판매량 증가 비율 계산
    # 파라미터 조정: 100회 시 약 1.15배, 500회 시 약 1.25배 정도의 세팅 값 (로그함수)
    min_increase_factor = 1.1 + 0.2 * np.log1p(dispatch_count / 100)
    max_increase_factor = 1.1 + 0.2 * np.log1p(dispatch_count / 100)
    
    min_sales = min_sales_base * min_increase_factor
    max_sales = max_sales_base * max_increase_factor
    
    # 피드백 점수를 적용하여 조정
    feedback_factor = 1 + 0.2 * feedback_score
    
    # 난수로 판매량을 생성하고 피드백 점수 적용
    random_sales = random.uniform(min_sales, max_sales)
    adjusted_sales = current_sales + int(random_sales * feedback_factor - current_sales)
    
    # 파견 횟수 증가에 따른 판매량 표준편차 감소
    sales_variability_factor = max(0.8, 1 - 0.05 * np.log1p(dispatch_count))  # 표준편차 감소 비율 조정
    adjusted_sales = adjusted_sales * sales_variability_factor
    
    # 상한 설정
    adjusted_sales = min(adjusted_sales, max_sales) 
    
    return max(adjusted_sales, 1)  # 최소 판매량 1 보장

# 피드백 점수 생성
def generate_feedback_score(파견_횟수, 기준_피드백_점수, distance, salary):
    """다양한 요인들을 바탕으로 피드백 점수 조정"""
    # 매장 거리와 급여의 영향을 반영하여 평균값 조정
    distance_penalty = max(0.8, 1 - 0.02 * distance)  # 거리가 멀수록 페널티 적용
    salary_bonus = min(1.2, 1 + 0.00015 * salary)  # 급여가 높을수록 보너스 적용

    # 파견 횟수에 따라 평균값을 조정. 적을 때는 음수, 많을 때는 양수 쪽으로 이동
    scale_factor = 0.15
    mean_shift = (np.log1p(파견_횟수) - 1) * scale_factor * distance_penalty * salary_bonus
    mean = max(-0.6, min(기준_피드백_점수 + mean_shift, 0.5 + 0.5 * (파견_횟수 / 1000)))  # 파견 횟수가 많을수록 양수값 증가

    # 파견 횟수에 따른 표준편차 설정
    min_sigma = 0.2
    max_sigma = 0.5
    sigma = min_sigma + (max_sigma - min_sigma) * (1 - np.exp(-0.03 * 파견_횟수))
    
    # 목표 확률 밀도를 맞추기 위해 수정된 Gaussian 분포 활용
    target_probability = 0.04  # 4% 확률
    high_score_threshold = 1.0

    # 변동성 생성
    variability = np.random.normal(loc=mean, scale=sigma)

    # 1점이 될 확률을 조정하기 위한 변환
    if np.random.rand() < target_probability:
        final_score = high_score_threshold
    else:
        final_score = round(max(-1, min(variability, high_score_threshold - 0.01)), 2)

    return final_score



# 직원-매장 거리 설정 및 변경 함수
def generate_employee_store_distances(employee_ids, store_ids, start_date, end_date):
    """직원과 매장 간의 거리를 설정, 3년 동안 1~2번 정도 랜덤 변경."""
    distances = {}
    distance_change_dates = {}  # 거리 변경 시점을 저장하는 딕셔너리

    for emp_id in employee_ids:
        for store_id in store_ids:
            initial_distance = random.uniform(1, 10)  # 초기 거리 설정 
            distances[(emp_id, store_id)] = [initial_distance]

            # 거리 변경 시점 설정 (3년 동안 1~2회)
            change_dates = sorted(random.sample(pd.date_range(base_date, end_date).tolist(), random.randint(1, 2)))
            distance_change_dates[(emp_id, store_id)] = change_dates

    return distances, distance_change_dates

# 직원 급여 초기 설정 함수
def assign_initial_salaries(employee_info):
    for emp_id, info in employee_info.items():
        # 정규직은 계약직보다 기본 급여가 높게 설정
        base_salary = 2500 if info['상태'] == '정규직' else 1800
        performance_factor = random.uniform(0.8, 1.2)
        info['초기_급여'] = int(base_salary * performance_factor * (1 + 0.1 * random.random()))  # 초기 급여 설정


# 급여 증가 로직
def update_salary(salary, employment_status):
    """매년 초 급여를 소폭 증가시킵니다."""
    increase_rate = random.uniform(0.05, 0.1)  # 5% ~ 10% 사이의 증가율 설정
    if employment_status == '정규직':
        increase_rate += 0.02  # 정규직의 경우 증가율이 조금 더 높음

    return round(salary * (1 + increase_rate), 2)


# 카테고리별 기준선
category_base_feedback = {
    '의류': {'기준_판매량': 75, '기준_피드백': -0.3},
    '화장품': {'기준_판매량': 150, '기준_피드백': -0.3},
    '신발': {'기준_판매량': 35, '기준_피드백': -0.3},
    '액세서리': {'기준_판매량': 100, '기준_피드백': -0.3}
}


# 판매 금액 및 판매량 설정
category_sales_ranges = {
    '의류': (10000, 200000),
    '화장품': (5000, 100000),
    '신발': (30000, 300000),
    '액세서리': (2000, 50000)
}

category_quantity_ranges = {
    '의류': (20, 200),
    '화장품': (50, 500),
    '신발': (20, 100),
    '액세서리': (30, 300)
}


# 계약 스케줄 생성 (다중 계약 포함)
start_month, end_month = 1, 12
base_date = datetime(2024, 1, 1)  # 데이터 시작 날짜
end_date = datetime(2026, 12, 31)  # 데이터 종료 날짜 (예: 3년 간의 데이터 생성)

# 데이터 생성
num_stores = 40
num_employees = 20
categories = ['의류', '화장품', '신발', '액세서리']
store_ids = [f"매장{str(i).zfill(3)}" for i in range(1, num_stores + 1)]
employee_ids = [f"직원{str(i).zfill(3)}" for i in range(1, num_employees + 1)]



# 매장 카테고리와 직원 정보 설정
store_categories = {store_id: random.choice(categories) for store_id in store_ids}

employee_info = {
    employee_id: {
        '성별': random.choice(['남성', '여성']),
        '연령': random.randint(20, 60),
        '상태': random.choice(['정규직', '계약직']),
        '카테고리별_정보': {
            category: {
                '파견_횟수': random.randint(0, 50),
                '기준_판매량': category_base_feedback[category]['기준_판매량'],
                '기준_피드백': category_base_feedback[category]['기준_피드백']
            }
            for category in category_base_feedback.keys()
        }
    }
    for employee_id in employee_ids
}

# 초기값 설정 호출
assign_initial_feedback_and_sales(employee_info, category_base_feedback)

# 계약 날짜 생성
contract_schedule = generate_contract_schedule(employee_ids, store_ids, base_date, end_date)

# 직원-매장 거리 및 변경 시점 생성
employee_store_distances, distance_change_dates = generate_employee_store_distances(employee_ids, store_ids, base_date, end_date)
assign_initial_salaries(employee_info)

final_records = []
employee_daily_schedule = {emp_id: {} for emp_id in employee_ids}
record_id = 1  # 기록 번호 초기화

for employee_id in tqdm(employee_ids, desc="직원별 근무 기록 생성"):
    current_date = base_date
    current_salary = employee_info[employee_id]['초기_급여']  # 초기 급여 설정

    while current_date <= end_date:
        date_str = current_date.strftime("%Y-%m-%d")
        weekday = current_date.strftime("%a")
        contracts = contract_schedule[employee_id]
        shifts_today = []
        is_working = False

        # 매년 초에 급여를 업데이트
        if current_date.month == 1 and current_date.day == 1:
            current_salary = update_salary(current_salary, employee_info[employee_id]['상태'])


        # 각 계약을 순회하며 다중 근무를 허용
        for contract in contracts:
            contract_start = contract["시작일"]
            contract_end = contract["종료일"]

            if contract_start <= current_date <= contract_end:
                is_working = True  # 계약 중임을 표시

                if weekday in contract["근무일"]:
                    # 근무 시간이 가능한지 확인 후 생성
                    start_time, end_time, emp_id, status = generate_work_hours(employee_id, date_str, employee_daily_schedule, employee_ids)
                    if status == "근무" and not is_overlapping(start_time, end_time, shifts_today):
                        store_id = contract["매장"]
                        category = store_categories[store_id]

                        # 파견 횟수 증가
                        employee_info[emp_id]['카테고리별_정보'][category]['파견_횟수'] += 1
                        current_파견_횟수 = employee_info[emp_id]['카테고리별_정보'][category]['파견_횟수']


                        # 거리 변경 시점 체크 및 거리 업데이트
                        if current_date in distance_change_dates[(employee_id, store_id)]:
                            new_distance = random.uniform(1, 10)  # 새로운 거리 설정
                            employee_store_distances[(employee_id, store_id)].append(new_distance)
                        current_distance = employee_store_distances[(employee_id, store_id)][-1]  # 최신 거리 값 사용



                        # 피드백 점수 조정
                        기준_피드백_점수 = employee_info[employee_id]['카테고리별_정보'][category]['기준_피드백']
                        feedback_score = generate_feedback_score(
                            current_파견_횟수,
                            기준_피드백_점수,
                            current_distance,
                            current_salary
                        )

                        # 파견 횟수와 피드백 점수를 바탕으로 판매량 조정 적용
                        employee_sales = adjust_sales_based_on_feedback(
                            employee_info[employee_id]['카테고리별_정보'][category]['기준_판매량'], 
                            feedback_score, 
                            current_파견_횟수, 
                            category
                        )
                        # 매출액 계산
                        employee_revenue = int(round(random.uniform(*category_sales_ranges[category]) * employee_sales))

                        # 레코드 추가
                        final_records.append([
                            record_id, date_str, store_id, category, None, None, 
                            emp_id, status, employee_info[employee_id]['성별'], employee_info[employee_id]['연령'], employee_info[employee_id]['상태'],
                            start_time, end_time, feedback_score, employee_sales, employee_revenue, current_파견_횟수,
                            contract_start, contract_end, current_distance, current_salary
                        ])
                        shifts_today.append((start_time, end_time))
                        record_id += 1

        # 계약 중이지만 근무 기록이 없으면 '휴가중', 계약이 없는 경우 '대기중' 처리
        if not is_working:
            # 계약이 없는 경우
            final_records.append([
                record_id, date_str, None, None, None, None, employee_id, "대기중",
                employee_info[employee_id]['성별'], employee_info[employee_id]['연령'], employee_info[employee_id]['상태'],
                None, None, None, None, None, None, None, None, 0, 0
            ])
            record_id += 1  # 기록 번호 증가
        elif not shifts_today:
            # 계약이 있지만 근무일이 아닌 경우
            final_records.append([
                record_id, date_str, None, None, None, None, employee_id, "휴가중",
                employee_info[employee_id]['성별'], employee_info[employee_id]['연령'], employee_info[employee_id]['상태'],
                None, None, None, None, None, None, None, None, 0, 0
            ])
            record_id += 1  # 기록 번호 증가

        current_date += timedelta(days=1)


# 데이터프레임 생성
df_final = pd.DataFrame(final_records, columns=[
    '기록_ID', '기록_날짜', '매장_ID', '매장_카테고리', '일일_매장_판매량', '일일_매장_매출액',
    '직원_ID', '근무_여부', '성별', '연령', '상태',
    '근무_시작', '근무_종료', '피드백_점수', '판매량', '매출액', '파견횟수',
    '계약_시작일', '계약_종료일', '직원_매장_거리', '직원_급여'
])

# 일일 매장 매출 및 판매량 계산
daily_sales_summary = df_final.groupby(['기록_날짜', '매장_ID']).agg(
    일일_매장_판매량=('판매량', 'sum'),
    일일_매장_매출액=('매출액', 'sum')
).reset_index()

# 매장별 집계 결과를 병합하면서 매장 판매량과 매출액이 누락되지 않도록 함
df_final = df_final.merge(daily_sales_summary, on=['기록_날짜', '매장_ID'], how='left', suffixes=('', '_calculated'))

# 병합된 컬럼 정리
df_final['일일_매장_판매량'] = df_final['일일_매장_판매량_calculated'].fillna(0)
df_final['일일_매장_매출액'] = df_final['일일_매장_매출액_calculated'].fillna(0)
df_final = df_final.drop(columns=['일일_매장_판매량_calculated', '일일_매장_매출액_calculated'], errors='ignore')

# NaN 값을 0으로 대체
df_final['판매량'] = df_final['판매량'].fillna(0)
df_final['매출액'] = df_final['매출액'].fillna(0)
df_final['파견횟수'] = df_final['파견횟수'].fillna(0)
df_final['직원_매장_거리'] = df_final['직원_매장_거리'].fillna(0)

# 판매 비율과 매출 비율 계산
df_final['판매_비율(%)'] = ((df_final['판매량'] / df_final['일일_매장_판매량']).fillna(0) * 100).round(2).astype(str) + '%'
df_final['매출_비율(%)'] = ((df_final['매출액'] / df_final['일일_매장_매출액']).fillna(0) * 100).round(2).astype(str) + '%'

# 최종 결과를 CSV로 저장
df_final.to_csv('final_schedule_data_with_metrics2.csv', index=False, encoding='utf-8-sig')

print("최종 데이터가 'final_schedule_data_with_metrics2.csv'로 저장되었습니다.")



직원별 근무 기록 생성:   0%|          | 0/20 [00:01<?, ?it/s]


KeyboardInterrupt: 

In [4]:
import pandas as pd
import random
import numpy as np
from datetime import datetime, timedelta
from tqdm import tqdm
from collections import defaultdict

# 설정
NUM_STORES = 40
NUM_EMPLOYEES = 20
START_DATE = datetime(2024, 1, 1)
END_DATE = datetime(2026, 12, 31)
CATEGORIES = ['의류', '화장품', '신발', '액세서리']

# 카테고리별 기준 피드백 및 판매량
category_base_feedback = {
    '의류': {'base_sales': 75, 'base_feedback': -0.3},
    '화장품': {'base_sales': 150, 'base_feedback': -0.3},
    '신발': {'base_sales': 35, 'base_feedback': -0.3},
    '액세서리': {'base_sales': 100, 'base_feedback': -0.3}
}

# 판매 금액 및 판매량 범위
category_sales_ranges = {
    '의류': (10000, 200000),
    '화장품': (5000, 100000),
    '신발': (30000, 300000),
    '액세서리': (2000, 50000)
}

category_quantity_ranges = {
    '의류': (20, 200),
    '화장품': (50, 500),
    '신발': (20, 100),
    '액세서리': (30, 300)
}

# 매장 및 직원 ID 생성
store_ids = [f"매장{str(i).zfill(3)}" for i in range(1, NUM_STORES + 1)]
employee_ids = [f"직원{str(i).zfill(3)}" for i in range(1, NUM_EMPLOYEES + 1)]

# 매장에 카테고리 할당
store_categories = {store_id: random.choice(CATEGORIES) for store_id in store_ids}

# 직원 정보 생성
employee_info = {
    employee_id: {
        'gender': random.choice(['남성', '여성']),
        'age': random.randint(20, 60),
        'status': random.choice(['정규직', '계약직']),
        # 근무일을 주말 포함하여 최소 4일에서 최대 6일로 설정
        'workdays': random.sample(['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'], random.randint(4, 6)),
        'category_info': {
            category: {
                'dispatch_count': random.randint(0, 50),
                'base_sales': category_base_feedback[category]['base_sales'],
                'base_feedback': category_base_feedback[category]['base_feedback']
            }
            for category in category_base_feedback.keys()
        }
    }
    for employee_id in employee_ids
}

def assign_initial_feedback_and_sales(employee_info, category_base_feedback):
    """직원별 초기 판매량과 피드백 점수 설정"""
    for employee_id, info in employee_info.items():
        for category in category_base_feedback.keys():
            base_sales = category_base_feedback[category]['base_sales']
            base_feedback = category_base_feedback[category]['base_feedback']

            # ±30% 범위의 난수 생성
            initial_sales = int(base_sales * random.uniform(0.7, 1.3))
            initial_feedback = base_feedback + random.uniform(-0.7, 0.3)

            # 직원 정보에 저장
            info['category_info'][category]['initial_sales'] = initial_sales
            info['category_info'][category]['initial_feedback'] = round(initial_feedback, 2)

assign_initial_feedback_and_sales(employee_info, category_base_feedback)

def generate_unique_shift(overlapping_shifts, min_duration=4, max_duration=8):
    """겹치지 않는 근무 시간 생성"""
    possible_start_hours = list(range(9, 17))  # 09:00부터 17:00까지 시작 가능한 시간
    random.shuffle(possible_start_hours)
    for start_hour in possible_start_hours:
        duration = random.randint(min_duration, max_duration)
        start_time_dt = datetime.strptime(f"{start_hour}:00", "%H:%M")
        end_time_dt = start_time_dt + timedelta(hours=duration)
        if end_time_dt.hour > 21:
            end_time_dt = datetime.strptime("21:00", "%H:%M")
        # 겹치는지 확인
        overlap = False
        for existing_start, existing_end in overlapping_shifts:
            existing_start_dt = datetime.strptime(existing_start, "%H:%M")
            existing_end_dt = datetime.strptime(existing_end, "%H:%M")
            if not (end_time_dt <= existing_start_dt or start_time_dt >= existing_end_dt):
                overlap = True
                break
        if not overlap:
            return start_time_dt.strftime("%H:%M"), end_time_dt.strftime("%H:%M")
    return None, None  # 겹치지 않는 시간대 생성 실패

# 계약 기간 설정 및 분포도 조절 함수 추가
def generate_contract_dates(base_date, end_date):
    """부드러운 분포 조정을 통한 랜덤 계약 날짜 생성."""
    # base_date와 end_date 사이의 날짜 범위 생성
    days = pd.date_range(base_date, end_date)
    date_range = len(days)  # 범위 내의 총 일수

    # 가우시안 함수 기반의 부드러운 분포 생성
    x = np.arange(date_range)
    mean = (date_range / 2)  + 15
    sigma = date_range / 1.5   # 기울기 조정
    slope_weights = np.exp(-((x - mean) ** 2) / (2 * sigma ** 2))
    slope_weights = (1 / slope_weights)  # 역가우시안 변환으로 양 끝 강조
    slope_weights = slope_weights / slope_weights.sum()  # 정규화

    # 난수에 ±10% 범위 추가
    random_factors = np.array([random.uniform(0.9, 1.1) for _ in range(date_range)])
    slope_weights = slope_weights * random_factors
    slope_weights /= slope_weights.sum()  

    # 주말 가중치 설정
    weekend_weights = np.array([1.30 if day.weekday() >= 5 else 0.70 for day in days])
    weekend_weights /= weekend_weights.sum() 

    # 초기 날짜와 월초의 가중치 설정
    initial_bias_weights = np.ones(date_range)
    initial_bias_weights[:5] = 5  # 초기 5일 동안의 가중치
    month_start_weights = np.array([1.4 if day.day <= 7 else 0.80 for day in days])  # 월 초에 대한 조정
    initial_bias_weights *= month_start_weights  
    initial_bias_weights /= initial_bias_weights.sum()  

    # 최종 가중치 계산
    final_weights = slope_weights * weekend_weights * initial_bias_weights
    final_weights /= final_weights.sum()  # 최종 정규화

    # 랜덤하게 날짜 선택하고 Python datetime으로 변환
    selected_day = pd.to_datetime(np.random.choice(days, p=final_weights))

    # 계약 기간을 1~6개월 사이로 설정하며 종료 날짜가 end_date를 초과하지 않도록 조정
    contract_duration = random.randint(30, 180)
    contract_end_date = selected_day + timedelta(days=contract_duration)

    # 종료일이 데이터 종료일을 넘지 않도록 조정
    if contract_end_date > end_date:
        contract_end_date = end_date

    return selected_day, contract_end_date

def generate_contract_schedule(employee_ids, store_ids, start_date, end_date):
    """직원별 계약 스케줄 생성 (분포를 고려하여 계약 날짜 설정 및 최대 두 계약 허용)"""
    contract_schedule = {}
    for employee_id in employee_ids:
        contracts = []
        employee_existing_contracts = []  # 직원의 기존 계약 목록 저장
        date_contract_count = defaultdict(int)  # 계약 수를 날짜별로 추적

        num_contracts = random.randint(16, 20)  # 직원당 계약 수를 늘림
        attempts = 0
        max_attempts = num_contracts * 100  # 최대 시도 횟수 증가

        while len(contracts) < num_contracts and attempts < max_attempts:
            attempts += 1

            # 계약 시작일과 종료일을 분포에 맞게 생성
            contract_start, contract_end = generate_contract_dates(start_date, end_date)

            # 계약 기간 내의 모든 날짜를 생성
            contract_dates = pd.date_range(contract_start, contract_end).to_pydatetime().tolist()

            # 하루에 두 개의 계약을 초과하지 않는지 확인
            overlap_exceeded = False
            for date in contract_dates:
                if date_contract_count[date] > 2:
                    overlap_exceeded = True
                    break
            if overlap_exceeded:
                continue  # 겹침 초과 시 재시도

            # 매장 할당
            store_id = random.choice(store_ids)

            # 직원의 근무일 중에서 계약별로 주 2회에서 5회 근무일 설정
            employee_workdays = employee_info[employee_id]['workdays']
            if len(employee_workdays) < 2:
                # 근무 가능일이 2일 미만인 경우, 전체 근무일 사용
                contract_workdays = employee_workdays
            else:
                contract_workdays = random.sample(employee_workdays, random.randint(2, min(5, len(employee_workdays))))

            # 근무 시간 설정 (기존 계약의 근무 시간과 충돌하지 않도록)
            overlapping_shifts = []
            for existing_contract in employee_existing_contracts:
                # 계약 기간이 겹치는지 확인
                latest_start = max(contract_start, existing_contract['start_date'])
                earliest_end = min(contract_end, existing_contract['end_date'])
                delta = (earliest_end - latest_start).days
                if delta >= 0:
                    overlapping_shifts.append((existing_contract['start_time'], existing_contract['end_time']))

            # 새로운 계약의 근무 시간을 겹치지 않도록 설정
            start_time, end_time = generate_unique_shift(overlapping_shifts)
            if start_time and end_time:
                # 근무 시간 저장
                new_contract = {
                    'store_id': store_id,
                    'start_date': contract_start,
                    'end_date': contract_end,
                    'workdays': contract_workdays,  # 계약별로 설정된 근무일
                    'start_time': start_time,
                    'end_time': end_time
                }
                contracts.append(new_contract)
                employee_existing_contracts.append(new_contract)

                # 계약 기간 내의 각 날짜에 대해 계약 수 증가
                for date in contract_dates:
                    date_contract_count[date] += 1
            else:
                # 겹치지 않는 시간대를 찾지 못한 경우 재시도
                continue

        contract_schedule[employee_id] = contracts
    return contract_schedule

def generate_employee_store_distances(employee_ids, store_ids, start_date, end_date):
    """직원-매장 간 거리 및 변경 시점 생성"""
    distances = {}
    distance_change_dates = {}

    for emp_id in employee_ids:
        for store_id in store_ids:
            initial_distance = random.uniform(1, 10)
            distances[(emp_id, store_id)] = [initial_distance]

            # 거리 변경 시점 (3년 동안 1~2회)
            num_changes = random.randint(1, 2)
            change_dates = sorted(random.sample(pd.date_range(start_date, end_date).tolist(), num_changes))
            distance_change_dates[(emp_id, store_id)] = change_dates

    return distances, distance_change_dates

def assign_initial_salaries(employee_info):
    """직원별 초기 급여 설정"""
    for emp_id, info in employee_info.items():
        base_salary = 2500 if info['status'] == '정규직' else 1800
        performance_factor = random.uniform(0.8, 1.2)
        # 급여 보너스를 강화
        info['initial_salary'] = int(base_salary * performance_factor * (1 + 0.2 * random.random()))

assign_initial_salaries(employee_info)

def update_salary(salary, employment_status):
    """매년 급여 업데이트"""
    increase_rate = random.uniform(0.05, 0.1)
    if employment_status == '정규직':
        increase_rate += 0.02
    return round(salary * (1 + increase_rate), 2)

def generate_feedback_score(dispatch_count, base_feedback_score, distance, salary):
    """베타 분포를 활용하여 피드백 점수를 생성합니다."""
    # 거리와 급여에 따른 조정 값 계산
    distance_penalty = 1 - 0.02 * distance  # 거리 페널티 계산
    distance_penalty = max(0.8, distance_penalty)  # 최소 페널티 적용
    
    # 파견 횟수에 따른 경험치 반영
    experience_factor = np.log1p(dispatch_count)
    
    # 스케일링 팩터 설정
    scaling_factor = 0.5  # 분포의 형태를 조절하는 스케일링 팩터
    
    # 베타 분포 파라미터 설정: alpha와 beta를 유사하게 설정하여 대칭 분포 유지
    alpha = 2 + (experience_factor * scaling_factor)
    beta = 2 + (distance * 0.25 * scaling_factor)
    
    # 베타 분포에서 랜덤한 값 생성
    feedback = np.random.beta(alpha, beta)
    
    # [0,1] 범위를 [0,3]으로 변환 후 [-1.6, 1.4]로 최종 변환
    feedback_score = feedback * 3 - 1.6
    
    # 거리 페널티 적용
    feedback_score *= distance_penalty
    
    # 기존 피드백 점수 20% 반영
    feedback_score = 0.8 * feedback_score + 0.2 * base_feedback_score
    
    # 피드백 점수를 -1에서 1 사이로 제한
    feedback_score = np.clip(feedback_score, -1.0, 1.0)
    
    # 소수점 둘째 자리까지 반올림
    feedback_score = round(feedback_score, 2)
    
    return feedback_score

def adjust_sales_based_on_feedback(base_sales, feedback_score, dispatch_count, category):
    """피드백 점수와 파견 횟수를 바탕으로 판매량 조정"""
    
    # 1. 파견 횟수에 따른 판매량 증가 비율 계산 (시그모이드 함수)
    # 시그모이드 함수의 중심을 100으로 설정하여 파견 횟수가 100일 때 중간값에 도달하도록 함
    f_dispatch = 0.5 / (1 + np.exp(-0.005 * (dispatch_count - 100)))  
    
    # 2. 파견 횟수와 피드백 점수의 영향력 가중치 설정
    A = 0.4  # 파견 횟수의 영향력
    B = 0.3  # 피드백 점수의 영향력
    
    # 3. 파견 횟수에 따른 판매량 증가
    sales_with_dispatch = base_sales * (1 + A * f_dispatch)
    
    # 4. 피드백 점수에 따른 판매량 조정
    # 피드백 점수가 높을수록 판매량이 증가하고, 낮을수록 감소
    sales_with_feedback = sales_with_dispatch * (1 + B * feedback_score)
    
    # 5. 파견 횟수에 따른 추가적 비선형적 판매량 증가
    # 파견 횟수가 많을수록 판매량이 더 많이 증가하도록 조정
    additional_factor = 1 + 0.0005 * dispatch_count  # 파견 횟수가 500일 때 약 1.25배 증가
    sales_with_feedback *= additional_factor
    
    # 6. 최소 판매량과 최대 판매량 설정 (동적으로 증가)
    # 파견 횟수가 증가할수록 min과 max 범위를 넓힘
    # 스케일링 팩터 설정
    min_scaling = 0.001  # 파견 횟수에 따른 최소 판매량 증가 비율
    max_scaling = 0.04    # 파견 횟수에 따른 최대 판매량 증가 비율
    
    # 동적으로 증가하는 절대 범위
    min_sales_absolute = base_sales * (0.5 + min_scaling * dispatch_count)
    max_sales_absolute = base_sales * (5.0 + max_scaling * dispatch_count)
    
    # 상한선 설정 (옵션: 필요에 따라 조정)
    max_min_sales_absolute = base_sales * 1.2  # 예: 최소 판매량 절대 범위의 상한선
    max_max_sales_absolute = base_sales * 15.0 # 예: 최대 판매량 절대 범위의 상한선
    
    # 절대 범위 상한선 적용
    min_sales_absolute = min(min_sales_absolute, max_min_sales_absolute)
    max_sales_absolute = min(max_sales_absolute, max_max_sales_absolute)
    
    # 판매량 조정 결과를 기반으로 상대적인 최소, 최대 범위를 설정
    # sales_with_feedback의 50% ~ 150% 범위로 설정
    min_sales_relative = sales_with_feedback * 0.5  
    max_sales_relative = sales_with_feedback * 1.5  
    
    # 최종 min_sales와 max_sales는 절대 범위와 상대 범위의 교집합으로 설정
    min_sales = max(min_sales_absolute, min_sales_relative)
    max_sales = min(max_sales_absolute, max_sales_relative)
    
    # 7. 판매량을 min_sales와 max_sales 사이의 랜덤 값으로 설정
    adjusted_sales = random.uniform(min_sales, max_sales)
    
    # 8. 판매량을 정수로 변환
    adjusted_sales = int(adjusted_sales)
    
    return adjusted_sales

def generate_unique_work_hours(existing_shifts, min_duration=3, max_duration=8, travel_buffer=1):
    """겹치지 않는 근무 시간 생성 (이동 시간 고려)"""
    max_attempts = 100
    attempts = 0
    while attempts < max_attempts:
        start_hour = random.randint(9, 17)
        duration = random.randint(min_duration, max_duration)
        start_time = datetime.strptime(f"{start_hour}:00", "%H:%M")
        end_time = start_time + timedelta(hours=duration)
        if end_time.hour > 21:
            end_time = datetime.strptime("21:00", "%H:%M")

        # 이동 시간 고려하여 겹치는지 확인
        overlap = False
        for existing_start, existing_end in existing_shifts:
            existing_start_dt = datetime.strptime(existing_start, "%H:%M")
            existing_end_dt = datetime.strptime(existing_end, "%H:%M")
            # 이동 시간 1시간 고려
            if not (end_time + timedelta(hours=travel_buffer) <= existing_start_dt or start_time >= existing_end_dt + timedelta(hours=travel_buffer)):
                overlap = True
                break
        if not overlap:
            return start_time.strftime("%H:%M"), end_time.strftime("%H:%M")
        attempts += 1
    return None, None  # 겹치지 않는 시간대 생성 실패

# 초기 변수 설정
final_records = []
employee_daily_schedule = {emp_id: {} for emp_id in employee_ids}
record_id = 1

# 계약 스케줄 생성 (수정된 함수 사용)
contract_schedule = generate_contract_schedule(employee_ids, store_ids, START_DATE, END_DATE)

# 직원-매장 거리 및 변경 시점 생성
employee_store_distances, distance_change_dates = generate_employee_store_distances(employee_ids, store_ids, START_DATE, END_DATE)

for employee_id in tqdm(employee_ids, desc="직원별 기록 생성"):
    current_date = START_DATE
    current_salary = employee_info[employee_id]['initial_salary']

    while current_date <= END_DATE:
        date_str = current_date.strftime("%Y-%m-%d")
        weekday = current_date.strftime("%a")
        contracts = contract_schedule.get(employee_id, [])

        # 모든 계약 중 현재 날짜가 포함되는 계약 찾기
        active_contracts = [contract for contract in contracts if contract['start_date'] <= current_date <= contract['end_date']]

        # 오늘 근무 가능한 계약들 필터링 (근무일 포함)
        valid_contracts = [contract for contract in active_contracts if weekday in contract['workdays']]

        if active_contracts:
            if valid_contracts:
                status = '근무'
                # 오늘의 스케줄 초기화
                if date_str not in employee_daily_schedule[employee_id]:
                    employee_daily_schedule[employee_id][date_str] = []

                existing_shifts = employee_daily_schedule[employee_id][date_str]

                # 각 계약에 대해 근무 시간은 계약에 저장된 시간을 사용
                for contract in valid_contracts:
                    start_time = contract['start_time']
                    end_time = contract['end_time']

                    # 근무 스케줄 추가
                    employee_daily_schedule[employee_id][date_str].append((start_time, end_time))

                    # 매장 및 카테고리 정보
                    store_id = contract['store_id']
                    category = store_categories[store_id]

                    # 파견 횟수 및 경험치 업데이트
                    employee_info[employee_id]['category_info'][category]['dispatch_count'] += 1
                    dispatch_count = employee_info[employee_id]['category_info'][category]['dispatch_count']

                    # 현재 거리 가져오기
                    if current_date in distance_change_dates.get((employee_id, store_id), []):
                        new_distance = random.uniform(1, 10)
                        employee_store_distances[(employee_id, store_id)].append(new_distance)
                    current_distance = employee_store_distances[(employee_id, store_id)][-1]

                    # 피드백 점수 생성
                    base_feedback = employee_info[employee_id]['category_info'][category]['base_feedback']
                    feedback_score = generate_feedback_score(dispatch_count, base_feedback, current_distance, current_salary)

                    # 판매량 조정
                    current_sales = employee_info[employee_id]['category_info'][category]['base_sales']
                    employee_sales = adjust_sales_based_on_feedback(current_sales, feedback_score, dispatch_count, category)

                    # 매출액 계산
                    unit_price = int(round(random.uniform(*category_sales_ranges[category]) / 100)) * 100
                    employee_revenue = unit_price * employee_sales

                    # 레코드 추가
                    final_records.append([
                        record_id, date_str, store_id, category, None, None,
                        employee_id, status, employee_info[employee_id]['gender'], employee_info[employee_id]['age'], employee_info[employee_id]['status'],
                        start_time, end_time, feedback_score, employee_sales, employee_revenue, dispatch_count,
                        contract['start_date'], contract['end_date'], current_distance, current_salary
                    ])
                    record_id += 1
            else:
                status = '휴식중'
                # 휴식 중인 날의 레코드 추가
                final_records.append([
                    record_id, date_str, None, None, None, None, employee_id, status,
                    employee_info[employee_id]['gender'], employee_info[employee_id]['age'], employee_info[employee_id]['status'],
                    None, None, None, None, None, None, None, None, 0, current_salary
                ])
                record_id += 1
        else:
            status = '대기중'
            # 대기 중인 날의 레코드 추가
            final_records.append([
                record_id, date_str, None, None, None, None, employee_id, status,
                employee_info[employee_id]['gender'], employee_info[employee_id]['age'], employee_info[employee_id]['status'],
                None, None, None, None, None, None, None, None, 0, current_salary
            ])
            record_id += 1

        # 매년 급여 업데이트
        if current_date.month == 1 and current_date.day == 1 and current_date != START_DATE:
            current_salary = update_salary(current_salary, employee_info[employee_id]['status'])

        current_date += timedelta(days=1)

# 데이터프레임 생성
columns = [
    '기록_ID', '기록_날짜', '매장_ID', '매장_카테고리', '일일_매장_판매량', '일일_매장_매출액',
    '직원_ID', '근무_여부', '성별', '연령', '상태',
    '근무_시작', '근무_종료', '피드백_점수', '판매량', '매출액', '파견횟수',
    '계약_시작일', '계약_종료일', '직원_매장_거리', '직원_급여'
]

df_final = pd.DataFrame(final_records, columns=columns)

# 일일 매장 판매량 및 매출액 계산
daily_sales_summary = df_final.groupby(['기록_날짜', '매장_ID']).agg(
    일일_매장_판매량=('판매량', 'sum'),
    일일_매장_매출액=('매출액', 'sum')
).reset_index()

# df_final에 병합
df_final = df_final.merge(daily_sales_summary, on=['기록_날짜', '매장_ID'], how='left', suffixes=('', '_calculated'))
df_final['일일_매장_판매량'] = df_final['일일_매장_판매량_calculated'].fillna(0)
df_final['일일_매장_매출액'] = df_final['일일_매장_매출액_calculated'].fillna(0)
df_final = df_final.drop(columns=['일일_매장_판매량_calculated', '일일_매장_매출액_calculated'], errors='ignore')

# NaN 처리 및 데이터 타입 변환
df_final['판매량'] = df_final['판매량'].fillna(0).astype(int)
df_final['매출액'] = df_final['매출액'].fillna(0).astype(int)
df_final['파견횟수'] = df_final['파견횟수'].fillna(0).astype(int)
df_final['직원_매장_거리'] = df_final['직원_매장_거리'].fillna(0)
df_final['일일_매장_판매량'] = df_final['일일_매장_판매량'].fillna(0).astype(int)
df_final['일일_매장_매출액'] = df_final['일일_매장_매출액'].fillna(0).astype(int)

# 판매 비율 및 매출 비율 계산
df_final['판매_비율(%)'] = np.where(
    df_final['일일_매장_판매량'] != 0,
    ((df_final['판매량'] / df_final['일일_매장_판매량']) * 100).round(2).astype(str) + '%',
    '0%'
)
df_final['매출_비율(%)'] = np.where(
    df_final['일일_매장_매출액'] != 0,
    ((df_final['매출액'] / df_final['일일_매장_매출액']) * 100).round(2).astype(str) + '%',
    '0%'
)

# 데이터 무결성 검사
assert (df_final['판매량'] >= 0).all(), "판매량에 음수가 존재합니다."
assert (df_final['매출액'] >= 0).all(), "매출액에 음수가 존재합니다."

# CSV로 저장
df_final.to_csv('final_schedule_data_with_metrics2.csv', index=False, encoding='utf-8-sig')

print("최종 데이터가 'final_schedule_data_with_metrics2.csv'로 저장되었습니다.")


직원별 기록 생성: 100%|██████████| 20/20 [00:01<00:00, 13.97it/s]


최종 데이터가 'final_schedule_data_with_metrics2.csv'로 저장되었습니다.
