# Data Modeling With Apache Cassandra
***A Project in the Course of Udacity's Nanodegree for Data Engineering With AWS***

## The Task
***As provided by Udacity***

A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analysis team is particularly interested in understanding what songs users are listening to. Currently, there is no easy way to query the data to generate the results, since the data reside in a directory of CSV files on user activity on the app.  

They'd like a data engineer to create an Apache Cassandra database which can create queries on song play data to answer the questions, and wish to bring you on the project. Your role is to create a database for this analysis. You'll be able to test your database by running queries given to you by the analytics team from Sparkify to create the results.  

Specifically, they'd like you to create tables in Apache Cassandra to run the following queries:
1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4
2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'  

Udacity splits the project into two parts:
- Part I: ETL Pipeline for Pre-Processing the Files
- Part II: Complete the Apache Cassandra Coding Portion of Your Project

For the sake of the reviewer, I stick to this division.

In [1]:
# Packages used
# Standard packages
from pathlib import Path

# Third party packages
from cassandra.cluster import Cluster

# Local packages
from src.preprocessing import preprocess_data

# Auto-reload local packages (reload all modules before executing a cell)
%load_ext autoreload
%autoreload 2

## Part I: ETL Pipeline for Pre-Processing the Files

We need to join the data from the CSV files into a single file that we can load into our database.  

The target structure is given by Udacity through this image:  

<img src="../images/image_event_datafile_new.jpg">

The folder `event_data` holds the CSV files `2018-11-01-events.csv` to `2018-11-30-events.csv`, which contain the following columns (the ones we need are marked with `*`):
- artist `*`
- auth
- firstName `*`
- gender `*`
- itemInSession `*`
- lastName `*`
- length `*`
- level `*`
- location `*`
- method
- page
- registration
- sessionId `*`
- song `*`
- status
- ts
- userId `*`

Furthermore, `artist` should not be an empty string.
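
The helper `preprocess_data` lives in `src/preprocessing.py` and is not shown in this notebook. As a rough sketch of the joining and filtering described above, and without assuming anything about the real implementation, the same steps could be done with the standard library alone. The function name `combine_event_files` is hypothetical.

```python
import csv
from pathlib import Path


def combine_event_files(folder, columns):
    """Read every CSV in `folder` and keep `columns` for rows with an artist.

    Hypothetical stand-in for illustration; not the project's `preprocess_data`.
    """
    rows = []
    # Sorting by file name puts the date-named files in chronological order
    for csv_path in sorted(Path(folder).glob("*.csv")):
        with csv_path.open(newline="", encoding="utf-8") as handle:
            for record in csv.DictReader(handle):
                # Skip rows whose artist field is empty
                if not record.get("artist"):
                    continue
                rows.append({column: record[column] for column in columns})
    return rows
```

A call such as `combine_event_files("../data/event_data", ["artist", "song", "length"])` would return a list of dictionaries. All values stay strings in this sketch, so numeric columns would still need converting before they are inserted into Cassandra.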

In [2]:
# Global variables
RAW_DATA_PATH = Path("../data/event_data")
PREPROCESS_DATA_PATH = Path("../data/preprocessed_data")
COLUMNS_TO_USE = [
    "userId",
    "firstName", 
    "lastName", 
    "gender", 
    "location", 
    "level", 
    "sessionId", 
    "itemInSession", 
    "artist", 
    "song", 
    "length", 
]

In [3]:
# Get relevant columns into one dataframe
df = preprocess_data(RAW_DATA_PATH)
# Drop rows without an artist, infer nullable dtypes, then convert userId to int32
selected_df = (
    df[COLUMNS_TO_USE]
    .dropna(subset=["artist"])
    .convert_dtypes()
    .astype({"userId": "int32"})
)
selected_df

| | userId | firstName | lastName | gender | location | level | sessionId | itemInSession | artist | song | length |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 26 | Ryan | Smith | M | San Jose-Sunnyvale-Santa Clara, CA | free | 583 | 0 | Harmonia | Sehr kosmisch | 655.77751 |
| 1 | 26 | Ryan | Smith | M | San Jose-Sunnyvale-Santa Clara, CA | free | 583 | 1 | The Prodigy | The Big Gundown | 260.07465 |
| 2 | 26 | Ryan | Smith | M | San Jose-Sunnyvale-Santa Clara, CA | free | 583 | 2 | Train | Marry Me | 205.45261 |
| 5 | 61 | Samuel | Gonzalez | M | Houston-The Woodlands-Sugar Land, TX | free | 597 | 0 | Sony Wonder | Blackbird | 218.06975 |
| 9 | 80 | Tegan | Levine | F | Portland-South Portland, ME | paid | 602 | 2 | Van Halen | Best Of Both Worlds (Remastered Album Version) | 289.38404 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 382 | 16 | Rylan | George | M | Birmingham-Hoover, AL | paid | 1076 | 57 | Foo Fighters | The Pretender | 271.38567 |
| 383 | 16 | Rylan | George | M | Birmingham-Hoover, AL | paid | 1076 | 58 | Timbiriche | Besos De Ceniza | 202.60526 |
| 384 | 16 | Rylan | George | M | Birmingham-Hoover, AL | paid | 1076 | 59 | A Perfect Circle | Rose | 206.05342 |
| 385 | 16 | Rylan | George | M | Birmingham-Hoover, AL | paid | 1076 | 60 | Anberlin | The Haunting | 348.682 |


In [4]:
# Save preprocessed data
selected_df.to_csv(PREPROCESS_DATA_PATH / "event_datafile_new.csv", index=False)

## Part II: Complete the Apache Cassandra Coding Portion of Your Project

### Introduction

Cassandra is a NoSQL database or, more specifically, a wide-column store (also called a partitioned row store).
To set up the database, it makes sense to think of the use cases in terms of queries first, because each table is designed around the query it has to serve.

Use cases:
1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4
2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'  

This translates to the following queries:
1. `SELECT artist, song, length FROM [table with appropriate structure] WHERE sessionId = 338 AND itemInSession = 4`
2. `SELECT artist, song, firstName, lastName FROM [table with appropriate structure] WHERE userId = 10 AND sessionId = 182`
3. `SELECT firstName, lastName FROM [table with appropriate structure] WHERE song = 'All Hands Against His Own'`

Furthermore, we need to think about the ordering by `itemInSession` as requested by use case 2. In Cassandra, rows within a partition are stored sorted by their clustering columns, so this ordering is handled by the table design rather than by the query.

Since we filter on `sessionId`, `itemInSession`, `userId`, and `song` in the `WHERE` clauses, these columns have to be part of the respective primary keys.
It seems likely that the combination of `sessionId` and `itemInSession` is unique for each row, as one can assume that these IDs are generated specifically to keep track of individual sessions.

In [5]:
# Checking for uniqueness of sessionId and itemInSession
selected_df[["sessionId", "itemInSession"]].drop_duplicates().shape[0] == selected_df.shape[0]

True

Given this fact about our example data and the assumption above, we always include `sessionId` and `itemInSession` in the primary key of a table if we want to keep every row available. Of course, this can be neglected for tables designed for queries that target more "filtered" data, as in the third use case.

To keep the database performant as it scales, we should also choose the partition key carefully, since it determines how the rows are distributed across the nodes of the cluster.
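
To make the role of the partition key more tangible, here is a toy model of how a hash of the partition key decides where a row is stored. It is only a sketch: Cassandra's default partitioner computes Murmur3 tokens on a ring, whereas this example uses MD5 and a modulo simply because both ship with Python. The function name `node_for_partition_key` and the three-node cluster are made up for illustration.

```python
import hashlib
from collections import Counter


def node_for_partition_key(partition_key, node_count):
    """Map a partition key to a node index via a hash (toy model).

    Assumption: MD5 plus modulo stands in for Cassandra's Murmur3 token ring.
    """
    digest = hashlib.md5(str(partition_key).encode("utf-8")).digest()
    token = int.from_bytes(digest[:8], "big")
    return token % node_count


# Made-up rows of the form (sessionId, itemInSession); sessionId is the partition key
rows = [(session_id, item) for session_id in range(1, 201) for item in range(3)]
rows_per_node = Counter(node_for_partition_key(session_id, 3) for session_id, _ in rows)
print(rows_per_node)
```

All rows that share a `sessionId` end up on the same node, which is why a query that specifies the full partition key only has to touch one partition.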

### Set Up Cassandra Cluster And Keyspace
#### Creating a Cluster

In [6]:
# Connect to Cassandra on local machine (127.0.0.1)
cluster = Cluster(["127.0.0.1"])

# Open a session, which is needed to execute queries
session = cluster.connect()

#### Create Keyspace

In [7]:
# Create the keyspace if it does not exist yet
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)
except Exception as e:
    print(e)

#### Set Keyspace

In [8]:
# Set keyspace 
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)

### Create queries for the three use cases

#### Use Case 1 
**Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4**

##### Create Search Query

In [9]:
# Create table name for query
database_1 = "artist_song_lenght_by_session_item_in_session"

# Create search query
query_1 = f'''
SELECT artist, song, length 
FROM {database_1} 
WHERE sessionId = 338 AND itemInSession = 4
'''

# Print query
print(query_1)


SELECT artist, song, length 
FROM artist_song_lenght_by_session_item_in_session 
WHERE sessionId = 338 AND itemInSession = 4



##### Create Table

Given the search query, we should simply create a table with the following structure:
- `sessionId` as partition key (assuming that this will create an even distribution of data when the sessionId is hashed)
- `itemInSession` as clustering column (to sort the data by itemInSession)

The (other) value columns are `artist`, `song`, and `length`.

In [10]:
# Prepare creation query
selection_attributes_1 = {"artist":"text", "song":"text", "length":"float"}
partition_keys_1 = {"sessionId":"int"}
clustering_keys_1 = {"itemInSession":"int"}

relevant_columns_1_with_types = partition_keys_1 | clustering_keys_1 | selection_attributes_1

creation_query_1 = f'''
CREATE TABLE IF NOT EXISTS {database_1} (
    {", ".join([f"{key} {value}" for key, value in relevant_columns_1_with_types.items()])},
    PRIMARY KEY (({", ".join(partition_keys_1.keys())}), {", ".join(clustering_keys_1.keys())})
)
'''

# Print creation query
print(creation_query_1)


CREATE TABLE IF NOT EXISTS artist_song_lenght_by_session_item_in_session (
    sessionId int, itemInSession int, artist text, song text, length float,
    PRIMARY KEY ((sessionId), itemInSession)
)



In [11]:
# Perform creation
try:
    session.execute(creation_query_1)
except Exception as e:
    print(e)

##### Insert Data

In [12]:
# Select data to insert from selected_df
relevant_columns_1 = [key for key in relevant_columns_1_with_types.keys()]
selected_df_1 = selected_df[relevant_columns_1]

# Print first 5 rows
selected_df_1.head()

| | sessionId | itemInSession | artist | song | length |
|---|---|---|---|---|---|
| 0 | 583 | 0 | Harmonia | Sehr kosmisch | 655.77751 |
| 1 | 583 | 1 | The Prodigy | The Big Gundown | 260.07465 |
| 2 | 583 | 2 | Train | Marry Me | 205.45261 |
| 5 | 597 | 0 | Sony Wonder | Blackbird | 218.06975 |
| 9 | 602 | 2 | Van Halen | Best Of Both Worlds (Remastered Album Version) | 289.38404 |


In [13]:
# Create insert query
insert_query_1 = f'''
INSERT INTO {database_1} ({", ".join(relevant_columns_1)})
VALUES ({", ".join(["%s" for _ in relevant_columns_1])})
'''

# Print insert query
print(insert_query_1)


INSERT INTO artist_song_lenght_by_session_item_in_session (sessionId, itemInSession, artist, song, length)
VALUES (%s, %s, %s, %s, %s)



In [14]:
# Insert data
for _, series in selected_df_1.iterrows():
    try:
        session.execute(insert_query_1, tuple(series))
    except Exception as e:
        print(e)
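
As an alternative to formatting the `%s` placeholders on every call, the Cassandra Python driver also supports prepared statements, which are parsed once and then reused. The following is a minimal sketch under a few assumptions: the helper `insert_rows` is hypothetical, and `RecordingSession` is a made-up stand-in that only exists so the example runs without a cluster. With the real driver, `session.prepare` and `session.execute` would be called on the session created above.

```python
def insert_rows(session, table, columns, rows):
    """Insert `rows` into `table` using one prepared statement.

    `session` is expected to behave like a cassandra-driver Session, i.e. to
    offer `prepare(query)` and `execute(statement, parameters)`.
    """
    placeholders = ", ".join("?" for _ in columns)  # prepared statements use ?
    query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    prepared = session.prepare(query)
    failed = []
    for row in rows:
        try:
            session.execute(prepared, tuple(row))
        except Exception as error:  # collect failures instead of only printing
            failed.append((row, error))
    return failed


class RecordingSession:
    """Hypothetical stand-in so the sketch runs without a Cassandra cluster."""

    def __init__(self):
        self.executed = []

    def prepare(self, query):
        return query

    def execute(self, statement, parameters=None):
        self.executed.append((statement, parameters))


demo_session = RecordingSession()
failed_rows = insert_rows(
    demo_session,
    "artist_song_lenght_by_session_item_in_session",
    ["sessionId", "itemInSession", "artist", "song", "length"],
    [(583, 0, "Harmonia", "Sehr kosmisch", 655.77751)],
)
print(demo_session.executed[0][0])
```

Returning the failed rows makes it possible to retry or inspect them afterwards. When feeding rows from a dataframe, the values may need converting to plain Python types first; that is an assumption worth checking against the driver version in use.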

##### Execute Search Query

In [15]:
selection_cols_1 = [key for key in selection_attributes_1.keys()]

try:
    results = session.execute(query_1)
except Exception as e:
    print(e)

for row in results:
    row_as_string = " ".join([str(getattr(row, item.lower())) for item in selection_cols_1])
    print(row_as_string)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


##### Check Against Expected Result from `selected_df`

In [16]:
selected_df.query(query_1.replace('\n', ' ').split("WHERE")[1].replace("AND", "and").replace("=", "=="))[selection_cols_1]

| | artist | song | length |
|---|---|---|---|
| 54 | Faithless | Music Matters (Mark Knight Dub) | 495.3073 |
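
The `length` returned by Cassandra (495.30731201171875) differs slightly from the value in the dataframe (495.3073). This is expected: the column was created with the CQL type `float`, which is a 32-bit floating point number, while Python floats have 64 bits. The short sketch below reproduces the effect without a database by round-tripping the value through single precision; the helper name `as_float32` is made up for this illustration.

```python
import struct


def as_float32(value):
    """Round-trip a Python float through 32-bit IEEE 754 single precision."""
    return struct.unpack("f", struct.pack("f", value))[0]


print(as_float32(495.3073))  # 495.30731201171875
```

Declaring the column as `double` instead of `float` would keep the full 64-bit value, at the cost of four extra bytes per row.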


#### Use Case 2
**Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182**

##### Create Search Query

In [17]:
# Create table name for query
database_2 = "artist_song_user_by_session_item_in_session"

# Create search query
query_2 = f'''
SELECT artist, song, firstName, lastName
FROM {database_2}
WHERE userId = 10 AND sessionId = 182
'''

# Print query
print(query_2)


SELECT artist, song, firstName, lastName
FROM artist_song_user_by_session_item_in_session
WHERE userId = 10 AND sessionId = 182



##### Create Table

Given the search query, we should simply create a table with the following structure:
- `userId` and `sessionId` as composite partition key (assuming that hashing the combination of `userId` and `sessionId` distributes the data evenly)
- `itemInSession` as clustering column (to sort the data by itemInSession)

The (other) value columns are `artist`, `song`, `firstName`, and `lastName`.
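
The sorting by `itemInSession` comes from the clustering column: ascending order is Cassandra's default, so the query needs no `ORDER BY`. To document that intent in the schema itself, the order can be spelled out with `WITH CLUSTERING ORDER BY`. The helper `build_create_table` below is a hypothetical sketch that only builds the statement string; it is not part of the notebook's code.

```python
def build_create_table(table, partition_keys, clustering_keys, value_columns):
    """Return a CREATE TABLE statement with an explicit ascending clustering order.

    Each argument after `table` is a dict that maps column names to CQL types.
    """
    all_columns = {**partition_keys, **clustering_keys, **value_columns}
    column_definitions = ", ".join(
        f"{name} {cql_type}" for name, cql_type in all_columns.items()
    )
    partition_part = ", ".join(partition_keys)
    clustering_part = ", ".join(clustering_keys)
    ordering = ", ".join(f"{name} ASC" for name in clustering_keys)
    return (
        f"CREATE TABLE IF NOT EXISTS {table} (\n"
        f"    {column_definitions},\n"
        f"    PRIMARY KEY (({partition_part}), {clustering_part})\n"
        f") WITH CLUSTERING ORDER BY ({ordering})"
    )


creation_statement = build_create_table(
    "artist_song_user_by_session_item_in_session",
    {"userId": "int", "sessionId": "int"},
    {"itemInSession": "int"},
    {"artist": "text", "song": "text", "firstName": "text", "lastName": "text"},
)
print(creation_statement)
```

Because ascending is already the default, the extra clause changes nothing about how the rows are stored; it only makes the ordering requirement of use case 2 visible to whoever reads the schema.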

In [18]:
# Prepare creation query
selection_attributes_2 = {"artist":"text", "song":"text", "firstName":"text", "lastName":"text"}
partition_keys_2 = {"userId":"int", "sessionId":"int"}
clustering_keys_2 = {"itemInSession":"int"}

relevant_columns_2_with_types = partition_keys_2 | clustering_keys_2 | selection_attributes_2

creation_query_2 = f'''
CREATE TABLE IF NOT EXISTS {database_2} (
    {", ".join([f"{key} {value}" for key, value in relevant_columns_2_with_types.items()])},
    PRIMARY KEY (({", ".join(partition_keys_2.keys())}), {", ".join(clustering_keys_2.keys())})
)
'''

# Print creation query
print(creation_query_2)



CREATE TABLE IF NOT EXISTS artist_song_user_by_session_item_in_session (
    userId int, sessionId int, itemInSession int, artist text, song text, firstName text, lastName text,
    PRIMARY KEY ((userId, sessionId), itemInSession)
)



In [19]:
# Perform creation
try:
    session.execute(creation_query_2)
except Exception as e:
    print(e)

##### Insert Data

In [20]:
# Select data to insert from selected_df
relevant_columns_2 = [key for key in relevant_columns_2_with_types.keys()]
selected_df_2 = selected_df[relevant_columns_2]

# Print first 5 rows
selected_df_2.head()

| | userId | sessionId | itemInSession | artist | song | firstName | lastName |
|---|---|---|---|---|---|---|---|
| 0 | 26 | 583 | 0 | Harmonia | Sehr kosmisch | Ryan | Smith |
| 1 | 26 | 583 | 1 | The Prodigy | The Big Gundown | Ryan | Smith |
| 2 | 26 | 583 | 2 | Train | Marry Me | Ryan | Smith |
| 5 | 61 | 597 | 0 | Sony Wonder | Blackbird | Samuel | Gonzalez |
| 9 | 80 | 602 | 2 | Van Halen | Best Of Both Worlds (Remastered Album Version) | Tegan | Levine |


In [21]:
# Create insert query
insert_query_2 = f'''
INSERT INTO {database_2} ({", ".join(relevant_columns_2)})
VALUES ({", ".join(["%s" for _ in relevant_columns_2])})
'''

# Print insert query
print(insert_query_2)


INSERT INTO artist_song_user_by_session_item_in_session (userId, sessionId, itemInSession, artist, song, firstName, lastName)
VALUES (%s, %s, %s, %s, %s, %s, %s)



In [22]:
# Insert data
for _, series in selected_df_2.iterrows():
    try:
        session.execute(insert_query_2, tuple(series))
    except Exception as e:
        print(e)

##### Execute Search Query

In [23]:
selection_cols_2 = [key for key in selection_attributes_2.keys()]

try:
    results = session.execute(query_2)
except Exception as e:
    print(e)

for row in results:
    row_as_string = " ".join([str(getattr(row, item.lower())) for item in selection_cols_2])
    print(row_as_string)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


##### Check Against Expected Result from `selected_df`

In [24]:
selected_df.sort_values(by="itemInSession").query(query_2.replace('\n', ' ').split("WHERE")[1].replace("AND", "and").replace("=", "=="))[selection_cols_2]

| | artist | song | firstName | lastName |
|---|---|---|---|---|
| 155 | Down To The Bone | Keep On Keepin' On | Sylvie | Cruz |
| 156 | Three Drives | Greece 2000 | Sylvie | Cruz |
| 158 | Sebastien Tellier | Kilometer | Sylvie | Cruz |
| 159 | Lonnie Gordon | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie | Cruz |
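
The check cells derive the pandas filter from the CQL query through a chain of `replace` calls. The same idea can be wrapped in a small function, which makes the conversion reusable and a bit more robust, for instance against an `=` that is already part of `==`. This is only a sketch: `cql_where_to_pandas` is a hypothetical helper and deliberately handles nothing beyond equality conditions joined by `AND`.

```python
import re


def cql_where_to_pandas(query):
    """Turn the WHERE clause of a simple CQL query into a pandas query string.

    Only handles equality conditions joined by AND, which is all the three
    use cases in this notebook need.
    """
    where_clause = re.split(r"\bWHERE\b", query, maxsplit=1, flags=re.IGNORECASE)[1]
    where_clause = " ".join(where_clause.split())  # collapse newlines and spaces
    where_clause = re.sub(r"\bAND\b", "and", where_clause, flags=re.IGNORECASE)
    # Replace a lone "=" with "==" without touching "==", "<=", ">=" or "!="
    return re.sub(r"(?<![=<>!])=(?!=)", "==", where_clause)


print(cql_where_to_pandas("SELECT artist FROM t WHERE userId = 10 AND sessionId = 182"))
# userId == 10 and sessionId == 182
```

The result can be passed to `DataFrame.query`. A string literal that itself contains `=` or the word `AND` would still be rewritten, so the helper is only suitable for simple checks like the ones here.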


#### Use Case 3
**Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'**

##### Create Search Query

In [25]:
# Create table name for query
database_3 = "user_name_by_song"

# Create search query
query_3 = f'''
SELECT firstName, lastName
FROM {database_3}
WHERE song = 'All Hands Against His Own'
'''

# Print query
print(query_3)


SELECT firstName, lastName
FROM user_name_by_song
WHERE song = 'All Hands Against His Own'



##### Create Table

Given the search query, we should simply create a table with the following structure:
- `song` as partition key. This may not distribute the data perfectly evenly (e.g. popular songs produce larger partitions than rarely played ones), but it is a good start, and it lets Cassandra find all rows for a song in a single partition.
- `userId`, `sessionId`, and `itemInSession` as clustering columns. `userId` tells the listeners of a song apart, and together with `sessionId` and `itemInSession` the primary key is unique for every available row (in my opinion the query is not really designed to reflect this, but the first reviewer asked for it).
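
Why the uniqueness of the primary key matters: an `INSERT` in Cassandra is an upsert, so a row whose primary key already exists silently overwrites the stored one. The following pure-Python sketch imitates that behaviour with a dictionary keyed by the primary key. The function `simulate_upserts` and the sample plays are made up for illustration.

```python
def simulate_upserts(rows, primary_key):
    """Keep one row per primary key value; later rows overwrite earlier ones."""
    table = {}
    for row in rows:
        key = tuple(row[column] for column in primary_key)
        table[key] = row
    return list(table.values())


# Made-up plays: the same user hears the same song in two different sessions
plays = [
    {"song": "Song A", "userId": 1, "sessionId": 10, "itemInSession": 0},
    {"song": "Song A", "userId": 1, "sessionId": 11, "itemInSession": 4},
    {"song": "Song A", "userId": 2, "sessionId": 12, "itemInSession": 1},
]

short_key = simulate_upserts(plays, ["song", "userId"])
full_key = simulate_upserts(plays, ["song", "userId", "sessionId", "itemInSession"])
print(len(short_key), len(full_key))  # 2 3
```

With `(song, userId)` alone, the two plays of user 1 collapse into one row, which is enough to list each listener once. With the longer key every play is kept, at the price of possibly returning the same user several times.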

In [26]:
# Prepare creation query
selection_attributes_3 = {"firstName":"text", "lastName":"text"}
partition_keys_3 = {"song":"text"}
clustering_keys_3 = {"userId":"int", "sessionId":"int", "itemInSession":"int"}

relevant_columns_3_with_types = partition_keys_3 | clustering_keys_3 | selection_attributes_3

creation_query_3 = f'''
CREATE TABLE IF NOT EXISTS {database_3} (
    {", ".join([f"{key} {value}" for key, value in relevant_columns_3_with_types.items()])},
    PRIMARY KEY (({", ".join(partition_keys_3.keys())}), {", ".join(clustering_keys_3.keys())})
)
'''

# Print creation query
print(creation_query_3)


CREATE TABLE IF NOT EXISTS user_name_by_song (
    song text, userId int, sessionId int, itemInSession int, firstName text, lastName text,
    PRIMARY KEY ((song), userId, sessionId, itemInSession)
)



In [27]:
# Create table
try:
    session.execute(creation_query_3)
except Exception as e:
    print(e)

##### Insert Data

In [28]:
# Select data to insert from selected_df
relevant_columns_3 = [key for key in relevant_columns_3_with_types.keys()]
selected_df_3 = selected_df[relevant_columns_3]

# Print first 5 rows
selected_df_3.head()

| | song | userId | sessionId | itemInSession | firstName | lastName |
|---|---|---|---|---|---|---|
| 0 | Sehr kosmisch | 26 | 583 | 0 | Ryan | Smith |
| 1 | The Big Gundown | 26 | 583 | 1 | Ryan | Smith |
| 2 | Marry Me | 26 | 583 | 2 | Ryan | Smith |
| 5 | Blackbird | 61 | 597 | 0 | Samuel | Gonzalez |
| 9 | Best Of Both Worlds (Remastered Album Version) | 80 | 602 | 2 | Tegan | Levine |


In [29]:
# Create insert query
insert_query_3 = f'''
INSERT INTO {database_3} ({", ".join(relevant_columns_3)})
VALUES ({", ".join(["%s" for _ in relevant_columns_3])})
'''

# Print insert query
print(insert_query_3)


INSERT INTO user_name_by_song (song, userId, sessionId, itemInSession, firstName, lastName)
VALUES (%s, %s, %s, %s, %s, %s)



In [30]:
# Insert data
for _, series in selected_df_3.iterrows():
    try:
        session.execute(insert_query_3, tuple(series))
    except Exception as e:
        print(e)

selection_cols_3 = [key for key in selection_attributes_3.keys()]

##### Execute Search Query

In [31]:
try:
    results = session.execute(query_3)
except Exception as e:
    print(e)

for row in results:
    row_as_string = " ".join([str(getattr(row, item.lower())) for item in selection_cols_3])
    print(row_as_string)

Jacqueline Lynch
Tegan Levine
Sara Johnson


##### Check Against Expected Result from `selected_df`

In [32]:
selected_df.sort_values(by=["firstName", "lastName"]).query(query_3.replace('\n', ' ').split("WHERE")[1].replace("AND", "and").replace("=", "=="))[selection_cols_3]

| | firstName | lastName |
|---|---|---|
| 315 | Jacqueline | Lynch |
| 57 | Sara | Johnson |
| 100 | Tegan | Levine |


### Clean Up

#### Drop the tables before closing out the session

In [33]:
# Drop the tables before closing out the session
try:
    session.execute(f"DROP TABLE IF EXISTS {database_1}")
except Exception as e:
    print(e)

try:
    session.execute(f"DROP TABLE IF EXISTS {database_2}")
except Exception as e:
    print(e)

try:
    session.execute(f"DROP TABLE IF EXISTS {database_3}")
except Exception as e:
    print(e)

#### Close the session and cluster connection

In [34]:
session.shutdown()
cluster.shutdown()