### Part I. ETL Pipeline for Pre-Processing the Files

**Import Python packages**

In [2]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

**Creating list of filepaths to process original event csv data files**

In [43]:
# checking your current working directory
print(os.getcwd())

# Get current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
#     print(file_path_list)

/home/workspace


**Processing the files to create the data file csv that will be used for Apache Casssandra tables**

In [10]:
# initiate an empty list that will be stored the data from each file
full_data_rows_list = []

for f in file_path_list:
    # read csv file using pandas library
    df = pd.read_csv(f)
    # extract each data row and append it to the list
    for index, row in df.iterrows():
        full_data_rows_list.append(row.tolist())

In [15]:
# create a small event csv data file called event_datafile_new.csv 
# and use it to insert data into the Cassandra tables
df = pd.DataFrame(full_data_rows_list)[[0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16]]
df.columns = ['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId']
# drop missing artists and save
df[df['artist'].notnull()].to_csv('event_datafile_new.csv', index=False)
# check the number of rows of the event_datafile_new.csv file
pd.read_csv('event_datafile_new.csv').shape[0]

6820

### Part II. Complete the Apache Cassandra coding portion of your project. 

Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

### Begin writing your Apache Cassandra code in the cells below

**Creating a Cluster**

In [16]:
# make a connection to a Cassandra instance your local machine 

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])

# To establish connection and begin executing queries, need a session
session = cluster.connect()

**Create Keyspace**

In [17]:
# Create a Keyspace 
try:
    session.execute("""CREATE KEYSPACE IF NOT EXISTS sparkifydb
                    WITH REPLICATION =
                    {'class': 'SimpleStrategy',
                     'replication_factor': 1
                    }""")
except Exception as e:
    print(e)

**Set Keyspace**

In [18]:
# Set KEYSPACE to the keyspace specified above
try:
    session.set_keyspace('sparkifydb')
except Exception as e:
    print(e)

**Query 1** </br>
Query the artist, song title and song's length in the music app that was heard during sessionId = 338 and itemInSession = 4
- partition key: sessionId
- composite keys: sessionId, itemInSession

In [20]:
# create Table
query = "CREATE TABLE IF NOT EXISTS session_library"
query = query + "(sessionId int, itemInSession int, \
                  artist varchar, song varchar, \
                  length float,  \
                  PRIMARY KEY (sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print (e)

In [24]:
# insert data
file = 'event_datafile_new.csv'

df = pd.read_csv(file)

# insert data from csv file

for index, row in df.iterrows():
    query = "INSERT INTO session_library (sessionId, itemInSession, artist, song, length)"
    query = query + "VALUES (%s, %s, %s, %s, %s)"
    
    session.execute(query, (row.sessionId, row.itemInSession, row.artist, row.song, row.length))

In [25]:
# Add in the SELECT statement to verify the data was entered into the table
query = "SELECT artist, song, length from session_library
            WHERE sessionId = 338 and itemInSession = 4;"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print('{0}, {1}, {2}'.format(row.artist, row.song, row.length))

Faithless, Music Matters (Mark Knight Dub), 495.30731201171875


In [26]:
# drop the table
session.execute('DROP TABLE session_library')

<cassandra.cluster.ResultSet at 0x7f246c691c18>

**Query 2** </br>
Query the name of the artist, song (sorted by itemInSession) and user (first and last name) for userId = 10 and sessionId = 182
- partition key: userId
- composite keys: userId, sessionId, itemInSession

In [31]:
# create table
query = """CREATE TABLE IF NOT EXISTS song_playlist"""
query = query + "(userId int, sessionId int, itemInSession int, \
                  firstName varchar, lastName varchar, \
                  artist varchar, song varchar, \
                  PRIMARY KEY (userId, sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [33]:
# insert data
for index, row in df.iterrows():
    query = "INSERT INTO song_playlist (userId, sessionId, itemInSession, \
            firstName, lastName, artist, song)"
    query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
    
    session.execute(query, (int(row.userId), row.sessionId, row.itemInSession, row.firstName, row.lastName, row.artist, row.song))

In [34]:
# verification
query = "SELECT artist, song, firstName, lastName \
        from song_playlist WHERE userId = 10 and sessionId = 182; "
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print('{0}, {1}, {2}'.format(row.artist, row.song, row.firstname, row.lastname))

Down To The Bone, Keep On Keepin' On, Sylvie
Three Drives, Greece 2000, Sylvie
Sebastien Tellier, Kilometer, Sylvie
Lonnie Gordon, Catch You Baby (Steve Pitron & Max Sanna Radio Edit), Sylvie


**Query 3** </br>
Query user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
- partition key: song
- composite keys: song, userId

In [35]:
# create table
query = """CREATE TABLE IF NOT EXISTS user_song"""
query = query + "(song varchar, userId int, \
                  firstName varchar, lastName varchar, \
                  PRIMARY KEY (song, userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [39]:
# insert data
for index, row in df.iterrows():
    query = "INSERT INTO user_song (song, userId, \
                  firstName, lastName)"
    query = query + "VALUES (%s, %s, %s, %s)"
    
    session.execute(query, (row.song, int(row.userId), row.firstName, row.lastName))

In [40]:
# verification
query = "SELECT firstName, lastName \
        from user_song WHERE song = 'All Hands Against His Own'; "
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print('{0}, {1}'.format(row.firstname, row.lastname))

Jacqueline, Lynch
Tegan, Levine
Sara, Johnson


In [41]:
#Drop the table before closing out the sessions
session.execute('DROP TABLE user_song')

<cassandra.cluster.ResultSet at 0x7f246c6abba8>

In [42]:
# close the seesion and cluster connection
session.shutdown()
cluster.shutdown()