# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [5]:
from typing import Dict, List, Union
import re
import os
import glob
import json
import csv

import pandas as pd
import cassandra
import numpy as np

#### Creating list of filepaths to process original event csv data files

In [6]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Collect the path of every data file found under `filepath`.
# The list is accumulated across all walked directories: assigning it inside the
# loop would keep only the files of the last directory visited and would leave
# the name undefined when `filepath` does not exist.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden sub-directories in place (e.g. notebook checkpoint folders)
    # so that os.walk does not descend into them and pick up duplicate data.
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # glob('*') also matches sub-directories; keep regular files only, since
    # every collected path is later opened as a csv file.
    file_path_list.extend(
        path for path in glob.glob(os.path.join(root, '*')) if os.path.isfile(path)
    )
    #print(file_path_list)

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [7]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = []

# for every filepath in the file path list
for data_file_path in file_path_list:

    # reading csv file
    with open(data_file_path, 'r', encoding='utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        # skip the header row; the default keeps an empty file from raising StopIteration
        next(csvreader, None)

        # extracting each remaining data row and appending it
        full_data_rows_list.extend(csvreader)


# creating a smaller event data csv file called event_datafile_new.csv that will be used
# to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # csv.reader yields [] for blank lines; rows whose first column (artist)
        # is empty are skipped as well
        if not row or row[0] == '':
            continue
        # keep only the 11 columns listed in the header above, in that order
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [8]:
# Sanity check: print how many lines (header included) the generated csv file has
with open('event_datafile_new.csv', 'r', encoding='utf8') as f:
    print(len(f.readlines()))

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [9]:
# This should make a connection to a Cassandra instance on your local machine
# (127.0.0.1); Cluster() is called without arguments, so the driver defaults apply.

from cassandra.cluster import Cluster
cluster = Cluster()

# A session is needed to establish the connection and to execute queries
session = cluster.connect()

#### Create Keyspace

In [10]:
# Create the `udacity` keyspace unless it already exists, using SimpleStrategy
# with a replication factor of 1.
keyspace_query = '''
CREATE KEYSPACE IF NOT EXISTS udacity 
WITH REPLICATION = {
    'class': 'SimpleStrategy', 
    'replication_factor': 1
}
'''
# NOTE(review): the trailing semicolon presumably only suppresses the notebook's
# echo of the returned ResultSet.
session.execute(keyspace_query);

#### Set Keyspace

In [11]:
# Switch the session to the `udacity` keyspace so later statements can use unqualified table names
session.execute('USE udacity')

<cassandra.cluster.ResultSet at 0x7fb82b3deef0>

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




Let's have a quick look at the data

In [12]:
# Load only the first data row (nrows=1) of the generated csv to inspect its column layout
pd.read_csv('./event_datafile_new.csv', nrows=1)

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Stephen Lynch,Jayden,M,0,Bell,182.85669,free,"Dallas-Fort Worth-Arlington, TX",829,Jim Henson's Dead,91


I'll use a dictionary to map the field names to the appropriate list index in each csv line. The dictionary will also contain the correct Python type to cast each value to.

In [13]:
# Maps each field name to its column position in a line of event_datafile_new.csv
# ('col_index') and to the callable used to cast the raw csv string ('type').
# The specs below are listed in csv column order, so the position of a spec in
# the list is its column index.
field_col_num_dict = {
    field_name: {'col_index': position, 'type': caster}
    for position, (field_name, caster) in enumerate([
        ('artist', str),
        ('user_first_name', str),
        ('gender', str),
        ('item_in_session', int),
        ('user_last_name', str),
        ('song_length', float),
        ('level', str),
        ('location', str),
        ('session_id', int),
        ('song', str),
        ('user_id', int),
    ])
}

In [14]:
def get_placeholder_dict(line: List[str], col_names: List[str]) -> Dict[str, Union[int, float, str]]:
    '''Build the query-parameter mapping for one csv line

    For every requested column the raw string is read from ``line`` at the
    position recorded in ``field_col_num_dict`` and converted with the type
    callable recorded there.

    :param line: The list of raw string values of one csv line
    :param col_names: The field names (keys of ``field_col_num_dict``) to include in the result

    :return: dictionary mapping each requested field name to its cast value
    '''
    def _cast_field(col_name: str) -> Union[int, float, str]:
        # look up where the field lives in the line and how to convert it
        spec = field_col_num_dict[col_name]
        return spec['type'](line[spec['col_index']])

    return {col_name: _cast_field(col_name) for col_name in col_names}
    

In [15]:
def create_and_populate_table(
    session: cassandra.cluster.Session, 
    file_path: str, 
    insert_query: str, 
    col_names: List[str]
) -> None:
    '''Populate a cassandra table with values from a given csv file

    The csv header row is skipped; for every remaining line the values of
    ``col_names`` are cast via ``get_placeholder_dict`` and ``insert_query`` is
    executed with them. The target table itself is not created here: it must
    already exist when this function is called.

    :param session: cassandra session
    :param file_path: file path of the csv file from which the cassandra table is populated
    :param insert_query: The INSERT query used to populate the table, with one named
        placeholder (``%(name)s``) per entry of ``col_names``
    :param col_names: The columns from the csv file that should be used for populating the table
    '''
    # Read from the `file_path` argument (not a module-level variable) so the
    # function works for any csv file the caller passes in.
    with open(file_path, encoding='utf8', newline='') as f:
        csvreader = csv.reader(f)
        next(csvreader, None)  # skip header; tolerate an empty file
        for line in csvreader:
            if not line:
                # csv.reader yields [] for blank lines; nothing to insert
                continue
            placeholder_dict = get_placeholder_dict(line, col_names)
            session.execute(insert_query, placeholder_dict)
    

In [16]:
# Path of the csv file written in Part I; used as the data source when populating the tables
file = 'event_datafile_new.csv'

## Query 1: Get the artist, song title and song length for given (session id, item in session)
Since session_id x item_in_session create a unique combination and are the fields for the query, they are a good choice in order to create the primary key. 

In [17]:
# Table for query 1: session_id is the partition key and item_in_session the
# clustering column, so (session_id, item_in_session) identifies a single row.
# NOTE(review): CQL FLOAT is a 32-bit type, so song_length is stored with less
# precision than the csv value — consider DOUBLE if exact lengths matter.
create_songs_by_session_id_query = '''
CREATE TABLE IF NOT EXISTS songs_by_session_id (
    session_id INT, 
    item_in_session INT, 
    artist TEXT, 
    song TEXT, 
    song_length FLOAT, 
    PRIMARY KEY (session_id, item_in_session)
)
'''

In [18]:
# Create the songs_by_session_id table (IF NOT EXISTS makes this safe to re-run)
session.execute(create_songs_by_session_id_query)

<cassandra.cluster.ResultSet at 0x7fb82b3de860>

In [19]:
# INSERT statement for songs_by_session_id; each %(name)s placeholder is filled
# from the per-line dictionary built for the columns listed below.
insert_songs_by_session_id_query = '''
INSERT INTO songs_by_session_id (session_id, item_in_session, artist, song, song_length)
VALUES (%(session_id)s, %(item_in_session)s, %(artist)s, %(song)s, %(song_length)s)
'''
songs_by_session_id_columns = ['session_id', 'item_in_session', 'artist', 'song', 'song_length']
create_and_populate_table(
    session=session,
    file_path=file,
    insert_query=insert_songs_by_session_id_query,
    col_names=songs_by_session_id_columns,
)

#### Test `songs_by_session_id` by using `session_id=338 AND item_in_session=4`

In [20]:
# Query 1: artist, song and song length for session 338, item 4
first_query = 'SELECT artist, song, song_length from songs_by_session_id WHERE session_id=338 AND item_in_session=4'
first_results = session.execute(first_query)
# materialise the result rows and show them as a DataFrame
pd.DataFrame(list(first_results))

Unnamed: 0,artist,song,song_length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


### The query works as expected!

## Query 2: Get the artist, song (sorted by itemInSession) and user (first and last name) for given (userid, sessionid)
The two columns that are used for the querying - user_id, and session_id will be our partitioning keys. `item_in_session` will be used as a clustering key for two reasons: 
* Guarantee that the primary key is unique.
* Ensure that the results would be ordered by item_in_session, as required.

In [21]:
# Table for query 2: (user_id, session_id) form the composite partition key and
# item_in_session is the clustering column, so rows of one user session are
# stored ordered by item_in_session.
create_songs_by_user_id_query = '''
CREATE TABLE IF NOT EXISTS songs_by_user_id (
    user_id INT, 
    session_id INT, 
    item_in_session INT,
    artist TEXT,
    song TEXT, 
    user_first_name TEXT,
    user_last_name TEXT, 
    PRIMARY KEY ((user_id, session_id), item_in_session)
)
'''

In [22]:
# Create the songs_by_user_id table (IF NOT EXISTS makes this safe to re-run)
session.execute(create_songs_by_user_id_query)

<cassandra.cluster.ResultSet at 0x7fb82b304390>

In [23]:
# INSERT statement for songs_by_user_id; each %(name)s placeholder is filled
# from the per-line dictionary built for the columns listed below.
insert_songs_by_user_id_query = '''
INSERT INTO songs_by_user_id (user_id, session_id, item_in_session, artist, song, user_first_name, user_last_name)
VALUES (%(user_id)s, %(session_id)s, %(item_in_session)s, %(artist)s, %(song)s, %(user_first_name)s, %(user_last_name)s)
'''

songs_by_user_id_columns = [
    'user_id', 'session_id', 'item_in_session', 'artist', 'song', 'user_first_name', 'user_last_name',
]
create_and_populate_table(
    session=session,
    file_path=file,
    insert_query=insert_songs_by_user_id_query,
    col_names=songs_by_user_id_columns,
)

#### Test `songs_by_user_id` table by using `user_id=10` and `session_id=182`

In [24]:
# Query 2: songs of user 10 in session 182, with the user's first and last name
second_query = 'SELECT artist, song, item_in_session, user_first_name, user_last_name from songs_by_user_id WHERE user_id=10 and session_id=182'
second_results = session.execute(second_query)
# materialise the result rows and show them as a DataFrame
pd.DataFrame(list(second_results))

Unnamed: 0,artist,song,item_in_session,user_first_name,user_last_name
0,Down To The Bone,Keep On Keepin' On,0,Sylvie,Cruz
1,Three Drives,Greece 2000,1,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,2,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,3,Sylvie,Cruz


#### Looks good

## Query 3: Get first_name, last_name of users who listened to a particular song. 
We are going to query by `song`, so that would be a good choice for our partitioning key. In order to get unique results of first_name, last_name of a user, the simplest clustering key would be `user_id`.

In [25]:
# Table for query 3: song is the partition key and user_id the clustering
# column, so there is one row per (song, user) pair; inserting the same pair
# again overwrites the existing row instead of adding a duplicate.
create_user_name_by_song_query = '''
CREATE TABLE IF NOT EXISTS user_name_by_song (
    song TEXT, 
    user_id INT, 
    user_first_name TEXT,
    user_last_name TEXT, 
    PRIMARY KEY (song, user_id)
)
'''

In [26]:
# Create the user_name_by_song table (IF NOT EXISTS makes this safe to re-run)
session.execute(create_user_name_by_song_query)

<cassandra.cluster.ResultSet at 0x7fb82b313f98>

In [27]:
# INSERT statement for user_name_by_song; each %(name)s placeholder is filled
# from the per-line dictionary built for the columns listed below.
insert_user_name_by_song_query = '''
INSERT INTO user_name_by_song (song, user_id, user_first_name, user_last_name)
VALUES (%(song)s, %(user_id)s, %(user_first_name)s, %(user_last_name)s)
'''

user_name_by_song_columns = ['song', 'user_id', 'user_first_name', 'user_last_name']
create_and_populate_table(
    session=session,
    file_path=file,
    insert_query=insert_user_name_by_song_query,
    col_names=user_name_by_song_columns,
)

#### Test user_name_by_song by using song = 'All Hands Against His Own'

In [28]:
# Query 3: first and last name of every user who listened to the given song
third_query = "SELECT user_first_name, user_last_name FROM user_name_by_song WHERE song='All Hands Against His Own'"
third_results = session.execute(third_query)
# materialise the result rows and show them as a DataFrame
pd.DataFrame(list(third_results))

Unnamed: 0,user_first_name,user_last_name
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


## yep! looks good

### Drop the tables before closing out the sessions

In [29]:
# Clean up: remove the table that served query 1
session.execute("DROP TABLE songs_by_session_id")

<cassandra.cluster.ResultSet at 0x7fb82b381208>

In [30]:
# Clean up: remove the table that served query 2
session.execute("DROP TABLE songs_by_user_id")

<cassandra.cluster.ResultSet at 0x7fb82b3304a8>

In [31]:
# Clean up: remove the table that served query 3
session.execute("DROP TABLE user_name_by_song")

<cassandra.cluster.ResultSet at 0x7fb82b3306d8>

### Close the session and cluster connection

In [32]:
# Close the session first, then the cluster object it was created from
session.shutdown()
cluster.shutdown()