In [49]:
pip install cassandra-driver



In [50]:
pip install --upgrade astrapy



In [51]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
import uuid
import csv
with open("/content/Bigdata-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

# Connect to Cassandra
auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cloud_config = {"secure_connect_bundle": "/content/secure-connect-bigdata.zip"}
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# Set keyspace
session.set_keyspace('bdt')

# Confirm connection
if session:
    print("Connected!")
else:
    print("Connection failed.")



Connected!


##```Bronze lever table```

In [52]:
session.execute("""
CREATE TABLE IF NOT EXISTS bronze_sales (
    region TEXT,
    country TEXT,
    item_type TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date TEXT,
    order_id TEXT PRIMARY KEY,
    ship_date TEXT,
    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT
);
""")

<cassandra.cluster.ResultSet at 0x7f9cc7ea6bf0>

##```Silver level table```

In [53]:
session.execute("""
CREATE TABLE IF NOT EXISTS silver_sales (
    order_id TEXT PRIMARY KEY,
    product_name TEXT,
    category TEXT,
    quantity INT,
    price FLOAT,
    total_price FLOAT,
    sales_date TIMESTAMP,
    region TEXT,
    country TEXT
);
""")

<cassandra.cluster.ResultSet at 0x7f9cc7e17eb0>

##```Gold level table```

Total sales by category

In [54]:
session.execute("""
CREATE TABLE IF NOT EXISTS gold_sales_by_category (
    category TEXT PRIMARY KEY,
    total_sales FLOAT
);
""")

<cassandra.cluster.ResultSet at 0x7f9cc652d1e0>

Monthly sales trends

In [55]:
session.execute("""
CREATE TABLE IF NOT EXISTS gold_monthly_sales (
    month TEXT PRIMARY KEY,
    total_sales FLOAT
);
""")

<cassandra.cluster.ResultSet at 0x7f9cc65360b0>

Top selling products

In [56]:
session.execute("""
CREATE TABLE IF NOT EXISTS gold_top_products (
    product_name TEXT PRIMARY KEY,
    total_sales FLOAT
);
""")

<cassandra.cluster.ResultSet at 0x7f9cc652f130>

##Loading data

In [57]:
import pandas as pd

data = pd.read_csv("https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv")

data = pd.read_csv(
    "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv",
    delimiter=',',
    encoding='utf-8',
    skipinitialspace=True  # Removes leading/trailing spaces
)

# Rename columns to match expected schema
data.rename(columns={
    'Order ID': 'order_id',
    'Item Type': 'product_name',
    'Order Date': 'sales_date',
    'UnitsSold': 'quantity',
    'UnitPrice': 'price',
    'TotalRevenue': 'total_price'
}, inplace=True)

# Verify column names
print(data.columns)

Index(['Region', 'Country', 'product_name', 'Sales Channel', 'Order Priority',
       'sales_date', 'order_id', 'Ship Date', 'quantity', 'price', 'UnitCost',
       'total_price', 'TotalCost', 'TotalProfit'],
      dtype='object')


In [58]:
from datetime import datetime

# Convert the sales_date column to ISO 8601 format
data['sales_date'] = pd.to_datetime(data['sales_date'], format='%m/%d/%Y').dt.strftime('%Y-%m-%d')

# (Optional) If there are other date columns like ship_date, convert them as well
data['ship_date'] = pd.to_datetime(data['Ship Date'], format='%m/%d/%Y').dt.strftime('%Y-%m-%d')

In [59]:
print(data[['sales_date', 'ship_date']].head())

   sales_date   ship_date
0  2012-07-27  2012-07-28
1  2013-09-14  2013-10-19
2  2015-05-15  2015-06-04
3  2017-05-17  2017-07-02
4  2016-10-26  2016-12-04


In [60]:
data['order_id'] = data['order_id'].astype(str)

In [61]:
#Bronze

for _, row in data.iterrows():
    session.execute("""
    INSERT INTO bronze_sales (region, country, item_type, sales_channel, order_priority, order_date, order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        row['Region'],
        row['Country'],
        row['product_name'],
        row['Sales Channel'],
        row['Order Priority'],
        row['sales_date'],
        row['order_id'],
        row['Ship Date'],
        row['quantity'],
        row['price'],
        row['UnitCost'],
        row['total_price'],
        row['TotalCost'],
        row['TotalProfit']
    ))

In [62]:
#Silver

silver_insert_query = """
INSERT INTO silver_sales (
    order_id, product_name, category, quantity, price, total_price, sales_date, region, country
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""


for _, row in data.iterrows():
    category = "default_category"
    session.execute(silver_insert_query, (
        row['order_id'],
        row['product_name'],
        category,
        int(row['quantity']),
        float(row['price']),
        float(row['total_price']),
        row['sales_date'],
        row['Region'],
        row['Country']
    ))

In [63]:
#Gold 1

gold_sales_by_category_insert_query = """
INSERT INTO gold_sales_by_category (
    category, total_sales
) VALUES (%s, %s)
"""

category_sales = data.groupby('product_name')['total_price'].sum().reset_index()

# Insert data into the Gold table for sales by category
for _, row in category_sales.iterrows():
    session.execute(gold_sales_by_category_insert_query, (
        row['product_name'],
        float(row['total_price'])
    ))

In [64]:
#Gold 2

gold_monthly_sales_insert_query = """
INSERT INTO gold_monthly_sales (
    month, total_sales
) VALUES (%s, %s)
"""

data['month'] = data['sales_date'].str[:7]

monthly_sales = data.groupby('month')['total_price'].sum().reset_index()

for _, row in monthly_sales.iterrows():
    session.execute(gold_monthly_sales_insert_query, (
        row['month'],
        float(row['total_price'])
    ))

In [65]:
#Gold 3

gold_top_products_insert_query = """
INSERT INTO gold_top_products (
    product_name, total_sales
) VALUES (%s, %s)
"""

top_products_sales = data.groupby('product_name')['total_price'].sum().reset_index()

top_products_sales = top_products_sales.sort_values(by='total_price', ascending=False)

top_n = 10
top_products_sales = top_products_sales.head(top_n)

# Insert data into the Gold table for top-selling products
for _, row in top_products_sales.iterrows():
    session.execute(gold_top_products_insert_query, (
        row['product_name'],
        float(row['total_price'])
    ))

##```Verifying tables```

In [66]:
rows = session.execute("SELECT * FROM bronze_sales")

# Loop through the rows and print each row
for row in rows:
    print(row)

Row(order_id='940980136', country='New Zealand', item_type='Beverages', order_date='2012-10-11', order_priority='M', region='Australia and Oceania', sales_channel='Online', ship_date='11/4/2012', total_cost=184000.515625, total_profit=90640.078125, total_revenue=274640.59375, unit_cost=31.790000915527344, unit_price=47.45000076293945, units_sold=5788)
Row(order_id='363086831', country='Mali', item_type='Household', order_date='2010-08-19', order_priority='M', region='Sub-Saharan Africa', sales_channel='Offline', ship_date='9/7/2010', total_cost=2169465.25, total_profit=715456.4375, total_revenue=2884921.5, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=4317)
Row(order_id='176461303', country='Singapore', item_type='Snacks', order_date='2013-01-28', order_priority='C', region='Asia', sales_channel='Online', ship_date='2/7/2013', total_cost=747949.4375, total_profit=423254.625, total_revenue=1171204.125, unit_cost=97.44000244140625, unit_price=152.5800018310547, unit

In [67]:
rows = session.execute("SELECT * FROM silver_sales")

# Loop through the rows and print each row
for row in rows:
    print(row)

Row(order_id='940980136', category='default_category', country='New Zealand', price=47.45000076293945, product_name='Beverages', quantity=5788, region='Australia and Oceania', sales_date=datetime.datetime(2012, 10, 11, 0, 0), total_price=274640.59375)
Row(order_id='363086831', category='default_category', country='Mali', price=668.27001953125, product_name='Household', quantity=4317, region='Sub-Saharan Africa', sales_date=datetime.datetime(2010, 8, 19, 0, 0), total_price=2884921.5)
Row(order_id='176461303', category='default_category', country='Singapore', price=152.5800018310547, product_name='Snacks', quantity=7676, region='Asia', sales_date=datetime.datetime(2013, 1, 28, 0, 0), total_price=1171204.125)
Row(order_id='438011872', category='default_category', country='Dominica', price=47.45000076293945, product_name='Beverages', quantity=6301, region='Central America and the Caribbean', sales_date=datetime.datetime(2012, 6, 12, 0, 0), total_price=298982.4375)
Row(order_id='135178029',

In [68]:
rows = session.execute("SELECT * FROM gold_sales_by_category;")

# Loop through the rows and print each row
for row in rows:
    print(row)

Row(category='Household', total_sales=38519084.0)
Row(category='Office Supplies', total_sales=27880904.0)
Row(category='Vegetables', total_sales=1135114.125)
Row(category='Snacks', total_sales=2193642.75)
Row(category='Personal Care', total_sales=3191147.75)
Row(category='Meat', total_sales=21278866.0)
Row(category='Fruits', total_sales=615033.625)
Row(category='Beverages', total_sales=2145024.75)
Row(category='Cereal', total_sales=9416123.0)
Row(category='Cosmetics', total_sales=28727100.0)
Row(category='Baby Food', total_sales=5200564.0)
Row(category='Clothes', total_sales=4387373.5)


In [69]:
rows = session.execute("SELECT * FROM gold_monthly_sales;")

# Loop through the rows and print each row
for row in rows:
    print(row)

Row(month='2013-09', total_sales=933620.3125)
Row(month='2010-12', total_sales=4159723.75)
Row(month='2012-09', total_sales=188951.875)
Row(month='2016-09', total_sales=231345.765625)
Row(month='2016-02', total_sales=3897266.5)
Row(month='2015-06', total_sales=626742.8125)
Row(month='2016-11', total_sales=900296.375)
Row(month='2017-04', total_sales=2196359.25)
Row(month='2011-01', total_sales=746767.0)
Row(month='2015-09', total_sales=2101183.25)
Row(month='2016-08', total_sales=130261.7578125)
Row(month='2014-11', total_sales=434357.3125)
Row(month='2013-05', total_sales=868465.375)
Row(month='2013-12', total_sales=4205821.5)
Row(month='2011-07', total_sales=289426.40625)
Row(month='2011-11', total_sales=6907926.5)
Row(month='2011-08', total_sales=69946.71875)
Row(month='2015-12', total_sales=2604084.75)
Row(month='2011-06', total_sales=1973257.625)
Row(month='2014-03', total_sales=4003440.5)
Row(month='2015-04', total_sales=2536801.0)
Row(month='2010-11', total_sales=3470056.5)
Row(

In [70]:
rows = session.execute("SELECT * FROM gold_top_products;")

# Loop through the rows and print each row
for row in rows:
    print(row)

Row(product_name='Household', total_sales=38519084.0)
Row(product_name='Office Supplies', total_sales=27880904.0)
Row(product_name='Snacks', total_sales=2193642.75)
Row(product_name='Personal Care', total_sales=3191147.75)
Row(product_name='Meat', total_sales=21278866.0)
Row(product_name='Beverages', total_sales=2145024.75)
Row(product_name='Cereal', total_sales=9416123.0)
Row(product_name='Cosmetics', total_sales=28727100.0)
Row(product_name='Baby Food', total_sales=5200564.0)
Row(product_name='Clothes', total_sales=4387373.5)
