# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
import csv
import os
import glob
import pandas as pd

## Create file paths that contain the data for events

In [2]:
#root or workspace directory
root_dir = os.getcwd()

#directory for events data
events_dir = os.path.join(root_dir, 'event_data')


def collect_csv_files(directory):
    """Return the sorted paths of every ``*.csv`` file under *directory*.

    CSV files from every directory visited by ``os.walk`` are accumulated
    (not overwritten per directory). Hidden sub-directories (names starting
    with '.', e.g. ``.ipynb_checkpoints``) are not descended into, so
    checkpoint copies of the data are not picked up twice.

    Returns an empty list when *directory* does not exist or holds no CSVs.
    """
    csv_paths = []
    for current_dir, sub_dirs, _file_names in os.walk(directory):
        # Prune in place: a top-down os.walk only descends into what is left.
        sub_dirs[:] = [name for name in sub_dirs if not name.startswith('.')]
        csv_paths.extend(glob.glob(os.path.join(current_dir, '*.csv')))
    # Sorted so the processing order does not depend on filesystem listing order.
    return sorted(csv_paths)


#get all file paths in a list that contain events data in csv
files = collect_csv_files(events_dir)

## Process the files and collect all the rows

In [3]:
#initialise an empty list to hold every data row (headers excluded) from all event files
data_rows = []
for file in files:
    with open(file, 'r', newline='', encoding='utf-8') as csvfile:
        csvreader = csv.reader(csvfile)

        # Skip the header row. The None default keeps a completely empty
        # file from raising StopIteration.
        next(csvreader, None)

        # Append the remaining rows (each a list of strings).
        data_rows.extend(csvreader)

## Creating event data csv file called event_datafile_new.csv that will be used to insert data into the Apache Cassandra tables

In [4]:
# Register a CSV dialect that quotes every field. skipinitialspace is a
# parsing option (it drops spaces that follow a delimiter when reading), so it
# only matters if this dialect is later used with csv.reader.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Header of the denormalised file and, in the same order, the positions of the
# matching columns in each raw event row.
output_header = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                 'level', 'location', 'sessionId', 'song', 'userId']
selected_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    csvwriter = csv.writer(f, dialect='myDialect')

    #writing the header row
    csvwriter.writerow(output_header)

    for row in data_rows:
        # Skip blank lines (csv.reader yields them as empty lists) and rows
        # with an empty artist name.
        if not row or not row[0]:
            continue

        csvwriter.writerow([row[i] for i in selected_columns])

In [5]:
# Count the data records in the file just written (header excluded).
# csv.reader counts CSV records rather than physical lines, so a quoted field
# containing a newline is not counted as two rows.
with open('event_datafile_new.csv', 'r', encoding='utf8', newline='') as f:
    data_length = sum(1 for _record in csv.reader(f)) - 1
print("Length of the file excluding header rows: {}".format(data_length))

Length of the file excluding header rows: 6820


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [6]:
# Connect to a Cassandra instance on the local machine.
# Cluster() is called with no contact points, so the driver's default is used.
# NOTE(review): the original comment says this is 127.0.0.1 — confirm it
# matches the default of the installed driver version.

from cassandra.cluster import Cluster
cluster = Cluster()

# Queries are executed through a session, so open one on the cluster.
session = cluster.connect()

#### Create Keyspace

In [7]:
# Create the `music` keyspace if it is missing. SimpleStrategy with a
# replication factor of 1 is used because the project runs on a single node.
# Any error is printed rather than raised.
keyspace_query = " CREATE KEYSPACE IF NOT EXISTS music \
                    WITH REPLICATION = {'class':'SimpleStrategy','replication_factor': 1}\
                    "
try:
    session.execute(keyspace_query)
except Exception as err:
    print(err)

#### Set Keyspace

In [8]:
# Make `music` the session's default keyspace so later statements can use
# bare table names. Any error is printed rather than raised.
keyspace_name = "music"
try:
    session.set_keyspace(keyspace_name)
except Exception as err:
    print(err)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338 and itemInSession = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [9]:
# Table for question 1. In PRIMARY KEY (sessionId, itemInSession), sessionId
# is the partition key and itemInSession the clustering column, so each
# (sessionId, itemInSession) pair is exactly one row.
# create_table_query is a "%s" template for the table name and is reused by
# the cells that create the other tables.
create_table_query = "CREATE TABLE IF NOT EXISTS %s "
table_one_schema = "(sessionId int, itemInSession int, artist text, song text, song_length double, PRIMARY KEY (sessionId, itemInSession))"
full_query = (create_table_query % "music_sessions") + table_one_schema

try:
    session.execute(full_query)
except Exception as err:
    print(err)

In [10]:
#Creating table user_sessions to answer question 2
# Question 2 always restricts both userId and sessionId, so together they form
# the partition key. Each partition then holds a single session instead of
# every session the user ever had. itemInSession is the clustering column,
# which keeps the rows of a partition sorted by it, as the question requires.
table_two_schema = ("(userId int, sessionId int, itemInSession int, artist text, song text, "
                    "firstName text, lastName text, "
                    "PRIMARY KEY ((userId, sessionId), itemInSession))")
full_query = (create_table_query + table_two_schema) % ("user_sessions")
try:
    session.execute(full_query)
except Exception as e:
    print(e)

In [11]:
# Table for question 3 (users who listened to a given song). song is the
# partition key; sessionId and itemInSession are clustering columns, so each
# play of a song is stored as its own row.
# NOTE(review): a user who played the song more than once is returned once per
# play; adding userId and keying on (song, userId) would list each user once.
table_three_schema = "( song text, sessionId int, itemInSession int, firstName text, lastName text, PRIMARY KEY (song, sessionId, itemInSession))"
full_query = (create_table_query % ("song_users")) + table_three_schema

try:
    session.execute(full_query)
except Exception as err:
    print(err)

#### Inserting data into three tables created

In [12]:
# Load every record of the denormalised file into the three query tables.
# The INSERT statements are prepared once up front, so the server parses each
# of them a single time and every row only sends its bound values.
insert_music_sessions = session.prepare(
    "INSERT INTO music_sessions (sessionId, itemInSession, artist, song, song_length) "
    "VALUES (?, ?, ?, ?, ?)"
)
insert_user_sessions = session.prepare(
    "INSERT INTO user_sessions (userId, sessionId, itemInSession, artist, song, firstName, lastName) "
    "VALUES (?, ?, ?, ?, ?, ?, ?)"
)
insert_song_users = session.prepare(
    "INSERT INTO song_users (song, sessionId, itemInSession, firstName, lastName) "
    "VALUES (?, ?, ?, ?, ?)"
)

file = 'event_datafile_new.csv'

with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    # Column positions follow the header of event_datafile_new.csv:
    # 0 artist, 1 firstName, 2 gender, 3 itemInSession, 4 lastName, 5 length,
    # 6 level, 7 location, 8 sessionId, 9 song, 10 userId
    for line in csvreader:
        artist, song = line[0], line[9]
        first_name, last_name = line[1], line[4]
        session_id = int(line[8])
        item_in_session = int(line[3])

        session.execute(insert_music_sessions,
                        (session_id, item_in_session, artist, song, float(line[5])))
        session.execute(insert_user_sessions,
                        (int(line[10]), session_id, item_in_session, artist, song, first_name, last_name))
        session.execute(insert_song_users,
                        (song, session_id, item_in_session, first_name, last_name))

#### Validating schema definitions against number of rows created

In [13]:
#checking if correct primary key is created and values are not over written
#if value is 6820 we have unique rows
# Every record of event_datafile_new.csv should land in its own row in each
# table. A count that differs from the number of records means rows were
# overwritten (primary key not unique) or the table holds unexpected rows.

unique_rows = data_length

for table in ('music_sessions', 'user_sessions', 'song_users'):
    counts = session.execute('select count(*) from {}'.format(table))
    for cnt in counts:
        if cnt.count == unique_rows:
            print("Unique values in {} table and primary key is correct".format(table))
        else:
            print("Revisit primary key of {} and do further analysis before assigning primary key".format(table))


Unique values in music_sessions table and primary key is correct
Unique values in user_sessions table and primary key is correct
Unique values in song_users table and primary key is correct


#### Do a SELECT to verify that the data have been inserted into each table

In [14]:
#Query 1: Give me the artist, song title and song's length in the music app history that was heard during
## sessionId = 338, and itemInSession = 4
# The values are sent as bound parameters instead of being formatted into the
# CQL text.

music_session_query = ("SELECT artist, song, song_length FROM music_sessions "
                       "WHERE sessionId = %s AND itemInSession = %s")
music_session_params = (338, 4)

music_session_columns = ['artist', 'song', 'song_length(mins)']

#Initiating the dataframe with columns (left empty if the query fails)
music_session_output = pd.DataFrame(columns=music_session_columns)

try:
    data = session.execute(music_session_query, music_session_params) #executing the query

    # Build the frame in one step. song_length is divided by 60 for the
    # minutes column; this assumes the stored length is in seconds (TODO confirm).
    music_session_output = pd.DataFrame(
        [(row.artist, row.song, row.song_length / 60) for row in data],
        columns=music_session_columns,
    )

    #printing the values
    print(music_session_output)
except Exception as e:
    print(e)

      artist                             song  song_length(mins)
0  Faithless  Music Matters (Mark Knight Dub)           8.255122


In [15]:
#Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)
## for userid = 10, sessionid = 182
# The values are sent as bound parameters instead of being formatted into the
# CQL text. Rows come back ordered by itemInSession because it is a clustering
# column of user_sessions.

user_session_query = ("SELECT artist, song, itemInSession, firstName, lastName FROM user_sessions "
                      "WHERE userId = %s AND sessionId = %s")
user_session_params = (10, 182)

user_session_columns = ['artist', 'song', 'itemInSession', 'firstName', 'lastName']

#Initiating the dataframe with columns (left empty if the query fails)
user_session_output = pd.DataFrame(columns=user_session_columns)

try:
    data = session.execute(user_session_query, user_session_params) #executing the query

    # Build the frame in one step. Unquoted CQL identifiers are lower-cased,
    # hence the lower-case row attributes.
    user_session_output = pd.DataFrame(
        [(row.artist, row.song, row.iteminsession, row.firstname, row.lastname) for row in data],
        columns=user_session_columns,
    )

    #printing the values
    print(user_session_output)
except Exception as e:
    print(e)

              artist                                               song  \
0   Down To The Bone                                 Keep On Keepin' On   
1       Three Drives                                        Greece 2000   
2  Sebastien Tellier                                          Kilometer   
3      Lonnie Gordon  Catch You Baby (Steve Pitron & Max Sanna Radio...   

  itemInSession firstName lastName  
0             0    Sylvie     Cruz  
1             1    Sylvie     Cruz  
2             2    Sylvie     Cruz  
3             3    Sylvie     Cruz  


In [16]:
#Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
# The title is sent as a bound parameter. Formatting it between single quotes
# produced invalid CQL for any title that itself contains a quote.

song_users_query = "SELECT firstName, lastName, song FROM song_users WHERE song = %s"
song_users_params = ("All Hands Against His Own",)

song_users_columns = ['firstName', 'lastName', 'song']

#Initiating the dataframe with columns (left empty if the query fails)
song_users_output = pd.DataFrame(columns=song_users_columns)

try:
    data = session.execute(song_users_query, song_users_params) #executing the query

    # Build the frame in one step. Unquoted CQL identifiers are lower-cased,
    # hence the lower-case row attributes.
    song_users_output = pd.DataFrame(
        [(row.firstname, row.lastname, row.song) for row in data],
        columns=song_users_columns,
    )

    #printing the values
    print(song_users_output)
except Exception as e:
    print(e)

    firstName lastName                       song
0        Sara  Johnson  All Hands Against His Own
1  Jacqueline    Lynch  All Hands Against His Own
2       Tegan   Levine  All Hands Against His Own


## Drop the tables before closing out the sessions

In [17]:
# Drop the three query tables one by one. A failure for one table is printed
# and the remaining drops still run.
for table_name in ("music_sessions", "user_sessions", "song_users"):
    try:
        session.execute("DROP TABLE " + table_name)
    except Exception as e:
        print(e)

# Validate that the tables have been deleted from the keyspace

In [18]:
# Print the name of every table still registered in the `music` keyspace.
# After the drops above succeed, nothing is printed.
tables = session.execute("SELECT * FROM system_schema.tables WHERE keyspace_name = 'music'")
for table_name in (tbl.table_name for tbl in tables):
    print(table_name)

## Closing the session and cluster connection

In [19]:
# Release driver resources: the session first, then the cluster it came from.
for handle in (session, cluster):
    handle.shutdown()