# Cassandra Medallion Architecture: Bronze, Silver, and Gold Tables

In [3]:
# Install the Cassandra python driver
!pip install cassandra-driver



In [4]:

# Import necessary libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import pandas as pd
import uuid
import json

# Connect to Cassandra
def connect_to_cassandra(
    bundle_path='secure-connect-cassandra-assignment-big-data.zip',
    token_path='cassandra_assignment_big_data-token.json',
):
    """Open and return a driver Session for the cloud-hosted Cassandra cluster.

    Credentials come from a token JSON file (keys ``clientId`` and ``secret``)
    rather than being hard-coded in the notebook.

    Args:
        bundle_path: Secure-connect bundle zip passed to the driver's ``cloud``
            configuration. Defaults to the bundle used by this notebook.
        token_path: JSON file holding the ``clientId`` / ``secret`` pair.

    Returns:
        A connected session with no default keyspace, so every query must use a
        fully qualified ``keyspace.table`` name. The owning ``Cluster`` is
        available as ``session.cluster``; call ``shutdown()`` on it when done.

    Raises:
        FileNotFoundError: if ``token_path`` does not exist.
        KeyError: if the token file lacks ``clientId`` or ``secret``.
    """
    with open(token_path) as token_file:
        secrets = json.load(token_file)

    auth_provider = PlainTextAuthProvider(secrets["clientId"], secrets["secret"])
    cluster = Cluster(
        cloud={'secure_connect_bundle': bundle_path},
        auth_provider=auth_provider,
    )
    return cluster.connect()


session = connect_to_cassandra()
print("Connected to Cassandra!")




Connected to Cassandra!


## Bronze Table: Raw Data Ingestion

In [7]:

# Bronze layer: land the raw CSV records in Cassandra without any cleaning.
url = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"
df = pd.read_csv(url)
print(df.head())

# Raw-table schema: one row per CSV record, keyed by a random UUID.
bronze_ddl = """
CREATE TABLE IF NOT EXISTS medallionarchitecture.bronze_sales (
    id UUID PRIMARY KEY,
    Region text,
    Country text,
    Item_Type text,
    Sales_Channel text,
    Order_Priority text,
    Order_Date text,
    Order_ID int,
    Units_Sold int,
    Unit_Price float,
    Unit_Cost float,
    Total_Revenue float,
    Total_Cost float,
    Total_Profit float
);
"""
session.execute(bronze_ddl)

# Prepared once; the Silver cell references this name too.
insert_query = session.prepare("""
    INSERT INTO medallionarchitecture.bronze_sales (id, Region, Country, Item_Type, Sales_Channel, Order_Priority, Order_Date, Order_ID, Units_Sold, Unit_Price, Unit_Cost, Total_Revenue, Total_Cost, Total_Profit)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")

# CSV headers, in the same order as the bind markers that follow `id`.
# The CSV mixes spaced ("Item Type") and unspaced ("UnitsSold") headers.
# NOTE(review): the CSV also has a "Ship Date" column that is not stored here.
bronze_source_columns = [
    'Region', 'Country', 'Item Type', 'Sales Channel', 'Order Priority',
    'Order Date', 'Order ID', 'UnitsSold', 'UnitPrice', 'UnitCost',
    'TotalRevenue', 'TotalCost', 'TotalProfit',
]

for _, row in df.iterrows():
    # A fresh uuid4 per record: re-running this cell appends another full copy.
    bind_values = (uuid.uuid4(),) + tuple(row[col] for col in bronze_source_columns)
    session.execute(insert_query, bind_values)

print("Bronze table populated with raw data.")


                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  174590194   12/4/2016       3973      47.45   

   UnitCost  TotalRevenue  TotalCost  TotalProfit  
0      6.92     

## Silver Table: Data Cleaning and Structuring

In [10]:

# Silver layer: de-duplicate the raw frame and persist it to its own table.
# drop_duplicates() compares all columns, so only fully identical records collapse.
df_cleaned = df.drop_duplicates()
print(df_cleaned.head())

# Create the Silver table
session.execute("""
CREATE TABLE IF NOT EXISTS medallionarchitecture.silver_sales (
    id UUID PRIMARY KEY,
    Region text,
    Country text,
    Item_Type text,
    Sales_Channel text,
    Order_Priority text,
    Order_Date text,
    Order_ID int,
    Units_Sold int,
    Unit_Price float,
    Unit_Cost float,
    Total_Revenue float,
    Total_Cost float,
    Total_Profit float
);
""")

# The Bronze cell's `insert_query` targets bronze_sales, so Silver needs its own
# statement. Reusing it would write the cleaned rows back into Bronze and leave
# silver_sales empty.
# NOTE(review): if this cell previously ran with the Bronze statement,
# bronze_sales may hold extra copies of every record. Check and clean up.
silver_insert_query = session.prepare("""
    INSERT INTO medallionarchitecture.silver_sales (id, Region, Country, Item_Type, Sales_Channel, Order_Priority, Order_Date, Order_ID, Units_Sold, Unit_Price, Unit_Cost, Total_Revenue, Total_Cost, Total_Profit)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")

# Insert cleaned data into the Silver table (CSV header -> bind marker order).
for _, row in df_cleaned.iterrows():
    session.execute(silver_insert_query, (
        uuid.uuid4(), row['Region'], row['Country'], row['Item Type'], row['Sales Channel'],
        row['Order Priority'], row['Order Date'], row['Order ID'], row['UnitsSold'],
        row['UnitPrice'], row['UnitCost'], row['TotalRevenue'], row['TotalCost'],
        row['TotalProfit']
    ))

print("Silver table populated with cleaned data.")


                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  174590194   12/4/2016       3973      47.45   

   UnitCost  TotalRevenue  TotalCost  TotalProfit  
0      6.92     

## Gold Tables: Aggregated Analytics

In [13]:

# Gold table 1: total revenue and profit per Region, computed from Silver data.
# Select the two measures *before* summing. A bare .sum() also aggregates every
# text column (string concatenation). Once the monthly-trends cell has turned
# 'Order Date' into datetime, it can raise TypeError (pandas >= 2).
region_df = (
    df_cleaned.groupby('Region')[['TotalRevenue', 'TotalProfit']]
    .sum()
    .reset_index()
)

# NOTE(review): CQL `float` is 32-bit, so large totals lose precision on write.
# Consider `double` for new tables.
session.execute("""
CREATE TABLE IF NOT EXISTS medallionarchitecture.gold_sales_region (
    Region text PRIMARY KEY,
    Total_Revenue float,
    Total_Profit float
);
""")

# Prepare once, outside the loop. Re-preparing the same statement per row is a
# driver anti-pattern.
region_insert = session.prepare("""
    INSERT INTO medallionarchitecture.gold_sales_region (Region, Total_Revenue, Total_Profit) VALUES (?, ?, ?)
""")
for _, row in region_df.iterrows():
    session.execute(region_insert, (row['Region'], row['TotalRevenue'], row['TotalProfit']))

print("Gold table 1 populated with regional aggregates.")


Gold table 1 populated with regional aggregates.


In [17]:
# Gold table 2: the TOP_N countries ranked by total profit.
TOP_N = 5

# Only build the table when every column the aggregation needs is present.
if {'Country', 'TotalProfit', 'TotalRevenue'}.issubset(df_cleaned.columns):
    # Group by Country, sum revenue and profit, keep the most profitable TOP_N.
    top_countries_df = (
        df_cleaned.groupby('Country')
        .agg({'TotalProfit': 'sum', 'TotalRevenue': 'sum'})
        .sort_values('TotalProfit', ascending=False)
        .head(TOP_N)
        .reset_index()
    )
    print(top_countries_df)

    # NOTE(review): rows are upserted by Country and never cleared. If the
    # ranking changes between runs, countries that dropped out remain in the table.
    session.execute("""
    CREATE TABLE IF NOT EXISTS medallionarchitecture.gold_sales_top_countries (
        Country text PRIMARY KEY,
        Total_Revenue float,
        Total_Profit float
    );
    """)

    # Prepare once, outside the loop.
    top_countries_insert = session.prepare("""
        INSERT INTO medallionarchitecture.gold_sales_top_countries (Country, Total_Revenue, Total_Profit)
        VALUES (?, ?, ?)
    """)
    for _, row in top_countries_df.iterrows():
        session.execute(top_countries_insert, (row['Country'], row['TotalRevenue'], row['TotalProfit']))

    print(f"Gold table 2 populated with top {TOP_N} profitable countries.")
else:
    print("Column names do not match. Check column names in df_cleaned.")


          Country  TotalProfit  TotalRevenue
0         Algeria   1681149.03    4227286.80
1         Estonia   1653322.48    6666661.52
2  Czech Republic   1592127.59    4003440.40
3       Nicaragua   1568333.69    5944506.05
4        Tanzania   1396869.91    3555764.47
Gold table 2 populated with top 5 profitable countries.


In [24]:
# Gold table 3: total revenue per calendar month (YYYY-MM).

# Normalise header whitespace so the column-existence checks below are reliable.
df_cleaned.columns = df_cleaned.columns.str.strip()

if 'Order Date' in df_cleaned.columns:
    # Parse dates. Unparseable values become NaT instead of raising.
    order_dates = pd.to_datetime(df_cleaned['Order Date'], errors='coerce')
    invalid_dates = order_dates.isna()
    if invalid_dates.any():
        # Report the offending rows with their raw (unparsed) values.
        print(f"Warning: Some rows have invalid dates: {df_cleaned[invalid_dates]}")

    # Rebind df_cleaned to a new frame instead of assigning columns into it.
    # Column assignment on a filtered frame can hit pandas' chained-assignment
    # (SettingWithCopyWarning) path. As before, df_cleaned ends up with a parsed
    # 'Order Date', a new 'Month' column, and invalid-date rows removed.
    # NOTE(review): later cells may rely on that side effect.
    df_cleaned = (
        df_cleaned
        .assign(**{'Order Date': order_dates})
        .loc[~invalid_dates]
        .assign(Month=lambda frame: frame['Order Date'].dt.strftime('%Y-%m'))
    )

    if 'TotalRevenue' in df_cleaned.columns:
        monthly_df = df_cleaned.groupby('Month')[['TotalRevenue']].sum().reset_index()
        print(monthly_df)

        # NOTE(review): the measure here is named `TotalRevenue`, unlike the
        # `Total_Revenue` used by the other gold tables. It is kept because the
        # read-back cell selects `totalrevenue`. CQL `float` is 32-bit, so totals
        # lose precision (e.g. 933620.29 reads back as 933620.3125). Consider
        # `double` for a new table.
        session.execute("""
        CREATE TABLE IF NOT EXISTS medallionarchitecture.gold_sales_monthly (
            Month text PRIMARY KEY,
            TotalRevenue float
        );
        """)

        # Prepare once, outside the loop.
        monthly_insert = session.prepare("""
            INSERT INTO medallionarchitecture.gold_sales_monthly (Month, TotalRevenue) VALUES (?, ?)
        """)
        for _, row in monthly_df.iterrows():
            session.execute(monthly_insert, (row['Month'], row['TotalRevenue']))

        print("Gold table 3 populated with monthly sales trends.")
    else:
        print("Error: 'TotalRevenue' column not found in the dataset.")
else:
    print("Error: 'Order Date' column not found in the dataset.")


      Month  TotalRevenue
0   2010-04    2585495.22
1   2010-06      44224.20
2   2010-08    2884921.59
3   2010-10     628499.36
4   2010-11    3470056.40
5   2010-12    4159723.67
6   2011-01     746767.01
7   2011-02    4227286.80
8   2011-03    8331209.41
9   2011-05    3263260.80
10  2011-06    1973257.58
11  2011-07     289426.40
12  2011-08      69946.72
13  2011-09    7082993.73
14  2011-10    4100669.37
15  2011-11    6907926.41
16  2012-02     217368.45
17  2012-06    6552552.08
18  2012-07    3100161.89
19  2012-09     188951.87
20  2012-10    3256076.68
21  2012-11     603225.60
22  2013-01    1234388.61
23  2013-05     868465.40
24  2013-09     933620.29
25  2013-11      53507.55
26  2013-12    4205821.41
27  2014-02    4081224.66
28  2014-03    4003440.40
29  2014-06    1205441.93
30  2014-07    9280963.30
31  2014-10      89558.67
32  2014-11     434357.30
33  2015-02    4493491.09
34  2015-04    2536801.10
35  2015-05     605694.00
36  2015-06     626742.81
37  2015-08 

In [39]:
# Read Gold Table 3 back from Cassandra to confirm the writes landed.
# CQL folds unquoted identifiers to lower case, so the columns declared as
# Month / TotalRevenue are exposed as `month` / `totalrevenue` on each row.
rows = session.execute("SELECT * FROM medallionarchitecture.gold_sales_monthly;")

for row in rows:
    month, total_revenue = row.month, row.totalrevenue
    print(f"Month: {month}, Total Revenue: {total_revenue}")

Month: 2013-09, Total Revenue: 933620.3125
Month: 2010-12, Total Revenue: 4159723.75
Month: 2012-09, Total Revenue: 188951.875
Month: 2016-09, Total Revenue: 231345.765625
Month: 2016-02, Total Revenue: 3897266.5
Month: 2015-06, Total Revenue: 626742.8125
Month: 2016-11, Total Revenue: 900296.375
Month: 2017-04, Total Revenue: 2196359.25
Month: 2011-01, Total Revenue: 746767.0
Month: 2015-09, Total Revenue: 2101183.25
Month: 2016-08, Total Revenue: 130261.7578125
Month: 2014-11, Total Revenue: 434357.3125
Month: 2013-05, Total Revenue: 868465.375
Month: 2013-12, Total Revenue: 4205821.5
Month: 2011-07, Total Revenue: 289426.40625
Month: 2011-11, Total Revenue: 6907926.5
Month: 2011-08, Total Revenue: 69946.71875
Month: 2015-12, Total Revenue: 2604084.75
Month: 2011-06, Total Revenue: 1973257.625
Month: 2014-03, Total Revenue: 4003440.5
Month: 2015-04, Total Revenue: 2536801.0
Month: 2010-11, Total Revenue: 3470056.5
Month: 2012-07, Total Revenue: 3100162.0
Month: 2012-11, Total Revenue