Import Python packages

In [24]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

Creating list of filepaths to process original event csv data files

In [25]:
# Show the current working directory so the reader can confirm where
# the 'event_data' folder is being resolved from.
print(os.getcwd())

# Folder (relative to the working directory) that holds the raw event CSV files.
filepath = os.getcwd() + '/event_data'

# Collect every CSV file under `filepath`, including sub-folders.
# The list is created *before* the walk so that (a) it exists even when the
# folder is missing or empty, and (b) the matches of each directory are
# accumulated instead of replacing those found in the previous directory.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden folders (e.g. '.ipynb_checkpoints') in place so os.walk
    # does not descend into them and pick up stale copies of the data.
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # Only '*.csv' files are event data; sorting keeps the order deterministic.
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))

/home/workspace


Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [26]:
# Rows (header excluded) gathered from every file in file_path_list.
full_data_rows_list = []

for event_file in file_path_list:
    with open(event_file, 'r', encoding = 'utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default keeps an empty file from raising StopIteration.
        next(csvreader, None)
        # Append every remaining row of this file.
        full_data_rows_list.extend(csvreader)

# Uncomment to inspect the total number of rows / a sample of the collected rows.
#print(len(full_data_rows_list))
#print(full_data_rows_list[:10])

# Write the smaller event data file, event_datafile_new.csv, that is used to
# insert data into the Apache Cassandra tables. Every field is quoted.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions in each source row of the columns that are kept, listed in the
# same order as the header written below.
kept_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        # Skip blank CSV lines (csv.reader yields []) and rows whose first
        # column (written out as 'artist') is empty.
        if not row or row[0] == '':
            continue
        writer.writerow(tuple(row[i] for i in kept_columns))


In [27]:
# Report how many lines event_datafile_new.csv contains
# (every line of the file is counted).
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    line_total = len(f.readlines())
print(line_total)

6821


Part II. Apache Cassandra coding portion of the project

Now we are ready to work with the CSV file event_datafile_new.csv, located within the Workspace directory. The event_datafile_new.csv contains the following columns:

-artist

-firstName of user

-gender of user

-item number in session

-last name of user

-length of the song

-level (paid or free song)

-location of the user

-sessionId

-song title

-userId

Creating a Cluster

In [28]:
from cassandra.cluster import Cluster

# Connect to the Cassandra node running on the local machine.
try:
    cluster = Cluster(['127.0.0.1'])

    # A session is required to establish the connection and execute queries.
    session = cluster.connect()

except Exception as e:
    # Report the failure, then re-raise: without a session nothing below can
    # work, and continuing would only surface as confusing NameErrors on
    # `session` in later cells.
    print(e)
    raise

Create Keyspace

In [29]:
# CQL that creates the 'udacity' keyspace when it does not exist yet,
# using SimpleStrategy with a replication factor of 1.
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}
    """

try:
    session.execute(create_keyspace_cql)
except Exception as err:
    print(err)

Set Keyspace

In [30]:
# Select 'udacity' as the keyspace for this session; any error is printed.
try:
    session.set_keyspace('udacity')
except Exception as err:
    print(err)

1. Query the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
To get this information, we should execute the following query: SELECT artist, song, length FROM table WHERE sessionId = 338 AND itemInSession = 4

First, we need to create the table and then insert data into it. Based on the data in event_datafile_new.csv, the column types can be defined as:

Artist - Text,
Song - Text,
Length - Float,
SessionID - Int,
ItemInSession - Int

In this table, sessionId is the partition key and itemInSession is the clustering column; together they uniquely identify each row and match the WHERE clause of the query. The table is named session_songs.

In [31]:
# session_songs: sessionId is the partition key and itemInSession the
# clustering column, matching the WHERE clause of the first question.
query = (
    "CREATE TABLE IF NOT EXISTS session_songs "
    "(sessionId int, itemInSession int, artist text, song text, length float, "
    "PRIMARY KEY(sessionId, itemInSession))"
)

try:
    session.execute(query)
except Exception as e:
    print(e)
else:
    # The message now names the table that is actually created (session_songs).
    print('Table session_songs created')

Table session_song created


In [32]:
# Checking if the table is empty
try:
    rows = session.execute('SELECT COUNT(*) FROM session_songs')

    # Iterate inside the try: if the query fails, `rows` would otherwise be
    # undefined (NameError) or still hold the result of an earlier cell.
    for row in rows:
        print(row)

except Exception as e:
    print(e)

Row(count=0)


In [33]:
# Load event_datafile_new.csv into session_songs, then report the row count.
csv_file = 'event_datafile_new.csv'
query = "INSERT INTO session_songs (sessionId, itemInsession, artist, song, length) VALUES (%s, %s, %s, %s, %s)"

with open(csv_file, 'r', encoding = 'utf8', newline='') as csvfile:
    csvreader = csv.reader(csvfile)
    # Skip the header row; the default keeps an empty file from raising StopIteration.
    next(csvreader, None)

    # Insert one table row per CSV line.
    for line in csvreader:
        try:
            # Unpacking happens inside the try so that a line with the wrong
            # number of fields is reported and skipped like any other bad
            # line, instead of aborting the load half-way.
            artist, firstName, gender, itemInSession, lastName, length, level, location, sessionId, song, userId = line

            # CSV fields are strings; convert them to the column types.
            session.execute(query, (int(sessionId), int(itemInSession), artist, song, float(length)))

        except Exception as e:
            print(f'The line was not inserted\nError: {e}')

# Checking the inserted data
try:
    rows = session.execute('SELECT COUNT(*) FROM session_songs')

    # Iterate inside the try so a failed query never prints a stale `rows`.
    for row in rows:
        print(row)

except Exception as e:
    print(e)

Row(count=6820)


In [34]:
# Question 1: artist, song title and song length heard during
# sessionId = 338 at itemInSession = 4.
select_cql = "SELECT artist, song, length FROM session_songs WHERE sessionId = 338 AND itemInSession = 4"

try:
    rows = session.execute(select_cql)
    for row in rows:
        print(f'Artist: {row.artist}, song name: {row.song}, length: {row.length}')
except Exception as err:
    print(err)

Artist: Faithless, song name: Music Matters (Mark Knight Dub), length: 495.30731201171875


2. Finding only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

SELECT artist, song, firstName, lastName FROM table WHERE userId = 10 AND sessionId = 182 ORDER BY itemInSession

First, we need to create the table and then insert data into it. Based on the data in event_datafile_new.csv, the column types can be defined as:

1. Artist - Text
2. Song - Text
3. firstName - Text
4. lastName - Text
5. userId - Int
6. SessionID - Int
7. ItemInSession - Int

In this table, userId and sessionId together form the composite partition key, while itemInSession is the clustering column. With only userId and sessionId in the primary key, every new record for the same user and session would overwrite the previous one, because each song played in a session is a separate record. Adding itemInSession as a clustering column makes each record unique and also keeps the rows sorted by itemInSession, which is the order the query requires.

Naming this table as user_session

In [35]:
# user_session: (userId, sessionId) is the composite partition key and
# itemInSession the clustering column.
query = "CREATE TABLE IF NOT EXISTS user_session "
query += '( userId int, sessionId int, itemInSession int, artist text, \
                   song text, firstName text, lastName text, \
                   PRIMARY KEY((userId, sessionId), itemInSession))'

try:
    session.execute(query)
except Exception as err:
    print(err)
else:
    print('Table user_session created')

Table user_session created


In [36]:
# Checking if the table is empty
try:
    rows = session.execute('SELECT COUNT(*) FROM user_session')

    # Iterate inside the try: if the query fails, `rows` would otherwise be
    # undefined (NameError) or still hold the result of an earlier cell.
    for row in rows:
        print(row)

except Exception as e:
    print(e)

Row(count=0)


In [37]:
# Load event_datafile_new.csv into user_session.
csv_file = 'event_datafile_new.csv'
query = "INSERT INTO user_session (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s)"

with open(csv_file, 'r', encoding = 'utf8', newline='') as csvfile:
    csvreader = csv.reader(csvfile)
    # Skip the header row; the default keeps an empty file from raising StopIteration.
    next(csvreader, None)

    # Insert one table row per CSV line.
    for line in csvreader:
        try:
            # Unpacking happens inside the try so that a line with the wrong
            # number of fields is reported and skipped like any other bad
            # line, instead of aborting the load half-way.
            artist, firstName, gender, itemInSession, lastName, length, level, location, sessionId, song, userId = line

            # CSV fields are strings; convert the key columns to int.
            session.execute(query, (int(userId), int(sessionId), int(itemInSession), artist, song, firstName, lastName))

        except Exception as e:
            print(f'The line was not inserted\nError: {e}')

In [38]:
# Checking the inserted data
try:
    rows = session.execute('SELECT COUNT(*) FROM user_session')

    # Iterate inside the try: if the query fails, `rows` would otherwise be
    # undefined (NameError) or still hold the result of an earlier cell.
    for row in rows:
        print(row)

except Exception as e:
    print(e)

Row(count=6820)


In [39]:
# Question 2: artist, song (sorted by itemInSession) and user first/last name
# for userId = 10, sessionId = 182.
select_cql = "SELECT artist, song, firstName, lastName \
                           FROM user_session \
                           WHERE userId = 10 AND sessionId = 182 \
                           ORDER BY itemInSession"

try:
    rows = session.execute(select_cql)
    # The result rows expose the name columns as lower-case attributes
    # (firstname / lastname).
    for row in rows:
        print(f'Artist: {row.artist}, Song name: {row.song}, User: {row.firstname} {row.lastname}')
except Exception as err:
    print(err)

Artist: Down To The Bone, Song name: Keep On Keepin' On, User: Sylvie Cruz
Artist: Three Drives, Song name: Greece 2000, User: Sylvie Cruz
Artist: Sebastien Tellier, Song name: Kilometer, User: Sylvie Cruz
Artist: Lonnie Gordon, Song name: Catch You Baby (Steve Pitron & Max Sanna Radio Edit), User: Sylvie Cruz


3. Get every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

To find every record in the database whose song is 'All Hands Against His Own', we should execute the following query:

SELECT firstName, lastName FROM table WHERE song = 'All Hands Against His Own'

First, we need to create the table and then insert data into it. Based on the data in event_datafile_new.csv, the column types can be defined as:

1. firstName - Text
2. lastName - Text
3. song - Text
4. userId - Int
5. SessionID - Int
6. ItemInSession - Int

For this table, one option is to use song as the partition key and userId, sessionId and itemInSession as clustering columns. That configuration keeps one row for every time the song is played, even when a user plays the same song several times in a single session. However, the requirement only asks for the first and last name of each user who listened to a given song. Thus we can create a table with song as the partition key and userId as the clustering column, which stores exactly one row per (song, user) pair; repeated plays of a song by the same user overwrite the same row, so the table holds fewer rows than the CSV file.

Naming this table as user_songs

In [40]:
# user_songs: song is the partition key and userId the clustering column.
query = (
    "CREATE TABLE IF NOT EXISTS user_songs "
    "(song text, userId int, firstName text, lastName text, PRIMARY KEY(song, userId))"
)

try:
    session.execute(query)
except Exception as err:
    print(err)
else:
    print('Table user_songs created')

Table user_songs created


In [41]:
# Checking if the table is empty
try:
    rows = session.execute('SELECT COUNT(*) FROM user_songs')

    # Iterate inside the try: if the query fails, `rows` would otherwise be
    # undefined (NameError) or still hold the result of an earlier cell.
    for row in rows:
        print(row)

except Exception as e:
    print(e)

Row(count=0)


In [42]:
# Load event_datafile_new.csv into user_songs.
csv_file = 'event_datafile_new.csv'
query = "INSERT INTO user_songs (song, userId, firstName, lastName) VALUES (%s, %s, %s, %s)"

with open(csv_file, 'r', encoding = 'utf8', newline='') as csvfile:
    csvreader = csv.reader(csvfile)
    # Skip the header row; the default keeps an empty file from raising StopIteration.
    next(csvreader, None)

    # Issue one INSERT per CSV line.
    for line in csvreader:
        try:
            # Unpacking happens inside the try so that a line with the wrong
            # number of fields is reported and skipped like any other bad
            # line, instead of aborting the load half-way.
            artist, firstName, gender, itemInSession, lastName, length, level, location, sessionId, song, userId = line

            # CSV fields are strings; userId is converted to int for the int column.
            session.execute(query, (song, int(userId), firstName, lastName))

        except Exception as e:
            print(f'The line was not inserted\nError: {e}')

In [43]:
# Checking the inserted data
try:
    rows = session.execute('SELECT COUNT(*) FROM user_songs')

    # Iterate inside the try: if the query fails, `rows` would otherwise be
    # undefined (NameError) or still hold the result of an earlier cell.
    for row in rows:
        print(row)

except Exception as e:
    print(e)

Row(count=6618)


In [44]:
# Question 3: first and last name of every user who listened to the song
# 'All Hands Against His Own'.
select_cql = "SELECT firstName, lastName\
                           FROM user_songs \
                           WHERE song = 'All Hands Against His Own'"

try:
    rows = session.execute(select_cql)
    # The result rows expose the name columns as lower-case attributes
    # (firstname / lastname).
    for row in rows:
        print(f'User: {row.firstname} {row.lastname}')
except Exception as err:
    print(err)

User: Jacqueline Lynch
User: Tegan Levine
User: Sara Johnson


Drop the tables before closing out the sessions

In [45]:
# One DROP statement per table created in this notebook. IF EXISTS makes the
# cell safe to re-run and safe when an earlier CREATE TABLE failed.
queries_drop = ["DROP TABLE IF EXISTS session_songs",
                "DROP TABLE IF EXISTS user_session",
                "DROP TABLE IF EXISTS user_songs"]

for query in queries_drop:
    # Each statement has its own try so that one failure does not prevent
    # the remaining tables from being dropped.
    try:
        session.execute(query)
    except Exception as e:
        print(e)

Close the session and cluster connection

In [46]:
# Release the connection resources: shut down the session first, then the
# cluster object it was obtained from.
session.shutdown()
cluster.shutdown()