In [5]:

import yaml
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy import inspect
from sqlalchemy import text
from pandasgui import show
import numpy as np 
import tabula
import requests
import jpype


In [6]:
class DatabaseConnector:
    def __init__(self, yaml_file_path = 'db_creds.yaml'):
        self.engine = self.init_db_engine()

    def read_db_creds(self):
        with open('db_creds.yaml', 'r') as file:
            db_creds = yaml.safe_load(file)
            return db_creds

    def init_db_engine(self):
        engine = create_engine(f"postgresql://{self.read_db_creds()['RDS_USER']}:{self.read_db_creds()['RDS_PASSWORD']}@{self.read_db_creds()['RDS_HOST']}:{self.read_db_creds()['RDS_PORT']}/{self.read_db_creds()['RDS_DATABASE']}")
        engine.execution_options(isolation_level = 'AUTOCOMMIT').connect()
        return engine
        
    def list_db_tables(self):
        inspector = inspect(self.engine) 
        db_tables = inspector.get_table_names()
        return db_tables
    
    def upload_to_db(self, df, table_name):
        df.to_sql(table_name, con=self.engine, if_exists='replace', index=False)
    

In [29]:
class DataExtractor:
    def __init__(self, engine):
        self.engine = engine
        self.header_dictionary = {'x-api-key': 'yFBQbwXe9J3sd6zWVAMrK6lcxxr0q1lr2PT6DDMX'}
        self.base_url = 'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/'
        
    def reads_rds_table(self, table_name):
        data = pd.read_sql_table(table_name, self.engine)
        df = pd.DataFrame(data)
        return df
        
    def retrieve_pdf_data(self, pdf_url):
        df_list = tabula.read_pdf(pdf_url, pages='all')
        extracted_data = pd.concat(df_list, ignore_index=True)
        return extracted_data
       
    def list_number_of_stores(self, number_of_stores_endpoint):
        response = requests.get(number_of_stores_endpoint, headers=self.header_dictionary)
        print(response.json())
        number_of_stores = response.json()['number_stores']
        return number_of_stores
    
    def retrieve_stores_data(self, retrieve_store_endpoint, number_of_stores):
        store_data_list = []
        for store_number in range(0, number_of_stores):
            endpoint_url = f"{self.base_url}{retrieve_store_endpoint}/{store_number}"
            response = requests.get(endpoint_url, headers=self.header_dictionary)
        
        store_df = pd.DataFrame(store_data_list)
        return store_df

In [30]:
RDS_CONNECTOR = DatabaseConnector()

RDS_CONNECTOR.init_db_engine()
       
Display_Data = DataExtractor(RDS_CONNECTOR.engine)

df = Display_Data.reads_rds_table("orders_table")

pdf_url = 'https://data-handling-public.s3.eu-west-1.amazonaws.com/card_details.pdf'
df1 = Display_Data.retrieve_pdf_data(pdf_url)


number_of_stores_endpoint = 'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/number_stores'
retrieve_store_endpoint = 'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details/{store_number}'

number_of_stores = Display_Data.list_number_of_stores(number_of_stores_endpoint)
stores_df = Display_Data.retrieve_stores_data(retrieve_store_endpoint, number_of_stores)

print(stores_df)


  df[c] = pd.to_numeric(df[c], errors="ignore")


{'statusCode': 200, 'number_stores': 451}
Empty DataFrame
Columns: []
Index: []


In [None]:
class DataCleaning:
    def __init__(self, df):
        self.df = df

    def clean_user_data(self):
        self.clean_legacy_store_details()
        self.clean_legacy_users()
        self.clean_orders_table()

    def clean_legacy_store_details(self):
        self.df['lat'] = np.nan
        self.df = self.df.replace('NULL', np.nan)
        self.df = self.df.replace('N/A', np.nan)
        self.df.drop(self.df.columns[0], axis=1, inplace=True)

        self.df['opening_date'] = pd.to_datetime(self.df['opening_date'], errors='coerce', utc=False, format='mixed').dt.date

        self.df.dropna(axis=0, how='all', subset=self.df.columns[1:], inplace=True)
        self.df = self.df.dropna(axis=1, how='all')
        self.df = self.df.replace('NaT', np.nan)
        self.df = self.df.dropna(subset=['opening_date'])

        self.df['continent'] = self.df['continent'].replace('eeEurope', 'Europe')
        self.df['continent'] = self.df['continent'].replace('eeAmerica', 'America')
        self.df['staff_numbers'] = self.df['staff_numbers'].replace('e30', 30)

        return self.df

    def clean_legacy_users(self):
        self.df = self.df.replace('NULL', np.nan)
        self.df = self.df.replace('N/A', np.nan)
        self.df.drop(self.df.columns[0], axis=1, inplace=True)

        self.df['date_of_birth'] = pd.to_datetime(self.df['date_of_birth'], errors='coerce', utc=False, format='mixed').dt.date
        self.df['join_date'] = pd.to_datetime(self.df['join_date'], errors='coerce', utc=False, format='mixed').dt.date

        self.df.dropna(axis=0, how='all', subset=self.df.columns[1:], inplace=True)
        self.df = self.df.dropna(axis=1, how='all')
        self.df = self.df.replace('NaT', np.nan)
        self.df = self.df.dropna(subset=['date_of_birth'])

        self.df.drop_duplicates(inplace=True)

        phone_patterns = [r'^\+\d{1,3}-\d{3}-\d{3}-\d{4}$', r'^\d{3}-\d{3}-\d{4}$', r'^\+49-\d{3}-\d{6,}$',
                          r'^\+44\s?\d{1,5}\s?\d{4}\s?\d{4}$', r'^\+?[0-9()-]{7,}$']
        valid_phone_numbers = self.df['phone_number'].str.match('|'.join(phone_patterns))
        self.df.loc[~valid_phone_numbers, 'phone_number'] = np.nan

        valid_email_addresses = self.df['email_address'].str.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
        self.df.loc[~valid_email_addresses, 'email_address'] = np.nan

        return self.df
    
    def clean_orders_table(self):
        self.df = self.df.replace('NULL', np.nan)
        self.df = self.df.replace('N/A', np.nan)
        self.df.drop(self.df.columns[0], axis=1, inplace=True)

        self.df.dropna(axis=0, how='all', subset=self.df.columns[1:], inplace=True)
        self.df = self.df.dropna(axis=1, how='all')

        valid_card_numbers = self.df['card_number'].astype(str).apply(len).between(11, 19)
        self.df['card_number'] = np.where(valid_card_numbers, self.df['card_number'].astype(str), np.nan)

        return self.df
    
    def clean_card_data(self):
        self.df = self.df.replace('NULL', np.nan)
        self.df = self.df.replace('N/A', np.nan)

        self.df['date_payment_confirmed'] = pd.to_datetime(self.df['date_payment_confirmed'], errors='coerce', utc=False, format='mixed').dt.date

        self.df.dropna(axis=0, how='all', subset=self.df.columns[1:], inplace=True)
        self.df = self.df.dropna(axis=1, how='all')
        self.df = self.df.replace('NaT', np.nan)
        self.df = self.df.dropna(subset=['date_payment_confirmed'])

        self.df.drop_duplicates(inplace=True)

        return self.df

In [None]:

clean_data = DataCleaning(df)

cleaned_data = clean_data.clean_orders_table()

RDS_CONNECTOR.upload_to_db(cleaned_data, 'dim_users')

clean_data1 = DataCleaning(df1)

cleaned_data1 = clean_data1.clean_card_data()

RDS_CONNECTOR.upload_to_db(cleaned_data1, 'dim_card_details')