# Setting up Cassandra

### Installing Cassandra Drivers


In [1]:
!pip install cassandra-driver




### Importing Libraries

In [2]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json
import pandas as pd
from datetime import datetime



### Setting up a connection

In [3]:
def get_connection(bundle_path='secure-connect-mydb.zip', token_path='mydb-token.json'):
    """Open a Cassandra session using a secure-connect bundle and a token file.

    Args:
        bundle_path: Path of the secure connect bundle handed to the driver's
            cloud configuration. Defaults to the file name used originally.
        token_path: Path of the JSON token file; it must contain the keys
            "clientId" and "secret".

    Returns:
        The connected session, or None if anything went wrong (the error is
        printed instead of raised).
    """
    cluster = None
    try:
        # Read the credentials first so a missing or malformed token file
        # fails before any cluster object is created.
        with open(token_path) as f:
            secrets = json.load(f)

        CLIENT_ID = secrets["clientId"]
        CLIENT_SECRET = secrets["secret"]

        cloud_config = {
            'secure_connect_bundle': bundle_path
        }
        auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
        cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
        session = cluster.connect()
        if session:
            print("Connection Established !!")
        else:
            print("Connection Failed !!")
        return session

    except Exception as e:
        print(e)
        # A cluster that was created but could not hand out a session would
        # otherwise be left open; release it (best effort).
        if cluster is not None:
            try:
                cluster.shutdown()
            except Exception as shutdown_error:
                print(shutdown_error)
        return None



In [4]:
# Open the Cassandra session that the later cells use; get_connection()
# returns None (after printing the error) when the connection cannot be made.
session = get_connection()




Connection Established !!


## Bronze Flow

### Raw data from URL

In [5]:
import requests
import pandas as pd
from io import StringIO

def raw_data(url='https://raw.githubusercontent.com/gchandra10/filestorage/main/sales_100.csv', timeout=30):
    """Download a CSV file over HTTP and load it into a DataFrame.

    Args:
        url: Location of the CSV file. Defaults to the sales_100.csv file
            used originally.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the request could block the notebook indefinitely.

    Returns:
        The parsed DataFrame, or None when the download fails, the server
        answers with a status other than 200, or the CSV cannot be parsed.
        Every failure is printed rather than raised.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except Exception as e:
        print("An error occurred during data retrieval:", e)
        return None

    if response.status_code != 200:
        print("Failed to retrieve data, HTTP Status Code:", response.status_code)
        return None

    try:
        data = pd.read_csv(StringIO(response.text), delimiter=',')
    except Exception as e:
        print("An error occurred while parsing the CSV:", e)
        return None

    print("Data retrieved successfully !!")
    return data


### Creating Table

In [6]:
# Start each run from a clean slate. The gold layer is written to three
# tables (yearly / monthly / country), so those must be dropped too or rows
# from an earlier run would survive. The plain "sales100_gold" name is kept
# from the original cell; dropping a missing table is a no-op.
for stale_table in (
    "bigdata.sales100_bronze",
    "bigdata.sales100_silver",
    "bigdata.sales100_gold",
    "bigdata.sales100_gold_yearly",
    "bigdata.sales100_gold_monthly",
    "bigdata.sales100_gold_country",
):
    session.execute(f"drop table if exists {stale_table}")

<cassandra.cluster.ResultSet at 0x7cb63d5e0f70>

In [7]:
def create_table(session,table_name):
    """Create the sales table `table_name` if it does not exist yet.

    The same 14-column layout, keyed by order_id, is used for whichever table
    name is passed in. The outcome is printed; nothing is returned and
    errors are not raised.
    """
    ddl = f"""
      CREATE TABLE IF NOT EXISTS {table_name} (
        region TEXT,
        country TEXT,
        item_type TEXT,
        sales_channel TEXT,
        order_priority TEXT,
        order_date TEXT,
        order_id BIGINT,
        ship_date TEXT,
        units_sold INT,
        unit_price FLOAT,
        unit_cost FLOAT,
        total_revenue FLOAT,
        total_cost FLOAT,
        total_profit FLOAT,
        PRIMARY KEY (order_id)
      );
    """
    try:
        session.execute(ddl)
    except Exception as err:
        print(f"Failed to create table: {err}")
        return
    print(f"{table_name} Table created successfully !!")


### Cleaning column names

In [8]:
def clean_column_names(data):
    """Normalise the column labels of `data` in place.

    Labels are stripped of surrounding whitespace, lower-cased, and have
    their remaining spaces replaced by underscores. Returns the same
    DataFrame object, or None (after printing the error) on failure.
    """
    try:
        labels = data.columns.str.strip()
        labels = labels.str.lower()
        data.columns = labels.str.replace(' ', '_')
    except Exception as err:
        print(err)
        return None

    print("Column names cleaned successfully !!")
    return data


### Data Pre Processing : Changing date format

In [9]:
def data_preprocessing(data):
    """Rewrite the order_date and ship_date columns as 'YYYY-MM-DD' strings.

    The frame is modified in place and returned. If a column is missing or
    cannot be parsed, the error is printed and None is returned.
    """
    try:
        # order_date first, then ship_date - same order as before, so a
        # failure on ship_date still leaves order_date already converted.
        for date_col in ('order_date', 'ship_date'):
            parsed = pd.to_datetime(data[date_col])
            data[date_col] = parsed.dt.strftime('%Y-%m-%d')
    except Exception as err:
        print(err)
        return None
    return data

### Insert Bronze Data : Raw Data insertion from URL to Bronze table

In [10]:
def insert_bronze_data(session, data, table_name):
    """Insert every row of `data` into the bronze table `table_name`.

    One prepared INSERT is executed per row. The outcome is printed; the
    function returns nothing and insert errors are not raised.
    """
    statement = session.prepare(f"""
        INSERT INTO {table_name} (region, country, item_type, sales_channel, order_priority, order_date, order_id,
                                  ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
    """)
    # Row keys, in the order of the INSERT column list. The last six carry no
    # underscore, unlike the table columns - presumably they mirror the
    # cleaned headers of the source CSV (verify against the file).
    row_keys = (
        'region', 'country', 'item_type', 'sales_channel',
        'order_priority', 'order_date', 'order_id', 'ship_date',
        'unitssold', 'unitprice', 'unitcost', 'totalrevenue',
        'totalcost', 'totalprofit',
    )
    try:
        for _, record in data.iterrows():
            values = tuple(record[key] for key in row_keys)
            session.execute(statement, values)
        print("Bronze Data inserted successfully for all rows.")
    except Exception as err:
        print(f"Failed to insert data: {err}")


## Silver Flow

### Convert to Dataframe : Converting result set into data frame  

In [11]:
def convert_to_dataframe(session,table_name):
    """Load every row of `table_name` into a DataFrame.

    The order_date and ship_date columns are re-formatted as 'YYYY-MM-DD'
    strings. Returns the DataFrame, or None (after printing the error) if
    the query or the conversion fails.
    """
    try:
        rows = session.execute(f"SELECT * FROM {table_name}")
        frame = pd.DataFrame(rows)
        for date_col in ('order_date', 'ship_date'):
            as_dates = pd.to_datetime(frame[date_col])
            frame[date_col] = as_dates.dt.strftime('%Y-%m-%d')
    except Exception as err:
        print(err)
        return None

    print("Data converted successfully !!")
    return frame


###  Round Col : Round off the decimal values

In [12]:
def round_col(df):
    """Round every numeric column of `df` to two decimals, in place.

    Non-numeric columns are left untouched. Returns the same DataFrame, or
    None (after printing the error) if something goes wrong.
    """
    try:
        numeric_cols = [
            name for name in df.columns
            if pd.api.types.is_numeric_dtype(df[name])
        ]
        for name in numeric_cols:
            df[name] = df[name].round(2)
    except Exception as err:
        print(err)
        return None
    return df

### Clean Bronze : Bronze Data Cleaning

In [13]:
def clean_bronze(session,bronze_table):
    """Load the bronze table and return a cleaned (rounded) DataFrame.

    Args:
        session: Cassandra session used to read the table.
        bronze_table: Name of the bronze table to read. Anything that is not
            a non-empty string (for example None) falls back to
            "bigdata.sales100_bronze", the table that was always read before.

    Returns:
        The cleaned DataFrame, or None if loading or rounding failed. The
        success message is printed only when a frame was actually produced.
    """
    try:
        if isinstance(bronze_table, str) and bronze_table:
            source_table = bronze_table
        else:
            source_table = "bigdata.sales100_bronze"

        df = convert_to_dataframe(session, source_table)
        if df is None:
            # convert_to_dataframe reports its own error and returns None.
            print(f"Bronze Data cleaning failed: could not load {source_table}")
            return None

        df = round_col(df)
        if df is None:
            print("Bronze Data cleaning failed: rounding step returned no data")
            return None

        print("Bronze Data cleaned successfully !!")
        return df

    except Exception as e:
        print(e)
        return None

### Insert Silver Data : Cleaned data insertion in silver table

In [14]:
def insert_silver_data(session, data, table_name):
    """Insert every row of the cleaned frame `data` into `table_name`.

    One prepared INSERT is executed per row. The outcome is printed; the
    function returns nothing and insert errors are not raised.
    """
    statement = session.prepare(f"""
        INSERT INTO {table_name} (region, country, item_type, sales_channel, order_priority, order_date, order_id,
                                  ship_date, units_sold, unit_price, unit_cost, total_revenue, total_cost, total_profit)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
    """)
    # Row keys in the order of the INSERT column list; here they match the
    # table's column names one to one.
    row_keys = (
        'region', 'country', 'item_type', 'sales_channel',
        'order_priority', 'order_date', 'order_id', 'ship_date',
        'units_sold', 'unit_price', 'unit_cost', 'total_revenue',
        'total_cost', 'total_profit',
    )
    try:
        for _, record in data.iterrows():
            values = tuple(record[key] for key in row_keys)
            session.execute(statement, values)
        print("Silver Data inserted successfully for all rows.")
    except Exception as err:
        print(f"Failed to insert data: {err}")


## Gold Flow

### Create Table Gold : Create Gold Table

In [15]:
def create_table_gold(session):
    """Create the three gold aggregate tables if they do not exist yet.

    The tables are bigdata.sales100_gold_yearly, _monthly and _country. They
    are created in that order, stopping at the first failure. The outcome is
    printed; nothing is returned and errors are not raised.
    """
    gold_ddl = (
        """
        CREATE TABLE IF NOT EXISTS bigdata.sales100_gold_yearly (
            year INT,
            region TEXT,
            total_sales FLOAT,
            average_unit_price FLOAT,
            PRIMARY KEY (year, region)
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS bigdata.sales100_gold_monthly (
            year INT,
            month INT,
            region TEXT,
            total_sales FLOAT,
            average_unit_price FLOAT,
            PRIMARY KEY ((year, month), region)
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS bigdata.sales100_gold_country (
            country TEXT,
            year INT,
            total_sales FLOAT,
            average_unit_price FLOAT,
            PRIMARY KEY (country, year)
        );
        """,
    )
    try:
        for statement in gold_ddl:
            session.execute(statement)
    except Exception as err:
        print(f"Failed to create gold tables: {err}")
        return
    print("Gold tables created successfully!")


### Prepare Gold Data : Data transformation from silver to gold

In [16]:
def prepare_gold_data1(silver_df):
    """Aggregate the silver data per (year, region).

    Side effect: `silver_df` is modified in place. Its order_date column is
    converted to datetimes and `year` / `month` columns are added.

    Returns a DataFrame with columns year, region, total_sales (sum of
    total_revenue) and average_unit_price (mean of unit_price), or None
    (after printing the error) on failure.
    """
    try:
        silver_df['order_date'] = pd.to_datetime(silver_df['order_date'])
        order_dates = silver_df['order_date']
        silver_df['year'] = order_dates.dt.year
        silver_df['month'] = order_dates.dt.month

        per_year_region = silver_df.groupby(['year', 'region'])
        yearly = per_year_region.agg(
            total_sales=('total_revenue', 'sum'),
            average_unit_price=('unit_price', 'mean'),
        )
        yearly = yearly.reset_index()
        print("Gold data 1 prepared successfully.")
        return yearly
    except Exception as err:
        print(err)
        return None

In [17]:
def prepare_gold_data2(silver_df):
    """Aggregate the silver data per (year, month, region).

    Side effect: `silver_df` is modified in place. Its order_date column is
    converted to datetimes and `year` / `month` columns are added.

    Returns a DataFrame with columns year, month, region, total_sales (sum
    of total_revenue) and average_unit_price (mean of unit_price), or None
    (after printing the error) on failure.
    """
    try:
        silver_df['order_date'] = pd.to_datetime(silver_df['order_date'])
        order_dates = silver_df['order_date']
        silver_df['year'] = order_dates.dt.year
        silver_df['month'] = order_dates.dt.month

        per_month_region = silver_df.groupby(['year', 'month', 'region'])
        monthly = per_month_region.agg(
            total_sales=('total_revenue', 'sum'),
            average_unit_price=('unit_price', 'mean'),
        )
        monthly = monthly.reset_index()
        print("Gold data 2 prepared successfully.")
        return monthly
    except Exception as err:
        print(err)
        return None

In [18]:
def prepare_gold_data3(silver_df):
    """Aggregate the silver data per (country, year).

    Side effect: `silver_df` is modified in place. Its order_date column is
    converted to datetimes and `year` / `month` columns are added, in the
    same way as the other prepare_gold_data helpers.

    Returns a DataFrame with columns country, year, total_sales (sum of
    total_revenue) and average_unit_price (mean of unit_price), or None
    (after printing the error) on failure.
    """
    try:
        silver_df['order_date'] = pd.to_datetime(silver_df['order_date'])
        silver_df['year'] = silver_df['order_date'].dt.year
        # Store the month number; previously the `.dt` accessor object itself
        # was assigned here, which filled the column with junk.
        silver_df['month'] = silver_df['order_date'].dt.month

        country_grouped = silver_df.groupby(['country', 'year']).agg(
            total_sales=pd.NamedAgg(column='total_revenue', aggfunc='sum'),
            average_unit_price=pd.NamedAgg(column='unit_price', aggfunc='mean')
        ).reset_index()

        print("Gold data 3 prepared successfully.")
        return country_grouped
    except Exception as e:
        print(e)
        return None



### Insert gold data : Insert data into gold table

In [19]:
def insert_gold_data1(session, yearly_data):
    """Write the yearly aggregates into bigdata.sales100_gold_yearly.

    One prepared INSERT is executed per row of `yearly_data`. The outcome is
    printed; nothing is returned and insert errors are not raised.
    """
    statement = session.prepare("""
        INSERT INTO bigdata.sales100_gold_yearly (year, region, total_sales, average_unit_price)
        VALUES (?, ?, ?, ?);
    """)
    row_keys = ('year', 'region', 'total_sales', 'average_unit_price')

    try:
        for _, record in yearly_data.iterrows():
            values = tuple(record[key] for key in row_keys)
            session.execute(statement, values)

        print("Data inserted successfully into the gold table1.")
    except Exception as err:
        print(f"Failed to insert gold data: {err}")


In [20]:
def insert_gold_data2(session, monthly_data):
    """Write the monthly aggregates into bigdata.sales100_gold_monthly.

    One prepared INSERT is executed per row of `monthly_data`. The outcome
    is printed; nothing is returned and insert errors are not raised.
    """
    statement = session.prepare("""
        INSERT INTO bigdata.sales100_gold_monthly (year, month, region, total_sales, average_unit_price)
        VALUES (?, ?, ?, ?, ?);
    """)
    row_keys = ('year', 'month', 'region', 'total_sales', 'average_unit_price')

    try:
        for _, record in monthly_data.iterrows():
            values = tuple(record[key] for key in row_keys)
            session.execute(statement, values)

        print("Data inserted successfully into the gold tables.")
    except Exception as err:
        print(f"Failed to insert gold data: {err}")


In [21]:
def insert_gold_data3(session, country_data):
    """Write the per-country aggregates into bigdata.sales100_gold_country.

    One prepared INSERT is executed per row of `country_data`. The outcome
    is printed; nothing is returned and insert errors are not raised.
    """
    statement = session.prepare("""
        INSERT INTO bigdata.sales100_gold_country (country, year, total_sales, average_unit_price)
        VALUES (?, ?, ?, ?);
    """)
    row_keys = ('country', 'year', 'total_sales', 'average_unit_price')

    try:
        for _, record in country_data.iterrows():
            values = tuple(record[key] for key in row_keys)
            session.execute(statement, values)
        print("Data inserted successfully into the gold tables.")
    except Exception as err:
        print(f"Failed to insert gold data: {err}")


## Bronze Operations

In [22]:
def bronze(session):
    """Run the bronze stage: fetch the raw CSV, tidy it and store it.

    Steps: download the data, normalise the column names, reformat the date
    columns, create bigdata.sales100_bronze and insert the rows. The stage
    stops early, with a message, as soon as a step yields no data, so later
    steps no longer run on None and no false success message is printed.

    Returns:
        Whatever insert_bronze_data returns (it has no return statement, so
        currently None), or None when the stage is aborted or fails.
    """
    try:
        # Get data from URL
        data = raw_data()
        if data is None:
            print("Bronze flow aborted: raw data could not be fetched.")
            return None
        print("Raw Data fetched successfully !!")

        data = clean_column_names(data)
        if data is not None:
            data = data_preprocessing(data)
        if data is None:
            print("Bronze flow aborted: raw data could not be prepared.")
            return None

        # create_table prints its own success or failure message.
        create_table(session,"bigdata.sales100_bronze")
        bronze_table = insert_bronze_data(session, data, 'bigdata.sales100_bronze')

        return bronze_table

    except Exception as e:
        print(e)
        return None

## Silver Operations

In [23]:
def silver(session,bronze_table):
    """Run the silver stage: clean the bronze data and store it.

    Steps: build the cleaned frame with clean_bronze, create
    bigdata.sales100_silver and insert the rows. If cleaning yields no data
    the stage stops with a message instead of trying to insert None.

    Args:
        session: Cassandra session.
        bronze_table: Passed through to clean_bronze unchanged.

    Returns:
        Whatever insert_silver_data returns (it has no return statement, so
        currently None), or None when the stage is aborted or fails.
    """
    try:
        silver_data = clean_bronze(session,bronze_table)
        if silver_data is None:
            print("Silver flow aborted: no cleaned bronze data available.")
            return None

        # create_table prints its own success or failure message.
        create_table(session,"bigdata.sales100_silver")
        silver_table = insert_silver_data(session, silver_data, 'bigdata.sales100_silver')
        return silver_table

    except Exception as e:
        print(e)
        return None


## Gold Operations

In [24]:
def gold(session, silver_table):
    """Run the gold stage: aggregate the silver table into three gold tables.

    The silver table is read once. Each aggregation (yearly, monthly,
    per-country) works on its own copy of that frame, because the
    prepare_gold_data helpers modify their input in place. The stage stops
    with a message if the silver data cannot be loaded or an aggregation
    fails, instead of reporting success after a failed step.

    Args:
        session: Cassandra session.
        silver_table: Not used; the table read is always
            "bigdata.sales100_silver".

    Returns:
        None in every case; progress and errors are printed.
    """
    try:
        silver_df = convert_to_dataframe(session, "bigdata.sales100_silver")
        if silver_df is None:
            print("Gold flow aborted: silver data could not be loaded.")
            return None
        print("Silver converted to dataframe.")

        yearly_gold_data = prepare_gold_data1(data_preprocessing(silver_df.copy()))
        monthly_gold_data = prepare_gold_data2(data_preprocessing(silver_df.copy()))
        country_gold_data = prepare_gold_data3(data_preprocessing(silver_df.copy()))

        if yearly_gold_data is None or monthly_gold_data is None or country_gold_data is None:
            print("Gold flow aborted: at least one aggregation failed.")
            return None

        # The helpers below print their own success or failure messages.
        create_table_gold(session)

        insert_gold_data1(session, yearly_gold_data)
        insert_gold_data2(session, monthly_gold_data)
        insert_gold_data3(session, country_gold_data)
        print("Gold flow completed.")
        return None

    except Exception as e:
        print(e)
        return None

## Table view


In [26]:
def table_view(session,table_name):
    """Print the first two rows returned for `table_name`.

    Runs a "SELECT * ... limit 2" query and prints each row on its own line.
    Any error is printed instead of raised; nothing is returned.
    """
    try:
        preview_rows = session.execute(f"SELECT * FROM {table_name} limit 2")
        for record in preview_rows:
            print(record)
    except Exception as err:
        print(err)

# Medallion Architecture

In [27]:
if __name__ == "__main__":
    # Run the three layers in order (bronze -> silver -> gold) and print a
    # two-row preview of every table after its layer has run.
    print("Medallion Architecture ")
    print('\n')

    # Bronze layer
    bronze_table = bronze(session)
    print('--------  Bronze Table ---------\n')
    table_view(session,"bigdata.sales100_bronze")
    print('\n')

    # Silver layer
    silver_table = silver(session, bronze_table)
    print('--------  Silver Table ---------\n')
    table_view(session,"bigdata.sales100_silver")
    print('\n')

    # Gold layer: one preview per aggregate table
    gold_table = gold(session, silver_table)
    gold_previews = (
        ('--------  Gold Table Yearly ---------\n', "bigdata.sales100_gold_yearly"),
        ('--------  Gold Table Monthly ---------\n', "bigdata.sales100_gold_monthly"),
        ('--------  Gold Table Country ---------\n', "bigdata.sales100_gold_country"),
    )
    for banner, gold_name in gold_previews:
        print(banner)
        table_view(session, gold_name)
    print('\n')


Medallion Architecture 


Data retrieved successfully !!
Raw Data fetched successfully !!
Column names cleaned successfully !!
bigdata.sales100_bronze Table created successfully !!
Bronze Table created successfully !!
Bronze Data inserted successfully for all rows.
--------  Bronze Table ---------

Row(order_id=294530856, country='Italy', item_type='Cereal', order_date='2011-11-15', order_priority='M', region='Europe', sales_channel='Online', ship_date='2011-12-28', total_cost=829138.8125, total_profit=627217.1875, total_revenue=1456356.0, unit_cost=117.11000061035156, unit_price=205.6999969482422, units_sold=7080)
Row(order_id=274930989, country='Dominica', item_type='Household', order_date='2011-11-19', order_priority='C', region='Central America and the Caribbean', sales_channel='Offline', ship_date='2011-12-13', total_cost=3539891.75, total_profit=1167402.125, total_revenue=4707294.0, unit_cost=502.5400085449219, unit_price=668.27001953125, units_sold=7044)


Data converted success