# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

1. Checking current working directory
2. Get current folder and subfolder event data
3. Create a for loop to create a list of files and collect each filepath
4. Join the filepath and roots with the subdirectories using glob

In [2]:
# Show the working directory so the relative 'event_data' location can be checked.
print(os.getcwd())

# Root folder that is searched for the raw event CSV files.
filepath = os.getcwd() + '/event_data'

# Build the list once and extend it per directory: reassigning it inside the
# loop would keep only the files of the last directory visited, and would
# leave the name undefined when the folder is missing.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden folders (e.g. notebook checkpoint folders) in place so
    # os.walk does not descend into them.
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # Match CSV files only, so sub-directories and unrelated files are never
    # handed to the CSV reader; sorting makes the processing order stable.
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

1. Initiate an empty list of rows that will be generated from each file
2. For every filepath in the file path list
3. Read in csv file
4. Create a csv reader object
5. Extract each data row one by one and append it
6. Create a smaller event data csv file called event_datafile_new.csv that will be used to insert data into the
Apache Cassandra tables

In [3]:
# Collect the data rows (header excluded) of every event file into one list.
full_data_rows_list = []

for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header line; the default stops an empty file from raising
        # StopIteration and aborting the whole run.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Output dialect: quote every field.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # csv.reader yields [] for a blank line, so test for that before
        # indexing; rows whose first column (artist) is empty are dropped.
        if not row or row[0] == '':
            continue
        # Keep only the eleven source columns named in the header above.
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8],
                         row[12], row[13], row[16]))


Validate the row count in the csv file

In [4]:
# Count the lines of the generated file; the header line is part of the total.
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    line_count = 0
    for _ in f:
        line_count += 1
    print(line_count)

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

 This cell creates a connection to a local Cassandra instance (127.0.0.1)

In [5]:
from cassandra.cluster import Cluster

# Address of the Cassandra node to contact (this machine).
contact_points = ['127.0.0.1']

try:
    cluster = Cluster(contact_points)
    session = cluster.connect()
except Exception as err:
    # The error is only printed, so later cells will fail if no session exists.
    print(err)

#### Create Keyspace

In [6]:
# CQL that creates the 'udacity' keyspace unless it already exists
# (SimpleStrategy, replication factor 1).
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""

try:
    session.execute(create_keyspace_cql)
except Exception as err:
    print(err)

#### Set Keyspace

In [7]:
# Make 'udacity' the keyspace used by the statements that follow.
keyspace_name = 'udacity'
try:
    session.set_keyspace(keyspace_name)
except Exception as err:
    print(err)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




# Create table to test data load
The purpose of the next three cells is to test the data and the method. I chose to include several columns to get
a broad sense of the data. I chose a filter to limit the results and to validate that the query was working as 
expected, including the where clause. I chose sessionid as the partition key and iteminsession for a clustering key because I expected to use these again in later queries and wanted to test the approach.

# Data type logic: 
length is cast as a float because it has a decimal and there is the potential that math might be performed on this field. For instance the length of some combination of songs. iteminsession is cast as an integer because it is ordinal. All others are cast as text including id numbers. In this data set item numbers are factor variables and should not be leveraged in derivations nor should they have math performed on the values.

In [8]:
# Rebuild the scratch table: drop any earlier copy, then create it again.
# Partition key: sessionid; clustering column: iteminsession.
drop_stmt = "drop table if exists evnt_data_new_test"
create_stmt = ("CREATE TABLE IF NOT EXISTS evnt_data_new_test "
               "(sessionid text, iteminsession int, userid text, artist text, song text \
                 , length float, firstname text, lastname text, PRIMARY KEY (sessionid,iteminsession))")

# Run the statements in order; a failure is printed and the next one still runs.
for query in (drop_stmt, create_stmt):
    try:
        session.execute(query)
    except Exception as err:
        print(err)


In [9]:
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is built once here
# instead of being re-assembled on each loop iteration.
query = "insert into evnt_data_new_test (sessionid, iteminsession, userid, artist, song, length, firstname \
                                           , lastname)"
query = query + "values (%s, %s, %s, %s, %s, %s, %s, %s)"

# newline='' lets the csv module handle line endings inside quoted fields itself.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for row in csvreader:
        # Column positions follow the CSV header: 0 artist, 1 firstName,
        # 3 itemInSession, 4 lastName, 5 length, 8 sessionId, 9 song, 10 userId.
        try:
            session.execute(query, (row[8], int(row[3]), row[10], row[0], row[9], float(row[5]), row[1], row[4]))
        except Exception as e:
            # Report the failing row's error and keep loading the rest.
            print(e)

#### Do a SELECT to verify that the data have been inserted into each table

In [10]:
# Read back every row of session '338' to confirm the load and the WHERE filter.
query = "select sessionid, iteminsession, userid, artist, song, length, firstname, lastname \
          from evnt_data_new_test where sessionId='338'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print only after a successful query; otherwise `rows` would be undefined
    # or still hold the result of an earlier cell.
    for row in rows:
        print(row.sessionid, row.iteminsession, row.userid, row.artist, row.song, row.length,
              row.firstname, row.lastname)
 

338 1 50 Pixies Build High 89.36444091796875 Ava Robinson
338 2 50 The Roots / Jack Davey Atonement 155.95057678222656 Ava Robinson
338 3 50 Mike And The Mechanics A Beggar On A Beach Of Gold 275.1211853027344 Ava Robinson
338 4 50 Faithless Music Matters (Mark Knight Dub) 495.30731201171875 Ava Robinson


Query 1: artist, song title and song's length in the music app history that was heard during sessionId = 338 and itemInSession = 4
I built this table and query around the list of columns in the requirements and the fields I needed for limiting the data and ordering it. I chose sessionid for the partition key because the requirements called for filtering on sessionid leveraging a where clause. I included iteminsession as a clustering key so that the data would be ordered by that column.

In [11]:
# Table for query 1: drop any earlier copy, then create it again.
# Partition key: sessionid; clustering column: iteminsession.
drop_stmt = "drop table if exists artist_by_song_session"
create_stmt = ("CREATE TABLE IF NOT EXISTS artist_by_song_session "
               "(sessionid text, iteminsession int, userid text, artist text, song text \
                 , length float, firstname text, lastname text, PRIMARY KEY (sessionid,iteminsession))")

# Run the statements in order; a failure is printed and the next one still runs.
for query in (drop_stmt, create_stmt):
    try:
        session.execute(query)
    except Exception as err:
        print(err)


In [12]:
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is built once here
# instead of being re-assembled on each loop iteration.
query = "insert into artist_by_song_session (sessionid, iteminsession, userid, artist, song, length, firstname \
                                           , lastname)"
query = query + "values (%s, %s, %s, %s, %s, %s, %s, %s)"

# newline='' lets the csv module handle line endings inside quoted fields itself.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for row in csvreader:
        # Column positions follow the CSV header: 0 artist, 1 firstName,
        # 3 itemInSession, 4 lastName, 5 length, 8 sessionId, 9 song, 10 userId.
        try:
            session.execute(query, (row[8], int(row[3]), row[10], row[0], row[9], float(row[5]), row[1], row[4]))
        except Exception as e:
            # Report the failing row's error and keep loading the rest.
            print(e)

In [13]:
# Query 1: artist, song and length for sessionId '338', itemInSession 4.
query = "select artist, song, length from artist_by_song_session where sessionId='338'  and iteminsession=4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print only after a successful query; otherwise `rows` would be undefined
    # or still hold the result of an earlier cell.
    for row in rows:
        print(row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


Create query 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
This query is similar to query one in being centered on the artist but requiring limiting by sessionid. I included userid as my first clustering key because the requirements called for an additional filter on this column in the where clause. I added iteminsession as a second clustering key in order to sort the data by this value.

In [14]:
# Table for query 2: drop any earlier copy, then create it again.
# Partition key: sessionid; clustering columns: userid, then iteminsession.
drop_stmt = "drop table if exists artist_by_song_user"
create_stmt = ("CREATE TABLE IF NOT EXISTS artist_by_song_user"
               "(sessionid text, iteminsession int, userid text, artist text, song text \
                 , length float, firstname text, lastname text, PRIMARY KEY (sessionid,userid,iteminsession))")

# Run the statements in order; a failure is printed and the next one still runs.
for query in (drop_stmt, create_stmt):
    try:
        session.execute(query)
    except Exception as err:
        print(err)

In [15]:
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is built once here
# instead of being re-assembled on each loop iteration.
query = "insert into artist_by_song_user (sessionid, iteminsession, userid, artist, song, length, firstname \
                                           , lastname)"
query = query + "values (%s, %s, %s, %s, %s, %s, %s, %s)"

# newline='' lets the csv module handle line endings inside quoted fields itself.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for row in csvreader:
        # Column positions follow the CSV header: 0 artist, 1 firstName,
        # 3 itemInSession, 4 lastName, 5 length, 8 sessionId, 9 song, 10 userId.
        try:
            session.execute(query, (row[8], int(row[3]), row[10], row[0], row[9], float(row[5]), row[1], row[4]))
        except Exception as e:
            # Report the failing row's error and keep loading the rest.
            print(e)

In [16]:
# Query 2: artist, song and user name for sessionid '182', userid '10'.
# iteminsession is the clustering column after userid, so rows come back in that order.
query = "select artist, song, firstname, lastname \
         from artist_by_song_user where sessionid='182' and userid='10'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print only after a successful query; otherwise `rows` would be undefined
    # or still hold the result of an earlier cell.
    for row in rows:
        print(row.artist, row.song, row.firstname, row.lastname)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


Create query 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [17]:
# Table for query 3: drop any earlier copy, then create it again.
query = "drop table if exists user_by_song_AllHands"
try:
    session.execute(query)
except Exception as e:
    print(e)

# Partition key: song; clustering column: userid (one row per user per song).
# iteminsession is declared int, matching the other tables and the int values
# the load step inserts (it was float here only).
query = "CREATE TABLE IF NOT EXISTS user_by_song_AllHands "
query = query + "(sessionid text, iteminsession int, userid text, artist text, song text \
                 , length float, firstname text, lastname text, PRIMARY KEY (song,userid))"
try:
    session.execute(query)
except Exception as e:
    print(e)


In [18]:
file = 'event_datafile_new.csv'

# The INSERT text is the same for every row, so it is built once here
# instead of being re-assembled on each loop iteration.
query = "insert into user_by_song_AllHands (sessionid, iteminsession, userid, artist, song, \
        length, firstname, lastname)"
query = query + "values (%s, %s, %s, %s, %s, %s, %s, %s)"

# newline='' lets the csv module handle line endings inside quoted fields itself.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for row in csvreader:
        # Column positions follow the CSV header: 0 artist, 1 firstName,
        # 3 itemInSession, 4 lastName, 5 length, 8 sessionId, 9 song, 10 userId.
        try:
            session.execute(query, (row[8], int(row[3]), row[10], row[0], row[9], float(row[5]), row[1], row[4]))
        except Exception as e:
            # Report the failing row's error and keep loading the rest.
            print(e)

In [19]:
# Query 3: first and last name of every user who listened to the given song.
query = "select song,firstname,lastname from user_by_song_AllHands where song='All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print only after a successful query; otherwise `rows` would be undefined
    # or still hold the result of an earlier cell.
    for row in rows:
        print(row.firstname, row.lastname)
                    

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the tables before closing out the sessions

In [20]:
# Tables to remove before closing the connection. IF EXISTS makes the drop
# harmless for a table that was never created.
tables_to_drop = (
    'evnt_data_new',
    'evnt_data_new_test',
    'artist_by_song_session',
    'artist_by_song_user',
    'user_by_song_AllHands',
)

# Each statement must be sent to the session: assigning the text to `query`
# alone drops nothing. Every statement carries the full "if exists" clause.
for table_name in tables_to_drop:
    query = "drop table if exists " + table_name
    try:
        session.execute(query)
    except Exception as e:
        # Report the failure and continue with the remaining tables.
        print(e)

### Close the session and cluster connection

In [21]:
# Shut down the session first, then the cluster.
for connection in (session, cluster):
    connection.shutdown()