# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
from prettytable import PrettyTable

#### Creating list of filepaths to process original event csv data files

In [2]:
# Show the current working directory; the event_data path is built relative to it
print(os.getcwd())

/workspace/home


In [3]:
# List every entry (files and directories) in the current working directory.
# Being the last expression of the cell, the returned list is what the notebook shows.
os.listdir()

['.workspace-config.json',
 '.ipynb_checkpoints',
 'event_data',
 'images',
 'event_datafile_new.csv',
 'Project_1B_ Project_Template.ipynb']

In [4]:
# Build the path to the event data directory, which sits directly under the
# current working directory. os.path.join supplies the separator for the
# platform instead of hard-coding '/'.
filepath = os.path.join(os.getcwd(), 'event_data')
print(filepath)

/workspace/home/event_data


In [5]:
# List the entries of the event_data folder.
# Being the last expression of the cell, the returned list is what the notebook shows.
os.listdir(filepath)

['2018-11-10-events.csv',
 '2018-11-20-events.csv',
 '2018-11-09-events.csv',
 '2018-11-11-events.csv',
 '2018-11-07-events.csv',
 '2018-11-27-events.csv',
 '2018-11-21-events.csv',
 '2018-11-15-events.csv',
 '2018-11-17-events.csv',
 '2018-11-06-events.csv',
 '2018-11-24-events.csv',
 '2018-11-23-events.csv',
 '2018-11-04-events.csv',
 '2018-11-30-events.csv',
 '2018-11-29-events.csv',
 '2018-11-01-events.csv',
 '2018-11-16-events.csv',
 '2018-11-02-events.csv',
 '2018-11-13-events.csv',
 '2018-11-08-events.csv',
 '2018-11-12-events.csv',
 '2018-11-28-events.csv',
 '2018-11-19-events.csv',
 '2018-11-18-events.csv',
 '2018-11-26-events.csv',
 '2018-11-25-events.csv',
 '2018-11-05-events.csv',
 '2018-11-03-events.csv',
 '2018-11-14-events.csv',
 '2018-11-22-events.csv']

In [6]:
# Walk the tree rooted at filepath and, for every directory visited, print
# its path, its immediate sub-directories and the files it contains.
for root, dirs, files in os.walk(filepath):
    for label, listing in (("Root", root), ("Dirs", dirs), ("Files", files)):
        print("{}: {}\n".format(label, listing))

Root: /workspace/home/event_data

Dirs: []

Files: ['2018-11-10-events.csv', '2018-11-20-events.csv', '2018-11-09-events.csv', '2018-11-11-events.csv', '2018-11-07-events.csv', '2018-11-27-events.csv', '2018-11-21-events.csv', '2018-11-15-events.csv', '2018-11-17-events.csv', '2018-11-06-events.csv', '2018-11-24-events.csv', '2018-11-23-events.csv', '2018-11-04-events.csv', '2018-11-30-events.csv', '2018-11-29-events.csv', '2018-11-01-events.csv', '2018-11-16-events.csv', '2018-11-02-events.csv', '2018-11-13-events.csv', '2018-11-08-events.csv', '2018-11-12-events.csv', '2018-11-28-events.csv', '2018-11-19-events.csv', '2018-11-18-events.csv', '2018-11-26-events.csv', '2018-11-25-events.csv', '2018-11-05-events.csv', '2018-11-03-events.csv', '2018-11-14-events.csv', '2018-11-22-events.csv']



In [7]:
# Collect the path of every event csv file under event_data.
# The list is accumulated across all directories os.walk visits; assigning
# inside the loop would keep only the matches from the last directory walked.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden directories in place (e.g. .ipynb_checkpoints) so os.walk
    # does not descend into them and pick up stray copies of the data files.
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # Match csv files only, so sub-directories and unrelated files are never
    # handed to the csv reader further down.
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))

#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [8]:
# Read the data rows of every event file into one flat list of rows.
full_data_row = []
for f in file_path_list:
    # newline='' is the csv module's recommendation on all platforms: it
    # disables universal newline translation when the file is read.
    with open(f, 'r', encoding='utf-8', newline='') as csvFile:
        csvReader = csv.reader(csvFile)
        # Skip the header row. The None default keeps a completely empty file
        # from raising StopIteration and aborting the whole load.
        next(csvReader, None)
        # Everything after the header is data; take it all in one call.
        full_data_row.extend(csvReader)

print(len(full_data_row))

8056


In [9]:
# Write the reduced csv file that is later loaded into the Apache Cassandra tables.
# The dialect quotes every field on output.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions, within a source row, of the eleven values that are kept; they are
# written in the same order as the header names below.
source_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow([
        'artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
        'level', 'location', 'sessionId', 'song', 'userId',
    ])
    for row in full_data_row:
        # only rows whose first field (artist) is non-empty are written out
        if row[0] != '':
            writer.writerow([row[i] for i in source_columns])


In [10]:
# Count the lines of the generated csv file; the header line is part of the count
with open('event_datafile_new.csv', 'r', encoding='utf8') as f:
    print(len(f.readlines()))

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName
- gender
- itemInSession
- lastName
- length
- level
- location
- sessionId
- song
- userId

The image below is a screenshot of what the denormalized data should look like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [11]:
# Connect to the Cassandra instance on the local machine
from cassandra.cluster import Cluster

try:
    cluster = Cluster(contact_points=['127.0.0.1'])
    # A session is what queries are executed through
    session = cluster.connect()
except Exception as err:
    # a failed connection is reported, not re-raised
    print(err)

#### Create Keyspace

In [12]:
# Create the sparkify keyspace unless it already exists. The statement asks
# for SimpleStrategy replication with a replication factor of 1.
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    # any failure is only printed; the cell does not re-raise
    print(e)

#### Set Keyspace

In [13]:
# Make sparkify the keyspace of this session; the statements that follow
# name their tables without a keyspace prefix.
try:
    session.set_keyspace('sparkify')
except Exception as e:
    # any failure is only printed; the cell does not re-raise
    print(e)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




### Query 1. Give me the artist, song title and song's length in the music app history that was heard during a given  sessionId and itemInSession

**Table Name:** `song_details_session_item`

**Columns:** 
- `sessionId`
- `itemInSession`
- `artist`
- `song`
- `songLength`

**Primary Key:** `sessionId, itemInSession`

The reason for choosing `sessionId` and `itemInSession` as the primary key is that we have to filter the data on the given `sessionId` and `itemInSession` values. Here `sessionId` is the partition key and `itemInSession` is the clustering column; together these two columns make the primary key unique.

In [14]:
# Table for query 1: sessionId is the partition key and itemInSession the
# clustering column.
query = (
    "create table if not exists song_details_session_item "
    "(sessionId int, itemInSession int, artist text, song text, songLength float, "
    "primary key(sessionId, itemInSession))"
)

try:
    session.execute(query)
except Exception as err:
    # a failed statement is reported, not re-raised
    print(err)

In [15]:
# Load event_datafile_new.csv into song_details_session_item, one insert per row.
file = 'event_datafile_new.csv'

# The statement text is the same for every row, so it is built once here
# rather than on each pass through the loop.
query = "insert into song_details_session_item(sessionId, itemInSession, artist, song, songLength)"
query = query + "values (%s, %s, %s, %s, %s)"

# newline='' is the csv module's recommended way to open a file for csv.reader
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        try:
            # csv columns used: 8 = sessionId, 3 = itemInSession, 0 = artist,
            # 9 = song, 5 = length
            session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
        except Exception as e:
            # a row that fails to convert or insert is reported and skipped
            print(e)

#### Do a SELECT with condition  sessionId = 338 and itemInSession  = 4 

The following query will return artist, song and songLength where the sessionId=338 and itemInSession=4

In [16]:
# Query 1: artist, song title and song length for sessionId 338, itemInSession 4.
query = ("select artist, song, songLength from song_details_session_item where sessionId=338 "
         "and itemInSession=4")

try:
    rows = session.execute(query)
except Exception as e:
    # Report the failure and stop here: without a result there is nothing to
    # tabulate, and `rows` would be undefined or left over from an earlier cell.
    print(e)
else:
    table = PrettyTable()
    table.field_names = ["Artist", "Song", "songLength"]
    for row in rows:
        # result attributes are read in lower case (songLength -> songlength)
        table.add_row([row.artist, row.song, row.songlength])

    print(table)

+-----------+---------------------------------+--------------------+
|   Artist  |               Song              |     songLength     |
+-----------+---------------------------------+--------------------+
| Faithless | Music Matters (Mark Knight Dub) | 495.30731201171875 |
+-----------+---------------------------------+--------------------+


### Query 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for given userid and sessionid

**Table Name:** `artist_song_user_on_userid_session`

**Columns:**
- `userId`
- `sessionId`
- `itemInSession`
- `artist`
- `song`
- `firstName`
- `lastName`

**Primary Key:** `userId, sessionId, itemInSession`

The combination of `userId` and `sessionId` will not make the primary key unique, so we need to add another column to make it unique. We also have to sort the rows by `itemInSession`, and we know that clustering columns sort the data within a partition in ascending order. So we can add `itemInSession` as a clustering column, both to make the primary key unique and to sort the data by `itemInSession`. Here `userId` is the partition key, and `sessionId` and `itemInSession` are the clustering columns.

In [17]:
# Table for query 2: userId is the partition key; sessionId and itemInSession
# are the clustering columns.
query = (
    "create table if not exists artist_song_user_on_userid_session "
    "(userId int, sessionId int, itemInSession int, artist text, song text, firstName text, "
    "lastName text, primary key(userId, sessionId, itemInSession))"
)

try:
    session.execute(query)
except Exception as err:
    # a failed statement is reported, not re-raised
    print(err)

In [18]:
# Load event_datafile_new.csv into artist_song_user_on_userid_session, one insert per row.
file = 'event_datafile_new.csv'

# The statement text is the same for every row, so it is built once here
# rather than on each pass through the loop. The run of spaces before
# "artist" is part of the existing statement text and is left as it is.
query = ("insert into artist_song_user_on_userid_session (userId, sessionId, itemInSession, "
         "        artist, song, firstName, lastName)")
query = query + "values (%s, %s, %s, %s, %s, %s, %s)"

# newline='' is the csv module's recommended way to open a file for csv.reader
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        try:
            # csv columns used: 10 = userId, 8 = sessionId, 3 = itemInSession,
            # 0 = artist, 9 = song, 1 = firstName, 4 = lastName
            session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9],
                                    line[1], line[4]))
        except Exception as e:
            # a row that fails to convert or insert is reported and skipped
            print(e)

#### Do a SELECT with condition userId=10  sessionId = 182

The following query returns artist, song (sorted by itemInSession in ascending order), firstName and lastName from the table where userId=10 and sessionId=182

In [19]:
# Query 2: artist, song and user name for userId 10, sessionId 182.
query = ("select artist, song, firstName, lastName from "
         "artist_song_user_on_userid_session where userId=10 and sessionId=182")

try:
    rows = session.execute(query)
except Exception as e:
    # Report the failure and stop here: without a result there is nothing to
    # tabulate, and `rows` would be undefined or left over from an earlier cell.
    print(e)
else:
    table = PrettyTable()
    table.field_names = ["Artist", "Song", "User"]
    for row in rows:
        # first and last name are joined into a single "User" cell
        table.add_row([row.artist, row.song, row.firstname + " " + row.lastname])

    print(table)

+-------------------+------------------------------------------------------+-------------+
|       Artist      |                         Song                         |     User    |
+-------------------+------------------------------------------------------+-------------+
|  Down To The Bone |                  Keep On Keepin' On                  | Sylvie Cruz |
|    Three Drives   |                     Greece 2000                      | Sylvie Cruz |
| Sebastien Tellier |                      Kilometer                       | Sylvie Cruz |
|   Lonnie Gordon   | Catch You Baby (Steve Pitron & Max Sanna Radio Edit) | Sylvie Cruz |
+-------------------+------------------------------------------------------+-------------+


### Query 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

**Table Name:** `user_on_song`

**Columns:**
- `song`
- `userId`
- `firstName`
- `lastName`

**Primary Key:** `song, userId`

Here we want the names of the users who listened to a particular song. `song` alone can't be the primary key, because it does not make the primary key unique: more than one user may listen to the same song. So we add `userId` to `song` to form the primary key. `song` and `userId` together may still not uniquely identify a row in the source data (a user can play the same song more than once), but in this case that won't be a problem. In Apache Cassandra, inserting a row whose primary key already exists updates the existing row and throws no error. Since we are only interested in the user name, and `firstName` and `lastName` are always the same for a given `userId`, the update leaves the same user name in place. So here `song` is the partition key and `userId` is the clustering column.

In [20]:
# Table for query 3: song is the partition key and userId the clustering column.
query = (
    "create table if not exists user_on_song "
    "(song text, userId int, firstName text, lastName text, primary key(song, userId))"
)

try:
    session.execute(query)
except Exception as err:
    # a failed statement is reported, not re-raised
    print(err)

In [21]:
# Load event_datafile_new.csv into user_on_song, one insert per row.
file = 'event_datafile_new.csv'

# The statement text is the same for every row, so it is built once here
# rather than on each pass through the loop.
query = "insert into user_on_song (song, userId, firstName, lastName)"
query = query + "values (%s, %s, %s, %s)"

# newline='' is the csv module's recommended way to open a file for csv.reader
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        try:
            # csv columns used: 9 = song, 10 = userId, 1 = firstName, 4 = lastName
            session.execute(query, (line[9], int(line[10]), line[1], line[4]))
        except Exception as e:
            # a row that fails to convert or insert is reported and skipped
            print(e)

#### Do a SELECT with condition  song='All Hands Against His Own'

The following query will return the firstName and lastName of the users who listened to the given song.

In [22]:
# Query 3: first and last name of every user stored for the song
# 'All Hands Against His Own'.
query = "select firstName, lastName from user_on_song where song='All Hands Against His Own'"

try:
    rows = session.execute(query)
except Exception as e:
    # Report the failure and stop here: without a result there is nothing to
    # tabulate, and `rows` would be undefined or left over from an earlier cell.
    print(e)
else:
    table = PrettyTable()
    table.field_names = ["User"]
    for row in rows:
        # first and last name are joined into a single "User" cell
        table.add_row([row.firstname + " " + row.lastname])

    print(table)

+------------------+
|       User       |
+------------------+
| Jacqueline Lynch |
|   Tegan Levine   |
|   Sara Johnson   |
+------------------+


### Drop the tables before closing out the sessions

In [23]:
# Drop the three query tables, printing a confirmation for each one dropped.
for table_name in ('song_details_session_item', 'artist_song_user_on_userid_session', 'user_on_song'):
    query = "DROP TABLE IF EXISTS " + table_name
    try:
        session.execute(query)
    except Exception as err:
        # a failed drop is reported; the loop moves on to the next table
        print(err)
    else:
        print("Dropped table: " + table_name)

Dropped table: song_details_session_item
Dropped table: artist_song_user_on_userid_session
Dropped table: user_on_song


### Close the session and cluster connection

In [24]:
# Shut the session down first, then the cluster it was created from
session.shutdown()
cluster.shutdown()