In [None]:
import os
import boto3
from datetime import datetime, timezone, timedelta
from sqlalchemy import create_engine, MetaData, Table, select, text
# from airflow import DAG
from datetime import datetime, timedelta
# from airflow.models import Variable
# from airflow.operators.python_operator import PythonOperator
import logging
import pymysql
import pandas as pd
# from sqlalchemy import create_engine, MetaData, Table, select, text, insert, delete, truncate

# AWS account credentials and region passed to the boto3 S3 clients created below.
# NOTE(review): the key values are hard-coded in source; consider reading them
# from the environment or relying on boto3's default credential chain — confirm.
aws_access_key_id='***'
aws_secret_access_key='***'
region_name = 'ap-northeast-2'

def check_foreign_key_constraints(df, engine, target_table_name):
    """Check that ``df`` satisfies the foreign keys of ``target_table_name``.

    The target table is reflected from the database. For every foreign key
    it declares, the values of the matching DataFrame column are compared
    with the values currently stored in the referenced column.

    Args:
        df: pandas DataFrame holding the rows that are about to be inserted.
        engine: SQLAlchemy engine connected to the target database.
        target_table_name: Name of the table whose foreign keys are checked.

    Returns:
        True when every non-null foreign-key value in ``df`` exists in the
        referenced table (or the table declares no foreign keys); False as
        soon as one foreign key has a value with no match.
    """
    # MetaData(bind=...) and Table(autoload=True) are gone in SQLAlchemy 2.0;
    # autoload_with on its own already triggers reflection.
    metadata = MetaData()
    target_table = Table(target_table_name, metadata, autoload_with=engine)

    for fk in target_table.foreign_keys:
        local_column = fk.parent.name
        if local_column not in df.columns:
            # The DataFrame supplies no values for this column, so there is
            # nothing to compare against the referenced table.
            continue

        referenced_table = fk.column.table  # table the foreign key points to
        referenced_column = fk.column.name  # column the foreign key points to

        # Only the referenced column is needed for the membership test, so
        # avoid pulling the whole table; honour the table's schema if any.
        referenced_data = pd.read_sql_table(
            referenced_table.name,
            engine,
            schema=referenced_table.schema,
            columns=[referenced_column],
        )

        # NULL foreign-key values do not violate the constraint, and isin()
        # never matches NaN, so drop them before comparing.
        candidate_values = df[local_column].dropna()
        if not candidate_values.isin(referenced_data[referenced_column]).all():
            return False  # constraint violation found

    return True  # every foreign key is satisfied

def list_files(bucket_name, prefix):
    """Return the keys of all ``.csv`` objects under ``prefix`` in a bucket.

    Args:
        bucket_name: Name of the S3 bucket to list.
        prefix: Key prefix used to restrict the listing.

    Returns:
        A list of object keys ending in ``.csv``; empty when nothing matches.
    """
    s3 = boto3.client(
        's3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=region_name,
    )

    # A single list_objects_v2 call returns at most 1000 keys, so walk every
    # page of the listing instead of reading only the first response.
    paginator = s3.get_paginator('list_objects_v2')
    files = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # 'Contents' is omitted from the response when no object matches.
        files.extend(
            obj['Key']
            for obj in page.get('Contents', [])
            if obj['Key'].endswith('.csv')
        )
    return files

def load_file_to_rds(s3_file_path, db_connection_string, table_name):
    """Append the rows of one CSV file to a database table.

    The CSV is read with pandas (malformed lines are skipped) and appended
    to ``table_name`` through a SQLAlchemy engine. Failures are logged
    rather than raised, so one bad file does not stop a batch.

    Args:
        s3_file_path: Path or URL of the CSV file, e.g. ``s3://bucket/key``.
        db_connection_string: SQLAlchemy URL of the destination database.
        table_name: Name of the table the rows are appended to.

    Returns:
        True when the rows were appended, False when any step failed.
    """
    engine = None
    try:
        # Read the CSV file into a pandas DataFrame.
        df = pd.read_csv(s3_file_path, on_bad_lines='skip')

        # Create the SQLAlchemy engine.
        engine = create_engine(db_connection_string)

        # Foreign-key pre-check (currently disabled).
        # if not check_foreign_key_constraints(df, engine, table_name):
        #     raise ValueError("Some rows violate a foreign-key constraint.")

        # Append the DataFrame to the destination table.
        df.to_sql(table_name, engine, if_exists='append', index=False)
    except Exception as e:
        # Log the actual exception type: read and connection errors end up
        # here as well, not only IntegrityError.
        logging.error(f"{type(e).__name__} 발생: {e}. 파일: {s3_file_path} - 데이터를 적재하지 못했습니다.")
        return False
    finally:
        # An engine is created per call, so release its connection pool.
        if engine is not None:
            engine.dispose()
    return True

def upload_csvdata_to_rds(schema):
    """Reflect ``restaurant_restaurantinfo`` in ``schema`` and read its rows.

    NOTE(review): despite the name, no upload happens here yet; the function
    only queries the rows already stored in the table and returns nothing.

    Args:
        schema: Database schema that contains ``restaurant_restaurantinfo``.
    """
    rds_engine = get_RDS_engine()

    # MetaData(bind=...) and Table(autoload=True) are gone in SQLAlchemy 2.0;
    # autoload_with on its own already triggers reflection.
    metadata_rds_con = MetaData(schema=schema)
    target_table = Table(
        'restaurant_restaurantinfo',
        metadata_rds_con,
        autoload_with=rds_engine,
    )

    # Engine.execute() and select([...]) are gone in SQLAlchemy 2.0. Run the
    # query on an explicit connection so it is returned to the pool on exit.
    with rds_engine.connect() as connection:
        # NOTE(review): existing_data is not used yet — presumably meant for
        # comparing against incoming CSV rows; confirm.
        existing_data = connection.execute(target_table.select()).fetchall()

# Source bucket/prefix of the CSV files and the destination MySQL table.
bucket_name = 'de-4-1-glue-test'
prefix = 'backend/restaurant_restaurantinfo'
# NOTE(review): the database password is embedded in this URL, and the plain
# 'mysql://' scheme selects SQLAlchemy's default MySQL driver rather than the
# imported pymysql ('mysql+pymysql://') — confirm both are intended.
db_connection_string = 'mysql://admin:de-4-1-mysql@de-4-1-mysql.***.ap-northeast-2.rds.amazonaws.com:3306/de41mysql'
table_name = 'restaurant_restaurantinfo'

files = list_files(bucket_name, prefix)

for file in files:
    s3_file_path = f"s3://{bucket_name}/{file}"
    # load_file_to_rds logs and swallows its own errors, so only an explicit
    # False return is reported as a failure; any other result counts as loaded.
    if load_file_to_rds(s3_file_path, db_connection_string, table_name) is False:
        print(f"{file} could not be loaded to RDS.")
    else:
        print(f"{file} has been loaded to RDS.")
