## Demo Cassandra
In this section, we learn how to manipulate data in Cassandra from Python.

Requirements:
`pip install cassandra-driver`

We cover:
* CRUD operations
* Table to DataFrame

Documentation: https://docs.datastax.com/en/developer/python-driver/3.25/

In [1]:
!pip3 install cassandra-driver



### CRUD - Cassandra

In [1]:
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement, BatchStatement
from datetime import datetime
from uuid import uuid1, uuid4

In [2]:
def create_connection(hosts=None, protocol_version=3):
    """Connect to a Cassandra cluster and return the resulting session.

    Args:
        hosts: Iterable of contact-point addresses. When omitted, the local
            node ``['127.0.0.1']`` is used.
        protocol_version: Protocol version passed to ``Cluster``. Defaults
            to 3.

    Returns:
        The session returned by ``cluster.connect()``. ``connect`` is called
        without a keyspace argument, so the caller selects one afterwards.
    """
    contact_points = ['127.0.0.1'] if hosts is None else list(hosts)
    cluster = Cluster(contact_points, protocol_version=protocol_version)
    return cluster.connect()

In [3]:
# Open the session once; the following cells all reuse this `session` object.
session = create_connection()
print('connected')

connected


In [4]:
# create keyspaces
def create_keyspaces(session):
    """Ensure the ``pydb`` keyspace exists and make it the session's keyspace.

    Args:
        session: Connected driver session; must provide ``execute`` and
            ``set_keyspace``.
    """
    keyspace_ddl = """
        CREATE KEYSPACE IF NOT EXISTS pydb
        WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }
        """
    session.execute(keyspace_ddl)

    # The table statements in this notebook use unqualified table names,
    # so the session has to be pointed at the keyspace first.
    print("setting keyspace...")
    session.set_keyspace("pydb")
    print("done")

In [5]:
# Create the pydb keyspace if it is missing and switch the session to it.
create_keyspaces(session)

setting keyspace...
done


In [6]:
# create table
# IF NOT EXISTS lets this cell be re-run without failing on an existing table.
session.execute(
    """
    CREATE TABLE IF NOT EXISTS product (
        id uuid,
        name text,
        quantity int,
        price float,
        created timestamp,
        PRIMARY KEY (id)
    )
    """
)

print("created a table")

created a table


In [7]:
# insert data

def insert_data(session, count=5):
    """Insert ``count`` generated rows into the ``product`` table.

    Row number ``n`` (1-based) is named ``"product n"``. Quantity and price
    grow linearly with the row position, the id is a fresh random UUID and
    ``created`` is the current UTC time.

    Args:
        session: Driver session; must provide ``execute(query, params)``.
        count: Number of rows to insert. Defaults to 5.
    """
    # The statement text never changes, so build it once outside the loop.
    insert_cql = """
        INSERT INTO product (id, name, quantity, price, created)
        VALUES(%s, %s, %s, %s, %s)
    """
    for i in range(count):
        row_values = (
            uuid4(),
            "product " + str(i + 1),
            2 + 3 * i,
            1.2 + 2.3 * i,
            datetime.utcnow(),
        )
        session.execute(insert_cql, row_values)
    print('inserted data')

In [8]:
# Seed the product table with the generated sample rows.
insert_data(session)

inserted data


In [9]:
# read all

def read_all(session):
    """Print every row of the ``product`` table and return the first row's id.

    Each row is printed as ``id | name | quantity | price | created`` with the
    price formatted to two decimals.

    Args:
        session: Driver session; must provide ``execute(query)`` returning an
            iterable of rows with ``id``, ``name``, ``quantity``, ``price``
            and ``created`` attributes.

    Returns:
        The ``id`` of the first row yielded by the query, or ``None`` when the
        query yields no rows.
    """
    first_id = None
    rows = session.execute('SELECT id, name, quantity, price, created FROM product')
    for row in rows:
        print(row.id, "|", row.name, "|", row.quantity, "|", "{:.2f}".format(row.price), "|", row.created)
        # Identity check: `== None` would defer to the id object's own __eq__.
        if first_id is None:
            first_id = row.id

    return first_id

In [10]:
# read_all prints each row and returns the id of the first row it iterated;
# that id is the target of the update and delete cells that follow.
selected_id = read_all(session)

392d4ffe-319e-42c8-892f-16ec1f00b501 | product 3 | 8 | 5.80 | 2021-04-05 10:47:52.655000
b08b7ef3-e6cb-44d6-bf57-d21e48a34be0 | product 1 | 2 | 1.20 | 2021-04-05 10:47:52.584000
221fd48c-f2d9-465b-a4fa-5ca6f761af85 | product 5 | 14 | 10.40 | 2021-04-05 10:47:52.659000
e4e233bf-ff8e-4372-9a02-8ca821155970 | product 4 | 11 | 8.10 | 2021-04-05 10:47:52.657000
cebac8ca-84a3-4e7e-b095-c39a3ef68f3b | product 2 | 5 | 3.50 | 2021-04-05 10:47:52.652000


In [11]:
# update data
def update_data(session, id, name="updated-product", quantity=99, price=9.99):
    """Overwrite name, quantity and price of the ``product`` row with ``id``.

    Args:
        session: Driver session; must provide ``execute(query, params)``.
        id: Primary-key value of the row to update.
        name: New product name. Defaults to ``"updated-product"``.
        quantity: New quantity. Defaults to 99.
        price: New price. Defaults to 9.99.
    """
    update_cql = """
        UPDATE product SET name=%s, quantity=%s, price=%s
        WHERE id=%s
    """
    # Values are bound as parameters, in the same order as the placeholders.
    session.execute(update_cql, (name, quantity, price, id))
    print('updated data')

In [12]:
# Update the row whose id was captured from the earlier read_all call.
update_data(session, selected_id)

# verification
# read_all is the last expression of the cell, so besides printing the rows
# its return value (the first row's id) is shown as the cell result.
read_all(session)

updated data
392d4ffe-319e-42c8-892f-16ec1f00b501 | updated-product | 99 | 9.99 | 2021-04-05 10:47:52.655000
b08b7ef3-e6cb-44d6-bf57-d21e48a34be0 | product 1 | 2 | 1.20 | 2021-04-05 10:47:52.584000
221fd48c-f2d9-465b-a4fa-5ca6f761af85 | product 5 | 14 | 10.40 | 2021-04-05 10:47:52.659000
e4e233bf-ff8e-4372-9a02-8ca821155970 | product 4 | 11 | 8.10 | 2021-04-05 10:47:52.657000
cebac8ca-84a3-4e7e-b095-c39a3ef68f3b | product 2 | 5 | 3.50 | 2021-04-05 10:47:52.652000


UUID('392d4ffe-319e-42c8-892f-16ec1f00b501')

In [13]:
# delete data

def delete_data(session, id):
    """Delete the ``product`` row whose primary key equals ``id``.

    Args:
        session: Driver session; must provide ``execute(query, params)``.
        id: Primary-key value of the row to remove.
    """
    delete_cql = """
        DELETE FROM product WHERE id=%s
    """
    # One-element tuple: the driver expects a sequence of bound values.
    bound_values = (id,)
    session.execute(delete_cql, bound_values)
    print("deleted data")

In [14]:
# Delete the row that was selected (and updated) in the previous cells.
delete_data(session, selected_id)

# verification
# The deleted row should no longer be printed; the cell result is the id of
# whichever row read_all now iterates first.
read_all(session)

deleted data
b08b7ef3-e6cb-44d6-bf57-d21e48a34be0 | product 1 | 2 | 1.20 | 2021-04-05 10:47:52.584000
221fd48c-f2d9-465b-a4fa-5ca6f761af85 | product 5 | 14 | 10.40 | 2021-04-05 10:47:52.659000
e4e233bf-ff8e-4372-9a02-8ca821155970 | product 4 | 11 | 8.10 | 2021-04-05 10:47:52.657000
cebac8ca-84a3-4e7e-b095-c39a3ef68f3b | product 2 | 5 | 3.50 | 2021-04-05 10:47:52.652000


UUID('b08b7ef3-e6cb-44d6-bf57-d21e48a34be0')

In [15]:
# delete all data

def delete_all_data(session):
    """Empty the ``product`` table by issuing a ``TRUNCATE`` statement.

    Args:
        session: Driver session; must provide ``execute(query)``.
    """
    truncate_cql = "TRUNCATE product"
    session.execute(truncate_cql)
    print("deleted all data")

In [16]:
# Remove every row from the product table.
delete_all_data(session)

# verification
# With no rows left, read_all prints nothing and returns None, so the cell
# shows no result.
read_all(session)

deleted all data


## Cassandra - Pandas


SQLAlchemy has no built-in support for NoSQL databases such as Cassandra, so we build the DataFrame directly from the rows returned by the driver.

In [17]:
# prepare data: the product table was truncated above, so insert the sample rows again
insert_data(session)
print('data loaded')

inserted data
data loaded


In [18]:
# load Cassandra to pandas
import pandas as pd

# Fetch every row of the product table.
rows = session.execute('SELECT id, name, quantity, price, created FROM product')

# Materialise the result rows into a list and hand them to pandas; the
# resulting frame is displayed as the cell output.
df = pd.DataFrame(list(rows))
df

Unnamed: 0,id,name,quantity,price,created
0,b9356d5a-8a08-41fd-9010-6afa2f60fc74,product 2,5,3.5,2021-04-05 10:49:34.599
1,543ae2f9-551b-4413-ba00-d1a031059336,product 1,2,1.2,2021-04-05 10:49:34.593
2,ce5e8cc4-2f6f-4818-a07c-7e959a2220ef,product 3,8,5.8,2021-04-05 10:49:34.604
3,5de8d6d8-39c8-4a8c-a1f5-e9b65148dbc2,product 4,11,8.1,2021-04-05 10:49:34.607
4,0db24278-1bc5-4a84-89d5-40c014a73557,product 5,14,10.4,2021-04-05 10:49:34.609


In [19]:
# load dataframe
# Column-oriented sample data: each key becomes a column, each list its values.
products = dict(
    name=['Product A1', 'Product A2', 'Product A3'],
    code=['E01', 'E02', 'E03'],
    price=[2.1, 3.6, 3.5],
    quantity=[5, 7, 3],
    created=['2020-10-18 20:12:21'] * 3,
)

dataFrame = pd.DataFrame(products)
dataFrame

Unnamed: 0,name,code,price,quantity,created
0,Product A1,E01,2.1,5,2020-10-18 20:12:21
1,Product A2,E02,3.6,7,2020-10-18 20:12:21
2,Product A3,E03,3.5,3,2020-10-18 20:12:21


In [20]:
# insert dataframe to Cassandra

# create table - a primary key is required in Cassandra
# Note: the table has no `code` column, so that DataFrame column is not stored.
session.execute(
    """
    CREATE TABLE IF NOT EXISTS exproduct (
        id uuid,
        name text,
        quantity int,
        price float,
        created timestamp,
        PRIMARY KEY (id)
    )
    """
)

# prepare the insert statement once; each DataFrame row is bound to it below
prepared_inserted = session.prepare(
    'INSERT INTO exproduct(id,name,quantity,price,created) VALUES (?, ?, ?, ?, ?)'
)
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)

for item in dataFrame.itertuples():
    # `created` is held as text in the DataFrame; parse it into a datetime
    # for the timestamp column.
    created = datetime.strptime(item.created, '%Y-%m-%d %H:%M:%S')
    row_values = (uuid4(), item.name, item.quantity, item.price, created)
    batch.add(prepared_inserted, row_values)

# execute the batch holding all queued inserts
session.execute(batch)
print("bulk insert data")

bulk insert data


In [21]:
# retrieve data
# Read back the rows inserted from the DataFrame; the generated id column is
# not selected here.
rows = session.execute('SELECT name, quantity, price, created FROM exproduct')

# Build a DataFrame from the materialised rows and display it.
df = pd.DataFrame(list(rows))
df

Unnamed: 0,name,quantity,price,created
0,Product A1,5,2.1,2020-10-18 20:12:21
1,Product A2,7,3.6,2020-10-18 20:12:21
2,Product A3,3,3.5,2020-10-18 20:12:21


## Drop Keyspace

In [22]:
# Drop the demo keyspace created at the start of the notebook.
# IF EXISTS keeps the cell re-runnable: a plain DROP KEYSPACE raises an error
# once the keyspace is already gone.
session.execute("DROP KEYSPACE IF EXISTS pydb")
print('done')

done


In [23]:
# close connection
# Shutting down only the session leaves the Cluster object created inside
# create_connection alive, so shut the owning cluster down as well.
session.shutdown()
session.cluster.shutdown()