In [1]:
!pip install pandas cassandra-driver

# Import the necessary libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl.metadata (1.0 kB)
Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m28.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.2 geomet-0.2.1.post1


In [3]:

cloud_config= {
  'secure_connect_bundle': '/content/sample_data/secure-connect-big-data2.zip'
}


with open("/content/sample_data/Big_Data2-token.json") as f:
    secrets = json.load(f)


CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

if session:
  print('Connected!')
else:
  print("An error occurred.")



Connected!


In [15]:
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Read CSV
df = pd.read_csv('/content/sample_data/sales_100.csv')

# Cassandra Connection
cloud_config= {
    'secure_connect_bundle': '/content/sample_data/secure-connect-big-data2.zip'
}
auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# Switch to existing keyspace
session.set_keyspace('sales_medallion1')  # Using the existing keyspace name


# Create Bronze Layer Table
session.execute("""
    CREATE TABLE IF NOT EXISTS bronze_sales (
        order_id bigint PRIMARY KEY,
        region text,
        country text,
        item_type text,
        sales_channel text,
        order_priority text,
        order_date text,
        ship_date text,
        units_sold int,
        unit_price decimal,
        unit_cost decimal,
        total_revenue decimal,
        total_cost decimal,
        total_profit decimal
    )
""")

# Insert Data into Bronze Layer
insert_query = """
    INSERT INTO bronze_sales (
        order_id, region, country, item_type, sales_channel,
        order_priority, order_date, ship_date, units_sold,
        unit_price, unit_cost, total_revenue, total_cost, total_profit
    ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
    )
"""

# Prepare and execute batch inserts
batch = session.prepare(insert_query)
for _, row in df.iterrows():
    session.execute(batch, (
        int(row['Order ID']), row['Region'], row['Country'],
        row['Item Type'], row['Sales Channel'], row['Order Priority'],
        row['Order Date'], row['Ship Date'], int(row['UnitsSold']),
        float(row['UnitPrice']), float(row['UnitCost']),
        float(row['TotalRevenue']), float(row['TotalCost']),
        float(row['TotalProfit'])
    ))

print("Bronze Layer Data Ingestion Complete!")



Bronze Layer Data Ingestion Complete!


In [20]:
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import datetime

# Cassandra Connection (same as Bronze Layer script)
cloud_config= {
    'secure_connect_bundle': '/content/sample_data/secure-connect-big-data2.zip'
}
# Use the correct credentials obtained earlier
auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
# Specify cloud_config using the cloud parameter
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect('sales_medallion1')


# Create Silver Layer Tables with Data Quality and Transformation
session.execute("""
    CREATE TABLE IF NOT EXISTS silver_sales_cleaned (
        order_id bigint PRIMARY KEY,
        region text,
        country text,
        item_type text,
        sales_channel text,
        order_priority text,
        order_date date,
        ship_date date,
        units_sold int,
        unit_price decimal,
        unit_cost decimal,
        total_revenue decimal,
        total_cost decimal,
        total_profit decimal,
        order_year int,
        order_month int
    )
""")

# Function to transform data
def transform_data(row):
    order_date = datetime.datetime.strptime(row['order_date'], '%m/%d/%Y')
    return {
        'order_id': int(row['order_id']),
        'region': row['region'],
        'country': row['country'],
        'item_type': row['item_type'],
        'sales_channel': row['sales_channel'],
        'order_priority': row['order_priority'],
        'order_date': order_date.date(),
        'ship_date': datetime.datetime.strptime(row['ship_date'], '%m/%d/%Y').date(),
        'units_sold': int(row['units_sold']),
        'unit_price': float(row['unit_price']),
        'unit_cost': float(row['unit_cost']),
        'total_revenue': float(row['total_revenue']),
        'total_cost': float(row['total_cost']),
        'total_profit': float(row['total_profit']),
        'order_year': order_date.year,
        'order_month': order_date.month
    }

# Insert Transformed Data
insert_query = """
    INSERT INTO silver_sales_cleaned (
        order_id, region, country, item_type, sales_channel,
        order_priority, order_date, ship_date, units_sold,
        unit_price, unit_cost, total_revenue, total_cost,
        total_profit, order_year, order_month
    ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?  )
"""

# Fetch Bronze Layer Data and Transform
rows = session.execute("SELECT * FROM bronze_sales")
batch = session.prepare(insert_query)

for row in rows:
    transformed_row = transform_data(row._asdict())
    session.execute(batch, tuple(transformed_row.values()))

print("Silver Layer Data Transformation Complete!")



Silver Layer Data Transformation Complete!


In [40]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Cassandra Connection
cloud_config= {
    'secure_connect_bundle': '/content/sample_data/secure-connect-big-data2.zip'
}
auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect('sales_medallion1')


# Gold Layer 1: Yearly Sales by Region
session.execute("""
    CREATE TABLE IF NOT EXISTS gold_yearly_sales_by_region (
        order_year int,
        region text,
        total_revenue decimal,
        total_units_sold int,
        total_profit decimal,
        PRIMARY KEY ((order_year), region)
    )
""")

# Fetch data and insert individually
select_query = """
    SELECT order_year, region, total_revenue, units_sold, total_profit
    FROM silver_sales_cleaned
    WHERE order_year = ? ALLOW FILTERING
"""

insert_query = """
    INSERT INTO gold_yearly_sales_by_region (order_year, region, total_revenue, total_units_sold, total_profit)
    VALUES (?, ?, ?, ?, ?)
"""
prepared_insert = session.prepare(insert_query)
prepared_select = session.prepare(select_query)

# Fetch all order_year values
fetch_years_query = "SELECT order_year FROM silver_sales_cleaned"
rows = session.execute(fetch_years_query)

# Extract distinct years using a set
years = {row.order_year for row in rows}
print("Distinct Years:", years)

# Iterate through years and regions to aggregate
for year in years:
    rows = session.execute(prepared_select, (year,))
    for row in rows:

        regional_totals = {}
        if row.region in regional_totals:
            regional_totals[row.region]['total_revenue'] += row.total_revenue
            regional_totals[row.region]['total_units_sold'] += row.units_sold
            regional_totals[row.region]['total_profit'] += row.total_profit
        else:
            regional_totals[row.region] = {
                'total_revenue': row.total_revenue,
                'total_units_sold': row.units_sold,
                'total_profit': row.total_profit
            }

    # Insert aggregated data for the current year
    for region, totals in regional_totals.items():
        session.execute(prepared_insert, (year, region, totals['total_revenue'], totals['total_units_sold'], totals['total_profit']))


# Gold Layer 2: Sales by Sales Channel
session.execute("""
    CREATE TABLE IF NOT EXISTS gold_sales_by_channel (
        sales_channel text PRIMARY KEY,
        total_revenue decimal,
        total_units_sold int,
        avg_unit_price decimal
    )
""")

# Fetch data and aggregate using Python
select_channel_data = """
    SELECT sales_channel, total_revenue, units_sold, unit_price
    FROM silver_sales_cleaned
"""

channel_data = session.execute(select_channel_data)

channel_totals = {}
for row in channel_data:
    if row.sales_channel in channel_totals:
        channel_totals[row.sales_channel]['total_revenue'] += row.total_revenue
        channel_totals[row.sales_channel]['total_units_sold'] += row.units_sold
        channel_totals[row.sales_channel]['total_unit_price_sum'] += row.unit_price
        channel_totals[row.sales_channel]['count'] += 1
    else:
        channel_totals[row.sales_channel] = {
            'total_revenue': row.total_revenue,
            'total_units_sold': row.units_sold,
            'total_unit_price_sum': row.unit_price,
            'count': 1
        }




Distinct Years: {2016, 2017, 2010, 2011, 2012, 2013, 2014, 2015}


In [45]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json


table_name = "bronze_sales"
columns = ["order_id", "region", "country", "item_type", "sales_channel"]

# Build the SELECT query
select_query = f"SELECT {', '.join(columns)} FROM {table_name}"

# Execute the query and print only the first 5 records
rows = session.execute(select_query)
for i, row in enumerate(rows):
    if i < 5:
        print(row)
    else:
        break

Row(order_id=294530856, region='Europe', country='Italy', item_type='Cereal', sales_channel='Online')
Row(order_id=274930989, region='Central America and the Caribbean', country='Dominica', item_type='Household', sales_channel='Offline')
Row(order_id=498071897, region='Asia', country='Taiwan', item_type='Cereal', sales_channel='Online')
Row(order_id=940980136, region='Australia and Oceania', country='New Zealand', item_type='Beverages', sales_channel='Online')
Row(order_id=324669444, region='Europe', country='France', item_type='Cosmetics', sales_channel='Online')


In [46]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

# Cassandra Connection
cloud_config= {
    'secure_connect_bundle': '/content/sample_data/secure-connect-big-data2.zip'
}
auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect('sales_medallion1')


# Specify the table and columns you want to select
table_name = "silver_sales_cleaned"  # Replace with your table name
columns = ["order_id", "region", "country", "item_type", "sales_channel"]  # Replace with your desired columns

# Build the SELECT query
select_query = f"SELECT {', '.join(columns)} FROM {table_name}"

# Execute the query and print only the first 5 records
rows = session.execute(select_query)
for i, row in enumerate(rows):
    if i < 5:
        print(row)
    else:
        break



Row(order_id=294530856, region='Europe', country='Italy', item_type='Cereal', sales_channel='Online')
Row(order_id=274930989, region='Central America and the Caribbean', country='Dominica', item_type='Household', sales_channel='Offline')
Row(order_id=498071897, region='Asia', country='Taiwan', item_type='Cereal', sales_channel='Online')
Row(order_id=940980136, region='Australia and Oceania', country='New Zealand', item_type='Beverages', sales_channel='Online')
Row(order_id=324669444, region='Europe', country='France', item_type='Cosmetics', sales_channel='Online')


In [51]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

# Cassandra Connection (Reusing existing connection from your notebook)
cloud_config= {
    'secure_connect_bundle': '/content/sample_data/secure-connect-big-data2.zip'
}
auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect('sales_medallion1')

# Function to print 5 records from a table
def print_5_records(table_name):
    select_query = f"SELECT * FROM {table_name}"
    rows = session.execute(select_query)
    for i, row in enumerate(rows):
        if i < 5:
            print(row)
        else:
            break
    print("-" * 20)  # Separator between tables


# Print records from gold_yearly_sales_by_region
print_5_records("gold_yearly_sales_by_region")



Row(order_year=2014, region='Sub-Saharan Africa', total_profit=Decimal('11302.059999999999490682967007160186767578125'), total_revenue=Decimal('36860.2300000000032014213502407073974609375'), total_units_sold=451)
Row(order_year=2010, region='Australia and Oceania', total_profit=Decimal('700209.25'), total_revenue=Decimal('2823440.75'), total_units_sold=4225)
Row(order_year=2012, region='Europe', total_profit=Decimal('756335.400000000023283064365386962890625'), total_revenue=Decimal('2014159.19999999995343387126922607421875'), total_units_sold=7890)
Row(order_year=2017, region='Sub-Saharan Africa', total_profit=Decimal('113120'), total_revenue=Decimal('583484.160000000032596290111541748046875'), total_units_sold=896)
Row(order_year=2015, region='Europe', total_profit=Decimal('180477.0599999999976716935634613037109375'), total_revenue=Decimal('453813.599999999976716935634613037109375'), total_units_sold=1038)
--------------------
