In [1]:
# Install the Cassandra python driver
!pip install cassandra-driver

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl.metadata (1.0 kB)
Downloading cassandra_driver-3.29.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m28.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.2 geomet-0.2.1.post1


In [2]:
# Import the necessary libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [3]:
# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': 'secure-connect-cassandra-assignment.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("Cassandra_Assignment-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

if session:
  print('Connected!')
else:
  print("An error occurred.")



Connected!


### Creating Bronze Table

In [5]:
def create_tables(session):
    # Bronze Table
    session.execute("""
    CREATE TABLE IF NOT EXISTS eluri.bronze_sales (
        id UUID PRIMARY KEY,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,
        order_id TEXT,
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    );
    """)



    print("Tables created successfully.")

create_tables(session)

Tables created successfully.


In [6]:
# Load the CSV file
import uuid
import pandas as pd

file_path = 'sales_100.csv'
data = pd.read_csv(file_path)

def load_bronze_table(session, data):
    for _, row in data.iterrows():
        session.execute("""
        INSERT INTO eluri.bronze_sales (id, region, country, item_type, sales_channel, order_priority, order_date, order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """, (uuid.uuid4(), row['Region'], row['Country'], row['Item Type'], row['Sales Channel'], row['Order Priority'], row['Order Date'], str(row['Order ID']), row['Ship Date'], row['UnitsSold'], row['UnitPrice'], row['UnitCost'], row['TotalRevenue'], row['TotalCost'], row['TotalProfit']))
    print("Bronze table loaded successfully.")

load_bronze_table(session, data)


Bronze table loaded successfully.


In [7]:
# Select the data from the users table
session = cluster.connect()
rowo = session.execute("SELECT * FROM eluri.bronze_sales;")

# Count the number of rows
row_count = sum(1 for _ in rowo)
print(f"Total Rows in eluri.bronze_sales: {row_count}")

rows = session.execute("SELECT * FROM eluri.bronze_sales")
for row in rows:
    print(row)

# Close the session
session.shutdown()

Total Rows in eluri.bronze_sales: 99
Row(id=UUID('eed9b386-ff7e-498e-b30e-3c7248862e62'), country='Egypt', item_type='Clothes', order_date='8/9/2016', order_id='882908809', order_priority='C', region='Middle East and North Africa', sales_channel='Online', ship_date='8/24/2016', total_cost=42721.28125, total_profit=87540.4765625, total_revenue=130261.7578125, unit_cost=35.84000015258789, unit_price=109.27999877929688, units_sold=1192)
Row(id=UUID('08a2350b-be1f-4f56-b36e-4fbe02b02809'), country='Dominica', item_type='Household', order_date='11/19/2011', order_id='274930989', order_priority='C', region='Central America and the Caribbean', sales_channel='Offline', ship_date='12/13/2011', total_cost=3539891.75, total_profit=1167402.125, total_revenue=4707294.0, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=7044)
Row(id=UUID('e7d0397f-f4c4-45a0-bac7-9f77b4dea110'), country='United States of America', item_type='Personal Care', order_date='1/21/2016', order_id='19077786

In [9]:
session = cluster.connect()
session.set_keyspace('eluri')  # Replace 'example' with your keyspace

# Query data from Bronze table
rows = session.execute("SELECT * FROM bronze_sales;")

# Convert the data to a Pandas DataFrame
data = []
for row in rows:
    data.append(row._asdict())  # Convert Cassandra rows to dictionary
df = pd.DataFrame(data)

# Display the first few rows for verification
print("Bronze Data:\n", df.head())

Bronze Data:
                                      id                   country  \
0  eed9b386-ff7e-498e-b30e-3c7248862e62                     Egypt   
1  08a2350b-be1f-4f56-b36e-4fbe02b02809                  Dominica   
2  e7d0397f-f4c4-45a0-bac7-9f77b4dea110  United States of America   
3  b5b5c2d2-76b5-466c-9ed0-43bd7504a89e                  Djibouti   
4  98aed96c-cc1b-407f-b540-edfd9f58c13e                   Romania   

       item_type  order_date   order_id order_priority  \
0        Clothes    8/9/2016  882908809              C   
1      Household  11/19/2011  274930989              C   
2  Personal Care   1/21/2016  190777862              H   
3        Clothes   5/17/2017  880811536              H   
4      Beverages   2/19/2012  756839835              M   

                              region sales_channel   ship_date    total_cost  \
0       Middle East and North Africa        Online   8/24/2016  4.272128e+04   
1  Central America and the Caribbean       Offline  12/13/2011

In [10]:
# Check for null values
print("\nNull Value Counts:")
print(df.isnull().sum())

# Check for zero values in numeric columns
numeric_cols = ['units_sold', 'unit_price', 'unit_cost', 'total_revenue', 'total_cost', 'total_profit']
zero_counts = (df[numeric_cols] == 0).sum()
print("\nZero Value Counts in Numeric Columns:")
print(zero_counts)

# Descriptive statistics
print("\nDescriptive Statistics:")
print(df.describe())


Null Value Counts:
id                0
country           0
item_type         0
order_date        0
order_id          0
order_priority    0
region            0
sales_channel     0
ship_date         0
total_cost        0
total_profit      0
total_revenue     0
unit_cost         0
unit_price        0
units_sold        0
dtype: int64

Zero Value Counts in Numeric Columns:
units_sold       0
unit_price       0
unit_cost        0
total_revenue    0
total_cost       0
total_profit     0
dtype: int64

Descriptive Statistics:
         total_cost  total_profit  total_revenue   unit_cost  unit_price  \
count  9.900000e+01  9.900000e+01   9.900000e+01   99.000000   99.000000   
mean   1.044069e+06  4.174464e+05   1.461515e+06  191.126769  271.181016   
std    1.337149e+06  4.503887e+05   1.724090e+06  185.365590  233.458916   
min    9.155160e+03  3.188430e+03   1.234359e+04    6.920000    9.330000   
25%    9.318011e+04  5.877198e+04   1.597606e+05   35.840000   81.730003   
50%    3.924920e+05 

### Creating Silver table

In [11]:
def create_tables(session):
    session.execute("""
    CREATE TABLE IF NOT EXISTS eluri.silver_sales (
        id UUID PRIMARY KEY,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date DATE,
        order_id BIGINT,
        ship_date DATE,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    );
    """)



    print("Tables created successfully.")

create_tables(session)

Tables created successfully.


In [12]:
import uuid
from datetime import datetime

# Convert string to date (YYYY-MM-DD format)
def convert_to_date(date_string):
    try:
        return datetime.strptime(date_string, "%m/%d/%Y").date()  # Assuming format is MM/DD/YYYY
    except ValueError:
        return None  # Return None if the date format is incorrect

# Convert order_id to BIGINT (long integer)
def convert_to_bigint(order_id_string):
    try:
        return int(order_id_string)
    except ValueError:
        return None  # Return None if order_id is invalid

# Insert cleaned data with converted order_id and dates into Silver table
for _, row in df.iterrows():
    order_date = convert_to_date(row['order_date'])
    ship_date = convert_to_date(row['ship_date'])
    order_id = convert_to_bigint(row['order_id'])  # Convert order_id to BIGINT

    session.execute("""
    INSERT INTO eluri.silver_sales (
        id, region, country, item_type, sales_channel, order_priority, order_date,
        order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """, (
        uuid.uuid4(),
        row['region'],
        row['country'],
        row['item_type'],
        row['sales_channel'],
        row['order_priority'],
        order_date,  # Store the converted date
        order_id,  # Store the converted BIGINT order_id
        ship_date,  # Store the converted date
        row['units_sold'],
        row['unit_price'],
        row['unit_cost'],
        row['total_revenue'],
        row['total_cost'],
        row['total_profit']
    ))

print("Data loaded to silver table")

Data loaded to silver table


In [13]:
rows = session.execute("SELECT * FROM eluri.silver_sales")
for row in rows:
    print(row)

Row(id=UUID('77c9dc38-525e-4e3a-9026-7f5185b83f19'), country='Uganda', item_type='Cosmetics', order_date=Date(16494), order_id=842238795, order_priority='M', region='Sub-Saharan Africa', sales_channel='Online', ship_date=Date(16509), total_cost=1588143.25, total_profit=1048610.0, total_revenue=2636753.25, unit_cost=263.3299865722656, unit_price=437.20001220703125, units_sold=6031)
Row(id=UUID('1940198d-04cc-4453-a046-f86ea08f6c5d'), country='Indonesia', item_type='Personal Care', order_date=Date(15635), order_id=678896129, order_priority='L', region='Asia', sales_channel='Offline', ship_date=Date(15644), total_cost=527371.0, total_profit=233208.359375, total_revenue=760579.375, unit_cost=56.66999816894531, unit_price=81.7300033569336, units_sold=9306)
Row(id=UUID('ab582ad3-071d-46ec-b7ba-947fcfcb81ad'), country='Oman', item_type='Cosmetics', order_date=Date(14942), order_id=358570849, order_priority='H', region='Middle East and North Africa', sales_channel='Online', ship_date=Date(1497

**Gold** **Tables**

In [14]:
session.execute("""
CREATE TABLE IF NOT EXISTS eluri.gold_sales_by_region (
    region TEXT PRIMARY KEY,
    total_revenue FLOAT,
    total_profit FLOAT
);
""")
print("Gold table 1: sales_by_region created successfully.")

Gold table 1: sales_by_region created successfully.


In [15]:
from collections import defaultdict

# Aggregate total revenue and profit by region
region_sales = defaultdict(lambda: {'total_revenue': 0, 'total_profit': 0})

# Retrieve data from Silver table
rows = session.execute("SELECT region, total_revenue, total_profit FROM eluri.silver_sales")
for row in rows:
    region_sales[row.region]['total_revenue'] += row.total_revenue
    region_sales[row.region]['total_profit'] += row.total_profit

# Insert aggregated data into Gold table
for region, sales_data in region_sales.items():
    session.execute("""
    INSERT INTO eluri.gold_sales_by_region (region, total_revenue, total_profit)
    VALUES (%s, %s, %s);
    """, (region, sales_data['total_revenue'], sales_data['total_profit']))

print("Gold table 1: Data for total_sales_by_region inserted successfully.")

Gold table 1: Data for total_sales_by_region inserted successfully.


In [16]:
rows = session.execute("SELECT * FROM eluri.gold_sales_by_region")
for row in rows:
    print(row.region, row.total_profit, row.total_revenue)

Australia and Oceania 3486940.0 10711258.0
Europe 11267281.0 34964748.0
Middle East and North Africa 6514262.0 24765128.0
Central America and the Caribbean 4252300.0 17570836.0
Asia 6749896.0 28840812.0
Sub-Saharan Africa 7651892.0 24225438.0
North America 1404621.5 3611757.5


In [17]:
session.execute("""
CREATE TABLE IF NOT EXISTS eluri.gold_top_selling_products (
    item_type TEXT PRIMARY KEY,
    total_revenue FLOAT,
    total_units_sold INT
);
""")
print("Gold table 2: top_selling_products created successfully.")

Gold table 2: top_selling_products created successfully.


In [18]:
top_selling = defaultdict(lambda: {'total_revenue': 0, 'total_units_sold': 0})

# Aggregate total revenue and units sold by product type
rows = session.execute("SELECT item_type, total_revenue, units_sold FROM eluri.silver_sales")
for row in rows:
    top_selling[row.item_type]['total_revenue'] += row.total_revenue
    top_selling[row.item_type]['total_units_sold'] += row.units_sold

# Insert aggregated data into Gold table
for item_type, sales_data in top_selling.items():
    session.execute("""
    INSERT INTO eluri.gold_top_selling_products (item_type, total_revenue, total_units_sold)
    VALUES (%s, %s, %s);
    """, (item_type, sales_data['total_revenue'], sales_data['total_units_sold']))

print("Gold table 2: Data for top_selling_products inserted successfully.")

Gold table 2: Data for top_selling_products inserted successfully.


In [19]:
rows = session.execute("SELECT * FROM eluri.gold_top_selling_products")
for row in rows:
    print(row.item_type, row.total_revenue, row.total_units_sold)

Household 38519084.0 57640
Office Supplies 27880904.0 42814
Vegetables 1135114.125 7368
Snacks 2193642.75 14377
Personal Care 3191147.75 39045
Meat 21278866.0 50437
Fruits 615033.625 65920
Beverages 2145024.75 45206
Cereal 9416123.0 45776
Cosmetics 28727100.0 65707
Baby Food 5200564.0 20372
Clothes 4387373.5 40148


In [20]:
session.execute("""
CREATE TABLE IF NOT EXISTS eluri.gold_sales_performance (
    year INT,
    month INT,
    total_revenue FLOAT,
    total_profit FLOAT,
    PRIMARY KEY (year, month)
);
""")
print("Gold table 3: sales_performance created successfully.")


Gold table 3: sales_performance created successfully.


In [21]:
from datetime import datetime

# Aggregate sales performance by year and month
monthly_sales = defaultdict(lambda: {'total_revenue': 0, 'total_profit': 0})

# Convert order_date to year and month, and aggregate revenue and profit
rows = session.execute("SELECT order_date, total_revenue, total_profit FROM eluri.silver_sales")
for row in rows:
    if row.order_date:
        # Convert order_date to a Python datetime object
        order_date = datetime.strptime(str(row.order_date), '%Y-%m-%d')  # Ensure correct format

        # Extract year and month
        year, month = order_date.year, order_date.month

        # Aggregate sales data by year and month
        monthly_sales[(year, month)]['total_revenue'] += row.total_revenue
        monthly_sales[(year, month)]['total_profit'] += row.total_profit

# Insert aggregated data into Gold table
for (year, month), sales_data in monthly_sales.items():
    session.execute("""
    INSERT INTO eluri.gold_sales_performance (year, month, total_revenue, total_profit)
    VALUES (%s, %s, %s, %s);
    """, (year, month, sales_data['total_revenue'], sales_data['total_profit']))

print("Gold table 3: Data for sales_performance inserted successfully.")

Gold table 3: Data for sales_performance inserted successfully.


In [22]:
rows = session.execute("SELECT * FROM eluri.gold_sales_performance")
for row in rows:
    print(row)

Row(year=2014, month=2, total_profit=562533.125, total_revenue=4081224.75)
Row(year=2014, month=3, total_profit=1592127.625, total_revenue=4003440.5)
Row(year=2014, month=6, total_profit=514581.84375, total_revenue=1205442.0)
Row(year=2014, month=7, total_profit=2102895.5, total_revenue=9280963.0)
Row(year=2014, month=10, total_profit=23133.58984375, total_revenue=89558.671875)
Row(year=2014, month=11, total_profit=143351.640625, total_revenue=434357.3125)
Row(year=2010, month=4, total_profit=1032559.3125, total_revenue=2585495.25)
Row(year=2010, month=6, total_profit=11423.400390625, total_revenue=44224.19921875)
Row(year=2010, month=8, total_profit=715456.4375, total_revenue=2884921.5)
Row(year=2010, month=10, total_profit=236007.3125, total_revenue=628499.375)
Row(year=2010, month=11, total_profit=1380006.25, total_revenue=3470056.5)
Row(year=2010, month=12, total_profit=959274.25, total_revenue=4159723.5)
Row(year=2012, month=2, total_profit=71738.4609375, total_revenue=217368.4531