# Initialize

In [1]:
# Import Python packages and functions
import os

import pandas as pd

from db_utils import (get_files, read_all_rows, read_header, write_csv,
                      cassandra_connect, create_table, insert_rows, run_query, drop_table)

In [2]:
# Constants

CSV_FILENAME = 'event_data_full.csv'

# Part I. ETL Pipeline for Pre-Processing the Files

In [3]:
# Create list of filepaths to process original event CSV data files:
file_path_list = get_files(os.path.join(os.getcwd(), 'event_data'))

# Process the files to create the data file CSV that will be used for Apache Cassandra tables
full_data_rows_list = read_all_rows(file_path_list)
write_csv(full_data_rows_list, csv_filename=CSV_FILENAME)

In [4]:
# Checking

n_rows = len(full_data_rows_list)
n_rows_without_artist = sum(1 for r in full_data_rows_list if r[0] == '')
with open(CSV_FILENAME, 'r', encoding='utf8') as f:
    n_rows_csv = sum(1 for line in f)

print('Total number of CSV files = {:,d}'.format(len(file_path_list)))
print()
print('Header of the CSV files:')
!head -n 1 {file_path_list[0]}
print()
print(f'Header of {CSV_FILENAME:s}: (column name --> column position)')
header = read_header(CSV_FILENAME)
print([f'{h:s} --> {i:d}' for i, h in enumerate(header)])
print()
print(f'Total number of rows = {n_rows:,d}')
print(f'Rows without artist  = {n_rows_without_artist:,d}')
print(f'Rows in the CSV file = {n_rows_csv:,d}')

assert n_rows_csv == n_rows - n_rows_without_artist + 1  # plus one, because of the header row

Total number of CSV files = 30

Header of the CSV files:
artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId

Header of event_data_full.csv: (column name --> column position)
['artist --> 0', 'userFirstName --> 1', 'userGender --> 2', 'itemInSession --> 3', 'userLastName --> 4', 'songLength --> 5', 'level --> 6', 'userLocation --> 7', 'sessionId --> 8', 'songTitle --> 9', 'userId --> 10']

Total number of rows = 8,056
Rows without artist  = 1,236
Rows in the CSV file = 6,821


# Part II. Apache Cassandra 

## Create tables

With Apache Cassandra the database tables are modeled based on the queries that will be run.

Queries:

1. Give me the artist, song title and song's length in the music app history that was heard during `sessionId` = 338, and `itemInSession` = 4
1. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for `userid` = 10, `sessionid` = 182
1. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [5]:
cluster, session = cassandra_connect()

In [6]:
csv_df = pd.read_csv(CSV_FILENAME)

### Query 1

Give me the artist, song title and song's length in the music app history that was heard during `sessionId` = 338, and `itemInSession` = 4

In [7]:
# Create table and insert rows

# The query requires:
# - as return values (will be in the SELECT clause): artist, song title, song length
# - as filter values (will be in the WHERE clause): sessionId, itemInSession

# Therefore, the columns necessary in the table are:
# artist, song title, song length, sessionId, itemInSession

# We are going to use (sessionId, itemInSession) as composite partition key, because:
# - the combination of sessionId and itemInSession is unique
# - we need to filter by sessionId and itemInSession (that is, use these columns in the WHERE clause)

# So, in the CREATE TABLE statement, let's use first the primary keys (in the order they appear in PRIMARY KEY),
# then the other columns in the order they will appear in the SELECT clause.

table_name1 = 'sessions'

table_info1 = '''
(session_id INT,
 item_in_session INT,
 artist_name TEXT,
 song_title TEXT,
 song_length FLOAT,
 PRIMARY KEY ((session_id, item_in_session)))'''

create_table(table_name1, table_info1, session)
insert_rows(table_name1, table_info1,
            lambda row: (int(row[8]), int(row[3]), row[0], row[9], float(row[5])),
            CSV_FILENAME, session)

In [8]:
# Verify if the data was entered into the table

# Query: Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

query = f'''
SELECT artist_name, song_title, song_length
FROM {table_name1:s}
WHERE session_id = 338 AND item_in_session = 4;'''

run_query(query, session)

Row(artist_name='Faithless', song_title='Music Matters (Mark Knight Dub)', song_length=495.30731201171875)


In [9]:
# Checking the with content of the CSV file

(csv_df
 .loc[(csv_df['sessionId'] == 338) & (csv_df['itemInSession'] == 4),
      ['artist', 'songTitle', 'songLength']]
 .reset_index(drop=True))

Unnamed: 0,artist,songTitle,songLength
0,Faithless,Music Matters (Mark Knight Dub),495.3073


### Query 2

Give me only the following: name of artist, song (sorted by `itemInSession`) and user (first and last name) for `userid` = 10, `sessionid` = 182

In [10]:
# Create table and insert rows

# The query requires:
# - as return values (will be in the SELECT clause): artist, song title, user firstname, user lastname
# - as filter values (will be in the WHERE clause): userId, sessionId
# - column to be sorted by (will be in the clustering column in the CREATE TABLE statement): itemInSession

# Therefore, the columns necessary in the table are:
# artist, song title, user firstname, user lastname, userId, sessionId, itemInSession

# We are going to use (userId, sessionId) as composite partition key, and itemInSession as clustering column, because:
# - the combination of userId, sessionId and itemInSession is unique
# - we need to filter by userId, sessionId (that is, use these columns in the WHERE clause)
# - we need the results sorted by itemInSession

# So, in the CREATE TABLE statement, let's use first the primary keys (in the order they appear in PRIMARY KEY),
# then the other columns in the order they will appear in the SELECT clause.

table_name2 = 'user_session'

table_info2 = '''
(user_id INT,
 session_id INT,
 item_in_session INT,
 artist_name TEXT,
 song_title TEXT,
 user_firstname TEXT,
 user_lastname TEXT,
 PRIMARY KEY ((user_id, session_id), item_in_session))'''

create_table(table_name2, table_info2, session)
insert_rows(table_name2, table_info2,
            lambda row: (int(row[10]), int(row[8]), int(row[3]), row[0], row[9], row[1], row[4]),
            CSV_FILENAME, session)

In [11]:
# Verify if the data was entered into the table

query = f'''
SELECT artist_name, song_title, user_firstname, user_lastname
FROM {table_name2:s}
WHERE user_id = 10 AND session_id = 182;'''

run_query(query, session)

Row(artist_name='Down To The Bone', song_title="Keep On Keepin' On", user_firstname='Sylvie', user_lastname='Cruz')
Row(artist_name='Three Drives', song_title='Greece 2000', user_firstname='Sylvie', user_lastname='Cruz')
Row(artist_name='Sebastien Tellier', song_title='Kilometer', user_firstname='Sylvie', user_lastname='Cruz')
Row(artist_name='Lonnie Gordon', song_title='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', user_firstname='Sylvie', user_lastname='Cruz')


In [12]:
# Checking the with content of the CSV file

(csv_df
 .loc[(csv_df['userId'] == 10) & (csv_df['sessionId'] == 182),
      ['artist', 'songTitle', 'userFirstName', 'userLastName', 'itemInSession']]
 .sort_values('itemInSession')
 .drop(columns='itemInSession')
 .reset_index(drop=True))

Unnamed: 0,artist,songTitle,userFirstName,userLastName
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### Query 3

Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'.

In [13]:
# Create table and insert rows

# The query requires:
# - as return values (will be in the SELECT clause): user firstname, user lastname
# - as filter values (will be in the WHERE clause): song title

# Therefore, the columns necessary in the table are:
# user firstname, user lastname, song title

# A natural (and naive) choice would be to choose as composite partition key the columns user
# firstname, user lastname, song title. But the combination may not be unique,
# since multiple people can have the same name. So, to disambiguate the name, we are going to use
# the userId instead, which is unique for each user.

# Therefore, we are going to use song title as primary key, and userId as clustering column, because:
# - the combination of song title, userId is unique
# - we need to filter by song title (that is, use this column in the WHERE clause)

# So, in the CREATE TABLE statement, let's use first the primary keys (in the order they appear in PRIMARY KEY),
# then the other columns in the order they will appear in the SELECT clause.

table_name3 = 'music_app_history'

table_info3 = '''
(song_title TEXT,
 user_id INT,
 user_firstname TEXT,
 user_lastname TEXT,
 PRIMARY KEY (song_title, user_id));'''

create_table(table_name3, table_info3, session)
insert_rows(table_name3, table_info3,
            lambda row: (row[9], int(row[10]), row[1], row[4]),
            CSV_FILENAME, session)

In [14]:
# Verify if the data was entered into the table

query = f'''
SELECT user_firstname, user_lastname
FROM {table_name3:s}
WHERE song_title = 'All Hands Against His Own';'''

run_query(query, session)

Row(user_firstname='Jacqueline', user_lastname='Lynch')
Row(user_firstname='Tegan', user_lastname='Levine')
Row(user_firstname='Sara', user_lastname='Johnson')


In [15]:
# Checking the with content of the CSV file

(csv_df
 .loc[csv_df['songTitle'] == 'All Hands Against His Own',
      ['userFirstName', 'userLastName']]
 .reset_index(drop=True))

Unnamed: 0,userFirstName,userLastName
0,Sara,Johnson
1,Jacqueline,Lynch
2,Tegan,Levine


## Drop the tables before closing out the sessions

In [16]:
tables = (table_name1, table_name2, table_name3)

for t in tables:
    drop_table(t, session)

## Close the session and cluster connection¶

In [17]:
session.shutdown()
cluster.shutdown()