In [9]:
import csv
import os
import snowflake.connector
import sys
import json
import shutil
import logging
from datetime import datetime, timedelta
import pandas as pd
import snowflake.connector
from snowflake.connector.file_transfer_agent import SnowflakeFileTransferAgent


def check_schema(file_name,data):
    """
    CHeck the schema of the csv file and compare to the schema mentioned in the 
    process json.
    """
    with open(path+"\\Process\\"+process_name+".json",'r') as w:
        data=json.load(w)
    
    logging.info("Column list for process '{}' is {}\n".format(data["Process_Name"],data["Schema"]) )

    df=pd.read_csv(data["SourcePath"]+"\\"+file_name,nrows=0)  
    col_list=df.columns.tolist()
    logging.info("COlumn list available in csv file {} is {}\n".format(file_name,col_list))

    if data["Schema"]==col_list:
        logging.info("Schema is matching, process for data loading\n")

        conn = snowflake.connector.connect(
        user=data["SF_Conn"]["user"],
        password=data["SF_Conn"]["password"],
        account=data["SF_Conn"]["account"],   # e.g. abcd-xy123
        warehouse=data["SF_Conn"]["warehouse"],
        database=data["SF_Conn"]["database"],
        schema=data["SF_Conn"]["schema"]
        )

        logging.info("Connected to SF!\n")
        cursor = conn.cursor()
    
        load_to_stg(file_name,data,cursor)
    else:
        logging.info("Please check the schema in the csv file {} as it is not in sync with the proces json\n".format(file_name))
        
def json_process(process_name):
    """
    Process the json file availabe for mentioned process
    """
    logging.info("Details of process extraction started for process '{}'\n".format(process_name))
    with open(path+"\\Process\\"+process_name+".json",'r') as w:
        data=json.load(w)
        #Reading all values available in json
        try:
            Prcs_nm=data["Process_Name"]
            file_format=data["File_Format"]
            Trgt_db=data["TargetDB"]
            StageTbl=data["StageTbl"]
            TargetTbl=data["TargetTbl"]
            InputFilePath=data["SourcePath"]
            logging.info("Json file is successfully read for process: {}\n".format(process_name))     
        except Exception as e:
            logging.error("Please check the JSON for proper values\n", e)
            pass
        file_process(data)
        
def load_to_stg(file_name,data,cursor):

    """
        Loads the csv file to DB stage layer
        Here we are using Snowflake as our staging DB
    """

    file_path = data["SourcePath"]+"\\"+file_name
    stage_name = "@my_internal_stage"

    sql = f"PUT file://{file_path} {stage_name}"

    cursor.execute(sql)

    logging.info("File uploaded successfully to SF internal stage {}!\n".format(stage_name))

    cursor.execute("truncate table "+data["StageTbl"]+";")

    logging.info ("Stage table {} is Truncated is successful\n".format(data["StageTbl"]))

    sql=f"COPY INTO "+data["StageTbl"] +" FROM @my_internal_stage FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER=1) PURGE = TRUE"
    
    cursor.execute(sql)
    
    stg_cnt_qry=cursor.execute("select count (*) from {}".format(data["StageTbl"]))

    stg_cnt=cursor.fetchone()[0]
    
    logging.info("No of rows Inserted into stage table {} is {}\n".format(data["StageTbl"],stg_cnt))

    logging.info("Data is loaded to stage table {} successfully\n".format(data["StageTbl"]))

    #logging.info("Load the CSV file into SF stage layer started for file: {}".format(file_name))
    load_from_stg_to_core(file_name,data,cursor,stg_cnt)

def load_from_stg_to_core(file_name,data,cursor,stg_cnt):
    
    """
        Loads the data from stage layer to core layer
        Here we are using Snowflake as our staging and core DB
    """
    logging.info("Load the from stage to core layer started for file: {}\n".format(file_name))

    next_batch_qry="select coalesce(max(batch_sk),0)+1 from "+data["TargetTbl"] +";"
    
    cursor.execute(next_batch_qry)
    
    next_batch_sk=cursor.fetchone()[0]

    print("Next Batch sk is {}".format(next_batch_sk))

    del_qry=f"delete from "+data["TargetTbl"]+" where ("+data["key_col"]+") in (select "+data["key_col"] +" from "+data["CI_View"]+" );"

    qry=f"Insert into "+data["TargetTbl"]+" Select *, "+str(next_batch_sk)+",current_timestamp() from "+data["CI_View"]+";"

    logging.info("Deleting from core table which exists in CI View using query:\n{}".format(del_qry))
        
    cursor.execute(del_qry)

    logging.info("No of rows deleted from core table {} is {}\n".format(data["TargetTbl"],cursor.rowcount))

    logging.info("Inserting into core table using query:\n{}\n".format(qry))
    
    cursor.execute(qry)
    
    core_cnt=cursor.rowcount
    logging.info("No of rows inserted into core table {} is {}\n".format(data["TargetTbl"],cursor.rowcount))
    
    ######SF Audit entry #########

    audit_qry="""Insert into {} (process_name,file_name,load_timestamp,file_row_count,stage_row_count,core_row_count,core_batch_sk) 
    values ('{}','{}',current_timestamp(),0,{},{},{})
""".format(data["Audit_Tbl"],data["Process_Name"],file_name,stg_cnt,core_cnt,next_batch_sk)

    logging.info("Making audit entry:\n{}".format(audit_qry))

    cursor.execute(audit_qry)

    logging.info("Audit entry is successful into audit table:{}\n".format(data["Audit_Tbl"]))
    #logging.info("No of rows loaded into Table: {} is {}".format(data["TargetTbl"],Tbl_row_cnt))
    
    post_processing(file_name,data,cursor,stg_cnt,core_cnt,next_batch_sk)
    
def post_processing(file_name,data,cursor,stg_cnt,core_cnt,next_batch_sk):
    
    """
        Check count between source file and target table.
        If it matches then move the file to Archive folder
    """
    logging.info("Move the loaded file into Archive folder for file: {}\n".format(file_name))
    
    df=pd.read_csv(data["SourcePath"]+"\\"+file_name,nrows=5)   
    logging.info("Sample rows for file :{} is \n {}".format(file_name,df))

    df=pd.read_csv(data["SourcePath"]+"\\"+file_name)  
    row_count=len(df)
    logging.info("Number of rows available in file:{} is {}\n".format(file_name,row_count))
    
    if stg_cnt==row_count and stg_cnt==core_cnt:
        logging.info("Successfully loaded data into SF for filename: {}\n".format(file_name))

        audit_qry_upd="""Update {} set file_row_count={} where process_name='{}' and core_batch_sk={};
        """.format(data["Audit_Tbl"],row_count,data["Process_Name"],next_batch_sk)

        logging.info("Updating the file row count for process {} and batch_sk {}\n".format(data["Process_Name"],next_batch_sk))
        
        cursor.execute(audit_qry_upd)
        
        source_path=data["SourcePath"]+"\\"+file_name
        target_path=data["ArchivePath"]+"\\"+file_name

        shutil.move(source_path,target_path)

        logging.info ("File :{} is moved to archive folder: {}\n".format(file_name,data["ArchivePath"]))
    else:
        logging.error("Source to Target row count is not matching, Please check file: {}\n".format(file_name))

    
def file_process(data):
    
    """
    Function to process the files to load to DB
    """

    # Remove files from archive which are older than 7 days;
    files=os.listdir(data["ArchivePath"])
    for file in files:
        timestamp = datetime.fromtimestamp(os.path.getmtime(data["ArchivePath"]+"\\"+file))
        if datetime.now()-timestamp>timedelta(days=7):
                logging.info("{} is Older than 7 days, Removing it from Archive".format(file))
                os.remove(data["ArchivePath"]+"\\"+file)
            
    logging.info ("File pattern for process:'{}' is '{}'*.csv \n".format(data["Process_Name"],data["File_Format"]))
    
    files=os.listdir(data["SourcePath"])
    #print(files)
    to_be_processed=[]
    
    for file in files:
        """
        Identify if only file with ProductWiseSales name is present or name
        Ignore all other files
        """
        if data["File_Format"] in file:
            logging.info("List of files to be processed {} \n".format(file))
            to_be_processed.append(file)
    #print (file) 
    #print(to_be_processed)
    if not to_be_processed:
        logging.error("No file is there to process for Process:{}\n".format(data["Process_Name"]))
    else:
        for file_name in to_be_processed:
            
            check_schema(file_name,data)
            
#Process starts from here
if __name__ == "__main__":
    #print("My program starts here")
    path=r"C:\Users\neela\Downloads\PythonDemo\DataIngestion"
    logPath=path+"\\log\\"
    
    process_list=["Product","Employee"]
    
    process_name=input("Enter the process name you want to trigger")
    logging.basicConfig(
                filename=logPath+process_name+'_'+datetime.now().strftime("%Y%m%d%H%M%S")+'.log',
                level=logging.INFO,           # minimum log level to record
                format='%(asctime)s - %(levelname)s - %(message)s'
            )
    if process_name in process_list:
        logging.info("Processes available are {}\n".format(process_list))
        logging.info("Data Ingestion Process started at {}\n".format(datetime.now().strftime("%Y%m%d%H%M%S")))
        logging.info("'{}' process will be triggered \n".format(process_name))
        print("'{}' will be triggered, Please check the log path '{}' for details".format(process_name,logPath+"product*"))
        json_process(process_name)
        
        for handler in logging.root.handlers[:]:
            handler.close()
            logging.root.removeHandler(handler)
    else:
        logging.error("Please enter a valid process name \n")
        print("Please enter a valid Process Name from list {}".format(process_list))  
        for handler in logging.root.handlers[:]:
            handler.close()
            logging.root.removeHandler(handler)
    

Enter the process name you want to trigger Product


'Product' will be triggered, Please check the log path 'C:\Users\neela\Downloads\PythonDemo\DataIngestion\log\product*' for details
Next Batch sk is 15
