In [1]:
!pip install cassandra-driver

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl.metadata (1.0 kB)
Downloading cassandra_driver-3.29.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m48.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.2 geomet-0.2.1.post1


In [2]:
# Import the necessary libraries
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

##### Reference: YouTube video linked in the assignment

**Connection Setup**

In [4]:
# Location of the secure connect bundle used to reach the database.
cloud_config = {
    'secure_connect_bundle': 'secure-connect-db-sales.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("db_sales-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)

# A failed connection surfaces as an exception from connect(), not as a falsy
# return value, so the outcome is reported from try/except. The exception is
# re-raised so later cells never run against a missing session.
try:
    session = cluster.connect()
except Exception:
    print("An error occurred.")
    raise
else:
    print('Connected!')



Connected!


**Creating Bronze Table**

In [11]:
def create_bronze_tables(session):
    """Create the raw ``nikitha.bronze_sales`` table unless it already exists.

    The table is keyed by a UUID ``id``. Dates and the order id are kept as
    TEXT, the unit count as INT and the price/cost/revenue/profit columns as
    FLOAT.

    Args:
        session: Object whose ``execute`` method runs the CQL statement.
    """
    bronze_ddl = """
    CREATE TABLE IF NOT EXISTS nikitha.bronze_sales (
        id UUID PRIMARY KEY,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,
        order_id TEXT,
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    );
    """
    session.execute(bronze_ddl)
# Run the bronze DDL; the statement uses IF NOT EXISTS, so repeating it is harmless.
create_bronze_tables(session)


In [12]:
import uuid
import pandas as pd

# Read the source CSV into a DataFrame. The path is relative, so the file must
# sit in the notebook's working directory.
filename = 'sales_100.csv'
dataset = pd.read_csv(filename)

def insert_into_bronze_table(session, dataset):
    """Insert every row of ``dataset`` into ``nikitha.bronze_sales``.

    Each record is written under a freshly generated random UUID. ``Order ID``
    is converted to ``str``; every other value is passed through as it comes
    out of the frame.

    Args:
        session: Object whose ``execute(query, params)`` runs one CQL insert.
        dataset: DataFrame with the columns 'Region', 'Country', 'Item Type',
            'Sales Channel', 'Order Priority', 'Order Date', 'Order ID',
            'Ship Date', 'UnitsSold', 'UnitPrice', 'UnitCost', 'TotalRevenue',
            'TotalCost' and 'TotalProfit'.
    """
    insert_cql = """
        INSERT INTO nikitha.bronze_sales (id, region, country, item_type, sales_channel, order_priority, order_date, order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
    for _, record in dataset.iterrows():
        # Order of the values must match the column list in the statement.
        params = (
            uuid.uuid4(),
            record['Region'],
            record['Country'],
            record['Item Type'],
            record['Sales Channel'],
            record['Order Priority'],
            record['Order Date'],
            str(record['Order ID']),
            record['Ship Date'],
            record['UnitsSold'],
            record['UnitPrice'],
            record['UnitCost'],
            record['TotalRevenue'],
            record['TotalCost'],
            record['TotalProfit'],
        )
        session.execute(insert_cql, params)

# Write every row of the CSV DataFrame into nikitha.bronze_sales.
insert_into_bronze_table(session, dataset)

In [13]:
# Sanity-check the bronze load using the session opened in the connection cell.
# The table is read once and the materialised rows serve both the count and the
# listing, instead of scanning the table twice. The session is left open so the
# following cells can keep using it.
rows = list(session.execute("SELECT * FROM nikitha.bronze_sales;"))
row_count = len(rows)
print(f"Total Rows in nikitha.bronze_sales: {row_count}")
for row in rows:
    print(row)

Total Rows in nikitha.bronze_sales: 99
Row(id=UUID('fe85bc70-a80e-438e-92ac-c181c0db59f1'), country='Nicaragua', item_type='Baby Food', order_date='11/11/2016', order_id='343752610', order_priority='L', region='Central America and the Caribbean', sales_channel='Online', ship_date='11/16/2016', total_cost=460883.21875, total_profit=277131.25, total_revenue=738014.5, unit_cost=159.4199981689453, unit_price=255.27999877929688, units_sold=2891)
Row(id=UUID('26f2ffa1-a775-4b41-b315-ab58b3307117'), country='Turkmenistan', item_type='Meat', order_date='4/30/2017', order_id='757619178', order_priority='M', region='Asia', sales_channel='Online', ship_date='5/14/2017', total_cost=1898576.125, total_profit=297783.1875, total_revenue=2196359.25, unit_cost=364.69000244140625, unit_price=421.8900146484375, units_sold=5206)
Row(id=UUID('8351afc3-2faa-47fb-880b-c60de4f5f402'), country='Iceland', item_type='Baby Food', order_date='10/2/2010', order_id='678230941', order_priority='M', region='Europe', s

In [15]:
def data_profiling(session, keyspace, table_name):
    """Print a quick profile of every column of ``keyspace.table_name``.

    The whole table is read into a DataFrame; the ``id`` column is dropped and
    the remaining columns are summarised with ``DataFrame.info``,
    ``DataFrame.describe(include='all')`` and per-column distinct counts.

    Args:
        session: Session used to run the query. If it has already been shut
            down, a new one is opened from the module-level ``cluster``.
        keyspace: Keyspace name, interpolated into the query as an identifier.
        table_name: Table name, interpolated into the query as an identifier.

    Returns:
        None. All results are printed.
    """
    # Honour the caller's session; only reconnect when it is no longer usable.
    if getattr(session, "is_shutdown", False):
        session = cluster.connect()

    rows = session.execute(f"SELECT * FROM {keyspace}.{table_name}")
    df = pd.DataFrame(list(rows))

    print(f"Data Profiling for table: {keyspace}.{table_name}")
    print("-" * 30)

    # An empty result has no columns, which drop()/describe() cannot handle.
    if df.empty:
        print("Table is empty; nothing to profile.")
        return

    # The row identifier is not a data attribute, so it is left out of the profile.
    df = df.drop(columns=['id'], errors='ignore')

    # info() prints its report itself and returns None, so it is not wrapped in print().
    df.info()  # Data types and non-null counts
    print("\nDescriptive Statistics:")
    print(df.describe(include='all'))  # Summary statistics for numerical and categorical columns
    print("\nUnique Values per column")
    for col in df.columns:
        print(f"{col}: {df[col].nunique()}")
    # Add more detailed profiling as needed (e.g., missing value counts, value distributions)
# Profile the bronze table.
keyspace_name = "nikitha"
table_name = "bronze_sales"
data_profiling(session, keyspace_name, table_name)

# Shut down the module-level session object.
session.shutdown()


Data Profiling for table: nikitha.bronze_sales
------------------------------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 99 entries, 0 to 98
Data columns (total 14 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   country         99 non-null     object 
 1   item_type       99 non-null     object 
 2   order_date      99 non-null     object 
 3   order_id        99 non-null     object 
 4   order_priority  99 non-null     object 
 5   region          99 non-null     object 
 6   sales_channel   99 non-null     object 
 7   ship_date       99 non-null     object 
 8   total_cost      99 non-null     float64
 9   total_profit    99 non-null     float64
 10  total_revenue   99 non-null     float64
 11  unit_cost       99 non-null     float64
 12  unit_price      99 non-null     float64
 13  units_sold      99 non-null     int64  
dtypes: float64(5), int64(1), object(8)
memory usage: 11.0+ KB
None

Descriptive Statistics:
      

**Profiling result: no null values or other edge cases were found in the bronze data.**

**Creating Silver Table**

In [17]:
def create_silver_table(session):
    """Create the typed ``nikitha.silver_sales`` table unless it already exists.

    The table has the same columns as the bronze table, except that
    ``order_date`` and ``ship_date`` are DATE and ``order_id`` is BIGINT.

    Args:
        session: Session used to run the statement. If it has already been
            shut down, a new one is opened from the module-level ``cluster``.
    """
    # Honour the caller's session; only reconnect when it is no longer usable.
    if getattr(session, "is_shutdown", False):
        session = cluster.connect()
    session.execute("""
    CREATE TABLE IF NOT EXISTS nikitha.silver_sales (
        id UUID PRIMARY KEY,
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date DATE,
        order_id BIGINT,
        ship_date DATE,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    );
    """)
# Run the silver DDL; the statement uses IF NOT EXISTS, so repeating it is harmless.
create_silver_table(session)

In [22]:
from datetime import datetime
# Open a session and bind it to the 'nikitha' keyspace so the bronze table can
# be named without a prefix, then load the whole table into a DataFrame with
# one dict per row.
session = cluster.connect()
session.set_keyspace('nikitha')
rows = session.execute("SELECT * FROM bronze_sales;")
data = [bronze_row._asdict() for bronze_row in rows]
df = pd.DataFrame(data)
def convert_to_date(date_string):
    """Parse an ``MM/DD/YYYY`` string into a ``datetime.date``.

    Args:
        date_string: Text such as ``"11/16/2016"`` or ``"4/30/2017"``.

    Returns:
        The parsed date, or None when the value cannot be parsed.
    """
    try:
        return datetime.strptime(date_string, "%m/%d/%Y").date()
    except (ValueError, TypeError):
        # ValueError: text that does not match the format.
        # TypeError: a non-string value, e.g. None coming from a null cell.
        return None
def convert_to_bigint(order_id_string):
    """Convert an order id held as text into an ``int``.

    Args:
        order_id_string: Text such as ``"343752610"``.

    Returns:
        The integer value, or None when the value cannot be converted.
    """
    try:
        return int(order_id_string)
    except (ValueError, TypeError):
        # ValueError: text that is not an integer literal.
        # TypeError: a value int() does not accept, e.g. None from a null cell.
        return None
# Copy each bronze row into silver_sales with typed dates and a numeric order id.
# The bronze row's own id is reused as the silver primary key: a CQL INSERT on an
# existing key overwrites that row, so re-running this cell refreshes the same
# rows instead of appending a second copy under new random ids.
silver_insert_cql = """
    INSERT INTO nikitha.silver_sales (
        id, region, country, item_type, sales_channel, order_priority, order_date,
        order_id, ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
for _, row in df.iterrows():
    # Values that fail to parse come back as None and are stored as nulls.
    order_date = convert_to_date(row['order_date'])
    ship_date = convert_to_date(row['ship_date'])
    order_id = convert_to_bigint(row['order_id'])

    session.execute(silver_insert_cql, (
        row['id'],
        row['region'],
        row['country'],
        row['item_type'],
        row['sales_channel'],
        row['order_priority'],
        order_date,
        order_id,
        ship_date,
        row['units_sold'],
        row['unit_price'],
        row['unit_cost'],
        row['total_revenue'],
        row['total_cost'],
        row['total_profit']
    ))


In [23]:
# Read back the silver table from the 'nikitha' keyspace, which is where this
# notebook creates and populates it (the query previously named 'example').
rows = session.execute("SELECT * FROM nikitha.silver_sales")
for row in rows:
    print(row)

Row(id=UUID('f56b7e15-f7ae-4a0c-a238-4d6d1b7c3b71'), country='Vanuatu', item_type='Fruits', order_date=Date(16012), order_id=571997869, order_priority='C', region='Australia and Oceania', sales_channel='Online', ship_date=Date(16020), total_cost=39686.19921875, total_profit=13821.349609375, total_revenue=53507.55078125, unit_cost=6.920000076293945, unit_price=9.329999923706055, units_sold=5735)
Row(id=UUID('8ebc79c2-8dfb-4eb2-9ddd-35fd34e54644'), country='Italy', item_type='Cereal', order_date=Date(15293), order_id=294530856, order_priority='M', region='Europe', sales_channel='Online', ship_date=Date(15336), total_cost=829138.8125, total_profit=627217.1875, total_revenue=1456356.0, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=7080)
Row(id=UUID('916bc703-7ded-4ac4-885f-9c237190c6d9'), country='Tanzania', item_type='Beverages', order_date=Date(17135), order_id=659878194, order_priority='L', region='Sub-Saharan Africa', sales_channel='Online', ship_date=Date(1718

**Creating Gold Tables**

In [36]:
def create_gold_tables(session):
    """Create the four gold aggregate tables in ``nikitha`` if they are missing.

    Tables, in creation order:
      * ``gold_total_sales_by_region`` keyed by ``region``
      * ``gold_sales_by_item_type`` keyed by ``(item_type, order_date)``
      * ``gold_sales_by_country_and_channel`` keyed by ``(country, sales_channel)``
      * ``gold_average_unit_price_by_priority`` keyed by ``order_priority``

    Args:
        session: Object whose ``execute`` method runs each CQL statement.
    """
    gold_ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS nikitha.gold_total_sales_by_region (
            region TEXT PRIMARY KEY,
            total_revenue FLOAT
        );
    """,
        """
        CREATE TABLE IF NOT EXISTS nikitha.gold_sales_by_item_type (
            item_type TEXT,
            order_date DATE,
            total_revenue FLOAT,
            PRIMARY KEY (item_type, order_date)
        );
    """,
        """
        CREATE TABLE IF NOT EXISTS nikitha.gold_sales_by_country_and_channel (
            country TEXT,
            sales_channel TEXT,
            total_profit FLOAT,
            PRIMARY KEY (country, sales_channel)
        );
    """,
        """
        CREATE TABLE IF NOT EXISTS nikitha.gold_average_unit_price_by_priority (
            order_priority TEXT PRIMARY KEY,
            average_unit_price FLOAT
        );
    """,
    )
    # One statement per execute call, in the order listed above.
    for ddl in gold_ddl_statements:
        session.execute(ddl)

# Create the four gold tables; every statement uses IF NOT EXISTS, so repeating
# this call is harmless.
create_gold_tables(session)

In [32]:
import pandas as pd
def _insert_aggregate(session, insert_cql, aggregated, columns):
    """Run ``insert_cql`` once per row of ``aggregated``, binding ``columns`` in order."""
    for _, agg_row in aggregated.iterrows():
        session.execute(insert_cql, tuple(agg_row[name] for name in columns))


def populate_gold_table(session):
    """Fill the four gold tables with aggregates computed from ``nikitha.silver_sales``.

    The silver table is read once; the aggregation is done in pandas and each
    aggregated row is written with its own INSERT:

      * total revenue per region
      * total revenue per (item_type, order_date)
      * total profit per (country, sales_channel)
      * mean unit price per order_priority

    Args:
        session: Object whose ``execute`` method runs the SELECT and the INSERTs.

    Returns:
        None. Nothing is written when the silver table is empty.
    """
    rows = session.execute(
        "SELECT region, item_type, order_date, country, sales_channel, "
        "order_priority, total_revenue, total_profit, unit_price "
        "FROM nikitha.silver_sales;"
    )
    silver = pd.DataFrame(list(rows))

    # An empty result produces a frame without columns; groupby would raise KeyError.
    if silver.empty:
        return

    region_sales = silver.groupby('region')['total_revenue'].sum().reset_index()
    _insert_aggregate(
        session,
        """
        INSERT INTO nikitha.gold_total_sales_by_region (region, total_revenue)
        VALUES (%s, %s);""",
        region_sales,
        ['region', 'total_revenue'],
    )

    item_sales = silver.groupby(['item_type', 'order_date'])['total_revenue'].sum().reset_index()
    _insert_aggregate(
        session,
        """
        INSERT INTO nikitha.gold_sales_by_item_type (item_type, order_date, total_revenue)
        VALUES (%s, %s, %s);""",
        item_sales,
        ['item_type', 'order_date', 'total_revenue'],
    )

    channel_profit = silver.groupby(['country', 'sales_channel'])['total_profit'].sum().reset_index()
    _insert_aggregate(
        session,
        """
        INSERT INTO nikitha.gold_sales_by_country_and_channel (country, sales_channel, total_profit)
        VALUES (%s, %s, %s);""",
        channel_profit,
        ['country', 'sales_channel', 'total_profit'],
    )

    # The mean lands in the 'unit_price' column of the aggregate and is stored
    # as average_unit_price in the gold table.
    priority_price = silver.groupby('order_priority')['unit_price'].mean().reset_index()
    _insert_aggregate(
        session,
        """
        INSERT INTO nikitha.gold_average_unit_price_by_priority (order_priority, average_unit_price)
        VALUES (%s, %s);""",
        priority_price,
        ['order_priority', 'unit_price'],
    )

In [33]:
# Compute the aggregates from silver_sales and write them to the gold tables.
populate_gold_table(session)

                   region  total_revenue
0   Australia and Oceania   5.350755e+04
1                  Europe   1.456356e+06
2      Sub-Saharan Africa   7.003620e+04
3      Sub-Saharan Africa   1.234359e+04
4      Sub-Saharan Africa   2.884922e+06
..                    ...            ...
94     Sub-Saharan Africa   2.636753e+06
95     Sub-Saharan Africa   8.116167e+04
96                 Europe   1.885188e+05
97                   Asia   8.684654e+05
98                 Europe   8.029894e+05

[99 rows x 2 columns]
                              region  total_revenue
0                               Asia   2.884081e+07
1              Australia and Oceania   1.071126e+07
2  Central America and the Caribbean   1.757084e+07
3                             Europe   3.496475e+07
4       Middle East and North Africa   2.476513e+07
5                      North America   3.611757e+06
6                 Sub-Saharan Africa   2.422544e+07


In [35]:
# Print every row of each gold table, one table after another.
gold_queries = (
    "SELECT * FROM nikitha.gold_total_sales_by_region",
    "SELECT * FROM nikitha.gold_sales_by_item_type",
    "SELECT * FROM nikitha.gold_sales_by_country_and_channel",
    "SELECT * FROM nikitha.gold_average_unit_price_by_priority",
)
for gold_query in gold_queries:
    rows = session.execute(gold_query)
    for row in rows:
        print(row)

Row(region='Australia and Oceania', total_revenue=10711258.0)
Row(region='Europe', total_revenue=34964748.0)
Row(region='Middle East and North Africa', total_revenue=24765128.0)
Row(region='Central America and the Caribbean', total_revenue=17570836.0)
Row(region='Asia', total_revenue=28840812.0)
Row(region='Sub-Saharan Africa', total_revenue=24225438.0)
Row(region='North America', total_revenue=3611757.5)
Row(item_type='Household', order_date=Date(14840), total_revenue=2884921.5)
Row(item_type='Household', order_date=Date(14960), total_revenue=2823440.75)
Row(item_type='Household', order_date=Date(15138), total_revenue=979683.8125)
Row(item_type='Household', order_date=Date(15218), total_revenue=6666661.5)
Row(item_type='Household', order_date=Date(15245), total_revenue=416332.21875)
Row(item_type='Household', order_date=Date(15297), total_revenue=4707294.0)
Row(item_type='Household', order_date=Date(16267), total_revenue=5608790.0)
Row(item_type='Household', order_date=Date(16736), to