In [1]:
#!pip install schedule

In [1]:
import os
import numpy as np
import boto3
import pandas as pd
from dotenv import load_dotenv
import time
import logging
from botocore.exceptions import ClientError, NoCredentialsError
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import schedule
import warnings
# Suppress only DeprecationWarning
warnings.filterwarnings("ignore", category=DeprecationWarning)
from model_pipeline import model_pipeline

class S3BucketHandler(FileSystemEventHandler):
    def __init__(self, credentials_file='.env'):
        self.credentials_file = credentials_file
        self.bucket_name = "scetru-ml-bucket"
        self.s3 = self._load_credentials()
        self.last_contents = set()
        self.new_csv_files = [] #empty list to hold new files detected.

    def _load_credentials(self):
        """Create a boto3 S3 client from the keys in the dotenv file.

        Loads ``self.credentials_file`` with ``load_dotenv`` and then reads
        ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` from the environment.

        Returns:
            The object returned by ``boto3.client('s3', ...)``.
        """
        load_dotenv(self.credentials_file)
        client_kwargs = {
            'aws_access_key_id': os.getenv('AWS_ACCESS_KEY_ID'),
            'aws_secret_access_key': os.getenv('AWS_SECRET_ACCESS_KEY'),
        }
        return boto3.client('s3', **client_kwargs)

    def on_any_event(self, event):
        column_types = {
            'bvn': 'str',
            'application_id': 'str',
            'amount_requested': 'float',
            'date_created': 'date',
            'airtime_in_90days': 'float',
            'bill_payment_in_90days': 'float',
            'cable_tv_in_90days': 'float',
            'deposit_in_90days': 'float',
            'easy_payment_in_90days': 'float',
            'farmer_in_90days': 'float',
            'inter_bank_in_90days': 'float',
            'mobile_in_90days': 'float',
            'utility_bills_in_90days': 'float',
            'withdrawal_in_90days': 'float',
            }
        
        required_columns = list(column_types.keys())
        #bucket_name = "scetru-ml-bucket"
        
        try:
            current_contents = set(self._list_bucket_contents(self.bucket_name))
            new_files = current_contents - self.last_contents
            
            for file in new_files:
                if file.endswith('.csv'):
                    #### logging.info(f"Alert: New .csv file detected in bucket {self.bucket_name}: {file}")
                    df = self._read_csv_from_s3(self.bucket_name, file)
                        
                    # Check for required columns
                    missing_columns = set(required_columns) - set(df.columns)
                    if missing_columns:
                        print(f"Missing required columns in {file}: {missing_columns}")
                    else:
                        self.new_csv_files.append(df)
                        #### logging.info(f"All required columns present in {file}")                         
      
            self.last_contents = current_contents #set the last state of the bucket.

            # Merge dataframes if multiple new CSV files were detected
            while self.new_csv_files: 
                trans_data = pd.concat(self.new_csv_files, ignore_index=True) 
                trans_data = trans_data[required_columns]
                trans_data = trans_data[~trans_data['application_id'].isnull()] # excluded records where application_id is null
                trans_data.reset_index(drop=True, inplace=True)
                # convert columns to the right data types
                trans_data = S3BucketHandler.convert_columns_type(trans_data, column_types)
                trans_data = trans_data.drop_duplicates()
                trans_data.fillna(0.0, inplace=True)
                
                # merge trans_data with do_good_table
                merged_df = self.join_trans_with_do_good(trans_data)
                model_outcome = model_pipeline(merged_df)
                # logging.info(f"Parsed the merged DataFrame to model pipeline")
                
                # read and update the complete_table
                complete_table_ = self.read_and_create_complete_table(model_outcome)

                if not complete_table_.empty:
                    complete_table_.to_csv("complete_table.csv", index=False)

                    S3BucketHandler.saved_processed_df_as_csv(complete_table_) # save for audit
                    # upload the complete table
           
                    self.upload_files_to_s3_bucket(
                            local_filepath = "processed_loan_request", 
                            bucket_name = "complete-table" ,
                            s3_path_prefix = complete_table_.file_key.iloc[0].split('/')[0],
                            files_to_upload = [str(i) for i in complete_table_.application_id])
                
                else:
                    print("No records in complete table, continuing to watch for new files.")
                
                # Clear the list of new CSV files
                self.new_csv_files.clear()
                 
        except ClientError as e:
            print(f"Error accessing S3 bucket: {e}")

    def _list_bucket_contents(self, bucket_name):
        try:
            response = self.s3.list_objects_v2(Bucket=bucket_name)
            return [obj['Key'] for obj in response.get('Contents', [])]
        except ClientError as e:
            # Handle bucket not found error here (e.response['Error']['Code'] == 'NoSuchBucket')
            #### logging.error(f"Bucket {self.bucket_name} does not exist or access denied: {e}")
            return []  # Return an empty list to avoid further errors

    def _read_csv_from_s3(self, bucket_name, file_key):
        obj = self.s3.get_object(Bucket=bucket_name, Key=file_key)
        df = pd.read_csv(obj['Body'])
        df['file_key'] = file_key  # Add file_key as a new column
        return df
        
    def join_trans_with_do_good(self, trans_data):
        do_good_table = self.collate_file("scetru-fcmb-do-good-table")
        columns_ = ['bvn', 'applicationID', 'date_of_default', 'outstanding_balance']
        do_good_table = do_good_table[columns_]
        do_good_table['bvn'] = do_good_table['bvn'].astype(str)

        merged_df = trans_data.merge(do_good_table, on='bvn', how='left')
        merged_df['date_of_default'] = pd.to_datetime(merged_df['date_of_default'], errors='coerce')
        current_date = pd.Timestamp.now()
        merged_df['default_in_last_90days'] = np.where(
            ((current_date - merged_df['date_of_default']).dt.days <= 90) & (merged_df['outstanding_balance'] != 0), 
            'Y', 
            'N'
        )
        merged_df['has_it_make_it_good'] = np.where(
            (merged_df['outstanding_balance'] == 0) | (merged_df['default_in_last_90days'] == 'N'), 
            'Y', 
            'N'
        )
        
        merged_df.drop(columns=['date_of_default', 'outstanding_balance', 'applicationID'], inplace=True)
        return merged_df

    def read_and_create_complete_table(self, outcome_table):
        # Read and create complete table
        complete_table = self.collate_file("complete-table")
        complete_table.replace(r'^\s*$', np.nan, regex=True, inplace=True) # fill missing column with Nan
        complete_table['application_id'] = complete_table['application_id'].astype(str)
        complete_table['bvn'] = complete_table['bvn'].astype(str)
        # Excluding transactions previously processed by ml services or streaming process.
        complete_table = complete_table[(complete_table['decline_reason'].isnull()) & (complete_table['amount_approved'].isnull())].reset_index(drop=True)
        complete_table = complete_table.drop(columns=['amount_approved', 'decline_reason'])
        create_complete_table = complete_table.merge(outcome_table[outcome_table['application_id'].isin(complete_table['application_id'].unique())][['bvn', 'application_id', 'amount_approved','decline_reason']],
                               on=['bvn', 'application_id'], how='inner', suffixes=('', '_outcome'))

        # Add new columns and reorder (consider using pipe syntax)
        create_complete_table['updated_date'] = pd.Timestamp.now().floor('min')
        create_complete_table['loan_message'] = 'COMPLETED'
        create_complete_table = create_complete_table[['bvn', 'dob', 'amount_requested', 'application_id', 'loan_tenure','loan_repayment_structure',
                              'internal_id', 'amount_approved','created_date', 'updated_date', 'decline_reason', 'loan_message',
                              'file_key']]
        return create_complete_table

    def collate_file(self, bucket_name):
        new_files = []
        files = set(self._list_bucket_contents(bucket_name))
        for file in files:
            df = self._read_csv_from_s3(bucket_name, file)
            new_files.append(df)

        collated_files = pd.concat(new_files, ignore_index=True)
        return collated_files

    
    def upload_files_to_s3_bucket(self, local_filepath, bucket_name, s3_path_prefix='', files_to_upload=None):
        """
        Uploads all CSV files from a folder to an S3 bucket.
        
        Args:
            local_filepath (str): Path to the folder containing CSV files.
            bucket_name (str): Name of the S3 bucket to upload the files to.
            s3_path_prefix (str, optional): Prefix to add to the S3 file paths. Defaults to an empty string.
            files_to_upload (list, optional): List of filenames without extension to upload. Defaults to None.
        
        Returns:
            None
        """

        # Filter for CSV files to upload
        csv_files = [file for file in os.listdir(local_filepath) if file.endswith('.csv') 
                     and (files_to_upload is None or os.path.splitext(file)[0] 
                          in files_to_upload)
                    ]
        
        if not csv_files:
            print(f"No CSV files found in {local_filepath}.")
            return
        
        for filename in csv_files:
            local_file = os.path.join(local_filepath, filename)
            s3_file = f"{s3_path_prefix}/{filename}" if s3_path_prefix else filename

            try:
                self.s3.upload_file(local_file, bucket_name, s3_file)
                print(f"Upload Successful: from {local_file} to s3://{bucket_name}/{s3_file}")
            except FileNotFoundError:
                print(f"The file {local_file} was not found")
            except NoCredentialsError:
                print("Credentials not available")

    
    def clean_ml_bucket(self):
        """
        Deletes all files from the S3 bucket.
        """
        files_to_delete = self._list_bucket_contents(self.bucket_name)
        
        if files_to_delete:
            delete_objects = {'Objects': [{'Key': key} for key in files_to_delete]}
            try:
                self.s3.delete_objects(Bucket=self.bucket_name, Delete=delete_objects)
                print(f"Deleted {len(files_to_delete)} .csv files from {self.bucket_name}")
            except ClientError as e:
                print(f"An error occurred while deleting objects: {e}")
        else:
            print("No files found in the bucket")
        

    @staticmethod
    def convert_columns_type(df, column_types):
        """
        Converts columns in the DataFrame to the specified data types.

        Parameters:
        df (pd.DataFrame): The DataFrame to be converted.
        column_types (dict): A dictionary where keys are column names and values are the desired data types.

        Returns:
        pd.DataFrame: The DataFrame with converted columns.
        """
        for col, dtype in column_types.items():
            if col in df.columns:
                if dtype == 'date':
                    df[col] = pd.to_datetime(df[col]).dt.date
                else:
                    df[col] = df[col].astype(dtype)
            else:
                print(f"Warning: Column '{col}' does not exist in the DataFrame.")
        return df

    @staticmethod
    def saved_processed_df_as_csv(df_outcome):
        if len(df_outcome) > 0:
            # Ensure the directory to save the files exists
            output_dir = 'processed_loan_request' #staging area
            os.makedirs(output_dir, exist_ok=True)
    
            # Iterate over each unique application_id and save records to separate CSV files
            for application_id, group in df_outcome.groupby('application_id'):
                # Define the file path locally
                local_file_path = os.path.join(output_dir, f"{application_id}.csv")
    
                # Save the group to a CSV file
                group.to_csv(local_file_path, index=False)
    
            print("Files saved successfully.")


In [2]:
def monitor_s3_bucket(interval=1):
    """Run the S3 handler until interrupted, cleaning the bucket at midnight.

    A ``S3BucketHandler`` is attached to a watchdog ``Observer`` on the current
    directory, and ``handler.clean_ml_bucket`` is scheduled daily at 00:00.
    The function then loops, running pending scheduled jobs and sleeping
    ``interval`` seconds, until a ``KeyboardInterrupt`` arrives.

    Before monitoring starts, the bucket is probed with ``head_bucket``. This
    distinguishes an empty bucket (monitoring starts) from a missing or
    inaccessible one (an error is logged and the function returns).

    Args:
        interval (int | float): Seconds to sleep between scheduler checks.

    Returns:
        None
    """
    handler = S3BucketHandler()
    observer = Observer()
    observer.schedule(handler, path='.', recursive=False)

    # Check bucket existence and credential validity before starting monitoring
    try:
        handler.s3.head_bucket(Bucket=handler.bucket_name)
    except (ClientError, NoCredentialsError) as e:
        logging.error(f"Bucket {handler.bucket_name} does not exist or access denied: {e}")
        return

    # Schedule the clean_ml_bucket method to run at midnight. The job handle is
    # kept so it can be removed on exit; otherwise every re-run of this function
    # in the same kernel would register one more copy of the job.
    cleanup_job = schedule.every().day.at("00:00").do(handler.clean_ml_bucket)

    try:
        observer.start()

        while True:
            schedule.run_pending() # Run scheduled tasks
            time.sleep(interval)
    except KeyboardInterrupt:
        pass  # normal way to stop the loop
    finally:
        # Runs for KeyboardInterrupt and for unexpected errors alike, so the
        # observer thread and the scheduled job never outlive this call.
        schedule.cancel_job(cleanup_job)
        if observer.is_alive():
            observer.stop()
            observer.join()

In [3]:
# Start monitoring with the default 1-second interval. The call loops until the
# kernel is interrupted (KeyboardInterrupt).
monitor_s3_bucket()

Deleted 2 .csv files from scetru-ml-bucket


In [13]:
# Upload return_complete_table
# archive the bucket_data
# clean bucket at midnight after 3 days.

#### TEST THE UPDATED COMPLETED TABLE

In [None]:
## Update the complete table: upload the staged per-application CSV files.
# NOTE(review): `return_complete_table` is not defined in any visible cell of
# this notebook -- it must be created before this cell can run on a fresh kernel.
handler = S3BucketHandler()
handler.upload_files_to_s3_bucket(
    local_filepath="processed_loan_request",
    bucket_name="complete-table",
    # Prefix = first path segment of the first row's file_key.
    s3_path_prefix=return_complete_table.file_key.iloc[0].split('/')[0],
    files_to_upload=[str(i) for i in return_complete_table.application_id],
)

In [7]:
## Inspect the complete table: collate every file in the "complete-table" bucket
handler = S3BucketHandler()
complete_ = handler.collate_file("complete-table")
# Disabled filter kept for reference (rows whose loan_message is 'IN PROGRESS'):
#result = complete_[complete_['loan_message'] == 'IN PROGRESS']
#result.replace(r'^\s*$', np.nan, regex=True, inplace=True)
#result
complete_

Unnamed: 0,bvn,dob,amount_requested,application_id,loan_tenure,loan_repayment_structure,internal_id,amount_approved,created_date,updated_date,decline_reason,loan_message,file_key
0,22207845921,28-01-1998,670000.0,7564669779,1 year,monthly,sce-3a2ebb25-9ada-47d7-ba78-4faa79ebba42,0.0,2024-11-26 21:28,2024-11-28 10:31:00,Loan declined due to low transaction or incomp...,COMPLETED,fcmb_20241126/7564669779.csv
1,22150969042,02-01-2008,67000.0,253435676,1 year,monthly,sce-4d9471aa-7ea1-4234-8b2e-5313f0103e54,90000.0,2024-08-02 16:26,2024-08-02 17:26:00,,COMPLETED,fcmb_20240802/253435676.csv
2,22207845921,26-08-2024,340000.0,7564669779,1 year,monthly,sce-50b7f803-5ce4-42c4-871e-dd07278f824f,0.0,2024-08-28 15:41,2024-11-01 10:26:00,Loan declined due to low transaction or incomp...,Completed,fcmb_20240828/7564669779.csv
3,22377535967,02-02-2013,45000.0,54682632,1 year,biweekly,sce-de4dd6b7-2479-495f-aa02-49982e97d560,,2024-08-02 16:32,,Customer does not have enough transactions,DECLINED,fcmb_20240802/54682632.csv
4,22150969042,30-12-1998,340000.0,156748766,1 year,monthly,sce-4a0e8bc7-d2d8-4388-b766-219f51bc041a,90000.0,2024-11-26 21:27,2024-11-28 10:31:00,,COMPLETED,fcmb_20241126/156748766.csv
5,22150969042,21-01-1996,450000.0,156748766,1 year,biweekly,sce-cefe9f56-242c-4c2d-9d66-7441946edc5c,90000.0,2024-11-28 11:30,2024-11-28 11:38:00,,COMPLETED,fcmb_20241128/156748766.csv


In [17]:
# List the object keys currently in the "complete-table" bucket.
handler = S3BucketHandler()
response = handler._list_bucket_contents("complete-table")
response

['fcmb_20240802/253435676.csv',
 'fcmb_20240802/54682632.csv',
 'fcmb_20240828/7564669779.csv',
 'fcmb_20241126/156748766.csv',
 'fcmb_20241126/7564669779.csv',
 'fcmb_20241128/156748766.csv']

In [29]:
# Collate every file in the "complete-table" bucket into a single DataFrame.
handler = S3BucketHandler()
complete_ = handler.collate_file("complete-table")
# Parse datetime strings that may or may not include seconds
def parse_datetime(datetime_str):
    """Parse 'YYYY-MM-DD HH:MM:SS' text, falling back to 'YYYY-MM-DD HH:MM'.

    Args:
        datetime_str: Value handed to ``pd.to_datetime``.

    Returns:
        The result of ``pd.to_datetime`` for the first format that matches.

    Raises:
        ValueError: If the value matches neither format.
    """
    with_seconds = '%Y-%m-%d %H:%M:%S'
    without_seconds = '%Y-%m-%d %H:%M'
    try:
        parsed = pd.to_datetime(datetime_str, format=with_seconds)
    except ValueError:
        # No seconds component: retry with the shorter layout.
        parsed = pd.to_datetime(datetime_str, format=without_seconds)
    return parsed

# Convert the 'created_date' column to datetime (values with or without seconds)
complete_['created_date'] = complete_['created_date'].apply(parse_datetime)
# Keep only the rows whose 'created_date' falls on 2024-11-26
filtered_df = complete_[complete_['created_date'].dt.date == pd.to_datetime('2024-11-26').date()]
filtered_df

Unnamed: 0,bvn,dob,amount_requested,application_id,loan_tenure,loan_repayment_structure,internal_id,amount_approved,created_date,updated_date,decline_reason,loan_message,file_key
6,22207845921,28-01-1998,670000.0,7564669779,1 year,monthly,sce-3a2ebb25-9ada-47d7-ba78-4faa79ebba42,0.0,2024-11-26 21:28:00,2024-11-27 11:57:00,Loan declined due to low transaction or incomp...,Completed,fcmb_20241126/7564669779.csv
8,22150969042,30-12-1998,340000.0,156748766,1 year,monthly,sce-4a0e8bc7-d2d8-4388-b766-219f51bc041a,90000.0,2024-11-26 21:27:00,2024-11-27 11:57:00,,Completed,fcmb_20241126/156748766.csv
