# In [None]:  (Jupyter notebook cell marker left over from export)

import configparser
import json
import os
import pandas as pd
import requests
import sqlite3
import datetime
from bs4 import BeautifulSoup
from tabulate import tabulate

from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np

# Write the banner that marks the start of a new run in the log file
def log_started():
    """Append a "New execution" banner to ``etl_project_log.txt``.

    The banner is the current local time (``%Y-%B-%d-%H-%M-%S``) framed
    above and below by a line of 50 ``=`` characters.
    """
    separator = "\n" + "=" * 50 + "\n"
    started_at = datetime.datetime.now().strftime('%Y-%B-%d-%H-%M-%S')
    banner = f"{separator}New execution at {started_at}{separator}"
    with open('etl_project_log.txt', 'a') as log_file:
        log_file.write(banner)

# Append a single entry to the log file
def log_message(message, level="INFO"):
    """Append one timestamped line to ``etl_project_log.txt``.

    Args:
        message: Text of the log entry.
        level: Severity label written between the timestamp and the message.
    """
    stamp = datetime.datetime.now().strftime('%Y-%B-%d-%H-%M-%S')
    entry = f"{stamp} - {level} - {message}\n"
    with open('etl_project_log.txt', 'a') as log_file:
        log_file.write(entry)

# Read the configuration file
def load_config(config_path='config.ini'):
    """Return the source URL and table CSS class stored in an INI file.

    Args:
        config_path: Path of the INI file to read.

    Returns:
        A ``(URL, TABLE_CLASS)`` tuple taken from the ``DEFAULT`` section.

    Raises:
        FileNotFoundError: ``config_path`` does not exist.
        ValueError: ``URL`` or ``TABLE_CLASS`` is missing from ``DEFAULT``.
    """
    if not os.path.exists(config_path):
        message = f"Configuration file '{config_path}' not found."
        log_message(message, level="ERROR")
        raise FileNotFoundError(message)

    config = configparser.ConfigParser()
    config.read(config_path)

    # ConfigParser always exposes the DEFAULT section, so only the keys
    # themselves need to be checked.
    defaults = config['DEFAULT']
    if 'URL' not in defaults or 'TABLE_CLASS' not in defaults:
        # Report the file that was actually read, not a hard-coded name.
        message = f"Invalid or missing configuration values in '{config_path}'."
        log_message(message, level="ERROR")
        raise ValueError(message)

    return defaults['URL'], defaults['TABLE_CLASS']
    

# Save
def save_gdp_data(df, output_csv_file='extracted_gdp_data.csv', output_json_file='extracted_gdp_data.json'):
    """Write ``df`` to a CSV file and a JSON file.

    Args:
        df: DataFrame to persist.
        output_csv_file: Destination path of the CSV export (index omitted).
        output_json_file: Destination path of the JSON export (list of records).

    Raises:
        Exception: Any error raised while writing is logged and re-raised.
    """
    log_message("Saving Extracted Data")
    json_options = {'orient': 'records', 'force_ascii': False, 'indent': 4}
    try:
        df.to_csv(output_csv_file, index=False)
        df.to_json(output_json_file, **json_options)
        log_message(f"Data saved: CSV ({output_csv_file}), JSON ({output_json_file})")
    except Exception as exc:
        log_message(f"Failed to save data: {str(exc)}", level="ERROR")
        raise
    
def extract_gdp_data(url, table_class):
    """Download a web page and extract its GDP table as a DataFrame.

    Args:
        url: Address of the page to fetch.
        table_class: CSS class of the ``<table>`` element to parse.

    Returns:
        DataFrame with the columns ``Country``, ``GDP (B USD)`` and ``Year``.
        ``GDP (B USD)`` is the numeric value from the table's second column
        divided by 1e3 (presumably millions -> billions of USD; confirm
        against the source table).

    Raises:
        ValueError: No table with ``table_class`` exists on the page.
        Exception: Any other failure (HTTP, parsing) is logged and re-raised.
    """
    # Local import keeps this block self-contained; the file header does not
    # import io.
    from io import StringIO

    try:
        response = requests.get(url, timeout=10)
        # Raise HTTPError for non-success status codes (e.g. 404, 500).
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        table = soup.find('table', {'class': table_class})

        if table is None:
            log_message("No table found with the specified class.", level="ERROR")
            raise ValueError("Failed to locate the GDP table on the webpage.")

        # Passing literal HTML to read_html is deprecated in recent pandas;
        # a file-like wrapper works on old and new versions alike.
        df = pd.read_html(StringIO(str(table)))[0]

        # Keep only the first three columns; copy so the column assignments
        # below operate on an independent frame.
        df = df.iloc[:, [0, 1, 2]].copy()
        df.columns = ['Country', 'GDP (Nominal)', 'Year']

        # Drop rows lacking a country or GDP value.
        df = df.dropna(subset=['Country', 'GDP (Nominal)']).copy()
        df['GDP (B USD)'] = (
            df['GDP (Nominal)']
            .astype(str)  # tolerate a column pandas already parsed as numeric
            .str.replace(r'[^\d.]', '', regex=True)  # keep digits and dots only
            .replace('', '0')  # placeholders with no digits become 0
            .astype(float)
            / 1e3  # scale to the unit used by 'GDP (B USD)'
        )
        # Strip bracketed footnote markers that come along with the year.
        df['Year'] = df['Year'].str.replace(r'\[.*?\]', '', regex=True)
        df = df[['Country', 'GDP (B USD)', 'Year']]

        return df

    except Exception as e:
        log_message(f"Error during data extraction: {str(e)}", level="ERROR")
        raise


# Transform
def transform_gdp_data(df, num_partitions=5):
    """Round GDP values, attach each country's region and sort by GDP.

    The frame is split into row-wise partitions that are transformed on a
    thread pool, then merged and sorted by ``GDP (B USD)`` in descending
    order.

    Args:
        df: DataFrame with at least ``Country`` and ``GDP (B USD)`` columns.
        num_partitions: Maximum number of partitions to process in parallel.

    Returns:
        New DataFrame with ``GDP (B USD)`` rounded to 2 decimals and an added
        ``Region`` column looked up from ``country_region_table.json``
        (countries absent from that mapping get a missing value).

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    log_message("Starting Data Transmission")
    try:
        log_message("Starting Data Transformation in parallel")

        # Load the country -> region mapping once, before fanning out.
        with open('country_region_table.json', 'r', encoding='utf-8') as region_file:
            region_data = json.load(region_file)

        def transform_chunk(chunk):
            # assign() returns a new frame, so the input slice is not mutated.
            return chunk.assign(**{
                'GDP (B USD)': chunk['GDP (B USD)'].round(2),
                'Region': chunk['Country'].map(region_data),
            })

        # Split by position with iloc instead of np.array_split, which on a
        # DataFrame goes through a deprecated pandas code path.
        total_rows = len(df)
        partitions = max(1, min(num_partitions, total_rows))
        bounds = [total_rows * i // partitions for i in range(partitions + 1)]
        chunks = [df.iloc[start:stop] for start, stop in zip(bounds, bounds[1:])]

        # executor.map yields results in submission order, keeping the merge
        # deterministic regardless of which thread finishes first.
        with ThreadPoolExecutor() as executor:
            transformed_chunks = list(executor.map(transform_chunk, chunks))

        # Merge, then sort globally: sorting inside each chunk would only
        # order rows within a partition. mergesort is stable, so ties keep
        # their original relative order.
        transformed_data = pd.concat(transformed_chunks)
        return transformed_data.sort_values(
            by='GDP (B USD)', ascending=False, kind='mergesort'
        )

    except Exception as e:
        log_message(f"Error during data transformation: {str(e)}", level="ERROR")
        raise



def load_gdp_data(df):
    """Store the transformed data in the ``World_Economies.db`` SQLite file.

    The columns ``Country``, ``GDP (B USD)`` (renamed to ``GDP_USD_billion``),
    ``Year`` and ``Region`` are written to the table ``Countries_by_GDP``,
    replacing the table if it already exists.

    Args:
        df: DataFrame containing the four columns listed above.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    log_message("Loading data into SQLite database")
    try:
        conn = sqlite3.connect('World_Economies.db')
        try:
            table = df[['Country', 'GDP (B USD)', 'Year', 'Region']].rename(
                columns={'GDP (B USD)': 'GDP_USD_billion'}
            )
            table.to_sql('Countries_by_GDP', conn, if_exists='replace', index=False)
        finally:
            # Close the connection even when the write fails.
            conn.close()

        log_message("Data successfully loaded into SQLite database")

    except Exception as e:
        log_message(f"Error while loading data into SQLite database: {str(e)}", level="ERROR")
        raise



# Keep only the countries whose GDP is at least 100B USD
def filtered_100USD(df):
    """Print and return the rows of ``df`` with ``GDP (B USD)`` >= 100.

    Args:
        df: DataFrame with a ``GDP (B USD)`` column.

    Returns:
        The filtered DataFrame (original index labels are kept).
    """
    at_least_100 = df['GDP (B USD)'] >= 100
    selected = df[at_least_100]
    print("Countries with a GDP of over 100B USD")
    print(selected)
    return selected


# Average GDP of the top 5 countries in each region
def region_top5_calculate(df):
    """Print and return the mean GDP of each region's five largest economies.

    Args:
        df: DataFrame with ``Region`` and ``GDP (B USD)`` columns.

    Returns:
        DataFrame with the columns ``Region`` and ``Top 5 Avg GDP (B USD)``,
        one row per region (rows with a missing region are excluded by
        ``groupby``). Regions with fewer than five countries average
        whatever rows they have.
    """
    # Sort once, keep the first five rows of every region, then average.
    # This avoids DataFrameGroupBy.apply and also works on an empty frame.
    ranked = df.sort_values('GDP (B USD)', ascending=False)
    top_rows = ranked.groupby('Region').head(5)
    region_top5_avg = (
        top_rows.groupby('Region')['GDP (B USD)']
        .mean()
        .reset_index(name='Top 5 Avg GDP (B USD)')
    )
    print("Average GDP of top 5 countries by region")
    print(region_top5_avg)
    return region_top5_avg


# Additional requirement: report countries with GDP of at least 100B USD
def display_countries_with_gdp_over_100():
    """Query ``World_Economies.db`` and print countries with GDP >= 100B USD.

    Reads ``Country`` and ``GDP_USD_billion`` from the ``Countries_by_GDP``
    table and prints the rows as a formatted table.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    log_message("Displaying countries with GDP over 100B USD")
    try:
        query = "SELECT Country, GDP_USD_billion FROM Countries_by_GDP WHERE GDP_USD_billion >= 100"
        conn = sqlite3.connect('World_Economies.db')
        try:
            result = pd.read_sql_query(query, conn)
        finally:
            # Close the connection even when the query fails.
            conn.close()

        print("Countries with GDP >= 100B USD:")
        print(tabulate(result, headers='keys', tablefmt='pretty', showindex=False))

    except Exception as e:
        log_message(f"Error querying database for countries with GDP >= 100B: {str(e)}", level="ERROR")
        raise
    
# Compute and print the average GDP of the top 5 countries per region
def display_region_top5_average_gdp():
    """Query ``World_Economies.db`` and print each region's top-5 mean GDP.

    Rows of ``Countries_by_GDP`` with a NULL region are ignored. Within each
    region the five rows with the highest ``GDP_USD_billion`` are averaged.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    log_message("Calculating average GDP of top 5 countries by region")
    try:
        # ROW_NUMBER() selects exactly five rows per region; RANK() would
        # admit extra rows whenever GDP values tie at the fifth position.
        query = """
        WITH RankedCountries AS (
            SELECT Country, GDP_USD_billion, Region,
                   ROW_NUMBER() OVER (PARTITION BY Region ORDER BY GDP_USD_billion DESC) AS RowNum
            FROM Countries_by_GDP
            WHERE Region IS NOT NULL
        )
        SELECT Region, AVG(GDP_USD_billion) AS Avg_Top5_GDP
        FROM RankedCountries
        WHERE RowNum <= 5
        GROUP BY Region
        """
        conn = sqlite3.connect('World_Economies.db')
        try:
            result = pd.read_sql_query(query, conn)
        finally:
            # Close the connection even when the query fails.
            conn.close()

        print("Average GDP of top 5 countries by region (excluding None):")
        print(tabulate(result, headers='keys', tablefmt='pretty', showindex=False))

    except Exception as e:
        log_message(f"Error querying database for top 5 average GDP: {str(e)}", level="ERROR")
        raise


def etl_process():
    """Run the whole pipeline: configure, extract, transform, load, report.

    Every stage writes to the log file. Any exception raised by a stage is
    caught here and recorded as an ERROR entry; it is not re-raised.
    """
    try:
        # Remember when the run began so its duration can be logged.
        began = datetime.datetime.now()

        log_started()
        log_message("ETL Process Started")

        # Configuration
        source_url, css_class = load_config()

        # Extract, and keep a copy of the raw extraction on disk
        raw_frame = extract_gdp_data(source_url, css_class)
        save_gdp_data(raw_frame)

        # Transform, and keep a copy of the transformed data on disk
        clean_frame = transform_gdp_data(raw_frame)
        save_gdp_data(clean_frame, 'transformed_gdp_data.csv', 'transformed_gdp_data.json')

        # Load into the SQLite database
        load_gdp_data(clean_frame)

        # Reports driven by the database contents
        display_region_top5_average_gdp()
        display_countries_with_gdp_over_100()

        log_message("ETL Process Completed Successfully")

        # Record how long the run took.
        duration = datetime.datetime.now() - began
        log_message(f"ETL Process Duration: {duration}")

    except Exception as exc:
        log_message(f"ETL Process Failed: {str(exc)}", level="ERROR")
        
        
# Run the pipeline only when this file is executed directly, not when imported.
if __name__ == "__main__":
    etl_process()