# ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
"""
Checks the current working directory
Gets the current folder and subfolder event data
Creates a for loop to create a list of files and collect each filepath
Joins the file path and roots with the subdirectories using glob
"""

print(os.getcwd())
filepath = os.getcwd() + '/event_data'
for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))
    #print(file_path_list)

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
"""
initiating an empty list of rows that will be generated from each file
For every filepath in the file path list
Csv file is read
Each data row extracted one by one and appended
A smaller event data csv file called event_datafile_full csv that will be used to insert data into the Apache Cassandra tables is created
"""

full_data_rows_list = [] 
for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)      
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [16]:
"""
Displays the number of rows in our table
"""
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


## Apache Cassandra code in the cells below

#### Creating a Cluster

In [5]:
"""
Connection with Cassandra is established
Session is set up
"""

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

#### Create Keyspace

In [6]:
"""
Keyspace is created
"""

query = """CREATE KEYSPACE IF NOT EXISTS event
    WITH REPLICATION = 
    {'class': 'SimpleStrategy', 'replication_factor':1}"""
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Set Keyspace

In [7]:
"""
KEYSPACE set to the keyspace specified above
"""
try:
    session.set_keyspace('event')
except Exception as e:
    print(e)

### Now we need to create tables to run the following queries.

In [8]:
"""
TABLE is created with specified query 1 in mind
"""
try:
    query = """CREATE TABLE IF NOT EXISTS event_d1 (
    session_id int,
    item_in_session int,
    artist varchar,
    song_title varchar,
    song_length float,
    PRIMARY KEY (session_id, item_in_session))"""
    
    session.execute(query)
    
except Exception as e:
    print(e)                  

In [9]:

"""
Code is imported from the csv file and loaded into the Apache Cassandra Table
"""
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO event_d1(session_id, item_in_session, artist, song_title, song_length)"
        query = query + "VALUES (%s,%s,%s,%s,%s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0],line[9],float(line[5])))

#### SELECT to verify that the data have been inserted into each table

In [10]:
"""
SELECT query is run on the table, and specified data is displayed
"""

query = "SELECT * FROM event_d1 WHERE session_id = 338 AND item_in_session = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist, row.song_title, row.song_length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### Above coded repeated with changes in query for the questions left

In [11]:
"""
TABLE is created with specified query 2 in mind
Code is imported from the csv file and loaded into the Apache Cassandra Table
SELECT query is run on the table, and specified data is displayed
"""


query = """CREATE TABLE IF NOT EXISTS event_d2 (
    user_id int,
    item_in_session int,
    session_id int,
    artist varchar,
    song_title varchar,
    user_fname varchar,
    user_lname varchar,
    PRIMARY KEY (user_id,session_id,item_in_session))"""

try:
    session.execute(query)
except Exception as e:
    print(e)      

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO event_d2(user_id, item_in_session,session_id, artist, song_title, user_fname,user_lname)"
        query = query + "VALUES (%s,%s,%s,%s,%s,%s,%s)"
        session.execute(query, (int(line[10]),int(line[3]),int(line[8]),line[0],line[9],line[1], line[4]))
        
query = "SELECT * FROM event_d2 WHERE user_id = 10 AND session_id = 182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist, row.song_title,row.user_fname, row.user_lname)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


In [12]:
"""
TABLE is created with specified query 2 in mind
Code is imported from the csv file and loaded into the Apache Cassandra Table
SELECT query is run on the table, and specified data is displayed
"""


query = """CREATE TABLE IF NOT EXISTS event_d3 (
    user_id int,
    artist varchar,
    song_title varchar,
    user_fname varchar,
    user_lname varchar,
    PRIMARY KEY (song_title, user_id))"""

try:
    session.execute(query)
except Exception as e:
    print(e)      

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO event_d3(user_id, artist, song_title, user_fname,user_lname)"
        query = query + "VALUES (%s,%s,%s,%s,%s)"
        session.execute(query, (int(line[10]),line[0],line[9],line[1], line[4]))
        
query = "SELECT * FROM event_d3 WHERE song_title = 'All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.user_fname, row.user_lname)

                    

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables before closing out the sessions

In [14]:
"""
Drop the table before closing out the sessions
"""

query1 = "drop table event_d1"
query2 = "drop table event_d2"
query3 = "drop table event_d3"
try:
    rows = session.execute(query1)
    rows = session.execute(query2)
    rows = session.execute(query3)
except Exception as e:
    print(e)

### Close the session and cluster connection¶

In [15]:
session.shutdown()
cluster.shutdown()