# Modelling with Cassandra
## 1. Application Design
A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analytics team is particularly interested in understanding what songs users are listening to. Currently, they don't have an easy way to query their data, which resides in a directory of JSON logs on user activity on the app, as well as a directory with JSON metadata on the songs in their app.

The aim is to create a Cassandra database with tables designed to optimize queries on song play analysis.

## 2. Conceptual Data Model
<img src="./images/01_conceptual_data_model.gif" width="1000"/>

## 3. Defining Application Queries

Create NoSQL data models to answer the following questions for the music streaming app Sparkify:
* Q1: Look up artist, song title and length by `sessionId = 338`, and `itemInSession  = 4`
* Q2: Look up artist, song title, user's first and last name by `userid = 10`, `sessionid = 182`
* Q3: Look up all users (first and last name) who listened to particular song ex. `'All Hands Against His Own'`

<img src="./images/02_application_queries.gif" width="1000"/>

The Sparkify analysts' workflow would start with song popularity (Q3). They would then pick one of the returned users and check that user's song play history (Q2), and finally dive deep into a particular session (Q1).

## 4. Logical Data Modeling


## 2. Preprocessing log files
Log files are consolidated into a single file for easy extraction and loading into Cassandra.

In [149]:
# Import Python packages
import cassandra.cluster
import numpy as np
import pandas as pd
import os
import glob
import csv
from cassandra.cluster import Cluster
from matplotlib import pyplot as plt

In [150]:
# Process log files
from typing import Union

def concatenate_files(read_path_pattern: Union[str, os.PathLike], write_path: Union[str, os.PathLike]) -> None:
    """Concatenate csv files matching read_path_pattern (UNIX shell glob rules) into one csv file.

    Rows with an empty artist are skipped.

    Parameters:
        read_path_pattern: string or os.PathLike
            Path pattern of the files to concatenate ex: files/directory/*.csv

        write_path: string or os.PathLike
            Path to the concatenated file
    """

    # Delete existing file
    if os.path.exists(write_path):
        os.remove(write_path)

    # Match files path according read_path_pattern
    read_path_list = glob.glob(pathname=read_path_pattern, recursive=True)

    # Write file extension
    write_ext = os.path.splitext(write_path)[-1].lower()

    # Read files and concatenate them using pandas dataframe
    for read_path in read_path_list:

        # Read file extension
        read_ext = os.path.splitext(read_path)[-1].lower()

        # Handle csv files
        if (read_ext == '.csv') and (write_ext =='.csv'):

            # Write the column header only when the output file does not exist yet
            header = not os.path.exists(write_path)

            # Read and write csv file
            df = pd.read_csv(read_path)

            # Skip rows with empty artist
            df = df.dropna(subset=['artist'])

            df.to_csv(write_path, mode='a', header=header, index=False)

    return None

# Concatenate csv data files
write_path = 'all_event_data.csv'
read_path_pattern = os.path.join(os.getcwd(), 'event_data', '*.csv')
concatenate_files(read_path_pattern, write_path)

# Sanity check: count the lines written (the header line is included in the count)
with open(write_path, 'r', encoding='utf8') as f:
    num_lines = sum(1 for _ in f)
    print(f'{num_lines} lines (including header) written to {write_path}')

6821 lines (including header) written to all_event_data.csv


In [151]:
# Review consolidated file
columns = {
    'sessionId': int,
    'itemInSession': int,
    'userId': int,
    'artist': str,
    'song': str,
    'firstName': str,
    'lastName': str,
    'length': float
}
# Pass plain lists: indexing a dataframe with dict_keys is not reliably supported
df = pd.read_csv(write_path, usecols=list(columns), dtype=columns)[list(columns)]
df.head()

| | sessionId | itemInSession | userId | artist | song | firstName | lastName | length |
|---|---|---|---|---|---|---|---|---|
| 0 | 139 | 1 | 8 | Des'ree | You Gotta Be | Kaylee | Summers | 246.30812 |
| 1 | 139 | 3 | 8 | Mr Oizo | Flat 55 | Kaylee | Summers | 144.03873 |
| 2 | 139 | 4 | 8 | Tamba Trio | Quem Quiser Encontrar O Amor | Kaylee | Summers | 177.18812 |
| 3 | 139 | 5 | 8 | The Mars Volta | Eriatarka | Kaylee | Summers | 380.42077 |
| 4 | 139 | 6 | 8 | Infected Mushroom | Becoming Insane | Kaylee | Summers | 440.2673 |


## 3. Setting Up Apache Cassandra
The Cassandra cluster runs on localhost and, for development purposes, the keyspace uses the __simple replication strategy__ with a single replica. For production, the replication factor can be increased to ensure high availability within one datacenter. The __network topology strategy__ should be used when the cluster spans several datacenters.
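
As a rough illustration of the two replication strategies, the sketch below builds the `CREATE KEYSPACE` statement for either case. The helper name `keyspace_cql` and the datacenter names `dc1` and `dc2` are assumptions made for this example; real datacenter names have to match the ones configured in the cluster.

```python
def keyspace_cql(name, datacenters=None, replication_factor=1):
    """Build a CREATE KEYSPACE statement (illustrative helper, not part of the driver).

    datacenters: optional dict {datacenter_name: replica_count}.
    Without it SimpleStrategy is used, otherwise NetworkTopologyStrategy.
    """
    if datacenters:
        options = {'class': 'NetworkTopologyStrategy', **datacenters}
    else:
        options = {'class': 'SimpleStrategy', 'replication_factor': replication_factor}
    # repr() quotes the strategy name and leaves replica counts as plain integers
    body = ', '.join(f"'{key}': {value!r}" for key, value in options.items())
    return f"CREATE KEYSPACE IF NOT EXISTS {name} WITH replication = {{{body}}};"


# Development: one replica on a single node
print(keyspace_cql('sparkify'))
# Production sketch: hypothetical datacenters dc1 and dc2
print(keyspace_cql('sparkify', datacenters={'dc1': 3, 'dc2': 2}))
```

The returned string can be passed to `session.execute` in the same way as the keyspace query in the next cell.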

In [152]:
# Print Cassandra result set as pandas dataframe
def pandas_factory(columns, rows):
    return pd.DataFrame(rows, columns=columns)

# Set Cassandra cluster at local host
cluster = Cluster()

# Connect to cluster to execute queries
session = cluster.connect()
session.row_factory = pandas_factory
session.default_fetch_size = None

# Create a Keyspace
query = '''
    CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
'''
session.execute(query)

# Set keyspace
session.set_keyspace('sparkify')

## 4. Modeling Tasks
### 4.1 Partition Key Rules
To optimize performance, it is important to understand how Cassandra writes and reads data. The [rules of thumb](https://medium.com/@herryg91/in-depth-why-modeling-in-cassandra-is-important-ba25081dafd5) are:
1. A partition should hold roughly 10K to 100K rows
2. A partition should be smaller than 100 MB
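
The two rules can be checked with simple arithmetic. The sketch below is a back-of-the-envelope estimate only: it multiplies the row count by the average row size and ignores Cassandra's own storage overhead. The function names are assumptions made for this example.

```python
def partition_size_mb(rows_per_partition, avg_row_bytes):
    """Estimated partition size in megabytes (1 MB taken as 1,000,000 bytes)."""
    return rows_per_partition * avg_row_bytes / 1_000_000


def within_rules_of_thumb(rows_per_partition, avg_row_bytes, max_rows=100_000, max_mb=100):
    """True when a partition stays within both the row-count and the size limit."""
    size_ok = partition_size_mb(rows_per_partition, avg_row_bytes) < max_mb
    return rows_per_partition <= max_rows and size_ok


# 100K rows of 32 bytes each stay far below the 100 MB limit
print(partition_size_mb(100_000, 32))
print(within_rules_of_thumb(100_000, 32))
```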

In [153]:
def insert_df_into_cassandra(df: pd.DataFrame, session: cassandra.cluster.Session, query: str) -> None:
    """Inserts the rows of a pandas dataframe into a cassandra table.

    Parameters:
        df: pd.DataFrame
            pandas dataframe to be inserted into cassandra;
            column order must match the placeholders in query

        session: cassandra.cluster.Session
            Connection to Cassandra cluster

        query: string
            Insert statement according to CQL syntax rules
    """

    # Load dataframe to cassandra table
    prepared = session.prepare(query)
    for _, series in df.iterrows():
        values = tuple(series.to_list())
        bound = prepared.bind(values)
        session.execute(bound)

    return None
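
The loop above sends one insert at a time and waits for each response. A possible speed-up, sketched below under assumptions, is to keep several requests in flight with `execute_concurrent_with_args` from the driver's `cassandra.concurrent` module. The helper names `chunked` and `insert_rows_concurrently`, the chunk size and the concurrency level are illustrative choices, and the sketch has not been timed against this data set.

```python
def chunked(rows, size):
    """Yield consecutive lists of at most `size` rows."""
    rows = list(rows)
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def insert_rows_concurrently(session, insert_cql, rows, concurrency=50, chunk_size=1000):
    """Insert parameter tuples with several requests in flight at once."""
    # Imported here so that `chunked` stays usable without the driver installed
    from cassandra.concurrent import execute_concurrent_with_args

    prepared = session.prepare(insert_cql)
    for chunk in chunked(rows, chunk_size):
        # With the driver's default settings the first failed insert raises an exception
        execute_concurrent_with_args(session, prepared, chunk, concurrency=concurrency)


# The chunking itself can be tried without a cluster
print(list(chunked(range(5), 2)))
```

With a dataframe, the rows could be supplied as `df[columns].itertuples(index=False, name=None)`, assuming the column order matches the placeholders of the insert statement.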

In [154]:
# Drop tables
tables = [
    'song_plays_by_session',
    'song_plays_by_user_session',
    'users_by_song_play'
]
for table in tables:
    session.execute(f'''
        DROP TABLE IF EXISTS {table}
    ''')

### 4.2 Song Plays by Session
Give me the artist, song title and song's length in the music app history that was heard during `sessionId = 338` and `itemInSession = 4`

#### 4.2.1 Primary Key Discussion
A song play record is uniquely identified by the combination of `sessionId` and `itemInSession`. With 100K rows per partition and an average row size of 32 B, the average partition size would be about 3.2 MB. `sessionId` alone is not optimal as a partition key because its cardinality is high and each session holds only a few rows (about 9 on average). The solution is to partition by a derived `shard_id` and to use `session_id` and `item_in_session` as clustering columns.
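
The shard idea can be sketched as a plain floor division: consecutive ids are grouped into buckets of a fixed width, and the bucket number becomes the partition key. The function name `shard_for` is an assumption made for this example; the width 11378 is the value fixed later in this notebook.

```python
def shard_for(numeric_id, ids_per_shard):
    """Map a numeric id to its shard by floor division."""
    if ids_per_shard <= 0:
        raise ValueError('ids_per_shard must be positive')
    return int(numeric_id // ids_per_shard)


# Sessions 0..11377 share shard 0, sessions 11378..22755 share shard 1, and so on
for session_id in (338, 11377, 11378, 25000):
    print(session_id, shard_for(session_id, 11378))
```

Because the shard is derived from the id alone, a reader can recompute it at query time and always address exactly one partition.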

In [155]:
# Combination of sessionId and itemInSession is unique identifier of the songplay
primary_key_columns = ['sessionId', 'itemInSession']
is_primary_key = (df.groupby(primary_key_columns).size() == 1).all()
print(f'Does the combination of {primary_key_columns} create a unique row identifier?: {is_primary_key}')

Does the combination of ['sessionId', 'itemInSession'] create a unique row identifier?: True


In [156]:
# Decide on partition key
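def average_row_size(df: pd.DataFrame) -> float:
    # Shallow estimate: string columns are counted as object pointers, not by their text length
    return df.memory_usage(index=False).sum() / df.index.size

def median_num_rows_per_partition(sr: pd.Series) -> float:
    # Median number of rows sharing the same partition key value
    return sr.value_counts().median()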

partition_key = 'sessionId'
columns = {
    'sessionId': int,
    'itemInSession': int,
    'artist': str,
    'song': str,
    'length': float
}

avg_row_size = average_row_size(df[columns.keys()])
print(f'Average row size: {avg_row_size:.0f}B')

md_num_rows_per_partition = median_num_rows_per_partition(df[partition_key])
print(f'Median rows per {partition_key} partition: {md_num_rows_per_partition}')

Average row size: 32B
Median rows per sessionId partition: 2.0


In [157]:
# Create shard partition key
avg_items_per_session = df.groupby([partition_key]).size().mean()
max_sessions_per_partition = 100_000 / avg_items_per_session
df['shardId'] = (df[partition_key] // max_sessions_per_partition).astype('int')
md_num_rows_per_partition = median_num_rows_per_partition(df['shardId'])
print(f'Average rows per {partition_key}: {avg_items_per_session}')
print(f'Median rows per partition for shard_id: {md_num_rows_per_partition}')
print(f'Estimated maximum sessions per partition: {max_sessions_per_partition}')

Average rows per sessionId: 8.788659793814434
Median rows per partition for shard_id: 6820.0
Estimated maximum sessions per partition: 11378.299120234604


In [158]:
# Fix maximum session per partition
max_sessions_per_partition = 11378

In [159]:
# Create table
song_plays_by_session = '''
    CREATE TABLE IF NOT EXISTS song_plays_by_session
    (
        shard_id INT,
        session_id INT,
        item_in_session INT,
        artist_name TEXT,
        song_title TEXT,
        song_length DECIMAL,
        PRIMARY KEY (shard_id, session_id, item_in_session)
    )
'''
session.execute(song_plays_by_session)

<cassandra.cluster.ResultSet at 0x273a70c3760>

In [160]:
%%time
# Insert data into table
columns = [
    'shardId',
    'sessionId',
    'itemInSession',
    'artist',
    'song',
    'length'
]
query = '''
            INSERT INTO song_plays_by_session
            (
                shard_id,
                session_id,
                item_in_session,
                artist_name,
                song_title,
                song_length
            )
            VALUES
            (?, ?, ?, ?, ?, ?);
    '''

insert_df_into_cassandra(df[columns], session, query)

Wall time: 1min 56s


In [161]:
# Verify select statement
session_id = 338
item_in_session = 4
shard_id = int(session_id // max_sessions_per_partition)
query = f'''
    SELECT
        artist_name,
        song_title,
        song_length
    FROM song_plays_by_session
    WHERE
        shard_id = {shard_id} AND session_id = {session_id} AND item_in_session = {item_in_session};
'''
result = session.execute(query)
result._current_rows

| | artist_name | song_title | song_length |
|---|---|---|---|
| 0 | Faithless | Music Matters (Mark Knight Dub) | 495.3073 |


### 4.3 Song Plays by User by Session
Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for `userid = 10`, `sessionid = 182`

#### 4.3.1 Primary Key Discussion
A song play record is uniquely identified by the combination of `sessionId` and `itemInSession`. With 100K rows per partition and an average row size of 44 B, the average partition size would be about 4.4 MB. `userId` alone is not optimal as a partition key because its cardinality is high and each user holds only a few rows (a median of 15 rows per `userId`). The solution is to use `shard_id` as the partition key and `user_id`, `session_id` and `item_in_session` as clustering columns.
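
The effect of the clustering columns can be pictured with a small in-memory model: rows are grouped by the partition key and kept sorted by the clustering columns, so a lookup by `user_id` and `session_id` returns the plays already ordered by `item_in_session`. This is only a conceptual sketch, not how Cassandra stores data internally; the function names and the song titles are made up for the illustration.

```python
from collections import defaultdict


def build_partitions(rows):
    """Group rows by shard_id and sort each group by the clustering columns."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row['shard_id']].append(row)
    for shard_rows in partitions.values():
        shard_rows.sort(key=lambda r: (r['user_id'], r['session_id'], r['item_in_session']))
    return partitions


def select_session(partitions, shard_id, user_id, session_id):
    """Return the plays of one user session in clustering order."""
    return [row for row in partitions.get(shard_id, [])
            if row['user_id'] == user_id and row['session_id'] == session_id]


# Hypothetical rows, deliberately inserted out of order
rows = [
    {'shard_id': 0, 'user_id': 10, 'session_id': 182, 'item_in_session': 2, 'song_title': 'song C'},
    {'shard_id': 0, 'user_id': 8, 'session_id': 139, 'item_in_session': 1, 'song_title': 'song D'},
    {'shard_id': 0, 'user_id': 10, 'session_id': 182, 'item_in_session': 0, 'song_title': 'song A'},
    {'shard_id': 0, 'user_id': 10, 'session_id': 182, 'item_in_session': 1, 'song_title': 'song B'},
]
partitions = build_partitions(rows)
for row in select_session(partitions, shard_id=0, user_id=10, session_id=182):
    print(row['item_in_session'], row['song_title'])
```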

In [162]:
# Decide on partition key
partition_key = 'userId'
columns = {
    'userId': int,
    'sessionId': int,
    'itemInSession': int,
    'artist': str,
    'song': str,
    'firstName': str,
    'lastName': str
}

avg_row_size = average_row_size(df[columns.keys()])
print(f'Average row size: {avg_row_size:.0f}B')

md_num_rows_per_partition = median_num_rows_per_partition(df[partition_key])
print(f'Median rows per {partition_key} partition: {md_num_rows_per_partition}')

Average row size: 44B
Median rows per userId partition: 15.0


In [163]:
# Create shard partition key
avg_rows_per_user = df.groupby([partition_key]).size().mean()
max_users_per_partition = 100_000 / avg_rows_per_user
df['shardId'] = (df[partition_key] // max_users_per_partition).astype('int')
md_num_rows_per_partition = median_num_rows_per_partition(df['shardId'])
print(f'Average rows per {partition_key}: {avg_rows_per_user}')
print(f'Median rows per partition for shard_id: {md_num_rows_per_partition}')
print(f'Estimated maximum users per partition: {max_users_per_partition}')

Average rows per userId: 71.04166666666667
Median rows per partition for shard_id: 6820.0
Estimated maximum users per partition: 1407.624633431085


In [164]:
# Fix maximum users per partition
max_users_per_partition = 1408

In [165]:
# Create table
song_plays_by_user_session = '''
    CREATE TABLE IF NOT EXISTS song_plays_by_user_session
    (
        shard_id INT,
        user_id INT,
        session_id INT,
        item_in_session INT,
        artist_name TEXT,
        song_title TEXT,
        user_first_name TEXT,
        user_last_name TEXT,
        PRIMARY KEY (shard_id, user_id, session_id, item_in_session)
    )
    WITH CLUSTERING ORDER BY (user_id ASC, session_id ASC, item_in_session ASC);
'''
session.execute(song_plays_by_user_session)

<cassandra.cluster.ResultSet at 0x273a8973850>

In [166]:
%%time
# Insert data into table
columns = [
    'shardId',
    'userId',
    'sessionId',
    'itemInSession',
    'artist',
    'song',
    'firstName',
    'lastName'
]
query = '''
            INSERT INTO song_plays_by_user_session
            (
                shard_id,
                user_id,
                session_id,
                item_in_session,
                artist_name,
                song_title,
                user_first_name,
                user_last_name
            )
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?);
    '''

insert_df_into_cassandra(df[columns], session, query)

Wall time: 2min 5s


In [167]:
# Verify select statement
# Name of artist, song (sorted by itemInSession) and user (first and last name) for `userid = 10`, `sessionid = 182`
user_id = 10
session_id = 182
shard_id = int(user_id // max_users_per_partition)

query = f'''
    SELECT
        artist_name,
        song_title,
        user_first_name,
        user_last_name
    FROM song_plays_by_user_session
    WHERE
        shard_id = {shard_id} AND user_id = {user_id} AND session_id = {session_id};
'''
result = session.execute(query)
result._current_rows

| | artist_name | song_title | user_first_name | user_last_name |
|---|---|---|---|---|
| 0 | Down To The Bone | Keep On Keepin' On | Sylvie | Cruz |
| 1 | Three Drives | Greece 2000 | Sylvie | Cruz |
| 2 | Sebastien Tellier | Kilometer | Sylvie | Cruz |
| 3 | Lonnie Gordon | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie | Cruz |


### 4.4 Users by Song Play
Give me every user name (first and last) in my music app history who listened to the song `All Hands Against His Own`

#### 4.4.1 Primary Key Discussion
A song play record is uniquely identified by the combination of `sessionId` and `itemInSession`. To list the users who listened to a particular song, however, the combination of `song` and `userId` is sufficient as a primary key. With 100K rows per partition and an average row size of 28 B, the average partition size would be about 2.8 MB. `song` alone is not optimal as a partition key because its cardinality is high (a median of 15 rows per `userId` and 1 row per `song`). The solution is to use `shard_id` as the partition key and `user_id`, `song_title` as clustering columns.
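
Why `song` plus `user_id` is sufficient here can be seen from Cassandra's insert semantics: an `INSERT` with an existing primary key overwrites the row instead of adding a second one, so repeated plays of one song by the same user collapse into a single row. The sketch below models this with a dictionary keyed by the primary key columns. The helper name `upsert` and the number of repeated plays are assumptions made for the illustration.

```python
def upsert(table, row, key_columns):
    """Insert a row keyed by its primary key columns; an existing key is overwritten."""
    key = tuple(row[column] for column in key_columns)
    table[key] = row


primary_key = ('shard_id', 'user_id', 'song_title')
plays = [
    {'shard_id': 0, 'user_id': 29, 'song_title': 'All Hands Against His Own'},
    {'shard_id': 0, 'user_id': 29, 'song_title': 'All Hands Against His Own'},  # repeated play
    {'shard_id': 0, 'user_id': 80, 'song_title': 'All Hands Against His Own'},
]

table = {}
for play in plays:
    upsert(table, play, primary_key)

# Three plays, but only two distinct (user, song) rows remain
print(len(plays), len(table))
```

For Q3 this is the desired behaviour, because each listener should appear once regardless of how often the song was played.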

In [178]:
# Decide on partition key
partition_key = 'song'
columns = {
    'userId': int,
    'song': str,
    'firstName': str,
    'lastName': str
}

avg_row_size = average_row_size(df[columns.keys()])
print(f'Average row size: {avg_row_size:.0f}B')

md_num_rows_per_partition = median_num_rows_per_partition(df[partition_key])
print(f'Median rows per {partition_key} partition: {md_num_rows_per_partition}')

Average row size: 28B
Median rows per song partition: 1.0


In [179]:
# Create shard partition key
# Song titles are strings, so floor division does not apply; hash each title into a shard instead
import zlib

avg_rows_per_song = df.groupby([partition_key]).size().mean()
max_songs_per_partition = 100_000 / avg_rows_per_song
num_shards = max(1, int(np.ceil(df[partition_key].nunique() / max_songs_per_partition)))
df['shardId'] = df[partition_key].map(lambda title: zlib.crc32(str(title).encode('utf-8')) % num_shards)
md_num_rows_per_partition = median_num_rows_per_partition(df['shardId'])
print(f'Average rows per {partition_key}: {avg_rows_per_song}')
print(f'Median rows per partition for shard_id: {md_num_rows_per_partition}')
print(f'Estimated maximum songs per partition: {max_songs_per_partition}')

In [175]:
# Fix maximum users per partition
max_users_per_partition = 1408

In [176]:
# Create table
users_by_song_play = '''
    CREATE TABLE IF NOT EXISTS users_by_song_play
    (
        shard_id INT,
        user_id INT,
        song_title TEXT,
        user_first_name TEXT,
        user_last_name TEXT,
        PRIMARY KEY (shard_id, user_id, song_title)
    )
'''
session.execute(users_by_song_play)

<cassandra.cluster.ResultSet at 0x273a70d82e0>

In [64]:
%%time
# Insert data into table
columns = [
    'shardId',
    'userId',
    'song',
    'firstName',
    'lastName'
]
query = '''
            INSERT INTO users_by_song_play
            (
                shard_id,
                user_id,
                song_title,
                user_first_name,
                user_last_name
            )
            VALUES
            (?, ?, ?, ?, ?);
    '''

insert_df_into_cassandra(df[columns], session, query)

Wall time: 1min 57s


In [65]:
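# Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
# song_title is a clustering column here, not the partition key, so Cassandra only accepts
# this restriction with ALLOW FILTERING (a scan that is acceptable for this small development data set)
query = '''
    SELECT
        user_id,
        user_first_name,
        user_last_name
    FROM users_by_song_play
    WHERE
        song_title = 'All Hands Against His Own'
    ALLOW FILTERING
'''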
rows = session.execute(query)
for row in rows:
    print(row)

Row(user_id=29, user_first_name='Jacqueline', user_last_name='Lynch')
Row(user_id=80, user_first_name='Tegan', user_last_name='Levine')
Row(user_id=95, user_first_name='Sara', user_last_name='Johnson')


### Drop the tables before closing out the sessions

In [4]:
# Drop the tables before closing out the sessions
for table in tables:
    session.execute(f'DROP TABLE IF EXISTS {table}')

### Close the session and cluster connection

In [None]:
session.shutdown()
cluster.shutdown()