In [34]:
import yaml
from sqlalchemy import create_engine, inspect
import pandas as pd
from dateutil.parser import parse
import tabula
import requests
import json
import boto3

class DatabaseConnector:
    def __init__(self, file=None):
        self.file = file
        self.db_creds = self.read_db_creds()
        self.db_engine = self.init_db_engine()
        self.db_table_list = self.list_db_tables()

    def read_db_creds(self):
        with open(self.file, 'r') as f:
            db_creds = yaml.safe_load(f)
            return db_creds
    
    def init_db_engine(self):
        db_engine = create_engine(f"postgresql://{self.db_creds['RDS_USER']}:{self.db_creds['RDS_PASSWORD']}@{self.db_creds['RDS_HOST']}:{self.db_creds['RDS_PORT']}/{self.db_creds['RDS_DATABASE']}")
        return db_engine

    def list_db_tables(self):
        insp = inspect(self.db_engine)
        db_table_list = insp.get_table_names()
        return db_table_list
    
    def upload_to_db(self, clean_dataframe, table_name):
        db_to_sql = clean_dataframe.to_sql(table_name, self.db_engine, if_exists='replace', index=False)
        return db_to_sql

class DataExtractor:
    def __init__(self, database=None):
        self.database = database

    def read_rds_table(self, table_name):
        table_data = pd.read_sql_table(table_name, self.database).set_index('index')
        return table_data

    def retrieve_pdf_data(self, pdf_path):
        pdf_df_page = tabula.read_pdf(pdf_path, pages='all')
        pdf_df = pd.concat(pdf_df_page, ignore_index=True)
        return pdf_df
    
    def list_number_of_stores(self, number_of_stores_endpoint, header):
        response = requests.get(number_of_stores_endpoint, headers=header)
        number_of_stores_data = response.json()
        return number_of_stores_data['number_stores']
    
    def retrieve_stores_data(self, store_endpoint, number_of_stores, header):
        store_df = []
        for store_number in range(number_of_stores):
            response = requests.get(f'{store_endpoint}{store_number}', headers=header).json()
            store = pd.json_normalize(response)
            store_df.append(store)
        stores_df = pd.concat(store_df).set_index('index')
        return stores_df
    
    def extract_from_s3_csv(self,s3_path, local_path):
        # Split S3 path into bucket and key
        bucket, key = s3_path.replace('s3://', '').split('/')   
        # Create an S3 client
        s3 = boto3.client('s3')
        
        # Download the file from S3
        s3.download_file(bucket, key, local_path)
        
        # Read the CSV file into a pandas DataFrame
        df = pd.read_csv(local_path, index_col=0)
        
        return df
    
    def extract_from_s3_json(self,s3_path, local_path):
        # Split S3 path into bucket and key
        bucket, key = s3_path.replace('s3://', '').split('/')   
        # Create an S3 client
        s3 = boto3.client('s3')
        
        # Download the file from S3
        s3.download_file(bucket, key, local_path)
        
        # Read the CSV file into a pandas DataFrame
        df = pd.read_json(local_path)
        return df
    
class DataCleaning:
    def __init__(self, dataframe):
        self.dataframe = dataframe

    def clean_user_data(self):
        # Remove NULL values and duplicates
        self.dataframe = self.dataframe.dropna().drop_duplicates()

        # Clean country code
        self.dataframe['country_code'] = self.dataframe['country_code'].replace('GGB', 'GB')
        self.dataframe = self.dataframe[self.dataframe['country_code'].str.len() == 2]

        # Clean dates
        self.dataframe.loc[:,'date_of_birth'] = pd.to_datetime(self.dataframe['date_of_birth'].apply(parse))
        self.dataframe.loc[:,'join_date'] = pd.to_datetime(self.dataframe['join_date'].apply(parse))

        # Clean phone numbers
        regex = '^(\(?\+?[0-9]*\)?)?[0-9_\- \(\)]*$'
        self.dataframe.loc[:,'phone_number'] = self.dataframe['phone_number'].str.replace('(0)', '', regex=False)
        self.dataframe.loc[:,'phone_number'] = self.dataframe['phone_number'].replace(r'\D+', '', regex=True)

        return self.dataframe

    def clean_card_data(self):
        card_provider_list = ['Diners Club / Carte Blanche', 'American Express', 'JCB 16 digit',
                             'JCB 15 digit', 'Maestro', 'Mastercard', 'Discover',
                             'VISA 19 digit', 'VISA 16 digit', 'VISA 13 digit']

        # Filter card data based on card providers
        self.dataframe = self.dataframe[self.dataframe['card_provider'].isin(card_provider_list)]

        # Clean and format date columns
        self.dataframe.loc[:,'expiry_date'] = pd.to_datetime(self.dataframe['expiry_date'], errors='coerce', format='%m/%y')
        self.dataframe.loc[:,'date_payment_confirmed'] = pd.to_datetime(self.dataframe['date_payment_confirmed'], errors='coerce', format='%Y-%m-%d')

        # Drop NULL values and duplicates
        self.dataframe = self.dataframe.dropna().drop_duplicates()

        return self.dataframe

    def clean_store_data(self):
        self.dataframe = self.dataframe[self.dataframe['country_code'].str.len() == 2]
        self.dataframe.loc[:, 'opening_date'] = pd.to_datetime(self.dataframe['opening_date'], errors='coerce', format='%Y-%m-%d')

        self.dataframe.loc[:, 'continent'] = self.dataframe['continent'].replace(['eeEurope', 'eeAmerica'], ['Europe', 'America'])

        self.dataframe = self.dataframe.drop(columns='lat')
        self.dataframe['staff_numbers'] = self.dataframe['staff_numbers'].apply(lambda x: "".join(filter(str.isdigit, str(x))))

        self.dataframe = self.dataframe.dropna().drop_duplicates()

        return self.dataframe
        
    def convert_product_weights(self):
        replacements = {
            'kg': '',
            'g': '/1000',
            'ml': '/1000',
            'x': '*',
            'oz': '/35.274',
            '77/1000 .': '77/1000'
        }
    
        self.dataframe['weight'] = self.dataframe['weight'].replace(replacements, regex=True)
        self.dataframe['weight'] = self.dataframe['weight'].str.replace('77/1000 .', '77/1000', regex=True)
        self.dataframe['weight'] = self.dataframe['weight'].apply(lambda x: eval(str(x))).astype(float)
        return self.dataframe

    def clean_products_data(self):
        self.dataframe.loc[:,'removed'] = self.dataframe['removed'].str.replace('Still_avaliable', 'Still_available')
        self.dataframe = self.dataframe[self.dataframe['removed'].isin(['Still_available', 'Removed'])]
        self.dataframe = self.convert_product_weights()
        return self.dataframe
    
    def clean_orders_data(self):
        self.dataframe = self.dataframe.drop(columns=['level_0', 'first_name', 'last_name', '1'])
        self.dataframe = self.dataframe.dropna().drop_duplicates()
        return self.dataframe
    
    def clean_date_times(self):
        self.dataframe = self.dataframe[self.dataframe['day'].apply(lambda x: len(str(x)) <= 2)]
        self.dataframe = self.dataframe.dropna().drop_duplicates()
        return self.dataframe

In [17]:
yaml_database = DatabaseConnector(file='db_creds.yaml').init_db_engine()
user_data_df = DataExtractor(yaml_database).read_rds_table('legacy_users')
cleaned_df = DataCleaning(user_data_df).clean_user_data()
user_data_to_sql = DatabaseConnector(file='sales_data_creds.yaml').upload_to_db(cleaned_df, 'dim_users')


In [None]:
pdf_file = DataExtractor().retrieve_pdf_data("https://data-handling-public.s3.eu-west-1.amazonaws.com/card_details.pdf")
data_pdf = DataCleaning(dataframe=pdf_file).clean_card_data()
card_details_to_sql = DatabaseConnector(file='sales_data_creds.yaml').upload_to_db(data_pdf, 'dim_card_details')

In [None]:
headers = {'x-api-key': 'yFBQbwXe9J3sd6zWVAMrK6lcxxr0q1lr2PT6DDMX'}

number_of_stores = DataExtractor().list_number_of_stores('https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/number_stores', headers)
stores_data = DataExtractor().retrieve_stores_data(f'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details/', number_of_stores, headers)

stores_data_clean = DataCleaning(stores_data).clean_store_data()
store_data_to_sql = DatabaseConnector('sales_data_creds.yaml').upload_to_db(stores_data_clean, 'dim_store_details')

In [None]:
#import s3fs
#def extract_from_s3(s3_resource):
    #df = pd.read_csv(s3_resource, index_col=0)
    #return df

#s3_df = extract_from_s3('s3://data-handling-public/products.csv')
#s3_df

In [None]:
# Use the local_path in the function
s3_df = DataExtractor().extract_from_s3_csv('s3://data-handling-public/products.csv', local_path = '/Users/kthuv/AiCore/Projects/MRDC/products.csv')

s3_data_df = DataCleaning(s3_df).clean_products_data()

products_df = DatabaseConnector(file='sales_data_creds.yaml').upload_to_db(s3_data_df, 'dim_products')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.dataframe['weight'] = self.dataframe['weight'].replace(replacements, regex=True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.dataframe['weight'] = self.dataframe['weight'].str.replace('77/1000 .', '77/1000', regex=True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.dataframe['wei

In [None]:
yaml_database = DatabaseConnector(file='db_creds.yaml').init_db_engine()
orders_data_df = DataExtractor(yaml_database).read_rds_table('orders_table')
cleaned_order_df = DataCleaning(orders_data_df).clean_orders_data()
orders_data_to_sql = DatabaseConnector(file='sales_data_creds.yaml').upload_to_db(cleaned_order_df, 'orders_table')

In [35]:

s3_json_df = DataExtractor().extract_from_s3_json('s3://data-handling-public/date_details.json', local_path = '/Users/kthuv/AiCore/Projects/MRDC/date_details.json')
date_times_cleaned = DataCleaning(s3_json_df).clean_date_times()
date_times_to_sql = DatabaseConnector(file='sales_data_creds.yaml').upload_to_db(date_times_cleaned, 'dim_date_times')
