In [1]:
import sys, os
import pandas as pd

project_root = os.path.abspath(".")
if project_root not in sys.path:
    sys.path.append(project_root)

In [2]:
from connectors.database import DatabaseConnector
from config.settings import DB_URL

connector = DatabaseConnector(DB_URL)
engine = connector.connect()

In [3]:
from load.to_db import DataBaseLoader
from load.to_file import FileLoader
from extract.from_db import DatabaseExtractor
from extract.from_file import FileExtractor
from extract.from_API import ExchangeRateFetcher
from transform.add_quality import ADDQuality
from transform.transformer import DataTransformer
from validate.quality_checks import DataValidator

In [4]:
csv_extractor = FileExtractor("/Users/mohamedmoaaz/Desktop/usecase")
source_data_frames = csv_extractor.extract_from_csv("source")
exchange_rate_extractor = ExchangeRateFetcher("https://openexchangerates.org/api/latest.json?app_id=a92f8bad8e044bc79949a676886da2c8")
exchange_rate_df = exchange_rate_extractor.fetch_exchange_rates()

2025-04-18 21:40:53 =>   Extracted 1445 rows from customers.csv in source
2025-04-18 21:40:53 =>   Extracted 7 rows from categories.csv in source
2025-04-18 21:40:53 =>   Extracted 334 rows from products.csv in source
2025-04-18 21:40:53 =>   Extracted 1615 rows from orders.csv in source
2025-04-18 21:40:53 =>   Extracted 10 rows from staffs.csv in source
2025-04-18 21:40:53 =>   Extracted 4764 rows from order_items.csv in source
2025-04-18 21:40:53 =>   Extracted 9 rows from brands.csv in source
2025-04-18 21:40:53 =>   Extracted 3 rows from stores.csv in source
2025-04-18 21:40:53 =>   Extracted 939 rows from stocks.csv in source
2025-04-18 21:40:54 =>   Fetched 169 exchange rates from openexchangerates


In [5]:
postgres_loader = DataBaseLoader(engine)
csv_loader = FileLoader("/Users/mohamedmoaaz/Desktop/usecase")
for file_name in source_data_frames.keys():
    if file_name == "orders.csv" or file_name == "order_items.csv":
        table_name = file_name.replace(".csv", "")
        postgres_loader.load_to_postgres(source_data_frames[file_name], table_name)
    else:
        csv_loader.load_to_csv(source_data_frames[file_name], "datalake", file_name)
csv_loader.load_to_csv(exchange_rate_df, "datalake", "exchange_rates.csv")

2025-04-18 21:40:54 =>   Loaded 1445 rows into customers.csv in datalake
2025-04-18 21:40:54 =>   Loaded 7 rows into categories.csv in datalake
2025-04-18 21:40:54 =>   Loaded 334 rows into products.csv in datalake
2025-04-18 21:40:54 =>   Loaded 1615 rows into orders in PostgreSQL
2025-04-18 21:40:54 =>   Loaded 10 rows into staffs.csv in datalake
2025-04-18 21:40:54 =>   Loaded 4764 rows into order_items in PostgreSQL
2025-04-18 21:40:54 =>   Loaded 9 rows into brands.csv in datalake
2025-04-18 21:40:54 =>   Loaded 3 rows into stores.csv in datalake
2025-04-18 21:40:54 =>   Loaded 939 rows into stocks.csv in datalake
2025-04-18 21:40:54 =>   Loaded 169 rows into exchange_rates.csv in datalake


In [6]:
csv_extractor = FileExtractor("/Users/mohamedmoaaz/Desktop/usecase")
data_lake_data = csv_extractor.extract_from_csv("datalake")

postgres_extractor = DatabaseExtractor(DB_URL)
query1 = "SELECT * FROM order_items"
order_data = postgres_extractor.extract_from_postgres(query1, engine)

query2 = "SELECT * FROM orders"
order_items_data = postgres_extractor.extract_from_postgres(query2, engine)

2025-04-18 21:40:54 =>   Extracted 1445 rows from customers.csv in datalake
2025-04-18 21:40:54 =>   Extracted 7 rows from categories.csv in datalake
2025-04-18 21:40:54 =>   Extracted 334 rows from products.csv in datalake
2025-04-18 21:40:54 =>   Extracted 169 rows from exchange_rates.csv in datalake
2025-04-18 21:40:54 =>   Extracted 10 rows from staffs.csv in datalake
2025-04-18 21:40:54 =>   Extracted 9 rows from brands.csv in datalake
2025-04-18 21:40:54 =>   Extracted 3 rows from stores.csv in datalake
2025-04-18 21:40:54 =>   Extracted 939 rows from stocks.csv in datalake
2025-04-18 21:40:54 =>   Extracted 4764 rows from PostgreSQL
2025-04-18 21:40:54 =>   Extracted 1615 rows from PostgreSQL


In [7]:
data_quality = ADDQuality()
csv_loader = FileLoader("/Users/mohamedmoaaz/Desktop/usecase")
for file_name in data_lake_data.keys():
    qualified_data = data_quality.add_source_and_date(data_lake_data[file_name], "data lake")
    csv_loader.load_to_csv(qualified_data, "landing", file_name)
qualified_data = data_quality.add_source_and_date(order_data, "postgres db")
csv_loader.load_to_csv(qualified_data, "landing", "orders.csv")
qualified_data = data_quality.add_source_and_date(order_items_data, "postgres db")
csv_loader.load_to_csv(qualified_data, "landing", "order_items.csv")

2025-04-18 21:40:55 =>    Added source and extraction date to data
2025-04-18 21:40:55 =>   Loaded 1445 rows into customers.csv in landing
2025-04-18 21:40:55 =>    Added source and extraction date to data
2025-04-18 21:40:55 =>   Loaded 7 rows into categories.csv in landing
2025-04-18 21:40:55 =>    Added source and extraction date to data
2025-04-18 21:40:55 =>   Loaded 334 rows into products.csv in landing
2025-04-18 21:40:55 =>    Added source and extraction date to data
2025-04-18 21:40:55 =>   Loaded 169 rows into exchange_rates.csv in landing
2025-04-18 21:40:55 =>    Added source and extraction date to data
2025-04-18 21:40:55 =>   Loaded 10 rows into staffs.csv in landing
2025-04-18 21:40:55 =>    Added source and extraction date to data
2025-04-18 21:40:55 =>   Loaded 9 rows into brands.csv in landing
2025-04-18 21:40:55 =>    Added source and extraction date to data
2025-04-18 21:40:55 =>   Loaded 3 rows into stores.csv in landing
2025-04-18 21:40:55 =>    Added source and e

In [8]:
csv_extractor = FileExtractor("/Users/mohamedmoaaz/Desktop/usecase")
landing_data = csv_extractor.extract_from_csv("landing")
for file_name, df in landing_data.items():
    data_validator = DataValidator(df)
    data_validator.handle_nulls()
    data_validator.remove_duplicates()
    data_validator.validate_data()
    csv_loader.load_to_csv(df, "staging", file_name)

2025-04-18 21:40:55 =>   Extracted 1445 rows from customers.csv in landing
2025-04-18 21:40:55 =>   Extracted 7 rows from categories.csv in landing
2025-04-18 21:40:55 =>   Extracted 334 rows from products.csv in landing
2025-04-18 21:40:55 =>   Extracted 4764 rows from orders.csv in landing
2025-04-18 21:40:55 =>   Extracted 169 rows from exchange_rates.csv in landing
2025-04-18 21:40:55 =>   Extracted 10 rows from staffs.csv in landing
2025-04-18 21:40:55 =>   Extracted 1615 rows from order_items.csv in landing
2025-04-18 21:40:55 =>   Extracted 9 rows from brands.csv in landing
2025-04-18 21:40:55 =>   Extracted 3 rows from stores.csv in landing
2025-04-18 21:40:55 =>   Extracted 939 rows from stocks.csv in landing
2025-04-18 21:40:55 =>   Handled null values in columns: None with fill value: 0
2025-04-18 21:40:55 =>   Removed 0 duplicate rows
2025-04-18 21:40:55 =>   Loaded 1445 rows into customers.csv in staging
2025-04-18 21:40:55 =>   Handled null values in columns: None with fi

In [10]:
csv_extractor = FileExtractor("/Users/mohamedmoaaz/Desktop/usecase")
staging_data = csv_extractor.extract_from_csv("staging")
for file_name in staging_data.keys():
    if file_name != "products.csv" or file_name != "order_items.csv" or file_name != "customers.csv":
        csv_loader.load_to_csv(staging_data[file_name], "business", file_name)

data_transformer = DataTransformer()
transformed_products_data = data_transformer.apply_currency_conversion(staging_data["products.csv"], staging_data["exchange_rates.csv"])
csv_loader.load_to_csv(transformed_products_data, "business", "products.csv")
transformed_stocks_data = data_transformer.calculate_delivery_metrics(staging_data["order_items.csv"])
csv_loader.load_to_csv(transformed_stocks_data, "business", "order_items.csv")
transformed_customers_data = data_transformer.determine_locality_flag(
    staging_data['customers.csv'], 
    staging_data['stores.csv'], 
    staging_data['order_items.csv']
)
csv_loader.load_to_csv(transformed_customers_data, "business", "customers.csv")


2025-04-18 21:44:41 =>   Extracted 1445 rows from customers.csv in staging
2025-04-18 21:44:41 =>   Extracted 7 rows from categories.csv in staging
2025-04-18 21:44:41 =>   Extracted 321 rows from products.csv in staging
2025-04-18 21:44:41 =>   Extracted 4730 rows from orders.csv in staging
2025-04-18 21:44:41 =>   Extracted 169 rows from exchange_rates.csv in staging
2025-04-18 21:44:41 =>   Extracted 10 rows from staffs.csv in staging
2025-04-18 21:44:41 =>   Extracted 1615 rows from order_items.csv in staging
2025-04-18 21:44:41 =>   Extracted 9 rows from brands.csv in staging
2025-04-18 21:44:41 =>   Extracted 3 rows from stores.csv in staging
2025-04-18 21:44:41 =>   Extracted 939 rows from stocks.csv in staging
2025-04-18 21:44:41 =>   Loaded 1445 rows into customers.csv in business
2025-04-18 21:44:41 =>   Loaded 7 rows into categories.csv in business
2025-04-18 21:44:41 =>   Loaded 321 rows into products.csv in business
2025-04-18 21:44:41 =>   Loaded 4730 rows into orders.csv

In [17]:
csv_extractor = FileExtractor("/Users/mohamedmoaaz/Desktop/usecase")
business_data = csv_extractor.extract_from_csv("business")

merged = pd.merge(business_data['orders.csv'], business_data['order_items.csv'], on='order_id', how='left')
merged = pd.merge(merged, business_data['products.csv'], on='product_id', how='left')
merged = pd.merge(merged, business_data['brands.csv'], on='brand_id', how='left', 
                suffixes=('', '_brand'))
merged = pd.merge(merged, business_data['customers.csv'], on='customer_id', how='left', 
                suffixes=('', '_customer'))
merged = pd.merge(merged, business_data['stores.csv'], on='store_id', how='left', 
                suffixes=('', '_store'))
merged = pd.merge(merged, business_data['categories.csv'], on='category_id', how='left',
                    suffixes=('', '_category'))
merged = pd.merge(merged, business_data['staffs.csv'], on='staff_id', how='left', 
                    suffixes= ('', '_staff'))
bigtable = pd.merge(merged, business_data['stocks.csv'], on=['store_id', 'product_id'], 
                    how='left', suffixes=('', '_stock'))
csv_loader = FileLoader("/Users/mohamedmoaaz/Desktop/usecase")
csv_loader.load_to_csv(bigtable, "formation_mart", "bigtable.csv")

2025-04-18 21:56:10 =>   Extracted 1445 rows from customers.csv in business
2025-04-18 21:56:10 =>   Extracted 7 rows from categories.csv in business
2025-04-18 21:56:10 =>   Extracted 321 rows from products.csv in business
2025-04-18 21:56:10 =>   Extracted 4730 rows from orders.csv in business
2025-04-18 21:56:10 =>   Extracted 169 rows from exchange_rates.csv in business
2025-04-18 21:56:10 =>   Extracted 10 rows from staffs.csv in business
2025-04-18 21:56:10 =>   Extracted 1615 rows from order_items.csv in business
2025-04-18 21:56:10 =>   Extracted 9 rows from brands.csv in business
2025-04-18 21:56:10 =>   Extracted 3 rows from stores.csv in business
2025-04-18 21:56:10 =>   Extracted 939 rows from stocks.csv in business
2025-04-18 21:56:10 =>   Loaded 4730 rows into bigtable.csv in formation_mart
