# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
from IPython.display import display
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    #print(file_path_list)

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Error Wrapper

This is a utility function that takes in another function and execute it in a try/catch block showing the error message if there is any.

In [5]:
def safe_execute(execute):
    try:
        return execute()
    except Exception as e:
        print(e) 

#### Populate Dataframe for raw data analysis

In [6]:
df = pd.read_csv('event_datafile_new.csv')
df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Stephen Lynch,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",829,Jim Henson's Dead,91
1,Manowar,Jacob,M,0,Klein,247.562,paid,"Tampa-St. Petersburg-Clearwater, FL",1049,Shell Shock,73
2,Morcheeba,Jacob,M,1,Klein,257.41016,paid,"Tampa-St. Petersburg-Clearwater, FL",1049,Women Lose Weight (Feat: Slick Rick),73
3,Maroon 5,Jacob,M,2,Klein,231.23546,paid,"Tampa-St. Petersburg-Clearwater, FL",1049,Won't Go Home Without You,73
4,Train,Jacob,M,3,Klein,216.76363,paid,"Tampa-St. Petersburg-Clearwater, FL",1049,Hey_ Soul Sister,73


#### Creating a Cluster

In [7]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster()

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspace

Creating a keyspace dend_p2 representing Data Engineering Nanodegree Project 2. It uses SimpleStrategy with replication factor 1 since scaling is not a concern for this project

In [8]:
# TO-DO: Create a Keyspace 
def create_keyspace():
    session.execute(
        """
            CREATE KEYSPACE dend_p2
            WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }
        """        
    )
    
safe_execute(create_keyspace)

#### Set Keyspace

In [9]:
# TO-DO: Set KEYSPACE to the keyspace specified above
safe_execute(lambda: session.set_keyspace('dend_p2'))

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




#### Populate table

This is a helper function that takes in a function that inserts records into the table using a generic file looper

In [10]:
def populate_table(insert_line):
    file = 'event_datafile_new.csv'
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        count = 0
        for line in csvreader:
            insert_line(line)
            count += 1
            
        print(f"Inserted {count} records.")

#### Validate Results

This is a helper function that takes in a sql query, query parameters and a print function to show the results of the query.

In [11]:
def select_table(sql, parameters, show_results):
    for history in safe_execute(lambda: session.execute(session.prepare(sql), parameters)):
        show_results(history)

#### Query 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

##### Preview Raw Data

In [12]:
display(df[["sessionId", "itemInSession", "artist", "song", "length"]].head())

display(df[
    (df["sessionId"] == 338) & (df["itemInSession"] == 4)
][["sessionId", "itemInSession", "artist", "song", "length"]])

Unnamed: 0,sessionId,itemInSession,artist,song,length
0,829,0,Stephen Lynch,Jim Henson's Dead,182.85669
1,1049,0,Manowar,Shell Shock,247.562
2,1049,1,Morcheeba,Women Lose Weight (Feat: Slick Rick),257.41016
3,1049,2,Maroon 5,Won't Go Home Without You,231.23546
4,1049,3,Train,Hey_ Soul Sister,216.76363


Unnamed: 0,sessionId,itemInSession,artist,song,length
5810,338,4,Faithless,Music Matters (Mark Knight Dub),495.3073


##### Design

Base on the raw data and the required query search by sessionId and item inInSession. The table is created with a compound primary key using a simple partition key and a clustering key. If the dataset become too large then it can utilized a composite primary key of sessionId and itemInSession.

1. sessionId is selected as the partition key with type int.
2. itemInSession is a clustered key with type int.
3. artist text
4. song text
5. length float

In [13]:
## TO-DO: Query 1:  Give me the artist, song title and song's length in the music app history that was heard during \
## sessionId = 338, and itemInSession = 4
def create_session_history():
    session.execute("""
        CREATE TABLE IF NOT EXISTS session_history (
            sessionId int, itemInSession int, artist text, song text, length float, 
            PRIMARY KEY(sessionId, itemInSession)
        )
    """)

safe_execute(create_session_history)

def populate_session_history(line):
    query = """
        INSERT INTO session_history (sessionId, itemInSession, artist, song, length)
        VALUES (?, ?, ?, ?, ?)
    """
    safe_execute(lambda: session.execute(session.prepare(query), [int(line[8]), int(line[3]), line[0], line[9], float(line[5])]))
    
populate_table(populate_session_history)

Inserted 6820 records.


#### Do a SELECT to verify that the data have been inserted into each table

In [14]:
## TO-DO: Add in the SELECT statement to verify the data was entered into the table
def print_session_history(history):
    print(f"artist: {history.artist}, song: {history.song}, length: {history.length}")

select_table(
    "SELECT artist, song, length from session_history WHERE sessionId = ? AND itemInSession = ?",
    [338, 4], print_session_history
)

artist: Faithless, song: Music Matters (Mark Knight Dub), length: 495.30731201171875


### COPY AND REPEAT THE ABOVE THREE CELLS FOR EACH OF THE THREE QUESTIONS

#### Query 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

##### Preview Raw Data

In [15]:
display(df[["userId", "sessionId", "itemInSession", "artist", "firstName", "lastName"]].head())
display(df[
    (df["userId"] == 10) & (df["sessionId"] == 182)
][["userId", "sessionId", "itemInSession", "artist", "firstName", "lastName"]])

Unnamed: 0,userId,sessionId,itemInSession,artist,firstName,lastName
0,91,829,0,Stephen Lynch,Jayden,Bell
1,73,1049,0,Manowar,Jacob,Klein
2,73,1049,1,Morcheeba,Jacob,Klein
3,73,1049,2,Maroon 5,Jacob,Klein
4,73,1049,3,Train,Jacob,Klein


Unnamed: 0,userId,sessionId,itemInSession,artist,firstName,lastName
6805,10,182,0,Down To The Bone,Sylvie,Cruz
6806,10,182,1,Three Drives,Sylvie,Cruz
6807,10,182,2,Sebastien Tellier,Sylvie,Cruz
6808,10,182,3,Lonnie Gordon,Sylvie,Cruz


##### Design

Base on the raw data and the required query search by userId and sessionId, with records also sorted by itemInSession. The table is created with a compound primary key using a simple partition key and 2 clustering keys. If the dataset become too large then it can utilized a composite primary key of userId and sessionId and a clustering key of itemInSession.

1. userId is selected as the partition key with type int.
2. sessionId is a clustered key with type int. 
3. itemInSession is a clustered key with type int.
3. artist text
4. firstName text
5. lastName float

In [16]:
## TO-DO: Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)\
## for userid = 10, sessionid = 182
def create_user_session_history():
    session.execute("""
        CREATE TABLE IF NOT EXISTS user_session_history (
            userId int, sessionId int, itemInSession int, artist text, song text, firstName text, lastName text,
            PRIMARY KEY(userId, sessionId, itemInSession)
        )
    """)

safe_execute(create_user_session_history)

def populate_user_session_history(line):
    query = """
        INSERT INTO user_session_history (userId, sessionId, itemInSession, artist, song, firstName, lastName)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    safe_execute(lambda: session.execute(session.prepare(query), [int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]]))
    
populate_table(populate_user_session_history)

Inserted 6820 records.


#### Do a SELECT to verify that the data have been inserted into each table

In [17]:
## TO-DO: Add in the SELECT statement to verify the data was entered into the table
def print_user_session_history(history):
    print(f"artist: {history.artist}, song: {history.song}, firstName: {history.firstname}, lastName: {history.lastname}")

select_table(
    "SELECT artist, song, firstName, lastName from user_session_history WHERE sessionId = ? AND userId = ?",
    [182, 10], print_user_session_history
)

artist: Down To The Bone, song: Keep On Keepin' On, firstName: Sylvie, lastName: Cruz
artist: Three Drives, song: Greece 2000, firstName: Sylvie, lastName: Cruz
artist: Sebastien Tellier, song: Kilometer, firstName: Sylvie, lastName: Cruz
artist: Lonnie Gordon, song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit), firstName: Sylvie, lastName: Cruz


#### Query 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

##### Preview Raw Data

In [18]:
display(df[["song", "userId", "firstName", "lastName"]].head())

user_group = df.groupby(
    ["firstName", "lastName", "userId"]
)

display(user_group.size().reset_index().groupby(
    ["firstName", "lastName"]
).size().sort_values(ascending=False).head())

Unnamed: 0,song,userId,firstName,lastName
0,Jim Henson's Dead,91,Jayden,Bell
1,Shell Shock,73,Jacob,Klein
2,Women Lose Weight (Feat: Slick Rick),73,Jacob,Klein
3,Won't Go Home Without You,73,Jacob,Klein
4,Hey_ Soul Sister,73,Jacob,Klein


firstName  lastName
Ava        Robinson    2
Chloe      Cuevas      1
Christian  Porter      1
Cienna     Freeman     1
Cierra     Finley      1
dtype: int64

##### Design

Base on the raw data and the required query search by song. The table is created with a compound primary key using a simple partition key and a clustering key. If the dataset become too large then it can utilized a composite primary key of sessionId and itemInSession.

1. song is selected as the partition key with type int.
2. userId is a clustered key with type int. This column is added since the firstName and lastName actually had a duplication entry for Ava Robinson.
3. firstName text
4. lastName text

In [19]:
## TO-DO: Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
def create_user_song_history():
    session.execute("""
        CREATE TABLE IF NOT EXISTS user_song_history (
            song text, user_id int, firstName text, lastName text,
            PRIMARY KEY(song, user_id)
        )
    """)

safe_execute(create_user_song_history)

def populate_user_song_history(line):
    query = """
        INSERT INTO user_song_history (song, user_id, firstName, lastName)
        VALUES (?, ?, ?, ?)
    """
    safe_execute(lambda: session.execute(session.prepare(query), [line[9], int(line[10]), line[1], line[4]]))
    
populate_table(populate_user_song_history)

Inserted 6820 records.


#### Do a SELECT to verify that the data have been inserted into each table

In [20]:
## TO-DO: Add in the SELECT statement to verify the data was entered into the table
def print_session_history(history):
    print(f"song: {history.song}, firstName: {history.firstname}, lastName: {history.lastname}")

select_table(
    "SELECT song, firstname, lastname FROM user_song_history WHERE song = ?",
    ["All Hands Against His Own"], print_session_history
)

song: All Hands Against His Own, firstName: Jacqueline, lastName: Lynch
song: All Hands Against His Own, firstName: Tegan, lastName: Levine
song: All Hands Against His Own, firstName: Sara, lastName: Johnson


### Drop the tables before closing out the sessions

In [21]:
## TO-DO: Drop the table before closing out the sessions
safe_execute(lambda: session.execute("DROP TABLE IF EXISTS session_history"))
safe_execute(lambda: session.execute("DROP TABLE IF EXISTS user_session_history"))
safe_execute(lambda: session.execute("DROP TABLE IF EXISTS user_song_history"))

<cassandra.cluster.ResultSet at 0x7fde686cb908>

### Close the session and cluster connection¶

In [22]:
session.shutdown()
cluster.shutdown()