In [1]:
import warnings
# Install a blanket "ignore" filter: no warning of any category is shown after this cell runs.
# NOTE(review): this also hides pandas and driver warnings — consider narrowing the filter.
warnings.filterwarnings("ignore")

In [27]:
!pip install tabulate



In [2]:
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [3]:
def connect_to_cassandra():
    """Connect to Cassandra using the local secure-connect bundle and token file.

    The client id and secret are read from ``salesdb-token.json`` and passed to
    a ``PlainTextAuthProvider``; the cluster is configured with the
    ``secure-connect-salesdb.zip`` bundle.

    Returns:
        tuple: ``(session, cluster)`` — the connected session and the cluster
        object it was created from.
    """
    with open("salesdb-token.json") as token_file:
        token = json.load(token_file)

    credentials = PlainTextAuthProvider(token["clientId"], token["secret"])
    cluster = Cluster(
        cloud={'secure_connect_bundle': 'secure-connect-salesdb.zip'},
        auth_provider=credentials,
    )
    # Hand back the cluster as well as the session.
    return cluster.connect(), cluster

In [4]:
def load_data(session, keyspace, table, df):
    """Create the raw sales table if it is missing and insert every row of ``df``.

    Args:
        session: Object exposing ``execute`` and ``prepare`` (the Cassandra
            session in this notebook).
        keyspace: Keyspace name; interpolated into the CQL text, so it must
            come from trusted code.
        table: Table name; interpolated into the CQL text in the same way.
        df: Frame using the CSV column names referenced below (``Region``,
            ``Order Date``, ``Order ID``, ...). It is not modified.
    """
    # Format the dates on a copy so the caller's frame keeps its original values.
    frame = df.copy()
    for date_col in ('Order Date', 'Ship Date'):
        frame[date_col] = pd.to_datetime(frame[date_col]).dt.strftime('%Y-%m-%d')

    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table} (
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,
        order_id BIGINT PRIMARY KEY,
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT
    )
    """)

    insert_query = session.prepare(f"""
    INSERT INTO {keyspace}.{table} (
        region, country, item_type, sales_channel, order_priority,
        order_date, order_id, ship_date, units_sold, unit_price,
        unit_cost, total_revenue, total_cost, total_profit
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """)

    # Source columns listed in the same order as the INSERT column list above.
    source_columns = (
        'Region', 'Country', 'Item Type', 'Sales Channel', 'Order Priority',
        'Order Date', 'Order ID', 'Ship Date', 'UnitsSold', 'UnitPrice',
        'UnitCost', 'TotalRevenue', 'TotalCost', 'TotalProfit',
    )
    for _, row in frame.iterrows():
        session.execute(insert_query, tuple(row[name] for name in source_columns))

    print("CSV data successfully loaded into Cassandra!")


In [5]:
def extract_data(session, keyspace, table):
    """Read every row of ``keyspace.table`` into a pandas DataFrame.

    Each returned row is converted with its ``_asdict()`` method, so the
    frame's columns are the row's field names.
    """
    records = []
    for record in session.execute(f"SELECT * FROM {keyspace}.{table}"):
        records.append(record._asdict())
    frame = pd.DataFrame(records)
    print('Data Extracted Successfully!')
    return frame

In [6]:
from datetime import datetime
def add_audit_columns(df1: pd.DataFrame) -> pd.DataFrame:
    """Add an ``ingestion_date`` column set to ``datetime.now()``.

    The column is assigned on ``df1`` itself and the same frame is returned.

    Raises:
        Exception: any error raised by the assignment is printed and then
            re-raised, so the function never returns ``None`` in place of
            the annotated DataFrame.
    """
    try:
        df1['ingestion_date'] = datetime.now()
        return df1
    except Exception as e:
        print(f"Error in add_audit_columns: {str(e)}")
        raise

In [7]:
def trim_string_data(df1: pd.DataFrame) -> pd.DataFrame:
    """Trim leading and trailing whitespace from the text columns of ``df1``.

    Columns of dtype ``object`` or ``string`` are processed in place and the
    same frame is returned. In ``object`` columns only values that are actual
    ``str`` instances are stripped; any other value (numbers, ``None``, ...)
    is kept unchanged rather than being replaced by NaN.

    Raises:
        Exception: any error is printed and then re-raised.
    """
    try:
        for col in df1.select_dtypes(include=['object', 'string']).columns:
            if df1[col].dtype == object:
                # Object columns may hold mixed values; `.str.strip()` would
                # turn every non-string entry into NaN.
                df1[col] = df1[col].map(
                    lambda value: value.strip() if isinstance(value, str) else value
                )
            else:
                df1[col] = df1[col].str.strip()
        return df1
    except Exception as e:
        print(f"Error in trim_string_data: {str(e)}")
        raise

In [8]:
# Target dtype for each column that needs an explicit conversion.
schema_mapping = {
    'order_date': 'datetime64[ns]',
    'ship_date': 'datetime64[ns]',
}


def convert_column_types(df1: pd.DataFrame, schema_mapping: dict) -> pd.DataFrame:
    """Convert the columns named in ``schema_mapping`` to their target dtypes.

    Args:
        df1: Frame to convert; columns are reassigned on this frame.
        schema_mapping: ``{column_name: target_dtype}``. Columns missing from
            ``df1`` are skipped.

    Returns:
        The same frame. Targets whose name starts with ``datetime`` are parsed
        with ``pd.to_datetime(..., errors='coerce')`` (unparseable values
        become NaT); every other target is applied with ``Series.astype``.

    Raises:
        Exception: any conversion error is printed and then re-raised.
    """
    try:
        for column_name, target_type in schema_mapping.items():
            if column_name not in df1.columns:
                continue
            if str(target_type).lower().startswith('datetime'):
                df1[column_name] = pd.to_datetime(df1[column_name], errors='coerce')
            else:
                # Honour the requested dtype instead of forcing every mapped
                # column through the datetime parser.
                df1[column_name] = df1[column_name].astype(target_type)
        return df1
    except Exception as e:
        print(f"Error in convert_column_types: {str(e)}")
        raise

In [9]:
from cassandra.util import Date
def convert_cassandra_date_to_datetime(cassandra_date):
    """Turn a Cassandra ``Date`` into a ``datetime``; pass anything else through.

    The conversion parses ``str(cassandra_date)`` with the ``%Y-%m-%d`` format.
    """
    if not isinstance(cassandra_date, Date):
        return cassandra_date
    return datetime.strptime(str(cassandra_date), '%Y-%m-%d')

In [10]:
def standardize_dataframe(df1: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of ``df1``; the input frame is left untouched.

    Steps, in order:
      1. lower-case the column names and replace spaces with underscores;
      2. drop duplicate rows, then rows containing nulls;
      3. add ``ingestion_date`` set to ``pd.to_datetime('now')``;
      4. pass ``order_date`` / ``ship_date`` values through
         ``convert_cassandra_date_to_datetime``;
      5. apply ``convert_column_types`` with the module-level ``schema_mapping``;
      6. strip whitespace from string values in the remaining object columns.

    Raises:
        Exception: any error is printed and then re-raised.
    """
    try:
        # `rename` builds a new frame, so the caller's column labels are not
        # overwritten as a side effect.
        cleaned = df1.rename(columns=lambda name: name.lower().replace(' ', '_'))
        # Explicit copy: the assignments below must not target a derived view.
        cleaned = cleaned.drop_duplicates().dropna().copy()
        cleaned['ingestion_date'] = pd.to_datetime('now')

        for col in ('order_date', 'ship_date'):
            if col in cleaned.columns:
                cleaned[col] = cleaned[col].apply(convert_cassandra_date_to_datetime)

        cleaned = convert_column_types(cleaned, schema_mapping)

        for col in cleaned.select_dtypes(include=['object']).columns:
            # Strip only real strings; `.str.strip()` would replace any
            # non-string value in an object column with NaN.
            cleaned[col] = cleaned[col].map(
                lambda value: value.strip() if isinstance(value, str) else value
            )
        return cleaned

    except Exception as e:
        print(f"Error in standardize_dataframe: {str(e)}")
        raise

In [11]:
def load_silver_data(session, df_silver, keyspace='finalsales', table='silversales'):
    """Create the silver sales table if it is missing and insert every row of ``df_silver``.

    Args:
        session: Object exposing ``execute`` and ``prepare`` (the Cassandra
            session in this notebook).
        df_silver: Frame containing the fifteen columns listed in the INSERT
            statement below.
        keyspace: Target keyspace; defaults to the previously hard-coded
            ``finalsales``. Interpolated into the CQL text, so it must come
            from trusted code.
        table: Target table; defaults to the previously hard-coded
            ``silversales``. Interpolated in the same way.
    """
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.{table} (
        order_id BIGINT PRIMARY KEY,
        country TEXT,
        item_type TEXT,
        order_date DATE,
        order_priority TEXT,
        region TEXT,
        sales_channel TEXT,
        ship_date DATE,
        total_cost FLOAT,
        total_profit FLOAT,
        total_revenue FLOAT,
        unit_cost FLOAT,
        unit_price FLOAT,
        units_sold INT,
        ingestion_date TIMESTAMP
    );
    """
    session.execute(create_table_query)

    insert_query = session.prepare(f"""
    INSERT INTO {keyspace}.{table} (
        order_id, country, item_type, order_date, order_priority, 
        region, sales_channel, ship_date, total_cost, total_profit, 
        total_revenue, unit_cost, unit_price, units_sold, ingestion_date
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """)

    # Frame columns listed in the same order as the INSERT column list above.
    ordered_columns = (
        'order_id', 'country', 'item_type', 'order_date', 'order_priority',
        'region', 'sales_channel', 'ship_date', 'total_cost', 'total_profit',
        'total_revenue', 'unit_cost', 'unit_price', 'units_sold', 'ingestion_date',
    )
    for _, row in df_silver.iterrows():
        session.execute(insert_query, tuple(row[name] for name in ordered_columns))

    print(f"Data inserted into the {table} table successfully.")

In [12]:
def gold1(session, keyspace, df_silver=None):
    """Build the sales summary by region and store it in ``{keyspace}.sales_by_region``.

    Args:
        session: Object exposing ``execute`` and ``prepare`` (the Cassandra
            session in this notebook).
        keyspace: Keyspace name; interpolated into the CQL text.
        df_silver: Frame with ``region``, ``total_revenue``, ``total_cost``,
            ``total_profit`` and ``units_sold`` columns. When omitted, the
            notebook-level ``df_silver`` variable is used, as before.

    Raises:
        NameError: if ``df_silver`` is omitted and no notebook-level
            ``df_silver`` exists.
    """
    if df_silver is None:
        # Backward compatibility: existing calls pass only (session, keyspace)
        # and rely on the frame created by an earlier cell.
        try:
            df_silver = globals()['df_silver']
        except KeyError:
            raise NameError("name 'df_silver' is not defined") from None

    # NOTE(review): totals are stored in FLOAT columns; large sums may lose
    # precision — consider DOUBLE when creating new tables.
    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.sales_by_region (
        region TEXT PRIMARY KEY,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT,
        units_sold INT
    )
    """)

    region_agg = df_silver.groupby('region').agg(
        total_revenue=('total_revenue', 'sum'),
        total_cost=('total_cost', 'sum'),
        total_profit=('total_profit', 'sum'),
        units_sold=('units_sold', 'sum')
    ).reset_index()

    insert_query_region = f"""
    INSERT INTO {keyspace}.sales_by_region 
    (region, total_revenue, total_cost, total_profit, units_sold) 
    VALUES (?, ?, ?, ?, ?)
    """

    prepared_query = session.prepare(insert_query_region)

    for _, row in region_agg.iterrows():
        session.execute(prepared_query, (
            row['region'], row['total_revenue'], row['total_cost'],
            row['total_profit'], row['units_sold']
        ))

    print("Sales Summary by Region inserted successfully.")

In [13]:
def gold2(session, keyspace, df_silver=None):
    """Build the sales summary by item type and store it in ``{keyspace}.sales_by_product``.

    Args:
        session: Object exposing ``execute`` and ``prepare`` (the Cassandra
            session in this notebook).
        keyspace: Keyspace name; interpolated into the CQL text.
        df_silver: Frame with ``item_type``, ``total_revenue``, ``total_cost``,
            ``total_profit`` and ``units_sold`` columns. When omitted, the
            notebook-level ``df_silver`` variable is used, as before.

    Raises:
        NameError: if ``df_silver`` is omitted and no notebook-level
            ``df_silver`` exists.
    """
    if df_silver is None:
        # Backward compatibility: existing calls pass only (session, keyspace)
        # and rely on the frame created by an earlier cell.
        try:
            df_silver = globals()['df_silver']
        except KeyError:
            raise NameError("name 'df_silver' is not defined") from None

    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.sales_by_product (
        item_type TEXT PRIMARY KEY,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT,
        units_sold INT
    )
    """)

    product_agg = df_silver.groupby('item_type').agg(
        total_revenue=('total_revenue', 'sum'),
        total_cost=('total_cost', 'sum'),
        total_profit=('total_profit', 'sum'),
        units_sold=('units_sold', 'sum')
    ).reset_index()

    insert_query_product = f"""
    INSERT INTO {keyspace}.sales_by_product 
    (item_type, total_revenue, total_cost, total_profit, units_sold) 
    VALUES (?, ?, ?, ?, ?)
    """

    prepared_query = session.prepare(insert_query_product)

    for _, row in product_agg.iterrows():
        session.execute(prepared_query, (
            row['item_type'], row['total_revenue'], row['total_cost'],
            row['total_profit'], row['units_sold']
        ))

    print("Sales Summary by Product inserted successfully.")

In [14]:
def gold3(session, keyspace, df_silver=None):
    """Build the sales summary by order priority and store it in ``{keyspace}.sales_by_priority``.

    Args:
        session: Object exposing ``execute`` and ``prepare`` (the Cassandra
            session in this notebook).
        keyspace: Keyspace name; interpolated into the CQL text.
        df_silver: Frame with ``order_priority``, ``total_revenue``,
            ``total_cost``, ``total_profit`` and ``units_sold`` columns. When
            omitted, the notebook-level ``df_silver`` variable is used, as before.

    Raises:
        NameError: if ``df_silver`` is omitted and no notebook-level
            ``df_silver`` exists.
    """
    if df_silver is None:
        # Backward compatibility: existing calls pass only (session, keyspace)
        # and rely on the frame created by an earlier cell.
        try:
            df_silver = globals()['df_silver']
        except KeyError:
            raise NameError("name 'df_silver' is not defined") from None

    session.execute(f"""
    CREATE TABLE IF NOT EXISTS {keyspace}.sales_by_priority (
        order_priority TEXT PRIMARY KEY,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT,
        units_sold INT
    )
    """)

    priority_agg = df_silver.groupby('order_priority').agg(
        total_revenue=('total_revenue', 'sum'),
        total_cost=('total_cost', 'sum'),
        total_profit=('total_profit', 'sum'),
        units_sold=('units_sold', 'sum')
    ).reset_index()

    insert_query_priority = f"""
    INSERT INTO {keyspace}.sales_by_priority 
    (order_priority, total_revenue, total_cost, total_profit, units_sold) 
    VALUES (?, ?, ?, ?, ?)
    """

    prepared_query = session.prepare(insert_query_priority)

    for _, row in priority_agg.iterrows():
        session.execute(prepared_query, (
            row['order_priority'], row['total_revenue'], row['total_cost'],
            row['total_profit'], row['units_sold']
        ))

    print("Sales Summary by Order Priority inserted successfully.")

In [15]:
# Run the whole pipeline: CSV -> bronze table -> cleaned frame -> silver table -> gold tables.
# `session`, `cluster` and `keyspace` are left defined at notebook level because
# later cells use them (gold table display and cluster shutdown).
if __name__ == "__main__":
    session,cluster=connect_to_cassandra()
    # Bronze level: read the raw sales CSV from its public URL
    df=pd.read_csv("https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv")
    keyspace='finalsales'
    table='bronzesales'
    # Load the raw data into the Cassandra bronze table
    load_data(session,keyspace,table,df)
    # Read the data back from the bronzesales table
    df1 = extract_data(session, keyspace, table)
    # Clean and standardize the DataFrame
    df_silver = standardize_dataframe(df1)
    # Load the silver data into Cassandra
    load_silver_data(session, df_silver)
    # Create the gold tables (each function aggregates the notebook-level df_silver)
    gold1(session,keyspace)
    gold2(session,keyspace)
    gold3(session,keyspace)

CSV data successfully loaded into Cassandra!
Data Extracted Successfully!
Data inserted into the silversales table successfully.
Sales Summary by Region inserted successfully.
Sales Summary by Product inserted successfully.
Sales Summary by Order Priority inserted successfully.


In [28]:
# Extracting Gold table data
from tabulate import tabulate
def extract_gold_table(session, keyspace, table_name):
    """Fetch every row of ``keyspace.table_name``, print it, and return it as a DataFrame.

    A non-empty result is printed as a ``tabulate`` table under a
    ``--- table_name ---`` header; an empty result prints a "No data found"
    message instead. Any exception is reported with ``print`` and an empty
    DataFrame is returned.
    """
    try:
        result_set = session.execute(f"SELECT * FROM {keyspace}.{table_name}")
        df_gold = pd.DataFrame([record._asdict() for record in result_set])
        if not df_gold.empty:
            print(f"\n--- {table_name} ---")
            print(tabulate(df_gold, headers='keys', tablefmt='pretty', showindex=False))
        else:
            print(f"No data found in the {table_name} table.")
        return df_gold
    except Exception as e:
        # Best effort: report the failure and hand back an empty frame.
        print(f"Error extracting data from {table_name}: {e}")
        return pd.DataFrame()

In [29]:
# Print each gold table; relies on `session` and `keyspace` defined by the pipeline cell.
tables=['sales_by_region','sales_by_product','sales_by_priority']
for i in tables:
    extract_gold_table(session,keyspace,i)


--- sales_by_region ---
+-----------------------------------+------------+--------------+---------------+------------+
|              region               | total_cost | total_profit | total_revenue | units_sold |
+-----------------------------------+------------+--------------+---------------+------------+
|       Australia and Oceania       | 7224318.0  |  3486940.0   |  10711258.0   |   42328    |
|              Europe               | 23697468.0 |  11267281.0  |  34964748.0   |   121002   |
|   Middle East and North Africa    | 18250866.0 |  6514262.0   |  24765128.0   |   60376    |
| Central America and the Caribbean | 13318535.0 |  4252300.0   |  17570836.0   |   53641    |
|               Asia                | 22090916.0 |  6749896.0   |  28840812.0   |   113129   |
|        Sub-Saharan Africa         | 16573546.0 |  7651892.0   |  24225438.0   |   92606    |
|           North America           | 2207136.0  |  1404621.5   |   3611757.5   |   11728    |
+------------------------

In [16]:
# Shut down the cluster returned by connect_to_cassandra().
# Run this last: earlier cells query through the `session` created from this cluster.
cluster.shutdown()