In [51]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [52]:
import json
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': 'secure-connect-bd-db.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("BD_DB-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

if session:
  print('Connected!')
else:
  print("An error occurred.")




Connected!


In [56]:
!pip install cassandra-driver



In [55]:
import requests
import pandas as pd

# Define the source URL and the name for the downloaded file
CSV_SOURCE_URL = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"
DOWNLOAD_FILENAME = "sales_100.csv"

try:
    # Fetch the CSV file from the web
    response = requests.get(CSV_SOURCE_URL)
    response.raise_for_status()  # Check if the request was successful

    # Save the downloaded content to a local file
    with open(DOWNLOAD_FILENAME, 'wb') as file:
        file.write(response.content)

    print(f"The file has been successfully saved as '{DOWNLOAD_FILENAME}'.")

    # Load the CSV file into a pandas DataFrame
    sales_data = pd.read_csv(DOWNLOAD_FILENAME)

    # Display the first few rows of the DataFrame
    print("First 5 rows of the dataset:")
    print(sales_data.head())

except requests.RequestException as download_error:
    print(f"An error occurred during the file download: {download_error}")
except pd.errors.EmptyDataError:
    print("The CSV file appears to be empty.")
except Exception as general_error:
    print(f"An unexpected error occurred: {general_error}")


The file has been successfully saved as 'sales_100.csv'.
First 5 rows of the dataset:
                         Region           Country  Item Type Sales Channel  \
0            Sub-Saharan Africa      South Africa     Fruits       Offline   
1  Middle East and North Africa           Morocco    Clothes        Online   
2         Australia and Oceania  Papua New Guinea       Meat       Offline   
3            Sub-Saharan Africa          Djibouti    Clothes       Offline   
4                        Europe          Slovakia  Beverages       Offline   

  Order Priority  Order Date   Order ID   Ship Date  UnitsSold  UnitPrice  \
0              M   7/27/2012  443368995   7/28/2012       1593       9.33   
1              M   9/14/2013  667593514  10/19/2013       4611     109.28   
2              M   5/15/2015  940995585    6/4/2015        360     421.89   
3              H   5/17/2017  880811536    7/2/2017        562     109.28   
4              L  10/26/2016  174590194   12/4/2016       39

In [54]:
session.execute("""
CREATE TABLE IF NOT EXISTS cassandra.bronze_sales (
    id UUID PRIMARY KEY,
    transaction_id text,
    customer_id text,
    product_id text,
    amount decimal,
    transaction_date text
)
""")

<cassandra.cluster.ResultSet at 0x7ddd3bb50520>

In [57]:
from uuid import uuid4

In [58]:
dataframe.columns = dataframe.columns.str.strip().str.replace(' ', '_')


In [72]:
from datetime import datetime

for _, row in dataframe.iterrows():
    # Parse the Order_Date string to a datetime object
    try:
        transaction_date = datetime.strptime(row['Order_Date'], '%m/%d/%Y').strftime('%Y-%m-%d %H:%M:%S')
    except ValueError:
        print(f"Invalid date format: {row['Order_Date']}")
        continue  # Skip rows with invalid date formats

    # Insert the data into the Cassandra table
    session.execute("""
    INSERT INTO cassandra.bronze_sales (
        id, transaction_id, customer_id, product_id, amount, transaction_date
    ) VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        uuid4(),                             # UUID for the primary key
        str(row['Order_ID']),                # Convert Order_ID to string
        str(row['Region']),                  # Convert Region to string
        str(row['Item_Type']),               # Convert Item_Type to string
        float(row['TotalRevenue']),          # Ensure TotalRevenue is a float
        transaction_date                     # Converted date string
    ))


In [64]:
from datetime import datetime

# Create Silver Table
session.execute("""
CREATE TABLE IF NOT EXISTS cassandra.silver_sales (
    transaction_id text PRIMARY KEY,
    customer_id text,
    product_id text,
    amount decimal,
    transaction_date date
)
""")

# Transform Data for Silver Table
query = "SELECT transaction_id, customer_id, product_id, amount, transaction_date FROM cassandra.bronze_sales"
rows = session.execute(query)

# Insert data into silver_sales
for row in rows:
    # Convert 'transaction_date' to 'YYYY-MM-DD' format (only the date part)
    try:
        # If the date is in 'YYYY-MM-DD HH:MM:SS' format, extract only the date part
        transaction_date = datetime.strptime(row.transaction_date, '%Y-%m-%d %H:%M:%S').date()
    except ValueError:
        # If the 'transaction_date' is already in 'YYYY-MM-DD' format, just parse it
        transaction_date = datetime.strptime(row.transaction_date, '%Y-%m-%d').date()

    # Insert the data into the silver_sales table
    session.execute("""
    INSERT INTO cassandra.silver_sales (transaction_id, customer_id, product_id, amount, transaction_date)
    VALUES (%s, %s, %s, %s, %s)
    """, (
        row.transaction_id,
        row.customer_id,
        row.product_id,
        row.amount,
        transaction_date
    ))


In [65]:
from collections import defaultdict
from decimal import Decimal

In [66]:
from collections import defaultdict

# Gold Table 1: Total Sales by Customer
session.execute("""
CREATE TABLE IF NOT EXISTS cassandra.gold_sales_by_customer (
    customer_id text PRIMARY KEY,
    total_sales decimal
)
""")

# Fetch raw data from silver_sales and aggregate by customer_id
query = "SELECT customer_id, amount FROM cassandra.silver_sales"
rows = session.execute(query)

# Aggregate total sales by customer_id using Python
customer_sales = defaultdict(float)
for row in rows:
    # Convert row.amount (which is Decimal) to float
    customer_sales[row.customer_id] += float(row.amount)

# Insert aggregated data into gold_sales_by_customer
for customer_id, total_sales in customer_sales.items():
    session.execute("""
    INSERT INTO cassandra.gold_sales_by_customer (customer_id, total_sales)
    VALUES (%s, %s)
    """, (customer_id, total_sales))

# Gold Table 2: Total Sales by Product
session.execute("""
CREATE TABLE IF NOT EXISTS cassandra.gold_sales_by_product (
    product_id text PRIMARY KEY,
    total_sales decimal
)
""")

# Fetch raw data from silver_sales and aggregate by product_id
query = "SELECT product_id, amount FROM cassandra.silver_sales"
rows = session.execute(query)

# Aggregate total sales by product_id using Python
product_sales = defaultdict(float)
for row in rows:
    # Convert row.amount (which is Decimal) to float
    product_sales[row.product_id] += float(row.amount)

# Insert aggregated data into gold_sales_by_product
for product_id, total_sales in product_sales.items():
    session.execute("""
    INSERT INTO cassandra.gold_sales_by_product (product_id, total_sales)
    VALUES (%s, %s)
    """, (product_id, total_sales))

# Gold Table 3: Daily Sales Summary
session.execute("""
CREATE TABLE IF NOT EXISTS cassandra.gold_daily_sales (
    transaction_date date PRIMARY KEY,
    total_sales decimal
)
""")

# Fetch raw data from silver_sales and aggregate by transaction_date
query = "SELECT transaction_date, amount FROM cassandra.silver_sales"
rows = session.execute(query)

# Aggregate total sales by transaction_date using Python
daily_sales = defaultdict(float)
for row in rows:
    # Convert row.amount (which is Decimal) to float
    daily_sales[row.transaction_date] += float(row.amount)

# Insert aggregated data into gold_daily_sales
for transaction_date, total_sales in daily_sales.items():
    session.execute("""
    INSERT INTO cassandra.gold_daily_sales (transaction_date, total_sales)
    VALUES (%s, %s)
    """, (transaction_date, total_sales))

In [67]:
# Golden Table 1: Total Sales by Customer
result_customer = session.execute("SELECT * FROM cassandra.gold_sales_by_customer")

# Fetch and print rows using a while loop
row_iterator = iter(result_customer)  # Convert the result to an iterator
while True:
    try:
        row = next(row_iterator)  # Get the next row
        print(row)
    except StopIteration:
        break  # Exit the loop when there are no more rows


Row(customer_id='Australia and Oceania', total_sales=Decimal('10711258.13'))
Row(customer_id='Europe', total_sales=Decimal('34964749.830000006'))
Row(customer_id='Middle East and North Africa', total_sales=Decimal('24765127.25'))
Row(customer_id='Central America and the Caribbean', total_sales=Decimal('17570835.42'))
Row(customer_id='Asia', total_sales=Decimal('28840812.190000005'))
Row(customer_id='Sub-Saharan Africa', total_sales=Decimal('24225437.419999998'))
Row(customer_id='North America', total_sales=Decimal('3611757.5199999996'))


In [68]:
# Golden Table 2: Total Sales by Product
result_product = session.execute("SELECT * FROM cassandra.gold_sales_by_product")
# Fetch and print rows using a while loop
row_iterator = iter(result_product)  # Convert the result to an iterator
while True:
    try:
        row = next(row_iterator)  # Get the next row
        print(row)
    except StopIteration:
        break  # Exit the loop when there are no more rows

Row(product_id='Household', total_sales=Decimal('38519082.8'))
Row(product_id='Office Supplies', total_sales=Decimal('27880904.94'))
Row(product_id='Vegetables', total_sales=Decimal('1135114.08'))
Row(product_id='Snacks', total_sales=Decimal('2193642.66'))
Row(product_id='Personal Care', total_sales=Decimal('3191147.8499999996'))
Row(product_id='Meat', total_sales=Decimal('21278865.93'))
Row(product_id='Fruits', total_sales=Decimal('615033.6'))
Row(product_id='Beverages', total_sales=Decimal('2145024.7'))
Row(product_id='Cereal', total_sales=Decimal('9416123.2'))
Row(product_id='Cosmetics', total_sales=Decimal('28727100.399999995'))
Row(product_id='Baby Food', total_sales=Decimal('5200564.159999999'))
Row(product_id='Clothes', total_sales=Decimal('4387373.4399999995'))


In [69]:
# Golden Table 3: Daily Sales Summary
result_daily_sales = session.execute("SELECT * FROM cassandra.gold_daily_sales")
# Fetch and print rows using a while loop
row_iterator = iter(result_daily_sales)  # Convert the result to an iterator
while True:
    try:
        row = next(row_iterator)  # Get the next row
        print(row)
    except StopIteration:
        break  # Exit the loop when there are no more rows

Row(transaction_date=Date(16267), total_sales=Decimal('5608790.11'))
Row(transaction_date=Date(15042), total_sales=Decimal('167640.85'))
Row(transaction_date=Date(15162), total_sales=Decimal('289426.4'))
Row(transaction_date=Date(17303), total_sales=Decimal('61415.36'))
Row(transaction_date=Date(16781), total_sales=Decimal('33410.73'))
Row(transaction_date=Date(16769), total_sales=Decimal('643018.2'))
Row(transaction_date=Date(14710), total_sales=Decimal('1932962.9'))
Row(transaction_date=Date(15293), total_sales=Decimal('1456356.0'))
Row(transaction_date=Date(15962), total_sales=Decimal('503890.08'))
Row(transaction_date=Date(16151), total_sales=Decimal('4003440.4'))
Row(transaction_date=Date(16110), total_sales=Decimal('74957.22'))
Row(transaction_date=Date(15218), total_sales=Decimal('6666661.52'))
Row(transaction_date=Date(15389), total_sales=Decimal('217368.45'))
Row(transaction_date=Date(17135), total_sales=Decimal('70036.2'))
Row(transaction_date=Date(17048), total_sales=Decimal