# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    #print(file_path_list)

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [5]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster()

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspace

In [6]:
# Create Keyspace:
"""
The goal is to connect with a cassandra instance in the local machine so that we can write and execute queries later. For creating new keyspace, CREATE KEYSPACE IF NOT EXISTS keyspace_name WITH REPLICATION={'class':..., 'replication_factor':1} syntax is used. Most of the time, we just need simple strategy and 1 replication factor unless we need more copies of the data.
"""
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity
    WITH REPLICATION = {
        'class': 'SimpleStrategy',
        'replication_factor': 1
    }
""")

<cassandra.cluster.ResultSet at 0x7f0008c49208>

#### Set Keyspace

In [7]:
# Keyspace setup:
"""
Settting up a KEYSPACE named as 'udacity' to the keyspace specified above.
"""
session.set_keyspace(keyspace='udacity')

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask three questions in the following.

### **TASK I:**  
A data table named as "artist_song_session" will be created here as we want to retrieve information about artist name, song title and length of song in the music app history that was heard during sessionId=338 and itemInSession=4.

In [8]:
# STEP I:
# Create table artist_song_session for data retrieval later:
"""
For creating new tables, CREATE TABLE IF NOT EXISTS table_name is used.
"""
query1 = "CREATE TABLE IF NOT EXISTS artist_song_session"
query1 = query1 + "(sessionId int, itemInSession int, artist text, song text, length float, PRIMARY KEY(sessionId, itemInSession));"
session.execute(query=query1)                    

<cassandra.cluster.ResultSet at 0x7f0008c49940>

In [9]:
# STEP II:
# Insert data into table artist_song_session:
"""
Firstly, CSV file is set up with variable named as "file". Then, use a csv.reader object to read the entire CSV file. Lastly, the for loop is iterating through each line in CSV file and insert the corresponding data (sessionId, itemInSession, artist, song, length) into table artist_song_session via SQL syntax INSERT INTO.
"""
file = 'event_datafile_new.csv'
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        
        ## TO-DO: Assign the INSERT statements into the `query` variable
        query = "INSERT INTO artist_song_session (sessionId, itemInSession, artist, song, length)"
        query = query + "VALUES (%s, %s, %s, %s, %s);"
        
        ## TO-DO: Assign which column element should be assigned for each column in the INSERT statement.
        session.execute(query=query, parameters=(
            int(line[8]), # session id is column 9
            int(line[3]), # item in session is column 4
            line[0], # artist is column 1
            line[9], # song title is column 10
            float(line[5]) # length is column 6
            )
        )

In [10]:
# STEP III:
# Verify that the INSERT INTO SQL syntax from above is working correctly
"""
Checking that data was entered into the table artist_song_session without errors. In order to do so, here I use SELECT statement to retrieve the record when session id is 338 and itemInSession is 4
"""
query_select = "SELECT artist, song, length FROM artist_song_session WHERE sessionId=338 AND itemInSession=4;"
rows = session.execute(query_select)
for row in rows:
    print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### **TASK II:**  
A data table named as "artist_song_user" will be created here as we want to know only the artist name, song title and user name in the music app history that was heard during sessionId=182 and with userId=10, so that we can know the group of users who heard this song with a specific artist.

In [11]:
# STEP I:
# Create table artist_song_user for data retrieval later:
"""
For creating new tables, CREATE TABLE IF NOT EXISTS table_name is used.
"""
query2 = "CREATE TABLE IF NOT EXISTS artist_song_user"
query2 = query2 + "(userId int, sessionId int, itemInSession int, artist text, song text, firstName text, lastName text, PRIMARY KEY((userId, sessionId), itemInSession));"
session.execute(query2)

<cassandra.cluster.ResultSet at 0x7f0008c359b0>

In [12]:
# STEP II:
# Insert data into table artist_song_user:
"""
Same with the previous one, a csv.reader object is used here to read through the entire CSV file. Then, the for loop is iterating through each line in CSV file and insert the corresponding data (userId, itemInSession, artist, song, firstName, lastName) into table artist_song_user via SQL syntax INSERT INTO.
"""
file = 'event_datafile_new.csv'
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        
        ## TO-DO: Assign the INSERT statements into the `query` variable
        query = "INSERT INTO artist_song_user (userId, sessionId, itemInSession, artist, song, firstName, lastName)"
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s);"
        
        ## TO-DO: Assign which column element should be assigned for each column in the INSERT statement.
        session.execute(query=query, parameters=(
            int(line[10]), # user id is column 11
            int(line[8]), # session id is column 9
            int(line[3]), # item in session is column 4
            line[0], # artist is column 3
            line[9], # song title is column 10
            line[1], # first name is column 2
            line[4] # last name is column 5
            )
        )

In [13]:
# STEP III:
# Verify that the INSERT INTO SQL syntax from above is working correctly
"""
The script here is to make sure that data was inserted into the table artist_song_user without errors by using SELECT statement to check the record with session id is 182 and user id is 10.
"""
query_select_2 = "SELECT artist, song, firstName, lastName FROM artist_song_user WHERE userId=10 AND sessionId=182;"
rows = session.execute(query=query_select_2)
for row in rows:
    print(f"Artist: {row.artist}, Song: {row.song}, First Name: {row.firstname}, Last Name: {row.lastname}\n")

Artist: Down To The Bone, Song: Keep On Keepin' On, First Name: Sylvie, Last Name: Cruz

Artist: Three Drives, Song: Greece 2000, First Name: Sylvie, Last Name: Cruz

Artist: Sebastien Tellier, Song: Kilometer, First Name: Sylvie, Last Name: Cruz

Artist: Lonnie Gordon, Song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit), First Name: Sylvie, Last Name: Cruz



### **TASK III:**  
A data table named as "user_song" will be created here as we want to know both first name and last name for all users who listened to the song 'All Hands Against His Own'.

In [14]:
# STEP I:
# Create table user_song for data retrieval later:
"""
SQL syntax `CREATE TABLE IF NOT EXISTS table_name` is used here to create a new table named as user_song. Besides, features we want to keep track on are song title, user id, first name and last name.
"""
query3 = "CREATE TABLE IF NOT EXISTS user_song "
query3 = query3 + "(song text, userId int, firstName text, lastName text, PRIMARY KEY (song, userId));"
session.execute(query=query3)

<cassandra.cluster.ResultSet at 0x7f0008c87e80>

In [15]:
# STEP II:
# Insert data into table user_song:
"""
Here, the csv.reader object is used here to read through the entire CSV file. Then, the for loop is iterating through each line in CSV file and insert the corresponding data (song, userId, firstName, lastName) into table user_song via SQL syntax INSERT INTO.
"""
file = 'event_datafile_new.csv'
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO user_song (song, userId, firstName, lastName)"
        query = query + "VALUES (%s, %s, %s, %s);"
        session.execute(query=query, parameters=(
            line[9], # song title is column 10
            int(line[10]), # user id is column 11
            line[1], # first name is column 2
            line[4] # last name is column 5
            )
        )

In [16]:
# STEP III:
# Verify that the INSERT INTO SQL syntax from above is working correctly
"""
The script here is to make sure that data was inserted into the table user_song without errors by using SELECT statement to check the record with the song name 'All Hands Against His Own'.
"""
query_select_3 = "SELECT firstName, lastName FROM user_song WHERE song='All Hands Against His Own';"
rows = session.execute(query=query_select_3)
for row in rows:
    print(row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables before closing out the sessions

In [17]:
# Before closing all sessions, we need to drop all tables we've created, so here DROP TABLE syntax is used to do so.
session.execute("DROP TABLE artist_song_session")
session.execute("DROP TABLE artist_song_user")
session.execute("DROP TABLE user_song")

<cassandra.cluster.ResultSet at 0x7f0008c3d4e0>

### Close the session and cluster connection¶

In [18]:
session.shutdown()
cluster.shutdown()