# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
import cassandra
import re
import os
import glob
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'


def collect_event_files(directory):
    """Return a sorted list of the non-hidden files anywhere below *directory*.

    Every folder visited by ``os.walk`` is globbed with ``'*'`` (which skips
    dot-files). Results from *all* folders are accumulated rather than
    overwritten, and sub-directory entries are dropped so they are never
    opened as CSV files. Sorting makes the processing order deterministic.
    A missing *directory* yields an empty list.
    """
    paths = []
    for root, _dirs, _files in os.walk(directory):
        # join the folder path with '*' and let glob list its entries
        paths.extend(p for p in glob.glob(os.path.join(root, '*')) if os.path.isfile(p))
    return sorted(paths)


# list of every event file to process (always defined, possibly empty)
file_path_list = collect_event_files(filepath)

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Collect the data rows (header excluded) of every event file into one list.
full_data_rows_list = []

# for every filepath in the file path list
for f in file_path_list:
    # reading csv file
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        # Skip the header row. The None default keeps an empty file from
        # raising StopIteration and aborting the whole cell.
        next(csvreader, None)
        # append every remaining data row, one list of fields per row
        full_data_rows_list.extend(csvreader)

        
# Create a smaller event data csv file called event_datafile_new.csv that will be
# used to insert data into the Apache Cassandra tables. Every field is quoted.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions, in each raw event row, of the fields kept in the new file; the
# order matches the header row written below.
kept_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        # If the first item (artist) is empty the row is just a logout or home
        # page event, not a song play, so we don't need it. Blank csv lines
        # arrive as [] and are skipped too instead of raising IndexError.
        if not row or row[0] == '':
            continue
        writer.writerow([row[i] for i in kept_columns])


#### Creating a Cluster

In [4]:
from cassandra.cluster import Cluster

# Connect to the Cassandra node running on localhost. A failure is only
# printed, so it will surface later as an undefined `session`.
local_contact_points = ['127.0.0.1']
try:
    cluster = Cluster(local_contact_points)
    session = cluster.connect()
except Exception as err:
    print(err)

#### Create Keyspace

In [5]:
# Create the `udacity` keyspace unless it already exists. SimpleStrategy with
# replication_factor 1 stores a single replica of each row.
keyspace_ddl = """ 
        CREATE KEYSPACE IF NOT EXISTS udacity  
        WITH REPLICATION =  
        { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
try:
    session.execute(keyspace_ddl)
except Exception as err:
    print(err)

### Set Keyspace

In [6]:
# Make `udacity` the session's default keyspace so the unqualified table
# names used below resolve inside it.
keyspace_name = 'udacity'
try:
    session.set_keyspace(keyspace_name)
except Exception as err:
    print(err)

# Creating Data Model and Loading from CSV file

## Table1: session_songplays 

 - For our session_songplays table we model it so that we can support the following query:

    ``` sql
        Select artist, song_title, length
        from session_songplays
        where session_id = 338 and session_item = 4
        
    ```
    ...
 - We see in our query that we are filtering first by session_id and then by session_item.
 - Therefore session_id becomes our partition key and session_item our clustering column, giving PRIMARY KEY (session_id, session_item).
   session_id alone is not unique (a session contains many songs), but the (session_id, session_item) pair identifies exactly one play,
   which supports the query defined above.

In [7]:
# session_songplays answers "what was played at item X of session Y":
# session_id is the partition key and session_item the clustering column, so
# each (session_id, session_item) pair identifies exactly one play.
# Ids use `int` (32-bit) instead of `smallint` (16-bit, max 32767) so larger
# session ids or item counters cannot overflow on insert.
create_sessions = (
    "CREATE TABLE IF NOT EXISTS session_songplays "
    "(session_id int, session_item int, artist text, song_title text, length double, "
    "PRIMARY KEY (session_id, session_item))"
)
try:
    session.execute(create_sessions)
except Exception as e:
    print(e)


### Insert into session_songplays

In [8]:
# Load session_songplays from the appropriate columns of event_datafile_new.csv.
# CSV layout: ['artist','firstName','gender','itemInSession','lastName','length','level','location','sessionId','song','userId']
session_insert = (
    "INSERT INTO session_songplays (session_id, session_item, artist, song_title, length)"
    " VALUES (%s, %s, %s, %s, %s)"
)

file = 'event_datafile_new.csv'

with open(file, encoding='utf8') as event_file:
    reader = csv.reader(event_file)
    next(reader)  # drop the header row
    for record in reader:
        values = (int(record[8]), int(record[3]), record[0], record[9], float(record[5]))
        session.execute(session_insert, values)

### Test the query we modeled our table after

In [9]:
# Which artist, song and length were played at item 4 of session 338?
query = "SELECT artist, song_title, length FROM session_songplays WHERE session_id=338 AND session_item =4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded; otherwise `rows` would be undefined
    # or still hold the result of an earlier cell.
    for row in rows:
        print(row.artist, row.song_title, row.length)

Faithless Music Matters (Mark Knight Dub) 495.3073


## Table2: user_songplays

 - For our user_songplays table we model it so that we can support the following query:

    ``` sql
        Select artist, song_title (sorted by session_item), first_name, last_name
        from user_songplays
        where user_id = 10 and session_id = 182
        
    ```
    ...
 - We see in our query that we are filtering first by user_id and then by session_id.
 - Therefore user_id becomes our partition key and session_id our first clustering column, so all of a user's plays are stored together
   and can be narrowed down to a single session in order to support the query defined above.
 - However we must note that the query also has the requirement that the artist, song_title fields must be sorted by session_item.
 - Therefore we add a second clustering column, session_item, which also keeps each play within a session unique.

In [10]:
# Model user_songplays table after the following query: Give me only the following: name of artist, song (sorted by itemInSession) and user \
# (first and last name) for userid = 10, sessionid = 182
#
# Partition key user_id groups a user's plays. The clustering columns
# session_id then session_item select one session and keep its songs
# ordered by item number. Ids use `int` (32-bit) instead of `smallint`
# (16-bit, max 32767) so larger ids or item counters cannot overflow on insert.
create_users = """CREATE TABLE IF NOT EXISTS user_songplays
                    (user_id int,
                     session_id int,
                     session_item int,
                     artist text,
                     song_title text,
                     first_name text,
                     last_name text,
                     PRIMARY KEY (user_id, session_id, session_item)
                    )
"""

try:
    session.execute(create_users)
except Exception as e:
    print(e)
    

                    

### Insert into user_songplays

In [11]:
# Load user_songplays from the appropriate columns of event_datafile_new.csv.
# CSV layout: ['artist','firstName','gender','itemInSession','lastName','length','level','location','sessionId','song','userId']
users_insert = (
    "INSERT INTO user_songplays (user_id, session_id, session_item, artist, song_title, first_name, last_name)"
    " VALUES (%s, %s, %s, %s, %s, %s, %s)"
)

file = 'event_datafile_new.csv'

with open(file, encoding='utf8') as event_file:
    reader = csv.reader(event_file)
    next(reader)  # drop the header row
    for record in reader:
        # userId, sessionId, itemInSession, artist, song, firstName, lastName
        values = (int(record[10]), int(record[8]), int(record[3]),
                  record[0], record[9], record[1], record[4])
        session.execute(users_insert, values)

### Test the query we modeled our table after

In [12]:
## Test the query we modeled our table after
query = "SELECT artist, song_title, first_name, last_name from user_songplays WHERE user_id = 10 and session_id = 182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded; otherwise `rows` would still hold
    # the previous cell's result and the attribute access below would fail.
    for row in rows:
        print(row.artist, row.song_title, row.first_name, row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


## Table3: song_songplays

 - For our song_songplays table we model it so that we can support the following query:

    ``` sql
        SELECT first_name, last_name 
        from song_songplays 
        WHERE song_title='All Hands Against His Own'
        
    ```
    ...
 - We see in our query that we are filtering by song_title.
 - song_title is not unique (many users can listen to the same song), which makes it a bad candidate for our PRIMARY KEY by itself.
 - Therefore we create a compound primary key with user_id as the clustering column. This way, if more than one person listened to this song,
   each listener is kept as a separate row and we can see their distinct names (other attributes, such as membership level, would require
   adding columns to the table).
 - Note: If one wanted more granular information, then a 3-column primary key could be created that would include an added session_id, which would
   enable us to know more about the context in which a group of users listened to a specific song (what other songs they played in the session,
   in how many distinct sessions a user listened to a song, etc.)

In [13]:
# song_songplays answers "who listened to song X": song_title is the
# partition key and user_id the clustering column, so each listener of a
# song is stored as its own row. user_id uses `int` (32-bit) instead of
# `smallint` (16-bit, max 32767) so larger ids cannot overflow on insert.
create_songs = """CREATE TABLE IF NOT EXISTS song_songplays
                    (user_id int,
                     artist text,
                     song_title text,
                     first_name text,
                     last_name text,
                     PRIMARY KEY (song_title, user_id)
                    )
"""

try:
    session.execute(create_songs)
except Exception as e:
    print(e)

### Insert into song_songplays

In [14]:
# Load song_songplays from the appropriate columns of event_datafile_new.csv.
# CSV layout: ['artist','firstName','gender','itemInSession','lastName','length','level','location','sessionId','song','userId']
plays_insert = (
    "INSERT INTO song_songplays (user_id, artist, song_title, first_name, last_name)"
    " VALUES (%s, %s, %s, %s, %s)"
)

file = 'event_datafile_new.csv'

with open(file, encoding='utf8') as event_file:
    reader = csv.reader(event_file)
    next(reader)  # drop the header row
    for record in reader:
        # userId, artist, song, firstName, lastName
        values = (int(record[10]), record[0], record[9], record[1], record[4])
        session.execute(plays_insert, values)

### Test the query we modeled our table after

In [15]:
# Test the query we modeled our table after
query = "SELECT first_name, last_name from song_songplays WHERE song_title='All Hands Against His Own'"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded; otherwise `rows` would still hold
    # the previous cell's result.
    for row in rows:
        print(row.first_name, row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


# Drop the tables before closing out the sessions

In [4]:
## TO-DO: Drop the table before closing out the sessions

In [None]:
# Drop every table created above. The names must match the CREATE statements
# exactly: with IF EXISTS a misspelt name is silently ignored and the real
# table is left behind.
drop_sessions = "DROP TABLE IF EXISTS session_songplays"
drop_users = "DROP TABLE IF EXISTS user_songplays"
drop_songs = "DROP TABLE IF EXISTS song_songplays"

for drop_statement in (drop_sessions, drop_users, drop_songs):
    try:
        session.execute(drop_statement)
    except Exception as e:
        print(e)

### Close the session and cluster connection

In [None]:
# Release the session first, then the cluster it was opened from.
for connection in (session, cluster):
    connection.shutdown()