In [None]:
import pandas as pd
import numpy as np
import os
import glob
from datetime import datetime
import gc
from tqdm import tqdm
import psutil
import warnings
warnings.filterwarnings('ignore')

print("="*70)
print("한국전력공사 LP 데이터 배치 처리 시스템")
print("="*70)

# ============================================================================
# 1. 시스템 환경 체크 및 설정
# ============================================================================

def check_system_resources():
    """시스템 리소스 확인"""
    
    memory = psutil.virtual_memory()
    available_gb = memory.available / (1024**3)
    total_gb = memory.total / (1024**3)
    
    print(f"시스템 리소스 체크:")
    print(f"  전체 메모리: {total_gb:.1f} GB")
    print(f"  사용 가능: {available_gb:.1f} GB ({memory.percent:.1f}% 사용중)")
    
    # 배치 크기 권장
    if available_gb > 8:
        recommended_batch = 50
    elif available_gb > 4:
        recommended_batch = 30
    else:
        recommended_batch = 20
    
    print(f"  권장 배치 크기: {recommended_batch}개 파일")
    
    return recommended_batch

# ============================================================================
# 2. 배치 처리 설정
# ============================================================================

class BatchProcessor:
    def __init__(self, data_dir, customer_df, batch_size=30):
        self.data_dir = data_dir
        self.customer_df = customer_df
        self.batch_size = batch_size
        self.processed_batches = 0
        self.total_records = 0
        
        # 결과 저장
        self.batch_results = []
        self.customer_aggregates = {}
        self.daily_summaries = []
        
        print(f"배치 처리기 초기화:")
        print(f"  데이터 디렉토리: {data_dir}")
        print(f"  배치 크기: {batch_size}개 파일")
        print(f"  대상 고객: {len(customer_df):,}명")
    
    def scan_files(self):
        """LP 데이터 파일 스캔"""
        
        file_pattern = os.path.join(self.data_dir, "processed_LPData_*.csv")
        self.csv_files = glob.glob(file_pattern)
        self.csv_files.sort()
        
        print(f"파일 스캔 결과:")
        print(f"  발견된 파일: {len(self.csv_files):,}개")
        
        if len(self.csv_files) > 0:
            print(f"  첫 파일: {os.path.basename(self.csv_files[0])}")
            print(f"  마지막: {os.path.basename(self.csv_files[-1])}")
            
            # 샘플 파일 크기 확인
            try:
                sample_size = len(pd.read_csv(self.csv_files[0]))
                estimated_total = sample_size * len(self.csv_files)
                print(f"  예상 총 레코드: {estimated_total:,}개")
            except Exception as e:
                print(f"  샘플 파일 읽기 실패: {e}")
        
        return len(self.csv_files) > 0
    
    def process_single_batch(self, batch_files, batch_num, total_batches):
        """단일 배치 처리"""
        
        print(f"배치 {batch_num}/{total_batches} 처리 시작... (파일 {len(batch_files)}개)")
        
        batch_data_list = []
        batch_stats = {
            'batch_num': batch_num,
            'files_processed': 0,
            'files_failed': 0,
            'total_records': 0,
            'processing_time': None,
            'memory_usage': None
        }
        
        start_time = datetime.now()
        
        # 파일별 로딩
        for file_path in tqdm(batch_files, desc=f"배치 {batch_num} 로딩"):
            try:
                df = pd.read_csv(file_path)
                
                if len(df) == 0:
                    continue
                
                # 필수 컬럼 확인
                required_cols = ['대체고객번호', 'LP 수신일자', '순방향 유효전력']
                if not all(col in df.columns for col in required_cols):
                    continue
                
                # 유효 고객만 필터링
                valid_customers = set(self.customer_df['고객번호'])
                df = df[df['대체고객번호'].isin(valid_customers)]
                
                if len(df) > 0:
                    # 파일 날짜 정보 추가
                    filename = os.path.basename(file_path)
                    try:
                        date_part = filename.split('_')[2]  # YYYYMMDD
                        df['file_date'] = date_part
                    except:
                        df['file_date'] = 'unknown'
                    
                    batch_data_list.append(df)
                    batch_stats['files_processed'] += 1
                    batch_stats['total_records'] += len(df)
                
            except Exception as e:
                print(f"  파일 처리 실패: {os.path.basename(file_path)} - {e}")
                batch_stats['files_failed'] += 1
        
        # 배치 데이터 결합 및 처리
        if batch_data_list:
            batch_combined = pd.concat(batch_data_list, ignore_index=True)
            
            # 배치별 분석 수행
            batch_analysis = self.analyze_batch_data(batch_combined, batch_num)
            
            # 고객별 누적 통계 업데이트
            self.update_customer_aggregates(batch_combined)
            
            # 일별 요약 업데이트
            self.update_daily_summaries(batch_combined)
            
            # 메모리 정리
            del batch_data_list, batch_combined
            gc.collect()
            
            # 배치 완료 정보
            end_time = datetime.now()
            batch_stats['processing_time'] = (end_time - start_time).total_seconds()
            batch_stats['memory_usage'] = psutil.virtual_memory().percent
            
            self.batch_results.append({**batch_stats, **batch_analysis})
            self.processed_batches += 1
            self.total_records += batch_stats['total_records']
            
            print(f"  배치 {batch_num} 완료:")
            print(f"    처리: {batch_stats['files_processed']}개 파일")
            print(f"    레코드: {batch_stats['total_records']:,}개")
            print(f"    소요시간: {batch_stats['processing_time']:.1f}초")
            print(f"    메모리: {batch_stats['memory_usage']:.1f}%")
            
        else:
            print(f"  배치 {batch_num}: 처리할 유효 데이터 없음")
        
        return batch_stats
    
    def analyze_batch_data(self, df, batch_num):
        """배치 데이터 분석"""
        
        # 날짜/시간 파싱
        df['LP 수신일자'] = pd.to_datetime(df['LP 수신일자'])
        df['날짜'] = df['LP 수신일자'].dt.date
        df['시간'] = df['LP 수신일자'].dt.hour
        df['요일'] = df['LP 수신일자'].dt.dayofweek
        
        analysis = {
            'unique_customers': df['대체고객번호'].nunique(),
            'unique_dates': df['날짜'].nunique(),
            'avg_power': df['순방향 유효전력'].mean(),
            'max_power': df['순방향 유효전력'].max(),
            'std_power': df['순방향 유효전력'].std(),
            'total_consumption': df['순방향 유효전력'].sum() * 0.25,  # 15분->시간 변환
        }
        
        # 시간대별 평균
        hourly_avg = df.groupby('시간')['순방향 유효전력'].mean()
        analysis['peak_hour'] = hourly_avg.idxmax()
        analysis['peak_avg_power'] = hourly_avg.max()
        analysis['off_peak_avg_power'] = hourly_avg.min()
        
        return analysis
    
    def update_customer_aggregates(self, df):
        """고객별 누적 통계 업데이트"""
        
        customer_stats = df.groupby('대체고객번호').agg({
            '순방향 유효전력': ['count', 'sum', 'mean', 'std', 'max'],
            '지상무효': ['mean'],
            '진상무효': ['mean'],
            '피상전력': ['mean']
        }).round(3)
        
        # 컬럼명 정리
        customer_stats.columns = [
            'record_count', 'power_sum', 'power_mean', 'power_std', 'power_max',
            'reactive_lag_mean', 'reactive_lead_mean', 'apparent_power_mean'
        ]
        
        for customer_id in customer_stats.index:
            if customer_id not in self.customer_aggregates:
                self.customer_aggregates[customer_id] = {
                    'total_records': 0,
                    'total_consumption': 0,
                    'max_power': 0,
                    'power_sum_sq': 0,
                }
            
            # 누적 업데이트
            agg = self.customer_aggregates[customer_id]
            new_stats = customer_stats.loc[customer_id]
            
            agg['total_records'] += int(new_stats['record_count'])
            agg['total_consumption'] += new_stats['power_sum'] * 0.25
            agg['max_power'] = max(agg['max_power'], new_stats['power_max'])
            
            # 최종 통계 계산용 임시 저장
            agg['latest_mean'] = new_stats['power_mean']
            agg['latest_std'] = new_stats['power_std']
    
    def update_daily_summaries(self, df):
        """일별 요약 업데이트"""
        
        df['날짜'] = df['LP 수신일자'].dt.date
        
        daily_stats = df.groupby('날짜').agg({
            '대체고객번호': 'nunique',
            '순방향 유효전력': ['count', 'sum', 'mean', 'max'],
            'LP 수신일자': 'count'
        }).round(2)
        
        daily_stats.columns = [
            'unique_customers', 'record_count', 'total_power', 'avg_power', 'max_power', 'total_records'
        ]
        
        for date in daily_stats.index:
            summary = {
                'date': date,
                'customers': int(daily_stats.loc[date, 'unique_customers']),
                'records': int(daily_stats.loc[date, 'record_count']),
                'total_consumption': daily_stats.loc[date, 'total_power'] * 0.25,
                'avg_power': daily_stats.loc[date, 'avg_power'],
                'max_power': daily_stats.loc[date, 'max_power']
            }
            self.daily_summaries.append(summary)
    
    def save_intermediate_results(self, batch_num):
        """중간 결과 저장"""
        
        try:
            # 배치 결과 저장
            batch_df = pd.DataFrame(self.batch_results)
            batch_df.to_csv(f'batch_results_{batch_num:03d}.csv', index=False)
            
            # 고객 누적 통계 저장
            if self.customer_aggregates:
                customer_df = pd.DataFrame.from_dict(self.customer_aggregates, orient='index')
                customer_df.to_csv(f'customer_aggregates_{batch_num:03d}.csv')
            
            # 일별 요약 저장
            if self.daily_summaries:
                daily_df = pd.DataFrame(self.daily_summaries)
                daily_df.to_csv(f'daily_summaries_{batch_num:03d}.csv', index=False)
            
            print(f"  중간 결과 저장 완료 (배치 {batch_num})")
            
        except Exception as e:
            print(f"  중간 결과 저장 실패: {e}")
    
    def run_batch_processing(self, save_interval=5):
        """배치 처리 실행"""
        
        if not self.scan_files():
            print("처리할 파일이 없습니다.")
            return False
        
        total_files = len(self.csv_files)
        total_batches = (total_files + self.batch_size - 1) // self.batch_size
        
        print(f"배치 처리 시작:")
        print(f"  총 파일: {total_files:,}개")
        print(f"  배치 크기: {self.batch_size}개")
        print(f"  총 배치: {total_batches}개")
        print(f"  중간 저장: {save_interval}배치마다")
        
        # 배치별 처리
        for i in range(0, total_files, self.batch_size):
            batch_files = self.csv_files[i:i+self.batch_size]
            batch_num = i // self.batch_size + 1
            
            # 배치 처리
            batch_stats = self.process_single_batch(batch_files, batch_num, total_batches)
            
            # 중간 저장
            if batch_num % save_interval == 0:
                self.save_intermediate_results(batch_num)
                
                # 메모리 정리
                gc.collect()
                
                # 진행 상황 요약
                self.print_progress_summary(batch_num, total_batches)
        
        # 최종 결과 저장
        self.save_final_results()
        
        print(f"배치 처리 완료!")
        print(f"  처리된 배치: {self.processed_batches}개")
        print(f"  총 레코드: {self.total_records:,}개")
        
        return True
    
    def print_progress_summary(self, current_batch, total_batches):
        """진행 상황 요약 출력"""
        
        progress_pct = (current_batch / total_batches) * 100
        
        print(f"진행 상황 요약 (배치 {current_batch}/{total_batches}, {progress_pct:.1f}%):")
        print(f"  누적 레코드: {self.total_records:,}개")
        print(f"  고유 고객: {len(self.customer_aggregates):,}명")
        print(f"  일별 요약: {len(self.daily_summaries):,}일")
        
        # 메모리 사용량
        memory_pct = psutil.virtual_memory().percent
        print(f"  메모리 사용: {memory_pct:.1f}%")
        
        if memory_pct > 80:
            print("  메모리 사용량 높음 - 가비지 컬렉션 실행")
            gc.collect()
    
    def save_final_results(self):
        """최종 결과 저장"""
        
        print(f"최종 결과 저장 중...")
        
        try:
            # 1. 배치 처리 결과
            if self.batch_results:
                batch_final = pd.DataFrame(self.batch_results)
                batch_final.to_csv('final_batch_results.csv', index=False)
                print(f"  배치 결과 저장: final_batch_results.csv")
            
            # 2. 고객별 최종 통계
            if self.customer_aggregates:
                customer_final = pd.DataFrame.from_dict(self.customer_aggregates, orient='index')
                
                # 최종 통계 계산
                customer_final['avg_power'] = customer_final.apply(
                    lambda x: x['total_consumption'] / (x['total_records'] * 0.25) if x['total_records'] > 0 else 0, 
                    axis=1
                )
                
                customer_final.to_csv('final_customer_statistics.csv')
                print(f"  고객 통계 저장: final_customer_statistics.csv")
            
            # 3. 일별 요약
            if self.daily_summaries:
                daily_final = pd.DataFrame(self.daily_summaries)
                daily_final = daily_final.groupby('date').agg({
                    'customers': 'max',
                    'records': 'sum',
                    'total_consumption': 'sum',
                    'avg_power': 'mean',
                    'max_power': 'max'
                }).round(2)
                
                daily_final.to_csv('final_daily_summaries.csv')
                print(f"  일별 요약 저장: final_daily_summaries.csv")
            
        except Exception as e:
            print(f"  최종 결과 저장 실패: {e}")

# ============================================================================
# 메인 실행 함수
# ============================================================================

def main_batch_processing():
    """메인 배치 처리 실행"""
    
    # 시스템 리소스 체크
    recommended_batch = check_system_resources()
    
    # 고객 데이터 로딩
    print(f"고객 데이터 로딩...")
    try:
        # customer_df = pd.read_excel('제13회 산업부 공모전 대상고객.xlsx')
        # 임시 샘플 데이터 (실제 환경에서는 위 코드 사용)
        customer_df = pd.DataFrame({
            '순번': range(1, 3001),
            '고객번호': [f'CUST_{i:04d}' for i in range(1, 3001)],
            '계약전력': np.random.choice([100, 200, 300, 500, 700, 900], 3000),
            '계약종별': np.random.choice([222, 226, 311, 322, 726], 3000),
            '사용용도': np.random.choice(['02', '09'], 3000)
        })
        print(f"  고객 데이터 로딩 완료: {len(customer_df):,}명")
        
    except Exception as e:
        print(f"  고객 데이터 로딩 실패: {e}")
        return
    
    # LP 데이터 디렉토리 설정
    data_directory = "/path/to/lp/data"  # 실제 경로로 변경
    
    # 배치 처리기 초기화
    processor = BatchProcessor(
        data_dir=data_directory,
        customer_df=customer_df,
        batch_size=recommended_batch
    )
    
    # 배치 처리 실행
    success = processor.run_batch_processing(save_interval=5)
    
    if success:
        print(f"배치 처리 성공!")
        print(f"다음 단계: 변동계수 계산 및 분석")
    else:
        print(f"배치 처리 실패")

# 실행 예시
if __name__ == "__main__":
    main_batch_processing()

print("\n" + "="*70)
print("배치 처리 완료 후 생성되는 파일들:")
print("  - final_batch_results.csv: 배치별 처리 결과")
print("  - final_customer_statistics.csv: 고객별 누적 통계") 
print("  - final_daily_summaries.csv: 일별 요약 통계")
print("  - batch_results_XXX.csv: 중간 저장 파일들")
print("="*70)