In [10]:
# Install the Cassandra python driver
!pip install cassandra-driver



In [11]:
# Import the necessary libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [12]:
# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': 'secure-connect-assaignment.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("Assaignment-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

if session:
  print('Connected!')
else:
  print("An error occurred.")



Connected!


In [13]:
   import pandas as pd

   # Load the CSV from GitHub URL
   url = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"
   data = pd.read_csv(url)

   print(data.head())

                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  174590194   12/4/2016       3973      47.45   

   UnitCost  TotalRevenue  TotalCost  TotalProfit  
0      6.92     

In [14]:
rows = session.execute("SELECT release_version FROM system.local;")
for row in rows:
    print(f"Cassandra Version: {row.release_version}")


Cassandra Version: 4.0.0.6816


In [17]:
# Set the keyspace
session.set_keyspace("example")  # Replace 'your_keyspace_name' with your actual keyspace

# Create the Bronze Table
session.execute("""
CREATE TABLE IF NOT EXISTS bronze_sales (
    order_id TEXT PRIMARY KEY,
        order_date TEXT,
            customer_name TEXT,  -- Placeholder column since it's not in your dataset
                product TEXT,
                    quantity INT,
                        price FLOAT,
                            total FLOAT
                            );
                            """)

print("Bronze table created successfully.")


Bronze table created successfully.


In [18]:
from cassandra.query import SimpleStatement

# Insert query for the Bronze table
insert_query = SimpleStatement("""
INSERT INTO bronze_sales (
    order_id, region, country, item_type, sales_channel,
    order_priority, order_date, ship_date, units_sold,
    unit_price, unit_cost, total_revenue, total_cost, total_profit
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")

# Loop through each row of the DataFrame and insert into the Bronze table
for index, row in data.iterrows():
    try:
        # Explicitly cast all data to the correct types
        order_id = str(row['Order ID'])
        region = str(row['Region'])
        country = str(row['Country'])
        item_type = str(row['Item Type'])
        sales_channel = str(row['Sales Channel'])
        order_priority = str(row['Order Priority'])
        order_date = str(row['Order Date'])
        ship_date = str(row['Ship Date'])
        units_sold = int(row['UnitsSold'])
        unit_price = float(row['UnitPrice'])
        unit_cost = float(row['UnitCost'])
        total_revenue = float(row['TotalRevenue'])
        total_cost = float(row['TotalCost'])
        total_profit = float(row['TotalProfit'])

        # Insert data into the table
        session.execute(insert_query, (
            order_id, region, country, item_type, sales_channel,
            order_priority, order_date, ship_date, units_sold,
            unit_price, unit_cost, total_revenue, total_cost, total_profit
        ))
    except Exception as e:
        print(f"Error inserting row at index {index}: {e}")
        print(f"Row data: {row}")

print("Data inserted into Bronze table successfully.")


Error inserting row at index 0: not all arguments converted during string formatting
Row data: Region            Sub-Saharan Africa
Country                 South Africa
Item Type                     Fruits
Sales Channel                Offline
Order Priority                     M
Order Date                 7/27/2012
Order ID                   443368995
Ship Date                  7/28/2012
UnitsSold                       1593
UnitPrice                       9.33
UnitCost                        6.92
TotalRevenue                14862.69
TotalCost                   11023.56
TotalProfit                  3839.13
Name: 0, dtype: object
Error inserting row at index 1: not all arguments converted during string formatting
Row data: Region            Middle East and North Africa
Country                                Morocco
Item Type                              Clothes
Sales Channel                           Online
Order Priority                               M
Order Date                        

In [19]:
# Create the Silver table
session.execute("""
CREATE TABLE IF NOT EXISTS silver_sales (
    order_id TEXT PRIMARY KEY,
    region TEXT,
    country TEXT,
    product TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date DATE,
    ship_date DATE,
    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT
);
""")

print("Silver table created successfully.")

Silver table created successfully.


In [20]:
from datetime import datetime
from cassandra.query import SimpleStatement

# Insert query for the Silver table
silver_insert_query = SimpleStatement("""
INSERT INTO silver_sales (
    order_id, region, country, product, sales_channel,
    order_priority, order_date, ship_date, units_sold,
    unit_price, unit_cost, total_revenue, total_cost, total_profit
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")

# Loop through each row of the DataFrame and insert into the Silver table
for index, row in data.iterrows():
    try:
        # Explicitly cast all data to the correct types
        order_id = str(row['Order ID']) if not pd.isnull(row['Order ID']) else 'Unknown'
        region = str(row['Region']) if not pd.isnull(row['Region']) else 'Unknown'
        country = str(row['Country']) if not pd.isnull(row['Country']) else 'Unknown'
        item_type = str(row['Item Type']) if not pd.isnull(row['Item Type']) else 'Unknown'
        sales_channel = str(row['Sales Channel']) if not pd.isnull(row['Sales Channel']) else 'Unknown'
        order_priority = str(row['Order Priority']) if not pd.isnull(row['Order Priority']) else 'Unknown'
        order_date = datetime.strptime(row['Order Date'], '%m/%d/%Y').date() if not pd.isnull(row['Order Date']) else datetime(1970, 1, 1).date()
        ship_date = datetime.strptime(row['Ship Date'], '%m/%d/%Y').date() if not pd.isnull(row['Ship Date']) else datetime(1970, 1, 1).date()
        units_sold = int(row['UnitsSold']) if not pd.isnull(row['UnitsSold']) else 0
        unit_price = float(row['UnitPrice']) if not pd.isnull(row['UnitPrice']) else 0.0
        unit_cost = float(row['UnitCost']) if not pd.isnull(row['UnitCost']) else 0.0
        total_revenue = float(row['TotalRevenue']) if not pd.isnull(row['TotalRevenue']) else 0.0
        total_cost = float(row['TotalCost']) if not pd.isnull(row['TotalCost']) else 0.0
        total_profit = float(row['TotalProfit']) if not pd.isnull(row['TotalProfit']) else 0.0

        # Insert data into the Silver table
        session.execute(silver_insert_query, (
            order_id, region, country, item_type, sales_channel,
            order_priority, order_date, ship_date, units_sold,
            unit_price, unit_cost, total_revenue, total_cost, total_profit
        ))

    except Exception as e:
        print(f"Error inserting row at index {index}: {e}")
        print(f"Row data: {row}")

print("Data inserted into Silver table successfully.")


Error inserting row at index 0: not all arguments converted during string formatting
Row data: Region            Sub-Saharan Africa
Country                 South Africa
Item Type                     Fruits
Sales Channel                Offline
Order Priority                     M
Order Date                 7/27/2012
Order ID                   443368995
Ship Date                  7/28/2012
UnitsSold                       1593
UnitPrice                       9.33
UnitCost                        6.92
TotalRevenue                14862.69
TotalCost                   11023.56
TotalProfit                  3839.13
Name: 0, dtype: object
Error inserting row at index 1: not all arguments converted during string formatting
Row data: Region            Middle East and North Africa
Country                                Morocco
Item Type                              Clothes
Sales Channel                           Online
Order Priority                               M
Order Date                        

In [21]:
session.execute("""
CREATE TABLE IF NOT EXISTS gold_sales_by_product (
    product TEXT PRIMARY KEY,
    total_units_sold INT,
    total_revenue FLOAT
);
""")
print("Gold table for Sales by Product created successfully.")


Gold table for Sales by Product created successfully.


In [22]:
from cassandra.query import SimpleStatement

# Aggregate data by product
product_sales = data.groupby('Item Type').agg({
    'UnitsSold': 'sum',
    'TotalRevenue': 'sum'
}).reset_index()

# Rename columns to match the query parameters
product_sales.rename(columns={
    'Item Type': 'product',
    'UnitsSold': 'total_units_sold',
    'TotalRevenue': 'total_revenue'
}, inplace=True)

# Ensure proper data types
product_sales['product'] = product_sales['product'].astype(str)
product_sales['total_units_sold'] = product_sales['total_units_sold'].astype(int)
product_sales['total_revenue'] = product_sales['total_revenue'].astype(float)

# Fill missing values
product_sales.fillna({
      'product': 'Unknown',
      'total_units_sold': 0,
      'total_revenue': 0.0
}, inplace=True)

# Create the Gold table for Sales by Product
session.execute("""
CREATE TABLE IF NOT EXISTS gold_sales_by_product (
    product TEXT PRIMARY KEY,
    total_units_sold INT,
    total_revenue FLOAT
);
""")
print("Gold table for Sales by Product created successfully.")

# Prepare the insert query
gold_insert_query = SimpleStatement("""
INSERT INTO gold_sales_by_product (product, total_units_sold, total_revenue)
VALUES (%s, %s, %s)
""")

# Insert rows into the Gold table
for _, row in product_sales.iterrows():
    try:
        # Print the data being inserted
        print("Inserting:", row['product'], int(row['total_units_sold']), float(row['total_revenue']))

        # Execute the insert query
        session.execute(gold_insert_query, (
            row['product'],                # Product name
            int(row['total_units_sold']),  # Total Units Sold
            float(row['total_revenue'])    # Total Revenue
        ))
    except Exception as e:
        print(f"Error inserting row: {row}, Error: {e}")

print("Data inserted into Gold table for Sales by Product successfully.")

rows = session.execute("SELECT * FROM gold_sales_by_product;")
for row in rows:
    print(row)


Gold table for Sales by Product created successfully.
Inserting: Baby Food 20372 5200564.16
Inserting: Beverages 45206 2145024.7
Inserting: Cereal 45776 9416123.2
Inserting: Clothes 40148 4387373.44
Inserting: Cosmetics 65707 28727100.4
Inserting: Fruits 65920 615033.6
Inserting: Household 57640 38519082.8
Inserting: Meat 50437 21278865.93
Inserting: Office Supplies 42814 27880904.94
Inserting: Personal Care 39045 3191147.8499999996
Inserting: Snacks 14377 2193642.66
Inserting: Vegetables 7368 1135114.08
Data inserted into Gold table for Sales by Product successfully.
Row(product='Household', total_revenue=38519084.0, total_units_sold=57640)
Row(product='Office Supplies', total_revenue=27880904.0, total_units_sold=42814)
Row(product='Vegetables', total_revenue=1135114.125, total_units_sold=7368)
Row(product='Snacks', total_revenue=2193642.75, total_units_sold=14377)
Row(product='Personal Care', total_revenue=3191147.75, total_units_sold=39045)
Row(product='Meat', total_revenue=21278866

In [23]:
session.execute("""
CREATE TABLE IF NOT EXISTS gold_sales_by_region (
    region TEXT PRIMARY KEY,
    total_units_sold INT,
    total_revenue FLOAT
);
""")
print("Gold table for Sales by Region created successfully.")


Gold table for Sales by Region created successfully.


In [24]:
from cassandra.query import SimpleStatement

# Aggregate data by region
region_sales = data.groupby('Region').agg({
    'UnitsSold': 'sum',
    'TotalRevenue': 'sum'
}).reset_index()

# Ensure proper data types
region_sales['Region'] = region_sales['Region'].astype(str)
region_sales['UnitsSold'] = region_sales['UnitsSold'].astype(int)
region_sales['TotalRevenue'] = region_sales['TotalRevenue'].astype(float)

# Create the Gold table for Sales by Region
session.execute("""
CREATE TABLE IF NOT EXISTS gold_sales_by_region (
    region TEXT PRIMARY KEY,
    total_units_sold INT,
    total_revenue FLOAT
);
""")
print("Gold table for Sales by Region created successfully.")

# Insert query for Gold table
gold_insert_query = SimpleStatement("""
INSERT INTO gold_sales_by_region (region, total_units_sold, total_revenue)
VALUES (%s, %s, %s)
""")

# Insert rows into the Gold table
for _, row in region_sales.iterrows():
    try:
        # Print the data being inserted
        print("Inserting:", row['Region'], int(row['UnitsSold']), float(row['TotalRevenue']))

        # Execute the query
        session.execute(gold_insert_query, (
            row['Region'],                # Region
            int(row['UnitsSold']),        # Total Units Sold
            float(row['TotalRevenue'])    # Total Revenue
        ))
    except Exception as e:
        print(f"Error inserting row: {row}, Error: {e}")

print("Data inserted into Gold table for Sales by Region successfully.")

rows = session.execute("SELECT * FROM gold_sales_by_region;")
for row in rows:
    print(row)


Gold table for Sales by Region created successfully.
Inserting: Asia 113129 28840812.19
Inserting: Australia and Oceania 42328 10711258.129999999
Inserting: Central America and the Caribbean 53641 17570835.419999998
Inserting: Europe 121002 34964749.83
Inserting: Middle East and North Africa 60376 24765127.25
Inserting: North America 11728 3611757.5199999996
Inserting: Sub-Saharan Africa 92606 24225437.42
Data inserted into Gold table for Sales by Region successfully.
Row(region='Australia and Oceania', total_revenue=10711258.0, total_units_sold=42328)
Row(region='Europe', total_revenue=34964748.0, total_units_sold=121002)
Row(region='Middle East and North Africa', total_revenue=24765128.0, total_units_sold=60376)
Row(region='Central America and the Caribbean', total_revenue=17570836.0, total_units_sold=53641)
Row(region='Asia', total_revenue=28840812.0, total_units_sold=113129)
Row(region='Sub-Saharan Africa', total_revenue=24225438.0, total_units_sold=92606)
Row(region='North America

In [25]:
session.execute("""
CREATE TABLE IF NOT EXISTS gold_total_sales (
    total_units_sold INT PRIMARY KEY,
    total_revenue FLOAT
);
""")
print("Gold table for Total Sales created successfully.")

Gold table for Total Sales created successfully.


In [26]:
total_units_sold = data['UnitsSold'].sum()
total_revenue = data['TotalRevenue'].sum()

session.execute("""
INSERT INTO gold_total_sales (total_units_sold, total_revenue)
VALUES (%s, %s)
""", (total_units_sold, total_revenue))
print("Data inserted into Gold table for Total Sales successfully.")

# Example for Sales by Producthm
rows = session.execute("SELECT * FROM gold_sales_by_product;")
for row in rows:
    print(row)


Data inserted into Gold table for Total Sales successfully.
Row(product='Household', total_revenue=38519084.0, total_units_sold=57640)
Row(product='Office Supplies', total_revenue=27880904.0, total_units_sold=42814)
Row(product='Vegetables', total_revenue=1135114.125, total_units_sold=7368)
Row(product='Snacks', total_revenue=2193642.75, total_units_sold=14377)
Row(product='Personal Care', total_revenue=3191147.75, total_units_sold=39045)
Row(product='Meat', total_revenue=21278866.0, total_units_sold=50437)
Row(product='Fruits', total_revenue=615033.625, total_units_sold=65920)
Row(product='Beverages', total_revenue=2145024.75, total_units_sold=45206)
Row(product='Cereal', total_revenue=9416123.0, total_units_sold=45776)
Row(product='Cosmetics', total_revenue=28727100.0, total_units_sold=65707)
Row(product='Baby Food', total_revenue=5200564.0, total_units_sold=20372)
Row(product='Clothes', total_revenue=4387373.5, total_units_sold=40148)


In [27]:
# Query for golden table: Sales by Product
rows_product = session.execute("SELECT * FROM gold_sales_by_product;")
print("Gold Table: Sales by Product Data:")
for row in rows_product:
    print(row)

# Query for golden table: Sales by Region
rows_region = session.execute("SELECT * FROM gold_sales_by_region;")
print("Gold Table: Sales by Region Data:")
for row in rows_region:
    print(row)

# Query for golden table: Total Sales
rows_total_sales = session.execute("SELECT * FROM gold_total_sales;")
print("Gold Table: Total Sales Data:")
for row in rows_total_sales:
    print(row)


Gold Table: Sales by Product Data:
Row(product='Household', total_revenue=38519084.0, total_units_sold=57640)
Gold Table: Sales by Region Data:
Row(region='Australia and Oceania', total_revenue=10711258.0, total_units_sold=42328)
Gold Table: Total Sales Data:
Row(total_units_sold=494810, total_revenue=144689984.0)
Row(region='Europe', total_revenue=34964748.0, total_units_sold=121002)
Gold Table: Total Sales Data:
Row(total_units_sold=494810, total_revenue=144689984.0)
Row(region='Middle East and North Africa', total_revenue=24765128.0, total_units_sold=60376)
Gold Table: Total Sales Data:
Row(total_units_sold=494810, total_revenue=144689984.0)
Row(region='Central America and the Caribbean', total_revenue=17570836.0, total_units_sold=53641)
Gold Table: Total Sales Data:
Row(total_units_sold=494810, total_revenue=144689984.0)
Row(region='Asia', total_revenue=28840812.0, total_units_sold=113129)
Gold Table: Total Sales Data:
Row(total_units_sold=494810, total_revenue=144689984.0)
Row(reg