# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
## Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:

# print working directory '/home/workspace'
print(os.getcwd())

# the filepath is '/home/workspace/event_data'. Get all directories and sub-directories.
filepath = os.getcwd() + '/event_data'

# The list of files and filepaths will be created in 'home/workspace/event_data'
for root, dirs, files in os.walk(filepath):
    
## join the file path('home/workspace/event_data') and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    # print the file_path_list
    print(file_path_list)

/home/workspace
['/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-02-events.csv', '/home/workspace/event_data/2018-11-03-events.csv', '/home/workspace/event_data/2018-11-13-events.csv', '/home/workspace/event_data/2018-11-14-events.csv', '/home/workspace/event_data/2018-11-10-events.csv', '/home/workspace/event_data/2018-11-22-events.csv', '/home/workspace/event_data/2018-11-30-events.csv', '/home/workspace/event_data/2018-11-17-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-23-events.csv', '/home/workspace/event_data/2018-11-12-events.csv', '/home/workspace/event_data/2018-11-29-events.csv', '/home/workspace/event_data/2018-11-08-events.csv', '/home/workspace/event_data/2018-11-19-events.csv', '/home/workspace/event_data/2018-11-28-events.csv', '/home/workspace/event_data/2018-11-26-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-16-events.c

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# initialize the list array
full_data_rows_list = [] 
    
## for every filepath in the file path list (all files in '/home/workspace/event_data')
for f in file_path_list:

## reading csv file (each file in 'home/workspace/event_data') 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        ## creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
## extracting each data row one by one and append it and create full_data_rows_list file        
        for line in csvreader:
            ##print(line)
            full_data_rows_list.append(line) 
        #print(full_data_rows_list)
# get total number of rows for full_data_rows_list 
print(len(full_data_rows_list))
# print the first five rows in the list
print(full_data_rows_list[:5])


3429
[['A Fine Frenzy', 'Logged In', 'Anabelle', 'F', '0', 'Simpson', '267.91138', 'free', 'Philadelphia-Camden-Wilmington, PA-NJ-DE-MD', 'PUT', 'NextSong', '1.54104E+12', '256', 'Almost Lover (Album Version)', '200', '1.54138E+12', '69'], ['Nirvana', 'Logged In', 'Aleena', 'F', '0', 'Kirby', '214.77832', 'paid', 'Waterloo-Cedar Falls, IA', 'PUT', 'NextSong', '1.54102E+12', '237', 'Serve The Servants', '200', '1.54138E+12', '44'], ['Television', 'Logged In', 'Aleena', 'F', '1', 'Kirby', '238.49751', 'paid', 'Waterloo-Cedar Falls, IA', 'PUT', 'NextSong', '1.54102E+12', '237', 'See No Evil  (Remastered LP Version)', '200', '1.54138E+12', '44'], ['JOHN COLTRANE', 'Logged In', 'Aleena', 'F', '2', 'Kirby', '346.43546', 'paid', 'Waterloo-Cedar Falls, IA', 'PUT', 'NextSong', '1.54102E+12', '237', 'Blues To Bechet (LP Version)', '200', '1.54138E+12', '44'], ['NOFX', 'Logged In', 'Aleena', 'F', '3', 'Kirby', '80.79628', 'paid', 'Waterloo-Cedar Falls, IA', 'PUT', 'NextSong', '1.54102E+12', '237'

In [4]:
# create a smaller event data csv file called event_datafile_new csv that will be used to insert data into the
# Apache Cassandra tables used for the three queries.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# create a new file event_datafile_new.csv
with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    # label the columns in each row in event_datafile_new.csv  
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    # for each row in the original list ,if first column is empty 
    # don't do anything ,just continue the loop until the first column has a value. If it has value then input
    # columns 1,3,4,5,6,7,8,9,13,14,17 in event_data_new.csv. 
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [5]:
# count the number of rows in your event_datafile_new csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

2893


In [6]:
# print the first 5 rows in event_datafile_new.csv in a dataframe format
yourfile = pd.read_csv('event_datafile_new.csv',nrows=5)
yourfile

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,A Fine Frenzy,Anabelle,F,0,Simpson,267.91138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",256,Almost Lover (Album Version),69
1,Nirvana,Aleena,F,0,Kirby,214.77832,paid,"Waterloo-Cedar Falls, IA",237,Serve The Servants,44
2,Television,Aleena,F,1,Kirby,238.49751,paid,"Waterloo-Cedar Falls, IA",237,See No Evil (Remastered LP Version),44
3,JOHN COLTRANE,Aleena,F,2,Kirby,346.43546,paid,"Waterloo-Cedar Falls, IA",237,Blues To Bechet (LP Version),44
4,NOFX,Aleena,F,3,Kirby,80.79628,paid,"Waterloo-Cedar Falls, IA",237,It's My Job To Keep Punk Rock Elite,44


In [7]:
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# create a file 'event_datafile_new2.csv' where first name and last name are combined as username to input in
# the Cassandra tables for the last two queries.
with open('event_datafile_new2.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    # For new file 'event_datafile_new2.csv' label the columns 
    writer.writerow(['artist','username','gender','itemInSession','length',\
                'level','location','sessionId','song','userId'])
    # for each row in the original list ,if first column is empty 
    # don't do anything ,just continue the loop until the first column has a value. If it has value then input
    # in event_data_new2.csv, columns 1, the concatenation of the firstname and lastname columns as the username
    # column and then the remaining columns ( 4,5,7,8,9,13,14,17)
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], (row[2]+' '+row[5]), row[3], row[4], row[6], row[7], row[8], row[12], row[13], row[16]))

In [8]:
# print the first 5 rows in event_datafile_new2.csv in a dataframe format
yourfile2 = pd.read_csv('event_datafile_new2.csv',nrows=5)
yourfile2

Unnamed: 0,artist,username,gender,itemInSession,length,level,location,sessionId,song,userId
0,A Fine Frenzy,Anabelle Simpson,F,0,267.91138,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",256,Almost Lover (Album Version),69
1,Nirvana,Aleena Kirby,F,0,214.77832,paid,"Waterloo-Cedar Falls, IA",237,Serve The Servants,44
2,Television,Aleena Kirby,F,1,238.49751,paid,"Waterloo-Cedar Falls, IA",237,See No Evil (Remastered LP Version),44
3,JOHN COLTRANE,Aleena Kirby,F,2,346.43546,paid,"Waterloo-Cedar Falls, IA",237,Blues To Bechet (LP Version),44
4,NOFX,Aleena Kirby,F,3,80.79628,paid,"Waterloo-Cedar Falls, IA",237,It's My Job To Keep Punk Rock Elite,44


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [9]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster()

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspace

In [10]:
# Create a Keyspace named keyspace_song. The keyspace will use one node for replication. 
# Class is simplestrategy as you are only running a one node cluster.
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS keyspace_song 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)


#### Set Keyspace

In [11]:
# Set KEYSPACE to keyspace_song specified above
try:
    session.set_keyspace('keyspace_song')
except Exception as e:
    print(e)




### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [12]:
#### Query 1:
# We need to find the artist, song title and song's length in the music app history list that was heard during
# sessionId = 338, and itemInSession = 4. To do this we first create a table, song_length_session, 
# to contain all columns in table.
# Since the query wants matches based on sessionId and itemInSession, the columns, sessionId and itemInSession
# will be our primary key. sessionId is the partition key and itemInSession is the clustering column.
# The partition key is used to determine what node gets the data.
# The clustering column specifies the order that the data is arranged inside the partition.
# Both sessionId and itemInSession are in the WHERE clause in the query.
query1 = "CREATE TABLE IF NOT EXISTS song_length_session "
query1 = query1 + "(sessionId int, itemIdSession int, artist text, song text, length float, \
PRIMARY KEY (sessionId, itemIdSession))" 

try:
    session.execute(query1)
except Exception as e:
    print(e)


                    

In [13]:
# The file , 'event_datafile_new.csv' will be used to enter values in the song_length_session table 
file = 'event_datafile_new.csv'

# open the 'event_datafile_new.csv' file
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
# The INSERT statement will insert sessionId, itemIdSession, artist, song and length into the `query1` variable 
# for the song_length_session table.
        query1 = "INSERT INTO song_length_session(sessionId, itemIdSession, artist, song, length)"
        query1 = query1 + "VALUES (%s, %s, %s, %s, %s)"
        # The column elements from the music app history list are assigned for the sessionId, 
        # itemIdSession, artist, song, and length columns in the INSERT statement.
        session.execute(query1, (int(line[8]), int(line[3]), str(line[0]), str(line[9]), float(line[5])))

#### Do a SELECT to verify that the data have been inserted into each table

In [14]:
# The SELECT statement will verify that the data was entered into the song_length_session table
query1 = "SELECT * FROM song_length_session LIMIT 5"
try:
    rows = session.execute(query1)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.sessionid, row.itemidsession, row.artist, row.song, row.length)



23 0 Regina Spektor The Calculation (Album Version) 191.08526611328125
23 1 Octopus Project All Of The Champs That Ever Lived 250.95791625976562
23 2 Tegan And Sara So Jealous 180.06158447265625
23 3 Dragonette Okay Dolores 153.39056396484375
23 4 Lil Wayne / Eminem Drop The World 229.58975219726562


In [15]:
# The following query will get the artist, song title and song's length in the music app history list that was heard during
# sessionId = 338, and itemInSession = 4
query1 = "SELECT * FROM song_length_session WHERE sessionId = 338 AND itemIdSession = 4"
try:
    rows = session.execute(query1)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist, row.song, row.length)


Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### COPY AND REPEAT THE ABOVE THREE CELLS FOR EACH OF THE THREE QUESTIONS

In [16]:
### Query 2
# We need to find name of artist, song(sorted by itemInSession) and user(first and last name)
# for userid = 10, sessionid = 182. To do this we create a table, song_user_session, consisting of userId, sessionId,
# itemInSession, artist, song and username(fistname and lastname). 
# Our partition key is userId and sessionId. ItemInSession is the clustering column sorted in ascending order. 
# The partition key specifies particular node and clustering column denotes sorting(in this case the default
# which is ascending) within each partition. 

query2 = "CREATE TABLE IF NOT EXISTS song_user_session "
query2 = query2 + "(userId int, sessionId int, itemInSession int, artist text, song text, username text, \
PRIMARY KEY ((userId, sessionId), itemInSession)) WITH CLUSTERING ORDER BY (itemInSession ASC)" 

try:
    session.execute(query2)
except Exception as e:
    print(e)

In [17]:
# Here the file 'event_datafile_new2.csv' is used for the song_user_session table
file = 'event_datafile_new2.csv'

# open the 'event_datafile_new2.csv' file
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
# The INSERT statement will insert userId, sessionId, itemInSession, artist, song, and username into 
# the `query2` variable for the song_user_session table. 
        query2 = "INSERT INTO song_user_session(userId, sessionId, itemInSession, artist, song, username)"
        query2 = query2 + "VALUES (%s, %s, %s, %s, %s, %s)"
        # The column elements from the music app history list are assigned for the userId, sessionId, 
        # itemIdSession, artist, song, and username columns in the INSERT statement.
        session.execute(query2, (int(line[9]), int(line[7]), int(line[3]), str(line[0]), str(line[8]), str(line[1])))

In [18]:
# The SELECT statement is used to verify the data was entered into the song_user_session table
query2 = "SELECT * FROM song_user_session LIMIT 5"
try:
    rows = session.execute(query2)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.userid, row.sessionid, row.iteminsession, row.artist, row.song, row.username)



26 1068 0 1 Mile North Black Lines Ryan Smith
26 1068 1 USS (Ubiquitous Synergy Seeker) Man Makes The Zoo Ryan Smith
26 1068 2 EsmÃÂ©e Denters / Justin Timberlake Love Dealer (Featuring Justin Timberlake) Ryan Smith
26 1068 3 Train Hey_ Soul Sister Ryan Smith
26 1068 6 The Pussycat Dolls / Snoop Dogg Bottle Pop Ryan Smith


In [19]:
# Use the SELECT statement to get the name of the artist, song(sorted by itemInSession) and user(first and last name)
# for userid = 10, sessionid = 182 from the song_user_session table. 
query2 = "SELECT * FROM song_user_session WHERE userId = 10 AND sessionId = 182" 

try:
    rows = session.execute(query2)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist, row.song, row.username)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


In [29]:
#### Query 3
# We need to find every user name (first and last) in the music app history list who listened to the song
# 'All Hands Against His Own' 
# To do this we create table user_song_allHands with userid , song and username. 
# We use compound primary key of song and userId. Song is our partition key and userId is our
# clustering key. Since song is unique in this query it will be the partition key which will go
# on one node. For a quicker read you would want to put song on a single node and search only that node.
# The clustering column will be userId since it is an integer and specifies the the order that the 
# data is arranged inside the single partition.
query3 = "CREATE TABLE IF NOT EXISTS user_song_allHands "
query3 = query3 + "(userId int, song text, username text, PRIMARY KEY(song,userId))"


try:
    session.execute(query3)
except Exception as e:
    print(e)

In [30]:
# Here the file 'event_datafile_new2.csv' is used for the user_song_allHands table
file = 'event_datafile_new2.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
# The INSERT statement will insert userId, song, and username into the `query3` variable 
# for the user_song_allHands table.
        query3 = "INSERT INTO user_song_allHands(userId, song, username)"
        query3 = query3 + "VALUES (%s, %s, %s)"
        # The column elements from the music app history list are assigned for the userId, song 
        # and username columns in the INSERT statement. 
        session.execute(query3, (int(line[9]),str(line[8]),str(line[1])))

In [31]:
# The SELECT statement is used to verify that data was entered into the table user_song_allHands table
query3 = "SELECT * FROM user_song_allHands LIMIT 5"
try:
    rows = session.execute(query3)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.userid, row.song, row.username)

44 Too Tough (1994 Digital Remaster) Aleena Kirby
73 My Place Jacob Klein
25 Misfit Love Jayden Graves
95 Eat To Live (Amended Version) Sara Johnson
9 Hey_ Soul Sister Wyatt Scott


In [32]:
# The query will give every user name (first and last) in the music app history who listened to the song
# 'All Hands Against His Own'
query3 = "SELECT * FROM user_song_allHands WHERE song = 'All Hands Against His Own'"
try:
    rows = session.execute(query3)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.username)


Jacqueline Lynch
Sara Johnson


### Drop the tables before closing out the sessions

In [None]:
# Drop the song_length_session table
query = "drop table song_length_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


In [None]:
# Drop the song_user_session table
query = "drop table song_user_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


In [28]:
# Drop the user_song_allHands table
query = "drop table user_song_allHands"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


### Close the session and cluster connection¶

In [None]:
session.shutdown()
cluster.shutdown()