# Part I. ETL Pipeline for Pre-Processing the Files

In [30]:
# Importing Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [31]:
# checking current working directory
print(os.getcwd())

# Get current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Creating a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# joining the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [32]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
             
print(len(full_data_rows_list))

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


8056


In [33]:
# checking the number of rows in the csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Apache Cassandra coding portion of the project. 

## The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [34]:
# This should make a connection to a Cassandra instance of the local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspace

In [35]:
# Creating a Keyspace 
try:
    session.execute("""create keyspace if not exists sparkify2
                    with replication =
                    {'class' : 'SimpleStrategy' , 'replication_factor' : 1 }""")
except Exception as e:
    print(e)

#### Set Keyspace

In [36]:
# Setting KEYSPACE to the keyspace specified above
session.set_keyspace('sparkify2')

#### Creating queries to ask the following three questions of the data

#### Task 1: Getting the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

##### <span style="color:navy">FOR THIS QUERY SESSIONID AND ITEMINSESSION ARE CHOOSEN AS THE PRIMARY KEY BECAUSE WE WANT TO USE THE WHERE STATEMENT ON THESE ATTRIBUTES. SESSIONID WILL BE THE PARTITION KEY HERE AND ITEMINSESSION WILL BE THE CLUSTERING COLUMN</span>

In [37]:
query = "create table if not exists app_history "
query = query + "(sessionId int, itemInSession int, artist text, song text, length float, primary key (sessionId, ItemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [38]:
# Reading the csv file created above line by line to insert values in app_histry table in proper table.
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
## Assigning INSERT statements into the `query` variable
        query = "insert into app_history (sessionId, itemInSession, artist, song, length)"
        query = query + " values (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### Verifying that the data have been inserted into the table

In [39]:
# Adding the SELECT statement to verify the data was entered into the table
query = "select artist, song, length from app_history where sessionId = 338"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
for row in rows:
    print(row.artist, row.song, row.length)

Pixies Build High 89.36444091796875
The Roots / Jack Davey Atonement 155.95057678222656
Mike And The Mechanics A Beggar On A Beach Of Gold 275.1211853027344
Faithless Music Matters (Mark Knight Dub) 495.30731201171875


#### Task 2: Getting the name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

##### <span style="color:navy">FOR THIS QUERY USERID, SESSIONID AND ITEMINSESSION MAKE THE COMPOSITE PRIMARY KEY. USERID AND SESSIONID WILL BE THE COMPOSITE PARTITION KEY AND ITEMINSESSION WILL BE THE CLUSTERING COLUMN BECAUSE WE WANT TO SORT BY ITEMINSESSION. THE PARTITION KEY CHOICES ARE DUE TO THE WHERE CLAUSE WHICH WILL RUN ON USERID AND SESSIONID</span>

In [40]:
query = "create table if not exists music_library "
query = query + "(userId int, sessionId int, itemInSession int, artist text, song text, firstName text, lastName text, primary key ((userId, sessionId), itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
## Assigning INSERT statements into the `query` variable
        query = "insert into music_library (userId, sessionId, itemInSession, artist, song, firstName, lastName) "
        query = query + "values (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

#### Verifying that the data have been inserted into the table

In [41]:
# Adding the SELECT statement to verify the data was entered into the table
query = "select artist, song, firstName, lastName from music_library where userId = 10 and sessionId = 182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
for row in rows:
    print(row.artist, row.song, row.firstname, row.lastname)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


#### Task 3: Getting the user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

##### <span style="color:navy">FOR THIS QUERY SONG AND USERID MAKE THE COMPOSITE PRIMARY KEY. SONG WILL BE THE PARTITION KEY AND USERID WILL BE THE CLUSTERING COLUMN. THE PRIMARY KEY CHOICES ARE DUE TO THE WHERE CLAUSE WHICH NEEDS TO LOOKUP THE SONG COLUMN, BUT SINCE JUST SONG CLOMUN MIGHT NOT MAKE THE PRIMARY KEY UNIQUE USERID IS ALSO ADDED</span>

In [42]:
query = "create table if not exists song_runs "
query = query + "(song text, userId int, firstName text, lastName text, primary key(song, userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "insert into song_runs (song, userId, firstName, lastName)"
        query = query + " values (%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

#### Verifying that the data have been inserted into the table

In [43]:
# Adding the SELECT statement to verify the data was entered into the table
query = "select firstName, lastName from song_runs where song = 'All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
for row in rows:
    print(row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


#### Drop the tables before closing out the sessions

In [44]:
session.execute("drop table app_history")
session.execute("drop table music_library")
session.execute("drop table song_runs")

<cassandra.cluster.ResultSet at 0x7ff77b0a0828>

#### Close the session and cluster connection¶

In [45]:
session.shutdown()
cluster.shutdown()

## OPTIONAL: Question for the reviewer
 
If you have any question about the starter code or your own implementation, please add it in the cell below. 

For example, if you want to know why a piece of code is written the way it is, or its function, or alternative ways of implementing the same functionality, or if you want to get feedback on a specific part of your code or get feedback on things you tried but did not work.

Please keep your questions succinct and clear to help the reviewer answer them satisfactorily. 

> **_Your question_**