## Creating Medallian architecture with Python and Cassandra

In [1]:
#required Libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
import uuid
import pandas as pd
from datetime import datetime

#### 1. Establish Connection to Cassandra

In [10]:
cloud_config= {
  'secure_connect_bundle': 'secure-connect-sales-100.zip'
}

with open("sales_100-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()
session.set_keyspace('lokesh_ks')
if session:
  print('Connected!')
else:
  print("An error occurred.")



Connected!


## Creating Bronze Layer




In [3]:
session.execute("""
    CREATE TABLE IF NOT EXISTS lokesh_ks.bronze_table (
        Id UUID PRIMARY KEY,
        Region TEXT,
        Country TEXT,
        Item_Type TEXT,
        Sales_Channel TEXT,
        Order_Priority TEXT,
        Order_Date TEXT,
        Order_Id TEXT,
        Ship_Date TEXT,
        Units_Sold INT,
        Unit_Price FLOAT,
        Unit_Cost FLOAT,
        Total_Revenue FLOAT,
        Total_Cost FLOAT,
        Total_Profit FLOAT
    );
    """)

<cassandra.cluster.ResultSet at 0x7a7ab051db50>

In [4]:
sales_data = pd.read_csv('sales_data.csv')
for _, row in sales_data.iterrows():
        session.execute("""
        INSERT INTO lokesh_ks.bronze_table (Id, Region, Country, Item_Type, Sales_Channel, Order_Priority, Order_Date, Order_Id, Ship_Date, Units_Sold, Unit_Price, Unit_Cost, Total_Revenue, Total_Cost, Total_Profit)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """, (uuid.uuid4(), row['Region'], row['Country'], row['Item Type'], row['Sales Channel'], row['Order Priority'], row['Order Date'], str(row['Order ID']), row['Ship Date'], row['UnitsSold'], row['UnitPrice'], row['UnitCost'], row['TotalRevenue'], row['TotalCost'], row['TotalProfit']))

In [5]:
data = session.execute("SELECT * FROM lokesh_ks.bronze_table;")
row_count = sum(1 for _ in data)
print(f"Bronze table row count: {row_count}")
rows = session.execute("SELECT * FROM lokesh_ks.bronze_table limit 5")
for row in rows:
    print(row)


Bronze table row count: 198
Row(id=UUID('adea50d1-6eb0-4ef4-8000-cd1084acc5c8'), country='Madagascar', item_type='Clothes', order_date='10/17/2016', order_id='494570004', order_priority='L', region='Sub-Saharan Africa', sales_channel='Online', ship_date='10/26/2016', total_cost=111462.3984375, total_profit=228398.40625, total_revenue=339860.8125, unit_cost=35.84000015258789, unit_price=109.27999877929688, units_sold=3110)
Row(id=UUID('52394bdf-7658-4acc-bfa1-c96e1e2f6810'), country='Belize', item_type='Personal Care', order_date='6/14/2015', order_id='315402734', order_priority='H', region='Central America and the Caribbean', sales_channel='Online', ship_date='8/2/2015', total_cost=200215.109375, total_profit=88536.9765625, total_revenue=288752.09375, unit_cost=56.66999816894531, unit_price=81.7300033569336, units_sold=3533)
Row(id=UUID('a989e8ae-a616-454e-a45e-3869764e66a4'), country='Taiwan', item_type='Fruits', order_date='2/9/2014', order_id='732588374', order_priority='M', region=

## Data Profiling

In [6]:
data = session.execute("SELECT * FROM lokesh_ks.bronze_table;")
df = pd.DataFrame(list(data))
df = df.drop(columns=['id'])

In [7]:
print(df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 198 entries, 0 to 197
Data columns (total 14 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   country         198 non-null    object 
 1   item_type       198 non-null    object 
 2   order_date      198 non-null    object 
 3   order_id        198 non-null    object 
 4   order_priority  198 non-null    object 
 5   region          198 non-null    object 
 6   sales_channel   198 non-null    object 
 7   ship_date       198 non-null    object 
 8   total_cost      198 non-null    float64
 9   total_profit    198 non-null    float64
 10  total_revenue   198 non-null    float64
 11  unit_cost       198 non-null    float64
 12  unit_price      198 non-null    float64
 13  units_sold      198 non-null    int64  
dtypes: float64(5), int64(1), object(8)
memory usage: 21.8+ KB
None


In [8]:
print(df.describe(include='all'))

       country item_type order_date   order_id order_priority  \
count      198       198        198        198            198   
unique      80        12         96         99              4   
top      Ghana    Fruits   9/8/2013  494570004              M   
freq         6        26          4          2             68   
mean       NaN       NaN        NaN        NaN            NaN   
std        NaN       NaN        NaN        NaN            NaN   
min        NaN       NaN        NaN        NaN            NaN   
25%        NaN       NaN        NaN        NaN            NaN   
50%        NaN       NaN        NaN        NaN            NaN   
75%        NaN       NaN        NaN        NaN            NaN   
max        NaN       NaN        NaN        NaN            NaN   

                    region sales_channel   ship_date    total_cost  \
count                  198           198         198  1.980000e+02   
unique                   7             2          98           NaN   
top     S

### Data has no nulls, data types are transformed in silver_table table date column, order id


## Creating Silver Layer

In [9]:
session.execute("""
    CREATE TABLE IF NOT EXISTS lokesh_ks.silver_table (
        id UUID PRIMARY KEY,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date DATE,
        order_id BIGINT,
        ship_date DATE,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    );
    """)

<cassandra.cluster.ResultSet at 0x7a7ab0513590>

In [11]:
rows = session.execute("SELECT * FROM bronze_table;")
sales= []
for row in rows:
    sales.append(row._asdict())
sales_bronze = pd.DataFrame(sales)

for _, row in sales_bronze.iterrows():
    order_date = datetime.strptime(row['order_date'], "%m/%d/%Y").date()
    ship_date = datetime.strptime(row['ship_date'], "%m/%d/%Y").date()
    order_id = int(row['order_id'])

    session.execute("""
    INSERT INTO silver_table (
        id, region, country, item_type, sales_channel, order_priority, order_date,
        order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """, (
        uuid.uuid4(),
        row['region'],
        row['country'],
        row['item_type'],
        row['sales_channel'],
        row['order_priority'],
        order_date,
        order_id,
        ship_date,
        row['units_sold'],
        row['unit_price'],
        row['unit_cost'],
        row['total_revenue'],
        row['total_cost'],
        row['total_profit']
    ))


In [12]:
silver_rows = session.execute("SELECT * FROM silver_table limit 5")
for row in silver_rows:
    print(row)

Row(id=UUID('4cae57b7-f217-472b-aca8-29c498edb0f9'), country='Thailand', item_type='Meat', order_date=Date(16470), order_id=252889239, order_priority='C', region='Asia', sales_channel='Online', ship_date=Date(16485), total_cost=1605000.75, total_profit=251737.203125, total_revenue=1856737.875, unit_cost=364.69000244140625, unit_price=421.8900146484375, units_sold=4401)
Row(id=UUID('3185e8d1-ee21-4d30-b830-558434d32987'), country='Nicaragua', item_type='Baby Food', order_date=Date(17116), order_id=343752610, order_priority='L', region='Central America and the Caribbean', sales_channel='Online', ship_date=Date(17121), total_cost=460883.21875, total_profit=277131.25, total_revenue=738014.5, unit_cost=159.4199981689453, unit_price=255.27999877929688, units_sold=2891)
Row(id=UUID('27e2be60-29cb-4dbe-8fa2-8bf4fa5519c0'), country='Sweden', item_type='Beverages', order_date=Date(15596), order_id=265081918, order_priority='H', region='Europe', sales_channel='Online', ship_date=Date(15625), tota

## Creating Gold Layer

#### Gold 1 Total Profit by Sales Channel

In [18]:
session.execute("""
    CREATE TABLE IF NOT EXISTS lokesh_ks.gold_total_profit_by_sales_channel (
        sales_channel TEXT PRIMARY KEY,
        total_profit FLOAT
    );
""")
sales_channel_profits = {}
for row in session.execute("SELECT sales_channel, total_profit FROM silver_table"):
    sales_channel = row.sales_channel
    profit = row.total_profit
    sales_channel_profits[sales_channel] = sales_channel_profits.get(sales_channel, 0) + profit

for sales_channel, total_profit in sales_channel_profits.items():
    session.execute("""
        INSERT INTO gold_total_profit_by_sales_channel (sales_channel, total_profit)
        VALUES (%s, %s);
    """, (sales_channel, total_profit))

In [20]:
gold1_rows = session.execute("SELECT * FROM gold_total_profit_by_sales_channel limit 5")
for row in gold1_rows:
    print(row)

Row(sales_channel='Online', total_profit=49927612.0)
Row(sales_channel='Offline', total_profit=32726772.0)


#### Gold 2 Average Unit Price by Item Type and Region

In [22]:
session.execute("""
    CREATE TABLE IF NOT EXISTS lokesh_ks.gold_avg_unit_price_by_item_region (
        item_type TEXT,
        region TEXT,
        avg_unit_price FLOAT,
        PRIMARY KEY (item_type, region)
    );
""")
rows = session.execute("""
    SELECT unit_price, item_type, region
    FROM silver_table;
""")

unit_prices = {}
for row in rows:
    key = (row.item_type, row.region)
    if key not in unit_prices:
        unit_prices[key] = []
    unit_prices[key].append(row.unit_price)

for (item_type, region), prices in unit_prices.items():
    avg_unit_price = sum(prices) / len(prices)
    session.execute("""
        INSERT INTO gold_avg_unit_price_by_item_region (item_type, region, avg_unit_price)
        VALUES (%s, %s, %s);
    """, (item_type, region, avg_unit_price))

In [23]:
gold2_rows = session.execute("SELECT * FROM gold_avg_unit_price_by_item_region limit 5")
for row in gold2_rows:
    print(row)

Row(item_type='Household', region='Asia', avg_unit_price=668.27001953125)
Row(item_type='Household', region='Australia and Oceania', avg_unit_price=668.27001953125)
Row(item_type='Household', region='Central America and the Caribbean', avg_unit_price=668.27001953125)
Row(item_type='Household', region='Europe', avg_unit_price=668.27001953125)
Row(item_type='Household', region='Middle East and North Africa', avg_unit_price=668.27001953125)


#### Gold 3 Total Units Sold by Order Priority and Country

In [30]:
session.execute("""
    CREATE TABLE IF NOT EXISTS lokesh_ks.gold_total_units_sold_by_priority_country (
        order_priority TEXT,
        country TEXT,
        total_units_sold INT,
        PRIMARY KEY (order_priority, country)
    );
""")

rows = session.execute("""
    SELECT order_priority, country, units_sold
    FROM silver_table;
""")

units_sold = {}
for row in rows:
    key = (row.order_priority, row.country)
    if key not in units_sold:
        units_sold[key] = 0
    units_sold[key] += row.units_sold

for (order_priority, country), total_units in units_sold.items():
    session.execute("""
        INSERT INTO gold_total_units_sold_by_priority_country (order_priority, country, total_units_sold)
        VALUES (%s, %s, %s);
    """, (order_priority, country, total_units))

gold3_rows = session.execute("SELECT * FROM gold_total_units_sold_by_priority_country limit 5")
for row in gold3_rows:
  print(row)


Row(order_priority='C', country='Antigua and Barbuda ', total_units_sold=12594)
Row(order_priority='C', country='Dominica', total_units_sold=14088)
Row(order_priority='C', country='Egypt', total_units_sold=2384)
Row(order_priority='C', country='Finland', total_units_sold=1396)
Row(order_priority='C', country='Haiti', total_units_sold=4104)
