# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [26]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [27]:
print(os.getcwd())
filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))


/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [28]:
full_data_rows_list = [] 
    
for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)

        for line in csvreader:
            full_data_rows_list.append(line) 

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length', 'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [29]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


#### Creating a Cluster

In [30]:
from cassandra.cluster import Cluster
cluster = Cluster()

session = cluster.connect()

#### Create Keyspace

In [31]:
session.execute("CREATE KEYSPACE IF NOT EXISTS eventdata WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '2' }")
session.set_keyspace("eventdata")

### session_data table is create with partition key session_id, item_in_session because the query this table should support should be able to filter by session_id, item_in_session

In [32]:
session.execute("""
        CREATE TABLE IF NOT EXISTS session_data (
            session_id text,
            item_in_session text,
            artist text,
            song_title text,
            song_length text,
            PRIMARY KEY (session_id, item_in_session)
        )
        """)    

<cassandra.cluster.ResultSet at 0x7feb34c9a6a0>

In [33]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = session.prepare("INSERT INTO session_data (session_id, item_in_session, artist, song_title, song_length) VALUES (?,?,?,?,?)")
        session.execute(query.bind((line[8], line[3], line[0], line[9], line[5])))

#### Query to verify that the data have been inserted into each table

In [34]:
# Query 1:  Give me the artist, song title and song's length in the music app history that was heard during \
# sessionId = 338, and itemInSession = 4
rows = session.execute("SELECT artist FROM session_data where session_id='338' and item_in_session='4';")
print([row for row in rows])

[Row(artist='Faithless')]


### Table user_session_data is create with (user_id, session_id) as partition key and itemInSession as clustering column to store sorted by itemInSession
### Eg query: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [35]:

session.execute("""
        CREATE TABLE IF NOT EXISTS user_session_data (
            session_id text,
            item_in_session text,
            user_id text,
            artist text,
            song_title text,
            user_full_name text,
            PRIMARY KEY ((user_id, session_id), item_in_session)
        )
        """)                    

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = session.prepare("INSERT INTO user_session_data (session_id, item_in_session, user_id, artist, song_title, user_full_name) VALUES (?,?,?,?,?,?)")
        session.execute(query.bind((line[8], line[3], line[10], line[0], line[9], line[1] + ' ' + line[4])))
        

In [36]:
rows = session.execute("SELECT artist, song_title, user_full_name, user_id FROM user_session_data where user_id='10' and session_id='1005';")
print([row for row in rows]) 

[Row(artist='I Set My Friends On Fire', song_title='But The NUNS Are Watching', user_full_name='Sylvie Cruz', user_id='10')]


### user_song_data table is created with partition key song_title and user_id with (song_title, user_id) as partition key to support query by user who listed a specific song
### Eg Query: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


In [37]:
session.execute("""
        CREATE TABLE IF NOT EXISTS user_song_data (
            song_title text,
            user_full_name text,
            user_id text,
            PRIMARY KEY ((song_title, user_id))
        )
        """)                    


file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = session.prepare("INSERT INTO user_song_data (song_title, user_full_name, user_id) VALUES (?,?,?)")
        session.execute(query.bind((line[9], line[1] + ' ' + line[4], line[10])))



In [38]:
rows = session.execute("SELECT user_full_name FROM user_song_data where song_title='All Hands Against His Own' and user_id='95';")
print([row for row in rows]) 

[Row(user_full_name='Sara Johnson')]


In [39]:
session.execute("Drop table user_song_data")
session.execute("Drop table user_session_data")                
session.execute("Drop table session_data")                

<cassandra.cluster.ResultSet at 0x7feb307b52b0>

In [40]:
session.shutdown()
cluster.shutdown()