In [1]:
!pip install cassandra-driver

Collecting cassandra-driver
  Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Collecting geomet<0.3,>=0.1 (from cassandra-driver)
  Downloading geomet-0.2.1.post1-py3-none-any.whl.metadata (1.0 kB)
Downloading cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.9/3.9 MB[0m [31m28.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading geomet-0.2.1.post1-py3-none-any.whl (18 kB)
Installing collected packages: geomet, cassandra-driver
Successfully installed cassandra-driver-3.29.2 geomet-0.2.1.post1


In [2]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [3]:
# Secure connect bundle: the file name below is the autogenerated one you get
# when downloading the bundle; change it if yours is named differently.
cloud_config = {'secure_connect_bundle': 'secure-connect-cassandra.zip'}

# Token file: also autogenerated on download. It is read for its "clientId"
# and "secret" entries; change the file name if yours is different.
with open("cassandra-token.json") as token_file:
    secrets = json.load(token_file)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

# Build the auth provider from the token credentials, then open a session.
auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# Report the outcome based on the truthiness of the returned session object.
print('Connected!' if session else "An error occurred.")



Connected!


In [4]:
import pandas as pd

# Select the keyspace on the session that is already open. Calling
# cluster.connect() again would only create a second session (leaving the
# first one unused) and, without a keyspace argument, would not select any
# keyspace, so unqualified table names in later queries could not resolve.
session.set_keyspace('cassandra')

# Load the CSV data into a pandas DataFrame
df = pd.read_csv('sales_100.csv')

In [5]:
# Re-format both date columns from MM/DD/YYYY text to YYYY-MM-DD text.
# Note: this overwrites the columns in place, so running the cell a second
# time fails because the values no longer match the '%m/%d/%Y' input format.
for date_column in ('Order Date', 'Ship Date'):
    parsed_dates = pd.to_datetime(df[date_column], format='%m/%d/%Y')
    df[date_column] = parsed_dates.dt.strftime('%Y-%m-%d')

In [25]:
# Create the bronze table (no-op if it already exists); OrderID is the primary key.
session.execute("""Create table if not exists cassandra.bronze(region text, country text, itemType text, salesChannel text, orderPriority text, orderDate Date, OrderID int PRIMARY KEY)""")

# Prepare the insert once so that each row only has to bind its values.
insert_query = session.prepare("INSERT INTO cassandra.bronze(region, country, itemType, salesChannel, orderPriority, orderDate, OrderID) VALUES (?, ?, ?, ?, ?, ?, ?)")

# CSV columns, listed in the same order as the placeholders of the insert statement.
bronze_source_columns = ['Region', 'Country', 'Item Type', 'Sales Channel', 'Order Priority', 'Order Date', 'Order ID']

for _, record in df.iterrows():
    session.execute(insert_query, tuple(record[name] for name in bronze_source_columns))

print("Data inserted successfully.")

Data inserted successfully.


In [26]:
# Read the bronze table back into a DataFrame.
# The table name is qualified with its keyspace (matching the CREATE/INSERT
# statements for bronze) so the query does not depend on whichever keyspace,
# if any, happens to be active on the session.
select_query = "SELECT * FROM cassandra.bronze;"
rows = session.execute(select_query)

if rows:
    # Materialise the result set before handing it to pandas.
    data = pd.DataFrame(list(rows))
    print(data)
else:
    print("No data found in the bronze table.")

      orderid             country   itemtype   orderdate orderpriority  \
0   571997869             Vanuatu     Fruits  2013-11-03             C   
1   349235904          Mauritius     Clothes  2012-11-17             M   
2   440306556               India     Snacks  2012-10-10             L   
3   667593514             Morocco    Clothes  2013-09-14             M   
4   520480573           Indonesia  Household  2011-09-28             C   
..        ...                 ...        ...         ...           ...   
94  252889239            Thailand       Meat  2015-02-04             C   
95  830192887           Sri Lanka     Fruits  2011-11-07             L   
96  925136649              Serbia    Clothes  2016-07-06             L   
97  824714744  Dominican Republic  Baby Food  2011-08-25             H   
98  572335612             Vanuatu     Cereal  2014-06-20             C   

                               region saleschannel  
0               Australia and Oceania       Online  
1    

In [None]:
# let's clean the data to create a silver table

In [27]:
# Show the column labels before and after trimming surrounding whitespace.
raw_columns = data.columns
print("Raw columns:", raw_columns)

data.columns = raw_columns.str.strip()
print("Cleaned columns:", data.columns)

Raw columns: Index(['orderid', 'country', 'itemtype', 'orderdate', 'orderpriority',
       'region', 'saleschannel'],
      dtype='object')
Cleaned columns: Index(['orderid', 'country', 'itemtype', 'orderdate', 'orderpriority',
       'region', 'saleschannel'],
      dtype='object')


In [None]:
# After modifying the dates and cleaning the data, the data can now be called silver data.
# Let's push the data into the silver table.

In [29]:
# Create the silver table (no-op if it already exists); OrderID is the primary key.
session.execute("""Create table if not exists cassandra.silver(region text, country text, itemType text, salesChannel text, orderPriority text, orderDate Date, OrderID int PRIMARY KEY)""")

# The table name is keyspace-qualified, like the CREATE TABLE above, so the
# insert does not rely on an active keyspace being set on the session.
# The statement is prepared once (as done for the bronze load) instead of
# sending the full query text for every row; prepared statements use '?'
# placeholders rather than '%s'.
insert_query = session.prepare("""
    INSERT INTO cassandra.silver(region, country, itemtype, saleschannel, orderpriority, orderdate,
                              orderid)
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

# Copy every row of the cleaned DataFrame into the silver table.
for _, row in data.iterrows():
    session.execute(insert_query, (
        row['region'], row['country'], row['itemtype'], row['saleschannel'], row['orderpriority'],
        row['orderdate'], row['orderid'])
    )

In [None]:
# Now let's create the gold tables.
# gold1

In [34]:
# Pull the 'Clothes' rows out of the silver table.
# The table is keyspace-qualified so the query does not depend on an active
# keyspace being set on the session. ALLOW FILTERING is required because
# itemtype is not part of the primary key.
query = "SELECT * FROM cassandra.silver where itemtype = 'Clothes' ALLOW FILTERING"
rows = session.execute(query)
# Materialise the result set before building the DataFrame, as done for the bronze read.
df = pd.DataFrame(list(rows))

In [32]:
# DDL for the gold1 table: same columns as the other tables in this notebook,
# keyed on OrderID; "if not exists" makes re-running the cell harmless.
create_gold1_cql = """Create table if not exists cassandra.gold1(region text, country text, itemType text, salesChannel text, orderPriority text, orderDate Date, OrderID int PRIMARY KEY)"""
session.execute(create_gold1_cql)

<cassandra.cluster.ResultSet at 0x7ae6accbf160>

In [36]:
# The table name is keyspace-qualified (matching the CREATE TABLE for gold1)
# so the insert does not rely on an active keyspace being set on the session.
# The statement is prepared once instead of sending the full query text for
# every row; prepared statements use '?' placeholders rather than '%s'.
insert_query = session.prepare("""
    INSERT INTO cassandra.gold1(region, country, itemtype, saleschannel, orderpriority, orderdate,
                              orderid)
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

# Copy every row currently held in df into the gold1 table.
for _, row in df.iterrows():
    session.execute(insert_query, (
        row['region'], row['country'], row['itemtype'], row['saleschannel'], row['orderpriority'],
        row['orderdate'], row['orderid'])
    )

In [43]:
# Pull the rows for country 'India' out of the silver table.
# The table is keyspace-qualified so the query does not depend on an active
# keyspace being set on the session. ALLOW FILTERING is required because
# country is not part of the primary key.
query = "SELECT * FROM cassandra.silver where country = 'India' ALLOW FILTERING"
rows = session.execute(query)
# Materialise the result set before building the DataFrame, as done for the bronze read.
df = pd.DataFrame(list(rows))

In [38]:
# DDL for the gold2 table: same columns as the other tables in this notebook,
# keyed on OrderID; "if not exists" makes re-running the cell harmless.
create_gold2_cql = """Create table if not exists cassandra.gold2(region text, country text, itemType text, salesChannel text, orderPriority text, orderDate Date, OrderID int PRIMARY KEY)"""
session.execute(create_gold2_cql)

<cassandra.cluster.ResultSet at 0x7ae6eab09780>

In [44]:
# The table name is keyspace-qualified (matching the CREATE TABLE for gold2)
# so the insert does not rely on an active keyspace being set on the session.
# The statement is prepared once instead of sending the full query text for
# every row; prepared statements use '?' placeholders rather than '%s'.
insert_query = session.prepare("""
    INSERT INTO cassandra.gold2(region, country, itemtype, saleschannel, orderpriority, orderdate,
                              orderid)
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

# Copy every row currently held in df into the gold2 table.
for _, row in df.iterrows():
    session.execute(insert_query, (
        row['region'], row['country'], row['itemtype'], row['saleschannel'], row['orderpriority'],
        row['orderdate'], row['orderid'])
    )

In [45]:
# Pull the rows with order priority 'H' out of the silver table.
# The table is keyspace-qualified so the query does not depend on an active
# keyspace being set on the session. ALLOW FILTERING is required because
# orderpriority is not part of the primary key.
query = "SELECT * FROM cassandra.silver where orderpriority = 'H' ALLOW FILTERING"
rows = session.execute(query)
# Materialise the result set before building the DataFrame, as done for the bronze read.
df = pd.DataFrame(list(rows))

In [46]:
# DDL for the gold3 table: same columns as the other tables in this notebook,
# keyed on OrderID; "if not exists" makes re-running the cell harmless.
create_gold3_cql = """Create table if not exists cassandra.gold3(region text, country text, itemType text, salesChannel text, orderPriority text, orderDate Date, OrderID int PRIMARY KEY)"""
session.execute(create_gold3_cql)

<cassandra.cluster.ResultSet at 0x7ae6eab0bee0>

In [47]:
# The table name is keyspace-qualified (matching the CREATE TABLE for gold3)
# so the insert does not rely on an active keyspace being set on the session.
# The statement is prepared once instead of sending the full query text for
# every row; prepared statements use '?' placeholders rather than '%s'.
insert_query = session.prepare("""
    INSERT INTO cassandra.gold3(region, country, itemtype, saleschannel, orderpriority, orderdate,
                              orderid)
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

# Copy every row currently held in df into the gold3 table.
for _, row in df.iterrows():
    session.execute(insert_query, (
        row['region'], row['country'], row['itemtype'], row['saleschannel'], row['orderpriority'],
        row['orderdate'], row['orderid'])
    )