# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Show the current working directory; the event data folder is resolved relative to it
print(os.getcwd())

# Folder that holds the original event csv files
filepath = os.getcwd() + '/event_data'

# Start from an empty list so `file_path_list` is always defined, even when
# `event_data` is missing or empty (os.walk then yields nothing).
file_path_list = []

# Walk the folder tree and accumulate the files of every directory visited.
# Assigning (instead of extending) inside the loop would keep only the files
# of the last directory that os.walk happened to visit.
for root, dirs, files in os.walk(filepath):
    # glob's '*' never matches hidden entries, so do not descend into hidden
    # folders (e.g. .ipynb_checkpoints) either; pruning `dirs` in place tells
    # os.walk to skip them.
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # '*' also matches sub-directories, which cannot be opened as csv files,
    # so keep regular files only.
    file_path_list.extend(
        path for path in glob.glob(os.path.join(root, '*')) if os.path.isfile(path)
    )

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Combined list of data rows (header rows excluded) gathered from every event file
full_data_rows_list = []

# Read each collected file in turn
for event_path in file_path_list:
    # newline='' leaves line-ending handling to the csv module
    with open(event_path, 'r', encoding = 'utf8', newline='') as source:
        reader = csv.reader(source)
        next(reader)  # discard this file's header row
        # every remaining row of the file goes onto the combined list
        full_data_rows_list.extend(reader)

# uncomment the code below if you would like to get total number of rows
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# Write the smaller csv file event_datafile_new.csv that is later used to insert
# data into the Apache Cassandra tables; every field is quoted.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as out_file:
    writer = csv.writer(out_file, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for record in full_data_rows_list:
        # rows whose first column (artist) is empty are left out of the output
        if record[0] != '':
            # source columns kept, in the same order as the header written above
            writer.writerow([record[i] for i in (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)])


In [4]:
# Print how many lines event_datafile_new.csv contains (every line of the file is counted)
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    line_total = 0
    for _ in f:
        line_total += 1
    print(line_total)

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
-  0 : artist 
-  1 : firstName of user
-  2 : gender of user
-  3 : item number in session
-  4 : last name of user
-  5 : length of the song
-  6 : level (paid or free song)
-  7 : location of the user
-  8 : sessionId
-  9 : song title
- 10 : userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [5]:
# Connect to the Cassandra instance on the local machine (127.0.0.1).
# `cluster` represents the connection to the node(s); `session` is the handle
# used to execute the CQL statements in the cells below.

from cassandra.cluster import Cluster
try: 
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    # NOTE(review): the error is only printed. If connecting fails, `cluster` and/or
    # `session` stay undefined and later cells will fail -- confirm Cassandra is running.
    print(e)

#### Create Keyspace

In [6]:
# Create the keyspace 'udacity'. 'IF NOT EXISTS' makes the statement a no-op when a
# keyspace with that name already exists.
# Replication uses class 'SimpleStrategy' with 'replication_factor' 1, i.e. a keyspace
# for a single-node evaluation cluster.
create_keyspace_cql = (
    "\n    CREATE KEYSPACE IF NOT EXISTS udacity "
    "\n    WITH REPLICATION = "
    "\n    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
)

try:
    session.execute(create_keyspace_cql)
except Exception as err:
    # report the problem and carry on
    print(err)

#### Set Keyspace

In [7]:
# Select the 'udacity' keyspace (created above) on the session with 'set_keyspace()',
# so the statements in the following cells can use unqualified table names.

try:
    session.set_keyspace('udacity')
except Exception as e:
    # the error is printed, not re-raised
    print(e)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




------------

###  __Question-1 :__ Give me the artist, song title and song's length in music app history that was heard during sessionId=338,  itemInSession=4

In [8]:
# Goal: find artist, song title and song length for a given sessionId and itemInSession,
# so the table is modelled on exactly that query.
# The DROP runs first so that re-executing this cell rebuilds the table from scratch.
drop_song_session = "DROP TABLE IF EXISTS song_session "

# Table `song_session`: PRIMARY KEY (sessionId, itemInSession), because the query filters
# on both columns; artist_name, song_title and song_length are the values to return.
# Data types follow event_datafile_new.csv, e.g. sessionId is an int.
create_song_session = (
    "CREATE TABLE IF NOT EXISTS song_session "
    "(sessionId int, itemInSession int, artist_name text, song_title text, song_length float, PRIMARY KEY (sessionId, itemInSession))"
)

# Run both statements in order; an error is printed and does not stop the cell
for query in (drop_song_session, create_song_session):
    try:
        session.execute(query)
    except Exception as e:
        print(e)

In [9]:
# Load the csv file and insert sessionId, itemInSession, artist_name, song_title and
# song_length into the `song_session` table, one INSERT per csv row.
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is built once, outside the loop.
query = "INSERT INTO song_session (sessionId , itemInSession , artist_name , song_title , song_length)"
query = query + " VALUES (%s, %s, %s, %s, %s)"

# newline='' is what the csv module expects for files handed to csv.reader
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # this skips the header
    for line in csvreader:
        # csv columns used: 8 = sessionId, 3 = itemInSession, 0 = artist, 9 = song, 5 = length.
        # Values are cast to match the int / float column types of the table.
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### Here we will do a SELECT to verify that the data have been inserted into each table

In [10]:
## Verify that records were added to `song_session` by running the SELECT asked for in Q1

query = "SELECT artist_name , song_title , song_length FROM song_session WHERE sessionId = 338 and itemInSession = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only iterate when the query succeeded; otherwise `rows` would be undefined
    # (or still hold the result of an earlier cell).
    for row in rows:
        print("Artist name: {}, Song name: {}, Song length: {}".format(row.artist_name, row.song_title, row.song_length))

Artist name: Faithless, Song name: Music Matters (Mark Knight Dub), Song length: 495.30731201171875


------

__Answer 1:__


The Question 1 expects Name of the artist, title of the song and length of the track based on sessionId and itemInSession.
As we are working with a NoSQL database, we need to think about the query first which will be used to fetch the data based on which we will create the Table required.

1. The expected output is : "Name of the artist, title of the song and length of the track"
2. Based on : "sessionId and itemInSession"

From the above two points we know the query to get the data will be a SELECT statement like :

`SELECT Name of the artist, title of the song, length of the track FROM TABLE_NAME WHERE sessionId = value AND itemInSession = value`

As we know the `SELECT` query, we can move on to the `CREATE` table query. We will add `IF NOT EXISTS` to the CREATE statement so that the table is only created if it does not already exist. Now we need to select the columns that are going to be in the table and the PRIMARY KEY. (The table is named `song_session`, following the rubric requirement that table names be alphanumeric; since the table holds details of the songs played in a session, the name is also apt.)

* Column Names: We need Name of the artist, title of the song and length of the track on query upon sessionId and itemInSession. Hence we will select artist_name , song_title , song_length, sessionId and itemInSession as the name of the columns.
* Primary Key: The PRIMARY KEY for the table should uniquely identify each row in the table. We need results based on sessionId and itemInSession, so we need both of them in the primary key. (*With only `sessionId` in the primary key, the query "SELECT * FROM song_session WHERE sessionId = 338 and itemInSession = 4" would throw a filtering error, because `itemInSession` would not be part of the primary key; filtering is also not allowed for the project.*)

---

Now, as we have the columns we want and the Primary Key, we can go ahead with creating the table with CREATE statement as:


```
CREATE TABLE IF NOT EXISTS song_session (sessionId int,
                                            itemInSession int, 
                                            artist_name text, 
                                            song_title text, 
                                            song_length float, 
                                            PRIMARY KEY (sessionId, itemInSession))
```

---

   Important things to notice here are the data types for each attribute. From the above `event_datafile_new.csv` we can see the various datatypes. Also we can find a particular data type as: `df = pd.read_csv('event_datafile_new.csv'); df.dtypes` which will give data types for all the columns based on which we will set the dtype for each of our attributes in the CREATE statement. Another crucial point here is that Apache Cassandra is a partition row store, which means the data should be Inserted and Retrieved in the order of the primary key (in our case the Composite Primary Key). Our CREATE statement and also the INSERT statement has been made with keeping this in mind.

---

   Once the table is created, the next step is inserting the data into the table from the csv file: Our INSERT statement will iterate through each row of the CSV file (*line*) and insert the data from the appropriate columns into our table columns. For example: the CSV file has `sessionId` in the column at index 8, so for the `song_session` column `sessionId` we take the value from the current row, which is `line`, and its 9th column, which is `line[8]` : `int(line[8])`. The `int` here is so that the data type matches our table column's data type. Similarly we have used the float dtype for song_length, which is a float value.

---

   Once the data has been inserted, we need to verify the insertion by retrieving it with a `SELECT` statement. We are using the selection statement from our question, on which we based this table:  `"SELECT artist_name , song_title , song_length FROM song_session WHERE sessionId = 338 and itemInSession = 4"`

The output is a single record : `Faithless Music Matters (Mark Knight Dub) 495.30731201171875` : This means our operation was successful.

---

__Summary:__ Here we can see that the Artist's name is `Faithless`, Song's name is `Music Matters (Mark Knight Dub)` and the song's Length is `495.30731201171875`

---


### __Question-2:__ Give me only the following: name of artist, song(sorted by itemInSession), user(first and last name) for userid=10, sessionid=182                

In [11]:
# Drop first, so that re-running this cell rebuilds the table from scratch
drop_playlist = "DROP TABLE IF EXISTS song_playlist_session "

# (userid, sessionid) is the composite partition key, matching the WHERE clause of the
# query; item_in_session is the clustering column, which orders the rows of a session.
create_playlist = (
    "CREATE TABLE IF NOT EXISTS song_playlist_session "
    "(userid int, sessionid int, item_in_session int, artist_name text,  song_title text, user_firstName text, user_lastName text, PRIMARY KEY ((userid, sessionid), item_in_session))"
)

# Run both statements in order; an error is printed and does not stop the cell
for query in (drop_playlist, create_playlist):
    try:
        session.execute(query)
    except Exception as e:
        print(e)

In [12]:
# Load the csv file and insert one row per event into `song_playlist_session`.
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is built once, outside the loop.
query = "INSERT INTO song_playlist_session (userid , sessionid, item_in_session, artist_name,  song_title, user_firstName , user_lastName)"
query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"

# newline='' is what the csv module expects for files handed to csv.reader
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # csv columns used: 10 = userId, 8 = sessionId, 3 = itemInSession, 0 = artist,
        # 9 = song, 1 = firstName, 4 = lastName; the ids are cast to int to match the table.
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

In [13]:
# Q2: artist, song and user name for userid = 10, sessionid = 182.
# Only the columns that are printed are selected (instead of `select *`); the rows of
# the partition come back ordered by the clustering column item_in_session.
query = "SELECT artist_name, song_title, user_firstname, user_lastname, item_in_session FROM song_playlist_session WHERE userid = 10 AND sessionid = 182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only iterate when the query succeeded; otherwise `rows` would be undefined
    # (or still hold the result of an earlier cell).
    for row in rows:
        print("Artist Name: {}, Song Name: {}, User Name: {} {}, itemInSession: {}".format(row.artist_name, row.song_title, row.user_firstname, row.user_lastname, row.item_in_session))

Artist Name: Down To The Bone, Song Name: Keep On Keepin' On, User Name: Sylvie Cruz, itemInSession: 0
Artist Name: Three Drives, Song Name: Greece 2000, User Name: Sylvie Cruz, itemInSession: 1
Artist Name: Sebastien Tellier, Song Name: Kilometer, User Name: Sylvie Cruz, itemInSession: 2
Artist Name: Lonnie Gordon, Song Name: Catch You Baby (Steve Pitron & Max Sanna Radio Edit), User Name: Sylvie Cruz, itemInSession: 3


------------


__Answer 2:__
    
The question here is similar to the Q1 with a few tweaks: We need name of artist, song name, user(first and last name) for userid=10, sessionid=182. However, we need to sort the song name by item_in_session. This means we need a COMPOSITE PRIMARY KEY with `item_in_session` as the CLUSTERING KEY. We will think about the query on which we need to create the table. The query will be something like: "SELECT artist_name, song_title, user_firstname, user_lastname FROM TABLE WHERE userid=10 and sessionid=182"

Now let's create a table for this: userid, sessionid, item_in_session, artist_name, song_title, user_firstname, user_lastname will be the columns, with `((userid, sessionid), item_in_session)` as the COMPOSITE PRIMARY KEY. Here `(userid, sessionid)` is our `PARTITION KEY` and `item_in_session` is the `CLUSTERING KEY`, which keeps the rows of each session sorted by `item_in_session`. Data types will be appropriately assigned to each column name as explained in Question 1.

The INSERT query is built in the same way as in Q1, keeping in mind the sequence order for data partitioning.

The data is fetched from the CSV file same as explained above keeping in mind the data types for each attribute.

The SELECT query fetches the values asked in Question 2 while sorting is done on `item_in_session`:



```
<ARTIST>                    <SONG>                                         <First NAME>   <LAST NAME>    <itemInSession>
Down To The Bone       Keep On Keepin' On                                    Sylvie          Cruz              0
Three Drives           Greece 2000                                           Sylvie          Cruz              1
Sebastien Tellier      Kilometer                                             Sylvie          Cruz              2
Lonnie Gordon          Catch You Baby(Steve Pitron & Max Sanna Radio Edit)  Sylvie           Cruz              3
```

__From above, we see that the user(10) had listened to four songs during the session 182.__

----------
    

### __Question-3:__ Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [14]:
# Drop first, so that re-running this cell rebuilds the table from scratch
drop_user_session = "DROP TABLE IF EXISTS user_session"

# song_title is the partition key (the query filters on it); userid is the clustering
# column that distinguishes the users within one song.
create_user_session = (
    "CREATE TABLE IF NOT EXISTS user_session "
    "(song_title text, userid int, user_firstName text, user_lastName text, PRIMARY KEY (song_title, userid))"
)

# Run both statements in order; an error is printed and does not stop the cell
for query in (drop_user_session, create_user_session):
    try:
        session.execute(query)
    except Exception as e:
        print(e)

In [15]:
# Load the csv file and insert one row per event into `user_session`.
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is built once, outside the loop.
query = "INSERT INTO user_session (song_title, userid , user_firstName , user_lastName)"
query = query + " VALUES (%s, %s, %s, %s)"

# newline='' is what the csv module expects for files handed to csv.reader
with open(file, encoding = 'utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # csv columns used: 9 = song, 10 = userId, 1 = firstName, 4 = lastName;
        # userId is cast to int to match the table column.
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

In [16]:
# Q3: first and last name of every user who listened to 'All Hands Against His Own'
query = "select user_firstname, user_lastname from user_session WHERE song_title = 'All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only iterate when the query succeeded; otherwise `rows` would be undefined
    # (or still hold the result of an earlier cell).
    for row in rows:
        print ("User Name: {} {}".format(row.user_firstname, row.user_lastname))

User Name: Jacqueline Lynch
User Name: Tegan Levine
User Name: Sara Johnson


-------

__Answer:__

Here we need the first and last name of every user who listened to a particular song, so our query will be "select user_firstname, user_lastname from TABLE WHERE song_title = abc".
So we can create a TABLE with song_title, user_firstname, user_lastname and PRIMARY KEY as "(song_title, user_firstname)". However, the problem with this is First Names are common and there might be other people with same first name, same goes for the last name. Hence, it is better if we consider a different value which is unique: I have selected userid which seems to be unique. Thus we have a table with COMPOSITE PRIMARY KEY (song_title, userid).

We will INSERT the data the same way as in the previous questions. The retrieval query will select all users who have listened to the song 'All Hands Against His Own', which was the objective of the question.

From the output we can see that there are three users who listen to the song "All Hands Against His Own", and they are:

```
Jacqueline   Lynch
Tegan        Levine
Sara         Johnson
```

----------


### Drop the tables before closing out the sessions

In [17]:
## Drop all three tables with DROP TABLE statements; IF EXISTS keeps each statement
## from failing when the table is already gone.

for table_name in ("song_session", "song_playlist_session", "user_session"):
    query = "DROP TABLE IF EXISTS " + table_name
    try:
        session.execute(query)
    except Exception as e:
        # report the problem and continue with the next table
        print(e)

### Close the session and cluster connection

In [18]:
# Release the resources opened at the start of Part II: first the session, then the cluster.
session.shutdown() # Closes the session
cluster.shutdown() # Closes the cluster connection