# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
print(os.getcwd())

filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
full_data_rows_list = [] 
 
for f in file_path_list:

    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        for line in csvreader:
            full_data_rows_list.append(line) 

print(len(full_data_rows_list))

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

774


In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

659


# Part II

## The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])

session = cluster.connect()

#### Create Keyspace

In [6]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS event_data
    WITH REPLICATION =
    { 'class': 'SimpleStrategy', 'replication_factor': 1}""")
except Exception as e:
    print(e)

#### Set Keyspace

In [7]:
try:
    session.set_keyspace('event_data')
except Exception as e:
    print(e)

## 1st Query: Query the data in order to retrieve the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

### First step: Because we are using a NoSQL database, we need to first think about the queries and only then create the desired tables in order to better respond to the request. In this case, since what we want is the artist, song title and song's length in the music app history (filtered by sessionId=338 and itemInSession = 4), a 'artist_song_library' was created in order to get this. One final thing to keep in mind is that Apache Cassandra is a partition row store, which means the data should be Inserted and Retrieved in the order of the primary key (with in this case is a Composite Primary Key).

In [8]:
query = "CREATE TABLE IF NOT EXISTS artist_song_library"
query = query + "(sessionid int, iteminsession int, artist_name text, song text, length double, PRIMARY KEY (sessionid, iteminsession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

### Second step: Insert the appropriate data into the table using the event_datafile_new.csv. This is done through the use of an INSERT statement and by getting the appropriate columns from the csv file, keeping in mind that the data types of these columns and the ones created with the CREATE statement have to match (and converting when needed). Finally, it is import to note that, like with the CREATE statement, the data should be Inserted and Retrieved in the order of the primary key (with in this case is a Composite Primary Key).

In [9]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO artist_song_library (sessionid, iteminsession, artist_name, song, length)"
        query = query + "VALUES(%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

### Final step: Query the data in order to retrieve the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4. In this case, the output consists of a single row, the artist being 'Faithless', the song 'Music Matters (Mark Knight Dub)' and the length 495.3073

In [10]:
query = "SELECT artist_name, song, length from artist_song_library WHERE sessionid = 338 AND iteminsession = 4"
try:
    rows=session.execute(query)
except Exception as e:
    print(e)

for row in rows:
    print(row)

Row(artist_name='Faithless', song='Music Matters (Mark Knight Dub)', length=495.3073)


### 2nd Query: Query the data in order to retrieve the name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

### First step: Once again, we need to think about the query first. With this in mind, a second table was created, user_library, in order to retrieve the artist, song (sorted by itemInSession) and the user's first and last name (for userid=10 and sessionid=182). One final thing to keep in mind is that Apache Cassandra is a partition row store, which means the data should be Inserted and Retrieved in the order of the primary key (with in this case is a Composite Primary Key).

In [11]:
query = "CREATE TABLE IF NOT EXISTS user_library"
query = query + "(user_id int, sessionid int, iteminsession int, artist_name text,\
                  song text, first_name text, last_name text, PRIMARY KEY ((user_id, sessionid), iteminsession))"
try:
    session.execute(query)
except Exception as e:
    print(e)      

### Second step: Insert the appropriate data into the table using the event_datafile_new.csv. This is done through the use of an INSERT statement and by getting the appropriate columns from the csv file, keeping in mind that the data types of these columns and the ones created with the CREATE statement have to match (and converting when needed). Finally, it is import to note that, like with the CREATE statement, the data should be Inserted and Retrieved in the order of the primary key (with in this case is a Composite Primary Key).

In [12]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO user_library (user_id, sessionid, iteminsession, artist_name, song, first_name, last_name)"
        query = query + "VALUES(%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))
                    

### Final step: Query the data in order to retrieve the name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182. This didn't ouput anything

In [13]:
query = "SELECT artist_name, song, first_name, last_name FROM user_library WHERE user_id=10 AND sessionid = 182"
try:
    rows=session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print(row)

## 3rd Query: Query the data in order to retrieve the every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### First step: As said in the above queries, it is essential to first think about the queries. With this in mind, the table 'user_details_library' was created to answer this third query. One final thing to keep in mind is that Apache Cassandra is a partition row store, which means the data should be Inserted and Retrieved in the order of the primary key (with in this case is a Composite Primary Key).

In [14]:
query = "CREATE TABLE IF NOT EXISTS user_details_library"
query = query + "(song text, user_id int, first_name text, last_name text, PRIMARY KEY (song, user_id))"
try:
    session.execute(query)
except Exception as e:
    print(e)


### Second step: Insert the appropriate data into the table using the event_datafile_new.csv. This is done through the use of an INSERT statement and by getting the appropriate columns from the csv file, keeping in mind that the data types of these columns and the ones created with the CREATE statement have to match (and converting when needed). Finally, it is import to note that, like with the CREATE statement, the data should be Inserted and Retrieved in the order of the primary key (with in this case is a Composite Primary Key).

In [15]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO user_details_library (song, user_id, first_name, last_name)"
        query = query + "VALUES(%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

### Final step: Query the data in order to retrieve the every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'. Like the previous query, this didn't ouput anything.

In [16]:
query = "SELECT first_name, last_name FROM user_details_library WHERE song = 'All Hands Against His Own'"
try:
    rows=session.execute(query)
except Exception as e:
    print(e)
    
for r in rows:
    print(r)

### Drop the tables before closing out the sessions

In [17]:
query = "drop table artist_song_library"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

query = "drop table user_library"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

query = "drop table user_details_library"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

### Close the session and cluster connection¶

In [18]:
session.shutdown()
cluster.shutdown()