# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [6]:
# Import Python packages 
import pandas as pd
import cassandra_cluster
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [56]:
# checking current working directory
print(os.getcwd())

# Get current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    #print(file_path_list)

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [57]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# print total number of rows 
print(len(full_data_rows_list))

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


8056


In [58]:
# check the number of rows in the csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Data Modelling with Apache Cassandra. 

## The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId



## Creating a Cluster

In [59]:
# This should make a connection to a Cassandra instance in your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster()

# To establish connection and begin executing queries, need a session
session = cluster.connect()

## Create Keyspace

In [60]:
# Creates key-space that will contain the tables
try:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS music_data_keyspace
        WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1}
        """)
except Exception as e:
    print("Error occurred while creating the keyspace")
    print(e)


#### Set Keyspace

In [61]:
# Coonects the session to the key-space
try:
    session.set_keyspace('music_data_keyspace')
except Exception as e:
    print("Error occurred while connecting to the key-space")
    print(e)


### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




## Solution  #1: Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [64]:
# First drop artist_library table if it already exists
query = "DROP TABLE artist_library"
try:
    session.execute(query)
except Exception as e:
    print("Error occurred while deleting the table: artist_library)")
    print(e)
                    

### Create a table for query #1

### Considering that the querie requires artist name, song title, and song length based on `sessionId` and `item number in session`, the `sessionId` column and the `item number in session` column will form the composite primary key (partition columns) of the table. This approach ensures that each row is unique, because I am assuming that an artist won't have more than one song in an item of the session.  In addition, the table will contain the required fields, i.e., artist name, songs, and song lengths.

In [65]:
# Creates artist_library
query = "CREATE TABLE IF NOT EXISTS artist_library "
query = query + "(sessionId int, itemInSession int, artist text, song text, length float, "
query = query + "PRIMARY KEY((sessionId, itemInSession)))"
try:
    session.execute(query)
except Exception as e:
    print("Error occured while creating table: artist_library")
    print(e)

### Populates the table (artist_library) with the provided dataset in .csv file

In [66]:
# We have provided part of the code to set up the CSV file. Please complete the Apache Cassandra code below#
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO artist_library (sessionId, itemInSession, artist, song, length)"
        query = query + "VALUES(%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### Do a SELECT to verify that the data have been inserted into each table

In [67]:
# For proximity, here is the query
# Give me the artist, song title and song's length in the music app history that was heard during \
# sessionId = 338, and itemInSession = 4

# Selects the artist name, song, and song length as required to provide the requred result for query 1.
query = "SELECT artist, song, length "
# The WHERE clause filters the result to align with the specification of the query.
query = query + "FROM artist_library WHERE sessionId = 338 AND itemInSession = 4"

try:
    rows = session.execute(query)
except Exception as e:
    print("Errror occured executing the select query statement")
    print(e)

# Print the values of each row to provide an answer to the query
for row in rows:
    print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


## Solution  #2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [71]:
# Drop user_library table if it already exists
query = "DROP TABLE user_library"
try:
    session.execute(query)
except Exception as e:
    print("Error occurred while deleting the table: user_library)")
    print(e)

                    

### Create a table for query #2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

### Considering that the querie requires artist name, song, first name, and last name that are sorted by `itemInSession` and the result is based on `userId` and `sessionId`; the `sessionId` and `userId` columns will be the composite primary key (partition coulmns), while the `item number in session` column is the clustering column. The partition columns ensure that each row is unique, because I am assuming that a user can only participate in one session. Also, clustering column (`itemInSession`) sorts the result in descending order based on the item number in each session. Furthermore, the table contains the required fields, i.e., artist name, song, first name, and last name.

In [72]:

# Creates user_library to answer the query #2. 
query = "CREATE TABLE IF NOT EXISTS user_library "
query = query + "(sessionId int, userId int, itemInSession int, artist text, song text, first_name text, last_name text, "
# The reviewer recomends that I use userId and sessionId as the primary, but I am already using the reversed order
# Is there any problem with this approach?
query = query + "PRIMARY KEY((sessionId, userId), itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print("Error occured while creating table: user_library")
    print(e)
    

### Populates the table (user_library) with the provided dataset in .csv file

In [73]:
# We have provided part of the code to set up the CSV file. Please complete the Apache Cassandra code below#
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO user_library (sessionId, userId, itemInSession, artist, song, first_name, last_name)"
        query = query + "VALUES(%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[10]), int(line[3]), line[0], line[9], line[1], line[4]))

Do a SELECT to verify that the data have been inserted into each table


In [75]:
# For proximity, the query is given below
# Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) 
# for userid = 10, sessionid = 182

# Selects the artist, song, and the user name (first and last name) as required by the query
query = "SELECT  artist, song, first_name, last_name "
# The WHERE clause filter the result based on the sessionId and userId to align the result with the constraints
# of the query
query = query + "FROM user_library WHERE sessionId = 182 AND userId = 10"

try:
    rows = session.execute(query)
except Exception as e:
    print("Errror occured executing the select query2 statement")
    print(e)
    
# print the results
for row in rows:
    print(row.artist, row.song, row.first_name, row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


## Solution  #3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [78]:
# First, drop song_user_library if it already exists
query = "DROP TABLE song_user_library"
try:
    session.execute(query)
except Exception as e:
    print("Error occurred while deleting the table: song_user_library)")
    print(e)

                    

### Create a table for query #3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own

### This query requires first and last user names based on a particular song. Hence, the primary key (partition column) is `song` since the result will be filtered based on the `song`.  Although, the query requires to filter the result based on the `song`, using only the `song` as the primary key is not sufficient because the table will allow only one `song`. However, one `song` can be listened by more than one user. In this case, adding the `userId` as a part of the primary key (clustering column) ensures that no data will be lost due unique primary key values. An alternative option is use the `userId` as a part of the partition column; however, this approach will to use `ALLOW FILTERING`, which is not cost effective. And finally, the table contains the required fields, i.e., first name and last name of the users

In [79]:

# Creates song_user_library to answer the query #3
query = "CREATE TABLE IF NOT EXISTS song_user_library "
query = query + "(song text, userId int, first_name text, last_name text, "
query = query + "PRIMARY KEY((song), userId))"
try:
    session.execute(query)
except Exception as e:
    print("Error occured while creating table: song_user_library")
    print(e)


### Populates the table (song_user_library) with the provided dataset in .csv file

In [80]:
# We have provided part of the code to set up the CSV file. Please complete the Apache Cassandra code below#
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO song_user_library (song, userId, first_name, last_name)"
        query = query + "VALUES(%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), line[1], line[1]))

Do a SELECT to verify that the data have been inserted into each table

In [81]:
## For proximity, the query is shown below:
## TO-DO: Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

# Select the first and last name of each user who listed to the song "All Hands Against His Own"
query = "SELECT  first_name, last_name "
query = query + "FROM song_user_library WHERE song = 'All Hands Against His Own'"

try:
    rows = session.execute(query)
except Exception as e:
    print("Errror occured executing the select query3")
    print(e)

# prints users (first and last name) as required by the query
for row in rows:
    print(row.first_name, row.last_name)

Jacqueline Jacqueline
Tegan Tegan
Sara Sara


### Drop the tables before closing out the sessions

In [82]:
# Drop artist_library table if it already exists
query = "DROP TABLE artist_library"
try:
    session.execute(query)
except Exception as e:
    print("Error occurred while deleting the table: query1_table)")
    print(e)

# Drop query2_table if it already exists
query1 = "DROP TABLE user_library"
try:
    session.execute(query1)
except Exception as e:
    print("Error occurred while deleting the table: query2_table)")
    print(e)

# Drop song_user_library if it already exists
query2 = "DROP TABLE song_user_library"
try:
    session.execute(query2)
except Exception as e:
    print("Error occurred while deleting the table: query3_table)")
    print(e)

### Close the session and cluster connection¶

In [83]:
session.shutdown()
cluster.shutdown()