**Installing necessary drivers and importing the libraries**




In [None]:
# Install Cassandra driver
!pip install cassandra-driver pandas

# Imports
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from uuid import uuid4

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl.metadata (1.0 kB)
Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m34.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.2 geomet-0.2.1.post1


**Connection to Cassandra with DataStax through SCB and Token**

In [None]:
import json
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# This secure connect bundle is autogenerated when you download your SCB,
cloud_config = {
    'secure_connect_bundle': 'secure-connect-cassandra.zip'  # Update with your SCB filename
}

# Load credentials from token JSON
with open("Cassandra-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

# Establish connection
try:
    auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
    cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
    session = cluster.connect()
    print("Connected to Cassandra successfully.")
except Exception as e:
    print(f"An error occurred while connecting to Cassandra: {e}")



Connected to Cassandra successfully.


**Uploading the CSV file and viewing the data**

In [None]:
import pandas as pd
from google.colab import files

print("Please upload your 'csv' file.")
uploaded = files.upload()  # This opens a file upload dialog

# Extract the uploaded file name (assuming a single file is uploaded)
file_name = list(uploaded.keys())[0]
print(f"File '{file_name}' uploaded successfully.")

# Load the CSV into a pandas DataFrame
try:
    df = pd.read_csv(file_name)
    print("Data loaded successfully:")
    print(df.head())  # Display the first few rows for confirmation
except Exception as e:
    print(f"An error occurred while loading the data: {e}")

Please upload your 'csv' file.


Saving sales_100.csv to sales_100.csv
File 'sales_100.csv' uploaded successfully.
Data loaded successfully:
                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  17459019

**Creating Bronze table**

In [None]:
# Use the bronze keyspace
session.execute("USE bronze;")

# Create the 'bronze_layer' table with the correct column names (as per your CSV)
create_bronze_table_query = """
CREATE TABLE IF NOT EXISTS bronze.bronze_layer (
    "Region" TEXT,
    "Country" TEXT,
    "Item Type" TEXT,
    "Sales Channel" TEXT,
    "Order Priority" TEXT,
    "Order Date" TEXT,
    "Order ID" INT,
    "Ship Date" TEXT,
    "UnitsSold" INT,
    "UnitPrice" FLOAT,
    "UnitCost" FLOAT,
    "TotalRevenue" FLOAT,
    "TotalCost" FLOAT,
    "TotalProfit" FLOAT,
    PRIMARY KEY ("Region", "Country", "Item Type", "Order Date", "Order ID")
);
"""
session.execute(create_bronze_table_query)

<cassandra.cluster.ResultSet at 0x7d0152c9caf0>

**Inserting data into the Bronze Table**

In [None]:
import pandas as pd

# Load the CSV into a DataFrame
df = pd.read_csv('/content/sales_100.csv')  # Update the path with the correct file location

# Insert data into the bronze_layer table
for index, row in df.iterrows():
    insert_query = """
    INSERT INTO bronze.bronze_layer ("Region", "Country", "Item Type", "Sales Channel", "Order Priority", "Order Date",
    "Order ID", "Ship Date", "UnitsSold", "UnitPrice", "UnitCost", "TotalRevenue", "TotalCost", "TotalProfit")
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    session.execute(insert_query, tuple(row))

print("Data inserted successfully into the bronze_layer table.")

Data inserted successfully into the bronze_layer table.


In [None]:
# Verify data in the bronze_layer table
rows = session.execute("SELECT * FROM bronze.bronze_layer;")
for row in rows:
    print(row)

Row(Order_ID=571997869, Country='Vanuatu', Item_Type='Fruits', Order_Date='11/3/2013', Order_Priority='C', Region='Australia and Oceania', Sales_Channel='Online', Ship_Date='11/11/2013', TotalCost=39686.19921875, TotalProfit=13821.349609375, TotalRevenue=53507.55078125, UnitCost=6.920000076293945, UnitPrice=9.329999923706055, UnitsSold=5735)
Row(Order_ID=349235904, Country='Mauritius ', Item_Type='Clothes', Order_Date='11/17/2012', Order_Priority='M', Region='Sub-Saharan Africa', Sales_Channel='Offline', Ship_Date='12/2/2012', TotalCost=197836.796875, TotalProfit=405388.8125, TotalRevenue=603225.625, UnitCost=35.84000015258789, UnitPrice=109.27999877929688, UnitsSold=5520)
Row(Order_ID=440306556, Country='India', Item_Type='Snacks', Order_Date='10/10/2012', Order_Priority='L', Region='Asia', Sales_Channel='Online', Ship_Date='11/20/2012', TotalCost=521206.5625, TotalProfit=294943.875, TotalRevenue=816150.4375, UnitCost=97.44000244140625, UnitPrice=152.5800018310547, UnitsSold=5349)
Row

**Cleaning the data for Silver Table**

In [None]:
import pandas as pd

# Load the CSV into a DataFrame
df = pd.read_csv('/content/sales_100.csv')  # Update the path with the correct file location

# Rename columns: Replace spaces with underscores
df.columns = df.columns.str.replace(' ', '_')

# Ensure proper date formatting (convert from MM/DD/YYYY to YYYY-MM-DD)
df['Order_Date'] = pd.to_datetime(df['Order_Date'], format='%m/%d/%Y', errors='coerce').dt.strftime('%Y-%m-%d')
df['Ship_Date'] = pd.to_datetime(df['Ship_Date'], format='%m/%d/%Y', errors='coerce').dt.strftime('%Y-%m-%d')

# Handle any rows with invalid date formats by filling them with a default value ('0000-00-00')
df['Order_Date'] = df['Order_Date'].fillna('0000-00-00')
df['Ship_Date'] = df['Ship_Date'].fillna('0000-00-00')

# Handle missing numeric values by filling them with zeros
df['UnitsSold'] = df['UnitsSold'].fillna(0)
df['UnitPrice'] = df['UnitPrice'].fillna(0)
df['UnitCost'] = df['UnitCost'].fillna(0)
df['TotalRevenue'] = df['TotalRevenue'].fillna(0)
df['TotalCost'] = df['TotalCost'].fillna(0)
df['TotalProfit'] = df['TotalProfit'].fillna(0)

# Remove duplicates
df = df.drop_duplicates()

# Optional: Print the cleaned dataframe for verification
print(df.head())

                         Region           Country  Item_Type Sales_Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order_Priority  Order_Date   Order_ID   Ship_Date  UnitsSold  UnitPrice  \
0              M  2012-07-27  443368995  2012-07-28       1593       9.33   
1              M  2013-09-14  667593514  2013-10-19       4611     109.28   
2              M  2015-05-15  940995585  2015-06-04        360     421.89   
3              H  2017-05-17  880811536  2017-07-02        562     109.28   
4              L  2016-10-26  174590194  2016-12-04       3973      47.45   

   UnitCost  TotalRevenue  TotalCost  TotalProfit  
0      6.92     

**Creating the Silver Table**

In [None]:
# Use the silver keyspace
session.execute("USE silver;")

# Create the 'silver_layer' table if not exists
create_table_query = """
CREATE TABLE IF NOT EXISTS silver.silver_layer (
    region TEXT,
    country TEXT,
    item_type TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date DATE,
    order_id INT,
    ship_date DATE,
    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue DECIMAL,
    total_cost DECIMAL,
    total_profit DECIMAL,
    PRIMARY KEY (region, country, item_type, order_id)
);
"""
session.execute(create_table_query)
print("Silver table created successfully.")


Silver table created successfully.


**Inserting cleaned data into the Silver Table**

In [None]:
# Insert cleaned data into the silver_layer table
for index, row in df.iterrows():
    insert_query = """
    INSERT INTO silver.silver_layer (region, country, item_type, sales_channel, order_priority, order_date,
    order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    session.execute(insert_query, tuple(row))

print("Data inserted successfully into the silver_layer table.")


Data inserted successfully into the silver_layer table.


In [None]:
# Query the silver_layer table to view the data
select_query = "SELECT * FROM silver_layer;"
rows = session.execute(select_query)

# Display the result
for row in rows:
    print(row)

Row(region='Australia and Oceania', country='East Timor', item_type='Cereal', order_id=156295812, order_date=Date(16776), order_priority='M', sales_channel='Offline', ship_date=Date(16799), total_cost=Decimal('30331.49'), total_profit=Decimal('22944.81'), total_revenue=Decimal('53276.3'), unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=259)
Row(region='Australia and Oceania', country='New Zealand', item_type='Beverages', order_id=940980136, order_date=Date(15624), order_priority='M', sales_channel='Online', ship_date=Date(15648), total_cost=Decimal('184000.52'), total_profit=Decimal('90640.08'), total_revenue=Decimal('274640.6'), unit_cost=31.790000915527344, unit_price=47.45000076293945, units_sold=5788)
Row(region='Australia and Oceania', country='Papua New Guinea', item_type='Clothes', order_id=647164094, order_date=Date(15145), order_priority='L', sales_channel='Offline', ship_date=Date(15169), total_cost=Decimal('325857.28'), total_profit=Decimal('667716.48'

In [None]:
import pandas as pd

# Use the silver keyspace
session.execute("USE silver;")

# Query the silver_layer table to view the data
select_query = "SELECT * FROM silver_layer;"  # Fetch all rows from the silver_layer table
rows = session.execute(select_query)

# Convert the rows to a DataFrame
df_silver = pd.DataFrame(rows)

# Display the full table
df_silver

Unnamed: 0,region,country,item_type,order_id,order_date,order_priority,sales_channel,ship_date,total_cost,total_profit,total_revenue,unit_cost,unit_price,units_sold
0,Australia and Oceania,East Timor,Cereal,156295812,2015-12-07,M,Offline,2015-12-30,30331.49,22944.81,53276.3,117.110001,205.699997,259
1,Australia and Oceania,New Zealand,Beverages,940980136,2012-10-11,M,Online,2012-11-04,184000.52,90640.08,274640.6,31.790001,47.450001,5788
2,Australia and Oceania,Papua New Guinea,Clothes,647164094,2011-06-20,L,Offline,2011-07-14,325857.28,667716.48,993573.76,35.840000,109.279999,9092
3,Australia and Oceania,Papua New Guinea,Meat,940995585,2015-05-15,M,Offline,2015-06-04,131288.4,20592.0,151880.4,364.690002,421.890015,360
4,Australia and Oceania,Samoa,Household,937431466,2016-12-05,L,Online,2016-12-08,2842868.78,937534.61,3780403.39,502.540009,668.270020,5657
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
94,Sub-Saharan Africa,Uganda,Cosmetics,842238795,2015-02-28,M,Online,2015-03-15,1588143.23,1048609.97,2636753.2,263.329987,437.200012,6031
95,Sub-Saharan Africa,Uganda,Personal Care,539471471,2014-06-19,M,Online,2014-07-21,25558.17,11302.06,36860.23,56.669998,81.730003,451
96,Sub-Saharan Africa,Zimbabwe,Office Supplies,953361213,2011-03-28,C,Offline,2011-04-08,5051690.08,1214903.75,6266593.83,524.960022,651.210022,9623
97,North America,Canada,Cosmetics,368977391,2011-05-09,H,Online,2011-06-02,1965495.12,1297765.68,3263260.8,263.329987,437.200012,7464


**Creating the Golden Tables**

**Golden Table 1: Aggregated Sales Data by Region**

In [48]:
# Golden Table:1
# Create the gold table
session.execute("""
CREATE TABLE IF NOT EXISTS gold.gold1_sales_by_region (
    region TEXT PRIMARY KEY,
    total_units_sold INT,
    total_revenue DECIMAL,
    total_profit DECIMAL
);
""")
print("Golden Table 1 created successfully.")

# Fetch distinct regions from the silver table
regions_query = "SELECT DISTINCT region FROM silver.silver_layer;"
regions = session.execute(regions_query)

# Process each region for aggregation
for region in regions:
    region_name = region.region
    query = f"""
    SELECT SUM(units_sold) AS total_units_sold,
           SUM(total_revenue) AS total_revenue,
           SUM(total_profit) AS total_profit
    FROM silver.silver_layer WHERE region = %s;
    """
    aggregated_data = session.execute(query, (region_name,)).one()

    # Insert aggregated data into the gold table
    session.execute("""
    INSERT INTO gold.gold1_sales_by_region (region, total_units_sold, total_revenue, total_profit)
    VALUES (%s, %s, %s, %s);
    """, (region_name, aggregated_data.total_units_sold, aggregated_data.total_revenue, aggregated_data.total_profit))

print("Data inserted into gold1_sales_by_region successfully.")

Golden Table 1 created successfully.
Data inserted into gold1_sales_by_region successfully.


**Golden Table 2: Total Units Sold by Item Type**

In [49]:
# Golden Table:2
from collections import defaultdict

# Create the golden table for total units sold by item type
session.execute("""
CREATE TABLE IF NOT EXISTS gold.gold2_total_units_by_item_type (
    item_type text PRIMARY KEY,
    total_units_sold int
);
""")
print("Golden Table 2 created successfully.")


# Query all rows from the silver table
query = "SELECT item_type, units_sold FROM silver.silver_layer;"
rows = session.execute(query)

# Aggregate total units sold by item_type using Python
item_type_units = defaultdict(int)
for row in rows:
    # Sum up the units_sold for each item_type
    item_type_units[row.item_type] += row.units_sold

# Insert the aggregated data into the golden table
for item_type, total_units_sold in item_type_units.items():
    session.execute("""
    INSERT INTO gold.gold2_total_units_by_item_type (item_type, total_units_sold)
    VALUES (%s, %s);
    """, (item_type, total_units_sold))

print("Data inserted into gold2_total_units_by_item_type successfully.")

Golden Table 2 created successfully.
Data inserted into gold2_total_units_by_item_type successfully.


**Golden Table 3: Total Revenue by Sales Channel**

In [50]:
# Golden Table:3
from collections import defaultdict
from decimal import Decimal

# Create the golden table (if not exists)
session.execute("""
CREATE TABLE IF NOT EXISTS gold.gold3_revenue_by_channel (
    sales_channel text PRIMARY KEY,
    total_revenue decimal
);
""")
print("Golden Table 3 created successfully.")

# Fetch raw data from the silver table
query = "SELECT sales_channel, total_revenue FROM silver.silver_layer;"
rows = session.execute(query)

# Aggregate total revenue by sales_channel
revenue_by_channel = defaultdict(Decimal)
for row in rows:
    # Ensure total_revenue is a Decimal (it may be in a different format)
    revenue_by_channel[row.sales_channel] += Decimal(row.total_revenue)

# Insert aggregated data into the gold table
for sales_channel, total_revenue in revenue_by_channel.items():
    session.execute("""
    INSERT INTO gold.gold3_revenue_by_channel (sales_channel, total_revenue)
    VALUES (%s, %s);
    """, (sales_channel, total_revenue))

print("Data inserted into gold3_revenue_by_channel successfully.")

Golden Table 3 created successfully.
Data inserted into gold3_revenue_by_channel successfully.


**Display Golden Table 1: gold1_sales_by_region**

In [51]:
# Display contents of gold1_sales_by_region table
print("Displaying contents of gold1_sales_by_region table:")

query = "SELECT * FROM gold.gold1_sales_by_region;"
rows = session.execute(query)

for row in rows:
    print(f"Region: {row.region}, Total Units Sold: {row.total_units_sold}, Total Revenue: {row.total_revenue}, Total Profit: {row.total_profit}")

Displaying contents of gold1_sales_by_region table:
Region: Australia and Oceania, Total Units Sold: 42328, Total Revenue: 10711258.13, Total Profit: 3486940.03
Region: Europe, Total Units Sold: 121002, Total Revenue: 34964749.83, Total Profit: 11267281.03
Region: Middle East and North Africa, Total Units Sold: 60376, Total Revenue: 24765127.25, Total Profit: 6514261.71
Region: Central America and the Caribbean, Total Units Sold: 53641, Total Revenue: 17570835.42, Total Profit: 4252300.17
Region: Asia, Total Units Sold: 113129, Total Revenue: 28840812.19, Total Profit: 6749896.24
Region: Sub-Saharan Africa, Total Units Sold: 92606, Total Revenue: 24225437.42, Total Profit: 7651891.85
Region: North America, Total Units Sold: 11728, Total Revenue: 3611757.52, Total Profit: 1404621.52


**Display Golden Table 2: gold2_total_units_by_item_type**

In [53]:
# Display contents of gold2_total_units_by_item_type table
print("Displaying contents of gold2_total_units_by_item_type table:")

query = "SELECT * FROM gold.gold2_total_units_by_item_type;"
rows = session.execute(query)

for row in rows:
    print(f"Item Type: {row.item_type}, Total Units Sold: {row.total_units_sold}")

Displaying contents of gold2_total_units_by_item_type table:
Item Type: Household, Total Units Sold: 57640
Item Type: Office Supplies, Total Units Sold: 42814
Item Type: Vegetables, Total Units Sold: 7368
Item Type: Snacks, Total Units Sold: 14377
Item Type: Personal Care, Total Units Sold: 39045
Item Type: Meat, Total Units Sold: 50437
Item Type: Fruits, Total Units Sold: 65920
Item Type: Beverages, Total Units Sold: 45206
Item Type: Cereal, Total Units Sold: 45776
Item Type: Cosmetics, Total Units Sold: 65707
Item Type: Baby Food, Total Units Sold: 20372
Item Type: Clothes, Total Units Sold: 40148


**Display Golden Table 3: gold3_revenue_by_channel**

In [54]:
# Display contents of gold_revenue_by_channel table
print("Displaying contents of gold3_revenue_by_channel table:")

query = "SELECT * FROM gold.gold3_revenue_by_channel;"
rows = session.execute(query)

for row in rows:
    print(f"Sales Channel: {row.sales_channel}, Total Revenue: {row.total_revenue}")

Displaying contents of gold3_revenue_by_channel table:
Sales Channel: Online, Total Revenue: 84628184.37
Sales Channel: Offline, Total Revenue: 60061793.39
