# In [None]:
import pandas as pd
import logging
import os
import awswrangler as wr
import redshift_connector

# --- Source file and Redshift tables ---
# Object name the cleaned CSV is uploaded under in S3.
FILENAME = 'customer.csv'
# Local source file, relative to the working directory. Despite the constant's
# name it is a CSV, read with pandas.read_csv.
EXCEL_PATH = 'AWSCloud/Adventure Works on AWs/Entities used in Dashboard/DimCustomer.csv'
# Table that receives the merged customer rows.
TARGET_TABLE = 'dimCustomer'
# Scratch table that is emptied and re-filled by COPY on every run.
STAGING_TABLE = 'customer'

# --- S3 location and Redshift COPY settings ---
S3_BUCKET = 'bucket'
S3_PATH = 's3://' + S3_BUCKET + '/' + FILENAME
AWS_REGION = 'eu-north-1'
# Role Redshift assumes when running COPY; it needs read access to S3_PATH.
IAM_ROLE = 'XXX'

# --- Setup logging ---
# All pipeline steps report progress and failures through the root logger,
# which writes INFO and above to this file.
# NOTE(review): the log file is called 'person.log' although this script loads
# customers -- confirm the name is intended.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
    level=logging.INFO,
    filename='person.log',
)

# --- Redshift credentials ---
# Keyword arguments for redshift_connector.connect(). Each value can be supplied
# through an environment variable so real credentials never have to be
# committed with the notebook; the literals are only fallbacks/placeholders.
REDSHIFT_CONFIG = {
    'database': os.environ.get('REDSHIFT_DATABASE', 'dev'),
    'user': os.environ.get('REDSHIFT_USER', 'admin'),
    'password': os.environ.get('REDSHIFT_PASSWORD', 'XXXXXX'),
    'host': os.environ.get('REDSHIFT_HOST', 'XXXXX'),
    # redshift_connector takes the port as an int; 5439 is Redshift's default.
    'port': int(os.environ.get('REDSHIFT_PORT', '5439')),
}


# --- Connect to Redshift ---
def get_redshift_connection():
    """Open a Redshift connection with autocommit turned off.

    Connection settings come from REDSHIFT_CONFIG. Disabling autocommit leaves
    transaction boundaries to the caller (main() commits after the merge or
    rolls back on failure).

    Returns:
        An open redshift_connector connection.

    Raises:
        Whatever connecting or configuring the connection raises; the error is
        logged first and then re-raised unchanged.
    """
    try:
        connection = redshift_connector.connect(**REDSHIFT_CONFIG)
        connection.autocommit = False
    except Exception as err:
        logging.error(f"Redshift connection failed: {err}")
        raise
    return connection

# --- Load Excel & drop duplicates ---
def load_and_clean_excel(path, *, encoding="cp1252", key="CustomerKey"):
    """Read the source CSV and keep one row per ``key`` value.

    Despite the function name, the file is read with ``pandas.read_csv`` (it
    is a CSV, not an Excel workbook). When several rows share a ``key`` value,
    the one appearing last in the file is kept, so the later merge step gets
    at most one source row per key.

    Args:
        path: Anything ``pandas.read_csv`` accepts (file path or buffer).
        encoding: Text encoding of the file; defaults to Windows-1252, as before.
        key: Column whose values identify a record; defaults to "CustomerKey".

    Returns:
        The de-duplicated DataFrame (original index labels are preserved).

    Raises:
        Any error from reading the file, or KeyError if ``key`` is not a
        column; the error is logged and then re-raised.
    """
    try:
        df = pd.read_csv(path, encoding=encoding)
        logging.info(f"Loaded Excel with {len(df)} rows.")

        # keep='last': a later row in the file overrides an earlier one.
        df = df.drop_duplicates(subset=[key], keep='last')
        logging.info(f"Removed duplicates, remaining rows: {len(df)}.")

        return df
    except Exception as e:
        logging.error(f"Excel load/clean failed: {e}")
        raise

# --- Upload DataFrame to S3 and load into Redshift using COPY ---
def upload_to_staging(conn, df):
    """Stage ``df`` in S3 and bulk-load it into STAGING_TABLE.

    Steps:
      1. Write the staging columns of ``df`` to S3_PATH as a CSV with a header row.
      2. Empty STAGING_TABLE.
      3. COPY the S3 file into STAGING_TABLE.

    Nothing is committed here. The work runs in ``conn``'s current
    transaction, and the caller decides whether to commit or roll back.

    Args:
        conn: An open redshift_connector connection.
        df: Source rows; must contain every column in ``columns`` below.

    Raises:
        Any S3 or Redshift error, or KeyError if ``df`` lacks a staging column.
        The error is logged and then re-raised.
    """
    # COPY assigns CSV fields to this column list by position, so the file
    # must contain exactly these columns in exactly this order. Writing the
    # whole DataFrame would misalign fields whenever the source CSV has extra
    # or differently ordered columns.
    columns = ['CustomerKey', 'FirstName', 'Gender', 'GeographyKey', 'MaritalStatus', 'Title']
    try:
        # Upload CSV to S3 using awswrangler (header row included, no index).
        wr.s3.to_csv(df[columns], S3_PATH, index=False)

        logging.info(f"CSV written to S3 at {S3_PATH}")

        cursor = conn.cursor()
        try:
            # DELETE rather than TRUNCATE: TRUNCATE commits implicitly in
            # Redshift, which would break the all-or-nothing transaction.
            cursor.execute(f"DELETE FROM {STAGING_TABLE}")

            copy_sql = f"""
            COPY {STAGING_TABLE} ({','.join(columns)})
            FROM '{S3_PATH}'
            IAM_ROLE '{IAM_ROLE}'
            REGION '{AWS_REGION}'
            FORMAT AS CSV
            TIMEFORMAT AS 'MM/DD/YYYY HH:MI'
            ENCODING UTF8
            IGNOREHEADER 1
            EMPTYASNULL
            BLANKSASNULL
            """
            cursor.execute(copy_sql)
        finally:
            cursor.close()
        logging.info("COPY from S3 to staging table completed.")
    except Exception as e:
        logging.error(f"Failed in upload_to_staging: {e}")
        raise
           
       
# --- Merge logic to prevent duplicates ---
def merge_data(conn):
    """Upsert STAGING_TABLE into TARGET_TABLE, matching rows on CustomerKey.

    Target rows whose CustomerKey appears in the staging table get their
    attribute columns overwritten with the staged values. Staged rows with a
    new CustomerKey are inserted. Nothing is committed here; the caller
    commits or rolls back.

    Redshift's MERGE errors when one target row matches more than one source
    row, so the staging table must hold at most one row per CustomerKey
    (load_and_clean_excel de-duplicates on that column).

    Args:
        conn: An open redshift_connector connection.

    Raises:
        Any error from opening the cursor or executing the MERGE; logged, then
        re-raised.
    """
    key = 'CustomerKey'
    # Non-key columns overwritten on a match. Deriving every clause from one
    # list keeps SET / INSERT / VALUES in sync.
    attributes = ['FirstName', 'Gender', 'GeographyKey', 'MaritalStatus', 'Title']
    columns = [key] + attributes

    set_clause = ",\n                ".join(f"{col} = {STAGING_TABLE}.{col}" for col in attributes)
    insert_columns = ", ".join(columns)
    insert_values = ", ".join(f"{STAGING_TABLE}.{col}" for col in columns)

    merge_sql = f"""
        MERGE INTO {TARGET_TABLE}
        USING {STAGING_TABLE}
        ON {TARGET_TABLE}.{key} = {STAGING_TABLE}.{key}
        WHEN MATCHED THEN
            UPDATE SET
                {set_clause}
        WHEN NOT MATCHED THEN
            INSERT ({insert_columns})
            VALUES ({insert_values});
        """
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(merge_sql)
        finally:
            cursor.close()
        logging.info("Merge completed successfully.")
    except Exception as e:
        logging.error(f"Merge failed: {e}")
        raise

# --- Main pipeline ---
def main():
    """Run the customer load end to end: CSV -> S3 -> staging table -> MERGE.

    The staging load and the merge share one transaction, which is committed
    only if both succeed. On any failure the error is logged with its
    traceback and the transaction is rolled back; the exception is not
    re-raised. The connection is always closed.

    Returns:
        True if the pipeline committed successfully, False otherwise.
    """
    # Sentinel instead of probing locals(): stays None if we fail before
    # (or while) connecting, so there is nothing to roll back or close.
    conn = None
    try:
        # Printed so the resolution of the relative EXCEL_PATH is visible.
        print(os.getcwd())
        df = load_and_clean_excel(EXCEL_PATH)
        conn = get_redshift_connection()
        upload_to_staging(conn, df)
        merge_data(conn)
        conn.commit()
        logging.info("Pipeline finished successfully.")
        return True
    except Exception as e:
        # Record the root cause first, so it is not lost if rollback fails too.
        logging.exception(f"Pipeline failed: {e}")
        if conn is not None:
            try:
                conn.rollback()
                logging.error("Rolled back due to failure.")
            except Exception as rollback_err:
                logging.error(f"Rollback failed: {rollback_err}")
        return False
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception as close_err:
                logging.error(f"Closing the Redshift connection failed: {close_err}")

if __name__ == "__main__":
    main()
