# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Show the current working directory; the event data is looked up relative to it
print(os.getcwd())

# Folder that holds the raw event CSV files
filepath = os.getcwd() + '/event_data'

# Collect the CSV files of the folder and of all its subfolders.
# The list is created up front so it exists even when the folder is missing
# or empty, and it is extended (not overwritten) for every directory visited.
file_path_list = []
for root, _dirs, _files in os.walk(filepath):
    # Notebook checkpoint folders must not be loaded as event data
    if '.ipynb_checkpoints' in root:
        continue
    # Only '*.csv' is matched so sub-directories and stray files are left out
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))

# Sorted so the files are listed (and later processed) in file-name order
file_path_list.sort()
print(file_path_list)

/home/workspace
['/home/workspace/event_data/2018-11-01-events.csv', '/home/workspace/event_data/2018-11-02-events.csv', '/home/workspace/event_data/2018-11-03-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-06-events.csv', '/home/workspace/event_data/2018-11-07-events.csv', '/home/workspace/event_data/2018-11-08-events.csv', '/home/workspace/event_data/2018-11-09-events.csv', '/home/workspace/event_data/2018-11-10-events.csv', '/home/workspace/event_data/2018-11-11-events.csv', '/home/workspace/event_data/2018-11-12-events.csv', '/home/workspace/event_data/2018-11-13-events.csv', '/home/workspace/event_data/2018-11-14-events.csv', '/home/workspace/event_data/2018-11-15-events.csv', '/home/workspace/event_data/2018-11-16-events.csv', '/home/workspace/event_data/2018-11-17-events.csv', '/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-19-events.c

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Data rows (headers excluded) gathered from every event file
full_data_rows_list = []

# Positions of the source columns that are copied into the output file,
# in the same order as the header written below
source_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

# Read each event file and append its rows to the combined list
for event_file in file_path_list:
    with open(event_file, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header; the default keeps an empty file from raising StopIteration
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# get and validate the total number of rows
print(len(full_data_rows_list))

# Write the smaller event data csv file called event_datafile_new.csv that
# will be used to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip blank lines (parsed as an empty list) and rows without an artist
        if not row or row[0] == '':
            continue
        writer.writerow([row[index] for index in source_columns])

8056


In [4]:
# Count the lines of the generated csv file (the header line is part of the total)
with open('event_datafile_new.csv', 'r', encoding='utf8') as csv_in:
    line_total = 0
    for _ in csv_in:
        line_total += 1
    print(line_total)

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

Now we will work with our created file `event_datafile_new.csv`, located within the Workspace directory. The event_datafile_new.csv contains the following columns:

- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the **`event_datafile_new.csv`** after the code above is run:

<img src="images/image_event_datafile_new.jpg">

Before we run the queries from the 'analytics team', we need to set up the basics from Apache Cassandra.

## Setting up connection, session and keyspace with Apache Cassandra

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster

# Address of the Cassandra node to contact (the local machine)
contact_points = ['127.0.0.1']

try:
    cluster = Cluster(contact_points)
    # The session is the handle used by every statement executed afterwards
    session = cluster.connect()
except Exception as err:
    print('Exception: creating a cluster and connecting to session')
    print(err)

#### Create Keyspace

In [6]:
# Statement that creates the keyspace unless it already exists
create_keyspace = """
        CREATE KEYSPACE IF NOT EXISTS mykeyspace 
        WITH REPLICATION = 
        { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""

try:
    session.execute(create_keyspace)
except Exception as err:
    print(err)

#### Set Keyspace

In [7]:
# Select 'mykeyspace' on the session; the table statements that follow
# refer to their tables without a keyspace prefix
session.set_keyspace('mykeyspace')

## Create tables according to the three written queries
We have three different queries, but the approach to using them will be similar and sequential for all three:
1. Create a table per query with appropriate PRIMARY KEY and if needed CLUSTERING COLUMNS
2. Populate the table according to the data model using the file above `event_datafile_new.csv`
3. Check that the table was populated, with a simple CQL-validation query
4. Finally, run the required query

Note: each of the following code blocks corresponds to the steps above, respectively.

#### Query 1: Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

An itemInSession can be repeated throughout different sessions (independent of user), but is unique within a single sessionId. And since a sessionId is dependent on the user, this query makes use of the unique combination of a sessionId and an itemInSession as PRIMARY KEY. This is the main input for how we modeled the PRIMARY KEY. The other attributes (artist, song, length) were added, since this is what the query requires as a result.

*Note on data model for query 1: when building the create statement for query 1, we decided to model the 'length' column as text,
since the query did not require it to be numeric (for example for aggregating).
Usually it just makes sense to model it as some type of numeric, but we really went for the motto 
"model your data according to your queries' requirement".*

In [20]:
# Table for query 1: sessionId is the partition key, itemInSession the clustering column
create1 = (
    "CREATE TABLE IF NOT EXISTS song_history "
    "(sessionId int, itemInSession int, artist text, song text, length text, "
    "PRIMARY KEY (sessionId, itemInSession))"
)

try:
    session.execute(create1)
except Exception as err:
    print(err)

In [21]:
file = 'event_datafile_new.csv'

# The statement text is identical for every row, so it is assembled once
query = (
    "INSERT INTO song_history (sessionId, itemInSession, artist, song, length) "
    "VALUES (%s, %s, %s, %s, %s)"
)

with open(file, encoding='utf8') as csv_file:
    reader = csv.reader(csv_file)
    next(reader)  # skip header
    for record in reader:
        # csv columns used: 8 = sessionId, 3 = itemInSession, 0 = artist, 9 = song, 5 = length
        values = (int(record[8]), int(record[3]), record[0], record[9], record[5])
        session.execute(query, values)

In [22]:
# Spot-check the load by reading back a single (sessionId, itemInSession) pair
validation_cql = "SELECT * FROM song_history WHERE sessionId = 139 AND itemInSession = 1"
validation_rows = session.execute(validation_cql)

for record in validation_rows:
    print(record.artist, record.song, record.length)

Des'ree You Gotta Be 246.30812


In [23]:
# Query 1: artist, song title and song length for sessionId = 338, itemInSession = 4
query1 = """SELECT artist, song, length 
            FROM song_history 
            WHERE sessionId = 338 AND itemInSession = 4"""

try:
    rows = session.execute(query1)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded; otherwise `rows` would be undefined
    # or still hold the result of a previously executed cell
    for row in rows:
        print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.3073


#### Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
This query delivers the songs (with respective artists) that a user listened to in a given session. 

Since the query needs to filter by userId and sessionId, we decided to use both as the PARTITION KEY in the PRIMARY KEY. Furthermore, since the query requires the data to be sorted by itemInSession, we added the latter to the PRIMARY KEY but as a CLUSTERING COLUMN. In this case, we spare the use of an 'ORDER BY' STATEMENT within the query.

Since the query requires specifically a 'user', we combined the first and last name into the attribute 'user', separated by a blank space.

In [24]:
# Table for query 2: (userId, sessionId) is the composite partition key,
# itemInSession the clustering column
create2 = (
    "CREATE TABLE IF NOT EXISTS user_activity "
    "(userId int, sessionId int, itemInSession int, user text, artist text, song text, "
    "PRIMARY KEY ((userId, sessionId), itemInSession))"
)

try:
    session.execute(create2)
except Exception as err:
    print(err)

In [25]:
file = 'event_datafile_new.csv'

# The statement text is identical for every row, so it is assembled once
query = (
    "INSERT INTO user_activity (userId, sessionId, itemInSession, user, artist, song) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
)

with open(file, encoding='utf8') as csv_file:
    reader = csv.reader(csv_file)
    next(reader)  # skip header
    for record in reader:
        # csv columns used: 10 = userId, 8 = sessionId, 3 = itemInSession,
        # 1 + 4 = first and last name, 0 = artist, 9 = song
        values = (
            int(record[10]),
            int(record[8]),
            int(record[3]),
            record[1] + ' ' + record[4],
            record[0],
            record[9],
        )
        session.execute(query, values)

In [26]:
# Spot-check the load by reading back one (userId, sessionId) partition
validation_cql = "SELECT * FROM user_activity WHERE userId = 8 AND sessionId = 139"
validation_rows = session.execute(validation_cql)

# itemInSession is printed as well, to make the ordering of the result visible
for record in validation_rows:
    print(record.artist, record.song, record.user, record.iteminsession)

Des'ree You Gotta Be Kaylee Summers 1
Mr Oizo Flat 55 Kaylee Summers 3
Tamba Trio Quem Quiser Encontrar O Amor Kaylee Summers 4
The Mars Volta Eriatarka Kaylee Summers 5
Infected Mushroom Becoming Insane Kaylee Summers 6
Blue October / Imogen Heap Congratulations Kaylee Summers 7
Girl Talk Once again Kaylee Summers 8


In [27]:
# Query 2: Give me name of artist, song (sorted by itemInSession)
# and user (first and last name)
# for userid = 10, sessionid = 182
query2 = """SELECT user, artist, song, itemInSession 
            FROM user_activity  
            WHERE userId = 10 AND sessionId = 182"""
try:
    rows = session.execute(query2)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded; otherwise `rows` would be undefined
    # or still hold the result of a previously executed cell
    for row in rows:
        print(row.artist, row.song, row.user)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
This query results in a list of all the users (again modeled first + last name, separated by a blank space) that listened to a specific song.

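All users that listened to a song (independently from session or any other variable) translates to a unique list of users per song.

Therefore the song is used as the PARTITION KEY, since the query filters on it, and the user is added to the PRIMARY KEY as a CLUSTERING COLUMN, so that each user is stored only once per song.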

In [28]:
# Table for query 3: song is the partition key, user the clustering column
create3 = (
    "CREATE TABLE IF NOT EXISTS users_per_song "
    "(song text, user text, "
    "PRIMARY KEY ((song), user))"
)

try:
    session.execute(create3)
except Exception as err:
    print(err)

In [29]:
file = 'event_datafile_new.csv'

# The statement text is identical for every row, so it is assembled once
query = (
    "INSERT INTO users_per_song (song, user) "
    "VALUES (%s, %s)"
)

with open(file, encoding='utf8') as csv_file:
    reader = csv.reader(csv_file)
    next(reader)  # skip header
    for record in reader:
        # csv columns used: 9 = song, 1 + 4 = first and last name
        values = (record[9], record[1] + ' ' + record[4])
        session.execute(query, values)

In [30]:
# Spot-check the load by reading back the partition of a single song
validation_cql = "SELECT * FROM users_per_song WHERE song='Once again'"
validation_rows = session.execute(validation_cql)

# The song is printed as well, to confirm the filter returned the right partition
for record in validation_rows:
    print(record.user, record.song)

Kaylee Summers Once again


In [31]:
# Query 3: Give me every user name (first and last) in my music app history
# who listened to the song 'All Hands Against His Own'

query3 = """SELECT user 
            FROM users_per_song  
            WHERE song = 'All Hands Against His Own'"""
try:
    rows = session.execute(query3)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded; otherwise `rows` would be undefined
    # or still hold the result of a previously executed cell
    for row in rows:
        print(row.user)

Jacqueline Lynch
Sara Johnson
Tegan Levine


### Drop the tables before closing out the sessions

In [13]:
# One drop statement per table created in this notebook
drop_statements = (
    "drop table if exists song_history",
    "drop table if exists user_activity",
    "drop table if exists users_per_song",
)

try:
    # Executed in order; the first failure skips the remaining statements
    for statement in drop_statements:
        session.execute(statement)
except Exception as err:
    print(err)

### Close the session and cluster connection

In [43]:
# Shut down the session first, then the cluster object it was obtained from
session.shutdown()
cluster.shutdown()