# Creating a Medallion Architecture with Python and Cassandra

## Pre-requisite

In [4]:
# Install the Cassandra python driver
!pip install cassandra-driver



In [5]:
# Import the necessary libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json


#### Reference: adapted from the YouTube video linked in the assignment

## 1. Setting up the Connection

In [16]:
# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config = {
    'secure_connect_bundle': 'secure-connect-learner.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below.
# Credentials are read from the file so they never appear in the notebook.
with open("learner-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)

# connect() signals failure by raising, not by returning a falsy value, so an
# `if session: ... else: ...` check can never reach its error branch.  Report
# the failure from an except clause and re-raise so the traceback is kept.
try:
    session = cluster.connect()
except Exception:
    print("An error occurred.")
    raise
else:
    print('Connected!')



Connected!


## Loading and Reading Data from Cassandra

##  Load CSV data into Cassandra

### Creating Bronze Table

In [17]:
def create_tables(session):
    """Create the bronze-layer table ``example.bronze_sales`` if it is missing.

    The bronze table keeps the CSV fields close to their raw form: both dates
    and the order id are TEXT, the quantity is INT and the money columns are
    FLOAT.  Rows are keyed by a UUID ``id``.

    NOTE(review): the read-back output in this notebook shows values such as
    109.27999877929688 for the FLOAT columns -- confirm that this precision
    is acceptable for monetary data.

    Args:
        session: object exposing ``execute(query)``; here a Cassandra session.

    Returns:
        None.  Prints a confirmation message after the statement is executed.
    """
    # Bronze Table
    bronze_ddl = """
    CREATE TABLE IF NOT EXISTS example.bronze_sales (
        id UUID PRIMARY KEY,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,
        order_id TEXT,
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    );
    """
    session.execute(bronze_ddl)

    print("Tables created successfully.")

# Issue the CREATE TABLE statement through the current session.
create_tables(session)


Tables created successfully.


### Inserting data into the bronze table

In [22]:
# Load the CSV file
import uuid
import pandas as pd

# Relative path: the CSV is expected in the notebook's working directory.
file_path = 'sales_100.csv'
# NOTE: `data` is the raw DataFrame here; the data-profiling cell later
# rebinds the name `data` to a list of dicts.
data = pd.read_csv(file_path)

def load_bronze_table(session, data):
    """Insert every row of ``data`` into ``example.bronze_sales``.

    The INSERT is prepared once and then executed per row with bound values,
    instead of sending the full query text as a simple statement on every
    iteration.

    Each row is keyed by a fresh random UUID, so calling this function again
    with the same frame appends a second copy of the rows rather than
    overwriting the first.

    Args:
        session: Cassandra session (must provide ``prepare`` and ``execute``).
        data: pandas DataFrame with the columns 'Region', 'Country',
            'Item Type', 'Sales Channel', 'Order Priority', 'Order Date',
            'Order ID', 'Ship Date', 'UnitsSold', 'UnitPrice', 'UnitCost',
            'TotalRevenue', 'TotalCost' and 'TotalProfit'.

    Returns:
        None.  Prints a confirmation message once all rows are written.
    """
    # Prepared statements use `?` placeholders (not `%s`).
    insert_stmt = session.prepare("""
        INSERT INTO example.bronze_sales (id, region, country, item_type, sales_channel, order_priority, order_date, order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
        """)

    for _, row in data.iterrows():
        session.execute(insert_stmt, (
            uuid.uuid4(),
            row['Region'],
            row['Country'],
            row['Item Type'],
            row['Sales Channel'],
            row['Order Priority'],
            row['Order Date'],
            str(row['Order ID']),  # bronze keeps the order id as TEXT
            row['Ship Date'],
            row['UnitsSold'],
            row['UnitPrice'],
            row['UnitCost'],
            row['TotalRevenue'],
            row['TotalCost'],
            row['TotalProfit'],
        ))
    print("Bronze table loaded successfully.")

# Write the rows read from the CSV into example.bronze_sales.
load_bronze_table(session, data)


Bronze table loaded successfully.


### Reading from the bronze table

In [31]:
# Select the data from the bronze_sales table
session = cluster.connect()
try:
    # Materialize the result once so that counting and printing share a
    # single table scan instead of running the same SELECT twice.
    rows = list(session.execute("SELECT * FROM example.bronze_sales;"))

    # Count the number of rows
    row_count = len(rows)
    print(f"Total Rows in example.bronze_sales: {row_count}")

    for row in rows:
        print(row)
finally:
    # Close the session even if the query or the printing fails
    session.shutdown()

Total Rows in example.bronze_sales: 99
Row(id=UUID('f15c28a0-2bf4-433a-9315-1a0f06d5848f'), country='Montenegro', item_type='Clothes', order_date='9/4/2016', order_id='902511680', order_priority='M', region='Europe', sales_channel='Offline', ship_date='9/8/2016', total_cost=75873.28125, total_profit=155472.484375, total_revenue=231345.765625, unit_cost=35.84000015258789, unit_price=109.27999877929688, units_sold=2117)
Row(id=UUID('e76a83cb-faa0-4876-938e-a480089062be'), country='Turkmenistan', item_type='Vegetables', order_date='4/13/2015', order_id='116205585', order_priority='M', region='Asia', sales_channel='Online', ship_date='6/2/2015', total_cost=606503.125, total_profit=421077.09375, total_revenue=1027580.1875, unit_cost=90.93000030517578, unit_price=154.05999755859375, units_sold=6670)
Row(id=UUID('207e881f-1d87-44d1-a6bf-f5a26e1b0da3'), country='Egypt', item_type='Clothes', order_date='8/9/2016', order_id='882908809', order_priority='C', region='Middle East and North Africa', 

### Data Profiling

In [32]:
# Open a session and make 'example' the default keyspace, so the query below
# can use the unqualified table name.
session = cluster.connect()
session.set_keyspace('example')  # Replace 'example' with your keyspace

# Fetch every bronze row
rows = session.execute("SELECT * FROM bronze_sales;")

# Turn each Cassandra row into a plain dict, then build the DataFrame in one go
data = [row._asdict() for row in rows]
df = pd.DataFrame(data)

# Show the first few rows as a sanity check
print("Bronze Data:\n", df.head())

Bronze Data:
                                      id       country   item_type order_date  \
0  f15c28a0-2bf4-433a-9315-1a0f06d5848f    Montenegro     Clothes   9/4/2016   
1  e76a83cb-faa0-4876-938e-a480089062be  Turkmenistan  Vegetables  4/13/2015   
2  207e881f-1d87-44d1-a6bf-f5a26e1b0da3         Egypt     Clothes   8/9/2016   
3  84d2a280-22a0-4db8-a7ac-d596251b3dcc        Samoa    Household  12/5/2016   
4  73ea680e-9ce1-4eb9-896b-25d79a469953     Singapore      Snacks  1/28/2013   

    order_id order_priority                        region sales_channel  \
0  902511680              M                        Europe       Offline   
1  116205585              M                          Asia        Online   
2  882908809              C  Middle East and North Africa        Online   
3  937431466              L         Australia and Oceania        Online   
4  176461303              C                          Asia        Online   

   ship_date    total_cost   total_profit  total_reven

In [33]:
# --- Missing values: number of nulls per column ---
print("\nNull Value Counts:")
print(df.isnull().sum())

# --- Zeros: count of exact-zero entries in each numeric column ---
numeric_cols = [
    'units_sold',
    'unit_price',
    'unit_cost',
    'total_revenue',
    'total_cost',
    'total_profit',
]
zero_counts = df[numeric_cols].eq(0).sum()
print("\nZero Value Counts in Numeric Columns:")
print(zero_counts)

# --- Summary statistics for the numeric columns ---
print("\nDescriptive Statistics:")
print(df.describe())


Null Value Counts:
id                0
country           0
item_type         0
order_date        0
order_id          0
order_priority    0
region            0
sales_channel     0
ship_date         0
total_cost        0
total_profit      0
total_revenue     0
unit_cost         0
unit_price        0
units_sold        0
dtype: int64

Zero Value Counts in Numeric Columns:
units_sold       0
unit_price       0
unit_cost        0
total_revenue    0
total_cost       0
total_profit     0
dtype: int64

Descriptive Statistics:
         total_cost  total_profit  total_revenue   unit_cost  unit_price  \
count  9.900000e+01  9.900000e+01   9.900000e+01   99.000000   99.000000   
mean   1.044069e+06  4.174464e+05   1.461515e+06  191.126769  271.181016   
std    1.337149e+06  4.503887e+05   1.724090e+06  185.365590  233.458916   
min    9.155160e+03  3.188430e+03   1.234359e+04    6.920000    9.330000   
25%    9.318011e+04  5.877198e+04   1.597606e+05   35.840000   81.730003   
50%    3.924920e+05 

I do not see any nulls, zeros, or other edge cases here, so I'm moving on to changing the data types and creating the silver tables.

### Creating Silver table

In [42]:
def create_tables(session):
    """Create the silver-layer table ``example.silver_sales`` if it is missing.

    Same columns as the bronze table, but with typed fields: ``order_date``
    and ``ship_date`` are DATE and ``order_id`` is BIGINT.

    NOTE: this definition reuses the name ``create_tables`` and therefore
    replaces the bronze version defined earlier in the notebook; after this
    cell runs, ``create_tables`` only creates the silver table.

    Args:
        session: object exposing ``execute(query)``; here a Cassandra session.

    Returns:
        None.  Prints a confirmation message after the statement is executed.
    """
    silver_ddl = """
    CREATE TABLE IF NOT EXISTS example.silver_sales (
        id UUID PRIMARY KEY,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date DATE,
        order_id BIGINT,
        ship_date DATE,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    );
    """
    session.execute(silver_ddl)

    print("Tables created successfully.")

# Calls the silver definition from this cell, which shadows the earlier
# bronze function of the same name.
create_tables(session)


Tables created successfully.


### Loading data into the silver table

In [61]:
import uuid
from datetime import datetime

# Convert an MM/DD/YYYY string (e.g. '9/4/2016') to a datetime.date
def convert_to_date(date_string):
    """Parse a date written as MM/DD/YYYY.

    Args:
        date_string: text such as '9/4/2016'.

    Returns:
        A ``datetime.date``, or ``None`` when the value cannot be parsed:
        either the text does not match MM/DD/YYYY, or the input is not a
        string at all (for example ``None`` or a NaN from a missing cell).
    """
    try:
        return datetime.strptime(date_string, "%m/%d/%Y").date()
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: non-string input
        return None

# Convert order_id to an integer suitable for the BIGINT column
def convert_to_bigint(order_id_string):
    """Convert an order id to ``int``.

    Args:
        order_id_string: the order id, normally a string of digits.

    Returns:
        The integer value, or ``None`` when the input cannot be converted:
        non-numeric text, ``None``, NaN or infinity.
    """
    try:
        return int(order_id_string)
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric text or NaN; TypeError: None or another
        # unsupported type; OverflowError: infinity
        return None

# Insert cleaned data with converted order_id and dates into Silver table.
#
# Two deliberate choices:
#   * The INSERT is prepared once (with `?` placeholders) and executed per row,
#     instead of sending the full query text as a simple statement each time.
#   * The silver row reuses the bronze `id` rather than generating a new UUID.
#     Because `id` is the primary key, re-running this cell then overwrites
#     the same rows instead of appending duplicates (which would inflate
#     every gold aggregate), and each silver row stays traceable to its
#     bronze source.
silver_insert = session.prepare("""
    INSERT INTO example.silver_sales (
        id, region, country, item_type, sales_channel, order_priority, order_date,
        order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
    """)

for _, row in df.iterrows():
    # The converters return None for values they cannot parse; such fields
    # are passed to the insert as None.
    order_date = convert_to_date(row['order_date'])
    ship_date = convert_to_date(row['ship_date'])
    order_id = convert_to_bigint(row['order_id'])  # Convert order_id to BIGINT

    session.execute(silver_insert, (
        row['id'],  # bronze primary key carried forward
        row['region'],
        row['country'],
        row['item_type'],
        row['sales_channel'],
        row['order_priority'],
        order_date,  # Store the converted date
        order_id,  # Store the converted BIGINT order_id
        ship_date,  # Store the converted date
        row['units_sold'],
        row['unit_price'],
        row['unit_cost'],
        row['total_revenue'],
        row['total_cost'],
        row['total_profit']
    ))

print("Data loaded to silver table")

Data loaded to silver table


### Data in silver table

In [47]:
# Print every silver row to confirm the typed columns were written
rows = session.execute("SELECT * FROM example.silver_sales")
for silver_row in rows:
    print(silver_row)

Row(id=UUID('eaa3e33c-5d1e-4e12-9601-92dcfef59149'), country='Thailand', item_type='Fruits', order_date=Date(15613), order_id=434008300, order_priority='C', region='Asia', sales_channel='Offline', ship_date=Date(15658), total_cost=10760.599609375, total_profit=3747.550048828125, total_revenue=14508.150390625, unit_cost=6.920000076293945, unit_price=9.329999923706055, units_sold=1555)
Row(id=UUID('fc8f3f64-7f1f-4f0a-bb7e-ae598a9e0a1d'), country='Pakistan', item_type='Meat', order_date=Date(16067), order_id=500371730, order_priority='M', region='Middle East and North Africa', sales_channel='Online', ship_date=Date(16116), total_cost=3635594.5, total_profit=570226.8125, total_revenue=4205821.5, unit_cost=364.69000244140625, unit_price=421.8900146484375, units_sold=9969)
Row(id=UUID('df091210-e821-4d46-ab25-846b60c83e86'), country='Slovakia', item_type='Beverages', order_date=Date(17100), order_id=174590194, order_priority='L', region='Europe', sales_channel='Offline', ship_date=Date(17139

**Gold** **Tables**

In [48]:
# Gold table 1: one row per region holding its summed revenue and profit.
# NOTE(review): the totals are stored as FLOAT -- confirm that precision is
# sufficient for aggregated monetary values.
gold_region_ddl = """
CREATE TABLE IF NOT EXISTS example.gold_sales_by_region (
    region TEXT PRIMARY KEY,
    total_revenue FLOAT,
    total_profit FLOAT
);
"""
session.execute(gold_region_ddl)
print("Gold table 1: sales_by_region created successfully.")

Gold table 1: sales_by_region created successfully.


In [49]:
from collections import defaultdict

# Running totals per region; unseen regions start at zero revenue and profit
region_sales = defaultdict(lambda: {'total_revenue': 0, 'total_profit': 0})

# Fold every silver row into the bucket of its region
rows = session.execute("SELECT region, total_revenue, total_profit FROM example.silver_sales")
for record in rows:
    bucket = region_sales[record.region]
    bucket['total_revenue'] += record.total_revenue
    bucket['total_profit'] += record.total_profit

# Write one gold row per region
insert_region_cql = """
    INSERT INTO example.gold_sales_by_region (region, total_revenue, total_profit)
    VALUES (%s, %s, %s);
    """
for region, sales_data in region_sales.items():
    params = (region, sales_data['total_revenue'], sales_data['total_profit'])
    session.execute(insert_region_cql, params)

print("Gold table 1: Data for total_sales_by_region inserted successfully.")


Gold table 1: Data for total_sales_by_region inserted successfully.


In [51]:
# Show each region with its profit and revenue totals (in that order)
rows = session.execute("SELECT * FROM example.gold_sales_by_region")
for region_row in rows:
    print(region_row.region, region_row.total_profit, region_row.total_revenue)

Australia and Oceania 3486940.0 10711258.0
Europe 11267281.0 34964748.0
Middle East and North Africa 6514262.0 24765128.0
Central America and the Caribbean 4252300.0 17570836.0
Asia 6749896.0 28840812.0
Sub-Saharan Africa 7651892.0 24225438.0
North America 1404621.5 3611757.5


In [52]:
# Gold table 2: one row per product type with summed revenue and units sold
gold_products_ddl = """
CREATE TABLE IF NOT EXISTS example.gold_top_selling_products (
    item_type TEXT PRIMARY KEY,
    total_revenue FLOAT,
    total_units_sold INT
);
"""
session.execute(gold_products_ddl)
print("Gold table 2: top_selling_products created successfully.")


Gold table 2: top_selling_products created successfully.


In [53]:
# Running totals per product type; unseen types start at zero
top_selling = defaultdict(lambda: {'total_revenue': 0, 'total_units_sold': 0})

# Fold every silver row into the bucket of its product type
rows = session.execute("SELECT item_type, total_revenue, units_sold FROM example.silver_sales")
for record in rows:
    bucket = top_selling[record.item_type]
    bucket['total_revenue'] += record.total_revenue
    bucket['total_units_sold'] += record.units_sold

# Write one gold row per product type
insert_product_cql = """
    INSERT INTO example.gold_top_selling_products (item_type, total_revenue, total_units_sold)
    VALUES (%s, %s, %s);
    """
for item_type, sales_data in top_selling.items():
    params = (item_type, sales_data['total_revenue'], sales_data['total_units_sold'])
    session.execute(insert_product_cql, params)

print("Gold table 2: Data for top_selling_products inserted successfully.")


Gold table 2: Data for top_selling_products inserted successfully.


In [56]:
# Show each product type with its revenue and unit totals
rows = session.execute("SELECT * FROM example.gold_top_selling_products")
for product_row in rows:
    print(product_row.item_type, product_row.total_revenue, product_row.total_units_sold)

Household 38519084.0 57640
Office Supplies 27880904.0 42814
Vegetables 1135114.125 7368
Snacks 2193642.75 14377
Personal Care 3191147.75 39045
Meat 21278866.0 50437
Fruits 615033.625 65920
Beverages 2145024.75 45206
Cereal 9416123.0 45776
Cosmetics 28727100.0 65707
Baby Food 5200564.0 20372
Clothes 4387373.5 40148


In [57]:
# Gold table 3: revenue and profit per calendar month.
# The primary key is (year, month).
gold_performance_ddl = """
CREATE TABLE IF NOT EXISTS example.gold_sales_performance (
    year INT,
    month INT,
    total_revenue FLOAT,
    total_profit FLOAT,
    PRIMARY KEY (year, month)
);
"""
session.execute(gold_performance_ddl)
print("Gold table 3: sales_performance created successfully.")


Gold table 3: sales_performance created successfully.


In [59]:
from datetime import datetime

# Aggregate sales performance by year and month
monthly_sales = defaultdict(lambda: {'total_revenue': 0, 'total_profit': 0})

# Read the order date together with the two measures from the Silver table
rows = session.execute("SELECT order_date, total_revenue, total_profit FROM example.silver_sales")
for row in rows:
    # Rows whose order_date is null (the silver load stores None for dates it
    # could not parse) cannot be assigned to a month and are left out.
    if row.order_date is not None:
        # The driver returns DATE columns as cassandra.util.Date (see the
        # Date(...) values in the silver read-back); .date() converts that to
        # a datetime.date directly, without a str() -> strptime round trip.
        order_date = row.order_date.date()

        # Extract year and month
        year, month = order_date.year, order_date.month

        # Aggregate sales data by year and month
        monthly_sales[(year, month)]['total_revenue'] += row.total_revenue
        monthly_sales[(year, month)]['total_profit'] += row.total_profit

# Insert aggregated data into Gold table
for (year, month), sales_data in monthly_sales.items():
    session.execute("""
    INSERT INTO example.gold_sales_performance (year, month, total_revenue, total_profit)
    VALUES (%s, %s, %s, %s);
    """, (year, month, sales_data['total_revenue'], sales_data['total_profit']))

print("Gold table 3: Data for sales_performance inserted successfully.")



Gold table 3: Data for sales_performance inserted successfully.


In [60]:
# Print the monthly performance rows exactly as the driver returns them
rows = session.execute("SELECT * FROM example.gold_sales_performance")
for performance_row in rows:
    print(performance_row)

Row(year=2014, month=2, total_profit=562533.125, total_revenue=4081224.75)
Row(year=2014, month=3, total_profit=1592127.625, total_revenue=4003440.5)
Row(year=2014, month=6, total_profit=514581.84375, total_revenue=1205442.0)
Row(year=2014, month=7, total_profit=2102895.5, total_revenue=9280963.0)
Row(year=2014, month=10, total_profit=23133.58984375, total_revenue=89558.671875)
Row(year=2014, month=11, total_profit=143351.640625, total_revenue=434357.3125)
Row(year=2010, month=4, total_profit=1032559.3125, total_revenue=2585495.25)
Row(year=2010, month=6, total_profit=11423.400390625, total_revenue=44224.19921875)
Row(year=2010, month=8, total_profit=715456.4375, total_revenue=2884921.5)
Row(year=2010, month=10, total_profit=236007.3125, total_revenue=628499.375)
Row(year=2010, month=11, total_profit=1380006.25, total_revenue=3470056.5)
Row(year=2010, month=12, total_profit=959274.25, total_revenue=4159723.5)
Row(year=2012, month=2, total_profit=71738.4609375, total_revenue=217368.4531