In [1]:
import json
from datetime import datetime, timezone

import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

In [2]:
class CassandraConnection:
    """Owns a Cassandra cluster connection built from a secure-connect bundle.

    The connection is opened eagerly in ``__init__``. Call :meth:`close` (or use
    the instance as a context manager) to shut the cluster down when finished.
    """

    def __init__(self, secure_connect_bundle, token_file):
        """
        Args:
            secure_connect_bundle: Path to the cloud secure-connect bundle (zip).
            token_file: Path to a JSON file holding ``clientId`` and ``secret``.
        """
        self.secure_connect_bundle = secure_connect_bundle
        self.token_file = token_file
        self.session = None
        self.cluster = None
        self._connect_to_cassandra()

    def _connect_to_cassandra(self):
        """Connects to Cassandra using the provided secure bundle and credentials.

        Raises:
            OSError / json.JSONDecodeError: if the token file can't be read or parsed.
            KeyError: if ``clientId`` or ``secret`` is missing from the token file.
            Any exception from ``Cluster.connect``; the cluster is shut down
            before re-raising so its resources are not leaked.
        """
        with open(self.token_file) as f:
            secrets = json.load(f)

        client_id = secrets["clientId"]
        client_secret = secrets["secret"]

        auth_provider = PlainTextAuthProvider(client_id, client_secret)
        self.cluster = Cluster(
            cloud={'secure_connect_bundle': self.secure_connect_bundle},
            auth_provider=auth_provider,
        )
        try:
            self.session = self.cluster.connect()
        except Exception:
            # Don't leave a half-initialised cluster running when connect fails.
            self.cluster.shutdown()
            self.cluster = None
            raise

    def get_session(self):
        """Return the Cassandra session (``None`` after :meth:`close`)."""
        return self.session

    def get_cluster(self):
        """Return the Cassandra cluster (``None`` after :meth:`close`)."""
        return self.cluster

    def close(self):
        """Shut down the cluster and its sessions. Safe to call more than once."""
        if self.cluster is not None:
            self.cluster.shutdown()
        self.cluster = None
        self.session = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        # Never suppress exceptions raised inside the ``with`` body.
        return False

In [3]:
class DataLoader:
    """Loads raw sales rows from a pandas DataFrame into the Bronze Cassandra table."""

    # Source CSV columns, in the same order as the INSERT column list below.
    _SOURCE_COLUMNS = [
        'Region', 'Country', 'Item Type', 'Sales Channel',
        'Order Priority', 'Order Date', 'Order ID', 'Ship Date',
        'UnitsSold', 'UnitPrice', 'UnitCost',
        'TotalRevenue', 'TotalCost', 'TotalProfit',
    ]

    def __init__(self, session, keyspace, table):
        self.session = session
        self.keyspace = keyspace
        self.table = table

    def load_raw_data(self, df):
        """Loading raw data into Cassandra Bronze table.

        Creates the table if needed, then inserts one row per DataFrame row.

        Args:
            df: DataFrame with at least the columns in ``_SOURCE_COLUMNS``;
                extra columns are ignored.

        Raises:
            KeyError: if a required source column is missing. This is raised
                before any row is written, so there are no partial loads.
        """
        self._create_table_if_not_exists()
        insert_query = self._prepare_insert_query()
        # Selecting the columns up front fixes the bind order and fails fast on
        # a missing column; itertuples(name=None) yields plain tuples without
        # building a Series per row as iterrows() does.
        for values in df[self._SOURCE_COLUMNS].itertuples(index=False, name=None):
            self.session.execute(insert_query, values)

    def _create_table_if_not_exists(self):
        """Create table if it doesn't exist.

        Unquoted identifiers are case-folded by Cassandra, so e.g. ``orderPriority``
        becomes the column ``orderpriority``.
        """
        # DOUBLE rather than FLOAT: a 32-bit FLOAT keeps only ~7 significant
        # digits, so revenue/cost values around a million and up lose their cents.
        # IF NOT EXISTS leaves an already-created table's column types untouched.
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.keyspace}.{self.table} (
            region TEXT,
            country TEXT,
            itemtype TEXT,
            saleschannel TEXT,
            orderPriority TEXT,
            orderDate TEXT,
            orderId BIGINT PRIMARY KEY,
            shipDate TEXT,
            unitsSold INT,
            unitPrice DOUBLE,
            unitCost DOUBLE,
            totalRevenue DOUBLE,
            totalCost DOUBLE,
            totalProfit DOUBLE
        )
        """
        self.session.execute(create_table_query)

    def _prepare_insert_query(self):
        """Prepare the insert query (column order matches ``_SOURCE_COLUMNS``)."""
        return self.session.prepare(f"""
        INSERT INTO {self.keyspace}.{self.table} (
            region, country, itemtype, saleschannel, orderPriority,
            orderDate, orderId, shipDate, unitsSold, unitPrice,
            unitCost, totalRevenue, totalCost, totalProfit
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """)

In [4]:
class DataCleaner:
    """Transforms Bronze-level sales rows into the cleaned Silver representation."""

    @staticmethod
    def bronze_to_silver(dataframe):
        """
        Cleans and processes the bronze-level DataFrame to silver-level.

        Steps, in order:
          1. Drop rows with nulls in the key/categorical/date columns, or in
             ``unitssold`` (required by the integer cast in step 2).
          2. Cast ``orderid``/``unitssold`` to int and the money columns to float.
          3. Parse ``orderdate``/``shipdate`` as ``%m/%d/%Y``; drop unparseable rows.
          4. Lower-case the categorical text columns.
          5. Drop rows whose ``orderdate`` is after ``shipdate``.
          6. Add a timezone-aware UTC ``processed_at`` timestamp to every row.

        Args:
            dataframe: Bronze rows with lower-case column names. It is not modified.

        Returns:
            A new filtered DataFrame; the original index labels are kept.
        """
        required_columns = [
            "region", "country", "itemtype", "saleschannel",
            "orderpriority", "orderdate", "orderid", "shipdate",
            # Not a descriptive field, but astype(int) below raises on a null,
            # which would abort the whole run because of one bad row.
            "unitssold",
        ]
        numeric_types = {
            "orderid": int,
            "unitssold": int,
            "unitprice": float,
            "unitcost": float,
            "totalrevenue": float,
            "totalcost": float,
            "totalprofit": float,
        }
        categorical_columns = ["region", "country", "itemtype", "saleschannel", "orderpriority"]
        date_format = "%m/%d/%Y"

        silver = dataframe.dropna(subset=required_columns).astype(numeric_types)

        # assign() always returns a new frame, so no step writes into a filtered
        # slice (which is what triggers pandas' SettingWithCopyWarning).
        silver = silver.assign(
            orderdate=pd.to_datetime(silver["orderdate"], format=date_format, errors="coerce"),
            shipdate=pd.to_datetime(silver["shipdate"], format=date_format, errors="coerce"),
        )
        silver = silver.dropna(subset=["orderdate", "shipdate"])

        silver = silver.assign(**{col: silver[col].str.lower() for col in categorical_columns})

        # An order cannot ship before it is placed.
        silver = silver[silver["orderdate"] <= silver["shipdate"]]

        # The Cassandra driver treats naive datetimes as UTC, so a naive local
        # datetime.now() would be stored shifted by the host's UTC offset.
        return silver.assign(processed_at=datetime.now(timezone.utc))

In [5]:
class SilverDataLoader:
    """Creates the Silver sales table and loads cleaned rows into it."""

    def __init__(self, session, keyspace, silvertable):
        self.session = session
        self.keyspace = keyspace
        self.silvertable = silvertable

    def load_data(self, silver_df):
        """Load cleaned silver data into Cassandra, creating the table if needed.

        Args:
            silver_df: Cleaned DataFrame containing every column inserted by
                ``_insert_data``; ``orderdate``, ``shipdate`` and ``processed_at``
                must hold pandas Timestamps.
        """
        self._create_table_if_not_exists()
        self._insert_data(silver_df)

    def _create_table_if_not_exists(self):
        """Create the Silver table (primary key ``orderid``) if it doesn't exist."""
        # Price/cost/revenue columns are DOUBLE: a 32-bit FLOAT keeps only ~7
        # significant digits, so values around a million and up lose their cents.
        # IF NOT EXISTS leaves an already-created table's column types untouched.
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.keyspace}.{self.silvertable} (
            orderid BIGINT PRIMARY KEY,
            country TEXT,
            itemtype TEXT,
            orderdate DATE,
            orderpriority TEXT,
            region TEXT,
            saleschannel TEXT,
            shipdate DATE,
            totalcost DOUBLE,
            totalprofit DOUBLE,
            totalrevenue DOUBLE,
            unitcost DOUBLE,
            unitprice DOUBLE,
            unitssold INT,
            processed_at TIMESTAMP
        );
        """
        self.session.execute(create_table_query)

    def _insert_data(self, silver_df):
        """Insert each row of ``silver_df``; rows sharing an ``orderid`` overwrite."""
        insert_query = f"""
        INSERT INTO {self.keyspace}.{self.silvertable} (
            orderid, country, itemtype, orderdate, orderpriority, region, 
            saleschannel, shipdate, totalcost, totalprofit, totalrevenue, 
            unitcost, unitprice, unitssold, processed_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        prepared = self.session.prepare(insert_query)

        # itertuples() avoids the per-row Series built by iterrows(); every silver
        # column name is a valid identifier, so each one is a tuple attribute.
        for row in silver_df.itertuples(index=False):
            self.session.execute(prepared, (
                int(row.orderid),
                row.country,
                row.itemtype,
                # DATE columns bind datetime.date, not pandas Timestamps.
                row.orderdate.date(),
                row.orderpriority,
                row.region,
                row.saleschannel,
                row.shipdate.date(),
                float(row.totalcost),
                float(row.totalprofit),
                float(row.totalrevenue),
                float(row.unitcost),
                float(row.unitprice),
                int(row.unitssold),
                row.processed_at.to_pydatetime(),
            ))

        print("Silver Data loaded successfully!")


In [6]:
class TotalRevenueByRegion:
    """Gold job: sum ``totalrevenue`` per region and upsert it into a Cassandra table."""

    def __init__(self, session, keyspace, gold_table):
        self.session = session
        self.keyspace = keyspace
        self.gold_table = gold_table

    def aggregate(self, silver_df):
        """Aggregate total revenue by region and load the result.

        Args:
            silver_df: Silver DataFrame with ``region`` and ``totalrevenue`` columns.
        """
        aggregated_df = silver_df.groupby('region', as_index=False)['totalrevenue'].sum()
        self._load_to_cassandra(aggregated_df)

    def _load_to_cassandra(self, aggregated_df):
        """Load aggregated data into the Cassandra gold table, creating it if needed.

        ``region`` is the primary key, so each run overwrites that region's
        previous total; regions absent from this run are left as they were.
        """
        # DOUBLE rather than FLOAT: summed revenue easily exceeds the ~7
        # significant digits a 32-bit FLOAT can hold. IF NOT EXISTS leaves an
        # already-created table's column types untouched.
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.keyspace}.{self.gold_table} (
            region TEXT PRIMARY KEY,
            totalrevenue DOUBLE
        );
        """
        self.session.execute(create_table_query)

        insert_query = f"""
        INSERT INTO {self.keyspace}.{self.gold_table} (region, totalrevenue) 
        VALUES (?, ?)
        """
        prepared = self.session.prepare(insert_query)

        rows = aggregated_df[["region", "totalrevenue"]].itertuples(index=False, name=None)
        for region, total in rows:
            self.session.execute(prepared, (region, float(total)))

        print("Total Revenue by Region loaded successfully!")

In [7]:
class TotalProfitByRegion:
    """Gold job: sum ``totalprofit`` per region and upsert it into a Cassandra table."""

    def __init__(self, session, keyspace, gold_table):
        self.session = session
        self.keyspace = keyspace
        self.gold_table = gold_table

    def aggregate(self, silver_df):
        """Aggregate total profit by region and load the result.

        Args:
            silver_df: Silver DataFrame with ``region`` and ``totalprofit`` columns.
        """
        aggregated_df = silver_df.groupby('region', as_index=False)['totalprofit'].sum()
        self._load_to_cassandra(aggregated_df)

    def _load_to_cassandra(self, aggregated_df):
        """Load aggregated data into the Cassandra gold table, creating it if needed.

        ``region`` is the primary key, so each run overwrites that region's
        previous total; regions absent from this run are left as they were.
        """
        # DOUBLE rather than FLOAT: summed profit easily exceeds the ~7
        # significant digits a 32-bit FLOAT can hold. IF NOT EXISTS leaves an
        # already-created table's column types untouched.
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.keyspace}.{self.gold_table} (
            region TEXT PRIMARY KEY,
            totalprofit DOUBLE
        );
        """
        self.session.execute(create_table_query)

        insert_query = f"""
        INSERT INTO {self.keyspace}.{self.gold_table} (region, totalprofit) 
        VALUES (?, ?)
        """
        prepared = self.session.prepare(insert_query)

        rows = aggregated_df[["region", "totalprofit"]].itertuples(index=False, name=None)
        for region, total in rows:
            self.session.execute(prepared, (region, float(total)))

        print("Total Profit by Region loaded successfully!")

In [8]:
class TotalUnitsSoldByRegion:
    """Gold job: sum ``unitssold`` per region and upsert it into a Cassandra table."""

    def __init__(self, session, keyspace, gold_table):
        self.session = session
        self.keyspace = keyspace
        self.gold_table = gold_table

    def aggregate(self, silver_df):
        """Aggregate total units sold by region and load the result.

        Args:
            silver_df: Silver DataFrame with ``region`` and integer ``unitssold`` columns.
        """
        aggregated_df = silver_df.groupby('region', as_index=False)['unitssold'].sum()
        self._load_to_cassandra(aggregated_df)

    def _load_to_cassandra(self, aggregated_df):
        """Load aggregated data into the Cassandra gold table, creating it if needed.

        ``region`` is the primary key, so each run overwrites that region's
        previous total; regions absent from this run are left as they were.
        """
        # BIGINT rather than INT: a sum of per-order unit counts can exceed the
        # 32-bit INT range on larger inputs. IF NOT EXISTS leaves an
        # already-created table's column types untouched.
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.keyspace}.{self.gold_table} (
            region TEXT PRIMARY KEY,
            unitssold BIGINT
        );
        """
        self.session.execute(create_table_query)

        insert_query = f"""
        INSERT INTO {self.keyspace}.{self.gold_table} (region, unitssold) 
        VALUES (?, ?)
        """
        prepared = self.session.prepare(insert_query)

        rows = aggregated_df[["region", "unitssold"]].itertuples(index=False, name=None)
        for region, total in rows:
            self.session.execute(prepared, (region, int(total)))

        print("Total Units Sold by Region loaded successfully!")


In [9]:
def main():
    """Run the Bronze -> Silver -> Gold sales pipeline against Cassandra.

    Steps:
      1. Load the raw sales CSV into the Bronze table.
      2. Read the Bronze table back and clean it into a Silver DataFrame.
      3. Load the Silver table.
      4. Build the per-region Gold aggregates (revenue, profit, units sold).

    The cluster is shut down in a ``finally`` so its connections are released
    even if a step fails.
    """
    secure_connect_bundle = 'secure-connect-kalyanc.zip'
    token_file = "kalyanc-token.json"
    keyspace = 'cdb'
    bronze_table = 'bronze_sales'
    silver_table = 'silver_sales'
    source_csv_url = "https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv"

    cassandra_connection = CassandraConnection(secure_connect_bundle, token_file)
    try:
        session = cassandra_connection.get_session()

        # Load raw data to Cassandra bronze table
        raw_df = pd.read_csv(source_csv_url)
        DataLoader(session, keyspace, bronze_table).load_raw_data(raw_df)

        # Extract data from the Bronze table. _asdict() assumes the driver's
        # default named-tuple row factory.
        rows = session.execute(f"SELECT * FROM {keyspace}.{bronze_table}")
        bronze_df = pd.DataFrame([row._asdict() for row in rows])
        print('Data Extracted Successfully!')

        # Clean and process the data for Silver stage, then load it
        silver_df = DataCleaner.bronze_to_silver(bronze_df)
        SilverDataLoader(session, keyspace, silver_table).load_data(silver_df)

        # Create Gold Tables (Aggregation)
        gold_jobs = [
            TotalRevenueByRegion(session, keyspace, "total_revenue_by_region"),
            TotalProfitByRegion(session, keyspace, "total_profit_by_region"),
            TotalUnitsSoldByRegion(session, keyspace, "total_units_sold_by_region"),
        ]
        for job in gold_jobs:
            job.aggregate(silver_df)
    finally:
        cassandra_connection.get_cluster().shutdown()


if __name__ == "__main__":
    main()

Data Extracted Successfully!
Silver Data loaded successfully!
Total Revenue by Region loaded successfully!
Total Profit by Region loaded successfully!
Total Units Sold by Region loaded successfully!
