In [5]:
pip install faker

Collecting faker
  Using cached faker-37.1.0-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.1.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hInstalling collected packages: faker
Successfully installed faker-37.1.0
Note: you may need to restart the kernel to use updated packages.


In [6]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [36]:
from faker import Faker
import pandas as pd
import random
from datetime import datetime, timedelta
import os
import numpy as np

fake = Faker()

# Configuration
NUM_CUSTOMERS = 2000
NUM_PRODUCTS = 300
NUM_TRANSACTIONS = 30000
NUM_REVIEWS = 7500
START_DATE = datetime(2021, 1, 1)
END_DATE = datetime(2023, 1, 1)


def generate_dimensions():
    # Locations
    locations = [(i, fake.city(), fake.state(), fake.country())
                 for i in range(1, 501)]
    location_df = pd.DataFrame(locations, columns=['location_id', 'city', 'state', 'country'])
    location_df.to_csv('data/raw/dim_location.csv', index=False)

    # Categories
    categories = [
        (1, 'Electronics'), (2, 'Clothing'),
        (3, 'Home Appliances'), (4, 'Books'),
        (5, 'Sports'), (6, 'Beauty'),
        (7, 'Toys'), (8, 'Groceries')
    ]
    category_df = pd.DataFrame(categories, columns=['category_id', 'category_name'])
    category_df.to_csv('work/data/raw/dim_category.csv', index=False)

    # Products
    products = []
    for i in range(1, NUM_PRODUCTS + 1):
        products.append((
            i,
            fake.word().title() + " " + fake.word().title(),
            random.choice(categories)[0],
            round(random.uniform(5, 500), 2)
        ))

    # Introduce NaN values randomly into the `product_name` column
    product_df = pd.DataFrame(products, columns=['product_id', 'product_name', 'category_id', 'price'])
    product_df['product_name'] = product_df['product_name'].apply(
        lambda x: x if random.random() > 0.05 else None
    )

    # Introduce some duplicate products to simulate data issues
    product_df = pd.concat([product_df, product_df.sample(10)], ignore_index=True)

    # Introduce an outlier in the price column (e.g., price > 1000)
    product_df.loc[product_df.sample(5).index, 'price'] = random.randint(1000, 5000)

    product_df.to_csv('/work/data/raw/dim_product.csv', index=False)

    # Customers
    customers = []
    for i in range(1, NUM_CUSTOMERS + 1):
        customers.append((
            i,
            fake.name(),
            fake.email(),
            fake.date_between(START_DATE, END_DATE),
            random.choice(locations)[0]
        ))

    customer_df = pd.DataFrame(customers, columns=['customer_id', 'name', 'email', 'signup_date', 'location_id'])

    # Introduce NaN values randomly into the `email` and `signup_date` columns
    customer_df['email'] = customer_df['email'].apply(
        lambda x: x if random.random() > 0.05 else None
    )
    customer_df['signup_date'] = customer_df['signup_date'].apply(
        lambda x: x if random.random() > 0.05 else None
    )

    # Introduce duplicate customers to simulate data issues
    customer_df = pd.concat([customer_df, customer_df.sample(5)], ignore_index=True)

    customer_df.to_csv('/work/data/raw/dim_customer.csv', index=False)
def generate_facts():
    # Date dimension
    dates = []
    current_date = START_DATE
    while current_date <= END_DATE:
        dates.append((
            current_date.date(),
            current_date.day,
            current_date.month,
            (current_date.month - 1) // 3 + 1,
            current_date.year,
            current_date.weekday() + 1,
            current_date.weekday() >= 5
        ))
        current_date += timedelta(days=1)

    date_df = pd.DataFrame(dates, columns=['date_id', 'day', 'month', 'quarter', 'year', 'day_of_week', 'is_weekend'])
    date_df.to_csv('/work/data/raw/dim_date.csv', index=False)

    # Transactions
    transactions = []
    products = pd.read_csv('/work/data/raw/dim_product.csv')
    for i in range(1, NUM_TRANSACTIONS + 1):
        product = products.sample(1).iloc[0]
        transactions.append((
            i,
            random.randint(1, NUM_CUSTOMERS),
            product['product_id'],
            fake.date_between(START_DATE, END_DATE).isoformat(),
            round(product['price'] * random.uniform(0.8, 1.2) * random.randint(1, 5), 2),
            random.randint(1, 5)
        ))

    transaction_df = pd.DataFrame(transactions,
                                  columns=['transaction_id', 'customer_id', 'product_id', 'date_id', 'amount',
                                           'quantity'])

    # Introduce NaN values randomly into the `amount` column
    transaction_df['amount'] = transaction_df['amount'].apply(
        lambda x: x if random.random() > 0.05 else None
    )

    # Introduce duplicate transactions to simulate data issues
    transaction_df = pd.concat([transaction_df, transaction_df.sample(10)], ignore_index=True)

    # Introduce outliers in the `amount` column (e.g., extremely high values)
    transaction_df.loc[transaction_df.sample(5).index, 'amount'] = random.randint(1000, 10000)

    transaction_df.to_csv('/work/data/raw/fact_transactions.csv', index=False)

    # Reviews
    reviews = []
    for i in range(1, NUM_REVIEWS + 1):
        transaction = random.choice(transactions)
        reviews.append((
            i,
            transaction[1],  # customer_id
            transaction[2],  # product_id
            (datetime.strptime(transaction[3], '%Y-%m-%d') + timedelta(days=random.randint(1, 30))).date(),
            random.randint(1, 5),
            fake.paragraph()
        ))

    review_df = pd.DataFrame(reviews,
                             columns=['review_id', 'customer_id', 'product_id', 'date_id', 'rating', 'review_text'])

    # Introduce NaN values randomly into the `review_text` column
    review_df['review_text'] = review_df['review_text'].apply(
        lambda x: x if random.random() > 0.05 else None
    )

    # Introduce duplicate reviews to simulate data issues
    review_df = pd.concat([review_df, review_df.sample(5)], ignore_index=True)

    review_df.to_csv('/work/data/raw/fact_reviews.csv', index=False)


if __name__ == "__main__":
    os.makedirs('/work/data/raw', exist_ok=True)
    generate_dimensions()
    generate_facts()
    print("Data generation complete! Files saved to data/raw/")


PermissionError: [Errno 13] Permission denied: '/work'

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev
from pyspark.sql.types import FloatType, DoubleType, IntegerType

# 1️⃣ Initialize Spark Session
spark = SparkSession.builder.appName("Data_Cleaning").getOrCreate()

# 2️⃣ List of CSV files
files = {
    "dim_category": "data/raw/dim_category.csv",
    "dim_customer": "data/raw/dim_customer.csv",
    "dim_date": "data/raw/dim_date.csv",
    "dim_location": "data/raw/dim_location.csv",
    "dim_product": "data/raw/dim_product.csv",
    "fact_reviews": "data/raw/fact_reviews.csv",
    "fact_transaction": "data/raw/fact_transactions.csv"
}

# 3️⃣ Load CSV Files
dataframes = {name: spark.read.csv(path, header=True, inferSchema=True) for name, path in files.items()}

# 4️⃣ Data Cleaning Functions
def clean_dataframe(df):
    # Remove rows with nulls in any column
    df = df.dropna(how='any')
    
    # Remove duplicates
    df = df.dropDuplicates()
    
    # Remove outliers (Z-score method for numeric columns)
    numeric_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, (FloatType, DoubleType, IntegerType))]
    for col_name in numeric_cols:
        mean_val = df.select(mean(col(col_name))).collect()[0][0]
        stddev_val = df.select(stddev(col(col_name))).collect()[0][0]
        
        if mean_val is not None and stddev_val is not None and stddev_val != 0:
            df = df.filter((col(col_name) - mean_val) / stddev_val < 3)
    
    return df

# 5️⃣ Apply Cleaning Function
cleaned_dataframes = {name: clean_dataframe(df) for name, df in dataframes.items()}

# 6️⃣ Save Cleaned Data (optional)
for name, df in cleaned_dataframes.items():
    df.write.mode('overwrite').csv(f"data/refined/cleaned_{name}.csv", header=True)

# 7️⃣ Show Cleaned Data (optional)
for name, df in cleaned_dataframes.items():
    print(f"Cleaned Data for {name}:")
    df.show(5)

Cleaned Data for dim_category:
+-----------+-------------+
|category_id|category_name|
+-----------+-------------+
|          8|    Groceries|
|          2|     Clothing|
|          7|         Toys|
|          6|       Beauty|
|          4|        Books|
+-----------+-------------+
only showing top 5 rows

Cleaned Data for dim_customer:
+-----------+-----------------+--------------------+-----------+-----------+
|customer_id|             name|               email|signup_date|location_id|
+-----------+-----------------+--------------------+-----------+-----------+
|        160|   Jennifer Riggs|sellersjonathan@e...| 2022-05-08|        448|
|        413|Victoria Gonzalez|kellerjoy@example...| 2022-07-02|        139|
|        452|    James Freeman|mayertonya@exampl...| 2021-07-27|        261|
|        940|        Erik Dunn|banksjohn@example...| 2022-04-01|        202|
|        980|  Christopher Lam|josephlarson@exam...| 2021-03-15|        107|
+-----------+-----------------+--------------

In [12]:
# Adding refined data into postgress
!pip install sqlalchemy 



In [15]:
!pip install psycopg2-binary


Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (2.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.10


In [17]:
from sqlalchemy import create_engine
import pandas as pd

# Define PostgreSQL credentials
POSTGRES_USER = "postgres"
POSTGRES_PASSWORD = "password"  # Replace with actual password
POSTGRES_DB = "myapp"  # Replace with actual database name
POSTGRES_HOST = "db"  # Container name
POSTGRES_PORT = "5432"

# Create SQLAlchemy connection string
DATABASE_URL = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

# Establish connection
engine = create_engine(DATABASE_URL)

# Test connection
try:
    with engine.connect() as connection:
        print("✅ Successfully connected to PostgreSQL!")
except Exception as e:
    print("❌ Connection failed:", e)


✅ Successfully connected to PostgreSQL!


In [33]:
# Loading data into postgress

df_category = spark.read.csv("/home/jovyan/data/refined/cleaned_dim_category.csv", header=True, inferSchema=True)
df_customer = spark.read.csv("/home/jovyan/data/refined/cleaned_dim_customer.csv", header=True, inferSchema=True)
df_date = spark.read.csv("/home/jovyan/data/refined/cleaned_dim_date.csv", header=True, inferSchema=True)
df_location = spark.read.csv("/home/jovyan/data/refined/cleaned_dim_location.csv", header=True, inferSchema=True)
df_product = spark.read.csv("/home/jovyan/data/refined/cleaned_dim_product.csv", header=True, inferSchema=True)
df_fact_reviews = spark.read.csv("/home/jovyan/data/refined/cleaned_fact_reviews.csv", header=True, inferSchema=True)
df_fact_transaction = spark.read.csv("/home/jovyan/data/refined/cleaned_fact_transaction.csv", header=True, inferSchema=True)


def save_to_postgres(spark_df, table_name, engine):
    # Convert PySpark DataFrame to Pandas DataFrame
    pandas_df = spark_df.toPandas()
    
    # Save to PostgreSQL
    pandas_df.to_sql(table_name, engine, if_exists='replace', index=False)
    print(f"Saved {table_name} to PostgreSQL")


save_to_postgres(df_category, "dim_category", engine)
save_to_postgres(df_customer, "dim_customer", engine)
save_to_postgres(df_date, "dim_date", engine)
save_to_postgres(df_location, "dim_location", engine)
save_to_postgres(df_product, "dim_product", engine)
save_to_postgres(df_fact_reviews, "fact_reviews", engine)
save_to_postgres(df_fact_transaction, "fact_transaction", engine)
# # Store in PostgreSQL table
# df.to_sql("category", con=engine, if_exists="replace", index=False)

# print("✅ Data loaded successfully into PostgreSQL!")

Saved dim_category to PostgreSQL
Saved dim_customer to PostgreSQL
Saved dim_date to PostgreSQL
Saved dim_location to PostgreSQL
Saved dim_product to PostgreSQL
Saved fact_reviews to PostgreSQL
Saved fact_transaction to PostgreSQL


In [35]:
df = pd.read_sql("SELECT * FROM dim_customer;", engine)
df.head()

Unnamed: 0,customer_id,name,email,signup_date,location_id
0,160,Jennifer Riggs,sellersjonathan@example.org,2022-05-08,448
1,413,Victoria Gonzalez,kellerjoy@example.com,2022-07-02,139
2,452,James Freeman,mayertonya@example.org,2021-07-27,261
3,940,Erik Dunn,banksjohn@example.org,2022-04-01,202
4,980,Christopher Lam,josephlarson@example.com,2021-03-15,107
