# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [54]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [55]:
# Show the current working directory so it is clear where event_data is looked up
print(os.getcwd())

# The original event files live in the event_data folder below the working directory
filepath = os.getcwd() + '/event_data'

# Collect the path of every data file found under event_data.
# The list is created up front so that it exists (empty) even when the folder
# is missing, and it is extended rather than reassigned so that the files of
# one directory are not discarded when os.walk moves on to the next one.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # The '*' pattern below already ignores hidden entries; pruning hidden
    # directories in place keeps os.walk from descending into them as well.
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # Keep regular files only: a sub-directory matched by '*' cannot be opened
    # as a CSV file by the processing step that consumes this list.
    file_path_list.extend(
        path
        for path in glob.glob(os.path.join(root, '*'))
        if os.path.isfile(path)
    )

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [56]:
# Rows gathered from every event file (each file's header line is excluded)
full_data_rows_list = []

# Read each event file collected in file_path_list
for f in file_path_list:

    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)

        # Skip the header line. The None default keeps a completely empty file
        # from raising StopIteration and aborting the whole run.
        next(csvreader, None)

        # Keep every remaining row of this file
        full_data_rows_list.extend(csvreader)

print(len(full_data_rows_list))

# Write the smaller event file event_datafile_new.csv that is used to insert
# data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # csv.reader yields [] for a blank line, so test for an empty row
        # before indexing. Rows whose first column (artist) is empty are
        # left out of the output file.
        if not row or row[0] == '':
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


8056


In [57]:
# Report how many lines (header included) the generated csv file contains
with open('event_datafile_new.csv', 'r', encoding='utf8') as csv_handle:
    line_total = 0
    for _ in csv_handle:
        line_total += 1
    print(line_total)

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [58]:
# Connect to the Cassandra cluster and open a session on it
from cassandra.cluster import Cluster

try:
    # No contact points are given; per the original note this is meant for a
    # locally installed Apache Cassandra instance
    cluster = Cluster()
    session = cluster.connect()
except Exception as err:
    print(err)

#### Create Keyspace

In [59]:
# Create the keyspace used by this notebook (no-op if it already exists)
keyspace_cql = """
        CREATE KEYSPACE IF NOT EXISTS udacity 
        WITH REPLICATION = { 
            'class'              : 'SimpleStrategy', 
            'replication_factor' : 1 
        } 
    """
try:
    session.execute(keyspace_cql)
except Exception as err:
    print(err)

#### Set Keyspace

In [60]:
# Make 'udacity' the keyspace that all following statements of this session run in
try:
    session.set_keyspace('udacity')
except Exception as err:
    print(err)

## Query #1 

The goal here is to retrieve the following: the name of the artist, the song title, and the song's length for the song that was heard during session_id = 338 with items_in_session = 4.

### Create and Populate the Database

To facilitate the query above, the database will contain the following fields: session_id, items_in_session, artist, song, length (of song). In order to ensure that each user's songs listened to are uniquely identifiable within this table, the primary key is composite, consisting of the session_id and the items_in_session, where session_id functions as the partition key, and items_in_session as the clustering key.

In [61]:
# Statement that removes a previous version of the table so it can be rebuilt
drop_song_info = """DROP TABLE IF EXISTS song_info;"""

# Statement that creates the table: session_id is the partition key and
# items_in_session the clustering column
create_song_info = """
    CREATE TABLE IF NOT EXISTS song_info
    (session_id int, items_in_session int, artist varchar, song varchar, length float, 
        PRIMARY KEY ((session_id), items_in_session));
"""

# Run the drop first, then the create; a failure is printed and does not stop
# the next statement from running
for query in (drop_song_info, create_song_info):
    try:
        session.execute(query)
    except Exception as err:
        print(err)

In [62]:
# Fill song_info with one row per line of the generated CSV file

file = 'event_datafile_new.csv'

# Parameterised INSERT; the values are supplied per CSV line below
query = """
            INSERT INTO song_info
            (session_id, items_in_session, artist, song, length)
            VALUES (%s, %s, %s, %s, %s);
        """

with open(file, encoding='utf8') as f:

    csvreader = csv.reader(f)
    # The first line holds the column names, not data
    next(csvreader)

    for line in csvreader:

        # Pick the columns this table needs out of the CSV line
        artist, items_in_session, length, session_id, song = (
            line[0], line[3], line[5], line[8], line[9]
        )

        # Convert and insert inside the try so that a bad value or a failed
        # insert is printed and the remaining lines are still processed
        try:
            values = (int(session_id), int(items_in_session), artist, song, float(length))
            session.execute(query, values)
        except Exception as err:
            print(err)

### SELECT the necessary data using an appropriate query

As a reminder, the goal here is to retrieve the following: the name of the artist, the song title, and the song's length for the song that was heard during session_id = 338 with items_in_session = 4.

In [63]:
# Select the artist, song, and song length listened to in session 338 with 4 items in its session
query = """
    SELECT artist, song, length FROM song_info 
        WHERE session_id       = 338 
        AND   items_in_session = 4;
"""
try:
    query_hit = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print the hits only when the query succeeded; otherwise query_hit would
    # be undefined or still hold the result of an earlier query
    for hit in query_hit:
        print(hit.artist, "-", hit.song, "-", hit.length)

Faithless - Music Matters (Mark Knight Dub) - 495.30731201171875


## Query #2 

The goal here is to retrieve the following: the name of the artist, the song(s) sorted by the number of items in a user's session, and the user's first and last name, for the specific user_id = 10 and session_id = 182.

### Create and Populate the Database

In order to ensure that we can uniquely identify each song, and its associated artist, as well as the user that listened to it in that particular session, no matter the number of items within the session, this database has a composite primary key consisting of the composite partition key (user_id and session_id) and the clustering column items_in_session. The additional fields of first_name, last_name, artist, and song are the information we are ultimately after.

In [64]:
# Create the user_song_info table

# Statement that removes a previous version of the table so it can be rebuilt
drop_user_song_info = """DROP TABLE IF EXISTS user_song_info;"""

# Statement that creates the table: (user_id, session_id) is the composite
# partition key and items_in_session the clustering column
create_user_song_info = """
    CREATE TABLE IF NOT EXISTS user_song_info
    (user_id int, session_id int, items_in_session int, 
    first_name varchar, last_name varchar, artist varchar, song varchar, 
        PRIMARY KEY ((user_id, session_id), items_in_session))
"""

# Run the drop first, then the create; a failure is printed and does not stop
# the next statement from running
for query in (drop_user_song_info, create_user_song_info):
    try:
        session.execute(query)
    except Exception as err:
        print(err)

# Fill user_song_info with one row per line of the generated CSV file
file = 'event_datafile_new.csv'

# Parameterised INSERT; the values are supplied per CSV line below
query = """
    INSERT INTO user_song_info
        (user_id, session_id, items_in_session, first_name, last_name, artist, song)
        VALUES (%s, %s, %s, %s, %s, %s, %s);
        """

with open(file, encoding='utf8') as f:

    csvreader = csv.reader(f)
    # The first line holds the column names, not data
    next(csvreader)

    for line in csvreader:

        # Pick the columns this table needs out of the CSV line
        artist, first_name = line[0], line[1]
        items_in_session, last_name = line[3], line[4]
        session_id, song, user_id = line[8], line[9], line[10]

        # Convert and insert inside the try so that a bad value or a failed
        # insert is printed and the remaining lines are still processed
        try:
            values = (int(user_id), int(session_id), int(items_in_session),
                      first_name, last_name, artist, song)
            session.execute(query, values)
        except Exception as err:
            print(err)


### SELECT the necessary data using an appropriate query

As a reminder, the goal here is to retrieve the following: the name of the artist, the song(s) sorted by the number of items in a user's session, and the user's first and last name, for the specific user_id = 10 and session_id = 182.

In [65]:
# Select artist, song and user name for user 10 in session 182, ordered by
# the clustering column items_in_session
query = """
    SELECT artist, song, first_name, last_name FROM user_song_info
        WHERE user_id    = 10 
        AND   session_id = 182
    ORDER BY items_in_session;
"""
try:
    query_hit = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print the hits only when the query succeeded; otherwise query_hit would
    # be undefined or still hold the result of an earlier query
    for hit in query_hit:
        print(hit.artist, "-", hit.song, "-", hit.first_name, hit.last_name)
                    

Down To The Bone - Keep On Keepin' On - Sylvie Cruz
Three Drives - Greece 2000 - Sylvie Cruz
Sebastien Tellier - Kilometer - Sylvie Cruz
Lonnie Gordon - Catch You Baby (Steve Pitron & Max Sanna Radio Edit) - Sylvie Cruz


## Query #3
The goal here is to retrieve the following: the first and last name of every user who (at some point in their listening history) listened to the song "All Hands Against His Own".

### Create and Populate the Database

In order to keep track of what users listened to what songs, this database's composite primary key consists (unsurprisingly) of the song and each user's unique id. This way, the database can allow for the identification of each user's first and last name, if they listened to a particular song.

In [66]:
# Create the song_listen_info table

# Statement that removes a previous version of the table so it can be rebuilt
drop_song_listen_info = """DROP TABLE IF EXISTS song_listen_info"""

# Statement that creates the table: song is the partition key and user_id the
# clustering column
create_song_listen_info = """
    CREATE TABLE IF NOT EXISTS song_listen_info
        (song varchar, user_id int, first_name varchar, last_name varchar, 
        PRIMARY KEY (song, user_id));
"""

# Run the drop first, then the create; a failure is printed and does not stop
# the next statement from running
for query in (drop_song_listen_info, create_song_listen_info):
    try:
        session.execute(query)
    except Exception as err:
        print(err)

# Fill song_listen_info with one row per line of the generated CSV file
file = 'event_datafile_new.csv'

# Parameterised INSERT; the values are supplied per CSV line below
query = """
    INSERT INTO song_listen_info 
        (song, user_id, first_name, last_name) 
        VALUES (%s, %s, %s, %s);
"""

with open(file, encoding='utf8') as f:

    csvreader = csv.reader(f)
    # The first line holds the column names, not data
    next(csvreader)

    for line in csvreader:

        # Pick the columns this table needs out of the CSV line
        first_name, last_name = line[1], line[4]
        song, user_id = line[9], line[10]

        # Convert and insert inside the try so that a bad value or a failed
        # insert is printed and the remaining lines are still processed
        try:
            values = (song, int(user_id), first_name, last_name)
            session.execute(query, values)
        except Exception as err:
            print(err)


### Select the data using an appropriate query

As a reminder, the goal here is to retrieve the following: the first and last name of every user who (at some point in their listening history) listened to the song "All Hands Against His Own".


In [67]:
# Select the first and last name of every user stored for the given song
query = """
    SELECT first_name, last_name FROM song_listen_info
        WHERE song = 'All Hands Against His Own';
"""
try:
    query_hit = session.execute(query)
except Exception as e:
    print(e)
else:
    # Print the hits only when the query succeeded; otherwise query_hit would
    # be undefined or still hold the result of an earlier query
    for hit in query_hit:
        print(hit.first_name, hit.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


## Drop the tables before closing out the sessions

In [68]:
# Names of the three tables created by this notebook, and the statement
# template used to remove each of them
tables = ["song_info", "user_song_info", "song_listen_info"]
query = """DROP TABLE IF EXISTS {}"""

# Drop the tables one by one; a failure is printed and the remaining tables
# are still attempted
for table in tables:
    try:
        session.execute(query.format(table))
    except Exception as err:
        print(err)

## Close the session and cluster connection

In [69]:
# Shut down the session first, then the cluster it was opened on
for connection in (session, cluster):
    connection.shutdown()