# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Select paht where the data is located by appending the current workspace (/home/workspace) to /event_data/data.

filepath = os.getcwd() + '/event_data/data/'

# With the following loop, we create a list with all the diferent absolute paths of each csv file.

for root, dirs, files in os.walk(filepath):
        file_path_list = glob.glob(os.path.join(root,'*'))

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# We read each csv file on its corresponding path in the filepath list.
# For each of them, we iterate over its rows and store them in the list full_data_rows_list

full_data_rows_list = [] 
for f in file_path_list:

    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 


# Now we create an event data csv file (event_datafile_full.csv) that will be used to insert into Cassandra tables.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                     'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(f"The number of rows in the append of the csv files is: {sum(1 for line in f)}")


The number of rows in the append of the csv files is: 6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [5]:
# Let's make a connection to an Apache Cassandra instance in 127.0.0.1
from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspace

In [6]:
# We create the keyspace sparkifydb in Cassandra, with replication factor 1. 
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkifydb
    WITH REPLICATION={'class': 'SimpleStrategy','replication_factor': 1}"""
    )
except Exception as e:
    print(e)

#### Set Keyspace

In [7]:
# With the next command we set the keyspace to the sparkifydb keyspace that's been just created.
try:
    session.set_keyspace('sparkifydb')
except Exception as e:
    print(e)


## Create queries to ask the following three questions of the data

 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [8]:
# Create the table session_history with the fields sessionId, itemInSession, artist, song, length especifying 
# sessionId and itemInSession as Primary Key (sessionId as partition key and itemInSession as clustering key).

query = "CREATE TABLE IF NOT EXISTS session_history"
query = query + "(sessionId int, \
                  itemInSession int, \
                  artist text, \
                  song text, \
                  length float, \
                  PRIMARY KEY(sessionId, itemInSession))"

try:
    session.execute(query)
except Exception as e:
    print(e)


In [9]:
# We iterate over the lines in the csv created in the first part of the Jupyter Notebook and select the corresponding
# fields with its datatype to insert into the Cassandra table session_history.

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO session_history (sessionId, itemInSession, artist, song, length)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### Do a SELECT to verify that the data have been inserted into each table

In [10]:
# We fix the query asked in exercise 1 and create a pandas dataframe with the execution of such a query.

query = "select * from session_history WHERE sessionId=338 AND itemInSession=4"
session_history = pd.DataFrame(list(session.execute(query)))

# We select the fields that we need from the previous dataframe.

solution_1 = session_history[['artist', 'song', 'length']]
print(solution_1)

      artist                             song      length
0  Faithless  Music Matters (Mark Knight Dub)  495.307312


### Repetition of the process for other queries.

In [11]:
# Create the table song_for_user_in_session_history with the fields artist, song, sessionId, itemInSession, firstName, lastName and userId 
# specifying userId,sessionId and itemInSession as Primary Key (userId and sessionId as composite partition key) 
# and itemInSession as clustering key.

query = "CREATE TABLE IF NOT EXISTS song_for_user_in_session_history"
query = query + "(artist text, \
                  song text, \
                  sessionId text, \
                  itemInSession text, \
                  firstName text, \
                  lastName text, \
                  userId text, \
                  PRIMARY KEY((userId, sessionId), itemInSession))"

try:
    session.execute(query)
except Exception as e:
    print(e)

# We iterate over the lines in the csv created in the first part of the Jupyter Notebook and select the corresponding
# fields with its datatype to insert into the Cassandra table song_for_user_in_session_history.

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO song_for_user_in_session_history (artist, song, sessionId, itemInSession, firstName, lastName, userId)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (line[0], line[9], line[8], line[3], line[1], line[4], line[10]))    

# We fix the query asked in exercise 2 and create a pandas dataframe with the execution of such a query.
        
query = "select * from song_for_user_in_session_history WHERE userId='10' AND sessionId='182'"
song_for_user_in_session_history = pd.DataFrame(list(session.execute(query)))
#print(user_for_session_history)

# We select the fields that we need from the previous dataframe.
solution_2 = song_for_user_in_session_history[['artist', 'song', 'iteminsession', 'firstname', 'lastname']]
print(solution_2)

              artist                                               song  \
0   Down To The Bone                                 Keep On Keepin' On   
1       Three Drives                                        Greece 2000   
2  Sebastien Tellier                                          Kilometer   
3      Lonnie Gordon  Catch You Baby (Steve Pitron & Max Sanna Radio...   

  iteminsession firstname lastname  
0             0    Sylvie     Cruz  
1             1    Sylvie     Cruz  
2             2    Sylvie     Cruz  
3             3    Sylvie     Cruz  


In [12]:
# Create the table user_for_song_in_session_history with the fields firstName, lastName, song and userId, 
# specifying song and userId Primary Key song as partition key and userId as clustering key.

query = "CREATE TABLE IF NOT EXISTS user_for_song_in_session_history"
query = query + "(firstName text, lastName text, song text, userId text, PRIMARY KEY(song, userId))"

try:
    session.execute(query)
except Exception as e:
    print(e)

# We iterate over the lines in the csv created in the first part of the Jupyter Notebook and select the corresponding
# fields with its datatype to insert into the Cassandra table user_for_song_in_session_history.

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO user_for_song_in_session_history (firstName, lastName, song, userId)"
        query = query + " VALUES (%s, %s, %s, %s)"
        session.execute(query, (line[1], line[4], line[9], line[10]))    

# We fix the query asked in exercise 3 and create a pandas dataframe with the execution of such a query.

query = "select * from user_for_song_in_session_history WHERE song='All Hands Against His Own'"
user_for_song_in_session_history = pd.DataFrame(list(session.execute(query)))
#print(user_for_song_in_session_history)

# We select the fields that we need from the previous dataframe.

solution_3 = user_for_song_in_session_history[['firstname', 'lastname']]
print(solution_3)
                    

    firstname lastname
0  Jacqueline    Lynch
1       Tegan   Levine
2        Sara  Johnson


### Drop the tables before closing out the sessions

In [13]:
# We create a list with the different queries to drop the three tables created and iterate over them executing the drop command.

queries = ["drop table if exists session_history",
           "drop table if exists song_for_user_in_session_history",
           "drop table if exists user_for_song_in_session_history"]

for query in queries:
    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)


### Close the session and cluster connection

In [14]:
session.shutdown()
cluster.shutdown()