In [32]:
import re
import pandas as pd
from datetime import datetime
import tabula as tb
import requests
import boto3
import io
import yaml
from sqlalchemy import create_engine, text

class DatabaseConnector:
    
    @staticmethod
    def read_db_creds(creds):
        with open(creds, 'r') as f:
            yaml_data = yaml.safe_load(f)
        return yaml_data

    @staticmethod
    def init_db_engine(creds):
        credentials = DatabaseConnector.read_db_creds(creds)

        HOST = credentials['HOST']
        PASSWORD = credentials['PASSWORD']
        USER = credentials['USER']
        DATABASE = credentials['DATABASE']
        PORT = credentials['PORT']

        connection_string = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}"
        engine = create_engine(connection_string)

        return engine
    
    def list_db_tables(self):
        engine = self.init_db_engine("db_creds.yaml")
        tables = []
        with engine.connect() as connection:
            query = text("SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public';")
            result = connection.execute(query)

        for row in result:
            tables.append(row[0])

        return tables

    def upload_to_db(self, df, table_name):
        try:
            creds = "sales_db_creds.yaml"
            engine = self.init_db_engine(creds)
            df.to_sql(table_name, engine, if_exists='replace', index=False)
            print(f"Table '{table_name}' has been created in the database.")
        except Exception as e:
            print(f"Error creating table '{table_name}': {str(e)}")

class DataExtractor:
    def __init__(self, db_connector):
        self.db_connector = db_connector

    def read_rds_table(self, table_name, creds):
        tables = self.db_connector.list_db_tables()
        if table_name in tables:
            engine = self.db_connector.init_db_engine(creds)
            dataframe = pd.read_sql(table_name, engine)
            return dataframe
        else:
            print("Table not found.")
            return None

    @staticmethod
    def retrieve_pdf_data(url):
        all_pages_df = tb.read_pdf(url, pages='all')
        combined_df = pd.concat(all_pages_df, ignore_index=True)
        return combined_df

    @staticmethod
    def list_number_of_stores(endpoint, header_dict):
        response = requests.get(endpoint, headers=header_dict)
        if response.status_code == 200:
            return response.json()['number_stores']
        else:
            print("Failed to fetch the number of stores")
            return 0
    
    @staticmethod
    def retrieve_stores_data(endpoint, header_dict, no_of_stores):
        store_data_list = []

        for store in range(no_of_stores):
            full_endpoint = endpoint.format(store)
            response = requests.get(full_endpoint, headers=header_dict)
            if response.status_code == 200:
                store_data_list.append(response.json())
            else:
                print(f"Failed to retrieve data for store {store}")
                store_data_list.append(0)

        stores_df = pd.DataFrame(store_data_list)
        return stores_df

    @staticmethod
    def extract_from_s3(s3_address):
        s3 = boto3.client('s3')
        bucket, key = s3_address.replace('s3://', '').split('/', 1)
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            body = response['Body'].read()
            df = pd.read_csv(io.BytesIO(body))
            return df
        except Exception as e:
            print(f"Failed to extract data from S3: {str(e)}")
            return None

class DataCleaning:

    @staticmethod
    def clean_invalid_date(df, column_name):
        df[column_name] = pd.to_datetime(df[column_name], format='%Y-%m-%d', errors='ignore')
        df[column_name] = pd.to_datetime(df[column_name], format='%Y %B %d', errors='ignore')
        df[column_name] = pd.to_datetime(df[column_name], format='%B %Y %d', errors='ignore')
        df[column_name] = pd.to_datetime(df[column_name], errors='coerce')
        df.dropna(subset=[column_name], how='any', inplace=True)
        return df

    @staticmethod
    def clean_user_data(user_data):
        user_data = DataCleaning.clean_invalid_date(user_data, 'date_of_birth')
        user_data = DataCleaning.clean_invalid_date(user_data, 'join_date')
        user_data['address'] = user_data['address'].str.replace('\n', ', ')
        user_data = user_data[user_data['email_address'].str.match(r'^[\w\.-]+@[\w\.-]+\.\w+$')]
        user_data.loc[:, 'phone_number'] = user_data['phone_number'].str.replace(r'\D', '', regex=True)
        return user_data

    @staticmethod
    def clean_card_data(card_data):
        card_data = DataExtractor.retrieve_pdf_data(card_data)
        card_data['card_number'] = card_data['card_number'].apply(str)
        card_data = DataCleaning.clean_invalid_date(card_data, 'date_payment_confirmed')
        card_data = DataCleaning.clean_invalid_date(card_data, 'expiry_date')
        return card_data

    @staticmethod
    def clean_continent(store_df):
        store_df['continent'] = store_df['continent'].apply(lambda continent: "Europe" if "Europe" in continent else ("America" if "America" in continent else None))
        store_df.dropna(subset=['continent'], inplace=True)
        return store_df

    @staticmethod
    def clean_store_data(store_df):
        store_df.drop(columns='lat', inplace=True)
        store_df = DataCleaning.clean_invalid_date(store_df, "opening_date")
        store_df['address'] = store_df['address'].str.replace('\n', ', ')
        store_df = DataCleaning.clean_continent(store_df)
        return store_df

    @staticmethod
    def convert_multiple_weights(products):
        pattern = r'(\d+(\.\d+)?)\s*([kKgGmMoOzZlL]*)'
        products['weight'] = products['weight'].astype(str)
        for i, product in products.iterrows():
            if "x" in product['weight']:
                matches = re.findall(pattern, product['weight'])
                new_weights = []
                for match in matches:
                    quantity = float(match[0])
                    unit_weight = float(match[1]) if match[1] else 1.0
                    unit = match[2].lower()
                    new_weight = quantity * unit_weight
                    new_weights.append(f"{new_weight:.2f}{unit}")
                products.at[i, 'weight'] = ' x '.join(new_weights)
        return products

    @staticmethod
    def convert_product_weights(products):
        products = DataCleaning.convert_multiple_weights(products)
        for index, row in products.iterrows():
            weight = str(row['weight'])
            if "kg" in weight:
                numeric_part = weight.replace("kg", "").strip()
                numeric_value = float(numeric_part)
                products.at[index, 'weight'] = numeric_value
            elif "oz" in weight:
                numeric_part = weight.replace("oz", "").strip()
                numeric_value = float(numeric_part)
                products.at[index, 'weight'] = numeric_value * 0.0283495
            elif "ml" in weight:
                numeric_part = weight.replace("ml", "").strip()
                numeric_value = float(numeric_part)
                products.at[index, 'weight'] = numeric_value * 1.28
            elif "g" in weight:
                numeric_part = weight.replace("g", "").strip()
                numeric_value = float(numeric_part)
                products.at[index, 'weight'] = numeric_value / 1000
        return products

    @staticmethod
    def clean_products_data(products):
        products.dropna(inplace=True)
        products = DataCleaning.convert_product_weights(products)
        products = DataCleaning.clean_invalid_date(products, 'date_added')
        return products

    @staticmethod
    def clean_orders_data(orders_table):
        columns_to_drop = ['first_name', 'last_name', '1']
        orders_table.drop(columns=columns_to_drop, inplace=True)
        return orders_table

    @staticmethod
    def clean_sales_data(sales_table):
        sales_table['timestamp'] = pd.to_datetime(sales_table['timestamp'], format='%H:%M:%S')
        return sales_table

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  user_data.loc[:, 'phone_number'] = user_data['phone_number'].str.replace(r'\D', '', regex=True)


Table 'dim_users' has been created in the database.
