# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [128]:
# Import relevant Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
import collections

#### Creating list of filepaths to process original event csv data files

In [129]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    print(root)
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
#     print(file_path_list[0:3])

/home/guillaume/documents/data/udacity/data_engineering/data_modelisation/projects/data_modeling_cassandra
/home/guillaume/documents/data/udacity/data_engineering/data_modelisation/projects/data_modeling_cassandra/event_data


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [130]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [131]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Connect to the local Apache Cassandra database

In [132]:
#The following will setup and run a local Apache Cassandra database inside a Docker container
#We automatize this process using docker-compose (you need it installed on your system for this to work)
!docker-compose up -d
!docker ps

Creating network "data_modeling_cassandra_cass-local-network" with driver "bridge"
Creating cass-nodeA ... 
[1Bting cass-nodeA ... [32mdone[0mCONTAINER ID   IMAGE       COMMAND                  CREATED        STATUS                  PORTS                                                                                                                NAMES
f674ca9ad583   cassandra   "docker-entrypoint.s…"   1 second ago   Up Less than a second   7001/tcp, 0.0.0.0:7000->7000/tcp, :::7000->7000/tcp, 7199/tcp, 0.0.0.0:9042->9042/tcp, :::9042->9042/tcp, 9160/tcp   cass-nodeA


#### Creating a Cluster

In [133]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster

try: 
    cluster = Cluster(['127.0.0.1']) # Local instance of Apache Cassandra, created with docker
    session = cluster.connect() # establishing connection, setting up session and begin executing queries
except Exception as e:
    print(e)

#### Create Keyspace

In [134]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS cassandra_project
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [135]:
try:
    session.set_keyspace('cassandra_project')
except Exception as e:
    print(e)

## Questions the analytics teams wants answered

**Question 1 :** Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


**Question 2 :** Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

**Question 3 :** Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




## Question 1

The expected results are name of the artist, title of the song and length of the track based on sessionId and itemInSession.

Since we  want to retrieve data, we're going to design a **SELECT** statement. We can translate the question into the following query :

```sql
SELECT name of the artist, title of the song, length of the song
FROM TABLE_NAME
WHERE sessionId = 338 AND itemInSession = 4
```

As we know the SELECT query and since we are working with a NoSQL database, we will now CREATE our table to effectively answer the query. We will add NOT EXISTS to the CREATE statement to check if the table exists and only create the table if it does not exist.

- TABLE NAME: We will name this table **song_session** as per Rubric requirements for tables names as alphanumeric; also as the details of the table is for songs, the name seems relevant.
- COLUMNS : We need the name of the artist, the title of the song and the length of the track. Hence we will select and name our columns **artist_name, song_title, song_length, session_id, item_in_session**
- PRIMARY KEY: The primary key for the table should uniquely identify each row. We need results based on session_id and item_in_session; so we need these both as the primary key. Selecting only one will throw a filtering error as we have not set item_in_session in the primary key, also ALLOW_FILTERING is not an Apache Cassandra good practice and thus not allowed for the project)*

In [136]:
#Creating the table with appropriate columns to answer the query

query = """
CREATE TABLE IF NOT EXISTS song_session (
session_id INT,
item_in_session INT,
artist_name TEXT,
song_title TEXT,
song_length FLOAT, 
PRIMARY KEY (session_id, item_in_session)
)
"""

try:
    session.execute(drop_query)
    session.execute(query)
except Exception as e:
    print(e)        

In [137]:
#Inserting data into the newly created table

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    
    csvreader = csv.reader(f)
    next(csvreader) # this skips the header
    
    for line in csvreader:
        query = """
        INSERT INTO song_session (
        session_id,
        item_in_session,
        artist_name,
        song_title,
        song_length
        )
        """
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        
        try:
            session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
        except Exception as e:
            print(e)

**Question 1:** Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

translates into

**Query 1:**
```sql
SELECT artist_name, song_title, song_length
FROM song_session
WHERE session_id = 338 AND item_in_session = 4
```

In [138]:
#Running this cell will execute the SELECT query and print the data retrieved from the Cassandra DB

query = """
SELECT artist_name, song_title, song_length
FROM song_session
WHERE session_id = 338 AND item_in_session = 4
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)


q1_result = {}
    
for row in rows:
#uncomment line below to check raw results
#     print (row.artist_name, row.song_title, row.song_length)
    
    #creating lists or appending to them if they already exists
    if 'artist_name' in q1_result:
        q1_result['artist_name'].append(row.artist_name)
    else:
        q1_result['artist_name'] = [row.artist_name]
        
    if 'song_title'in q1_result:
        q1_result['song_title'].append(row.song_title)
    else:
        q1_result['song_title'] = [row.song_title]
        
    if 'song_length' in q2_result:
        q2_result['song_length'].append(row.song_length)
    else:
        q2_result['song_length'] = [row.song_length]

#Mounting the dict into a dataframe
df = pd.DataFrame.from_dict(q1_result)

#Printing results
df.style.set_table_attributes('style="font-size: 15px"').hide_index()

artist_name,song_title
Faithless,Music Matters (Mark Knight Dub)


## Question 2

The expected results are name of the artist, song (sorted by itemInSession) and user (first and last name) based on userId and sessionId.

Since we  want to retrieve data, we're going to design a **SELECT** statement. We can translate the question into the following query :

```sql
SELECT name of the artist, title of the song, user firstname, user lastname
FROM TABLE_NAME
WHERE userId = 10 AND sessionId = 182
```

As we know the SELECT query and since we are working with a NoSQL database, we will now CREATE our table to effectively answer the query. We will add NOT EXISTS to the CREATE statement to check if the table exists and only create the table if it does not exist.

- TABLE NAME: We will name this table **song_playlist_session** as per Rubric requirements for tables names as alphanumeric; also as the details of the table is for songs, the name seems relevant.
- COLUMNS: We need the name of the artist, the title of the song, the user's firstname and lastname. Hence we will select and name our columns **artist_name, song_title, user_firstname, user_lastname, user_id, session_id, item_in_session**
- PRIMARY KEY: The primary key for the table should uniquely identify each row. We need results based on **user_id** and **session_id** ; so we need these both as the primary key. We use both **user_id** and **session_id** as partition keys so sessions from the same user are stored together, and we're using **item_in_session** as a clustering column.

In [139]:
#Creating the table with appropriate columns to answer the query

query = """
CREATE TABLE IF NOT EXISTS song_playlist_session (
user_id INT,
session_id INT,
item_in_session INT,
artist_name TEXT,
song_title TEXT,
user_firstname TEXT,
user_lastname TEXT,
PRIMARY KEY ((user_id, session_id), item_in_session)
)
"""

try:
    session.execute(drop_query)
    session.execute(query)
except Exception as e:
    print(e)           

In [140]:
#Inserting data into the newly created table

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    
    csvreader = csv.reader(f)
    next(csvreader) # this skips the header
    
    for line in csvreader:
        query = """
        INSERT INTO song_playlist_session (
        user_id,
        session_id,
        item_in_session,
        artist_name,
        song_title,
        user_firstname,
        user_lastname
        )
        """
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        
        try:
            session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))
        except Exception as e:
            print(e)

**Question 2:** Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

translates into

**Query 2:**
```sql
SELECT artist_name, song_title, user_firstname, user_lastname
FROM song_playlist_session
WHERE user_id = 10 AND session_id = 182
```

In [141]:
#Running this cell will execute the SELECT query and print the data retrieved from the Cassandra DB

query = """
SELECT artist_name, song_title, user_firstname, user_lastname
FROM song_playlist_session
WHERE user_id = 10 AND session_id = 182
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

#initializing empty dict to store query results    
q2_result = {}

#Itering through the results and loading them into the dictionnary
for row in rows:
#uncomment line below to check raw results
#     print (row.artist_name, row.song_title, row.user_firstname, row.user_lastname)

    #creating lists or appending to them if they already exists
    if 'artist_name' in q2_result:
        q2_result['artist_name'].append(row.artist_name)
    else:
        q2_result['artist_name'] = [row.artist_name]
        
    if 'song_title'in q2_result:
        q2_result['song_title'].append(row.song_title)
    else:
        q2_result['song_title'] = [row.song_title]
        
    if 'user_firstname' in q2_result:
        q2_result['user_firstname'].append(row.user_firstname)
    else:
        q2_result['user_firstname'] = [row.user_firstname]
        
    if 'user_lastname' in q2_result:
        q2_result['user_lastname'].append(row.user_lastname)
    else:
        q2_result['user_lastname'] = [row.user_lastname]
    
#Mounting the dict into a dataframe
df_2 = pd.DataFrame.from_dict(q2_result)

#Printing results
df_2.style.set_table_attributes('style="font-size: 15px"').hide_index()

artist_name,song_title,user_firstname,user_lastname
Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
Three Drives,Greece 2000,Sylvie,Cruz
Sebastien Tellier,Kilometer,Sylvie,Cruz
Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio Edit),Sylvie,Cruz


## Question 3

The expected results are user's firstname and lastname based on the song they listened to.

Since we  want to retrieve data, we're going to design a **SELECT** statement. We can translate the question into the following query :

```sql
SELECT *
FROM user_song
WHERE song_title = 'All Hands Against His Own'
```

As we know the SELECT query and since we are working with a NoSQL database, we will now CREATE our table to effectively answer the query. We will add NOT EXISTS to the CREATE statement to check if the table exists and only create the table if it does not exist.

- TABLE NAME: We will name this table **user_song_session** as per Rubric requirements for tables names as alphanumeric; also as the details of the table is for songs, the name seems relevant.
- COLUMNS: We need the name of the artist, the title of the song, the user's firstname and lastname. Hence we will select and name our columns **user_firstname, user_lastname, song_title, user_id**
- PRIMARY KEY: The primary key for the table should uniquely identify each row. We need results based on **song_title** and **user_id** so we need these both as the primary key.

In [142]:
#Creating the table with appropriate columns to answer the query

query = """
CREATE TABLE IF NOT EXISTS user_song_session (
song_title TEXT,
user_id INT,
user_firstname TEXT,
user_lastname TEXT,
PRIMARY KEY (song_title, user_id)
)
"""
try:
    session.execute(drop_query)
    session.execute(query)
except Exception as e:
    print(e) 
                    

In [143]:
#Inserting data into the newly created table

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    
    csvreader = csv.reader(f)
    next(csvreader) # this skips the header
    
    for line in csvreader:
        query = """
        INSERT INTO user_song_session (
        song_title,
        user_id,
        user_firstname,
        user_lastname
        )
        """
        query = query + " VALUES (%s, %s, %s, %s)"
        
        try:
            session.execute(query, (line[9], int(line[10]), line[1], line[4]))
        except Exception as e:
            print(e)

**Question 3:** Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

translates into

**Query 3:**
```sql
SELECT *
FROM user_song_session
WHERE song_title = 'All Hands Against His Own'
```

In [144]:
#Running this cell will execute the SELECT query and print the data retrieved from the Cassandra DB

query = """
SELECT *
FROM user_song_session
WHERE song_title = 'All Hands Against His Own'
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

#initializing empty dict to store query results      
q3_result = {}
    
for row in rows:
#uncomment line below to check raw results
#     print (row.user_firstname, row.user_lastname)
    
    #creating lists or appending to them if they already exists
    if 'user_firstname' in q3_result:
        q3_result['user_firstname'].append(row.user_firstname)
    else:
        q3_result['user_firstname'] = [row.user_firstname]
        
    if 'user_lastname' in q3_result:
        q3_result['user_lastname'].append(row.user_lastname)
    else:
        q3_result['user_lastname'] = [row.user_lastname]

#Mounting the dict into a dataframe
df_3 = pd.DataFrame.from_dict(q3_result)

#Printing results
df_3.style.set_table_attributes('style="font-size: 15px"').hide_index()

user_firstname,user_lastname
Jacqueline,Lynch
Tegan,Levine
Sara,Johnson


### Dropping the tables before closing out the sessions

In [145]:
## Dropping the tables before closing out the sessions

drop_song_session = "DROP TABLE IF EXISTS song_session"
drop_song_playlist_session = "DROP TABLE IF EXISTS song_playlist_session"
drop_user_song_session = "DROP TABLE IF EXISTS user_song_session"

try:
    session.execute(drop_song_session)
    session.execute(drop_song_playlist_session)
    session.execute(drop_user_song_session)
except Exception as e:
    print(e)

### Closing the session and cluster connection¶

In [146]:
session.shutdown()
cluster.shutdown()

### Cleaning step

The following cell will stop and remove the Docker container created for this project.

In [147]:
#Check docker container
!docker ps

#stops the container
!docker stop cass-nodeA

#removes the container
!docker rm cass-nodeA

#removes the network created for the DB
!docker network rm data_modeling_cassandra_cass-local-network

#Checks container and network have been successfully removed
!docker ps --all --quiet
!docker network ls

f674ca9ad583
cass-nodeA
cass-nodeA
data_modeling_cassandra_cass-local-network
NETWORK ID     NAME      DRIVER    SCOPE
5370b6b4be51   bridge    bridge    local
9ce1d7672488   host      host      local
2f97e5dccf82   none      null      local
