# ETL Pipeline for Pre-Processing the Files

In [1]:
# importing the required packages
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
def collect_event_files(event_dir, pattern='*'):
    """Return a sorted list of the data files found anywhere under `event_dir`.

    Every directory below `event_dir` contributes its matches, so files in
    sub-folders are kept instead of replacing the ones found earlier.
    Hidden directories (for example Jupyter's `.ipynb_checkpoints`) are
    skipped so checkpoint copies are not loaded as extra data, and only
    regular files are returned because the next cell opens each path.

    Parameters:
        event_dir: root directory to search.
        pattern: glob pattern applied inside every directory (default '*').

    Returns:
        A sorted list of file paths; empty when `event_dir` does not exist
        or holds no matching files.
    """
    matches = []
    for root, dirs, _files in os.walk(event_dir):
        # Pruning `dirs` in place stops os.walk from descending into hidden folders.
        dirs[:] = [name for name in dirs if not name.startswith('.')]
        for candidate in glob.glob(os.path.join(root, pattern)):
            if os.path.isfile(candidate):
                matches.append(candidate)
    # glob/os.walk order is platform dependent; sorting keeps the output file reproducible.
    return sorted(matches)


# current working directory
print(os.getcwd())

# Getting current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Collecting the path of every event data file below `filepath`
file_path_list = collect_event_files(filepath)

/home/workspace


#### Processing the files to create the CSV data file that will be used for the Apache Cassandra tables

In [3]:
# Collect every data row (header line excluded) from each source file into one list.
full_data_rows_list = []

for source_path in file_path_list:
    # newline='' lets the csv module handle line endings inside quoted fields itself.
    with open(source_path, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header; the default value keeps an empty file from raising StopIteration.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# creating the (event_datafile_new.csv) file that will be used to insert data into the
# Apache Cassandra tables. Every field is quoted on output.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip blank lines (csv yields an empty list for them) and rows whose first
        # column -- written out as `artist` -- is empty.
        if not row or row[0] == '':
            continue
        # Keep only the source columns that map onto the header written above.
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8],
                         row[12], row[13], row[16]))


In [4]:
# Count the lines (header line included) of the event_datafile_new.csv file and print the total.
with open('event_datafile_new.csv', 'r', encoding='utf8') as f:
    line_total = 0
    for _ in f:
        line_total += 1
    print(line_total)

6821


# The Apache Cassandra coding portion of the project. 

## The CSV file titled <font color=red>event_datafile_new.csv</font>, located within the current directory, is now ready to load data from. The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in <font color=red>**event_datafile_new.csv**</font> after the code above has been run:<br>

<img src="images/image_event_datafile_new.jpg">

## Making a connection to Apache Cassandra instance in the cells below

In [5]:
# Connect to the Cassandra instance on the local machine (127.0.0.1) and open the
# session object that the following cells use to execute queries.
from cassandra.cluster import Cluster

try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as err:
    # The error is only reported; `cluster`/`session` stay undefined if this fails.
    print(err)

In [6]:
# Create the `project2` keyspace (SimpleStrategy, replication factor 1) unless it already exists.
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS project2
    WITH REPLICATION = 
    {'class' : 'SimpleStrategy', 'replication_factor' : 1}"""

try:
    session.execute(create_keyspace_cql)
except Exception as err:
    print(err)

In [7]:
# Point the session at the keyspace created above so later statements can use bare table names.
keyspace_name = 'project2'

try:
    session.set_keyspace(keyspace_name)
except Exception as err:
    print(err)

## Creating tables suitable for the following three questions of the data:

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




### =====================================================================================

### Coding for the first query.
#####    The steps are as follows:
    1- creating the table in the first cell.
    2- inserting the suitable data from event_datafile_new.csv file into the table in the second cell.
    3- selecting data from the table to verify the correctness of the table and its data.

In [8]:
# Query 1: Give me the artist, song title and song's length in the music app history that was heard during
# sessionId = 338, and itemInSession = 4
#
# Table design: sessionId is the partition key and itemInSession the clustering column, so the
# two values in the query's WHERE clause together address a single row.

query = "CREATE TABLE IF NOT EXISTS song_info_by_session \
            (sessionId int, \
             itemInSession int, \
             artist text, \
             song text, \
             length decimal, \
             PRIMARY KEY (sessionId, itemInSession))"

# Run the DDL; any error is printed instead of raised, so the notebook keeps going.
try:
    session.execute(query)
except Exception as e:
    print(e)

###### Query_1 Description: 
In this query, we need information about the songs played in a specific session. So I used `sessionId` as the partition key and `itemInSession` as the clustering key. Each partition is identified by `sessionId`, while `itemInSession` uniquely identifies each row within a partition and keeps the rows sorted by `itemInSession`.

In [9]:
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is assembled once before reading the file.
query = "INSERT INTO song_info_by_session (sessionId, itemInSession, artist, song, length)"
query = query + "VALUES (%s, %s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns used: 8 = sessionId, 3 = itemInSession, 0 = artist, 9 = song, 5 = length.
        params = (int(line[8]), int(line[3]), line[0], line[9], float(line[5]))
        session.execute(query, params)

In [10]:
# The SELECT statement to verify the data was entered into the table

query = "SELECT artist, song, length \
         FROM song_info_by_session \
         WHERE sessionId = 338 AND itemInSession = 4"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print results only when the query succeeded; otherwise `rows` would be
    # undefined (or left over from an earlier cell) and the loop would mislead or crash.
    for row in rows:
        print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.3073


In [11]:
# Cross-check the Cassandra result by filtering the same CSV with a pandas dataframe.

df = pd.read_csv('event_datafile_new.csv')
is_target_event = (df['sessionId'] == 338) & (df['itemInSession'] == 4)
df.loc[is_target_event, ["artist", "song", "length", "sessionId", "itemInSession"]]

Unnamed: 0,artist,song,length,sessionId,itemInSession
444,Faithless,Music Matters (Mark Knight Dub),495.3073,338,4


### Coding for the second query.
#####    The steps are as follows:
    1- creating the table in the first cell.
    2- inserting the suitable data from event_datafile_new.csv file into the table in the second cell.
    3- selecting data from the table to verify the correctness of the table and its data.

In [12]:
# Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)
# for userid = 10, sessionid = 182
#
# Table design: (userId, sessionId) form a composite partition key, matching the two equality
# filters in the query; itemInSession is the clustering column that orders rows in a partition.

query = "CREATE TABLE IF NOT EXISTS song_and_user_info_by_userid_and_session \
            (userId int, \
             sessionId int, \
             itemInSession int, \
             artist text, \
             song text, \
             firstName text, \
             lastName text, \
             PRIMARY KEY ((userId, sessionId), itemInSession))" # itemInSession is included as a clustering key just to sort by it

# Run the DDL; any error is printed instead of raised, so the notebook keeps going.
try:
    session.execute(query)
except Exception as e:
    print(e)                    

###### Query_2 Description: 
In this query, we need information about the songs and the user who played them for a specific `userId` and `sessionId`. So I used (`userId`, `sessionId`) as a composite partition key and `itemInSession` as the clustering key. Each partition is identified by the composite key (`userId`, `sessionId`), while `itemInSession` uniquely identifies each row within a partition and keeps the rows sorted by `itemInSession`.

In [13]:
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is assembled once before reading the file.
query = "INSERT INTO song_and_user_info_by_userid_and_session \
                    (userId, sessionId, itemInSession, artist, song, firstName, lastName)"
query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns used: 10 = userId, 8 = sessionId, 3 = itemInSession, 0 = artist,
        # 9 = song, 1 = firstName, 4 = lastName.
        params = (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4])
        session.execute(query, params)

In [14]:
# The SELECT statement to verify the data was entered into the table

query = "SELECT artist, song, firstName, lastName \
         FROM song_and_user_info_by_userid_and_session \
         WHERE userId = 10 AND sessionId = 182"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print results only when the query succeeded; otherwise `rows` would still
    # hold the previous query's result (or be undefined) and the output would mislead.
    for row in rows:
        # The driver exposes column names in lower case, hence `firstname` / `lastname`.
        print(row.artist, '||', row.song, '||', row.firstname, '||', row.lastname)

Down To The Bone || Keep On Keepin' On || Sylvie || Cruz
Three Drives || Greece 2000 || Sylvie || Cruz
Sebastien Tellier || Kilometer || Sylvie || Cruz
Lonnie Gordon || Catch You Baby (Steve Pitron & Max Sanna Radio Edit) || Sylvie || Cruz


In [15]:
# Cross-check the Cassandra result by filtering the pandas dataframe loaded earlier.

same_user = df['userId'] == 10
same_session = df['sessionId'] == 182
df.loc[same_user & same_session, ["artist", "song", "firstName", "lastName"]]

Unnamed: 0,artist,song,firstName,lastName
4704,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
4705,Three Drives,Greece 2000,Sylvie,Cruz
4706,Sebastien Tellier,Kilometer,Sylvie,Cruz
4707,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### Coding for the third query.
#####    The steps are as follows:
    1- creating the table in the first cell.
    2- inserting the suitable data from event_datafile_new.csv file into the table in the second cell.
    3- selecting data from the table to verify the correctness of the table and its data.

In [16]:
# Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
#
# Table design: song is the partition key (the query filters on it) and userId the clustering
# column, so each (song, userId) pair is stored as one row.

query = "CREATE TABLE IF NOT EXISTS username_by_song_title \
            (song text, \
             userId int, \
             firstName text, \
             lastName text, \
             PRIMARY KEY (song, userId))" # userId is included in the primary key on the assumption that a song title
                                          # alone will not be unique for most of our data, and that users' names
                                          # may not be unique either.

# Run the DDL; any error is printed instead of raised, so the notebook keeps going.
try:
    session.execute(query)
except Exception as e:
    print(e)         
                    

###### Query_3 Description: 
In this query, we need the users who listened to a specific song title. So I used `song` as the partition key and `userId` as the clustering key. I added `userId` because many users are expected to listen to the same song, so `song` alone cannot identify a row uniquely; users' names are not guaranteed to be unique either, so `userId` was the best choice. Each partition is identified by `song`, while `userId` uniquely identifies each row within a partition and keeps the rows sorted by `userId`.

In [17]:
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is assembled once before reading the file.
query = "INSERT INTO username_by_song_title (song, userId, firstName, lastName)"
query = query + "VALUES (%s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns used: 9 = song, 10 = userId, 1 = firstName, 4 = lastName.
        params = (line[9], int(line[10]), line[1], line[4])
        session.execute(query, params)

In [18]:
# The SELECT statement to verify the data was entered into the table

query = "SELECT firstName, lastName \
         FROM username_by_song_title \
         WHERE song = 'All Hands Against His Own'"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print results only when the query succeeded; otherwise `rows` would still
    # hold the previous query's result (or be undefined) and the output would mislead.
    for row in rows:
        # The driver exposes column names in lower case, hence `firstname` / `lastname`.
        print(row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


In [19]:
# Cross-check the Cassandra result by filtering the pandas dataframe loaded earlier.

is_target_song = df['song'] == 'All Hands Against His Own'
df.loc[is_target_song, ["song", "firstName", "lastName"]]

Unnamed: 0,song,firstName,lastName
2792,All Hands Against His Own,Tegan,Levine
5135,All Hands Against His Own,Sara,Johnson
6298,All Hands Against His Own,Jacqueline,Lynch


### Dropping the tables so that the tables are updated each time the data files are updated.

In [21]:
# Drop the three query tables one after another. A failure is printed and the
# remaining drops are still attempted.
for table_name in ("song_info_by_session",
                   "song_and_user_info_by_userid_and_session",
                   "username_by_song_title"):
    query = "DROP TABLE IF EXISTS " + table_name
    try:
        rows = session.execute(query)
    except Exception as err:
        print(err)


### Closing the session and cluster connection

In [22]:
# Shut down the session first, then the cluster object it was obtained from
# (`session = cluster.connect()` in the connection cell).
session.shutdown()
cluster.shutdown()