In [58]:
!pip install pandas

!pip install cassandra-driver



In [59]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

# Secure Connect Bundle Path
cloud_config= {
  'secure_connect_bundle': '/content/drive/MyDrive/Colab Notebooks/secure-connect-vinay-db.zip'
}

# Token for the database
with open("/content/drive/MyDrive/Colab Notebooks/vinay_db-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()
if session:
  print('Connected!')
else:
  print("An error occurred.")





Connected!


In [60]:
# Use the keyspace
session.set_keyspace('medallion_architecture_sales')
print(f"Connected to keyspace: {session.keyspace}")

Connected to keyspace: medallion_architecture_sales


In [62]:
# Create Bronze Level Table
session.execute("""
    CREATE TABLE IF NOT EXISTS bronze_sales (
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date DATE,
        order_id UUID PRIMARY KEY,
        ship_date DATE,
        units_sold INT,
        unit_price DECIMAL,
        unit_cost DECIMAL,
        total_revenue DECIMAL,
        total_cost DECIMAL,
        total_profit DECIMAL
    );
""")
print("Bronze Level Table created successfully.")

# Create Silver Level Table
session.execute("""
    CREATE TABLE IF NOT EXISTS silver_sales (
        order_id UUID PRIMARY KEY,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        units_sold INT,
        unit_price DECIMAL,
        unit_cost DECIMAL,
        total_revenue DECIMAL,
        total_cost DECIMAL,
        total_profit DECIMAL
    );
""")
print("Silver Level Table created successfully.")

# Create Gold Level Table 1: Sales by Region
session.execute("""
    CREATE TABLE IF NOT EXISTS gold_sales_by_region (
        region TEXT PRIMARY KEY,
        total_sales DECIMAL
    );
""")
print("Gold Level Table (Sales by Region) created successfully.")

# Create Gold Level Table 2: Sales by Item Type
session.execute("""
    CREATE TABLE IF NOT EXISTS gold_sales_by_item_type (
        item_type TEXT PRIMARY KEY,
        total_sales DECIMAL
    );
""")
print("Gold Level Table (Sales by Item Type) created successfully.")

# Create Gold Level Table 3: Sales by Channel
session.execute("""
    CREATE TABLE IF NOT EXISTS gold_sales_by_channel (
        sales_channel TEXT PRIMARY KEY,
        total_sales DECIMAL
    );
""")
print("Gold Level Table (Sales by Channel) created successfully.")

Bronze Level Table created successfully.
Silver Level Table created successfully.
Gold Level Table (Sales by Region) created successfully.
Gold Level Table (Sales by Item Type) created successfully.
Gold Level Table (Sales by Channel) created successfully.


In [66]:
#Checking if tables are created
tables = session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = 'medallion_architecture_sales'")
print(f"Existing tables in the keyspace: {session.keyspace}")
for row in tables:
    print(row.table_name)

Existing tables in the keyspace: medallion_architecture_sales
bronze_sales
gold_sales_by_channel
gold_sales_by_item_type
gold_sales_by_region
silver_sales


In [None]:
import pandas as pd
import uuid

csv_file_path = '/content/drive/MyDrive/Colab Notebooks/sales_100.csv' 
df = pd.read_csv(csv_file_path)

df['Order Date'] = pd.to_datetime(df['Order Date'], format='%m/%d/%Y').dt.strftime('%Y-%m-%d')
df['Ship Date'] = pd.to_datetime(df['Ship Date'], format='%m/%d/%Y').dt.strftime('%Y-%m-%d')

# Insert data into the Bronze table
for _, row in df.iterrows():
    session.execute("""
        INSERT INTO bronze_sales (region, country, item_type, sales_channel, order_priority,
                                  order_date, order_id, ship_date, units_sold, unit_price,
                                  unit_cost, total_revenue, total_cost, total_profit)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row['Region'], row['Country'], row['Item Type'], row['Sales Channel'],
        row['Order Priority'], row['Order Date'], uuid.uuid4(),  # Generate UUID for each order
        row['Ship Date'], row['UnitsSold'], row['UnitPrice'],
        row['UnitCost'], row['TotalRevenue'], row['TotalCost'], row['TotalProfit']
    ))

print("Data inserted into bronze_sales.")

rows = session.execute("SELECT * FROM bronze_sales LIMIT 10;")
for row in rows:
    print(row)


Data inserted into bronze_sales.
Row(order_id=UUID('e55f107c-c91c-4446-b649-b4c205ba7774'), country='Ghana', item_type='Clothes', order_date=Date(15280), order_priority='M', region='Sub-Saharan Africa', sales_channel='Online', ship_date=Date(15309), total_cost=Decimal('137482.24'), total_profit=Decimal('281715.84'), total_revenue=Decimal('419198.08'), unit_cost=Decimal('35.84'), unit_price=Decimal('109.28'), units_sold=3836)
Row(order_id=UUID('c276cc42-7792-4194-912e-780d5b094440'), country='Dominican Republic', item_type='Baby Food', order_date=Date(15211), order_priority='H', region='Central America and the Caribbean', sales_channel='Offline', ship_date=Date(15241), total_cost=Decimal('43681.08'), total_profit=Decimal('26265.64'), total_revenue=Decimal('69946.72'), unit_cost=Decimal('159.42'), unit_price=Decimal('255.28'), units_sold=274)
Row(order_id=UUID('1a6811a3-9c2d-4e09-be4d-b7e1bb201caa'), country='Togo', item_type='Cosmetics', order_date=Date(16686), order_priority='M', regio

In [None]:
rows_bronze = session.execute("SELECT * FROM bronze_sales;")
bronze_data = list(rows_bronze)

columns = ['region', 'country', 'item_type', 'sales_channel', 'order_priority',
           'order_date', 'order_id', 'ship_date', 'units_sold', 'unit_price',
           'unit_cost', 'total_revenue', 'total_cost', 'total_profit']

df_bronze = pd.DataFrame([{col: getattr(row, col, None) for col in columns} for row in bronze_data])

df_cleaned = df_bronze.dropna(subset=['region', 'country', 'item_type', 'sales_channel',
                                      'order_date', 'ship_date', 'units_sold',
                                      'unit_price', 'unit_cost', 'total_revenue',
                                      'total_cost', 'total_profit'])

print(f"Rows before cleaning: {len(df_bronze)}")
print(f"Rows after cleaning: {len(df_cleaned)}")

for _, row in df_cleaned.iterrows():
    session.execute("""
        INSERT INTO silver_sales (order_id, region, country, item_type, sales_channel, units_sold, unit_price,
                                  unit_cost, total_revenue, total_cost, total_profit)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row['order_id'],  
        row['region'], row['country'], row['item_type'], row['sales_channel'],
        row['units_sold'], row['unit_price'], row['unit_cost'], row['total_revenue'],
        row['total_cost'], row['total_profit']
    ))

rows = session.execute("SELECT * FROM silver_sales LIMIT 10;")
for row in rows:
    print(row)


Rows before cleaning: 99
Rows after cleaning: 99
Row(order_id=UUID('e55f107c-c91c-4446-b649-b4c205ba7774'), country='Ghana', item_type='Clothes', region='Sub-Saharan Africa', sales_channel='Online', total_cost=Decimal('137482.24'), total_profit=Decimal('281715.84'), total_revenue=Decimal('419198.08'), unit_cost=Decimal('35.84'), unit_price=Decimal('109.28'), units_sold=3836)
Row(order_id=UUID('c276cc42-7792-4194-912e-780d5b094440'), country='Dominican Republic', item_type='Baby Food', region='Central America and the Caribbean', sales_channel='Offline', total_cost=Decimal('43681.08'), total_profit=Decimal('26265.64'), total_revenue=Decimal('69946.72'), unit_cost=Decimal('159.42'), unit_price=Decimal('255.28'), units_sold=274)
Row(order_id=UUID('1a6811a3-9c2d-4e09-be4d-b7e1bb201caa'), country='Togo', item_type='Cosmetics', region='Sub-Saharan Africa', sales_channel='Online', total_cost=Decimal('1265563.98'), total_profit=Decimal('835619.22'), total_revenue=Decimal('2101183.2'), unit_cost

In [71]:
rows_silver_sales = session.execute("SELECT * FROM silver_sales;")

region_sales = {}
item_type_sales = {}
channel_sales = {}

for row in rows_silver_sales:
    # Aggregate by Region
    if row.region in region_sales:
        region_sales[row.region] += row.total_revenue
    else:
        region_sales[row.region] = row.total_revenue

    # Aggregate by Item Type
    if row.item_type in item_type_sales:
        item_type_sales[row.item_type] += row.total_revenue
    else:
        item_type_sales[row.item_type] = row.total_revenue

    # Aggregate by Sales Channel
    if row.sales_channel in channel_sales:
        channel_sales[row.sales_channel] += row.total_revenue
    else:
        channel_sales[row.sales_channel] = row.total_revenue

# Insert aggregated data into Gold Level Table 1: Sales by Region
for region, total_sales in region_sales.items():
    session.execute("""
        INSERT INTO gold_sales_by_region (region, total_sales)
        VALUES (%s, %s)
    """, (region, total_sales))

print("Data inserted into gold_sales_by_region.")

# Insert aggregated data into Gold Level Table 2: Sales by Item Type
for item_type, total_sales in item_type_sales.items():
    session.execute("""
        INSERT INTO gold_sales_by_item_type (item_type, total_sales)
        VALUES (%s, %s)
    """, (item_type, total_sales))

print("Data inserted into gold_sales_by_item_type.")

# Insert aggregated data into Gold Level Table 3: Sales by Channel
for sales_channel, total_sales in channel_sales.items():
    session.execute("""
        INSERT INTO gold_sales_by_channel (sales_channel, total_sales)
        VALUES (%s, %s)
    """, (sales_channel, total_sales))

print("Data inserted into gold_sales_by_channel.")


Data inserted into gold_sales_by_region.
Data inserted into gold_sales_by_item_type.
Data inserted into gold_sales_by_channel.


In [72]:
# Querying the table gold_sales_by_region table to verify
rows = session.execute("SELECT * FROM gold_sales_by_region;")
for row in rows:
    print(row)

Row(region='Australia and Oceania', total_sales=Decimal('10711258.13'))
Row(region='Europe', total_sales=Decimal('34964749.83'))
Row(region='Middle East and North Africa', total_sales=Decimal('24765127.25'))
Row(region='Central America and the Caribbean', total_sales=Decimal('17570835.42'))
Row(region='Asia', total_sales=Decimal('28840812.19'))
Row(region='Sub-Saharan Africa', total_sales=Decimal('24225437.42'))
Row(region='North America', total_sales=Decimal('3611757.52'))


In [73]:
# Querying the table gold_sales_by_item_type table to verify
rows = session.execute("SELECT * FROM gold_sales_by_item_type;")
for row in rows:
    print(row)


Row(item_type='Household', total_sales=Decimal('38519082.8'))
Row(item_type='Office Supplies', total_sales=Decimal('27880904.94'))
Row(item_type='Vegetables', total_sales=Decimal('1135114.08'))
Row(item_type='Snacks', total_sales=Decimal('2193642.66'))
Row(item_type='Personal Care', total_sales=Decimal('3191147.85'))
Row(item_type='Meat', total_sales=Decimal('21278865.93'))
Row(item_type='Fruits', total_sales=Decimal('615033.6'))
Row(item_type='Beverages', total_sales=Decimal('2145024.7'))
Row(item_type='Cereal', total_sales=Decimal('9416123.2'))
Row(item_type='Cosmetics', total_sales=Decimal('28727100.4'))
Row(item_type='Baby Food', total_sales=Decimal('5200564.16'))
Row(item_type='Clothes', total_sales=Decimal('4387373.44'))


In [74]:
# Querying the table gold_sales_by_channel table to verify
rows = session.execute("SELECT * FROM gold_sales_by_channel;")
for row in rows:
    print(row)

Row(sales_channel='Online', total_sales=Decimal('84628184.37'))
Row(sales_channel='Offline', total_sales=Decimal('60061793.39'))
