# Data Engineering Nanodegree

# Project 02: Data Modeling with Apache Cassandra

Susanne Wuerl, August 2022

-- 3rd submission --

## Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [22]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [23]:
# checking current working directory
print(os.getcwd())

# Get current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Walk the event data folder and collect the path of every csv file found
file_path_list = []
i = 0
for root, dirs, files in os.walk(filepath):
    # list the csv files directly under the current root and add them to the list
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))
    i = i + 1
# number of directories visited
print(i)

/Users/quirly/Desktop/Udacity_DataEngineering/Project-2-Data-Modeling-Cassandra
1


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [24]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list read file
for f in file_path_list:

    # reading csv file
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        next(csvreader)  # skip the header row

        # extracting each data row one by one and appending it
        for line in csvreader:
            full_data_rows_list.append(line)

# creating a smaller event data csv file called event_datafile_new.csv that will be used
# to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))
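
The positional indices above (`row[0]`, `row[2]`, ...) silently pick the wrong fields if the source files ever change their column order. A minimal sketch of the same filtering step keyed by column name instead, using `csv.DictReader`. It assumes the raw event files carry a header row whose names match the output header; the helper name `select_columns` and the sample rows are made up for illustration and are not project data.

```python
import csv
import io

# Output columns, in the order written to event_datafile_new.csv.
WANTED = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
          'length', 'level', 'location', 'sessionId', 'song', 'userId']


def select_columns(csvfile, wanted=WANTED):
    """Yield one list per event row, keeping only the wanted columns.

    Rows with an empty artist are skipped, as in the cell above.
    Assumes the header row contains every name in `wanted`.
    """
    for record in csv.DictReader(csvfile):
        if record['artist'] == '':
            continue
        yield [record[name] for name in wanted]


# Made-up input with one extra column (auth) and one row without an artist.
sample = io.StringIO(
    "artist,auth,firstName,gender,itemInSession,lastName,length,level,"
    "location,sessionId,song,userId\n"
    "Some Artist,Logged In,Ann,F,0,Lee,200.5,free,Somewhere,1,Some Song,7\n"
    ",Logged In,Ann,F,1,Lee,,free,Somewhere,1,,7\n"
)
rows = list(select_columns(sample))
print(rows)
```

Each yielded list can be passed to `writer.writerow` unchanged, so the rest of the cell would not need to change.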


In [25]:
# check the number of lines in the csv file (the count includes the header row)
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print("Number of rows: " + str(sum(1 for line in f)))

Number of rows: 6821


## Part II. Apache Cassandra Coding

The CSV file `event_datafile_new.csv` contains the following columns (zero-based index):

- artist = column 0
- firstName of user = column 1
- gender of user = column 2
- item number in session = column 3
- last name of user = column 4
- length of the song = column 5
- level (paid or free song) = column 6
- location of the user = column 7
- sessionId = column 8
- song title = column 9
- userId = column 10
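
Because later cells address these columns by position (`line[0]`, `line[9]`, ...), a name-to-index lookup can make the mapping explicit. A small sketch built from the header written in Part I; the names `COLUMNS` and `COL` are illustrative and do not appear elsewhere in this notebook.

```python
# Header of event_datafile_new.csv, in file order.
COLUMNS = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
           'length', 'level', 'location', 'sessionId', 'song', 'userId']

# Map each column name to its position in a CSV row.
COL = {name: index for index, name in enumerate(COLUMNS)}

print(COL['artist'], COL['song'], COL['userId'])  # prints: 0 9 10
```

With such a lookup, `line[COL['song']]` could replace `line[9]` in the insert cells.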

#### Creating a Cluster

First, a `Cluster` instance is created that connects to Cassandra on the local machine. Please ensure Cassandra is running locally before creating the instance. Once the cluster object exists, a session is opened through which all statements are executed.

In [26]:
from cassandra.cluster import Cluster
cluster = Cluster()

session = cluster.connect()

#### Create Keyspace

In a Cassandra cluster, a keyspace is the outermost (top-level) database object. It groups tables (formerly called column families, comparable to tables in an RDBMS) and defines how their data is replicated across the nodes of the cluster, by means of a replication strategy and a replication factor.

Here *SimpleStrategy* is chosen, and as we use only one node, our local machine, the replication factor is set to 1. *SimpleStrategy* is intended for clusters in a single data center; for clusters that span several data centers, *NetworkTopologyStrategy* is the usual choice.

In [27]:
try:
    session.execute("""
    CREATE KEYSPACE SWP2
    WITH REPLICATION =
    {'class':'SimpleStrategy', 'replication_factor':1}
    """)
except Exception as e:
    print(e)

Keyspace 'swp2' already exists
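
The message above appears because the keyspace had already been created in an earlier run. CQL's `IF NOT EXISTS` clause makes the statement safe to re-run. A minimal sketch that only builds the statement text; the helper name `create_keyspace_cql` is illustrative, and the resulting string would be passed to `session.execute` as in the cell above.

```python
def create_keyspace_cql(name, replication_factor=1):
    """Return a re-runnable CREATE KEYSPACE statement using SimpleStrategy."""
    return (
        f"CREATE KEYSPACE IF NOT EXISTS {name} "
        "WITH REPLICATION = "
        f"{{'class': 'SimpleStrategy', 'replication_factor': {int(replication_factor)}}}"
    )


statement = create_keyspace_cql('swp2')
print(statement)
```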


#### Set Keyspace

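The keyspace `swp2` now has to be set as the working keyspace of the session created above. (Cassandra folds unquoted identifiers to lower case, so `SWP2` is stored as `swp2`.)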

In [28]:
try:
    session.set_keyspace('swp2')
except Exception as e:
    print(e)

## Queries for Three Analytical Questions

## Query 1

### (1) Artist, Song and Song Length of a Specific Item in a Session Shall be Found

We will use **session 338** and **item 4** for demo purposes.

**(1.1)** A table is created which contains the attributes to be queried (here session and item id) and the attributes that are to be returned as results. These results are dependent on the analytical question.

Expected results: 
    
* Artist
* Song
* Length of song

In [61]:
query="CREATE TABLE IF NOT EXISTS table_query0_select_songs_by_session_and_itemid"
query=query+"(artist text, song text, ItemInSession int, sessionId int, length float, PRIMARY KEY(sessionId,ItemInSession))"

try:
    session.execute(query)
except Exception as e:
    print(e)

#### (1.2) Insert Results into our Table Created

All recorded events are stored in a single CSV file. We read this file row by row and insert every row into the table created above. Each table column is filled from the CSV column at the matching index; for example, ARTIST is the first CSV column, so it is read from `line[0]`.

In [62]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    
    for line in csvreader:
        query = "INSERT INTO table_query0_select_songs_by_session_and_itemid (artist,song,ItemInSession,sessionId,length) "
        query = query + "VALUES (%s,%s,%s,%s,%s)"
        # Assign which column element in csv file should be assigned for each column in the INSERT statement
        session.execute(query, (line[0], line[9],int(line[3]),int(line[8]),float(line[5])))
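
The loop above rebuilds the same statement string for every row. The DataStax Python driver also supports prepared statements (`session.prepare`, with `?` placeholders), which are parsed once and then reused. A hedged sketch of that variant: `to_query1_params` and `load_query1` are illustrative names, and the `FakeSession` class exists only so that the example runs without a Cassandra instance. The sample row uses the values of the Query 1 result shown in this notebook; all other fields are placeholders.

```python
import csv
import io

INSERT_QUERY1 = (
    "INSERT INTO table_query0_select_songs_by_session_and_itemid "
    "(artist, song, ItemInSession, sessionId, length) "
    "VALUES (?, ?, ?, ?, ?)"
)


def to_query1_params(line):
    """Convert one CSV row (list of strings) to typed INSERT parameters."""
    return (line[0], line[9], int(line[3]), int(line[8]), float(line[5]))


def load_query1(session, csvfile):
    """Insert every data row of an open CSV file; return the row count."""
    prepared = session.prepare(INSERT_QUERY1)  # parsed once, reused per row
    reader = csv.reader(csvfile)
    next(reader)  # skip header
    count = 0
    for line in reader:
        session.execute(prepared, to_query1_params(line))
        count += 1
    return count


class FakeSession:
    """Stand-in for a driver session that records calls (illustration only)."""

    def __init__(self):
        self.executed = []

    def prepare(self, query):
        return query

    def execute(self, statement, parameters=None):
        self.executed.append((statement, parameters))


sample = io.StringIO(
    "artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId\n"
    "Faithless,x,x,4,x,495.307312,x,x,338,Music Matters (Mark Knight Dub),0\n"
)
fake = FakeSession()
inserted = load_query1(fake, sample)
print(inserted, fake.executed[0][1])
```

Against a real cluster, the `session` object from the cells above would be passed in place of `FakeSession()`, together with the opened `event_datafile_new.csv`.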

#### (1.3) Validation Check that Data Was Inserted

The table is queried and the results returned are shown as a pandas DataFrame.

In [65]:
query="SELECT sessionId, ItemInSession, artist,song,length FROM table_query0_select_songs_by_session_and_itemid WHERE sessionId=338 AND ItemInSession=4"

try:
    result=session.execute(query)
except Exception as e:
    print(e)
    
df=pd.DataFrame(result.all()) 
display(df)

| | sessionid | iteminsession | artist | song | length |
|---|---|---|---|---|---|
| 0 | 338 | 4 | Faithless | Music Matters (Mark Knight Dub) | 495.307312 |


## Query 2

### (2) Artist, Song and User Name of a Specific UserID in a Specific Session shall be found

We will use **session 182** and **UserID 10** for demo purposes.

**(2.1)** A table is created which contains the attributes to be queried (here session and user id) and the attributes that are to be returned as results. These results are dependent on the analytical question.

Expected results: 
    
* Artist
* Song
* User name

*userId* and *sessionId* together form the composite partition key, because the query filters on both. As we want to sort the results by item in session, we add *ItemInSession* as a clustering column; together, the three columns identify each row uniquely.
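
To make the roles of the two key parts concrete, here is a plain-Python analogy (not a description of how Cassandra stores data internally): the partition key selects a bucket, and the clustering column orders the rows inside it. The songs are taken from the Query 2 result shown in this notebook and are deliberately inserted out of order; the function names are illustrative.

```python
from collections import defaultdict

# partition key (userId, sessionId) -> {clustering key ItemInSession: song}
partitions = defaultdict(dict)


def insert(user_id, session_id, item_in_session, song):
    """Store a row in the bucket chosen by the composite partition key."""
    partitions[(user_id, session_id)][item_in_session] = song


def select(user_id, session_id):
    """Return the rows of one partition, ordered by the clustering key."""
    rows = partitions.get((user_id, session_id), {})
    return [(item, rows[item]) for item in sorted(rows)]


insert(10, 182, 2, "Kilometer")
insert(10, 182, 0, "Keep On Keepin' On")
insert(10, 182, 1, "Greece 2000")

for item, song in select(10, 182):
    print(item, song)
```

Rows come back sorted by `ItemInSession` regardless of insertion order, which mirrors what the clustering column provides in the table.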

In [66]:
query="CREATE TABLE IF NOT EXISTS table_query1_select_user_by_userid_session"
query=query+"(artist text, song text, sessionId int, userId int, userFirst text, userLast text, ItemInSession int, PRIMARY KEY((userId,sessionId),ItemInSession))"

try:
    session.execute(query)
except Exception as e:
    print(e)                

#### (2.2) Insert Results into our Table Created

In [67]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO table_query1_select_user_by_userid_session (artist,song,sessionId,userId,userFirst,userLast,ItemInSession) "
        query = query + "VALUES (%s,%s,%s,%s,%s,%s,%s)"
        # Assign which column element in csv file should be assigned for each column in the INSERT statement
        session.execute(query, (line[0], line[9],int(line[8]),int(line[10]),line[1],line[4],int(line[3])))

#### (2.3) Validation Check that Data Was Inserted

The songs a user (defined by his or her user ID) has listened to in a specific session are shown as a pandas DataFrame.

In [68]:
query="SELECT userId,sessionId,ItemInSession,song,userFirst,userLast FROM table_query1_select_user_by_userid_session WHERE sessionId=182 AND userId=10 ORDER BY ItemInSession"

try:
    result=session.execute(query)
except Exception as e:
    print(e)

df=pd.DataFrame(result.all()) 
display(df)

| | userid | sessionid | iteminsession | song | userfirst | userlast |
|---|---|---|---|---|---|---|
| 0 | 10 | 182 | 0 | Keep On Keepin' On | Sylvie | Cruz |
| 1 | 10 | 182 | 1 | Greece 2000 | Sylvie | Cruz |
| 2 | 10 | 182 | 2 | Kilometer | Sylvie | Cruz |
| 3 | 10 | 182 | 3 | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie | Cruz |


## Query 3

### (3) A list of users that have been listening to a specific song shall be returned

We will use song **'All Hands Against His Own'** for demo purposes.

**(3.1)** A table is created which contains the attributes to be queried (here song) and the attributes that are to be returned as results. These results are dependent on the analytical question.

Expected results: 
    
* User First Name
* User Last Name

*song* is used as the partition key because it is the only attribute we filter on. *userId* is added as a clustering column, so that each user who listened to the song gets exactly one row within the song's partition.

In [79]:
query="CREATE TABLE IF NOT EXISTS table_query2_select_user_by_song"
query=query+"(song text, ItemInSession int, sessionId int, userId int, userFirst text, userLast text, PRIMARY KEY(song,userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)  

#### (3.2) Insert Results into our Table Created

In [80]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO table_query2_select_user_by_song (song,ItemInSession,sessionId,userId,userFirst,userLast) "
        query = query + "VALUES (%s,%s,%s,%s,%s,%s)"
        # Assign which column element in csv file should be assigned for each column in the INSERT statement
        session.execute(query, (line[9],int(line[3]),int(line[8]),int(line[10]),line[1],line[4]))

#### (3.3) Validation Check that Data Was Inserted

All users that have listened to a specific song are queried and shown.

The primary key of this table is (song, userId): *song* is the partition key and *userId* is a clustering column. Because the query filters on the partition key only, ALLOW FILTERING is not needed. Each combination of song and userId is stored once, which is sufficient here, as we only want to know which users have listened to the song at least once.

However, if we were also interested in the frequency, e.g. how many times a user listened to that song, this design would not be appropriate. We would then need to add *sessionId* and *ItemInSession* to the PRIMARY KEY. Otherwise, only the last listening event of a user is stored and returned, because every earlier event for the same song and user is overwritten. We chose to omit *sessionId* and *ItemInSession* from the key, as we only care about which users listened to a specific song, not how often.
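
The overwrite behaviour described above follows from Cassandra treating an INSERT with an already existing primary key as an update of that row. A dictionary keyed by the primary key shows the same effect. This is an analogy with made-up listening events, not driver code.

```python
# Three illustrative listening events: (song, userId, sessionId, itemInSession).
events = [
    ("Song A", 1, 100, 0),
    ("Song A", 1, 200, 3),   # the same user listens again in a later session
    ("Song A", 2, 300, 1),
]

# PRIMARY KEY (song, userId): a repeated listen overwrites the earlier row.
by_song_user = {}
for song, user_id, session_id, item in events:
    by_song_user[(song, user_id)] = (session_id, item)

# PRIMARY KEY (song, userId, sessionId, ItemInSession): every event is kept.
by_full_key = {}
for song, user_id, session_id, item in events:
    by_full_key[(song, user_id, session_id, item)] = True

print(len(by_song_user), len(by_full_key))  # prints: 2 3
```

With the shorter key, user 1 appears once and only the later session (200) survives, which matches the single `sessionid` per user in the result of this query.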

In [84]:
query="SELECT song, userId, userFirst,userLast,sessionId FROM table_query2_select_user_by_song WHERE song='All Hands Against His Own'"

try:
    result=session.execute(query)
except Exception as e:
    print(e)

df=pd.DataFrame(result.all()) 
display(df)

| | song | userid | userfirst | userlast | sessionid |
|---|---|---|---|---|---|
| 0 | All Hands Against His Own | 29 | Jacqueline | Lynch | 559 |
| 1 | All Hands Against His Own | 80 | Tegan | Levine | 611 |
| 2 | All Hands Against His Own | 95 | Sara | Johnson | 152 |


### Drop the tables before closing out the sessions

In [85]:
# Drop all three tables created in this notebook
for table in ("table_query0_select_songs_by_session_and_itemid",
              "table_query1_select_user_by_userid_session",
              "table_query2_select_user_by_song"):
    try:
        session.execute("DROP TABLE IF EXISTS " + table)
    except Exception as e:
        print(e)

### Close the session and cluster connection

In [None]:
session.shutdown()
cluster.shutdown()