In [2]:
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
from cassandra.query import SimpleStatement

In [3]:
def connect_to_cassandra(bundle_path="secure-connect-salesdatabase.zip",
                         token_path="salesdatabase-token.json"):
    """Open a session against the cloud-hosted Cassandra database.

    Credentials are read from a JSON token file that must contain the keys
    ``"clientId"`` and ``"secret"``; they are handed to the driver's
    ``PlainTextAuthProvider``.

    Args:
        bundle_path: Path to the secure-connect bundle, passed to ``Cluster``
            through its ``cloud`` config (``secure_connect_bundle``).
        token_path: Path to the JSON token file holding the credentials.

    Returns:
        tuple: ``(session, cluster)``. The caller owns ``cluster`` and should
        call ``cluster.shutdown()`` when finished.

    Raises:
        Any error from reading/parsing the token file (``OSError``,
        ``json.JSONDecodeError``, ``KeyError`` for a missing key) or from the
        driver. If ``connect()`` fails the cluster is shut down first.
    """
    with open(token_path) as token_file:
        secrets = json.load(token_file)

    auth_provider = PlainTextAuthProvider(secrets["clientId"], secrets["secret"])
    cluster = Cluster(
        cloud={"secure_connect_bundle": bundle_path},
        auth_provider=auth_provider,
    )
    try:
        session = cluster.connect()
    except Exception:
        # Release the driver's cluster resources instead of leaking them
        # when the initial connection fails.
        cluster.shutdown()
        raise
    return session, cluster

In [4]:
def load(session, keyspace, table, df):
    """
    Write the raw CSV rows into the bronze Cassandra table.

    Creates ``{keyspace}.{table}`` if it does not exist (dates are kept as
    TEXT at this stage and ``order_id`` is the primary key), then inserts
    every row of ``df`` through one prepared INSERT statement.

    Args:
        session: Open Cassandra session (anything with ``execute``/``prepare``).
        keyspace: Target keyspace name.
        table: Bronze table name.
        df: DataFrame using the raw CSV headers ('Region', 'Order ID',
            'UnitsSold', ...).

    Nothing is raised: any exception is printed and the function returns None.
    """
    # CSV headers, listed in the same order as the INSERT column list below.
    csv_columns = (
        'Region', 'Country', 'Item Type', 'Sales Channel',
        'Order Priority', 'Order Date', 'Order ID', 'Ship Date',
        'UnitsSold', 'UnitPrice', 'UnitCost',
        'TotalRevenue', 'TotalCost', 'TotalProfit',
    )
    try:
        bronze_ddl = f"""
        CREATE TABLE IF NOT EXISTS {keyspace}.{table} (
            region_name TEXT,
            country_name TEXT,
            item_category TEXT,
            sales_channel TEXT,
            order_priority TEXT,
            order_date TEXT,
            order_id BIGINT PRIMARY KEY,
            shipping_date TEXT,
            units_sold INT,
            unit_price FLOAT,
            unit_cost FLOAT,
            total_revenue FLOAT,
            total_cost FLOAT,
            total_profit FLOAT
        )
        """
        session.execute(bronze_ddl)

        bronze_insert = session.prepare(f"""
        INSERT INTO {keyspace}.{table} (
            region_name, country_name, item_category, sales_channel, order_priority,
            order_date, order_id, shipping_date, units_sold, unit_price,
            unit_cost, total_revenue, total_cost, total_profit
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """)

        for _, record in df.iterrows():
            values = tuple(record[name] for name in csv_columns)
            session.execute(bronze_insert, values)

        print("CSV data successfully loaded into Cassandra!(Bronze table)")

    except Exception as e:
        print(f"Error occurred while loading data: {e}")


In [5]:
from datetime import datetime
import pandas as pd

def bronze_to_silver(dataframe):
    """
    Clean and type the bronze-level DataFrame into a silver-level DataFrame.

    Steps, in order:
      1. drop rows missing any critical field;
      2. cast id/count columns to int and money columns to float;
      3. parse ``order_date``/``shipping_date`` with ``%m/%d/%Y`` and drop
         rows whose dates cannot be parsed;
      4. lowercase the categorical text columns;
      5. drop rows whose order date is after the shipping date;
      6. stamp every row with a ``processed_at`` timestamp.

    Args:
        dataframe: Bronze rows using the Cassandra column names
            (``region_name``, ``order_id``, ``units_sold``, ...). It is not
            modified.

    Returns:
        A new cleaned DataFrame, or ``None`` if any step raised (the error
        message is printed).
    """
    critical_columns = [
        "region_name", "country_name", "item_category", "sales_channel",
        "order_priority", "order_date", "order_id", "shipping_date"
    ]
    numeric_types = {
        "order_id": int,
        "units_sold": int,
        "unit_price": float,
        "unit_cost": float,
        "total_revenue": float,
        "total_cost": float,
        "total_profit": float,
    }
    date_columns = ["order_date", "shipping_date"]
    categorical_columns = [
        "region_name", "country_name", "item_category", "sales_channel", "order_priority"
    ]
    date_format = "%m/%d/%Y"

    try:
        # Every row filter is followed by .copy(): assigning columns onto a
        # row-filtered slice triggers SettingWithCopyWarning and leaves it
        # ambiguous whether the write lands on a view of the caller's frame.
        silver = dataframe.dropna(subset=critical_columns).copy()

        silver = silver.astype(numeric_types)

        for column in date_columns:
            silver[column] = pd.to_datetime(silver[column], format=date_format, errors="coerce")
        # errors="coerce" turned unparseable dates into NaT; drop those rows.
        silver = silver.dropna(subset=date_columns).copy()

        for column in categorical_columns:
            silver[column] = silver[column].str.lower()

        # An order cannot ship before it was placed.
        silver = silver[silver["order_date"] <= silver["shipping_date"]].copy()

        silver["processed_at"] = datetime.now()

        return silver

    except Exception as e:
        print(f"Error occurred while processing the data: {e}")
        return None


In [6]:
def load_to_cassandra(silver_df, keyspace, silvertable, cassandra_session=None):
    """
    Create the silver table (if needed) and insert every silver row into it.

    Args:
        silver_df: Output of ``bronze_to_silver``; ``order_date``,
            ``shipping_date`` and ``processed_at`` must be datetime columns.
        keyspace: Target keyspace name.
        silvertable: Silver table name.
        cassandra_session: Session to use. When omitted, falls back to the
            notebook-level ``session`` global (the previous, implicit
            behavior), so existing calls keep working.

    Nothing is raised: any exception is printed and the function returns None.
    """
    try:
        # Explicit session when given; otherwise the module-level global.
        active_session = cassandra_session if cassandra_session is not None else session

        # Create the table with updated column names
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {keyspace}.{silvertable} (
            order_id BIGINT PRIMARY KEY,
            country_name TEXT,
            item_category TEXT,
            order_date DATE,
            order_priority TEXT,
            region_name TEXT,
            sales_channel TEXT,
            shipping_date DATE,
            total_cost FLOAT,
            total_profit FLOAT,
            total_revenue FLOAT,
            unit_cost FLOAT,
            unit_price FLOAT,
            units_sold INT,
            processed_at TIMESTAMP
        );
        """
        active_session.execute(create_table_query)

        # Prepare the insert query with updated column names
        insert_query = f"""
        INSERT INTO {keyspace}.{silvertable} (
            order_id, country_name, item_category, order_date, order_priority, 
            region_name, sales_channel, shipping_date, total_cost, total_profit, 
            total_revenue, unit_cost, unit_price, units_sold, processed_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        prepared = active_session.prepare(insert_query)

        # Convert pandas/numpy scalars to plain Python types (int, float,
        # datetime.date, datetime.datetime) before binding them.
        for _, row in silver_df.iterrows():
            active_session.execute(prepared, (
                int(row["order_id"]),
                row["country_name"],
                row["item_category"],
                row["order_date"].date(),
                row["order_priority"],
                row["region_name"],
                row["sales_channel"],
                row["shipping_date"].date(),
                float(row["total_cost"]),
                float(row["total_profit"]),
                float(row["total_revenue"]),
                float(row["unit_cost"]),
                float(row["unit_price"]),
                int(row["units_sold"]),
                row["processed_at"].to_pydatetime()
            ))

        print("Silver data loaded successfully!")

    except Exception as e:
        print(f"Error occurred while loading data to Cassandra: {e}")


In [7]:
def gold_table1(silver_df, top_n=5):
    """
    Top ``top_n`` countries by total revenue within each region.

    Args:
        silver_df: Silver DataFrame with ``region_name``, ``country_name`` and
            ``total_revenue`` columns.
        top_n: Number of countries kept per region (default 5, the original
            hard-coded value).

    Returns:
        DataFrame with columns ``region_name``, ``country_name`` and
        ``total_revenue`` (summed per country), ordered by region ascending
        and revenue descending.
    """
    revenue_by_country = (
        silver_df.groupby(["region_name", "country_name"], as_index=False)
        .agg(total_revenue=("total_revenue", "sum"))
    )
    # Sorting first makes groupby().head() keep the highest earners per region.
    return (
        revenue_by_country
        .sort_values(by=["region_name", "total_revenue"], ascending=[True, False])
        .groupby("region_name")
        .head(top_n)
    )


In [8]:
def gold_table2(silver_df):
    """
    Mean unit price for every (region, item category) pair.

    Returns a DataFrame with columns ``region_name``, ``item_category`` and
    ``avg_unit_price``, sorted by the two group keys, with a fresh 0..n-1 index.
    """
    mean_price = silver_df.groupby(['region_name', 'item_category'])['unit_price'].mean()
    return mean_price.rename('avg_unit_price').reset_index()


In [9]:
def gold_table3(silver_df):
    """Yearly total cost and total units sold by item category.

    Args:
        silver_df: Silver DataFrame; ``order_date`` must be a datetime column.
            It is not modified.

    Returns:
        DataFrame with columns ``item_category``, ``year`` (a string such as
        ``"2020"``), ``total_cost`` and ``total_units_sold``.
    """
    # assign() returns a new frame, so the caller's silver_df no longer
    # silently gains a "year" column as a side effect.
    with_year = silver_df.assign(
        year=silver_df["order_date"].dt.to_period("Y").astype(str)
    )
    return (
        with_year.groupby(["item_category", "year"], as_index=False)
        .agg(
            total_cost=("total_cost", "sum"),
            total_units_sold=("units_sold", "sum")
        )
    )


In [10]:
def load_gold_table_to_cassandra(df, keyspace, table_name, schema_query, insert_query,
                                 cassandra_session=None):
    """Create a gold table (if needed) and insert every row of ``df`` into it.

    Args:
        df: Gold DataFrame whose column order matches the ``?`` placeholders
            of ``insert_query`` (each row is bound positionally).
        keyspace: Keyspace name. Unused here; both queries already embed it.
        table_name: Table name, used only in the success message.
        schema_query: CQL statement executed first (a CREATE TABLE).
        insert_query: CQL INSERT with one ``?`` per DataFrame column.
        cassandra_session: Session to use. When omitted, falls back to the
            notebook-level ``session`` global (the previous, implicit
            behavior), so existing calls keep working.
    """
    # Explicit session when given; otherwise the module-level global.
    active_session = cassandra_session if cassandra_session is not None else session
    active_session.execute(schema_query)
    prepared = active_session.prepare(insert_query)
    for _, row in df.iterrows():
        active_session.execute(prepared, tuple(row))
    print(f"Data loaded successfully into {table_name}!")


In [11]:
def top_countries_by_revenue(gold_table1, keyspace="s2"):
    """Persist the per-region top-countries-by-revenue gold table.

    Builds the CREATE/INSERT statements for
    ``{keyspace}.top_countries_by_revenue`` (partition key ``region_name``,
    clustering column ``country_name``). It then hands them, together with the
    DataFrame, to ``load_gold_table_to_cassandra``. The DataFrame's columns are
    expected in INSERT order: region_name, country_name, total_revenue.
    """
    table_name = "top_countries_by_revenue"
    create_cql = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (
        region_name TEXT,
        country_name TEXT,
        total_revenue FLOAT,
        PRIMARY KEY (region_name, country_name)
    );
    """
    insert_cql = f"""
    INSERT INTO {keyspace}.{table_name} (region_name, country_name, total_revenue)
    VALUES (?, ?, ?);
    """
    load_gold_table_to_cassandra(
        df=gold_table1,
        keyspace=keyspace,
        table_name=table_name,
        schema_query=create_cql,
        insert_query=insert_cql,
    )


In [12]:
def region_itemtype_avg_unit_price(gold_table2, keyspace="s2"):
    """Persist the region × item-category average-unit-price gold table.

    Builds the CREATE/INSERT statements for
    ``{keyspace}.region_itemtype_avg_unit_price`` (partition key
    ``region_name``, clustering column ``item_category``). It then hands them,
    together with the DataFrame, to ``load_gold_table_to_cassandra``. The
    DataFrame's columns are expected in INSERT order: region_name,
    item_category, avg_unit_price.
    """
    table_name = "region_itemtype_avg_unit_price"
    create_cql = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (
        region_name TEXT,
        item_category TEXT,
        avg_unit_price FLOAT,
        PRIMARY KEY (region_name, item_category)
    );
    """
    insert_cql = f"""
    INSERT INTO {keyspace}.{table_name} (region_name, item_category, avg_unit_price)
    VALUES (?, ?, ?);
    """
    load_gold_table_to_cassandra(
        df=gold_table2,
        keyspace=keyspace,
        table_name=table_name,
        schema_query=create_cql,
        insert_query=insert_cql,
    )


In [13]:
def annual_item_performance(gold_table3, keyspace="s2"):
    """Persist the yearly cost / units-sold per item-category gold table.

    Builds the CREATE/INSERT statements for
    ``{keyspace}.annual_item_performance`` (partition key ``item_category``,
    clustering column ``year``). It then hands them, together with the
    DataFrame, to ``load_gold_table_to_cassandra``. The DataFrame's columns are
    expected in INSERT order: item_category, year, total_cost,
    total_units_sold.
    """
    table_name = "annual_item_performance"
    create_cql = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table_name} (
        item_category TEXT,
        year TEXT,
        total_cost FLOAT,
        total_units_sold INT,
        PRIMARY KEY (item_category, year)
    );
    """
    insert_cql = f"""
    INSERT INTO {keyspace}.{table_name} (item_category, year, total_cost, total_units_sold)
    VALUES (?, ?, ?, ?);
    """
    load_gold_table_to_cassandra(
        df=gold_table3,
        keyspace=keyspace,
        table_name=table_name,
        schema_query=create_cql,
        insert_query=insert_cql,
    )


In [14]:
if __name__ == "__main__":
    # Bronze -> silver -> gold pipeline. The cluster is always shut down at
    # the end, even if a stage raises.
    session, cluster = connect_to_cassandra()
    try:
        df = pd.read_csv("https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv")
        keyspace = 's2'
        table = 'bronzesales'

        # Loading raw data into bronze table
        load(session, keyspace, table, df)

        # Extracting data from the bronze table
        query = f"SELECT * FROM {keyspace}.{table}"
        rows = session.execute(query)
        df1 = pd.DataFrame([row._asdict() for row in rows])
        print('Data Extracted Successfully!')

        # Cleaning the data (making it suitable for the silver stage)
        silver_df = bronze_to_silver(df1)
        if silver_df is None:
            # bronze_to_silver already printed the cause; stop before the
            # silver/gold stages fail with a less helpful error.
            raise RuntimeError("Silver transformation failed; aborting pipeline.")

        # Loading silver data to Cassandra
        silvertable = "silversales"
        load_to_cassandra(silver_df, keyspace, silvertable)

        # Generating Gold tables. Distinct variable names keep the
        # gold_tableN *functions* intact, so this cell can be re-run.
        top_countries_df = gold_table1(silver_df)
        avg_unit_price_df = gold_table2(silver_df)
        annual_item_df = gold_table3(silver_df)

        # Loading gold tables to Cassandra (same keyspace as bronze/silver)
        top_countries_by_revenue(top_countries_df, keyspace=keyspace)
        region_itemtype_avg_unit_price(avg_unit_price_df, keyspace=keyspace)
        annual_item_performance(annual_item_df, keyspace=keyspace)
    finally:
        cluster.shutdown()


CSV data successfully loaded into Cassandra!(Bronze table)
Data Extracted Successfully!
Silver data loaded successfully!
Data loaded successfully into top_countries_by_revenue!
Data loaded successfully into region_itemtype_avg_unit_price!
Data loaded successfully into annual_item_performance!
