# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Check current working directory and add filepath to data
print(os.getcwd())
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
file_path_list = []
for root, dirs, files in os.walk(filepath): 
    files = [root + '/' + file for file in files if 'checkpoint' not in file]
    if len(files) != 0:
        file_path_list = files
        
#Legacy Code
    # join the file path and roots with the subdirectories using glob
#    file_path_list = glob.glob(os.path.join(root+dirs))
        #os.path.join(root,'*'))   
    
#Print out valid filepaths to event data
print(file_path_list)

/home/workspace
['/home/workspace/event_data/2018-11-27-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-07-events.csv', '/home/workspace/event_data/2018-11-09-events.csv', '/home/workspace/event_data/2018-11-19-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-22-events.csv', '/home/workspace/event_data/2018-11-16-events.csv', '/home/workspace/event_data/2018-11-26-events.csv', '/home/workspace/event_data/2018-11-24-events.csv', '/home/workspace/event_data/2018-11-29-events.csv', '/home/workspace/event_data/2018-11-15-events.csv', '/home/workspace/event_data/2018-11-20-events.csv', '/home/workspace/event_data/2018-11-06-events.csv', '/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-21-events.csv', '/home/workspace/event_data/2018-11-10-events.csv', '/home/workspace/event_data/2018-11-23-events.csv', '/home/workspace/event_data/2018-11-02-events.c

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# Initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
#Iterate through the file path list to collect data 
for f in file_path_list:

# Reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # Creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # Extracting each data row one by one and append it to full_data_row_list    
        for line in csvreader:
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# Creating event_datafile_new csv to be used to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

#Open event_datafile_new csv or create one if not exists
with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    #Iterate through full_data_rows_list and write the row into csv
    print('Writing row data into new csv file')
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))
        
print('Finish writing row data into new csv file')

Writing row data into new csv file
Finish writing row data into new csv file


In [4]:
# Check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Apache Cassandra coding

Below is the column information of our event_datafile_new.csv file
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [21]:
#Create cluster instance
from cassandra.cluster import Cluster
cluster = Cluster()

# Create session
session = cluster.connect()

#### Create Keyspace

In [22]:
#Create a Keyspace 
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS event_data_cassandra 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [23]:
#Set KEYSPACE to the keyspace specified above
try:
    session.set_keyspace('event_data_cassandra')
except Exception as e:
    print(e)

## In this project, we have three queries of interest:

#### 1. Return the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


#### 2. Return name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Return user name (first and last) in app history who listened to the song 'All Hands Against His Own'

### Query 1

#### *Return the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4*

In [38]:
#Query for query 1: Create tables
query = "CREATE TABLE IF NOT EXISTS artist_song_from_session_iteminsession"
query = query + "(artist text, song text, length float, sessionId int, itemInSession int, PRIMARY KEY (sessionId, itemInSession))"
try:
    print('Creating table')
    session.execute(query)
    print('Creating table successfully')
except Exception as e:
    print(e)


Creating table
Creating table successfully


In [39]:
#Check schema of table for query 1 (artist_song_in_session_item)
cluster.metadata.keyspaces['event_data_cassandra'].tables['artist_song_from_session_iteminsession'].columns

OrderedDict([('sessionid',
              <cassandra.metadata.ColumnMetadata at 0x7f340cf7fa58>),
             ('iteminsession',
              <cassandra.metadata.ColumnMetadata at 0x7f340cf7f9b0>),
             ('artist', <cassandra.metadata.ColumnMetadata at 0x7f340cf7fb00>),
             ('length', <cassandra.metadata.ColumnMetadata at 0x7f340cf7f940>),
             ('song', <cassandra.metadata.ColumnMetadata at 0x7f340cfcf438>)])

In [40]:
# Open the event_datafile_new file
file = 'event_datafile_new.csv'
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    
    #Iterate through rows and insert into table
    print('Inserting rows into table')
    for line in csvreader:     
        query = "INSERT INTO artist_song_from_session_iteminsession (artist, song, length, sessionId, itemInSession)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (line[0], line[9], float(line[5]), int(line[8]), int(line[3])))
    print('Finish inserting rows into table')

Inserting rows into table
Finish inserting rows into table


In [41]:
#Run SELECT query to get desired results
query1 = """
SELECT artist, song, length
FROM artist_song_from_session_iteminsession
WHERE sessionId = 338 AND itemInSession = 4
"""

result_query1 = session.execute(query1)
result_query1_table = pd.DataFrame(columns = ['artist', 'song', 'length'])

#Transform result into pandas dataframe
for rows in result_query1:
    result_query1_table = result_query1_table.append({'artist': rows[0],\
                                                      'song': rows[1],\
                                                      'length': rows[2]}, ignore_index = True)

result_query1_table

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


*According to our query results from query1, the song being played in session 338 and itemInSession 4 is "Music Matters" by Faithless. Also, the length of that song is 495.307312 seconds.*

### Query 2

#### *Return name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182*

In [24]:
#Query for query 2: Create tables

def create_table_query2(session_instance):
    """
    This function creates table required for query 2 and insert data from event_datafile_new.csv into table
    
    Input: Cassandra session instance
    OutputL No specific output
    """
    #Create query for creating table
    query = "CREATE TABLE IF NOT EXISTS artist_song_user_from_userid_sessionid "
    query = query + "(artist text, song text, user text, sessionId int, userId int, itemInSession int, PRIMARY KEY (userId, sessionId, itemInSession))"

    #Run create table query
    try:
        print("Creating table")
        session_instance.execute(query)
        print("Creating table successfully")
    except Exception as e:
        print(e)
                    
    # Open event_datafile_new.csv
    file = 'event_datafile_new.csv'
    print("Inserting rows into table")
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        #Iterate through rows to insert into table
        for line in csvreader:
            query = "INSERT INTO artist_song_user_from_userid_sessionid (artist, song, user, sessionId, userId, itemInSession)"
            query = query + " VALUES (%s, %s, %s, %s, %s, %s)"
            session_instance.execute(query, (line[0], line[9], line[1] + ' '  + line[4], int(line[8]), int(line[10]),int(line[3])))            
    print("Finish inserting rows to the database")
    
create_table_query2(session)    

Creating table
Creating table successfully
Inserting rows into table
Finish inserting rows to the database


In [13]:
#Check schema of table for query 2 (artist_song_user_from_userId_sessionId)
cluster.metadata.keyspaces['event_data_cassandra'].tables['artist_song_user_from_userid_sessionid'].columns

OrderedDict([('userid', <cassandra.metadata.ColumnMetadata at 0x7f34380cdf28>),
             ('sessionid',
              <cassandra.metadata.ColumnMetadata at 0x7f340cfda7b8>),
             ('iteminsession',
              <cassandra.metadata.ColumnMetadata at 0x7f340cfda588>),
             ('artist', <cassandra.metadata.ColumnMetadata at 0x7f340cfdad30>),
             ('song', <cassandra.metadata.ColumnMetadata at 0x7f340cfda978>),
             ('user', <cassandra.metadata.ColumnMetadata at 0x7f340cfdab00>)])

In [37]:
#create_table_query2(session)
query2 ="""SELECT artist, song, user
FROM artist_song_user_from_userid_sessionid
WHERE userId = 10 AND sessionId = 182
ORDER BY sessionId, itemInSession
"""

result_query2 = session.execute(query2)
result_query2_table = pd.DataFrame(columns = ['artist', 'song', 'user'])

#Transform result into pandas dataframe
for rows in result_query2:
    result_query2_table = result_query2_table.append({'artist': rows[0],\
                                                      'song': rows[1],\
                                                      'user': rows[2]}, ignore_index = True)

result_query2_table

Unnamed: 0,artist,song,user
0,Down To The Bone,Keep On Keepin' On,Sylvie Cruz
1,Three Drives,Greece 2000,Sylvie Cruz
2,Sebastien Tellier,Kilometer,Sylvie Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie Cruz


*According to ourr query result from query 2, the user with userId 10 is Sylvie Cruz and she listened to 4 songs in session 182. Those songs are listed in itemInSession order and are "Keep On Keepin'On" by Down To the Bone, "Greece 2000" by Three Drives, "Kilometer" by Sebastien Tellier and "Catch You Baby" by Lonnie Gordon.*

### Query 3

#### *Return user name (first and last) in app history who listened to the song 'All Hands Against His Own'*

In [15]:
#Query for query 3: Create tables

def create_table_query3(session_instance):
    """
    This function creates table required for query 3 and insert data from event_datafile_new.csv into table
    
    Input: Cassandra session instance
    OutputL No specific output
    """
    #Create query for creating table
    query = "CREATE TABLE IF NOT EXISTS user_from_song "
    query = query + "(user text, song text, PRIMARY KEY (song, user))"
    
    #Run create table query
    try:
        print("Creating table")
        session_instance.execute(query)
        print("Creating table successfully")
    except Exception as e:
        print(e)
                    
    # Open event_datafile_new.csv
    file = 'event_datafile_new.csv'
    print("Inserting rows into table")
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        #Iterate through rows to insert into table
        for line in csvreader:     
            query = "INSERT INTO user_from_song (user, song)"
            query = query + " VALUES (%s, %s)"
            session_instance.execute(query, (line[1] + ' '  + line[4], line[9]))            
    print("Finish inserting rows to the database")
    
create_table_query3(session)    

Creating table
Creating table successfully
Inserting rows into table
Finish inserting rows to the database


In [16]:
#Check schema of table for query 1 (artist_song_in_session_item)
cluster.metadata.keyspaces['event_data_cassandra'].tables['user_from_song'].columns

OrderedDict([('song', <cassandra.metadata.ColumnMetadata at 0x7f340cfcbc18>),
             ('user', <cassandra.metadata.ColumnMetadata at 0x7f340cfcba20>)])

In [17]:
#Run SELECT query to get desired results
query3 = """
SELECT user, song
FROM user_from_song
WHERE song = 'All Hands Against His Own'
"""

result_query3 = session.execute(query3)
result_query3_table = pd.DataFrame(columns = ['user', 'song'])

#Transform result into pandas dataframe
for rows in result_query3:
    result_query3_table = result_query3_table.append({'user': rows[0],\
                                                      'song': rows[1]}, ignore_index = True)

result_query3_table

Unnamed: 0,user,song
0,Jacqueline Lynch,All Hands Against His Own
1,Sara Johnson,All Hands Against His Own
2,Tegan Levine,All Hands Against His Own


*According the query result, there are 3 users ever listened to the song "All Hands Against His Own" in our log files. Those are Jacqueline Lynch, Sara Johnson and Tegan Levine.*

### Drop the tables before closing out the sessions

In [18]:
## Drop the table before closing out the sessions
session.execute("""DROP TABLE IF EXISTS artist_song_in_session_item""")
session.execute("""DROP TABLE IF EXISTS artist_song_user_from_userid_sessionid""")
session.execute("""DROP TABLE IF EXISTS user_from_song""")

<cassandra.cluster.ResultSet at 0x7f340cf9def0>

### Close the session and cluster connection¶

In [19]:
session.shutdown() 
cluster.shutdown()