# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Show the current working directory; the event data is resolved relative to it
print(os.getcwd())

# Folder (and any subfolders) that hold the raw event CSV files
filepath = os.path.join(os.getcwd(), 'event_data')

# Collect every event CSV found under `filepath`.
# The list is created up front and extended per directory so that results from
# earlier directories are not overwritten by later ones, and so the name exists
# even when the folder is missing or empty.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden directories (e.g. .ipynb_checkpoints) in place so os.walk
    # does not descend into them and pick up stale copies of the data files.
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # Only CSV files are valid inputs for the csv.reader step that follows
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))

print(file_path_list)

/home/workspace
['/home/workspace/event_data/2018-11-27-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-07-events.csv', '/home/workspace/event_data/2018-11-09-events.csv', '/home/workspace/event_data/2018-11-19-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-22-events.csv', '/home/workspace/event_data/2018-11-16-events.csv', '/home/workspace/event_data/2018-11-26-events.csv', '/home/workspace/event_data/2018-11-24-events.csv', '/home/workspace/event_data/2018-11-29-events.csv', '/home/workspace/event_data/2018-11-15-events.csv', '/home/workspace/event_data/2018-11-20-events.csv', '/home/workspace/event_data/2018-11-06-events.csv', '/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-21-events.csv', '/home/workspace/event_data/2018-11-10-events.csv', '/home/workspace/event_data/2018-11-23-events.csv', '/home/workspace/event_data/2018-11-02-events.c

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Accumulates every data row (header excluded) from all event files
full_data_rows_list = []

for event_path in file_path_list:
    with open(event_path, 'r', encoding='utf8', newline='') as source:
        reader = csv.reader(source)
        next(reader)  # discard the header line of this file
        # the remaining rows are appended in file order
        full_data_rows_list.extend(reader)

# Write the reduced event file, event_datafile_new.csv, which is the input for
# the Apache Cassandra tables. Every field is quoted.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Column names of the output file, in output order
output_header = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                 'level', 'location', 'sessionId', 'song', 'userId']
# Positions in the raw event row that supply each output column above
kept_positions = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as target:
    writer = csv.writer(target, dialect='myDialect')
    writer.writerow(output_header)
    for row in full_data_rows_list:
        # rows whose first column (artist) is empty are not written
        if row[0] != '':
            writer.writerow(tuple(row[pos] for pos in kept_positions))


In [4]:
# Report how many lines the generated csv file contains (header line included)
with open('event_datafile_new.csv', 'r', encoding='utf8') as datafile:
    line_total = len(datafile.readlines())
    print(line_total)

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [5]:
# Connect to the Cassandra instance running on the local machine
# (127.0.0.1).

from cassandra.cluster import Cluster
cluster = Cluster()

# A session is required to establish the connection and execute queries;
# every later cell issues its statements through this `session` object.
session = cluster.connect()

In [6]:
# Print the session object as a quick check that connect() returned one
print(session)

<cassandra.cluster.Session object at 0x7fd045c0e358>


#### Create Keyspace

In [7]:
# Keyspace for this project; IF NOT EXISTS lets the statement be run repeatedly.
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""
session.execute(create_keyspace_cql)

<cassandra.cluster.ResultSet at 0x7fd045bf05f8>

#### Set Keyspace

In [8]:
# Use the sparkify keyspace for the statements executed on this session below
session.set_keyspace('sparkify')

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




### song_plays_by_session

Desired Query:

> Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

Our table will need artist, song, length, because that is the information that will be requested. We also need sessionId, itemInSession because that is how the results will be filtered.

Partition Key:

- Should be sessionId so that our data is distributed by session rather than distributed by position in session (itemInSession).
Primary Key:

- sessionId by itself will not be unique, but if we add itemInSession as a clustering column, we can ensure uniqueness.

In [9]:
# Table for query 1: partitioned by sessionId, rows clustered by itemInSession.
create_song_plays_by_session_cql = """
    CREATE TABLE IF NOT EXISTS song_plays_by_session (
        sessionId int,
        itemInSession int,
        artist text,
        song text,
        length float,
        PRIMARY KEY (sessionId, itemInSession)
    )
"""
session.execute(create_song_plays_by_session_cql)

<cassandra.cluster.ResultSet at 0x7fd0337b4e10>

In [10]:
# We have provided part of the code to set up the CSV file. Please complete the Apache Cassandra code below#
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO song_plays_by_session (sessionId, itemInSession, artist, song, length)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### Do a SELECT to verify that the data have been inserted into each table

In [11]:
# Query 1: artist, song title and length for sessionId 338, itemInSession 4
select_song_play_cql = """
    SELECT artist, song, length
    FROM song_plays_by_session
    WHERE sessionId=338 AND itemInSession=4
"""
rows = session.execute(select_song_play_cql)

for record in rows:
    # Turn each result row into a list so only the values are printed
    print(list(record))

['Faithless', 'Music Matters (Mark Knight Dub)', 495.30731201171875]


### song_plays_by_user_session
Desired Query:

> Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

Our table will need artist, song, firstName, lastName, because that is the information that will be requested. We also need userId, sessionId, itemInSession because that is how the results will be filtered and ordered.

Partition Key:

- Should be the composite of userId and sessionId, because these are the two columns the query filters on. With userId alone as the partition key, every session of a user would share one partition; using both columns gives each user session its own partition, so all rows of the requested session are stored together and the query reads a single partition.

Primary Key:

- In this case we are filtering on two keys and then ordering on a third key, so our PK should be the composite partition key (userId, sessionId) plus one clustering column, itemInSession, which makes each row unique and sorts the rows within the partition.

In [12]:
# Table for query 2: composite partition key (userId, sessionId), with rows
# clustered by itemInSession.
create_song_plays_by_user_session_cql = """
    CREATE TABLE IF NOT EXISTS song_plays_by_user_session (
        userId int,
        sessionId int,
        itemInSession int,
        artist text,
        song text,
        firstName text,
        lastName text,
        PRIMARY KEY ((userId, sessionId), itemInSession)
    )
"""
session.execute(create_song_plays_by_user_session_cql)

<cassandra.cluster.ResultSet at 0x7fd025f17828>

In [13]:
file = 'event_datafile_new.csv'

# Insert expected data into song_plays_by_user_session.
# The statement is identical for every row, so it is prepared once outside the
# loop instead of being rebuilt and re-parsed for each of the rows.
insert_user_session_play = session.prepare(
    "INSERT INTO song_plays_by_user_session (userId, sessionId, itemInSession, artist, song, firstName, lastName)"
    " VALUES (?, ?, ?, ?, ?, ?, ?)"
)

# newline='' is what the csv module expects for files handed to csv.reader
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns used: 10 = userId, 8 = sessionId, 3 = itemInSession,
        # 0 = artist, 9 = song, 1 = firstName, 4 = lastName
        session.execute(
            insert_user_session_play,
            (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]),
        )

In [14]:
# Query 2: plays of userId 10 in sessionId 182, with the item position included
select_user_session_cql = """
    SELECT itemInSession, artist, song, firstName, lastName
    FROM song_plays_by_user_session 
    WHERE userId=10 AND sessionId=182
"""
rows = session.execute(select_user_session_cql)

for record in rows:
    # Each result row is a named tuple; printing it as a list shows values only
    print(list(record))

[0, 'Down To The Bone', "Keep On Keepin' On", 'Sylvie', 'Cruz']
[1, 'Three Drives', 'Greece 2000', 'Sylvie', 'Cruz']
[2, 'Sebastien Tellier', 'Kilometer', 'Sylvie', 'Cruz']
[3, 'Lonnie Gordon', 'Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', 'Sylvie', 'Cruz']


### users_by_song_listens
Desired Query:

> Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

Our table will need firstName, lastName, because that is the information that will be requested. We also need song, userId for our Primary Key.

Partition Key:

- Should be song so that our data is distributed by song which is the filter that is applied in the query.
Primary Key:

- I've chosen to add userId to the PK as a clustering column for two reasons:
 - I want to ensure a unique PK and (song, name) didn't seem sufficient
 - As a clustering column, it will ensure my results are ordered by id which is nice
- If I instead wanted alphabetical results, I could use PRIMARY KEY (song, firstName, userId), which would still ensure uniqueness, but would order by firstName instead.

In [15]:
# Table for query 3: partitioned by song, rows clustered by userId.
create_users_by_song_listens_cql = """
    CREATE TABLE IF NOT EXISTS users_by_song_listens (
        song text,
        userId int,
        firstName text,
        lastName text,
        PRIMARY KEY (song, userId)
    )
"""
session.execute(create_users_by_song_listens_cql)

<cassandra.cluster.ResultSet at 0x7fd045bece10>

In [16]:
file = 'event_datafile_new.csv'

# Insert expected data into users_by_song_listens.
# The statement is identical for every row, so it is prepared once outside the
# loop instead of being rebuilt and re-parsed for each of the rows.
insert_song_listener = session.prepare(
    "INSERT INTO users_by_song_listens (song, userId, firstName, lastName)"
    " VALUES (?, ?, ?, ?)"
)

# newline='' is what the csv module expects for files handed to csv.reader
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # CSV columns used: 9 = song, 10 = userId, 1 = firstName, 4 = lastName
        session.execute(
            insert_song_listener,
            (line[9], int(line[10]), line[1], line[4]),
        )

In [17]:
# Query 3: users who listened to the song 'All Hands Against His Own'
select_song_listeners_cql = """
    SELECT userId, firstName, lastName
    FROM users_by_song_listens
    WHERE song='All Hands Against His Own'
"""
rows = session.execute(select_song_listeners_cql)

for record in rows:
    # Each result row is a named tuple; printing it as a list shows values only
    print(list(record))

[29, 'Jacqueline', 'Lynch']
[80, 'Tegan', 'Levine']
[95, 'Sara', 'Johnson']


### Drop the tables before closing out the sessions

In [18]:
# Remove the three query tables created above.
# IF EXISTS keeps this cell safe to re-run after the tables are already gone,
# mirroring the IF NOT EXISTS used when they were created.
session.execute("drop table if exists song_plays_by_session")
session.execute("drop table if exists song_plays_by_user_session")
session.execute("drop table if exists users_by_song_listens")

<cassandra.cluster.ResultSet at 0x7fd01c2bcb38>

### Close the session and cluster connection

In [19]:
# Shut down the session first, then the cluster object it was created from
session.shutdown()
cluster.shutdown()