# Data Modelling and ETL Pipeline for Apache Cassandra

In [44]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

In [45]:
# checking current working directory
print(os.getcwd())

# Get current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    

/home/workspace


In [46]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [47]:
# check the number of rows in csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


In [48]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])

# To establish connection and begin executing queries, need a session
session = cluster.connect()

In [49]:
# Create a Keyspace
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkifydb
    WITH REPLICATION =
    {'class': 'SimpleStrategy', 'replication_factor': 1}""" # Replication factor of 1 as Cassandra instance running only on one local machine
                   )
except Exception as e:
    print(e)

In [50]:
# Set KEYSPACE to the keyspace specified above
try:
    session.set_keyspace('sparkifydb')
except Exception as e:
    print(e)

In [51]:
## Query 1:  Give me the artist, song title and song's length in the music app history that was heard during \
## sessionId = 338, and itemInSession = 4

# In order to model this query we will need to have a table with 5 columns containing information about artist name, title of song, duration of song, 
# the session in the app during which this song was heard and which item of the session it was

query = "CREATE TABLE IF NOT EXISTS song_info"
# Use a composition of Partition key sessionId and Clustering column itemInsession as the Primary key in order to obtain a unique combination
# for each row. It makes sense to include sessionId and itemInSession into the primary key as through this query 
# we are looking for information about a song being played at a specific sessionId and for a specific itemInSession
query = query + "(sessionId int, itemInSession int, artist text, song_title text, song_length float, PRIMARY KEY(sessionId,itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

                    

In [52]:
# Set up the CSV file
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
# INSERT data from the csv file into the table song_info
        query = "INSERT INTO song_info (sessionId, itemInSession, artist, song_title, song_length)"
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        ## Assign the appropriate column from the csv file to the corressponding column in the table
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

In [53]:
# In order to verify that the correct data was entered in the table, a SELECT statement can be executed to return the artist name, title of song
# and duration of song that was heard during a specific sessionId and a itemInSession.

query = """SELECT * FROM song_info 
        WHERE sessionId = %s
        AND itemInSession = %s
"""
try:
    rows = session.execute(query, (338, 4)) # In this case, we will use sessionId of 338 and itemInSession of 4 but it can be applied for 
                                            # any combination of sessionId and itemInSession in their respective order
except Exception as e:
    print(e)

for row in rows:
    print("Artist name: " + row.artist, "\nTitle of Song: " + row.song_title, "\nDuration of Song: " + str(row.song_length))
    


Artist name: Faithless 
Title of Song: Music Matters (Mark Knight Dub) 
Duration of Song: 495.30731201171875


In [54]:
## Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)\
## for userid = 10, sessionid = 182

# In order to model this query we will need to have a table with 7 columns containing information about artist name, title of song,
# first name of the user, last name of the user, the Id of the user, the session in the app during which this song was heard and which item of the session it was

query = "CREATE TABLE IF NOT EXISTS user_details"
# Use a composition of Partition key userId, Clustering column 1 sessionid and Clustering column 2 itemInsession as the Primary key in order to 
# obtain a unique combination for each row. It makes sense to include userId and sessionId into the primary key as through this query 
# we are looking for information about the song being played and the details about the person playing that song for a specific userId 
# at a specific sessionId. itemInSession was also included in the composition in order to use it to sort the data
query = query + "(userId int, sessionId int, itemInSession int, artist text, song_title text, first_name text, last_name text, PRIMARY KEY(userId,sessionId,itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)


                    

In [55]:
# Set up the CSV file
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
# INSERT data from the csv file into the table user_details
        query = "INSERT INTO user_details (userId, sessionId, itemInSession, artist, song_title, first_name, last_name)"
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        ## Assign the appropriate column from the csv file to the corressponding column in the table
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

In [56]:
# In order to verify that the correct data was entered in the table, a SELECT statement can be executed to return the artist name, title of song
# user first name and last name for a specific userId during a specific sessionId 

query = """SELECT * FROM user_details
        WHERE userId = %s
        AND sessionId = %s
"""
try:
    rows = session.execute(query, (10, 182)) # In this case, we will use userId of 10 and sessionId of 182 but it can be applied for 
                                            # any combination of userId and sessionId in their respective order
except Exception as e:
    print(e)

for row in rows:
    print("Artist name: " + row.artist, "\nTitle of Song: " + row.song_title, "\nUser Name: " + row.first_name + " " + row.last_name)

Artist name: Down To The Bone 
Title of Song: Keep On Keepin' On 
User Name: Sylvie Cruz
Artist name: Three Drives 
Title of Song: Greece 2000 
User Name: Sylvie Cruz
Artist name: Sebastien Tellier 
Title of Song: Kilometer 
User Name: Sylvie Cruz
Artist name: Lonnie Gordon 
Title of Song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) 
User Name: Sylvie Cruz


In [57]:
## Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

# In order to model this query we will need to have a table with 4 columns containing information about user first name, user last name, userId
# and title of song

query = "CREATE TABLE IF NOT EXISTS user_details_for_specific_song"
# Use a composition of title of song as Partition key and clustering column userId as Primary key in order to 
# obtain a unique combination for each row. It makes sense to include title of song and userId into the primary key as through this query 
# we are looking for information about all the users who played a specific song 
query = query + "(song_title text, userId int, first_name text, last_name text, PRIMARY KEY(song_title,userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)
                    

In [58]:
# Set up the CSV file
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
# INSERT data from the csv file into the table user_details_for_specific_song
        query = "INSERT INTO user_details_for_specific_song (song_title, userId, first_name, last_name)"
        query = query + "VALUES (%s, %s, %s, %s)"
        ## Assign the appropriate column from the csv file to the corressponding column in the table
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

In [59]:
# In order to verify that the correct data was entered in the table, a SELECT statement can be executed to return the user name and title of song

query = """SELECT * FROM user_details_for_specific_song
        WHERE song_title = %s
"""
try:
    rows = session.execute(query, ('All Hands Against His Own',)) # In this case, we will use the song with title "All Hands Against His Own" 
                                            # but it can be applied for any song title
except Exception as e:
    print(e)

for row in rows:
    print("Title of Song: " + row.song_title, "\nUser Name: " + row.first_name + " " + row.last_name)

Title of Song: All Hands Against His Own 
User Name: Jacqueline Lynch
Title of Song: All Hands Against His Own 
User Name: Tegan Levine
Title of Song: All Hands Against His Own 
User Name: Sara Johnson


In [60]:
##Drop the table before closing out the sessions
session.execute("DROP TABLE IF EXISTS song_info")
session.execute("DROP TABLE IF EXISTS user_details")
session.execute("DROP TABLE IF EXISTS user_details_for_specific_song")

<cassandra.cluster.ResultSet at 0x7f03407ce940>

In [61]:
# Close the session and cluster connection
session.shutdown()
cluster.shutdown()