In [1]:
# Install the Cassandra python driver
!pip install cassandra-driver

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl.metadata (1.0 kB)
Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.2 geomet-0.2.1.post1


In [3]:
# Import the necessary libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [5]:
# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': 'secure-connect-sales-data.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("kandul42@students.rowan.edu-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

if session:
  print('Connected!')
else:
  print("An error occurred.")



Connected!


In [6]:
import requests
import pandas as pd
from cassandra.cluster import Cluster

# Download the CSV file
url = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"
csv_file = "sales_100.csv"
response = requests.get(url)

# Save the file locally
with open(csv_file, 'wb') as file:
    file.write(response.content)

# Read the CSV file
df = pd.read_csv(csv_file)
print(df.head())  # Display the first few rows

                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  174590194   12/4/2016       3973      47.45   

   UnitCost  TotalRevenue  TotalCost  TotalProfit  
0      6.92     

In [46]:
session.set_keyspace('bronze')

session.execute("""
    CREATE TABLE IF NOT EXISTS sales_raw (
        order_id UUID PRIMARY KEY,
        date TEXT,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        total_revenue FLOAT
    )
""")
print("Bronze table created.")


Bronze table created.


In [47]:
# Normalize column names
df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')

# Insert raw data into Bronze Table
for _, row in df.iterrows():
    session.execute("""
        INSERT INTO sales_raw (
            order_id, date, region, country, item_type, sales_channel,
            order_priority, ship_date, units_sold, unit_price, total_revenue
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        uuid4(),
        row['order_date'],
        row['region'],
        row['country'],
        row['item_type'],
        row['sales_channel'],
        row['order_priority'],
        row['ship_date'],
        int(row['unitssold']),
        float(row['unitprice']),
        float(row['totalrevenue'])
    ))

print("Data inserted into Bronze table.")
print(df.columns)

Data inserted into Bronze table.
Index(['region', 'country', 'item_type', 'sales_channel', 'order_priority',
       'order_date', 'order_id', 'ship_date', 'unitssold', 'unitprice',
       'unitcost', 'totalrevenue', 'totalcost', 'totalprofit'],
      dtype='object')


In [48]:
# Create the Silver keyspace and table

session.set_keyspace('silver')

session.execute("""
    CREATE TABLE IF NOT EXISTS sales_cleaned (
        order_id UUID PRIMARY KEY,
        date TIMESTAMP,
        region TEXT,
        country TEXT,
        item_type TEXT,
        units_sold INT,
        unit_price FLOAT,
        total_revenue FLOAT
    )
""")
print("Silver table created.")


Silver table created.


In [49]:
# Fetch raw data from Bronze table
rows = session.execute("SELECT * FROM bronze.sales_raw")

# Clean and insert data into Silver table
for row in rows:
    transaction_date = datetime.strptime(row.date, '%m/%d/%Y')

    session.execute("""
        INSERT INTO sales_cleaned (
            order_id, date, region, country, item_type,
            units_sold, unit_price, total_revenue
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row.order_id,
        transaction_date,
        row.region,
        row.country,
        row.item_type,
        row.units_sold,
        row.unit_price,
        row.total_revenue
    ))

print("Data moved from Bronze to Silver.")


Data moved from Bronze to Silver.


In [53]:
session.set_keyspace('gold')

# Gold Table 1: Total Sales by Region
session.execute("""
    CREATE TABLE IF NOT EXISTS gold_sales_by_region (
        region TEXT PRIMARY KEY,
        total_revenue DECIMAL
    )
""")
print("Gold table 'gold_sales_by_region' created.")

# Gold Table 2: Total Sales by Product
session.execute("""
    CREATE TABLE IF NOT EXISTS gold_sales_by_product (
        item_type TEXT PRIMARY KEY,
        total_revenue DECIMAL
    )
""")
print("Gold table 'gold_sales_by_product' created.")

# Gold Table 3: Daily Sales Summary
session.execute("""
    CREATE TABLE IF NOT EXISTS gold_daily_sales (
        date DATE PRIMARY KEY,
        total_revenue DECIMAL
    )
""")
print("Gold table 'gold_daily_sales' created.")





Gold table 'gold_sales_by_region' created.
Gold table 'gold_sales_by_product' created.
Gold table 'gold_daily_sales' created.
Gold tables created.


In [54]:
from collections import defaultdict

# Query data from Silver table
query = "SELECT region, total_revenue FROM silver.sales_cleaned"
rows = session.execute(query)

# Aggregate total sales by region
region_sales = defaultdict(float)
for row in rows:
    region_sales[row.region] += float(row.total_revenue)

# Insert aggregated data into gold_sales_by_region
for region, total_revenue in region_sales.items():
    session.execute("""
        INSERT INTO gold_sales_by_region (region, total_revenue)
        VALUES (%s, %s)
    """, (region, total_revenue))

print("Data inserted into 'gold_sales_by_region'.")


Data inserted into 'gold_sales_by_region'.


In [55]:
# Query data from Silver table
query = "SELECT item_type, total_revenue FROM silver.sales_cleaned"
rows = session.execute(query)

# Aggregate total sales by product
product_sales = defaultdict(float)
for row in rows:
    product_sales[row.item_type] += float(row.total_revenue)

# Insert aggregated data into gold_sales_by_product
for item_type, total_revenue in product_sales.items():
    session.execute("""
        INSERT INTO gold_sales_by_product (item_type, total_revenue)
        VALUES (%s, %s)
    """, (item_type, total_revenue))

print("Data inserted into 'gold_sales_by_product'.")


Data inserted into 'gold_sales_by_product'.


In [57]:
# Query data from Silver table
query = "SELECT date, total_revenue FROM silver.sales_cleaned"
rows = session.execute(query)

# Aggregate total sales by date
daily_sales = defaultdict(float)
for row in rows:
    # Convert the date to a string in the format 'YYYY-MM-DD'
    date_str = row.date.strftime('%Y-%m-%d')
    daily_sales[date_str] += float(row.total_revenue)

# Insert aggregated data into gold_daily_sales
for date, total_revenue in daily_sales.items():
    session.execute("""
        INSERT INTO gold_daily_sales (date, total_revenue)
        VALUES (%s, %s)
    """, (date, total_revenue))

print("Data inserted into 'gold_daily_sales'.")

Data inserted into 'gold_daily_sales'.


In [60]:
# Gold Table 1: Daily Sales Summary
result_daily_sales = session.execute("SELECT * FROM gold_daily_sales")
print("Gold Table: Daily Sales Summary")
for row in result_daily_sales:
    print(row)

Gold Table: Daily Sales Summary
Row(date=Date(16267), total_revenue=Decimal('16826370.0'))
Row(date=Date(15042), total_revenue=Decimal('502922.53125'))
Row(date=Date(15162), total_revenue=Decimal('868279.21875'))
Row(date=Date(17303), total_revenue=Decimal('184246.078125'))
Row(date=Date(16781), total_revenue=Decimal('100232.19140625'))
Row(date=Date(16769), total_revenue=Decimal('1929054.5625'))
Row(date=Date(14710), total_revenue=Decimal('5798888.625'))
Row(date=Date(15293), total_revenue=Decimal('4369068.0'))
Row(date=Date(15962), total_revenue=Decimal('1511670.28125'))
Row(date=Date(16151), total_revenue=Decimal('12010321.5'))
Row(date=Date(16110), total_revenue=Decimal('224871.65625'))
Row(date=Date(15218), total_revenue=Decimal('19999984.5'))
Row(date=Date(15389), total_revenue=Decimal('652105.359375'))
Row(date=Date(17135), total_revenue=Decimal('210108.609375'))
Row(date=Date(17048), total_revenue=Decimal('694037.296875'))
Row(date=Date(16240), total_revenue=Decimal('110580.691

In [58]:
# Gold Table 2: Total Sales by Region
result_region = session.execute("SELECT * FROM gold_sales_by_region")
print("Gold Table: Total Sales by Region")
for row in result_region:
    print(row)

Gold Table: Total Sales by Region
Row(region='Australia and Oceania', total_revenue=Decimal('32133774.6796875'))
Row(region='Europe', total_revenue=Decimal('104894249.68359375'))
Row(region='Middle East and North Africa', total_revenue=Decimal('74295382.3828125'))
Row(region='Central America and the Caribbean', total_revenue=Decimal('52712505.9140625'))
Row(region='Asia', total_revenue=Decimal('86522435.81835938'))
Row(region='Sub-Saharan Africa', total_revenue=Decimal('72676312.98339844'))
Row(region='North America', total_revenue=Decimal('10835272.40625'))


In [59]:
# Gold Table 3: Total Sales by Product
result_product = session.execute("SELECT * FROM gold_sales_by_product")
print("Gold Table: Total Sales by Product")
for row in result_product:
    print(row)

Gold Table: Total Sales by Product
Row(item_type='Household', total_revenue=Decimal('115557248.34375'))
Row(item_type='Office Supplies', total_revenue=Decimal('83642714.4375'))
Row(item_type='Vegetables', total_revenue=Decimal('3405342.2109375'))
Row(item_type='Snacks', total_revenue=Decimal('6580928.15625'))
Row(item_type='Personal Care', total_revenue=Decimal('9573443.47265625'))
Row(item_type='Meat', total_revenue=Decimal('63836598.09375'))
Row(item_type='Fruits', total_revenue=Decimal('1845100.8076171875'))
Row(item_type='Beverages', total_revenue=Decimal('6435074.044921875'))
Row(item_type='Cereal', total_revenue=Decimal('28248369.33984375'))
Row(item_type='Cosmetics', total_revenue=Decimal('86181301.875'))
Row(item_type='Baby Food', total_revenue=Decimal('15601692.5625'))
Row(item_type='Clothes', total_revenue=Decimal('13162120.5234375'))
