# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

In [2]:
# checking the current working directory and gathering all csv files for etl
print(os.getcwd())

filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    if ".ipynb_checkpoints" in root:
        continue
    else:
        file_path_list = glob.glob(os.path.join(root,'*'))
#         print(len(file_path_list))

/home/workspace


In [3]:
# Reads all csv files in subfolder event_data and appends each row into csv file 
df = pd.concat([pd.read_csv(f) for f in file_path_list], ignore_index = True)
df = df.dropna()

csv_row = df.apply(
    lambda row: [row['artist'], row['firstName'], row['gender'], row['itemInSession'], 
                 row['lastName'], row['length'], row['level'], row['location'], row['sessionId'],
                row['song'], int(row['userId'])], axis=1, result_type='expand')

# A list of all csv rows
full_data_rows_list = csv_row.values.tolist()

# Writes list of csv rows into dataframe with headers
df = pd.DataFrame(full_data_rows_list, columns=['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
                                               'length', 'level', 'location', 'sessionId', 'song', 'userId'])
# Writes dataframe to csv file
df.to_csv('event_datafile_new.csv', index=False)

In [4]:
# checks the number of rows in the csv file containing all rows
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


In [5]:
# Creating a connection to Cassandra on 127.0.0.1, port 9042 (default)
from cassandra.cluster import Cluster
cluster = Cluster()

# Create a session to execute queries
session = cluster.connect()

In [6]:
# Creates an idempotent Keyspace 
try:
    session.execute("""DROP KEYSPACE IF EXISTS user_plays""")
except Exception as e:
    print(e)
    
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS user_plays
    WITH REPLICATION = 
    {'class': 'SimpleStrategy', 'replication_factor' : 1 }""")
    
except Exception as e:
        print(e)

In [7]:
# Set KEYSPACE to user_plays specified above
try:
    session.set_keyspace('user_plays')
except Exception as e:
    print(e)

In [8]:
# Query 1 Description:  In this query, session_id and item_in_session are assigned as a composite partition key and uniquely identify each column
query = "drop table filter_by_session_id_and_item_in_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

# Create partition key of session_id and item_in_session; no data sorted within the partition
query = "CREATE TABLE IF NOT EXISTS filter_by_session_id_and_item_in_session "
query = query + "(session_id int, item_in_session int, artist text, song text, length float, PRIMARY KEY ((session_id, item_in_session)))"

try:
    session.execute(query)
except Exception as e:
    print(e)      

Error from server: code=2200 [Invalid query] message="unconfigured table filter_by_session_id_and_item_in_session"


In [9]:
# Reads our denormalized CSV file and inserts it into Cassandra table
file = 'event_datafile_new.csv'
df = pd.read_csv(file)

query = "INSERT INTO filter_by_session_id_and_item_in_session(session_id, item_in_session, artist, song, length) "
query = query + "VALUES(%s, %s, %s, %s, %s)"

insert_rows_into_cassandra = df.apply(lambda x: session.execute(query, (x.sessionId, x.itemInSession, x.artist, x.song, x.length)), axis=1)

In [10]:
# Composite partition key of session_id and item_in_session used to find unique row
query = "select artist, song, length from filter_by_session_id_and_item_in_session WHERE SESSION_ID=338 AND ITEM_IN_SESSION=4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


In [11]:
# Query 2 Description: In this query, the composite partition key is user_id and session_id and the clustering column is item_in_session \
# filtering on user_id and session_id and sorting by item_in_session uniqely identifies the rows for the given ids 
query = "drop table if exists filter_by_user_id_and_session_id"

try:
    session.execute(query)
except Exception as e:
    print(e)

# Create composite partition on user_id and session_id; partitions then sorted by item_in_session to create a composite primary key
query = "CREATE TABLE IF NOT EXISTS filter_by_user_id_and_session_id "
query = query + "(user_id int, session_id int, item_in_session int, artist text, song text, first_name text, last_name text, PRIMARY KEY ((user_id, session_id), item_in_session))"

try:
    session.execute(query)
except Exception as e:
    print(e)    

In [12]:
# Reads our denormalized CSV file and inserts it into Cassandra table
file = 'event_datafile_new.csv'

# Add dataframe rows to Cassandra table
df = pd.read_csv(file)
query = "INSERT INTO filter_by_user_id_and_session_id(user_id, session_id, item_in_session, artist, song, first_name, last_name) "
query = query + "VALUES(%s, %s, %s, %s, %s, %s, %s)"

insert_rows_into_cassandra = df.apply(lambda x: session.execute(query, (int(x.userId), x.sessionId, x.itemInSession, x.artist, x.song, x.firstName, x.lastName)), axis=1)

In [13]:
# Returns artist name, song name, first name of user and last name of user on composite partition key of user_id and session_id
query = "select artist, song, first_name, last_name from filter_by_user_id_and_session_id WHERE user_id=10 AND session_id=182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:         
    print (row.artist, row.song, row.first_name, row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


In [14]:
# Query 3 Description: In this query the partition key is song and the clustering column is user_id. Each partition is uniquely identified by song'
# Drop makes this table idempotent
query = "DROP TABLE IF EXISTS filter_by_song"
try:
    session.execute(query)
except Exception as e:
    print(e)

# Partitioning by song alone is fine, we can sort by user_id to make the primary key unique
query = "CREATE TABLE IF NOT EXISTS filter_by_song "
query = query + "(song text, user_id int, first_name text, last_name text, PRIMARY KEY (song, user_id))"


try:
    session.execute(query)
except Exception as e:
    print(e)        

In [15]:
# Reads each line of denormalized CSV file and inserts it into Cassandra table
file = 'event_datafile_new.csv'
df = pd.read_csv(file)

query = "INSERT INTO filter_by_song(song, user_id, first_name, last_name) "
query = query + "VALUES(%s, %s, %s, %s)"

insert_rows_into_cassandra = df.apply(lambda x: session.execute(query, (x.song, int(x.userId), x.firstName, x.lastName)), axis=1)

In [16]:
# Partition key of song name is sorted by user_id to give us unique rows which satisfy the query
query = "select first_name, last_name from filter_by_song WHERE song='All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
# We only have one clustering column so we don't need to worry about putting results it in order
for row in rows:         
    print (row.first_name, row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


In [17]:
# Drop all tables
query = "drop table filter_by_session_id_and_item_in_session"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
query = "drop table filter_by_user_id_and_session_id"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
query = "drop table filter_by_song"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


In [18]:
# Close the connection to database
session.shutdown()
cluster.shutdown()