# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
print(os.getcwd())

filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))
    print(file_path_list)

/workspace/home
['/workspace/home/event_data/2018-11-29-events.csv', '/workspace/home/event_data/2018-11-30-events.csv', '/workspace/home/event_data/2018-11-20-events.csv', '/workspace/home/event_data/2018-11-27-events.csv', '/workspace/home/event_data/2018-11-08-events.csv', '/workspace/home/event_data/2018-11-25-events.csv', '/workspace/home/event_data/2018-11-04-events.csv', '/workspace/home/event_data/2018-11-06-events.csv', '/workspace/home/event_data/2018-11-02-events.csv', '/workspace/home/event_data/2018-11-18-events.csv', '/workspace/home/event_data/2018-11-28-events.csv', '/workspace/home/event_data/2018-11-05-events.csv', '/workspace/home/event_data/2018-11-09-events.csv', '/workspace/home/event_data/2018-11-12-events.csv', '/workspace/home/event_data/2018-11-17-events.csv', '/workspace/home/event_data/2018-11-07-events.csv', '/workspace/home/event_data/2018-11-03-events.csv', '/workspace/home/event_data/2018-11-23-events.csv', '/workspace/home/event_data/2018-11-24-events.c

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
full_data_rows_list = [] 
     
for f in file_path_list:
 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile:  
        csvreader = csv.reader(csvfile) 
        next(csvreader)
               
        for line in csvreader:
            print(line)
            full_data_rows_list.append(line) 
             
print(len(full_data_rows_list))

print(full_data_rows_list)


csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


8056


In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster
cluster = Cluster()

session = cluster.connect()

#### Create Keyspace

In [None]:
session.execute("""CREATE KEYSPACE IF NOT EXISTS project WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }""")

<cassandra.cluster.ResultSet at 0x7d96a5a950f0>

#### Set Keyspace

In [7]:
session.set_keyspace("project")

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4
For this question, I created a table named song_events with (sessionId, itemInSession) as the primary key. This design ensures that each row is uniquely identified by the combination of sessionId and itemInSession and allows for sorting by itemInSession. I included the columns artist_name, song_title, and song_length to meet the query requirements and retrieve the necessary information for the specified session and item in the playlist. 

### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
For this question, I created a table using a composite partition key (user_id, session_id) and item_in_session as the clustering column. This allows Cassandra to efficiently retrieve all records for a specific user's session and automatically sort them in the order the songs were played.
By including artist, song and user as columns, I can return all the information required in the query.

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
For question 3, I designed the table to use song as the partition key and userId as the clustering key. This creates a unique primary key for each user-song combination, which ensures that each user appears only once per song and prevents data from being overwritten when multiple users listen to the same song. Also, the table includes the user's full name, which is built by combining the firstName and lastName columns from the CSV file.



In [12]:
## QUERY 1: find all the songs, artists and the song length from and especific sessionId and itemInSession
query1 = "CREATE TABLE IF NOT EXISTS song_events"
query1 = query1 + "(sessionId int, itemInSession int, artist_name text, song_title text, song_length float, PRIMARY KEY(sessionId, itemInSession))"

session.execute(query1)

<cassandra.cluster.ResultSet at 0x7d965afe9a20>

In [16]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO song_events(sessionId, itemInSession, artist_name, song_title, song_length)"
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### Do a SELECT to verify that the data have been inserted into each table

In [20]:
query_validate = 'SELECT * FROM song_events'
try:
    rows = session.execute(query_validate)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.sessionid, row.iteminsession, row.artist_name, row.song_title, row.song_length)

23 0 Regina Spektor The Calculation (Album Version) 191.08526611328125
23 1 Octopus Project All Of The Champs That Ever Lived 250.95791625976562
23 2 Tegan And Sara So Jealous 180.06158447265625
23 3 Dragonette Okay Dolores 153.39056396484375
23 4 Lil Wayne / Eminem Drop The World 229.58975219726562
23 5 Soulja Boy Tell'em Let Me Get Em 201.1162872314453
23 6 Bodo Wartke Liebeslied (Sprachen: Deutsch_ Englisch_ FranzÃÂ¶sisch_ Italienisch_ Spanisch_ HollÃÂ¤ndisch_ Japanisch_ Russisch_ Griechisch_ Klingonisch_ Hessisch) 645.2763061523438
23 7 Evanescence Bring Me To Life 237.11302185058594
23 8 Van Halen Good Enough 243.1734161376953
23 9 The Academy Is... Paper Chase (Album Version) 209.7628173828125
23 10 Dwight Yoakam You're The One 239.3072967529297
23 11 The Far East Movement featuring Wiz Khalifa and Bionik Lowridin 265.50811767578125
23 12 Amon Amarth The Hero 238.5236358642578
23 13 Modest Mouse The Good Times Are Killing Me 259.47381591796875
23 14 Bad Company Morning Sun 246.

In [27]:
query_1 = "SELECT * FROM song_events WHERE sessionId = 338 AND itemInSession = 4"
try:
    rows = session.execute(query_1)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist_name, row.song_title, row.song_length)


Faithless Music Matters (Mark Knight Dub) 495.30731201171875


In [None]:
# ANSWER 1: For sessionId 338 and itemInSession 4, the song title is Music Matters (Mark Knight Dub) from the artist Faithless, and the song length is 495.30731201171875

### COPY AND REPEAT THE ABOVE THREE CELLS FOR EACH OF THE THREE QUESTIONS

In [28]:
## QUERY 2: Return the artist name, song, and user's full name (first and last), sorted by itemInSession, for a specific userId and sessionId

query2 = "CREATE TABLE IF NOT EXISTS user_events"
query2 = query2 + "(artist_name text, song_title text, user text, user_id int, session_id int, item_in_session int, PRIMARY KEY((user_id, session_id), item_in_session))"

session.execute(query2)


                    

<cassandra.cluster.ResultSet at 0x7d96a5adea20>

In [32]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO user_events(artist_name, song_title, user, user_id, session_id, item_in_session)"
        query = query + "VALUES (%s, %s, %s, %s, %s, %s)"
        session.execute(query, (line[0], line[9],(line[1]+' '+line[4]), int(line[10]), int(line[8]), int(line[3])))

In [33]:
query_validate2 = 'SELECT * FROM user_events WHERE user_id = 10 AND session_id = 182'
try:
    rows = session.execute(query_validate2)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist_name, row.song_title, row.user)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


In [None]:
# ANSWER 2: 
# In session_id 182, user_id 10 Sylvie Cruz listened to these artists and their respective songs:
# Artist: Down To The Bone — Song: Keep On Keepin' On
# Artist: Three Drives — Song: Greece 2000
# Artist: Sebastien Tellier — Song: Kilometer
# Artist: Lonnie Gordon — Song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit)


In [36]:
## QUERY 3: Find all users who listened to 'All Hands Against His Own'

query3 = "CREATE TABLE IF NOT EXISTS song_listeners"
query3 = query3 + "(song text, userid int, user text, PRIMARY KEY(song, userid))"

session.execute(query3)
                    

<cassandra.cluster.ResultSet at 0x7d96a5a88be0>

In [37]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO song_listeners(song, userid, user)"
        query = query + "VALUES (%s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), (line[1]+' '+line[4])))

In [39]:
query_validate3 = "SELECT * FROM song_listeners WHERE song = 'All Hands Against His Own'"
try:
    rows = session.execute(query_validate3)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.user)

Jacqueline Lynch
Tegan Levine
Sara Johnson


In [None]:
# ANSWER 3: The users who listened to 'All Hands Against His Own' were Jacqueline Lynch, Tegan Levine and Sara Johnson.

### Drop the tables before closing out the sessions

In [41]:
session.execute("DROP TABLE song_listeners")
session.execute("DROP TABLE song_events")
session.execute("DROP TABLE user_events")

<cassandra.cluster.ResultSet at 0x7d96a5ade6a0>

### Close the session and cluster connection¶

In [42]:
session.shutdown()
cluster.shutdown()