In [48]:
# Mount Google Drive inside the Colab runtime; later cells read their input
# files from paths under this mount point.
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [49]:
pip install cassandra-driver



In [50]:
import pandas as pd

In [51]:
# Read the sales CSV stored on the mounted Drive into a DataFrame.
sales_csv_path = "/content/drive/MyDrive/BIG DATA TOOLS AND TECHNIQUES - 1/sales_100.csv"
df = pd.read_csv(sales_csv_path)

In [52]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
import pandas as pd

In [53]:
# Connect to the cloud-hosted Cassandra cluster.
#
# Inputs (both read from the mounted Drive):
#   * the secure-connect bundle (zip) handed to the driver via `cloud=`
#   * a JSON token file providing the "clientId" and "secret" credentials
#
# Result: `cluster` and `session` are available to the following cells.
cloud_config = {
    'secure_connect_bundle': '/content/drive/MyDrive/BIG DATA TOOLS AND TECHNIQUES - 1/secure-connect-cassandra.zip'
}
with open("/content/drive/MyDrive/BIG DATA TOOLS AND TECHNIQUES - 1/Cassandra-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)

# The driver signals a failed connection by raising, not by returning a falsy
# session, so the failure message must come from an exception handler. The
# exception is re-raised so that later cells do not run against a missing
# session.
try:
    cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
    session = cluster.connect()
except Exception:
    print("An error occurred.")
    raise
else:
    print('Connected!')



Connected!


In [33]:
# Display the column labels of the loaded sales DataFrame.
df.columns

Index(['region', 'order_id', 'country', 'item_type', 'order_date',
       'order_priority', 'sales_channel', 'ship_date', 'total_cost',
       'total_profit', 'total_revenue', 'unit_cost', 'unit_price',
       'units_sold'],
      dtype='object')

In [54]:
# Make sales_medallion the default keyspace so the following statements can
# use unqualified table names.
keyspace_name = 'sales_medallion'
session.set_keyspace(keyspace_name)

In [56]:
# Print each column of sales_medallion.bronze_sales together with its CQL
# type, as recorded in the system schema tables.
schema_query = "SELECT * FROM system_schema.columns WHERE keyspace_name = 'sales_medallion' AND table_name = 'bronze_sales'"
rows = session.execute(schema_query)
for column_meta in rows:
    print(f"Column: {column_meta.column_name}, Type: {column_meta.type}")

Column: country, Type: text
Column: item_type, Type: text
Column: order_date, Type: text
Column: order_id, Type: bigint
Column: order_priority, Type: text
Column: region, Type: text
Column: sales_channel, Type: text
Column: ship_date, Type: text
Column: total_cost, Type: float
Column: total_profit, Type: float
Column: total_revenue, Type: float
Column: unit_cost, Type: float
Column: unit_price, Type: float
Column: units_sold, Type: int


In [55]:
# Bronze layer: landing table for the raw CSV rows.
#
# Column types match what the Bronze load binds and what the schema listing
# for the existing table reports: order_id is bound as a Python int (BIGINT),
# the price/cost/revenue/profit columns as Python floats (FLOAT). Declaring
# order_id as TEXT would make the prepared insert reject the int value when
# the table is created from scratch. Dates stay TEXT here; they are parsed
# when rows are promoted to the Silver table.
session.execute("""
CREATE TABLE IF NOT EXISTS bronze_sales (
    order_id BIGINT PRIMARY KEY,
    region TEXT,
    country TEXT,
    item_type TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date TEXT,
    ship_date TEXT,
    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT
)
""")

<cassandra.cluster.ResultSet at 0x78fa0578ef50>

In [57]:
# Check the schema tables to see whether bronze_sales is present in the
# sales_medallion keyspace, and report the result.
rows = session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = 'sales_medallion'")
tables = [table_meta.table_name for table_meta in rows]
status_message = (
    "The bronze_sales table exists."
    if 'bronze_sales' in tables
    else "The bronze_sales table does not exist."
)
print(status_message)

The bronze_sales table exists.


In [58]:
# Load the sales CSV from Drive and write every row into bronze_sales.
# NOTE(review): the column labels used below ('Order ID', 'UnitsSold', ...)
# differ from the snake_case labels shown by an earlier `df.columns` output in
# this notebook - confirm they match the header of the current CSV file.
df = pd.read_csv('/content/drive/MyDrive/BIG DATA TOOLS AND TECHNIQUES - 1/sales_100.csv')

# Prepared once; each row below only supplies the bound values.
insert_statement = session.prepare("""
    INSERT INTO bronze_sales (
        order_id, region, country, item_type, sales_channel, order_priority,
        order_date, ship_date, units_sold, unit_price, unit_cost,
        total_revenue, total_cost, total_profit
    )
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")

# CSV columns passed through unchanged, in the order the INSERT lists them.
passthrough_columns = [
    'Region', 'Country', 'Item Type', 'Sales Channel',
    'Order Priority', 'Order Date', 'Ship Date',
]
# CSV columns converted to Python floats before binding.
float_columns = ['UnitPrice', 'UnitCost', 'TotalRevenue', 'TotalCost', 'TotalProfit']

for _, record in df.iterrows():
    bound_values = (
        int(record['Order ID']),                            # order_id
        *(record[label] for label in passthrough_columns),  # region .. ship_date
        int(record['UnitsSold']),                           # units_sold
        *(float(record[label]) for label in float_columns), # unit_price .. total_profit
    )
    session.execute(insert_statement, bound_values)

print("Data inserted into bronze_sales table.")

Data inserted into bronze_sales table.


In [59]:
# Dump the full contents of bronze_sales, one driver Row per line.
bronze_query = "SELECT * FROM bronze_sales"
print("Data in bronze_sales table:")
rows = session.execute(bronze_query)
for bronze_row in rows:
    print(bronze_row)

Data in bronze_sales table:
Row(order_id=294530856, country='Italy', item_type='Cereal', order_date='11/15/2011', order_priority='M', region='Europe', sales_channel='Online', ship_date='12/28/2011', total_cost=829138.8125, total_profit=627217.1875, total_revenue=1456356.0, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=7080)
Row(order_id=274930989, country='Dominica', item_type='Household', order_date='11/19/2011', order_priority='C', region='Central America and the Caribbean', sales_channel='Offline', ship_date='12/13/2011', total_cost=3539891.75, total_profit=1167402.125, total_revenue=4707294.0, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=7044)
Row(order_id=498071897, country='Taiwan', item_type='Cereal', order_date='4/11/2010', order_priority='H', region='Asia', sales_channel='Online', ship_date='5/26/2010', total_cost=1100482.625, total_profit=832480.25, total_revenue=1932962.875, unit_cost=117.11000061035156, unit_price=205.69999694

In [51]:
# Silver layer: typed copy of the Bronze data (dates stored as DATE).
# The primary key is (region, order_id), so rows are grouped by region and
# identified by order_id within a region.
silver_ddl = """
CREATE TABLE IF NOT EXISTS silver_sales (
    order_id BIGINT,
    region TEXT,
    country TEXT,
    item_type TEXT,
    sales_channel TEXT,
    order_priority TEXT,
    order_date DATE,
    ship_date DATE,
    units_sold INT,
    unit_price FLOAT,
    unit_cost FLOAT,
    total_revenue FLOAT,
    total_cost FLOAT,
    total_profit FLOAT,
    PRIMARY KEY (region, order_id)  -- Include region in the primary key
)
"""
session.execute(silver_ddl)

<cassandra.cluster.ResultSet at 0x7f6125635360>

In [52]:
# Promote Bronze rows to Silver: the text order/ship dates are parsed into
# date objects, every other column is copied through unchanged.
#
# The INSERT is prepared once, outside the loop, so the statement is parsed a
# single time and each iteration only sends the bound values.
silver_insert = session.prepare("""
    INSERT INTO silver_sales (
        order_id, region, country, item_type, sales_channel, order_priority,
        order_date, ship_date, units_sold, unit_price, unit_cost,
        total_revenue, total_cost, total_profit
    )
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")

rows = session.execute("SELECT * FROM bronze_sales")
for row in rows:
    # Bronze keeps dates as text; Silver's DATE columns need real dates.
    clean_order_date = pd.to_datetime(row.order_date).date()
    clean_ship_date = pd.to_datetime(row.ship_date).date()

    session.execute(silver_insert, (
        row.order_id,
        row.region,
        row.country,
        row.item_type,
        row.sales_channel,
        row.order_priority,
        clean_order_date,
        clean_ship_date,
        row.units_sold,
        row.unit_price,
        row.unit_cost,
        row.total_revenue,
        row.total_cost,
        row.total_profit
    ))

print("Data cleaned and inserted into silver_sales table.")

Data cleaned and inserted into silver_sales table.


In [54]:
# Dump the full contents of silver_sales, one driver Row per line.
silver_query = "SELECT * FROM silver_sales"
print("\nData in silver_sales table:")
silver_rows = session.execute(silver_query)
for silver_record in silver_rows:
    print(silver_record)


Data in silver_sales table:
Row(region='Australia and Oceania', order_id=101328551, country='Solomon Islands', item_type='Household', order_date=Date(14960), order_priority='M', sales_channel='Online', ship_date=Date(14971), total_cost=2123231.5, total_profit=700209.25, total_revenue=2823440.75, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=4225)
Row(region='Australia and Oceania', order_id=156295812, country='East Timor', item_type='Cereal', order_date=Date(16776), order_priority='M', sales_channel='Offline', ship_date=Date(16799), total_cost=30331.490234375, total_profit=22944.810546875, total_revenue=53276.30078125, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=259)
Row(region='Australia and Oceania', order_id=571997869, country='Vanuatu', item_type='Fruits', order_date=Date(16012), order_priority='C', sales_channel='Online', ship_date=Date(16020), total_cost=39686.19921875, total_profit=13821.349609375, total_revenue=53507.55078125, u

In [41]:
# 3. Create Gold Tables
#
# Three summary tables are derived from silver_sales:
#   gold_sales_by_region - revenue and profit summed per region
#   gold_top_items       - revenue and units sold summed per item type
#   gold_channel_profit  - profit summed per sales channel
# silver_sales is read once and all three summaries are computed in pandas
# from that single DataFrame.


def _populate_gold_table(source_df, table_name, key_column, metric_columns):
    """Sum the metric columns per key and write one row per group to a table.

    Args:
        source_df: DataFrame containing ``key_column`` and every column named
            in ``metric_columns``.
        table_name: Name of the target table. Its columns must be named
            ``key_column`` followed by ``metric_columns``.
        key_column: Column to group by; written as the first inserted column.
        metric_columns: Columns summed within each group.

    Uses the notebook-level ``session`` to execute the inserts.
    """
    columns = [key_column, *metric_columns]
    summary = source_df[columns].groupby(key_column).sum().reset_index()

    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    insert_cql = f"INSERT INTO {table_name} ({column_list}) VALUES ({placeholders})"

    # name=None yields plain tuples, which is what the driver expects as
    # positional parameters.
    for record in summary.itertuples(index=False, name=None):
        session.execute(insert_cql, record)


session.execute("""
CREATE TABLE IF NOT EXISTS gold_sales_by_region (
    region TEXT PRIMARY KEY,
    total_revenue FLOAT,
    total_profit FLOAT
)
""")

session.execute("""
CREATE TABLE IF NOT EXISTS gold_top_items (
    item_type TEXT PRIMARY KEY,
    total_revenue FLOAT,
    units_sold INT
)
""")

session.execute("""
CREATE TABLE IF NOT EXISTS gold_channel_profit (
    sales_channel TEXT PRIMARY KEY,
    total_profit FLOAT
)
""")

print("Gold tables created.")

# Single read of the Silver table; the column order below matches the SELECT.
silver_columns = [
    'region', 'item_type', 'sales_channel',
    'total_revenue', 'total_profit', 'units_sold',
]
silver_data = session.execute(
    "SELECT region, item_type, sales_channel, total_revenue, total_profit, units_sold "
    "FROM silver_sales"
)
silver_df = pd.DataFrame(list(silver_data), columns=silver_columns)

# Gold Table 1: Sales by Region
_populate_gold_table(silver_df, 'gold_sales_by_region', 'region', ['total_revenue', 'total_profit'])
# Gold Table 2: Top Items by Revenue
_populate_gold_table(silver_df, 'gold_top_items', 'item_type', ['total_revenue', 'units_sold'])
# Gold Table 3: Profitability by Sales Channel
_populate_gold_table(silver_df, 'gold_channel_profit', 'sales_channel', ['total_profit'])

print("Data inserted into Gold tables.")

Gold tables created.
Data inserted into Gold tables.


In [44]:
# Print the per-region revenue and profit totals stored in gold_sales_by_region.
region_query = "SELECT * FROM gold_sales_by_region"
print("\nData in gold_sales_by_region:")
rows = session.execute(region_query)
for gold_row in rows:
    print(f"Region: {gold_row.region}, Total Revenue: {gold_row.total_revenue}, Total Profit: {gold_row.total_profit}")


Data in gold_sales_by_region:
Region: Australia and Oceania, Total Revenue: 10711258.0, Total Profit: 3486940.0
Region: Europe, Total Revenue: 34964748.0, Total Profit: 11267281.0
Region: Middle East and North Africa, Total Revenue: 24765128.0, Total Profit: 6514262.0
Region: Central America and the Caribbean, Total Revenue: 17570836.0, Total Profit: 4252300.0
Region: Asia, Total Revenue: 28840812.0, Total Profit: 6749896.0
Region: Sub-Saharan Africa, Total Revenue: 24225438.0, Total Profit: 7651892.0
Region: North America, Total Revenue: 3611757.5, Total Profit: 1404621.5


In [45]:
# Print the per-item revenue and unit totals stored in gold_top_items.
items_query = "SELECT * FROM gold_top_items"
print("\nData in gold_top_items:")
rows = session.execute(items_query)
for gold_row in rows:
    print(f"Item Type: {gold_row.item_type}, Total Revenue: {gold_row.total_revenue}, Units Sold: {gold_row.units_sold}")


Data in gold_top_items:
Item Type: Household, Total Revenue: 38519084.0, Units Sold: 57640
Item Type: Office Supplies, Total Revenue: 27880904.0, Units Sold: 42814
Item Type: Vegetables, Total Revenue: 1135114.125, Units Sold: 7368
Item Type: Snacks, Total Revenue: 2193642.75, Units Sold: 14377
Item Type: Personal Care, Total Revenue: 3191147.75, Units Sold: 39045
Item Type: Meat, Total Revenue: 21278866.0, Units Sold: 50437
Item Type: Fruits, Total Revenue: 615033.625, Units Sold: 65920
Item Type: Beverages, Total Revenue: 2145024.75, Units Sold: 45206
Item Type: Cereal, Total Revenue: 9416123.0, Units Sold: 45776
Item Type: Cosmetics, Total Revenue: 28727100.0, Units Sold: 65707
Item Type: Baby Food, Total Revenue: 5200564.0, Units Sold: 20372
Item Type: Clothes, Total Revenue: 4387373.5, Units Sold: 40148


In [46]:
# Print the per-channel profit totals stored in gold_channel_profit.
channel_query = "SELECT * FROM gold_channel_profit"
print("\nData in gold_channel_profit:")
rows = session.execute(channel_query)
for gold_row in rows:
    print(f"Sales Channel: {gold_row.sales_channel}, Total Profit: {gold_row.total_profit}")



Data in gold_channel_profit:
Sales Channel: Online, Total Profit: 24963806.0
Sales Channel: Offline, Total Profit: 16363386.0
