# Data Modeling with Apache Cassandra

The project focuses on data modeling techniques tailored for Apache Cassandra, a distributed NoSQL database system.

There are three questions:
-  the `artist`, `song title` and `song's length` in the music app history that was heard during `sessionId` = 338, and `itemInSession` = 4

- The name of `artist`, `song` (sorted by `itemInSession`) and user (`first` and` last name`) for `userid` = 10, `sessionid` = 182

- `User name (first and last)` in music app history who listened to the `song` 'All Hands Against His Own'

# Part 1: ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [103]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [104]:
print(os.getcwd())

filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))

/workspace/home


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [105]:
full_data_rows_list = [] 
    
for f in file_path_list:

    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        for line in csvreader:
            full_data_rows_list.append(line) 
            


csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

In [106]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part 2: Data Modeling

#### Creating a Cluster

In [107]:
from cassandra.cluster import Cluster
cluster = Cluster()

session = cluster.connect()

#### Create Keyspace

In [108]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [109]:
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)

#### Create df

In [110]:
df = pd.read_csv('event_datafile_new.csv')
df.head(3)

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Hoobastank,Cierra,F,0,Finley,241.3971,free,"Richmond, VA",132,Say The Same,96
1,Mark Knopfler,Cierra,F,1,Finley,249.3122,free,"Richmond, VA",132,Why Aye Man,96
2,Mogwai,Cierra,F,2,Finley,341.28934,free,"Richmond, VA",132,We're No Here,96


In [111]:
df.shape

(6820, 11)

#### Cassandra-Pandas configuration
ref: https://stackoverflow.com/questions/41247345/python-read-cassandra-data-into-pandas

In [112]:
def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

In [113]:
session.row_factory = pandas_factory
session.default_fetch_size = None

# Query 1
    

#### the `artist`, `song title` and `song's length` in the music app history that was heard during `sessionId` = 338, and `itemInSession` = 4

#### Create table

In [114]:
query = "CREATE TABLE IF NOT EXISTS music_app_history "
query2 = """(
        sessionId int,
        itemInSession int,
        artist text,
        song text,
        length float,
        PRIMARY KEY (sessionId, itemInSession)
    )"""
query = query + query2
try:
    session.execute(query)
except Exception as e:
    print(e)            

#### Insert data

In [115]:
for index, row in df.iterrows():
    query = "INSERT INTO music_app_history (sessionId, itemInSession, artist, song, length) VALUES (%s, %s, %s, %s, %s)"    
    session.execute(query, (row["sessionId"], row["itemInSession"], row["artist"], row["song"], row["length"]))

#### Select

In [116]:
query = "SELECT artist, song, length FROM music_app_history WHERE sessionId = 338 AND itemInSession = 4;"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [117]:
rows._current_rows

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


# Query 2

#### The name of `artist`, `song` (sorted by `itemInSession`) and user (`first` and` last name`) for `userid` = 10, `sessionid` = 182

#### Create table

In [118]:
query = "CREATE TABLE IF NOT EXISTS music_library "
query2 = """(userId int,
    sessionId int,
    itemInSession int,
    artist text,
    song text,
    firstName text,
    lastName text,
    PRIMARY KEY (userid, sessionid, itemInSession)
);"""
query = query + query2
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Insert

In [119]:
for index, row in df.iterrows():
    query = "INSERT INTO music_library (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s);"
    session.execute(query, (row["userId"], row["sessionId"], row["itemInSession"], row["artist"], row["song"], row["firstName"], row["lastName"]))

#### Select

In [120]:
query = """SELECT artist, song, firstName, lastName
FROM music_library
WHERE userId = 10 AND sessionId = 182;"""

try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [121]:
rows._current_rows

Unnamed: 0,artist,song,firstname,lastname
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


# Query 3

###  `User name (first and last)` in music app history who listened to the `song` 'All Hands Against His Own'

#### Create table

In [125]:
query = "CREATE TABLE IF NOT EXISTS song_history "
query2 = """(
    song text,
    firstName text,
    lastName text,
    userId int,
    PRIMARY KEY (song, firstName, lastName, userId)
);"""
query = query + query2
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Insert

In [127]:
for index, row in df.iterrows():
    query = "INSERT INTO song_history (song, firstName, lastName, userId) VALUES (%s, %s, %s, %s);"
    session.execute(query, (row["song"], row["firstName"], row["lastName"], row["userId"]))

#### Select

In [128]:
query = """SELECT firstName, lastName
FROM song_history
WHERE song='All Hands Against His Own'"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [129]:
rows._current_rows

Unnamed: 0,firstname,lastname
0,Jacqueline,Lynch
1,Sara,Johnson
2,Tegan,Levine


### Drop the tables

In [130]:
query = "DROP table music_app_history"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [131]:
query = "DROP table music_library"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [132]:
query = "DROP table song_history"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

### Close the session and cluster connection¶

In [133]:
session.shutdown()
cluster.shutdown()