In [1]:
# 정상 범위 기반 이상치 탐지 함수
# SEMI_PHOTO_SENSORS 데이터 분석

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

class RangeBasedAnomalyDetector:
    """Flag rows whose sensor readings fall outside configured normal ranges.

    ``normal_ranges`` maps a column name to an inclusive ``(min, max)`` tuple.
    A reading is anomalous when it is not missing and lies strictly below
    ``min`` or strictly above ``max``.
    """

    def __init__(self, normal_ranges=None):
        """
        Create a detector.

        Parameters:
        - normal_ranges: optional dict of ``{column: (min, max)}``. When it is
          omitted (or empty) the built-in defaults below are used.
        """
        self.normal_ranges = normal_ranges or {
            'EXPOSURE_DOSE': (20, 40),
            'FOCUS_POSITION': (-50, 50),
            'STAGE_TEMP': (22.9, 23.1),
            'HUMIDITY': (40, 50),
            'ALIGNMENT_ERROR_X': (0, 3),
            'ALIGNMENT_ERROR_Y': (0, 3),
            'LENS_ABERRATION': (0, 5),
            'ILLUMINATION_UNIFORMITY': (98, 100),
            'RETICLE_TEMP': (22.95, 23.05)
        }

    def detect_anomalies(self, df):
        """
        Find every row with at least one out-of-range parameter.

        Parameters:
        - df: DataFrame to scan; only columns named in ``normal_ranges`` are
          checked, missing columns are silently skipped.

        Returns:
        - anomaly_details: list with one dict per anomalous row (identifiers,
          the offending parameters, and the full row as a dict)
        - summary: ``{parameter: {'anomaly_count', 'percentage'}}`` for every
          configured parameter, including those with zero anomalies
        """
        anomaly_details = []

        for idx, row in df.iterrows():
            row_anomalies = []

            for param, (min_val, max_val) in self.normal_ranges.items():
                if param not in row:
                    continue
                value = row[param]
                # Missing readings are not treated as anomalies.
                if pd.notna(value) and (value < min_val or value > max_val):
                    row_anomalies.append({
                        'parameter': param,
                        'value': value,
                        'normal_min': min_val,
                        'normal_max': max_val,
                        # Distance to the nearest range boundary.
                        'deviation': min(abs(value - min_val), abs(value - max_val))
                    })

            if row_anomalies:
                anomaly_details.append({
                    'row_index': idx,
                    'pno': row.get('PNO', 'N/A'),
                    'equipment_id': row.get('EQUIPMENT_ID', 'N/A'),
                    'lot_no': row.get('LOT_NO', 'N/A'),
                    'wafer_id': row.get('WAFER_ID', 'N/A'),
                    'timestamp': row.get('TIMESTAMP', 'N/A'),
                    'anomalous_parameters': [item['parameter'] for item in row_anomalies],
                    'anomaly_count': len(row_anomalies),
                    'anomaly_details': row_anomalies,
                    'full_row_data': row.to_dict()
                })

        # Per-parameter summary: a parameter appears at most once per row, so
        # counting occurrences equals counting affected rows.
        counts = {param: 0 for param in self.normal_ranges}
        for detail in anomaly_details:
            for param in detail['anomalous_parameters']:
                counts[param] += 1

        total_rows = len(df)
        summary = {
            param: {
                'anomaly_count': count,
                'percentage': (count / total_rows) * 100 if total_rows > 0 else 0
            }
            for param, count in counts.items()
        }

        return anomaly_details, summary

    def analyze_by_equipment(self, df, anomaly_details):
        """
        Aggregate anomalies per equipment ID.

        Parameters:
        - df: the DataFrame that was scanned (used for per-equipment totals)
        - anomaly_details: list returned by ``detect_anomalies``

        Returns:
        - dict keyed by equipment ID with ``total_anomalies``,
          ``anomalous_measurements`` and per-parameter counts; when ``df`` has
          an ``EQUIPMENT_ID`` column, ``total_measurements`` and
          ``anomaly_rate`` (percent) are added as well. Rows whose equipment
          ID is missing are grouped under ``'N/A'``.
        """
        equipment_analysis = {}

        for anomaly in anomaly_details:
            equipment = anomaly['equipment_id']
            # NaN/None never compares equal to anything, which would yield one
            # dict entry per row and a zero total below; fold them together.
            if pd.isna(equipment):
                equipment = 'N/A'

            stats = equipment_analysis.setdefault(equipment, {
                'total_anomalies': 0,
                'anomalous_measurements': 0,
                'parameters': {}
            })
            stats['anomalous_measurements'] += 1
            stats['total_anomalies'] += anomaly['anomaly_count']

            for param in anomaly['anomalous_parameters']:
                stats['parameters'][param] = stats['parameters'].get(param, 0) + 1

        # Add total measurement counts and the anomaly rate per equipment.
        if 'EQUIPMENT_ID' in df.columns:
            equipment_ids = df['EQUIPMENT_ID']
            for equipment, stats in equipment_analysis.items():
                mask = equipment_ids == equipment
                if equipment == 'N/A':
                    mask = mask | equipment_ids.isna()
                total_measurements = int(mask.sum())
                stats['total_measurements'] = total_measurements
                # Guard against an ID that is absent from df (e.g. a
                # mismatched anomaly list) instead of dividing by zero.
                stats['anomaly_rate'] = (
                    stats['anomalous_measurements'] / total_measurements * 100
                    if total_measurements else 0.0
                )

        return equipment_analysis

    def save_results(self, anomaly_details, filename='anomaly_results.csv'):
        """
        Write the anomalous rows to a CSV file.

        Each output row holds the identifying fields, the anomaly count, the
        comma-joined list of offending parameters, and every original column.

        Returns:
        - True when a file was written, False when there was nothing to save.
        """
        if not anomaly_details:
            print("No anomaly data to save.")
            return False

        rows = []
        for anomaly in anomaly_details:
            base_info = {
                'row_index': anomaly['row_index'],
                'pno': anomaly['pno'],
                'equipment_id': anomaly['equipment_id'],
                'lot_no': anomaly['lot_no'],
                'wafer_id': anomaly['wafer_id'],
                'timestamp': anomaly['timestamp'],
                'anomaly_count': anomaly['anomaly_count'],
                'anomalous_parameters': ', '.join(anomaly['anomalous_parameters'])
            }
            # Original columns are appended after (and override) the summary
            # fields when names collide.
            base_info.update(anomaly['full_row_data'])
            rows.append(base_info)

        results_df = pd.DataFrame(rows)
        results_df.to_csv(filename, index=False, encoding='utf-8')
        print(f"Anomaly results saved: {filename}")
        return True

    def visualize_overall_data(self, df, save_plot=True):
        """
        Plot every monitored parameter present in ``df`` with its normal range.

        One subplot is drawn per parameter: the readings (against TIMESTAMP
        when that column exists, otherwise against the row index), dashed
        lines at the range limits, and out-of-range points highlighted.

        Parameters:
        - df: DataFrame to plot
        - save_plot: when True the figure is also written to
          ``range_based_overall_data.png`` in the working directory
        """
        params = [param for param in self.normal_ranges if param in df.columns]
        if not params or len(df) == 0:
            print("No monitored parameters available to visualize.")
            return

        n_cols = min(3, len(params))
        n_rows = -(-len(params) // n_cols)  # ceiling division
        fig, axes = plt.subplots(n_rows, n_cols,
                                 figsize=(5 * n_cols, 3.5 * n_rows),
                                 squeeze=False)
        flat_axes = axes.ravel()

        if 'TIMESTAMP' in df.columns:
            x_values = df['TIMESTAMP'].to_numpy()
        else:
            x_values = df.index.to_numpy()

        for ax, param in zip(flat_axes, params):
            low, high = self.normal_ranges[param]
            # Coerce so that a stray non-numeric cell becomes a gap in the
            # plot rather than an exception.
            series = pd.to_numeric(df[param], errors='coerce')
            values = series.to_numpy()
            outside = ((series < low) | (series > high)).to_numpy()

            ax.plot(x_values, values, marker='o', markersize=3, linewidth=1)
            ax.axhline(low, color='red', linestyle='--', linewidth=1)
            ax.axhline(high, color='red', linestyle='--', linewidth=1)
            if outside.any():
                ax.scatter(x_values[outside], values[outside], color='red', zorder=3)
            ax.set_title(param)
            ax.tick_params(axis='x', rotation=45)

        # Hide unused cells of the subplot grid.
        for ax in flat_axes[len(params):]:
            ax.set_visible(False)

        plt.tight_layout()

        if save_plot:
            plt.savefig('range_based_overall_data.png', dpi=300, bbox_inches='tight')
            print("Visualization saved: range_based_overall_data.png")

        plt.show()

    def visualize_anomalies(self, summary, equipment_analysis=None, save_plot=True):
        """
        Draw bar charts of anomaly counts.

        Parameters:
        - summary: per-parameter summary from ``detect_anomalies``
        - equipment_analysis: optional result of ``analyze_by_equipment``;
          when given (and non-empty) a second chart per equipment is added
        - save_plot: when True the figure is also written to
          ``range_based_anomalies.png``
        """
        param_counts = {param: info['anomaly_count'] for param, info in summary.items()
                        if info['anomaly_count'] > 0}

        if not param_counts:
            print("시각화할 이상치가 없습니다.")
            return

        if equipment_analysis:
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        else:
            fig, ax1 = plt.subplots(1, 1, figsize=(10, 6))

        # Anomaly count per parameter.
        ax1.bar(list(param_counts.keys()), list(param_counts.values()))
        ax1.set_title('파라미터별 이상치 개수')
        ax1.set_ylabel('이상치 개수')
        ax1.tick_params(axis='x', rotation=45)

        # Anomalous measurement count per equipment.
        if equipment_analysis:
            equipment_counts = {eq: analysis['anomalous_measurements']
                                for eq, analysis in equipment_analysis.items()}
            if equipment_counts:
                ax2.bar(list(equipment_counts.keys()), list(equipment_counts.values()))
                ax2.set_title('장비별 이상 측정 개수')
                ax2.set_ylabel('이상 측정 개수')

        plt.tight_layout()

        if save_plot:
            plt.savefig('range_based_anomalies.png', dpi=300, bbox_inches='tight')
            print("시각화 저장됨: range_based_anomalies.png")

        plt.show()


def _print_range_report(anomaly_details, summary, equipment_analysis, anomaly_rate):
    """Print the verbose detection report (totals, per-parameter, per-equipment)."""
    print(f"\nTotal {len(anomaly_details)} rows with anomalies detected")
    print(f"{anomaly_rate:.2f}% of total data")

    # Per-parameter summary (only parameters that actually had anomalies).
    print("\nAnomaly Summary by Parameter:")
    has_anomalies = False
    for param, info in summary.items():
        if info['anomaly_count'] > 0:
            print(f"  {param}: {info['anomaly_count']} cases ({info['percentage']:.1f}%)")
            has_anomalies = True

    if not has_anomalies:
        print("  All parameters are within normal ranges.")

    # Detailed listing, limited to the first five anomalous rows.
    if anomaly_details:
        print(f"\nDetailed Anomaly Information (First 5):")
        for i, anomaly in enumerate(anomaly_details[:5], 1):
            print(f"\n[Anomaly {i}]")
            print(f"  Row Index: {anomaly['row_index']}")
            print(f"  PNO: {anomaly['pno']}")
            print(f"  Equipment ID: {anomaly['equipment_id']}")
            print(f"  Measurement Time: {anomaly['timestamp']}")
            print(f"  Anomalous Parameters: {', '.join(anomaly['anomalous_parameters'])}")

            for detail in anomaly['anomaly_details']:
                print(f"    - {detail['parameter']}: {detail['value']:.3f} "
                      f"(Normal Range: {detail['normal_min']} ~ {detail['normal_max']})")

        if len(anomaly_details) > 5:
            print(f"\n... and {len(anomaly_details) - 5} more")

    # Per-equipment breakdown.
    if equipment_analysis:
        print(f"\nAnomaly Analysis by Equipment:")
        for equipment, analysis in equipment_analysis.items():
            print(f"  {equipment}:")
            print(f"    Total Measurements: {analysis['total_measurements']}")
            print(f"    Anomalous Measurements: {analysis['anomalous_measurements']} "
                  f"({analysis['anomaly_rate']:.1f}%)")

            if analysis['parameters']:
                top_param = max(analysis['parameters'], key=analysis['parameters'].get)
                print(f"    Main Anomalous Parameter: {top_param} ({analysis['parameters'][top_param]} cases)")


def detect_range_based_anomalies(file_path,
                                 normal_ranges=None,
                                 start_time=None,
                                 end_time=None,
                                 verbose=True,
                                 save_results=False,
                                 visualize=False,
                                 output_filename=None):
    """
    Run range-based anomaly detection on a CSV file.

    Parameters:
    - file_path: path of the CSV file to load
    - normal_ranges: optional ``{column: (min, max)}`` dict; detector defaults
      are used when omitted
    - start_time: optional inclusive lower bound on the TIMESTAMP column
    - end_time: optional inclusive upper bound on the TIMESTAMP column
    - verbose: print progress and a detailed report
    - save_results: write the anomalous rows to a CSV file
    - visualize: produce plots
    - output_filename: CSV name used when ``save_results`` is True
      (defaults to 'range_based_anomaly_results.csv')

    Returns:
    - result: dict with 'total_rows', 'anomaly_count', 'anomaly_rate',
      'anomalies', 'summary', 'equipment_analysis' and 'normal_ranges'.
      When the time filter leaves no rows, the dict carries a 'message' key
      with zeroed counts instead. Any exception is caught and reported as
      ``{"error": ..., "anomalies": []}`` rather than raised.
    """

    try:
        # 1. Load the data.
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        df = pd.read_csv(file_path)

        if verbose:
            print("=" * 80)
            print("Range-Based Anomaly Detection System")
            print("=" * 80)
            print(f"File loaded successfully: {file_path}")
            print(f"Data shape: {df.shape}")

        # Parse the TIMESTAMP column so it can be compared and displayed.
        if 'TIMESTAMP' in df.columns:
            df['TIMESTAMP'] = pd.to_datetime(df['TIMESTAMP'])

            if verbose:
                print(f"Data period: {df['TIMESTAMP'].min()} ~ {df['TIMESTAMP'].max()}")

        # Optional time-window filtering.
        if start_time or end_time:
            if 'TIMESTAMP' not in df.columns:
                print("Warning: No TIMESTAMP column found, skipping time filtering.")
            else:
                if start_time:
                    start_time = pd.to_datetime(start_time)
                    df = df[df['TIMESTAMP'] >= start_time]
                if end_time:
                    end_time = pd.to_datetime(end_time)
                    df = df[df['TIMESTAMP'] <= end_time]

                if df.empty:
                    # Keep the count keys of a regular result so callers can
                    # read them without special-casing this outcome.
                    return {
                        "message": "No data found in the specified time range.",
                        "anomalies": [],
                        "total_rows": 0,
                        "anomaly_count": 0,
                        "anomaly_rate": 0,
                        "summary": {},
                        "equipment_analysis": None
                    }

                if verbose:
                    print(f"Data after time filtering: {df.shape[0]} rows")

        # 2. Run the detection.
        detector = RangeBasedAnomalyDetector(normal_ranges)
        anomaly_details, summary = detector.detect_anomalies(df)

        # 3. Per-equipment analysis (only possible with an EQUIPMENT_ID column).
        equipment_analysis = None
        if 'EQUIPMENT_ID' in df.columns:
            equipment_analysis = detector.analyze_by_equipment(df, anomaly_details)

        # Computed once and guarded: a header-only CSV has zero rows.
        total_rows = len(df)
        anomaly_rate = (len(anomaly_details) / total_rows) * 100 if total_rows > 0 else 0

        # 4. Report.
        if verbose:
            _print_range_report(anomaly_details, summary, equipment_analysis, anomaly_rate)

        # 5. Save results.
        if save_results:
            filename = output_filename or 'range_based_anomaly_results.csv'
            detector.save_results(anomaly_details, filename)

        # 6. Visualization.
        if visualize:
            # Overall data plot; skipped when the detector does not provide it
            # so the anomaly chart and the result are still produced.
            plot_overall = getattr(detector, 'visualize_overall_data', None)
            if callable(plot_overall):
                if verbose:
                    print("\nGenerating overall data visualization...")
                plot_overall(df, save_plot=True)
            elif verbose:
                print("Warning: overall data visualization is not available, skipping.")

            # Anomaly charts only make sense when something was detected.
            if anomaly_details:
                if verbose:
                    print("Generating anomaly visualization...")
                detector.visualize_anomalies(summary, equipment_analysis)

        # 7. Assemble the result.
        result = {
            'total_rows': total_rows,
            'anomaly_count': len(anomaly_details),
            'anomaly_rate': anomaly_rate,
            'anomalies': anomaly_details,
            'summary': summary,
            'equipment_analysis': equipment_analysis,
            'normal_ranges': detector.normal_ranges
        }

        if verbose:
            print(f"\n{'='*80}")
            print("Analysis Complete!")
            print(f"{'='*80}")

        return result

    except Exception as e:
        # Top-level boundary: report the failure as data instead of raising.
        error_msg = f"Error occurred: {str(e)}"
        if verbose:
            print(error_msg)
        return {"error": error_msg, "anomalies": []}


def detect_anomalies_from_dataframe(df,
                                   normal_ranges=None,
                                   start_time=None,
                                   end_time=None,
                                   verbose=False):
    """
    Run range-based anomaly detection directly on a DataFrame.

    The input frame is never modified: filtering and timestamp parsing are
    done on a copy.

    Parameters:
    - df: DataFrame to scan
    - normal_ranges: optional ``{column: (min, max)}`` dict
    - start_time: optional inclusive lower bound on the TIMESTAMP column
    - end_time: optional inclusive upper bound on the TIMESTAMP column
    - verbose: print a short summary (and any error)

    Returns:
    - anomaly_details: list of anomalous rows
    - summary: per-parameter summary
    On any exception ``([], {})`` is returned instead of raising.
    """

    try:
        # Work on a copy so the caller's frame stays untouched.
        filtered_df = df.copy()
        if (start_time or end_time) and 'TIMESTAMP' in filtered_df.columns:
            # Parse on the copy that is actually filtered; comparing unparsed
            # strings against Timestamp bounds would fail.
            filtered_df['TIMESTAMP'] = pd.to_datetime(filtered_df['TIMESTAMP'])
            if start_time:
                start_time = pd.to_datetime(start_time)
                filtered_df = filtered_df[filtered_df['TIMESTAMP'] >= start_time]
            if end_time:
                end_time = pd.to_datetime(end_time)
                filtered_df = filtered_df[filtered_df['TIMESTAMP'] <= end_time]

        # Run the detection.
        detector = RangeBasedAnomalyDetector(normal_ranges)
        anomaly_details, summary = detector.detect_anomalies(filtered_df)

        if verbose:
            print(f"Total {len(anomaly_details)} anomalies detected")
            for param, info in summary.items():
                if info['anomaly_count'] > 0:
                    print(f"  {param}: {info['anomaly_count']} cases")

        return anomaly_details, summary

    except Exception as e:
        # Best-effort API: report and return empty results.
        if verbose:
            print(f"Error occurred: {str(e)}")
        return [], {}


# ===================================================================
# 사용 예시 및 메인 실행부
# ===================================================================

if __name__ == "__main__":
    # Input file for this example run.
    CSV_FILE_PATH = '/home/minjoo/PRISM-Monitor/prism_monitor/data/Industrial_DB_sample/SEMI_PHOTO_SENSORS.csv'

    # User-defined normal ranges (optional; these mirror the detector defaults).
    custom_ranges = {
        'EXPOSURE_DOSE': (20, 40),
        'FOCUS_POSITION': (-50, 50),
        'STAGE_TEMP': (22.9, 23.1),
        'HUMIDITY': (40, 50),
        'ALIGNMENT_ERROR_X': (0, 3),
        'ALIGNMENT_ERROR_Y': (0, 3),
        'LENS_ABERRATION': (0, 5),
        'ILLUMINATION_UNIFORMITY': (98, 100),
        'RETICLE_TEMP': (22.95, 23.05)
    }

    # Run the main detection function.
    print("Executing range-based anomaly detection...")

    result = detect_range_based_anomalies(
        file_path=CSV_FILE_PATH,
        normal_ranges=custom_ranges,
        start_time=None,  # '2024-01-15 08:00:00'
        end_time=None,    # '2024-01-16 09:00:00'
        verbose=True,
        save_results=True,
        visualize=True,
        output_filename='detected_anomalies.csv'
    )

    # Inspect the outcome. Besides a full result, the function can return an
    # error dict or a "no data in time range" message dict; neither is
    # guaranteed to carry the count keys, so handle them before reading those.
    if 'error' in result:
        print(f"Error: {result['error']}")
    elif 'message' in result:
        print(result['message'])
    else:
        print(f"\nFinal Results:")
        print(f"  Total Data: {result['total_rows']} rows")
        print(f"  Anomalies: {result['anomaly_count']} rows ({result['anomaly_rate']:.2f}%)")

        # Short recap when anomalies were found.
        if result['anomalies']:
            print(f"\nDetected Anomaly Row Indices: {[a['row_index'] for a in result['anomalies'][:10]]}")
            print(f"Main Anomalous Parameters: {set().union(*[a['anomalous_parameters'] for a in result['anomalies']])}")

Executing range-based anomaly detection...
Range-Based Anomaly Detection System
File loaded successfully: /home/minjoo/PRISM-Monitor/prism_monitor/data/Industrial_DB_sample/SEMI_PHOTO_SENSORS.csv
Data shape: (35, 15)
Data period: 2024-01-15 08:30:15 ~ 2024-02-02 08:46:05

Total 4 rows with anomalies detected
11.43% of total data

Anomaly Summary by Parameter:
  ILLUMINATION_UNIFORMITY: 4 cases (11.4%)
  RETICLE_TEMP: 2 cases (5.7%)

Detailed Anomaly Information (First 5):

[Anomaly 1]
  Row Index: 23
  PNO: PS024
  Equipment ID: PHO_003
  Measurement Time: 2024-01-23 08:15:20
  Anomalous Parameters: ILLUMINATION_UNIFORMITY, RETICLE_TEMP
    - ILLUMINATION_UNIFORMITY: 97.800 (Normal Range: 98 ~ 100)
    - RETICLE_TEMP: 23.060 (Normal Range: 22.95 ~ 23.05)

[Anomaly 2]
  Row Index: 24
  PNO: PS025
  Equipment ID: PHO_003
  Measurement Time: 2024-01-23 08:16:05
  Anomalous Parameters: ILLUMINATION_UNIFORMITY
    - ILLUMINATION_UNIFORMITY: 97.900 (Normal Range: 98 ~ 100)

[Anomaly 3]
  Row