In [1]:
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import SimpleStatement
import json
from datetime import datetime

In [2]:
# Establish a connection to Cassandra
def cassandra_connection(bundle_path='secure-connect-cassandradb.zip',
                         token_path="cassandradb-token.json"):
    """
    Open a Cassandra session using a secure-connect bundle and a token file.

    Args:
        bundle_path (str): Path handed to the driver as the
            ``secure_connect_bundle`` cloud option.
        token_path (str): Path to a JSON file that contains the ``clientId``
            and ``secret`` keys used for plain-text authentication.

    Returns:
        tuple: ``(session, cluster)``. The caller is responsible for calling
        ``cluster.shutdown()`` once it is done with the session.

    Raises:
        OSError: If the token file cannot be opened.
        KeyError: If the token file lacks ``clientId`` or ``secret``.
    """
    config = {
        'secure_connect_bundle': bundle_path
    }
    with open(token_path, encoding="utf-8") as file:
        creds = json.load(file)

    auth_provider = PlainTextAuthProvider(creds["clientId"], creds["secret"])
    cluster = Cluster(cloud=config, auth_provider=auth_provider)
    try:
        session = cluster.connect()
    except Exception:
        # The caller never receives the cluster when connecting fails, so it
        # has to be shut down here before the error propagates.
        cluster.shutdown()
        raise
    return session, cluster

In [3]:
# Insert raw data into a Cassandra table
def insert_raw_data(session, keyspace_name, table_name, dataframe):
    """
    Create the raw (bronze) table if needed and write every DataFrame row to it.

    Args:
        session: Object exposing ``execute`` and ``prepare`` (a Cassandra session).
        keyspace_name (str): Keyspace that holds the table.
        table_name (str): Name of the table to create/populate.
        dataframe (pd.DataFrame): Source rows; must contain the CSV-style
            columns listed in ``source_columns`` below.

    Returns:
        None
    """
    # DataFrame columns, listed in the same order as the INSERT column list.
    source_columns = (
        'Region', 'Country', 'Item Type', 'Sales Channel',
        'Order Priority', 'Order Date', 'Order ID', 'Ship Date',
        'UnitsSold', 'UnitPrice', 'UnitCost',
        'TotalRevenue', 'TotalCost', 'TotalProfit',
    )

    create_table_cql = f"""
    CREATE TABLE IF NOT EXISTS {keyspace_name}.{table_name} (
        region TEXT,
        country TEXT,
        item_category TEXT,
        sales_medium TEXT,
        priority_level TEXT,
        order_date TEXT,
        order_id BIGINT PRIMARY KEY,
        shipping_date TEXT,
        items_sold INT,
        price_per_unit FLOAT,
        cost_per_unit FLOAT,
        revenue_total FLOAT,
        cost_total FLOAT,
        profit_total FLOAT
    )
    """
    session.execute(create_table_cql)

    insert_cql = f"""
    INSERT INTO {keyspace_name}.{table_name} (
        region, country, item_category, sales_medium, priority_level,
        order_date, order_id, shipping_date, items_sold, price_per_unit,
        cost_per_unit, revenue_total, cost_total, profit_total
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    insert_statement = session.prepare(insert_cql)

    # One bound execution per row; values are read by source column name.
    for _, source_row in dataframe.iterrows():
        bound_values = tuple(source_row[column] for column in source_columns)
        session.execute(insert_statement, bound_values)
    print("Data Successfully inserted in bronze table!!")

In [4]:
def extract_from_bronze(session, keyspace, table_name):
    """
    Read every row of the bronze table into a DataFrame.

    Args:
        session: Object whose ``execute`` returns an iterable of rows that
            provide ``_asdict()``.
        keyspace (str): Keyspace that holds the table.
        table_name (str): Bronze table name.

    Returns:
        pd.DataFrame: One row per record returned by the query.
    """
    result_set = session.execute(f"SELECT * FROM {keyspace}.{table_name}")

    records = []
    for row in result_set:
        records.append(row._asdict())

    bronze_frame = pd.DataFrame(records)
    print("Raw data extracted successfully!")
    return bronze_frame

In [5]:
def drop_missing_values(dataframe, required_columns):
    """
    Keep only the rows that have a value in every required column.

    Args:
        dataframe (pd.DataFrame): Frame to filter.
        required_columns (list): Column labels that must not be missing.

    Returns:
        pd.DataFrame: The rows of ``dataframe`` with no missing value in any
        of ``required_columns``.
    """
    complete_rows = dataframe.dropna(axis=0, how="any", subset=required_columns)
    return complete_rows


In [6]:
def convert_data_types(dataframe):
    """
    Cast the identifier, quantity and money columns to numeric dtypes.

    A new DataFrame is returned and the input is not modified, so the
    function is safe to call on a slice or filtered view of another frame.

    Args:
        dataframe (pd.DataFrame): Frame containing ``order_id``,
            ``items_sold`` and the price/cost/revenue/profit columns.

    Returns:
        pd.DataFrame: Copy of ``dataframe`` with the numeric columns cast.

    Raises:
        KeyError: If one of the expected columns is missing.
        ValueError: If a value cannot be cast (for example a missing value
            in an integer column).
    """
    # Widths are spelled out so the result does not depend on the platform's
    # default integer size; order_id is stored as BIGINT downstream.
    target_dtypes = {
        "order_id": "int64",
        "items_sold": "int64",
        "price_per_unit": "float64",
        "cost_per_unit": "float64",
        "revenue_total": "float64",
        "cost_total": "float64",
        "profit_total": "float64",
    }
    return dataframe.astype(target_dtypes)

In [7]:
def handle_dates(dataframe):
    """
    Parse the order and shipping dates and drop rows with invalid dates.

    ``order_date`` and ``shipping_date`` are parsed with the ``%m/%d/%Y``
    format. Values that cannot be parsed become ``NaT`` and their rows are
    removed. The input frame is not modified.

    Args:
        dataframe (pd.DataFrame): Frame containing ``order_date`` and
            ``shipping_date`` columns.

    Returns:
        pd.DataFrame: New frame with both columns as datetimes and without
        the rows whose dates could not be parsed.
    """
    date_format = "%m/%d/%Y"
    date_columns = ["order_date", "shipping_date"]

    # assign() builds a new frame, so the caller's columns are not overwritten.
    parsed = dataframe.assign(**{
        column: pd.to_datetime(dataframe[column], format=date_format, errors="coerce")
        for column in date_columns
    })
    return parsed.dropna(subset=date_columns)

In [8]:
def standardize_text_fields(dataframe, text_columns):
    """
    Lower-case the given text columns.

    The input frame is not modified; a copy with the lower-cased columns is
    returned instead.

    Args:
        dataframe (pd.DataFrame): Frame holding the text columns.
        text_columns (list): Labels of the string columns to lower-case.

    Returns:
        pd.DataFrame: Copy of ``dataframe`` with the listed columns
        lower-cased; missing values stay missing.
    """
    standardized = dataframe.copy()
    for column in text_columns:
        standardized[column] = dataframe[column].str.lower()
    return standardized

In [9]:
def transform_to_silver(dataframe):
    """
    Turn bronze-level data into silver-level data.

    The stages run in this order: drop rows with missing required values,
    cast numeric columns, parse dates, lower-case the text columns.

    Args:
        dataframe (pd.DataFrame): Bronze-level frame.

    Returns:
        pd.DataFrame: The frame produced by the last stage.
    """
    text_columns = ["region", "country", "item_category", "sales_medium", "priority_level"]
    required_columns = text_columns + ["order_date", "order_id", "shipping_date"]

    return (
        dataframe
        .pipe(drop_missing_values, required_columns)
        .pipe(convert_data_types)
        .pipe(handle_dates)
        .pipe(standardize_text_fields, text_columns)
    )

In [10]:
def create_and_populate_silver_table(session, keyspace, silver_table, silver_df):
    """
    Creates a Silver table in Cassandra if it doesn't exist and populates it with data from the provided DataFrame.

    The INSERT is prepared once and executed per row with bound values, in
    the same way as the other writers in this notebook. Date columns are
    bound as ``datetime`` objects rather than formatted strings.

    Any exception raised while creating or populating the table is caught
    and reported with ``print``; the function does not re-raise it.

    Args:
        session: Cassandra session object.
        keyspace (str): Keyspace name.
        silver_table (str): Silver table name.
        silver_df (pd.DataFrame): Transformed DataFrame with cleaned and structured data.

    Returns:
        None
    """
    # Column order shared by the INSERT column list and the bound values.
    columns = [
        "region", "country", "item_category", "sales_medium", "priority_level",
        "order_date", "order_id", "shipping_date", "items_sold", "price_per_unit",
        "cost_per_unit", "revenue_total", "cost_total", "profit_total",
    ]

    try:
        # Handle date columns using the provided handle_dates function
        silver_df = handle_dates(silver_df)

        # Create the Silver table if it doesn't exist
        # NOTE(review): CQL FLOAT is 32-bit; confirm that is enough precision
        # for the money columns.
        session.execute(f"""
        CREATE TABLE IF NOT EXISTS {keyspace}.{silver_table} (
            region TEXT,
            country TEXT,
            item_category TEXT,
            sales_medium TEXT,
            priority_level TEXT,
            order_date TIMESTAMP,
            order_id BIGINT PRIMARY KEY,
            shipping_date TIMESTAMP,
            items_sold INT,
            price_per_unit FLOAT,
            cost_per_unit FLOAT,
            revenue_total FLOAT,
            cost_total FLOAT,
            profit_total FLOAT
        )
        """)
        print("Silver table created.")

        # Prepare the statement once instead of rebuilding it for every row.
        insert_statement = session.prepare(f"""
        INSERT INTO {keyspace}.{silver_table} (
            region, country, item_category, sales_medium, priority_level,
            order_date, order_id, shipping_date, items_sold, price_per_unit,
            cost_per_unit, revenue_total, cost_total, profit_total
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """)

        # Populate the Silver table with data from the DataFrame
        print("Inserting data into Silver table...")
        for row in silver_df[columns].itertuples(index=False, name=None):
            # Bind plain datetime objects for the TIMESTAMP columns.
            bound_values = tuple(
                value.to_pydatetime() if isinstance(value, pd.Timestamp) else value
                for value in row
            )
            session.execute(insert_statement, bound_values)
        print("Data inserted into Silver table successfully!")

    except Exception as e:
        print(f"An error occurred while creating or populating the Silver table: {e}")


In [11]:
def extract_silver_data(session, keyspace, silver_table):
    """
    Load the whole silver table into a DataFrame.

    Args:
        session: Object whose ``execute`` returns an iterable of rows that
            provide ``_asdict()``.
        keyspace (str): Keyspace that holds the table.
        silver_table (str): Silver table name.

    Returns:
        pd.DataFrame: One row per record returned by the query.
    """
    select_all = f"SELECT * FROM {keyspace}.{silver_table}"
    records = list(map(lambda row: row._asdict(), session.execute(select_all)))
    silver_frame = pd.DataFrame(records)
    print("Data extracted from silver table successfully!")
    return silver_frame

In [12]:
def avg_revenue_by_channel(session, keyspace, silver_df):
    """
    Creates and populates a gold table with average revenue per unit by sales channel.

    For every row the revenue per unit is ``revenue_total / items_sold``;
    the gold table stores the mean of that ratio per ``sales_medium``.

    Args:
        session: Cassandra session object (needs ``execute`` and ``prepare``).
        keyspace (str): Keyspace name.
        silver_df (pd.DataFrame): Silver data with ``sales_medium``,
            ``revenue_total`` and ``items_sold`` columns.

    Returns:
        None
    """
    # Vectorised ratio followed by a grouped mean. This avoids calling a
    # Python lambda for every group via groupby().apply().
    revenue_per_unit = silver_df["revenue_total"] / silver_df["items_sold"]
    avg_revenue_per_unit = (
        revenue_per_unit
        .groupby(silver_df["sales_medium"])
        .mean()
        .reset_index(name="avg_revenue_per_unit")
    )

    # Create gold table for average revenue by sales channel
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.avg_revenue_by_saleschannel (
        sales_medium TEXT PRIMARY KEY,
        avg_revenue_per_unit FLOAT
    )
    """
    session.execute(create_table_query)

    # Insert data into the table
    insert_query = session.prepare(f"""
    INSERT INTO {keyspace}.avg_revenue_by_saleschannel (sales_medium, avg_revenue_per_unit)
    VALUES (?, ?)
    """)
    for sales_medium, average in avg_revenue_per_unit.itertuples(index=False, name=None):
        session.execute(insert_query, (sales_medium, average))

    print("Gold table: Average Revenue per Unit by Sales Channel created and populated!")


In [13]:
def order_priority_distribution(session, keyspace, silver_df):
    """
    Build the gold table that counts orders per country and priority level.

    Args:
        session: Cassandra session object (needs ``execute`` and ``prepare``).
        keyspace (str): Keyspace name.
        silver_df (pd.DataFrame): Silver data with ``country`` and
            ``priority_level`` columns.

    Returns:
        None
    """
    # Number of orders for each (country, priority_level) pair.
    grouped = silver_df.groupby(["country", "priority_level"])
    priority_counts = grouped.size().reset_index(name="order_count")

    # Gold table keyed by country, then priority level.
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.order_priority_distribution (
        country TEXT,
        priority_level TEXT,
        order_count INT,
        PRIMARY KEY (country, priority_level)
    )
    """
    session.execute(create_table_query)

    # One bound insert per aggregated row.
    insert_statement = session.prepare(f"""
    INSERT INTO {keyspace}.order_priority_distribution (country, priority_level, order_count)
    VALUES (?, ?, ?)
    """)
    value_fields = ("country", "priority_level", "order_count")
    for _, counted in priority_counts.iterrows():
        bound_values = tuple(counted[field] for field in value_fields)
        session.execute(insert_statement, bound_values)

    print("Gold table: Order Priority Distribution by Country created and populated!")


In [14]:
def top_item_type_by_month(session, keyspace, silver_df):
    """
    Creates and populates a gold table with the top-selling item type by month based on revenue.

    The caller's DataFrame is not modified: the month key is computed on a
    local projection instead of being written back as a ``year_month``
    column. Rows whose ``order_date`` cannot be formatted (missing dates)
    do not contribute to any month.

    Args:
        session: Cassandra session object (needs ``execute`` and ``prepare``).
        keyspace (str): Keyspace name.
        silver_df (pd.DataFrame): Silver data with ``order_date``,
            ``item_category`` and ``revenue_total`` columns.

    Returns:
        None
    """
    # Convert order_date to datetime if it's not already (on a local Series)
    order_dates = pd.to_datetime(silver_df["order_date"])

    # Total revenue per (month, item type)
    monthly_item_revenue = (
        silver_df[["item_category", "revenue_total"]]
        .assign(year_month=order_dates.dt.strftime("%Y-%m"))
        .groupby(["year_month", "item_category"])
        .agg(total_revenue=pd.NamedAgg(column="revenue_total", aggfunc="sum"))
        .reset_index()
    )

    # Identify the top item type for each month: highest revenue sorts first
    # within a month, and only that first row is kept.
    top_item_by_month = monthly_item_revenue.sort_values(
        ["year_month", "total_revenue"], ascending=[True, False]
    ).drop_duplicates(subset=["year_month"], keep="first")

    # Create gold table for top-selling item type by month
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.top_item_by_month (
        year_month TEXT PRIMARY KEY,
        item_category TEXT,
        total_revenue FLOAT
    )
    """
    session.execute(create_table_query)

    # Insert data into the table
    insert_query = session.prepare(f"""
    INSERT INTO {keyspace}.top_item_by_month (year_month, item_category, total_revenue)
    VALUES (?, ?, ?)
    """)
    output_columns = ["year_month", "item_category", "total_revenue"]
    for bound_values in top_item_by_month[output_columns].itertuples(index=False, name=None):
        session.execute(insert_query, bound_values)

    print("Gold table: Top Selling Item Type by Month created and populated!")


In [15]:
def main():
    """
    Run the bronze -> silver -> gold pipeline end to end.

    Connects to Cassandra, loads the sales CSV into the bronze table,
    transforms it to silver, writes the silver table and then populates the
    three gold tables. The cluster is shut down when the run finishes,
    whether it succeeds or fails.

    Returns:
        None
    """
    # Establish connection
    session, cluster = cassandra_connection()
    try:
        keyspace = 'rdb'
        bronze_table = "bronzedata"
        silver_table = "silverdata"

        # Load raw data from the CSV file
        df = pd.read_csv("https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv")

        # Insert raw data into the bronze table
        insert_raw_data(session, keyspace, bronze_table, df)

        # Extract data from the bronze table
        bronze_df = extract_from_bronze(session, keyspace, bronze_table)

        # Transform data to silver level
        silver_df = transform_to_silver(bronze_df)

        # Create and populate Silver table
        create_and_populate_silver_table(session, keyspace, silver_table, silver_df)

        # Perform analysis and populate Gold tables
        avg_revenue_by_channel(session, keyspace, silver_df)
        order_priority_distribution(session, keyspace, silver_df)
        top_item_type_by_month(session, keyspace, silver_df)
    finally:
        # Release the driver's connections even when a stage raises.
        cluster.shutdown()

# Run the pipeline only when this code is executed directly, not when imported.
if __name__ == "__main__":
    main()


Data Successfully inserted in bronze table!!
Raw data extracted successfully!
Silver table created.
Inserting data into Silver table...
Data inserted into Silver table successfully!
Gold table: Average Revenue per Unit by Sales Channel created and populated!
Gold table: Order Priority Distribution by Country created and populated!
Gold table: Top Selling Item Type by Month created and populated!
