<a href="https://colab.research.google.com/github/Boom-Ba/Files_Process/blob/main/File_Transfer_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import boto3
import json
from boto3 import client
import s3fs
from boto3.s3.transfer import Transferconfig_tb_name
import sys
from io import StringIO
import pandas as pd
import datetime
import time
from datetime import date, datetime, timezone
import psycopg2
from pyspark import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions

"""
import the connect library for psycopg2
import the error handling libraries for psycopg2
"""
from psycopg2 import OperationalError, errorcodes, errors
import psycopg2.extras as extras

# Expected env values include 'TST' and 'PPD'; the value is upper-cased below.

# Glue/Spark handles shared by the whole job; `logger` is used by every method
# of the job class.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()

# Required job arguments. JOB_RUN_ID is read from the resolved options
# (presumably supplied by the Glue runtime) and is used to tag lineage crumbs.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'env'])
job_run_id = args['JOB_RUN_ID']
env = args['env'].upper()

# Optional --log_level argument; defaults to INFO when it is not passed.
# Both "--log_level VALUE" and "--log_level=VALUE" spellings are detected.
# NOTE(review): log_level is not referenced anywhere else in the visible code,
# so it currently has no effect on `logger` — confirm whether it should be applied.
log_level = "INFO"
if any(arg == '--log_level' or arg.startswith('--log_level=') for arg in sys.argv):
    args_log = getResolvedOptions(sys.argv, ['log_level'])
    log_level = args_log['log_level']

class Glue_Job_Example:
    """Glue job that loads CSV files from S3 into Aurora PostgreSQL tables.

    Flow (see ``read_s3to_aurora``): each file under
    ``parent_folder/to_process/`` is loaded into a ``*_stg`` staging table,
    main-table rows for the staged dates are deleted, staging is copied into
    the main table, lineage crumbs are stamped and staging is emptied.
    ``archive_file`` then copies the files to ``parent_folder/processed/`` and
    ``clear_to_process_s3`` deletes them from ``to_process/``.
    """

    TO_PROCESS_PREFIX = 'parent_folder/to_process'
    PROCESSED_PREFIX = 'parent_folder/processed'

    def __init__(self):
        self.s3 = boto3.client('s3')
        # File names (basename only) found under TO_PROCESS_PREFIX.
        self.to_process = []
        # Connection settings come from SSM Parameter Store; any of them may be
        # None when the parameter is missing (get_ssm_parameter logs and returns None).
        self.host_name = self.get_ssm_parameter(f"/random-{env}/config_tb_name/v1")
        self.database_name = self.get_ssm_parameter(f"/random-{env}/config_tb_name/v2")
        self.db_user_name = self.get_ssm_parameter(f"/random-{env}/config_tb_name/v3")
        self.db_password = self.get_ssm_parameter(f"/random-{env}/config_tb_name/v4")
        self.s3_bucket_name = self.get_ssm_parameter(f'/random-{env}/config_tb_name/v5')
        # Job start time in epoch milliseconds (string); used as the lineage timestamp.
        self.time_milliseconds = str(time.time() * 1000)

    # ------------------------------------------------------------------ SSM / DB

    def get_ssm_parameter(self, parameter_name):
        """Return the decrypted value of an SSM parameter, or None if it does not exist.

        The SSM client is created on first use and reused for later lookups.
        Only ParameterNotFound is handled; other client errors propagate.
        """
        ssm_client = getattr(self, '_ssm_client', None)
        if ssm_client is None:
            ssm_client = self._ssm_client = boto3.client("ssm", "us-east-1")
        try:
            parameter = ssm_client.get_parameter(Name=parameter_name, WithDecryption=True)
        except ssm_client.exceptions.ParameterNotFound:
            logger.error(f"didn't work for {parameter_name}")
            return None
        return parameter['Parameter']['Value']

    def _connect(self):
        """Open a new psycopg2 connection using the SSM-sourced credentials."""
        return psycopg2.connect(
            host=self.host_name,
            database=self.database_name,
            user=self.db_user_name,
            password=self.db_password,
        )

    def _run_in_transaction(self, statements):
        """Execute ``(sql, params)`` pairs in one transaction and return their rowcounts.

        The connection is always closed. If any statement fails, nothing is
        committed and the exception propagates.
        """
        conn = self._connect()
        try:
            counts = []
            with conn.cursor() as cur:
                for query, params in statements:
                    cur.execute(query, params)
                    counts.append(cur.rowcount)
            conn.commit()
            return counts
        finally:
            conn.close()

    # ------------------------------------------------------------------ lineage

    @staticmethod
    def _build_lineage_crumb(archive_file, time_crumb, job_name, run_id):
        """Serialize a lineage crumb as JSON.

        Uses the same key order and separators as the original hand-built string.
        Values are escaped properly, so quotes in file names stay valid JSON.
        """
        return json.dumps(
            {
                "crumb_source": archive_file,
                "crumb_source_key": "NA",
                "crumb_ts": time_crumb,
                "job_name": job_name,
                "job_run_id": run_id,
            },
            separators=(",", ": "),
        )

    def get_data_lineage_crumbs(self, file_name):
        """Stamp a data-lineage JSON crumb for ``file_name`` onto both main tables.

        The crumb records the file's archived S3 path, the job start timestamp,
        the Glue job name and the job run id.

        NOTE(review): the UPDATEs have no WHERE clause, so every row in
        db_name.t1 and db_name.random_table receives this crumb, not only rows
        loaded from ``file_name``. Confirm this is intended.
        """
        archived_path = "s3://" + self.s3_bucket_name + "/" + self.PROCESSED_PREFIX + "/" + file_name
        job_name = f"random-{env}-db_name-data-pipeline-glue-job"
        crumb = self._build_lineage_crumb(archived_path, self.time_milliseconds, job_name, job_run_id)
        logger.info("UPDATE db_name.t1 set data_lineage_crumbs='" + crumb + "'")
        # The crumb is a bound parameter: a quote in the file name would otherwise
        # break (or inject into) the SQL text.
        t1_count, random_table_count = self._run_in_transaction([
            ("UPDATE db_name.t1 set data_lineage_crumbs=%s", (crumb,)),
            ("UPDATE db_name.random_table set data_lineage_crumbs=%s", (crumb,)),
        ])
        logger.info('INSERTION DATA_LINEAGE_CRUMBS: t1 ' + str(t1_count))
        logger.info('INSERTION DATA_LINEAGE_CRUMBS: random_table ' + str(random_table_count))

    # ------------------------------------------------------------------ staging / main

    def delete_old_records_in_main(self):
        """Delete main-table rows whose date (cast to DATE) also appears in staging.

        This is the deduplication step: staged dates are then re-inserted by
        ``load_from_stg_to_main``.
        """
        t1_count, random_table_count = self._run_in_transaction([
            ("""DELETE FROM db_name.t1 WHERE CAST(date as DATE) in (select CAST(date as DATE) from db_name.t1_stg)""", None),
            ("""DELETE FROM db_name.random_table WHERE CAST(conversion_date as DATE) in (select CAST(conversion_date as DATE) from db_name.random_table_stg)""", None),
        ])
        logger.info('OVERWRITING RECORDS IN MAIN: t1 ' + str(t1_count))
        logger.info('OVERWRITING RECORDS IN MAIN: random_table ' + str(random_table_count))

    def truncate_stg(self):
        """Delete every row from both staging tables in a single transaction."""
        t1_count, random_table_count = self._run_in_transaction([
            ("""DELETE FROM db_name.t1_stg""", None),
            ("""DELETE FROM db_name.random_table_stg""", None),
        ])
        logger.info('TRUNCATING STG: t1 ' + str(t1_count))
        logger.info('TRUNCATING STG: random_table ' + str(random_table_count))
        logger.info('Close connection')

    def load_from_stg_to_main(self):
        """Copy all staging rows into the corresponding main tables.

        ``SELECT *`` relies on staging and main tables having identical column order.
        """
        t1_count, random_table_count = self._run_in_transaction([
            ("""INSERT INTO db_name.t1 SELECT * from db_name.t1_stg""", None),
            ("""INSERT INTO db_name.random_table SELECT * from db_name.random_table_stg""", None),
        ])
        logger.info('INSERTION STAGE-TO-MAIN: t1 ' + str(t1_count))
        logger.info('INSERTION STAGE-TO-MAIN: random_table ' + str(random_table_count))

    # ------------------------------------------------------------------ S3 listing

    def _list_keys(self, prefix):
        """Return every object key under ``prefix`` in the job bucket, across all pages."""
        paginator = self.s3.get_paginator('list_objects_v2')
        keys = []
        for page in paginator.paginate(Bucket=self.s3_bucket_name, Prefix=prefix):
            # Pages for an empty prefix have no 'Contents' entry.
            keys.extend(obj['Key'] for obj in page.get('Contents', []))
        return keys

    def order_files_by_date_ascending(self):
        """Return ``self.to_process`` sorted by the text after the last '_' in each name.

        For names like ``<table>_<YYYYMMDD>.csv`` this puts the oldest files first.
        The comparison is lexicographic.
        """
        return sorted(self.to_process, key=lambda name: name.rsplit('_', 1)[-1])

    def create_s3_processed(self):
        """Return the last path segment of ``self.ftp_directory_path`` + 'processed'.

        NOTE(review): ``ftp_directory_path`` is never assigned in this class, so
        this raises AttributeError unless a caller sets it first.
        """
        parent_path = self.ftp_directory_path.split("/")[-1]
        return parent_path + 'processed'

    def upload_file(self):
        """Log the UTF-8 contents of every object under to_process/.

        Despite its name, this method only reads objects; it uploads nothing.
        """
        for key in self._list_keys(self.TO_PROCESS_PREFIX + '/'):
            body = self.s3.get_object(Bucket=self.s3_bucket_name, Key=key)['Body'].read()
            logger.info(body.decode("utf-8"))

    def get_to_process_file(self):
        """Refresh and return ``self.to_process`` with the file names under to_process/.

        Only the last path segment of each key is kept. Keys ending in '/'
        (folder markers) are skipped because they have no file name.
        """
        names = []
        for key in self._list_keys(self.TO_PROCESS_PREFIX + '/'):
            name = key.split('/')[-1]
            if name:
                names.append(name)
        self.to_process = names
        return self.to_process

    def archive_file(self):
        """Copy every object under to_process/ to the same relative path under processed/.

        An empty to_process/ folder is logged and is not an error. Any copy
        failure propagates. This keeps a later ``clear_to_process_s3`` call from
        deleting files that were never archived.
        """
        keys = self._list_keys(self.TO_PROCESS_PREFIX + '/')
        if not keys:
            logger.info('Nothing to Archive: ')
            return
        for key in keys:
            # Swap the source prefix for the destination prefix, keeping the rest of the key.
            destination_key = self.PROCESSED_PREFIX + key[len(self.TO_PROCESS_PREFIX):]
            logger.info('\n Copying from ' + key + ' to ' + destination_key)
            self.s3.copy_object(
                CopySource={'Bucket': self.s3_bucket_name, 'Key': key},
                Bucket=self.s3_bucket_name,
                Key=destination_key,
            )

    def clear_to_process_s3(self):
        """Delete every object under to_process/.

        Intended to run after ``archive_file``. An empty folder is logged;
        delete failures propagate.

        NOTE(review): objects that arrive between ``archive_file`` and this call
        are deleted without being archived.
        """
        keys = self._list_keys(self.TO_PROCESS_PREFIX + '/')
        if not keys:
            logger.info('To Process folder has been cleared: Empty Contents')
            return
        for key in keys:
            self.s3.delete_object(Bucket=self.s3_bucket_name, Key=key)
            logger.info('\n FILE: ' + key + ' HAS BEEN CLEARED...')

    # ------------------------------------------------------------------ loading

    def read_s3to_aurora(self):
        """Load every file under to_process/ into Aurora, oldest first.

        Names containing 't1' are loaded with ``load_csv``. Otherwise, names
        containing 'random_table' are loaded with ``load_gz``. Any other name is
        logged and skipped. After each loaded file, main rows for the staged
        dates are replaced, lineage crumbs are stamped and staging is emptied.

        NOTE(review): the 't1' test is a plain substring match, so any name
        containing 't1' is routed to the t1 table.
        """
        self.to_process = self.get_to_process_file()
        self.to_process = self.order_files_by_date_ascending()
        logger.info('ORDERED FILE LIST:')
        for file_name in self.to_process:
            logger.info(file_name)
            if 't1' in file_name:
                self.load_csv(file_name)
            elif 'random_table' in file_name:
                self.load_gz(file_name)
            else:
                logger.info('INCORRECT FILE NAME')
                logger.info(file_name)
                continue
            self.delete_old_records_in_main()
            self.load_from_stg_to_main()
            self.get_data_lineage_crumbs(file_name)
            self.truncate_stg()

    @staticmethod
    def _build_insert_statement(table_name, columns):
        """Return ``INSERT INTO <table> (<cols>) VALUES(%s,...)`` with one placeholder per column.

        NOTE(review): column names come from the CSV header and are not quoted
        or escaped. Only use this with trusted input files.
        """
        names = [str(column) for column in columns]
        placeholders = ",".join(["%s"] * len(names))
        return "INSERT INTO {} ({}) VALUES({})".format(table_name, ",".join(names), placeholders)

    @staticmethod
    def _rows_for_insert(df):
        """Convert a DataFrame into a list of plain-Python tuples suitable for psycopg2.

        Iterating the columns yields native Python scalars instead of NumPy
        scalars; psycopg2 cannot adapt ``numpy.int64``. Missing values (NaN/None)
        become None so they are stored as SQL NULL.
        """
        return [
            tuple(None if pd.isna(value) else value for value in row)
            for row in df.itertuples(index=False, name=None)
        ]

    def _load_file_to_stg(self, file_name, table_name, chunk_message=None):
        """Stream ``to_process/<file_name>`` from S3 in 1,000,000-row chunks into ``table_name``.

        One connection is used for the whole file and each chunk is committed
        separately. If the load fails mid-file, chunks committed before the
        failure stay in staging. Empty chunks are skipped. ``chunk_message``,
        if given, is logged after each committed chunk.
        """
        s3_path = 's3://' + self.s3_bucket_name + '/' + self.TO_PROCESS_PREFIX + '/' + file_name
        chunksize = 10 ** 6
        conn = self._connect()
        try:
            for df in pd.read_csv(s3_path, chunksize=chunksize):
                if df.empty:
                    continue
                insert_stmt = self._build_insert_statement(table_name, df.columns)
                with conn.cursor() as cur:
                    psycopg2.extras.execute_batch(cur, insert_stmt, self._rows_for_insert(df))
                conn.commit()
                if chunk_message:
                    logger.info(chunk_message)
        finally:
            conn.close()

    def load_gz(self, file_name):
        """Load ``file_name`` from to_process/ into db_name.random_table_stg.

        Does nothing unless the name contains 'random_table'. pandas' default
        ``compression='infer'`` decompresses based on the file suffix, presumably
        ``.gz`` here.
        """
        if 'random_table' not in file_name:
            return
        self._load_file_to_stg(file_name, 'db_name.random_table_stg')

    def load_csv(self, file_name):
        """Load ``file_name`` from to_process/ into db_name.t1_stg.

        Does nothing unless the name contains 't1'.
        """
        if 't1' not in file_name:
            return
        self._load_file_to_stg(file_name, 'db_name.t1_stg', chunk_message='LOAD CSV JOB DONE')

    # ------------------------------------------------------------------ generic helpers

    def read_from_aurora(self, query):
        """Execute ``query`` and return all fetched rows; the connection is always closed."""
        conn = self._connect()
        try:
            with conn.cursor() as cur:
                cur.execute(query)
                return cur.fetchall()
        finally:
            conn.close()

    def write_to_aurora(self, query):
        """Execute a single write ``query`` and commit it."""
        self._run_in_transaction([(query, None)])

    def write_many_to_aurora(self, query, params):
        """Execute ``query`` once per parameter set in ``params`` and commit them together."""
        conn = self._connect()
        try:
            with conn.cursor() as cur:
                cur.executemany(query, params)
            conn.commit()
        finally:
            conn.close()

    def delete_table(self, query):
        """Execute a single delete/DDL ``query`` and commit it."""
        self._run_in_transaction([(query, None)])

if __name__ == "__main__":
    glue_job = Glue_Job_Example()
    # Remove main-table rows whose dates are present in staging before loading
    # (presumably only matters if a previous run left staging populated).
    glue_job.delete_old_records_in_main()
    # Load every to_process/ file through staging into the main tables.
    glue_job.read_s3to_aurora()
    # Copy the handled files to processed/ ...
    glue_job.archive_file()
    logger.info('ARCHIVE JOB DONE')
    # ... then remove them from to_process/.
    glue_job.clear_to_process_s3()
