## Welcome to the Cassandra project

The goal of the project is to transform a number of .csv files into three optimized Cassandra tables, each ready to answer one of three specific queries.
To reach this goal we need to do the following:

1. transform the .csv files to a Pandas DataFrame
2. inspect the distribution of the potential partition keys
3. check for uniqueness of the primary keys
4. create the database
5. transform the Pandas DataFrame to lists of tuples, ready for insertion into the database
6. insert the data
7. inspect and validate the queries
8. close the session and cluster connection 

### 1. transform the .csv files to a Pandas DataFrame

```python
from pathlib import Path
from src.create_event_data import CreateEventData
from src.data_utils import create_csv_path_list
```

```python
event_data_path = Path('..') / 'event_data'
csv_list = sorted(create_csv_path_list(event_data_path))
```

```text
..\event_data contains 30 .csv files.
```


```python
create_event_instance = CreateEventData(csv_path_list=csv_list)
event_df = create_event_instance.data_pipeline()
```

```text
INFO 2021-01-05 10:39:47,689 [create_event_data.py:data_pipeline:37] Data pipeline started...
DEBUG 2021-01-05 10:39:47,736 [create_event_data.py:data_pipeline:46] Processed 1 / 30 .csv files
DEBUG 2021-01-05 10:39:47,758 [create_event_data.py:data_pipeline:46] Processed 2 / 30 .csv files
DEBUG 2021-01-05 10:39:47,780 [create_event_data.py:data_pipeline:46] Processed 3 / 30 .csv files
DEBUG 2021-01-05 10:39:47,803 [create_event_data.py:data_pipeline:46] Processed 4 / 30 .csv files
DEBUG 2021-01-05 10:39:47,829 [create_event_data.py:data_pipeline:46] Processed 5 / 30 .csv files
DEBUG 2021-01-05 10:39:47,852 [create_event_data.py:data_pipeline:46] Processed 6 / 30 .csv files
DEBUG 2021-01-05 10:39:47,874 [create_event_data.py:data_pipeline:46] Processed 7 / 30 .csv files
DEBUG 2021-01-05 10:39:47,907 [create_event_data.py:data_pipeline:46] Processed 8 / 30 .csv files
DEBUG 2021-01-05 10:39:47,936 [create_event_data.py:data_pipeline:46] Processed 9 / 30 .csv files
DEBUG 2021-01-05 10:39:4
```
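The code behind `CreateEventData.data_pipeline` lives in `src/create_event_data.py`, which is not shown here. The stdlib-only sketch below shows what such a step typically involves. It rests on several assumptions that are not confirmed by the project code:

- each file has a header row with the column names shown in the DataFrame below;
- rows without an `artist` are skipped because no song was played;
- the id columns are cast to `int`.

The names `parse_event_rows` and `load_event_rows` are hypothetical.

```python
import csv

# Columns kept for the three queries (assumed to match the DataFrame below)
COLUMNS = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
           'length', 'level', 'location', 'sessionId', 'song', 'userId']
INT_COLUMNS = ('itemInSession', 'sessionId', 'userId')


def parse_event_rows(lines):
    """Hypothetical sketch: turn CSV lines (header first) into typed row dicts."""
    rows = []
    for record in csv.DictReader(lines):
        # Assumption: events without an artist are not song plays, so skip them
        if not record.get('artist'):
            continue
        row = {col: record[col] for col in COLUMNS}
        for col in INT_COLUMNS:
            # int(float(...)) accepts both '8' and '8.0'
            row[col] = int(float(row[col]))
        row['length'] = float(row['length'])
        rows.append(row)
    return rows


def load_event_rows(csv_paths):
    """Read and concatenate the rows of every file in csv_paths."""
    rows = []
    for path in csv_paths:
        with open(path, newline='', encoding='utf8') as f:
            rows.extend(parse_event_rows(f))
    return rows
```

The pandas version presumably does the same with `pd.read_csv` per file followed by `pd.concat`.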

```python
# compare to the .jpg in /images
event_df.head(n=3)
```

| | artist | firstName | gender | itemInSession | lastName | length | level | location | sessionId | song | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Des'ree | Kaylee | F | 1 | Summers | 246.30812 | free | Phoenix-Mesa-Scottsdale, AZ | 139 | You Gotta Be | 8 |
| 1 | Mr Oizo | Kaylee | F | 3 | Summers | 144.03873 | free | Phoenix-Mesa-Scottsdale, AZ | 139 | Flat 55 | 8 |
| 2 | Tamba Trio | Kaylee | F | 4 | Summers | 177.18812 | free | Phoenix-Mesa-Scottsdale, AZ | 139 | Quem Quiser Encontrar O Amor | 8 |


```python
# check correct datatypes
event_df.info()
```

```text
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6820 entries, 0 to 6819
Data columns (total 11 columns):
 #   Column         Non-Null Count  Dtype
---  ------         --------------  -----
 0   artist         6820 non-null   object
 1   firstName      6820 non-null   object
 2   gender         6820 non-null   object
 3   itemInSession  6820 non-null   int32
 4   lastName       6820 non-null   object
 5   length         6820 non-null   float64
 6   level          6820 non-null   object
 7   location       6820 non-null   object
 8   sessionId      6820 non-null   int32
 9   song           6820 non-null   object
 10  userId         6820 non-null   int32
dtypes: float64(1), int32(3), object(7)
memory usage: 506.3+ KB
```


### 2. inspect the distribution of the potential partition keys

The potential partition keys depend on the queries we want to answer; they are marked in **bold** in the queries below. Each partition key will appear in the WHERE clause of the corresponding CQL query. To benefit fully from Cassandra's speed, the partition key should be evenly distributed. Cassandra hashes the partition key to decide which node stores a row, so each node should end up with a comparable amount of data. If a few key values account for most of the rows, one node (and a few very large partitions) holds most of the data, which can slow down the queries.
To check for this, we visually inspect each potential partition key with matplotlib and seaborn (the code to reproduce these graphs is in `src/partition_key_graphs.py`).

1. Give me the artist, song title and song's length in the music app history that was heard during **sessionId** = 338, and **itemInSession** = 4
2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for **userId** = 10, **sessionId** = 182
3. Give me every user name (first and last) in my music app history who listened to the **song** 'All Hands Against His Own'
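The same skew check can also be done numerically. Below is a minimal stdlib sketch that counts the rows per partition key value and reports the ratio of the largest partition to the mean partition size. The helper name `partition_skew` and the max-to-mean ratio as a skew measure are our own choices, not part of the project code. The rows are toy data.

```python
from collections import Counter


def partition_skew(rows, key_columns):
    """Count rows per partition key value and measure how uneven the counts are.

    Returns (counts, ratio), where ratio = largest partition / mean partition size.
    A ratio of 1.0 means perfectly even partitions. Assumes at least one row.
    """
    counts = Counter(tuple(row[col] for col in key_columns) for row in rows)
    mean_size = sum(counts.values()) / len(counts)
    ratio = max(counts.values()) / mean_size
    return counts, ratio


# Toy rows, not taken from the project data
rows = [
    {'sessionId': 139, 'itemInSession': 1},
    {'sessionId': 139, 'itemInSession': 3},
    {'sessionId': 139, 'itemInSession': 4},
    {'sessionId': 338, 'itemInSession': 4},
]
counts, ratio = partition_skew(rows, ['sessionId'])
print(counts.most_common(1))  # [((139,), 3)]
print(ratio)                  # 1.5
```

A ratio far above 1 means a few partitions hold most of the rows. What counts as "too skewed" remains a judgment call, which is what the graphs help with.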

#### Query 1

<img src="https://user-images.githubusercontent.com/49920622/103486947-d7f4a380-4e01-11eb-8b81-6c7494649b28.jpg" width=800>

#### Query 2

<img src="https://user-images.githubusercontent.com/49920622/103487010-581b0900-4e02-11eb-821b-32951b566c4b.jpg" width=800>

#### Query 3

<img src="https://user-images.githubusercontent.com/49920622/103487072-d7104180-4e02-11eb-9689-529824ba78d2.jpg" width=800>

### 3. check for uniqueness of the primary keys

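Based on the graphs, we selected the best-suited partition keys. On their own, however, they cannot uniquely identify each row, and some queries also need to filter on more than one column in the WHERE clause. Both reasons lead us to add clustering keys. Together, the partition key and the clustering keys form the primary key, which must be unique: in Cassandra, an INSERT with an existing primary key silently overwrites the earlier row. Clustering keys also determine the order of rows within a partition, which is how query 2 can return its songs sorted by itemInSession. Before we create the tables, we therefore verify the assumed uniqueness.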
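The code for `unique_key_check` is in `src/data_utils.py`, which is not shown. With pandas, an equivalent check could be written as `not df.duplicated(subset=columns).any()`. Here is a stdlib sketch of the same idea over a list of row dicts. The name `is_unique_key` is hypothetical, and the rows are toy data.

```python
def is_unique_key(rows, key_columns):
    """Return True if no two rows share the same values for key_columns."""
    seen = set()
    for row in rows:
        key = tuple(row[col] for col in key_columns)
        if key in seen:
            # In Cassandra, this second row would overwrite the first one
            return False
        seen.add(key)
    return True


# Toy rows: the same song played twice in one session
rows = [
    {'song': 'Song A', 'sessionId': 139, 'itemInSession': 3},
    {'song': 'Song A', 'sessionId': 139, 'itemInSession': 7},
]
print(is_unique_key(rows, ['song']))                                # False
print(is_unique_key(rows, ['song', 'sessionId', 'itemInSession']))  # True
```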

```python
from src.data_utils import unique_key_check
```

```python
unique_key_check(event_df, ['sessionId', 'itemInSession'])
```

```text
True
```

```python
unique_key_check(event_df, ['userId', 'sessionId', 'itemInSession'])
```

```text
True
```

```python
unique_key_check(event_df, ['song'])
```

```text
False
```

#### Note

For queries 1 and 2, the keys identified so far already make each row unique. For query 3, however, song is not unique by itself: people often play the same song more than once, even within the same session. We therefore add both sessionId *and* itemInSession as clustering keys.

```python
unique_key_check(event_df, ['song', 'sessionId', 'itemInSession'])
```

```text
True
```
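The actual CREATE statements are in `src/sql_queries.py` and are not shown. The sketch below builds a `CREATE TABLE` statement to show how a partition key and clustering keys are written in CQL. The inner parentheses of `PRIMARY KEY` hold the partition key, and the remaining columns are the clustering columns. The function name `create_table_cql`, the column types and the query 3 layout are assumptions for this example.

```python
def create_table_cql(table, columns, partition_key, clustering=()):
    """Build a CQL CREATE TABLE statement.

    columns: list of (name, cql_type) pairs
    partition_key: column names hashed to place the row on a node
    clustering: column names that order and disambiguate rows within a partition
    """
    column_defs = ', '.join(f'{name} {cql_type}' for name, cql_type in columns)
    key = f"({', '.join(partition_key)})"
    if clustering:
        key += ', ' + ', '.join(clustering)
    return f'CREATE TABLE IF NOT EXISTS {table} ({column_defs}, PRIMARY KEY ({key}))'


# Assumed layout for query 3: partition on song, cluster on session and item
print(create_table_cql(
    'user_info_per_song',
    [('song', 'text'), ('session_id', 'int'), ('item_in_session', 'int'),
     ('first_name', 'text'), ('last_name', 'text')],
    partition_key=['song'],
    clustering=['session_id', 'item_in_session'],
))
# CREATE TABLE IF NOT EXISTS user_info_per_song (song text, session_id int, item_in_session int, first_name text, last_name text, PRIMARY KEY ((song), session_id, item_in_session))
```

Passing two columns as `partition_key` (for example `user_id` and `session_id` for query 2) produces a composite partition key. Which layout each table actually uses is defined in `src/sql_queries.py`.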

### 4. create the database

The `create_database_pipeline` method returns the cluster and session instances. We use them to execute the insert queries in step 6 and to shut down the session and cluster at the end of the notebook.

```python
from src.create_database import CreateDatabase
from src.sql_queries import drop_list, create_list

create_db_instance = CreateDatabase(create_queries=create_list, drop_queries=drop_list)
```

```python
cluster, session = create_db_instance.create_database_pipeline()
```

```text
INFO 2021-01-05 09:22:55,271 [create_database.py:create_database_pipeline:20] Create database pipeline started...
INFO 2021-01-05 09:22:55,275 [create_database.py:init_cluster_and_session:30] Initializing the local Cassandra cluster and session
INFO 2021-01-05 09:22:55,539 [create_database.py:set_udacity_keyspace:43] Setting up the udacity keyspace
INFO 2021-01-05 09:22:55,652 [create_database.py:drop_tables:59] Dropping tables if exists
INFO 2021-01-05 09:22:55,665 [create_database.py:create_tables:68] Creating tables
```
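Judging by its log messages, `CreateDatabase` runs these steps in order:

1. connect to a local cluster;
2. set up the `udacity` keyspace;
3. drop the tables if they exist;
4. create the tables.

The sketch below shows that sequence with the DataStax Python driver (`cassandra-driver`). The function name, the `SimpleStrategy` replication settings and the `cluster_factory` parameter are assumptions for this example, not the project's implementation. The factory lets the function be exercised with a fake cluster when no Cassandra node is running.

```python
from functools import partial


def create_database(create_queries, drop_queries, keyspace='udacity',
                    cluster_factory=None):
    """Hypothetical sketch of a create-database pipeline.

    Connects, creates and selects the keyspace, drops and then creates the
    tables, and returns (cluster, session) so the caller can shut both down later.
    """
    if cluster_factory is None:
        # Imported lazily so the function can be exercised with a fake cluster
        from cassandra.cluster import Cluster
        # Assumes a single local node on the default port
        cluster_factory = partial(Cluster, ['127.0.0.1'])

    cluster = cluster_factory()
    session = cluster.connect()
    session.execute(
        f"CREATE KEYSPACE IF NOT EXISTS {keyspace} "
        "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}"
    )
    session.set_keyspace(keyspace)
    for query in drop_queries:    # e.g. DROP TABLE IF EXISTS ...
        session.execute(query)
    for query in create_queries:  # e.g. CREATE TABLE IF NOT EXISTS ...
        session.execute(query)
    return cluster, session
```

Dropping before creating makes the notebook rerunnable from a clean state, at the cost of discarding any previously inserted data.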


### 5. transform the Pandas DataFrame to lists of tuples, ready for insertion into the database

```python
# query 1 -> session_info table
columns_session_info = ['artist', 'song', 'length', 'sessionId', 'itemInSession']
data_session_info = [tuple(row) for row in event_df[columns_session_info].itertuples(index=False)]

# query 2 -> user_info table
columns_user_info = ['artist', 'song', 'firstName', 'lastName', 'sessionId', 'userId', 'itemInSession']
data_user_info = [tuple(row) for row in event_df[columns_user_info].itertuples(index=False)]

# query 3 -> user_info_per_song table
columns_user_info_per_song = ['firstName', 'lastName', 'song', 'sessionId', 'itemInSession']
data_user_info_per_song = [tuple(row) for row in event_df[columns_user_info_per_song].itertuples(index=False)]
```
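The values in each tuple must appear in the same order as the placeholders of the matching INSERT statement in `src/sql_queries.py`. The DataFrame uses camelCase names while the tables use snake_case, and `length` is queried as `song_length`. One way to keep both orders in sync is to derive the INSERT statement from the same column mapping that selects the DataFrame columns.

Below is a sketch for query 1. The mapping `SESSION_INFO_COLUMNS` and the helper `build_insert` are hypothetical. `%s` is the placeholder style the DataStax driver uses for simple (non-prepared) statements.

```python
# Hypothetical mapping from DataFrame columns to session_info table columns
SESSION_INFO_COLUMNS = {
    'artist': 'artist',
    'song': 'song',
    'length': 'song_length',
    'sessionId': 'session_id',
    'itemInSession': 'item_in_session',
}


def build_insert(table, column_map):
    """Return (insert_cql, dataframe_columns) with matching column order."""
    df_columns = list(column_map)  # dicts keep insertion order (Python 3.7+)
    table_columns = ', '.join(column_map[col] for col in df_columns)
    placeholders = ', '.join(['%s'] * len(df_columns))
    query = f'INSERT INTO {table} ({table_columns}) VALUES ({placeholders})'
    return query, df_columns


query, df_columns = build_insert('session_info', SESSION_INFO_COLUMNS)
print(query)
# INSERT INTO session_info (artist, song, song_length, session_id, item_in_session) VALUES (%s, %s, %s, %s, %s)
print(df_columns)
# ['artist', 'song', 'length', 'sessionId', 'itemInSession']
```

In the notebook, `df_columns` would then replace the hand-written list in `event_df[df_columns].itertuples(index=False)`.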

### 6. insert the data

```python
from src.sql_queries import insert_list

data_list = [data_session_info, data_user_info, data_user_info_per_song]

for idx, (data, query) in enumerate(zip(data_list, insert_list), start=1):
    print(f"inserting file {idx}/{len(data_list)}")
    for row in data:
        try:
            session.execute(query, row)
        except Exception as e:
            print(e)
```

```text
inserting file 1/3
inserting file 2/3
inserting file 3/3
```
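The loop above prints each failed insert and moves on, so an error can easily get lost among thousands of rows. The sketch below counts successful inserts and collects the failures instead, so they can be checked afterwards. The name `insert_rows` is hypothetical.

For larger datasets, the DataStax driver also offers prepared statements (`session.prepare`, with `?` placeholders) and `cassandra.concurrent.execute_concurrent_with_args`. Both are usually much faster than executing one simple statement at a time.

```python
def insert_rows(session, query, rows):
    """Execute query once per row; return (n_inserted, failures).

    failures is a list of (row, exception) pairs, so no error is silently lost.
    """
    inserted = 0
    failures = []
    for row in rows:
        try:
            session.execute(query, row)
        except Exception as exc:  # broad on purpose: record any driver error
            failures.append((row, exc))
        else:
            inserted += 1
    return inserted, failures
```

In the notebook, `inserted, failures = insert_rows(session, query, data)` would replace the inner loop. An `assert not failures` after each table then stops the run on the first problem.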


### 7. inspect and validate the queries

```python
query_1 = """
SELECT artist
,    song
,    song_length

FROM
    session_info

WHERE
    session_id=338
and item_in_session=4
"""

try:
    rows = session.execute(query_1)
except Exception as e:
    print(e)
else:
    # iterate only if the query succeeded; otherwise `rows` is undefined or stale
    for row in rows:
        print(row)
```

```text
Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', song_length=495.30731201171875)
```


```python
query_2 = """
SELECT artist
,    song
,    first_name
,    last_name
,    item_in_session -- to check sorting

FROM
    user_info

WHERE
    user_id=10
and session_id=182
"""

try:
    rows = session.execute(query_2)
except Exception as e:
    print(e)
else:
    # iterate only if the query succeeded; otherwise `rows` is undefined or stale
    for row in rows:
        print(row)
```

```text
Row(artist='Down To The Bone', song="Keep On Keepin' On", first_name='Sylvie', last_name='Cruz', item_in_session=0)
Row(artist='Three Drives', song='Greece 2000', first_name='Sylvie', last_name='Cruz', item_in_session=1)
Row(artist='Sebastien Tellier', song='Kilometer', first_name='Sylvie', last_name='Cruz', item_in_session=2)
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', first_name='Sylvie', last_name='Cruz', item_in_session=3)
```


```python
query_3 = """
SELECT first_name
,    last_name

FROM
    user_info_per_song

WHERE
    song='All Hands Against His Own'
"""

try:
    rows = session.execute(query_3)
except Exception as e:
    print(e)
else:
    # iterate only if the query succeeded; otherwise `rows` is undefined or stale
    for row in rows:
        print(row)
```

```text
Row(first_name='Sara', last_name='Johnson')
Row(first_name='Jacqueline', last_name='Lynch')
Row(first_name='Tegan', last_name='Levine')
```
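Eyeballing the output is only a first check. To validate the queries, each result can also be compared with the answer computed directly from the source data. The sketch below works on a list of row dicts, for example `event_df.to_dict('records')`. The helper names `expected_rows` and `to_float32` are hypothetical, and the records are toy data.

The returned `song_length` (495.30731201171875) has more digits than the five decimals in the source `length` column. This suggests the column is stored as a 32-bit CQL `float`. If so, numeric comparisons should allow a small tolerance, for example with `math.isclose`.

```python
import math
import struct


def expected_rows(records, filters, columns, sort_by=None):
    """Compute a query's answer directly from the source rows.

    filters: {column: value} equality conditions (like the WHERE clause),
    columns: columns to return, sort_by: optional column to order by.
    """
    matches = [r for r in records
               if all(r[col] == value for col, value in filters.items())]
    if sort_by is not None:
        matches = sorted(matches, key=lambda r: r[sort_by])
    return [tuple(r[col] for col in columns) for r in matches]


def to_float32(value):
    """Round a Python float to 32-bit precision, as a CQL float column stores it."""
    return struct.unpack('f', struct.pack('f', value))[0]


# Toy rows, not taken from the project data
records = [
    {'artist': 'A', 'song': 'x', 'length': 200.5, 'sessionId': 338, 'itemInSession': 4, 'userId': 10},
    {'artist': 'B', 'song': 'y', 'length': 180.0, 'sessionId': 182, 'itemInSession': 1, 'userId': 10},
    {'artist': 'C', 'song': 'z', 'length': 150.0, 'sessionId': 182, 'itemInSession': 0, 'userId': 10},
]

print(expected_rows(records, {'sessionId': 338, 'itemInSession': 4}, ['artist', 'song', 'length']))
# [('A', 'x', 200.5)]
print(expected_rows(records, {'userId': 10, 'sessionId': 182}, ['artist', 'song'], sort_by='itemInSession'))
# [('C', 'z'), ('B', 'y')]
print(math.isclose(to_float32(246.30812), 246.30812, rel_tol=1e-6))
# True
```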


### 8. close the session and cluster connection 

```python
session.shutdown()
cluster.shutdown()
```
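If a cell fails between connecting and these two final lines, the connection stays open. The sketch below uses `contextlib.ExitStack` to guarantee both shutdowns, in reverse order of registration, even when an exception is raised. `run_with_cleanup` and its `work` callback are hypothetical names; `work` would hold the insert and query steps.

```python
from contextlib import ExitStack


def run_with_cleanup(cluster, session, work):
    """Run work(session) and always shut down the session and cluster afterwards."""
    with ExitStack() as stack:
        # Callbacks run in reverse registration order: session first, then cluster
        stack.callback(cluster.shutdown)
        stack.callback(session.shutdown)
        return work(session)
```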