# Part I. ETL Pipeline for Pre-Processing the Files

## CODE TO BE RUN FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Locate the raw event CSV files.
# The data is expected in an `event_data` folder inside the current working
# directory, so the path is derived from os.getcwd() instead of being
# hard-coded to one machine's absolute location.
filepath = os.path.join(os.getcwd(), 'event_data')

# Collect the path of every CSV file directly inside that folder.
# glob returns an empty list when the folder is missing or holds no CSV files,
# so `file_path_list` is always defined for the cells that follow.
file_path_list = glob.glob(os.path.join(filepath, '*.csv'))

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Accumulate every data row (header excluded) from all the event CSV files
full_data_rows_list = []

# for every filepath in the file path list
for f in file_path_list:

    # newline='' leaves line-ending handling inside quoted fields to the csv module
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the None default keeps a completely empty file
        # from aborting the whole run with StopIteration
        next(csvreader, None)

        # everything left in the reader is a data row
        full_data_rows_list.extend(csvreader)

# total number of rows
print(len(full_data_rows_list))
# get an idea what the list of event data rows will look like
print(full_data_rows_list[0:10])

# Create the smaller, denormalized csv file event_datafile_new.csv that will be
# used to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    # Header: names of the 11 columns kept from each source row
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        # Skip rows without an artist value (column 0). Blank lines come out of
        # csv.reader as an empty list, so test for that first to avoid an
        # IndexError on row[0].
        if not row or row[0] == '':
            continue
        # Source column indices, in the same order as the header written above
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))
        

8056
[['Barry Tuckwell/Academy of St Martin-in-the-Fields/Sir Neville Marriner', 'Logged In', 'Mohammad', 'M', '0', 'Rodriguez', '277.15873', 'paid', 'Sacramento--Roseville--Arden-Arcade, CA', 'PUT', 'NextSong', '1.54051E+12', '961', 'Horn Concerto No. 4 in E flat K495: II. Romance (Andante cantabile)', '200', '1.54328E+12', '88'], ['Jimi Hendrix', 'Logged In', 'Mohammad', 'M', '1', 'Rodriguez', '239.82975', 'paid', 'Sacramento--Roseville--Arden-Arcade, CA', 'PUT', 'NextSong', '1.54051E+12', '961', 'Woodstock Inprovisation', '200', '1.54328E+12', '88'], ['Building 429', 'Logged In', 'Mohammad', 'M', '2', 'Rodriguez', '300.61669', 'paid', 'Sacramento--Roseville--Arden-Arcade, CA', 'PUT', 'NextSong', '1.54051E+12', '961', 'Majesty (LP Version)', '200', '1.54328E+12', '88'], ["The B-52's", 'Logged In', 'Gianna', 'F', '0', 'Jones', '321.54077', 'free', 'New York-Newark-Jersey City, NY-NJ-PA', 'PUT', 'NextSong', '1.54087E+12', '107', 'Love Shack', '200', '1.54328E+12', '38'], ['Die Mooskirc

In [4]:
# Report how many lines (header included) the generated csv file contains
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(len(f.readlines()))

6821


# Part II. Apache Cassandra code

The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [5]:
# Connect to a Cassandra instance. Cluster() is called without arguments, so the
# driver's default contact point is used; this is intended to be the local
# machine (127.0.0.1).

from cassandra.cluster import Cluster
cluster = Cluster()

# A session is needed to establish the connection and to execute queries;
# every later cell in this notebook talks to Cassandra through `session`.
session = cluster.connect()

#### Create Keyspace

In [6]:
# Create the `udacity` keyspace (SimpleStrategy, replication factor 1) unless it
# is already there; any error is printed rather than raised
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }""")
except Exception as err:
    print(err)

#### Set Keyspace

In [7]:
# Make `udacity` (created above) the keyspace this session works in;
# any error is printed rather than raised
try:
    session.set_keyspace('udacity')
except Exception as err:
    print(err)

In [8]:
# Load the denormalized event file into a pandas DataFrame (event_data_df) for a
# better overview of the data; the cells below filter it to see which rows
# each Cassandra query is expected to return
event_data_df = pd.read_csv("event_datafile_new.csv")

In [9]:
# Preview the first rows of the DataFrame to confirm the columns loaded as expected
event_data_df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Barry Tuckwell/Academy of St Martin-in-the-Fie...,Mohammad,M,0,Rodriguez,277.15873,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Horn Concerto No. 4 in E flat K495: II. Romanc...,88
1,Jimi Hendrix,Mohammad,M,1,Rodriguez,239.82975,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Woodstock Inprovisation,88
2,Building 429,Mohammad,M,2,Rodriguez,300.61669,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Majesty (LP Version),88
3,The B-52's,Gianna,F,0,Jones,321.54077,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Love Shack,38
4,Die Mooskirchner,Gianna,F,1,Jones,169.29914,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Frisch und g'sund,38


In [10]:
# Expected result of Query 1: artist, song and length of the entry with
# sessionId 338 and itemInSession 4
event_data_df.loc[
    (event_data_df['sessionId'] == 338) & (event_data_df['itemInSession'] == 4),
    ['artist', 'song', 'length'],
]

Unnamed: 0,artist,song,length
444,Faithless,Music Matters (Mark Knight Dub),495.3073


In [11]:
# Expected result of Query 2: artist, song and user name for every entry of
# sessionId 182 that belongs to userId 10
event_data_df.loc[
    (event_data_df['sessionId'] == 182) & (event_data_df['userId'] == 10),
    ['artist', 'song', 'firstName', 'lastName'],
]

Unnamed: 0,artist,song,firstName,lastName
4704,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
4705,Three Drives,Greece 2000,Sylvie,Cruz
4706,Sebastien Tellier,Kilometer,Sylvie,Cruz
4707,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


In [12]:
# Expected result of Query 3: first and last name on every row whose song is
# 'All Hands Against His Own'
event_data_df.loc[
    event_data_df['song'] == 'All Hands Against His Own',
    ['firstName', 'lastName'],
]

Unnamed: 0,firstName,lastName
2792,Tegan,Levine
5135,Sara,Johnson
6298,Jacqueline,Lynch


<h3>Query 1:</h3>

Query to be modelled:
    
Give me the artist, song title and song's length in music app history that was heard during sessionId=338, itemInSession=4

<h3>Output 1:</h3>

The expected output is : "artist, song and length"
based on: "sessionId and itemInSession"

From the above we know the query to get the data will be a SELECT statement like:

SELECT artist, song, length
FROM sessionId_itemInSession_table 
WHERE sessionId = 338 AND itemInSession = 4

As we know the SELECT query, we can move to CREATE table query.
We will add IF NOT EXISTS to the CREATE statement so that the table is only created if it does not already exist.

Column Names: We need artist, song title and length of the track on query upon sessionId and itemInSession. 
Hence we will select "artist", "song", "length", "sessionId" and "itemInSession" as the name of the columns.

Primary Key: The PRIMARY key for the table should uniquely identify each row in the table.
For us we need results based on "sessionId" and "itemInSession".

In [13]:
# Table 1 serves Query 1: rows are looked up by sessionId and itemInSession,
# which together form the primary key

query = "CREATE TABLE IF NOT EXISTS sessionId_itemInSession_table "
query += """(
                   sessionId int, 
                   itemInSession int, 
                   artist text, 
                   song text, 
                   length float, 
                   PRIMARY KEY (sessionId, itemInSession)
                   )"""
try:
    session.execute(query)
except Exception as err:
    print(err)

In [14]:
# Load table 1 (sessionId_itemInSession_table) from the denormalized csv file
file = 'event_datafile_new.csv'

# The INSERT statement is the same for every row, so it is built and prepared
# once here instead of being re-created and sent as plain text on each iteration.
query = """INSERT INTO sessionId_itemInSession_table (
                                                              sessionId, 
                                                              itemInSession,
                                                              artist, 
                                                              song, 
                                                              length
                                                              )"""
query = query + " VALUES (?, ?, ?, ?, ?)"
insert_stmt = session.prepare(query)

# newline='' leaves line-ending handling inside quoted fields to the csv module
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # skip header (the default avoids StopIteration on an empty file)
    for line in csvreader:
        # csv columns used: 8 = sessionId, 3 = itemInSession, 0 = artist,
        # 9 = song, 5 = length; numeric values are converted to match the
        # int / float column types of the table.
        session.execute(insert_stmt, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

In [15]:
# SELECT statement to verify the data was entered into table 1:
# artist, song and length for sessionId 338, itemInSession 4
query = """SELECT artist, song, length FROM sessionId_itemInSession_table 
           WHERE sessionId = 338 AND itemInSession = 4"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded. Outside the try/except, a failed
    # query would leave `rows` undefined or still holding an earlier cell's result.
    for row in rows:
        print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


<h3>Query 2:</h3>

Query to be modelled:
    
Give me the artist, song (sorted by itemInSession) and user(first and last name) for userid = 10, sessionid = 182

<h3>Output 2:</h3>

The expected output  is : "Name of the artist, title of the song and first and last name of the user"
based on: "userId and sessionId"

From the above we know the query to get the data will be a SELECT statement like:

SELECT artist, song, firstName, lastName 
FROM userId_sessionId_table WHERE userId = 10 AND sessionId = 182

As we know the SELECT query, we can move to CREATE table query.

Column Names: We need name of the artist, title of the song and first and last name of the user on query upon userId and sessionId.
itemInSession determines the order of the song listing. 
Hence we will select "artist", "song", "firstName", "lastName", "userId", "sessionId"
and "itemInSession" as the name of the columns.

Primary Key: The PRIMARY key for the table should uniquely identify each row in the table.
For us we need results based on "userId" and "sessionId". "itemInSession" sorts the listing of the songs.

In [16]:
# Table 2 serves Query 2: (userId, sessionId) is the composite partition key and
# itemInSession the clustering column that orders the songs within a session

query = "CREATE TABLE IF NOT EXISTS userId_sessionId_table "
query += """(
                    userId int, 
                    sessionId int,
                    itemInSession int,
                    artist text, 
                    song text, 
                    firstName text,
                    lastName text, 
                    PRIMARY KEY ((userId, sessionId), itemInSession)
                    )"""
try:
    session.execute(query)
except Exception as err:
    print(err)

In [17]:
# Load table 2 (userId_sessionId_table) from the denormalized csv file
file = 'event_datafile_new.csv'

# The INSERT statement is the same for every row, so it is built and prepared
# once here instead of being re-created and sent as plain text on each iteration.
query = """INSERT INTO userId_sessionId_table (
                                                       userId, 
                                                       sessionId,
                                                       itemInSession, 
                                                       artist, 
                                                       song, 
                                                       firstName, 
                                                       lastName
                                                       )"""
query = query + " VALUES (?, ?, ?, ?, ?, ?, ?)"
insert_stmt = session.prepare(query)

# newline='' leaves line-ending handling inside quoted fields to the csv module
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # skip header (the default avoids StopIteration on an empty file)
    for line in csvreader:
        # csv columns used: 10 = userId, 8 = sessionId, 3 = itemInSession,
        # 0 = artist, 9 = song, 1 = firstName, 4 = lastName; the id and
        # position values are converted to match the int columns of the table.
        session.execute(insert_stmt, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

In [19]:
# SELECT statement to verify the data was entered into table 2:
# artist, song and user name for userId 10, sessionId 182
query = """SELECT artist, song, firstName, lastName FROM userId_sessionId_table 
           WHERE userId = 10 AND sessionId = 182"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded. Outside the try/except, a failed
    # query would leave `rows` undefined or still holding an earlier cell's result.
    for row in rows:
        # Result attributes are accessed in lower case (firstname / lastname)
        print(row.artist, row.song, row.firstname, row.lastname)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


<h3>Query 3:</h3>

Query to be modelled:
Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

<h3>Output 3:</h3>

The expected output is : "firstName, lastName of the user"
based on: "song title"

From the above we know the query to get the data will be a SELECT statement like:

SELECT firstName, lastName 
FROM song_table 
WHERE song = 'All Hands Against His Own'

As we know the SELECT query, we can move to CREATE table query.

Column Names: We need first and last name of the user on query upon song title. In order to be able to identify every user uniquely, we also
need the column "userId".
Hence we will select "firstName", "lastName", "song" and "userId"
as the name of the columns.

Primary Key:
We need results based on "song", 
so we choose song as the partition key and userId as the clustering column in order to be able to uniquely identify each row in the table.

In [20]:
# Table 3 serves Query 3: song is the partition key and userId the clustering
# column, so each (song, user) pair is stored as one row
query = "CREATE TABLE IF NOT EXISTS song_table "
query += """(
                    song text, 
                    userId int, 
                    firstName text, 
                    lastName text, 
                    PRIMARY KEY (song, userId)
                    )"""
try:
    session.execute(query)
except Exception as err:
    print(err)

In [21]:
# Load table 3 (song_table) from the denormalized csv file
file = 'event_datafile_new.csv'

# The INSERT statement is the same for every row, so it is built and prepared
# once here instead of being re-created and sent as plain text on each iteration.
query = """INSERT INTO song_table (
                                           song, 
                                           userId,
                                           firstName, 
                                           lastName
                                           )"""
query = query + " VALUES (?, ?, ?, ?)"
insert_stmt = session.prepare(query)

# newline='' leaves line-ending handling inside quoted fields to the csv module
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # skip header (the default avoids StopIteration on an empty file)
    for line in csvreader:
        # csv columns used: 9 = song, 10 = userId, 1 = firstName, 4 = lastName;
        # userId is converted to match the int column of the table.
        session.execute(insert_stmt, (line[9], int(line[10]), line[1], line[4]))

In [22]:
# SELECT statement to verify the data was entered into table 3:
# first and last name of every user stored for the song 'All Hands Against His Own'
query = """SELECT firstName, lastName FROM song_table 
           WHERE song = 'All Hands Against His Own'"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded. Outside the try/except, a failed
    # query would leave `rows` undefined or still holding an earlier cell's result.
    for row in rows:
        # Result attributes are accessed in lower case (firstname / lastname)
        print(row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables before closing out the sessions

In [23]:
# Drop the three query tables before closing out the session; each statement is
# attempted independently and any error is printed rather than raised
for query in (
    "drop table if exists sessionId_itemInSession_table",
    "drop table if exists userId_sessionId_table",
    "drop table if exists song_table",
):
    try:
        rows = session.execute(query)
    except Exception as err:
        print(err)

### Close the session and cluster connection

In [24]:
# Shut down the session first, then the cluster object it was created from
session.shutdown()
cluster.shutdown()