In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

Creating list of filepaths to process original event csv data files

In [2]:
# Get current working directory
cwd = os.getcwd()

# Get the list of subdirectories of /event_data/ leading to  *.csv files using glob
file_path_list=glob.glob(cwd+'/event_data/**/*.csv', recursive=True)

Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# creating a smaller event data csv file called event_datafile_new.csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


Part II.  the Apache Cassandra coding portion of the project. 

In [5]:
# Make a connection to a Cassandra instance on local machine (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])

# create a session on cassandra
session = cluster.connect()

In [6]:
# Create a Keyspace 
try:
   session.execute("""CREATE KEYSPACE IF NOT EXISTS udacity 
                      WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}""")
    
except Exception as e:
    print(e)

In [7]:
# Set KEYSPACE to the keyspace specified above
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

In [8]:
# Query 1:  Gives the artist, song title and song's length in the music app history that was heard during \
# sessionId = 338, and itemInSession = 4 \
# We have created the table song_info_by_session_and_item_in_session and set the composite key (session_id, iteminsession) as primary key. 

query1="CREATE TABLE IF NOT EXISTS song_info_by_session_and_item_in_session"
query1= query1 + "(artist_name text, song_title text, song_len float, session_id int, iteminsession int, PRIMARY KEY(session_id, iteminsession))"
try:
    session.execute(query1)
except Exception as e:
    print (e)

# open the CSV file and read the content
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    
    next(csvreader) # skip header
    
    #for each line in csvreader
    for line in csvreader: 
        
## Assign the INSERT statements into the `query` variable
        query = "INSERT INTO song_info_by_session_and_item_in_session (artist_name, song_title, song_len, session_id, iteminsession)"
        query = query + "VALUES(%s,%s,%s,%s,%s)"
        
        ## Assign which column element should be assigned for each column in the INSERT statement.
        session.execute(query, (line[0], line[9], float(line[5]), int(line[8]), int(line[3])))


## SELECT statement to verify the data was entered into the table
select="""SELECT artist_name, song_title, song_len, session_id, iteminsession 
                       FROM  song_info_by_session_and_item_in_session
                       WHERE session_id = 338 AND iteminsession = 4"""
try:
    result=session.execute(select)  
except Exception as e:
    print(e) 
    
for row in result:
    print(row.artist_name, row.song_title, row.song_len, row.session_id, row.iteminsession)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875 338 4


In [9]:
## Query 2: name of artist, song (sorted by itemInSession) and user (first and last name)\
## for userid = 10, sessionid = 182 order by item in session \ 
## We have created the table song_info_by_user_and_session and set the composite partition key (user_id, session_id) \
## and item in session as clustering column. 

query2="CREATE TABLE IF NOT EXISTS song_info_by_user_and_session"
query2= query2 + "(artist_name text, song_title text, song_len float, user_first_name text, user_last_name text, user_id int, session_id int, iteminsession int, PRIMARY KEY((user_id, session_id), iteminsession))"
try:
    session.execute(query2)
except Exception as e:
    print (e)
    
# open the CSV file and read the content
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    
    #for each line in csvreader
    for line in csvreader:
        
## Assign the INSERT statements into the `query` variable
        query = "INSERT INTO song_info_by_user_and_session (artist_name, song_title, song_len, user_first_name, user_last_name, user_id, session_id, iteminsession)"
        query = query + "VALUES(%s,%s,%s,%s,%s,%s,%s,%s)"
        ## Assign which column element should be assigned for each column in the INSERT statement.`
        session.execute(query, (line[0], line[9], float(line[5]), line[1], line[4], int(line[10]), int(line[8]), int(line[3])))
                    
## SELECT statement to verify the data was entered into the table
select="""SELECT artist_name, song_title, song_len, user_first_name, user_last_name, user_id, session_id 
                       FROM  song_info_by_user_and_session
                       WHERE user_id = 10 AND session_id = 182
                       ORDER BY iteminsession"""
try:
    result=session.execute(select)  
except Exception as e:
    print(e) 
    
for row in result:
    print(row.artist_name, row.song_title, row.song_len, row.user_first_name, row.user_last_name, row.user_id, row.session_id)
                    

Down To The Bone Keep On Keepin' On 333.7660827636719 Sylvie Cruz 10 182
Three Drives Greece 2000 411.6370849609375 Sylvie Cruz 10 182
Sebastien Tellier Kilometer 377.73016357421875 Sylvie Cruz 10 182
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) 181.2109832763672 Sylvie Cruz 10 182


In [10]:
## Query 3: Gives every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own' \
## We have created the table song_info_by_title and set composite key (song_title, user_id) as primary key where song_title is the partition key. 

query3="CREATE TABLE IF NOT EXISTS song_info_by_title"
query3= query3 + "(user_first_name text, user_last_name text, song_title text, user_id, PRIMARY KEY(song_title, user_id))"
try:
    session.execute(query3)
except Exception as e:
    print (e)
    
# open the CSV file and read the content
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    #for each line in csvreader
    for line in csvreader:
        
## Assign the INSERT statement into the `query` variable
        query = "INSERT INTO song_info_by_title (user_first_name, user_last_name, song_title, user_id)"
        query = query + "VALUES(%s,%s,%s,%s)"
        ## TO-DO: Assign which column element should be assigned for each column in the INSERT statement.
        session.execute(query, (line[1], line[4], line[9]))
                    
## SELECT statement to verify the data was entered into the table
select="""SELECT user_first_name, user_last_name, song_title
                       FROM  song_info_by_title
                       WHERE song_title = 'All Hands Against His Own'
                       GROUP BY song_title, user_id """
try:
    result=session.execute(select)  
except Exception as e:
    print(e) 
    
for row in result:
    print(row.user_first_name, row.user_last_name, row.song_title)
                    

Jacqueline Lynch All Hands Against His Own


In [11]:
## Drop the table before closing out the sessions

query = "drop table music_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
query = "drop table music_play"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
query = "drop table music_listen"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


In [12]:
#Close the seession and cluster
session.shutdown()
cluster.shutdown()