# sparkifyks: An Apache Cassandra keyspace

This notebook creates an Apache Cassandra keyspace, sparkifyks, for the Sparkify music app. Following Cassandra's query-first approach, each table is modeled on one of these three queries:

1. artist, song title and song length for sessionId=338 and itemInSession=4
2. artist, song title (sorted by itemInSession) and user name (firstName and lastName) for userId=10 and sessionId=182
3. every user name (firstName and lastName) for the song 'All Hands Against His Own'

In [22]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

from cassandra.cluster import Cluster

#### Build the list of file paths for the original event CSV data files, which are partitioned by date.

In [23]:
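print(os.getcwd())

# Folder holding the daily event CSV files (one file per date)
filepath = os.path.join(os.getcwd(), 'event_data')

# Collect the CSV files in that folder; sorting keeps the processing order deterministic
file_path_list = sorted(glob.glob(os.path.join(filepath, '*.csv')))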

/home/abdel-raouf/Data_Modeling_With_Cassandra


#### Combine the event CSV files into a single CSV file used to populate the Apache Cassandra tables.

In [24]:
full_data_rows_list = []

# Read every data row (each file's header is skipped) from all the daily event files
for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # skip the header row

        for line in csvreader:
            full_data_rows_list.append(line)

# Quote every field so values that contain commas (e.g. locations) stay intact
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip rows with an empty artist field; the queries below need artist and song
        if row[0] == '':
            continue
        # Keep only the 11 columns used by the data model
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7],
                         row[8], row[12], row[13], row[16]))
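
The cell above selects columns by position. Below is a minimal sketch of the same projection done by column name. It assumes the raw daily files carry the 17-column header listed in `RAW_COLUMNS`. The positions used in the cell are consistent with that layout, but the names of the skipped columns are an assumption. `project_events` and `KEPT_INDEXES` are hypothetical helpers, not part of the notebook.

```python
import csv
import io

# ASSUMPTION: header of the raw daily event files (17 columns).
RAW_COLUMNS = ['artist', 'auth', 'firstName', 'gender', 'itemInSession',
               'lastName', 'length', 'level', 'location', 'method', 'page',
               'registration', 'sessionId', 'song', 'status', 'ts', 'userId']

# Columns written to event_datafile_new.csv, in output order.
KEPT_COLUMNS = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
                'length', 'level', 'location', 'sessionId', 'song', 'userId']

# Positions of the kept columns in the assumed raw layout (what row[0], row[2], ... pick out).
KEPT_INDEXES = [RAW_COLUMNS.index(name) for name in KEPT_COLUMNS]


def project_events(raw_csv_text):
    """Hypothetical helper: keep rows that have an artist, and only the modeled columns."""
    reader = csv.DictReader(io.StringIO(raw_csv_text))
    projected = []
    for row in reader:
        if row['artist'] == '':
            continue  # same filter as the notebook: skip rows with no artist
        projected.append([row[name] for name in KEPT_COLUMNS])
    return projected


print(KEPT_INDEXES)  # [0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16]
```

Selecting by name keeps working if the export ever reorders its columns, at the cost of relying on the header row being present and correct.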


#### Check the number of lines in the combined events CSV file (the count includes the header row).

In [25]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


#### Create a Cluster and a session connection.

In [26]:
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create and set the keyspace.

In [27]:
try:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS sparkifyks
        WITH REPLICATION =
        {'class' : 'SimpleStrategy', 'replication_factor' : 2}
    """)
    
except Exception as e:
    print(e)

session.set_keyspace('sparkifyks')

### Query_1

Return the artist, song title and song length for sessionId=338 and itemInSession=4.

#### Primary key selection: 
- The primary key has two columns: `session_id` is the partition key and `item_in_session` is the clustering key. The query filters on both, and each (`session_id`, `item_in_session`) pair maps to exactly one row.
- Rows are partitioned by `session_id`, and within each partition they are ordered by `item_in_session`.
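
To make the partition/clustering split concrete, here is a toy in-memory model of `songs_heard` in plain Python. It is only an analogy: real Cassandra hashes the partition key to a token and stores rows on disk. The class `SongsHeardModel` is hypothetical. The model illustrates three behaviors:

- The partition key picks a group of rows.
- The clustering key orders and addresses rows inside that group.
- Inserting the same primary key again overwrites the row, as a Cassandra `INSERT` does.

```python
from collections import defaultdict


class SongsHeardModel:
    """Toy in-memory analogy for songs_heard,
    PRIMARY KEY (session_id, item_in_session). Hypothetical, not driver code."""

    def __init__(self):
        # partition key (session_id) -> {clustering key (item_in_session) -> row}
        self.partitions = defaultdict(dict)

    def insert(self, session_id, item_in_session, artist_name, title, duration):
        # Writing an existing primary key replaces the row (INSERT is an upsert).
        self.partitions[session_id][item_in_session] = (artist_name, title, duration)

    def select(self, session_id, item_in_session=None):
        # The partition key is required; the clustering key can narrow the result.
        partition = self.partitions.get(session_id, {})
        keys = sorted(partition)  # rows come back in clustering-key order
        if item_in_session is not None:
            keys = [k for k in keys if k == item_in_session]
        return [partition[k] for k in keys]


model = SongsHeardModel()
model.insert(338, 4, 'Faithless', 'Music Matters (Mark Knight Dub)', 495.307312)
print(model.select(338, 4))
# [('Faithless', 'Music Matters (Mark Knight Dub)', 495.307312)]
```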

In [28]:
query = "CREATE TABLE IF NOT EXISTS songs_heard "
query = query + "(artist_name TEXT, title TEXT, duration FLOAT, session_id INT, item_in_session INT, PRIMARY KEY (session_id, item_in_session))" 
try:
    session.execute(query)
except Exception as e:
    print(e)

In [29]:
file = 'event_datafile_new.csv'

# Build the INSERT once; only the bound values change per row
query = "INSERT INTO songs_heard (artist_name, title, duration, session_id, item_in_session)"
query = query + " VALUES (%s, %s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns: 0=artist, 9=song, 5=length, 8=sessionId, 3=itemInSession
        session.execute(query, (line[0], line[9], float(line[5]), int(line[8]), int(line[3])))
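
The insert cells send every row as a simple statement with `%s` placeholders, which the Python driver fills in on the client side. With the DataStax Python driver, a common alternative is to prepare the statement once with `session.prepare()` using `?` markers, then execute it with each row's values. The sketch below assumes the `songs_heard` table and the `event_datafile_new.csv` layout from this notebook. `load_songs_heard` is a hypothetical helper.

```python
import csv

# CQL for the songs_heard table created above; `?` marks bind variables.
INSERT_SONGS_HEARD = (
    "INSERT INTO songs_heard (artist_name, title, duration, session_id, item_in_session) "
    "VALUES (?, ?, ?, ?, ?)"
)


def load_songs_heard(session, csv_path):
    """Hypothetical helper: prepare the INSERT once, then bind each CSV row to it.

    `session` is a cassandra.cluster.Session (or any object with prepare/execute).
    Returns the number of rows sent.
    """
    prepared = session.prepare(INSERT_SONGS_HEARD)  # parsed by the server once
    sent = 0
    with open(csv_path, encoding='utf8', newline='') as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for line in reader:
            # CSV columns: 0=artist, 9=song, 5=length, 8=sessionId, 3=itemInSession
            session.execute(prepared, (line[0], line[9], float(line[5]),
                                       int(line[8]), int(line[3])))
            sent += 1
    return sent
```

Called as `load_songs_heard(session, 'event_datafile_new.csv')`, it performs the same writes as the cell above. For larger loads, the driver's `execute_async` and `cassandra.concurrent.execute_concurrent_with_args` may also be worth considering.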

In [30]:
query = "SELECT artist_name, title, duration FROM songs_heard WHERE session_id=338 AND item_in_session=4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

data = []    
for row in rows:
    data.append([row.artist_name, row.title, row.duration])
    
df = pd.DataFrame(data, columns = ['artist_name', 'title', 'duration'])
df

|   | artist_name | title | duration |
|---|---|---|---|
| 0 | Faithless | Music Matters (Mark Knight Dub) | 495.307312 |


##### Response: 
At sessionId=338 and itemInSession=4, the artist was 'Faithless' and the song was 'Music Matters (Mark Knight Dub)', with a length of 495.307312 seconds.
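
Each query cell copies the result rows into a list before building a DataFrame. By default the Python driver returns rows as named tuples, so a small helper can do the conversion by field name. `rows_to_frame` is a hypothetical helper. It converts each row with `_asdict()`, so the `columns` argument selects fields by name. It also keeps the column headers when a query returns no rows.

```python
import collections

import pandas as pd


def rows_to_frame(rows, columns):
    """Hypothetical helper: turn driver rows (named tuples) into a DataFrame.

    Converting each row to a dict lets `columns` pick fields by name and keeps
    the column headers even when the query returned no rows.
    """
    return pd.DataFrame([row._asdict() for row in rows], columns=columns)


# Stand-in for the driver's named-tuple rows (field names match songs_heard).
Row = collections.namedtuple('Row', ['artist_name', 'title', 'duration'])
df = rows_to_frame([Row('Faithless', 'Music Matters (Mark Knight Dub)', 495.307312)],
                   ['artist_name', 'title', 'duration'])
print(df)
```

Another option the driver provides is `session.row_factory = cassandra.query.dict_factory`, which makes `session.execute` return rows as dicts directly.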

### Query_2

Return the artist, song title (sorted by itemInSession) and user name (firstName and lastName) for userId=10 and sessionId=182.

#### Primary key selection:
- The primary key has three columns: `user_id` and `session_id` together form a composite partition key, and `item_in_session` is the clustering key.
- Rows are partitioned by the combination of `user_id` and `session_id`, so all rows for one session of one user are stored together in a single partition, and the query must supply both values.
- Within each partition, rows are ordered by the clustering key `item_in_session`, so the songs are returned in the order they were played.
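
Below is a plain-Python sketch of the composite partition key. A dictionary stands in for Cassandra's token-based data placement, so this is an analogy only. The tuple `(user_id, session_id)` selects the partition. Two sessions of the same user therefore land in different partitions, and rows inside a partition come back sorted by `item_in_session`. `build_partitions`, `songs_in_session` and the sample rows are hypothetical.

```python
from collections import defaultdict


def build_partitions(rows):
    """Group (user_id, session_id, item_in_session, song) rows the way
    PRIMARY KEY ((user_id, session_id), item_in_session) lays them out."""
    partitions = defaultdict(dict)
    for user_id, session_id, item_in_session, song in rows:
        # Both partition key columns together choose the partition.
        partitions[(user_id, session_id)][item_in_session] = song
    return partitions


def songs_in_session(partitions, user_id, session_id):
    """Mimic WHERE user_id=? AND session_id=?: the full partition key is required."""
    partition = partitions.get((user_id, session_id), {})
    # Inside a partition, rows are returned in clustering-key order.
    return [partition[item] for item in sorted(partition)]


# Hypothetical rows: one user with two sessions, inserted out of order.
rows = [
    (10, 182, 2, 'song C'),
    (10, 182, 0, 'song A'),
    (10, 182, 1, 'song B'),
    (10, 183, 0, 'song D'),
]
partitions = build_partitions(rows)
print(len(partitions))                        # 2
print(songs_in_session(partitions, 10, 182))  # ['song A', 'song B', 'song C']
```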

In [31]:
query = "CREATE TABLE IF NOT EXISTS users_history "
query = query + "(artist_name TEXT, song_title TEXT, first_name TEXT, last_name TEXT, user_id INT, session_id INT, item_in_session INT, PRIMARY KEY((user_id, session_id), item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)        

In [32]:
file = 'event_datafile_new.csv'

# Build the INSERT once; only the bound values change per row
query = "INSERT INTO users_history (artist_name, song_title, first_name, last_name, user_id, session_id, item_in_session)"
query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns: 0=artist, 9=song, 1=firstName, 4=lastName, 10=userId, 8=sessionId, 3=itemInSession
        session.execute(query, (line[0], line[9], line[1], line[4], int(line[10]), int(line[8]), int(line[3])))

In [33]:
query = "SELECT artist_name, song_title, first_name, last_name FROM users_history WHERE user_id=10 AND session_id=182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

data= []    
for row in rows:
    data.append([row.artist_name, row.song_title, row.first_name, row.last_name])
    
df = pd.DataFrame(data, columns = ['artist_name', 'song_title', 'first_name', 'last_name'])
df

|   | artist_name | song_title | first_name | last_name |
|---|---|---|---|---|
| 0 | Down To The Bone | Keep On Keepin' On | Sylvie | Cruz |
| 1 | Three Drives | Greece 2000 | Sylvie | Cruz |
| 2 | Sebastien Tellier | Kilometer | Sylvie | Cruz |
| 3 | Lonnie Gordon | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie | Cruz |


##### Response: 
In sessionId=182, userId=10 (Sylvie Cruz) listened to four songs, listed in itemInSession order: 'Keep On Keepin' On' by 'Down To The Bone', 'Greece 2000' by 'Three Drives', 'Kilometer' by 'Sebastien Tellier', and 'Catch You Baby (Steve Pitron & Max Sanna Radio Edit)' by 'Lonnie Gordon'.

### Query_3

Return every user (firstName and lastName) who listened to the song 'All Hands Against His Own'.

#### Primary key selection:
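- The primary key has two columns: `song_title` is the partition key and `user_id` is the clustering key.
- Rows are partitioned by `song_title`, and within each partition they are ordered by `user_id`. Including `user_id` makes each (song, user) pair unique. A user who played the song several times therefore appears only once, while two different users with the same name are still kept apart.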
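
A toy sketch of why `user_id` is part of the key: when rows are keyed on `(song_title, user_id)`, repeated plays by the same user collapse into one row, while two users who happen to share a name stay separate. This is a plain-Python analogy, not driver code. `users_for_song` and the sample plays are hypothetical.

```python
def users_for_song(plays, song_title):
    """plays: iterable of (first_name, last_name, song_title, user_id).
    Mimics PRIMARY KEY (song_title, user_id): at most one row per (song, user)."""
    table = {}
    for first_name, last_name, title, user_id in plays:
        table[(title, user_id)] = (first_name, last_name)  # a repeat play overwrites
    # Keep the requested partition; rows are ordered by the clustering key user_id.
    keys = sorted(key for key in table if key[0] == song_title)
    return [table[key] for key in keys]


plays = [
    ('Ann', 'Doe', 'Song X', 7),
    ('Ann', 'Doe', 'Song X', 7),   # same user plays it again
    ('Ann', 'Doe', 'Song X', 3),   # different user with the same name
    ('Bob', 'Roe', 'Song Y', 5),
]
print(users_for_song(plays, 'Song X'))  # [('Ann', 'Doe'), ('Ann', 'Doe')]
```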

In [34]:
query = "CREATE TABLE IF NOT EXISTS users_per_song "
query = query + "(first_name TEXT, last_name TEXT, song_title TEXT, user_id INT, PRIMARY KEY (song_title, user_id))"
try:
    session.execute(query)
except Exception as e:
    print(e)               

In [35]:
file = 'event_datafile_new.csv'

# Build the INSERT once; only the bound values change per row
query = "INSERT INTO users_per_song (first_name, last_name, song_title, user_id)"
query = query + " VALUES (%s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns: 1=firstName, 4=lastName, 9=song, 10=userId
        session.execute(query, (line[1], line[4], line[9], int(line[10])))

In [36]:
query = "SELECT first_name, last_name FROM users_per_song WHERE song_title='All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

data = []
for row in rows:
    data.append([row.first_name, row.last_name])
    
df = pd.DataFrame(data, columns = ['first_name', 'last_name'])
df

|   | first_name | last_name |
|---|---|---|
| 0 | Jacqueline | Lynch |
| 1 | Tegan | Levine |
| 2 | Sara | Johnson |


##### Response: 
Three users listened to the song 'All Hands Against His Own': Jacqueline Lynch, Tegan Levine and Sara Johnson.

### Drop the tables before closing the session.

In [37]:
query_1 = "DROP TABLE IF EXISTS songs_heard"
query_2 = "DROP TABLE IF EXISTS users_history"
query_3 = "DROP TABLE IF EXISTS users_per_song"

try:
    session.execute(query_1)
    session.execute(query_2)
    session.execute(query_3)
except Exception as e:
    print(e)

### Close session and cluster connection.

In [38]:
session.shutdown()
cluster.shutdown()