# KONTOUDAKIS NIKOS 283024

## libraries and setup

In [1]:
# Needed to run on the author's Linux PC: makes the system-wide site-packages
# directory importable. The path is only added when it actually exists and is not
# already on sys.path, so the cell is harmless on other machines and on re-runs.
import os
import sys

EXTRA_SITE_PACKAGES = '/usr/lib/python3.13/site-packages'
if os.path.isdir(EXTRA_SITE_PACKAGES) and EXTRA_SITE_PACKAGES not in sys.path:
    sys.path.append(EXTRA_SITE_PACKAGES)

In [2]:
import logging
import os
import time
from io import StringIO

import pandas as pd
import psycopg2
from psycopg2.extras import execute_values

In [3]:
def configure_file_logger(name, path, fmt, level):
    """Return the logger called ``name``, writing to ``path`` with format ``fmt``.

    ``logging.getLogger`` returns the same object for the same name, so re-running
    this cell in a live kernel would otherwise stack a second FileHandler on the
    logger and every message would be written twice. The handler is therefore only
    attached when the logger does not already have a FileHandler.

    Args:
        name: logger name passed to ``logging.getLogger``.
        path: log file the handler appends to.
        fmt: ``logging.Formatter`` format string.
        level: minimum level the logger lets through.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not any(isinstance(h, logging.FileHandler) for h in logger.handlers):
        handler = logging.FileHandler(path)
        handler.setFormatter(logging.Formatter(fmt))
        logger.addHandler(handler)
    return logger


# error logger setup: failed insert batches go to insert_errors.log
error_logger = configure_file_logger(
    'error_logger',
    'insert_errors.log',
    '%(asctime)s - %(levelname)s - %(message)s',
    logging.ERROR,
)

# timing logger setup: per-batch timings go to insert_timing.log
timing_logger = configure_file_logger(
    'timing_logger',
    'insert_timing.log',
    '%(asctime)s - %(message)s',
    logging.INFO,
)

# Data Ingestion and Cleaning

In [19]:
def check_dataframe_issues(df):
    """Report missing values and duplicate rows found in ``df``.

    The returned dict only contains keys for problems that were actually found:

    * ``'missing_data'``: maps each column holding nulls to the index labels of
      its null cells.
    * ``'duplicate_rows'``: index labels of rows that repeat an earlier row once
      the first column is ignored; ``'columns'`` (the full column list) is
      stored alongside it.

    An empty dict means neither problem was detected.
    """
    report = {}

    # null cells, grouped by the column they occur in
    null_mask = df.isnull()
    has_nulls = null_mask.any()
    if has_nulls.any():
        report['missing_data'] = {
            name: null_mask.index[null_mask[name]].tolist()
            for name in df.columns[has_nulls]
        }

    # exclude the first (id) column when searching for duplicates
    repeated = df.iloc[:, 1:].duplicated()
    if repeated.any():
        report['columns'] = df.columns.tolist()
        report['duplicate_rows'] = df.index[repeated].tolist()

    return report

### A city in the input data is considered inconsistent if it matches a "RawCity" entry whose "StandardCity" value is different

In [5]:
def find_inconsistent_cities(input_df, cities_lookup_df):
    """Find rows of ``input_df`` that use a non-standard city spelling.

    A spelling is non-standard when it appears in the ``RawCity`` column of
    ``cities_lookup_df`` on a row whose ``StandardCity`` value is different.
    The ``City`` and ``Region`` columns of ``input_df`` are checked, each only
    if present.

    Args:
        input_df: DataFrame to inspect.
        cities_lookup_df: DataFrame with ``RawCity`` and ``StandardCity`` columns.

    Returns:
        Dict mapping ``'City'`` and/or ``'Region'`` to the index labels of the
        offending rows. Columns without offending rows are omitted, so an empty
        dict means nothing was found.

    Side effects:
        Prints the list of non-standard spellings being searched for.
    """
    issues = {}

    # lookup rows where the raw spelling differs from the standard one
    mismatched = cities_lookup_df[cities_lookup_df['RawCity'] != cities_lookup_df['StandardCity']]
    # Single quotes inside the f-string: reusing the enclosing double quotes is a
    # SyntaxError on Python versions before 3.12.
    print(f"We are searching for the following errors: {mismatched['RawCity'].to_list()}")
    raw_city_names = mismatched['RawCity'].unique()

    # same membership check for both columns, skipping any that is absent
    for column in ('City', 'Region'):
        if column not in input_df.columns:
            continue
        offending = input_df.index[input_df[column].isin(raw_city_names)]
        if len(offending) > 0:
            issues[column] = offending.tolist()

    return issues

In [6]:
# Read the five dataset CSVs in one pass; the order of the paths matches the
# order of the names on the left-hand side.
calendar_df, cities_lookup_df, products_df, sales_df, stores_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        "./cityretail_dataset/calendar.csv",
        "./cityretail_dataset/cities_lookup.csv",
        "./cityretail_dataset/products.csv",
        "./cityretail_dataset/sales.csv",
        "./cityretail_dataset/stores.csv",
    )
)

# Tables that get checked and previewed below; the city lookup table is
# deliberately left out of this list.
all_data_dfs = [calendar_df, products_df, sales_df, stores_df]

In [20]:
# Print one issue report (missing values / duplicate rows) per loaded table,
# in the order of all_data_dfs.
for df in all_data_dfs:
    table_issues = check_dataframe_issues(df)
    print(table_issues)

{}
{}
{'columns': ['SalesID', 'DateID', 'ProductID', 'StoreID', 'QtySold', 'Revenue'], 'duplicate_rows': [1950, 2266, 2359, 2512, 2803, 2852, 2923, 3014, 3160, 3241, 3422, 3574, 3764, 3901, 4452, 4714]}
{}


In [8]:
# Check the stores table for city/region values that use a non-standard
# spelling listed in the lookup table.
inconsistent_cities = find_inconsistent_cities(
    input_df=stores_df,
    cities_lookup_df=cities_lookup_df,
)
print(f"inconsistent cities found: {inconsistent_cities}")

We are searching for the following errors: ['Athns', 'Thess', 'Patrass', 'Herakleion', 'Larisa']
inconsistent cities found: {}


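### No missing values or inconsistent city names were found. The sales rows flagged above repeat an earlier row in every column except SalesID and are kept as-is, so all tables are inserted as shown below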

In [9]:
# Show the first five rows of every table that is about to be inserted.
for df in all_data_dfs:
    preview = df.head(5)
    display(preview)

Unnamed: 0,DateID,Date,Year,Quarter,Month,Day,Weekday
0,1,2024-01-01,2024,1,1,1,Monday
1,2,2024-01-02,2024,1,1,2,Tuesday
2,3,2024-01-03,2024,1,1,3,Wednesday
3,4,2024-01-04,2024,1,1,4,Thursday
4,5,2024-01-05,2024,1,1,5,Friday


Unnamed: 0,ProductID,ProductName,Category,Subcategory,CostPrice,SalePrice
0,1001,Product_1,Dairy,Medium,16.03,25.61
1,1002,Product_2,Household,Low,5.19,7.2
2,1003,Product_3,Beverages,Medium,11.01,16.65
3,1004,Product_4,Dairy,Medium,12.46,19.04
4,1005,Product_5,Dairy,Medium,2.36,3.09


Unnamed: 0,SalesID,DateID,ProductID,StoreID,QtySold,Revenue
0,1,125,1015,208,3,40.35
1,2,71,1014,208,4,69.52
2,3,209,1011,203,2,57.72
3,4,269,1007,207,9,69.66
4,5,85,1008,202,1,4.76


Unnamed: 0,StoreID,StoreName,City,Region
0,201,Store_1,Heraklion,Crete
1,202,Store_2,Larissa,Thessaly
2,203,Store_3,Athens,Attica
3,204,Store_4,Larissa,Thessaly
4,205,Store_5,Larissa,Thessaly


# OLAP Schema

In [10]:
# A database named "project" (or whatever PGDATABASE points at) must already
# exist before the cells below are executed.
# Every setting can be overridden through the standard libpq environment
# variables so that credentials do not have to be written into the notebook;
# the fallbacks are the original local-development values.
db_params = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': int(os.environ.get('PGPORT', 5432)),
    'dbname': os.environ.get('PGDATABASE', 'project'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', '')
}

### Read the ddl and create tables/constraints

In [11]:
# Load the star-schema DDL script that ships with the dataset.
with open("./cityretail_dataset/star_schema.sql", 'r') as f:
    sql_commands = f.read()

# split statements by semicolon and strip whitespace
# NOTE: this is a plain text split, so a semicolon inside a string literal or a
# function body in the script would cut a statement in half.
statements = [s.strip() for s in sql_commands.split(';') if s.strip()]

# Each statement runs in autocommit mode. Inside a single transaction the first
# failing statement would put the transaction into the aborted state, every
# later statement would fail as well, and the final commit would roll everything
# back. With autocommit a failure is reported and the remaining statements still
# get their chance.
# psycopg2's `with connection` block only ends the transaction, it does not
# close the connection, hence the explicit close in `finally`.
conn = psycopg2.connect(**db_params)
try:
    conn.autocommit = True
    with conn.cursor() as cur:
        for stmt in statements:
            try:
                cur.execute(stmt)
            except psycopg2.Error as e:
                print(f"error executing statement:\n{stmt}\n{e}")
finally:
    conn.close()

In [12]:
def insert_df_batch(df, table_name, db_params, batch_size=200, slow_threshold=2.0):
    """Insert the rows of ``df`` into ``table_name`` in batches.

    Every batch is its own transaction: a failing batch is rolled back and
    written to ``error_logger``, and loading continues with the next batch.
    Successful batches are committed and their duration is written to
    ``timing_logger`` (as a ``SLOW`` warning when it reaches ``slow_threshold``).

    Args:
        df: DataFrame whose column names match the target table's columns.
        table_name: target table. It is interpolated into the SQL text as-is,
            so it must come from trusted code, never from user input.
        db_params: keyword arguments for ``psycopg2.connect``.
        batch_size: number of rows per INSERT statement; must be at least 1.
        slow_threshold: seconds from which a batch is logged as slow.

    Returns:
        None.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if df.empty:
        timing_logger.info(f'{table_name} - Skipped (empty dataframe)')
        return
    if batch_size < 1:
        raise ValueError(f'batch_size must be >= 1, got {batch_size}')

    # Column names are left unquoted on purpose: quoting them would make them
    # case-sensitive and stop matching a schema created with unquoted names.
    cols = ','.join(df.columns)
    values_template = '(' + ','.join(['%s'] * len(df.columns)) + ')'

    # Convert column by column to Python objects. A plain `df.values` on an
    # all-numeric frame that mixes int and float columns upcasts everything to
    # float (ids would be sent as 1.0). Missing values become None so they are
    # stored as NULL rather than NaN.
    rows = df.astype(object).where(df.notna(), None).values.tolist()

    # `with psycopg2.connect(...)` would only end the transaction on exit and
    # leave the connection open, so the connection is closed explicitly.
    conn = psycopg2.connect(**db_params)
    try:
        with conn.cursor() as cur:
            for i in range(0, len(rows), batch_size):
                batch = rows[i:i + batch_size]
                start = time.perf_counter()
                try:
                    execute_values(
                        cur,
                        f'INSERT INTO {table_name} ({cols}) VALUES %s',
                        batch,
                        template=values_template
                    )
                except Exception as e:
                    # best effort: drop this batch, record it, keep loading
                    conn.rollback()
                    error_logger.error(f'{table_name} batch {i}-{i+len(batch)} failed: {e}')
                else:
                    conn.commit()
                    # the measured time covers the insert and its commit
                    elapsed = time.perf_counter() - start
                    msg = f'{table_name} batch {i}-{i+len(batch)} completed in {elapsed:.2f}s'
                    if elapsed >= slow_threshold:
                        timing_logger.warning(f'SLOW - {msg}')
                    else:
                        timing_logger.info(msg)
    finally:
        conn.close()

In [13]:
# Load the three dimension tables first and the fact table last.
for frame, target_table in (
    (calendar_df, "dimdate"),
    (products_df, "dimproduct"),
    (stores_df, "dimstore"),
    (sales_df, "factsales"),
):
    insert_df_batch(frame, target_table, db_params=db_params)

# Measures and Aggregations

In [14]:
# SQL statements that create the reporting views, the materialized views and
# the unique indexes on the materialized views.
# Order matters: each index statement comes right after the materialized view
# it indexes, and MoMGrowth comes after MonthlyRevenue, which it selects from.
queries = [

    # core sales view: total quantity, total revenue and revenue per unit sold
    # over all of FactSales (plain view, no index needed)
    """
    CREATE OR REPLACE VIEW CoreSalesKPIs AS
    SELECT
        SUM(QtySold) AS TotalQuantitySold,
        SUM(Revenue) AS TotalRevenue,
        CASE 
            WHEN SUM(QtySold) > 0 THEN ROUND(SUM(Revenue) / SUM(QtySold), 2)
            ELSE 0
        END AS AverageSellingPrice
    FROM FactSales;
    """,

    # product-level materialized view: units, revenue, cost (QtySold * CostPrice),
    # profit and profit margin in percent, one row per product
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS ProductPerformance AS
    SELECT
        p.ProductID,
        p.ProductName,
        p.Category,
        p.Subcategory,
        SUM(f.QtySold) AS TotalUnitsSold,
        SUM(f.Revenue) AS TotalRevenue,
        SUM(f.QtySold * p.CostPrice) AS TotalCost,
        SUM(f.Revenue - f.QtySold * p.CostPrice) AS TotalProfit,
        ROUND(
            CASE 
                WHEN SUM(f.Revenue) > 0 THEN 
                    (SUM(f.Revenue - f.QtySold * p.CostPrice) / SUM(f.Revenue)) * 100
                ELSE 0
            END, 2
        ) AS ProfitMarginPercentage
    FROM FactSales f
    JOIN DimProduct p ON f.ProductID = p.ProductID
    GROUP BY p.ProductID, p.ProductName, p.Category, p.Subcategory;
    """,

    # unique index on ProductPerformance, keyed by ProductID
    """
    CREATE UNIQUE INDEX IF NOT EXISTS idx_product_performance_product_id 
    ON ProductPerformance(ProductID);
    """,

    # store/region materialized view: revenue and units sold, one row per store
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS StoreRegionPerformance AS
    SELECT
        s.StoreID,
        s.StoreName,
        s.City,
        s.Region,
        SUM(f.Revenue) AS StoreRevenue,
        SUM(f.QtySold) AS StoreUnitsSold
    FROM FactSales f
    JOIN DimStore s ON f.StoreID = s.StoreID
    GROUP BY s.StoreID, s.StoreName, s.City, s.Region;
    """,

    # unique index on StoreRegionPerformance, keyed by StoreID
    """
    CREATE UNIQUE INDEX IF NOT EXISTS idx_store_region_performance_store_id 
    ON StoreRegionPerformance(StoreID);
    """,

    # monthly revenue materialized view: one row per (Year, Month);
    # PeriodStart is the first day of that month
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS MonthlyRevenue AS
    SELECT
        d.Year,
        d.Month,
        TO_DATE(d.Year || '-' || d.Month || '-01', 'YYYY-MM-DD') AS PeriodStart,
        SUM(f.Revenue) AS MonthlyRevenue
    FROM FactSales f
    JOIN DimDate d ON f.DateID = d.DateID
    GROUP BY d.Year, d.Month;
    """,

    # unique index on MonthlyRevenue, keyed by PeriodStart
    """
    CREATE UNIQUE INDEX IF NOT EXISTS idx_monthly_revenue_period 
    ON MonthlyRevenue(PeriodStart);
    """,

    # MoM growth view (no index needed): self-joins MonthlyRevenue so that each
    # month is paired with the month before it (December of the previous year
    # for January); the growth percentage is NULL when there is no previous
    # month or its revenue is 0
    """
    CREATE OR REPLACE VIEW MoMGrowth AS
    SELECT
        m1.Year,
        m1.Month,
        m1.MonthlyRevenue,
        m2.MonthlyRevenue AS PreviousMonthRevenue,
        ROUND(
            CASE
                WHEN m2.MonthlyRevenue IS NULL OR m2.MonthlyRevenue = 0 THEN NULL
                ELSE ((m1.MonthlyRevenue - m2.MonthlyRevenue) / m2.MonthlyRevenue) * 100
            END, 2
        ) AS MoMGrowthPercentage
    FROM MonthlyRevenue m1
    LEFT JOIN MonthlyRevenue m2
        ON (m1.Year = m2.Year AND m1.Month = m2.Month + 1)
        OR (m1.Year = m2.Year + 1 AND m1.Month = 1 AND m2.Month = 12);
    """,

    # seasonality view (no index needed): revenue and units sold per
    # (Weekday, Month, Quarter) combination
    """
    CREATE OR REPLACE VIEW SeasonalityAnalysis AS
    SELECT
        d.Weekday,
        d.Month,
        d.Quarter,
        SUM(f.Revenue) AS TotalRevenue,
        SUM(f.QtySold) AS TotalUnitsSold
    FROM FactSales f
    JOIN DimDate d ON f.DateID = d.DateID
    GROUP BY d.Weekday, d.Month, d.Quarter;
    """,

    # monthly basket size materialized view: items sold divided by the number of
    # distinct SalesID values per (Year, Month); NULLIF guards the division
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS MonthlyBasketSize AS
    SELECT
        d.Year,
        d.Month,
        TO_DATE(d.Year || '-' || d.Month || '-01', 'YYYY-MM-DD') AS PeriodStart,
        COUNT(DISTINCT f.SalesID) AS TotalTransactions,
        SUM(f.QtySold) AS TotalItemsSold,
        ROUND(SUM(f.QtySold)::numeric / NULLIF(COUNT(DISTINCT f.SalesID), 0), 2) AS AverageBasketSize
    FROM FactSales f
    JOIN DimDate d ON f.DateID = d.DateID
    GROUP BY d.Year, d.Month;
    """,

    # unique index on MonthlyBasketSize, keyed by PeriodStart
    """
    CREATE UNIQUE INDEX IF NOT EXISTS idx_monthly_basket_size_period 
    ON MonthlyBasketSize(PeriodStart);
    """
]



if __name__ == "__main__":
    # Run every statement in `queries`, in order, and report honestly whether
    # all of them succeeded.
    #
    # Autocommit is used because, inside one transaction, the first failing
    # statement aborts the transaction: every later statement then fails too
    # and the final commit turns into a rollback, while the success message
    # was still printed. With autocommit each statement stands on its own.
    #
    # psycopg2's `with connection` block does not close the connection, so it
    # is closed explicitly in `finally`.
    failed_count = 0
    conn = psycopg2.connect(**db_params)
    try:
        conn.autocommit = True
        with conn.cursor() as cur:
            for q in queries:
                try:
                    cur.execute(q)
                except psycopg2.Error as e:
                    failed_count += 1
                    print("error running query:\n", q[:100], "...\n", e)
    finally:
        conn.close()

    if failed_count:
        print(f"{failed_count} of {len(queries)} statements failed; see the errors above")
    else:
        print("views and materialized views created successfully")

views and materialized views created successfully
