In [1]:
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
from cassandra.query import SimpleStatement

In [2]:
# Function to connect to Cassandra
def connect_to_cassandra(bundle_path='secure-connect-sale01.zip',
                         token_path="sale01-token.json"):
    """Open a Cassandra session from a secure-connect bundle and a token file.

    Args:
        bundle_path: Path handed to the driver as ``secure_connect_bundle``.
            Defaults to the bundle file this notebook was written against.
        token_path: Path to a JSON file containing the ``clientId`` and
            ``secret`` keys, which are used as the username and password of
            the plain-text auth provider.

    Returns:
        A ``(session, cluster)`` tuple. The caller owns both objects and is
        expected to call ``cluster.shutdown()`` when done.

    Raises:
        OSError: If the token file cannot be opened.
        KeyError: If the token file lacks ``clientId`` or ``secret``.
        Exception: Whatever ``cluster.connect()`` raises is re-raised after
            the cluster has been shut down.
    """
    # Read the credentials first so a missing or malformed token file fails
    # before any cluster object has been created.
    with open(token_path) as token_file:
        secrets = json.load(token_file)

    auth_provider = PlainTextAuthProvider(secrets["clientId"], secrets["secret"])
    cluster = Cluster(
        cloud={'secure_connect_bundle': bundle_path},
        auth_provider=auth_provider,
    )
    try:
        session = cluster.connect()
    except Exception:
        # Do not leave a half-initialised cluster behind when connecting fails.
        cluster.shutdown()
        raise
    return session, cluster  # Returns both session and cluster

In [3]:
def load(session, keyspace, table, df):
    """Create the bronze table if it is missing and insert every row of ``df``.

    Args:
        session: Object exposing ``execute`` and ``prepare`` (a Cassandra session).
        keyspace: Keyspace name interpolated into the CQL statements.
        table: Table name interpolated into the CQL statements.
        df: DataFrame holding the raw CSV columns listed in ``source_columns``.

    Rows are written one at a time through a prepared statement; ``orderId``
    is the primary key of the created table.
    """
    # CSV column names, in the same order as the INSERT column list below.
    source_columns = (
        'Region', 'Country', 'Item Type', 'Sales Channel',
        'Order Priority', 'Order Date', 'Order ID', 'Ship Date',
        'UnitsSold', 'UnitPrice', 'UnitCost',
        'TotalRevenue', 'TotalCost', 'TotalProfit',
    )

    create_cql = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table} (
        region TEXT,
        country TEXT,
        itemtype TEXT,
        saleschannel TEXT,
        orderPriority TEXT,
        orderDate TEXT,
        orderId BIGINT PRIMARY KEY,
        shipDate TEXT,
        unitsSold INT,
        unitPrice FLOAT,
        unitCost FLOAT,
        totalRevenue FLOAT,
        totalCost FLOAT,
        totalProfit FLOAT
    )
    """
    session.execute(create_cql)

    insert_cql = f"""
    INSERT INTO {keyspace}.{table} (
        region, country, itemtype, saleschannel, orderPriority,
        orderDate, orderId, shipDate, unitsSold, unitPrice,
        unitCost, totalRevenue, totalCost, totalProfit
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    prepared_insert = session.prepare(insert_cql)

    for _, record in df.iterrows():
        bound_values = tuple(record[name] for name in source_columns)
        session.execute(prepared_insert, bound_values)

    print("CSV data successfully loaded into Cassandra!(Bronze table)")

In [4]:
from datetime import datetime

def bronze_to_silver(dataframe):
    """
    Cleans and processes the bronze-level DataFrame to silver-level.

    Steps, in order:
      1. Drop rows with a null in any critical (non-numeric) column.
      2. Cast the id/quantity columns to 64-bit ints and the money columns to floats.
      3. Parse ``orderdate``/``shipdate`` as ``%m/%d/%Y``; unparseable values
         become NaT and those rows are dropped.
      4. Lower-case the categorical text columns.
      5. Keep only rows where ``orderdate <= shipdate``.
      6. Add a ``processed_at`` column holding the processing time.

    Args:
        dataframe: Bronze DataFrame with lower-case column names
            (``region``, ``country``, ..., ``totalprofit``).

    Returns:
        A new DataFrame; the input frame is left untouched.

    Raises:
        KeyError: If an expected column is missing.
        ValueError: If a numeric column holds a value that cannot be cast
            (for example a null in ``unitssold``).
    """
    # Local import: the notebook only imports `datetime` at cell level.
    from datetime import timezone

    critical_columns = [
        "region", "country", "itemtype", "saleschannel",
        "orderpriority", "orderdate", "orderid", "shipdate"
    ]
    lowercase_columns = ["region", "country", "itemtype", "saleschannel", "orderpriority"]
    # Explicit widths: plain `int` is 32-bit on some platforms/numpy versions,
    # which is too narrow for a BIGINT order id.
    column_types = {
        "orderid": "int64",
        "unitssold": "int64",
        "unitprice": "float64",
        "unitcost": "float64",
        "totalrevenue": "float64",
        "totalcost": "float64",
        "totalprofit": "float64",
    }
    date_format = "%m/%d/%Y"

    # Handle missing values, then enforce data types. `astype` returns a fresh
    # frame, so the column assignments below never write into a slice of the input.
    silver = dataframe.dropna(subset=critical_columns).astype(column_types)

    # Converting dates to uniform format; invalid dates become NaT
    for column in ("orderdate", "shipdate"):
        silver[column] = pd.to_datetime(silver[column], format=date_format, errors="coerce")

    # Removing rows with invalid dates (explicit copy keeps later assignments warning-free)
    silver = silver.dropna(subset=["orderdate", "shipdate"]).copy()

    # Standardizing categorical fields to lowercase
    for column in lowercase_columns:
        silver[column] = silver[column].str.lower()

    # Removing invalid records: an order cannot ship before it was placed
    silver = silver[silver["orderdate"] <= silver["shipdate"]].copy()

    # Track processing time. The value is a naive datetime expressed in UTC,
    # because the Cassandra driver interprets naive datetimes as UTC when
    # writing TIMESTAMP columns; local wall-clock time would be stored shifted.
    silver["processed_at"] = datetime.now(timezone.utc).replace(tzinfo=None)

    return silver

In [5]:
# Loading cleaned (silver) data to Cassandra
def load_to_cassandra(silver_df, keyspace, silvertable, session=None):
    """Create the silver table if it is missing and insert every row of ``silver_df``.

    Args:
        silver_df: Cleaned DataFrame; ``orderdate``/``shipdate`` values must
            offer ``.date()`` and ``processed_at`` values ``.to_pydatetime()``
            (i.e. pandas Timestamps).
        keyspace: Keyspace name interpolated into the CQL statements.
        silvertable: Table name interpolated into the CQL statements.
        session: Object exposing ``execute`` and ``prepare``. When omitted,
            the module-level ``session`` variable is used, which is how this
            function was originally called.

    Raises:
        NameError: If no session is passed and no module-level ``session`` exists.
    """
    if session is None:
        # Backward-compatible fallback to the notebook-global session.
        session = globals().get("session")
        if session is None:
            raise NameError(
                "name 'session' is not defined; pass session=... or create "
                "a module-level session first"
            )

    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{silvertable} (
        orderid BIGINT PRIMARY KEY,
        country TEXT,
        itemtype TEXT,
        orderdate DATE,
        orderpriority TEXT,
        region TEXT,
        saleschannel TEXT,
        shipdate DATE,
        totalcost FLOAT,
        totalprofit FLOAT,
        totalrevenue FLOAT,
        unitcost FLOAT,
        unitprice FLOAT,
        unitssold INT,
        processed_at TIMESTAMP
    );
    """
    session.execute(create_table_query)

    insert_query = f"""
    INSERT INTO {keyspace}.{silvertable} (
        orderid, country, itemtype, orderdate, orderpriority, region, 
        saleschannel, shipdate, totalcost, totalprofit, totalrevenue, 
        unitcost, unitprice, unitssold, processed_at
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    prepared = session.prepare(insert_query)

    for _, row in silver_df.iterrows():
        # Convert pandas/numpy scalars to plain Python values before binding.
        session.execute(prepared, (
            int(row["orderid"]),
            row["country"],
            row["itemtype"],
            row["orderdate"].date(),
            row["orderpriority"],
            row["region"],
            row["saleschannel"],
            row["shipdate"].date(),
            float(row["totalcost"]),
            float(row["totalprofit"]),
            float(row["totalrevenue"]),
            float(row["unitcost"]),
            float(row["unitprice"]),
            int(row["unitssold"]),
            row["processed_at"].to_pydatetime()
        ))

    print("Silver Data loaded successfully!")


In [6]:
# Generating gold tables (the following 3 functions)
def gold_table1(silver_df):
    """
    Top 5 countries by total profit within each region.

    Sums ``totalprofit`` per (region, country), orders the result by region
    ascending and profit descending, and keeps the first five rows of every
    region. Columns of the result: ``region``, ``country``, ``total_profit``.
    """
    profit_per_country = silver_df.groupby(
        ["region", "country"], as_index=False
    ).agg(total_profit=("totalprofit", "sum"))

    ranked = profit_per_country.sort_values(
        by=["region", "total_profit"],
        ascending=[True, False],
    )
    return ranked.groupby("region").head(5)

In [7]:
def gold_table2(silver_df):
    """
    Region-wise revenue per item type.

    Sums ``totalrevenue`` for every (region, itemtype) pair and returns a
    flat frame with columns ``region``, ``itemtype``, ``total_revenue``.
    """
    by_region_and_item = silver_df.groupby(['region', 'itemtype'])
    revenue = by_region_and_item.agg(total_revenue=('totalrevenue', 'sum'))
    return revenue.reset_index()

In [8]:
def gold_table3(silver_df):
    """Yearly item sales.

    Derives the order year from ``orderdate`` and sums ``totalrevenue`` and
    ``unitssold`` for every (itemtype, year) pair.

    Args:
        silver_df: Frame with a datetime ``orderdate`` column plus
            ``itemtype``, ``totalrevenue`` and ``unitssold``.

    Returns:
        Frame with columns ``itemtype``, ``year`` (string), ``total_revenue``
        and ``total_units_sold``. The input frame is not modified.
    """
    # `assign` works on a copy, so the caller's frame does not gain a stray
    # "year" column as a side effect of building this table.
    with_year = silver_df.assign(
        year=silver_df["orderdate"].dt.to_period("Y").astype(str)  # Extract Year
    )
    gold_table = (
        with_year.groupby(["itemtype", "year"], as_index=False)
        .agg(
            total_revenue=("totalrevenue", "sum"),
            total_units_sold=("unitssold", "sum")
        )
    )
    return gold_table

In [9]:
# Loading gold tables to database
def load_gold_table_to_cassandra(df, keyspace, table_name, schema_query, insert_query, session=None):
    """Run ``schema_query`` and insert every row of ``df`` with ``insert_query``.

    Args:
        df: Frame whose column order matches the placeholders of ``insert_query``.
        keyspace: Not used by this function; kept for interface compatibility.
        table_name: Only used in the success message.
        schema_query: CQL executed once before the inserts.
        insert_query: CQL with one ``?`` placeholder per column of ``df``.
        session: Object exposing ``execute`` and ``prepare``. When omitted,
            the module-level ``session`` variable is used, which is how this
            function was originally called.

    Raises:
        NameError: If no session is passed and no module-level ``session`` exists.
    """
    if session is None:
        # Backward-compatible fallback to the notebook-global session.
        session = globals().get("session")
        if session is None:
            raise NameError(
                "name 'session' is not defined; pass session=... or create "
                "a module-level session first"
            )

    session.execute(schema_query)
    prepared = session.prepare(insert_query)
    # itertuples keeps each column's own dtype; iterrows would upcast integer
    # columns to float whenever every column of the frame is numeric.
    for values in df.itertuples(index=False, name=None):
        session.execute(prepared, values)
    print(f"Data loaded successfully into {table_name}!")

In [10]:
def top_countries_by_profit(gold_table1, keyspace="sales"):  # gold table 1
    """Store the top-countries-by-profit frame in ``<keyspace>.top_countries_by_profit``.

    Builds the CREATE TABLE and INSERT statements for the
    (region, country, total_profit) table and hands them, together with the
    frame, to ``load_gold_table_to_cassandra``.
    """
    target_table = "top_countries_by_profit"

    create_cql = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{target_table} (
        region TEXT,
        country TEXT,
        total_profit FLOAT,
        PRIMARY KEY (region, country)
    );
    """
    insert_cql = f"""
    INSERT INTO {keyspace}.{target_table} (region, country, total_profit)
    VALUES (?, ?, ?);
    """

    load_gold_table_to_cassandra(gold_table1, keyspace, target_table, create_cql, insert_cql)

In [11]:
def region_itemtype_revenue(gold_table, keyspace="sales"):  # gold table 2
    """Store the region/item-type revenue frame in ``<keyspace>.region_itemtype_revenue``.

    Args:
        gold_table: Frame with columns ``region``, ``itemtype`` and
            ``total_revenue``, in that order.
        keyspace: Keyspace holding the target table.

    The frame that is loaded is the ``gold_table`` argument itself; the
    function does not read any module-level variable for its data.
    """
    schema_query =f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.region_itemtype_revenue (
        region TEXT,
        itemtype TEXT,
        total_revenue FLOAT,
        PRIMARY KEY (region, itemtype)
    );
    """
    insert_query =f"""
    INSERT INTO {keyspace}.region_itemtype_revenue (region, itemtype, total_revenue)
    VALUES (?, ?, ?);
    """
    load_gold_table_to_cassandra(
        gold_table,  # was the global `gold_table2`, which ignored the argument
        keyspace,
        "region_itemtype_revenue",
        schema_query,
        insert_query
    )

In [12]:
def annual_item_performance(gold_table3, keyspace="sales"):  # gold table 3
    """Store the per-item yearly totals in ``<keyspace>.annual_item_performance``.

    Builds the CREATE TABLE and INSERT statements for the
    (itemtype, month_year, total_revenue, total_units_sold) table and passes
    them, with the frame, to ``load_gold_table_to_cassandra``.
    """
    # NOTE(review): the period column is called month_year in the table; the
    # frame's second column is bound to it positionally — confirm the naming.
    target_table = "annual_item_performance"

    create_cql = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{target_table} (
        itemtype TEXT,
        month_year TEXT,
        total_revenue FLOAT,
        total_units_sold INT,
        PRIMARY KEY (itemtype, month_year)
    );
    """
    insert_cql = f"""
    INSERT INTO {keyspace}.{target_table} (itemtype, month_year, total_revenue, total_units_sold)
    VALUES (?, ?, ?, ?);
    """

    load_gold_table_to_cassandra(gold_table3, keyspace, target_table, create_cql, insert_cql)

In [13]:
# End-to-end pipeline: CSV -> bronze table -> silver table -> three gold tables.
# Every name bound here (session, cluster, keyspace, df, silver_df, ...) is a
# notebook-level variable that later cells read.
if __name__ == "__main__":
    session,cluster=connect_to_cassandra()
    # Source data: a CSV fetched over HTTP on every run.
    df=pd.read_csv("https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv")
    keyspace='sales'
    table='rawsales'
    # Load the raw data into the bronze table
    load(session,keyspace,table,df)
    # Read the bronze table back; SELECT * returns every row stored in the
    # table, not only the rows inserted by the call above.
    query = f"SELECT * FROM {keyspace}.{table}"
    rows = session.execute(query)
    data = [row._asdict() for row in rows]
    df1 = pd.DataFrame(data)
    print('Data Extracted Successfully!')
    # Clean the data (make it suitable for the silver stage)
    silver_df = bronze_to_silver(df1)
    # Load the silver data into Cassandra
    silvertable = "salescleaned"
    load_to_cassandra(silver_df, keyspace, silvertable)
    # Generate the gold tables
    # NOTE(review): these three assignments rebind gold_table1/2/3 from the
    # functions defined above to their DataFrame results, so this cell cannot
    # be run a second time on the same kernel without re-running the cells
    # that define those functions.
    gold_table1= gold_table1(silver_df)
    gold_table2 = gold_table2(silver_df)
    gold_table3 = gold_table3(silver_df)
    # Load the gold tables into Cassandra
    top_countries_by_profit(gold_table1, keyspace="sales")
    region_itemtype_revenue(gold_table2, keyspace="sales")
    annual_item_performance(gold_table3, keyspace="sales")

CSV data successfully loaded into Cassandra!(Bronze table)
Data Extracted Successfully!
Silver Data loaded successfully!
Data loaded successfully into top_countries_by_profit!
Data loaded successfully into region_itemtype_revenue!
Data loaded successfully into annual_item_performance!


# Viewing gold tables

In [14]:
# Read the top_countries_by_profit gold table back into a DataFrame for display.
query = f"SELECT * FROM {keyspace}.top_countries_by_profit"
rows = session.execute(query)
data = [record._asdict() for record in rows]
df1 = pd.DataFrame(data)
df1

Unnamed: 0,region,country,total_profit
0,central america and the caribbean,antigua and barbuda,794996.2
1,central america and the caribbean,dominica,1266076.0
2,central america and the caribbean,haiti,259065.0
3,central america and the caribbean,nicaragua,1568334.0
4,central america and the caribbean,the bahamas,240396.6
5,sub-saharan africa,mali,715456.4
6,sub-saharan africa,tanzania,1396870.0
7,sub-saharan africa,togo,835619.2
8,sub-saharan africa,uganda,1059912.0
9,sub-saharan africa,zimbabwe,1214904.0


In [15]:
# Read the region_itemtype_revenue gold table back into a DataFrame for display.
query = f"SELECT * FROM {keyspace}.region_itemtype_revenue"
rows = session.execute(query)
data = [record._asdict() for record in rows]
df2 = pd.DataFrame(data)
df2

Unnamed: 0,region,itemtype,total_revenue
0,central america and the caribbean,baby food,807961.2
1,central america and the caribbean,beverages,298982.4
2,central america and the caribbean,fruits,77634.93
3,central america and the caribbean,household,9913786.0
4,central america and the caribbean,office supplies,5436952.0
5,central america and the caribbean,personal care,1035519.0
6,sub-saharan africa,baby food,337990.7
7,sub-saharan africa,beverages,266004.7
8,sub-saharan africa,clothes,1725313.0
9,sub-saharan africa,cosmetics,9720705.0


In [16]:
# Read the annual_item_performance gold table back into a DataFrame for display.
query = f"SELECT * FROM {keyspace}.annual_item_performance"
rows = session.execute(query)
data = [record._asdict() for record in rows]
df3 = pd.DataFrame(data)
df3

Unnamed: 0,itemtype,month_year,total_revenue,total_units_sold
0,meat,2013,4205822.0,9969
1,meat,2014,4006268.0,9496
2,meat,2015,2008618.0,4761
3,meat,2016,5161824.0,12235
4,meat,2017,5896334.0,13976
5,vegetables,2012,107533.9,698
6,vegetables,2015,1027580.0,6670
7,household,2010,5708362.0,8542
8,household,2011,12769970.0,19109
9,household,2014,5608790.0,8393


In [17]:
# Final cell: shut down the Cluster object that connect_to_cassandra() returned in the pipeline cell.
cluster.shutdown()