In [34]:
!pip install cassandra-driver




In [35]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

# Load credentials from JSON file
with open("/content/sample_data/Shoyab-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

# Connect to Cassandra
auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cloud_config = {"secure_connect_bundle": "/content/sample_data/secure-connect-shoyab.zip"}
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# Set keyspace
session.set_keyspace('bdt')

# Confirm connection
if session:
    print("Connected!")
else:
    print("Connection failed.")




Connected!


In [36]:
import pandas as pd
sales_data = pd.read_csv("/content/sample_data/sales_100.csv");

In [37]:
sales_data.head()

Unnamed: 0,Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,UnitsSold,UnitPrice,UnitCost,TotalRevenue,TotalCost,TotalProfit
0,Sub-Saharan Africa,South Africa,Fruits,Offline,M,7/27/2012,443368995,7/28/2012,1593,9.33,6.92,14862.69,11023.56,3839.13
1,Middle East and North Africa,Morocco,Clothes,Online,M,9/14/2013,667593514,10/19/2013,4611,109.28,35.84,503890.08,165258.24,338631.84
2,Australia and Oceania,Papua New Guinea,Meat,Offline,M,5/15/2015,940995585,6/4/2015,360,421.89,364.69,151880.4,131288.4,20592.0
3,Sub-Saharan Africa,Djibouti,Clothes,Offline,H,5/17/2017,880811536,7/2/2017,562,109.28,35.84,61415.36,20142.08,41273.28
4,Europe,Slovakia,Beverages,Offline,L,10/26/2016,174590194,12/4/2016,3973,47.45,31.79,188518.85,126301.67,62217.18


In [38]:
sales_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 99 entries, 0 to 98
Data columns (total 14 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   Region          99 non-null     object 
 1   Country         99 non-null     object 
 2   Item Type       99 non-null     object 
 3   Sales Channel   99 non-null     object 
 4   Order Priority  99 non-null     object 
 5   Order Date      99 non-null     object 
 6   Order ID        99 non-null     int64  
 7   Ship Date       99 non-null     object 
 8   UnitsSold       99 non-null     int64  
 9   UnitPrice       99 non-null     float64
 10  UnitCost        99 non-null     float64
 11  TotalRevenue    99 non-null     float64
 12  TotalCost       99 non-null     float64
 13  TotalProfit     99 non-null     float64
dtypes: float64(5), int64(2), object(7)
memory usage: 11.0+ KB


In [39]:
import pandas as pd
from datetime import datetime
for _, row in sales_data.iterrows():
    row["Order Date"] =datetime.strptime( row["Order Date"] , "%m/%d/%Y").date() # Converting string to date

    row["Ship Date"] =datetime.strptime(row["Ship Date"], "%m/%d/%Y").date()   # Converting string to timestamp

# Create Bronze table for raw data

In [40]:
session.execute("DROP TABLE IF EXISTS bronze_sales_data ")

session.execute("""
    CREATE TABLE IF NOT EXISTS bronze_sales_data (
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,
        order_id BIGINT PRIMARY KEY,
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    )
""")

# Insert raw data (assuming `data` is your pandas DataFrame)
for _, row in sales_data.iterrows():
    session.execute("""
        INSERT INTO bronze_sales_data (region, country, item_type, sales_channel, order_priority,
            order_date, order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row['Region'], row['Country'], row['Item Type'], row['Sales Channel'], row['Order Priority'],
        row['Order Date'], row['Order ID'], row['Ship Date'], row['UnitsSold'], row['UnitPrice'],
        row['UnitCost'], row['TotalRevenue'], row['TotalCost'], row['TotalProfit']
    ))


In [41]:

rows = session.execute("SELECT order_id, COUNT(*) AS count FROM bronze_sales_data GROUP BY order_id")

duplicate_order_ids = [row.order_id for row in rows if row.count > 1]

print(f"Duplicate order IDs: {duplicate_order_ids}")



Duplicate order IDs: []


# **SILVER TABLE**

In [42]:
session.execute("""CREATE TABLE IF NOT EXISTS silver_sales_data (
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,
        order_id BIGINT PRIMARY KEY,
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
)""");


In [43]:
row=session.execute("select * from bronze_sales_data ");
row.column_names, row.column_types

(['order_id',
  'country',
  'item_type',
  'order_date',
  'order_priority',
  'region',
  'sales_channel',
  'ship_date',
  'total_cost',
  'total_profit',
  'total_revenue',
  'unit_cost',
  'unit_price',
  'units_sold'],
 [cassandra.cqltypes.LongType,
  cassandra.cqltypes.VarcharType,
  cassandra.cqltypes.VarcharType,
  cassandra.cqltypes.VarcharType,
  cassandra.cqltypes.VarcharType,
  cassandra.cqltypes.VarcharType,
  cassandra.cqltypes.VarcharType,
  cassandra.cqltypes.VarcharType,
  cassandra.cqltypes.FloatType,
  cassandra.cqltypes.FloatType,
  cassandra.cqltypes.FloatType,
  cassandra.cqltypes.FloatType,
  cassandra.cqltypes.FloatType,
  cassandra.cqltypes.Int32Type])

In [44]:
row.column_types

[cassandra.cqltypes.LongType,
 cassandra.cqltypes.VarcharType,
 cassandra.cqltypes.VarcharType,
 cassandra.cqltypes.VarcharType,
 cassandra.cqltypes.VarcharType,
 cassandra.cqltypes.VarcharType,
 cassandra.cqltypes.VarcharType,
 cassandra.cqltypes.VarcharType,
 cassandra.cqltypes.FloatType,
 cassandra.cqltypes.FloatType,
 cassandra.cqltypes.FloatType,
 cassandra.cqltypes.FloatType,
 cassandra.cqltypes.FloatType,
 cassandra.cqltypes.Int32Type]

In [45]:
row=session.execute("select * from bronze_sales_data ");
row.column_names

['order_id',
 'country',
 'item_type',
 'order_date',
 'order_priority',
 'region',
 'sales_channel',
 'ship_date',
 'total_cost',
 'total_profit',
 'total_revenue',
 'unit_cost',
 'unit_price',
 'units_sold']

In [48]:
rows = session.execute("SELECT * FROM bronze_sales_data")

for row in rows:
    session.execute("""
        INSERT INTO silver_sales_data (region, country, item_type, sales_channel, order_id,
            units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit,
            order_date, order_priority, ship_date)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row.region, row.country, row.item_type, row.sales_channel, row.order_id,
        row.units_sold, row.unit_price, row.unit_cost, row.total_revenue,
        row.total_cost, row.total_profit,
        row.order_date, row.order_priority, row.ship_date  # Added values here
    ))

In [49]:
session.execute("""
  ALTER TABLE silver_sales_data DROP order_date;

""")

<cassandra.cluster.ResultSet at 0x7cb740685cc0>

In [50]:
session.execute("""
  ALTER TABLE silver_sales_data DROP order_priority;

""")

<cassandra.cluster.ResultSet at 0x7cb788b6e4a0>

In [51]:
session.execute("""
  ALTER TABLE silver_sales_data DROP ship_date;

""")

<cassandra.cluster.ResultSet at 0x7cb741ca1c00>

In [52]:
session.execute("""
  ALTER TABLE silver_sales_data DROP region;

""")

<cassandra.cluster.ResultSet at 0x7cb740fa07c0>

In [53]:
session.execute("""CREATE TABLE IF NOT EXISTS Gold_data (

    country TEXT,
    item_type TEXT,
    sales_channel TEXT,


    order_id BIGINT PRIMARY KEY,

    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT
)""");

In [54]:
rows = session.execute("SELECT * FROM silver_sales_data")

for row in rows:
    session.execute("""
        INSERT INTO Gold_data (country, item_type, sales_channel, order_id,
            units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row.country, row.item_type, row.sales_channel, row.order_id,
        row.units_sold, row.unit_price, row.unit_cost, row.total_revenue,
        row.total_cost, row.total_profit
    ))

# Analysis 1:

In [55]:
from collections import defaultdict

rows = session.execute("SELECT sales_channel, total_profit FROM silver_sales_data ")


profit_per_channel = defaultdict(float)
for row in rows:
    profit_per_channel[row.sales_channel] += row.total_profit


for channel, profit in profit_per_channel.items():
    print(f"Sales Channel: {channel}, Total Profit: {profit}")



Sales Channel: Online, Total Profit: 24963806.923095703
Sales Channel: Offline, Total Profit: 16363385.622558594


In [62]:
for channel, profit in profit_per_channel.items():
    session.execute("""
        INSERT INTO gold_sales_channel_profit (sales_channel, total_profit)
        VALUES (%s, %s)
    """, (channel, profit))


# Analysis 2: Total Profit by Country

In [64]:
session.execute("""CREATE TABLE IF NOT EXISTS gold_country_profit (
    country TEXT PRIMARY KEY,
    total_profit FLOAT
);""");

query1 = "SELECT country, total_profit FROM Gold_data"
results1 = session.execute(query1)

# Step 3: Aggregate total profit by country
profit_by_country = defaultdict(float)
for row in results1:
    profit_by_country[row.country] += row.total_profit

# Step 4: Insert aggregated data into the new table
for country, profit in profit_by_country.items():
    session.execute("""
        INSERT INTO gold_country_profit (country, total_profit)
        VALUES (%s, %s)
    """, (country, profit))

# Step 5: Optional: Print the results
for country, profit in profit_by_country.items():
    print(f"Country: {country}, Total Profit: {profit}")


Country: Italy, Total Profit: 1271915.78125
Country: Dominica, Total Profit: 1266075.78125
Country: Taiwan, Total Profit: 851842.189453125
Country: New Zealand, Total Profit: 90640.078125
Country: France, Total Profit: 1001143.4375
Country: Switzerland, Total Profit: 711923.75
Country: Zimbabwe, Total Profit: 1214903.75
Country: United Arab Emirates, Total Profit: 1212378.75
Country: Mauritius , Total Profit: 405388.8125
Country: The Bahamas, Total Profit: 240396.619140625
Country: Canada, Total Profit: 1297765.625
Country: Vanuatu, Total Profit: 517101.130859375
Country: South Africa, Total Profit: 3839.1298828125
Country: East Timor, Total Profit: 22944.810546875
Country: Ethiopia, Total Profit: 115101.9375
Country: Indonesia, Total Profit: 336458.1484375
Country: Serbia, Total Profit: 539637.125
Country: Belgium, Total Profit: 70844.6171875
Country: Togo, Total Profit: 835619.25
Country: Tonga, Total Profit: 530201.6875
Country: Pakistan, Total Profit: 570226.8125
Country: Netherlan

# Analysis 3: Average Unit Price by Item Type and Sales Channel.

In [65]:
session.execute("""CREATE TABLE IF NOT EXISTS gold_avg_price_by_item_channel (
    item_type TEXT,
    sales_channel TEXT,
    avg_unit_price FLOAT,
    PRIMARY KEY (item_type, sales_channel)
);
""");


query2 = "SELECT item_type, sales_channel, unit_price FROM Gold_data"
results2 = session.execute(query2)

# Step 3: Aggregate average price by item type and sales channel
avg_price_by_item_channel = defaultdict(lambda: defaultdict(list))
for row in results2:
    avg_price_by_item_channel[row.item_type][row.sales_channel].append(row.unit_price)

# Step 4: Compute averages and insert data into the new table
for item_type, channels in avg_price_by_item_channel.items():
    for channel, prices in channels.items():
        avg_price = sum(prices) / len(prices) if prices else 0  # Avoid division by zero
        session.execute("""
            INSERT INTO gold_avg_price_by_item_channel (item_type, sales_channel, avg_unit_price)
            VALUES (%s, %s, %s)
        """, (item_type, channel, avg_price))

# Step 5: Optional: Print the results
for item_type, channels in avg_price_by_item_channel.items():
    for channel, prices in channels.items():
        avg_price = sum(prices) / len(prices) if prices else 0
        print(f"Item Type: {item_type}, Sales Channel: {channel}, Average Unit Price: {avg_price}")



Item Type: Cereal, Sales Channel: Online, Average Unit Price: 205.6999969482422
Item Type: Cereal, Sales Channel: Offline, Average Unit Price: 205.6999969482422
Item Type: Household, Sales Channel: Offline, Average Unit Price: 668.27001953125
Item Type: Household, Sales Channel: Online, Average Unit Price: 668.27001953125
Item Type: Beverages, Sales Channel: Online, Average Unit Price: 47.45000076293945
Item Type: Beverages, Sales Channel: Offline, Average Unit Price: 47.45000076293945
Item Type: Cosmetics, Sales Channel: Online, Average Unit Price: 437.20001220703125
Item Type: Cosmetics, Sales Channel: Offline, Average Unit Price: 437.20001220703125
Item Type: Office Supplies, Sales Channel: Offline, Average Unit Price: 651.2100219726562
Item Type: Office Supplies, Sales Channel: Online, Average Unit Price: 651.2100219726562
Item Type: Clothes, Sales Channel: Offline, Average Unit Price: 109.27999877929688
Item Type: Clothes, Sales Channel: Online, Average Unit Price: 109.27999877929