# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [58]:
# Import Python packages
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [59]:
# Directory (relative to the current working directory) that holds the raw event files.
filepath = os.getcwd() + '/event_data'

# Collect every data file found under `filepath`.
# The list is created up front so it exists even when the directory is missing,
# and it is extended (not rebound) per directory so that files found in earlier
# directories are not discarded when os.walk moves on to the next one.
file_path_list = []
for root, dirs, _files in os.walk(filepath):
    # Prune hidden directories (e.g. Jupyter's .ipynb_checkpoints) in place so
    # os.walk does not descend into them and checkpoint copies are not ingested.
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # glob('*') also matches sub-directories; keep regular files only because
    # every entry is later opened as a csv file.
    file_path_list.extend(
        path for path in glob.glob(os.path.join(root, '*')) if os.path.isfile(path)
    )

# glob returns entries in arbitrary order; sort for a reproducible processing order.
file_path_list.sort()

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [73]:
# Accumulates every data row (header excluded) read from all source event files.
full_data_rows_list = []
# Name of the consolidated csv file that the Cassandra load steps read from.
file = 'event_datafile_new.csv'

for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default keeps an empty file from raising StopIteration.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Create the smaller event data csv file, event_datafile_new.csv,
# that will be used to insert data into the Apache Cassandra tables.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions of the source columns to keep, in the order of the header written below.
kept_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open(file, 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # csv.reader yields [] for a blank line, so test for emptiness before
        # indexing; rows whose first column (artist) is empty are dropped as well.
        if not row or row[0] == '':
            continue
        writer.writerow([row[index] for index in kept_columns])


In [74]:
# Report how many lines (header included) the consolidated csv file contains.
with open(file, 'r', encoding = 'utf8') as handle:
    line_total = len(handle.readlines())
print(line_total)

6821


#### Creating a Cluster

In [75]:
from cassandra.cluster import Cluster
# Build the cluster handle using the single contact point named 'db'.
cluster = Cluster(contact_points=['db'])

# Open a session on that cluster; the keyspace is selected in a later step.
session = cluster.connect()

#### Create Keyspace

In [76]:
# CQL statement creating the `sparkify` keyspace (SimpleStrategy, replication
# factor 1) unless it already exists.
create_keyspace_cql = """
CREATE KEYSPACE IF NOT EXISTS sparkify
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 1 
  };
"""
session.execute(create_keyspace_cql)

<cassandra.cluster.ResultSet at 0x7f0f0df39940>

#### Set Keyspace

In [77]:
# Make `sparkify` the default keyspace for the statements that follow.
session.set_keyspace("sparkify")

# Remove any copies of the three query tables so each run starts from an empty schema.
for table_name in (
    "music_app_history",
    "artist_listening_history",
    "user_listening_history",
):
    session.execute(f"\nDROP TABLE IF EXISTS sparkify.{table_name}\n")

<cassandra.cluster.ResultSet at 0x7f0f0c4c08e0>

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

# Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338 and itemInSession = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [78]:


class ColumnNames:
    """Header names of the consolidated event csv, used as csv.DictReader keys.

    Each attribute maps a snake_case identifier to the camelCase column name
    written in the csv header; attributes are listed in header order.
    """

    artist = "artist"
    first_name = "firstName"
    gender = "gender"
    item_in_session = "itemInSession"
    last_name = "lastName"
    length = "length"
    level = "level"
    location = "location"
    session_id = "sessionId"
    song = "song"
    user_id = "userId"


## Query 1: artist, song title and song length for a given
# (sessionId, itemInSession) pair, e.g. sessionId = 338 and itemInSession = 4.

# Table keyed for Query 1; the modelling rationale is recorded inside the CQL itself.
create_music_app_history = """
CREATE TABLE IF NOT EXISTS music_app_history(
   session_id bigint,
   item_in_session int,
   user_id bigint,
   artist text, 
   song text, 
   length float,
   PRIMARY KEY ((session_id, item_in_session), user_id)
   -- using combination of session_id and item_in_session is the best partition key in this scenario based on the query
   -- user_id is added to guarantee primary key uniqueness
   )
"""
session.execute(create_music_app_history)

# Prepared once and reused for every csv row.
query = session.prepare("""
INSERT INTO music_app_history(session_id, item_in_session, user_id, artist, song, length) VALUES (?,?,?,?,?,?)
""")

with open(file, encoding='utf8') as f:
    for record in csv.DictReader(f):
        # DictReader yields strings; convert the numeric columns to match the table types.
        bound_values = (
            int(record[ColumnNames.session_id]),
            int(record[ColumnNames.item_in_session]),
            int(record[ColumnNames.user_id]),
            record[ColumnNames.artist],
            record[ColumnNames.song],
            float(record[ColumnNames.length]),
        )
        session.execute(query, bound_values)

#### Do a SELECT to verify that the data have been inserted into each table

In [79]:
## Query 1: artist, song title and song length heard during
# sessionId = 338 and itemInSession = 4.
select_music_app_history = """
SELECT artist, song, length FROM music_app_history
WHERE session_id = 338 And item_in_session = 4
"""
# Print every row returned by the lookup.
for matched_row in session.execute(select_music_app_history):
    print(matched_row)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)


### COPY AND REPEAT THE ABOVE THREE CELLS FOR EACH OF THE THREE QUESTIONS

In [80]:
## Query 2: artist, song (sorted by itemInSession) and user (first and last name)
# for a given user and session, e.g. userid = 10 and sessionid = 182.

# Recreate the table from scratch; the modelling rationale is recorded inside the CQL itself.
session.execute("DROP TABLE IF EXISTS artist_listening_history")
create_artist_listening_history = """
CREATE TABLE artist_listening_history(
   session_id bigint,
   user_id bigint,
   item_in_session int,
   artist text, 
   song text, 
   username text,
   PRIMARY KEY ((session_id, user_id), item_in_session)
   -- in this case item_in_session should be a clustering column to define order
   -- combination of session_ id and user_id is the best choice here as a partition key to distribute data 
   -- across db nodes
   );
"""
session.execute(create_artist_listening_history)

# Prepared once and reused for every csv row.
query = session.prepare( """
INSERT INTO artist_listening_history(session_id, user_id, item_in_session, artist, song, username)
VALUES (?,?,?,?,?,?)
""")

with open(file, encoding = 'utf8') as f:
    for record in csv.DictReader(f):
        # username is stored as "<first name> <last name>".
        full_name = record[ColumnNames.first_name] + " " + record[ColumnNames.last_name]
        session.execute(query, (
            int(record[ColumnNames.session_id]),
            int(record[ColumnNames.user_id]),
            int(record[ColumnNames.item_in_session]),
            record[ColumnNames.artist],
            record[ColumnNames.song],
            full_name,
        ))
                    

In [81]:
## Query 2: artist, song (sorted by itemInSession) and user (first and last name)
# for userid = 10 and sessionid = 182.
select_artist_listening_history = """
 SELECT artist, song, username FROM artist_listening_history
 WHERE user_id = 10 And session_id = 182
"""

# Print every row returned by the lookup.
for matched_row in session.execute(select_artist_listening_history):
    print(matched_row)

Row(artist='Down To The Bone', song="Keep On Keepin' On", username='Sylvie Cruz')
Row(artist='Three Drives', song='Greece 2000', username='Sylvie Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', username='Sylvie Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', username='Sylvie Cruz')


In [82]:
## Query 3: Give me every user name (first and last) in my music app history who listened
# to the song 'All Hands Against His Own'

# The table is keyed so that a single partition answers "who listened to song X".
# user_id is part of the primary key because a display name is not unique: with
# (song, username) alone, two different users sharing a first and last name
# would silently overwrite each other for the same song.
session.execute("""
CREATE TABLE IF NOT EXISTS user_listening_history(
   song text,
   username text,
   user_id bigint,
   PRIMARY KEY (song, username, user_id)
   -- song is the partition key because the query filters on song alone
   -- username and user_id are clustering columns: rows are ordered by username and
   -- user_id keeps users who share the same name as separate rows
   );
""")
# Prepared once and reused for every csv row.
query = session.prepare("""
INSERT INTO user_listening_history(song, username, user_id) VALUES (?,?,?)
""")

with open(file, encoding='utf8') as f:
    csvreader = csv.DictReader(f)
    for line in csvreader:
        values = (
            line[ColumnNames.song],
            # username is stored as "<first name> <last name>".
            line[ColumnNames.first_name] + " " + line[ColumnNames.last_name],
            # DictReader yields strings; the column is a bigint.
            int(line[ColumnNames.user_id]),
        )
        session.execute(query, values)

In [83]:
## Query 3: every user name (first and last) in the music app history
# who listened to the song 'All Hands Against His Own'.

matching_rows = session.execute("""
SELECT username FROM user_listening_history
WHERE song='All Hands Against His Own'
""")
# Gather the username column of each returned row into a plain list.
result = []
for matching_row in matching_rows:
    result.append(matching_row.username)
print("Users that listened to the song 'All Hands Against His Own' - ", result)

Users that listened to the song 'All Hands Against His Own' -  ['Jacqueline Lynch', 'Sara Johnson', 'Tegan Levine']


### Drop the tables before closing out the sessions

In [84]:
# Drop the three query tables created by this notebook, if they exist.
for table_name in (
    "music_app_history",
    "artist_listening_history",
    "user_listening_history",
):
    session.execute(f"\nDROP TABLE IF EXISTS sparkify.{table_name}\n")

<cassandra.cluster.ResultSet at 0x7f0f0decf880>

### Close the session and cluster connection

In [85]:
# Shut down the session first, then the cluster object it was created from.
session.shutdown()
cluster.shutdown()