# Data Modeling with Apache Cassandra

This project builds an Apache Cassandra database and uses it to run a few analyses on song-play event data.


In [21]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

# Path of the consolidated CSV file: written by the pre-processing step and
# read back by pandas and by the insert helper.
CSV_EVENTS_FILE = 'event_datafile_new.csv'
# Name of the Cassandra keyspace that is created and used by this notebook.
KEYSPACE = 'anobrega'

## 1. Pre-processing csv files

In [22]:
# Directory that holds the raw event files, resolved against the current
# working directory.
filepath = os.getcwd() + '/event_data'

# Start from an empty list so the name is always defined, even when the
# directory is missing or empty, and accumulate across every directory that
# os.walk visits instead of keeping only the last one.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden directories (e.g. .ipynb_checkpoints) in place so os.walk
    # does not descend into them; the '*' glob below already skips hidden
    # entries, so this keeps both levels consistent. Sorting makes the
    # traversal order deterministic.
    dirs[:] = sorted(d for d in dirs if not d.startswith('.'))

    # glob('*') also returns sub-directories; keep regular files only, since
    # every collected path is opened as a file later on.
    matches = glob.glob(os.path.join(root, '*'))
    file_path_list.extend(sorted(p for p in matches if os.path.isfile(p)))


In [23]:
# Gather the data rows (header excluded) of every raw event file.
full_data_rows_list = []

for source_path in file_path_list:
    with open(source_path, 'r', encoding='utf8', newline='') as source_file:
        source_reader = csv.reader(source_file)
        next(source_reader)  # discard the header line of this file
        full_data_rows_list.extend(source_reader)

# Output dialect: every field quoted, whitespace right after a delimiter ignored.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions, within a raw row, of the columns copied to the consolidated
# file, in the same order as the header written below.
kept_positions = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)
output_header = ['artist', 'firstName', 'gender', 'iteminsession', 'lastName', 'length',
                 'level', 'location', 'sessionid', 'song', 'userid']

with open(CSV_EVENTS_FILE, 'w', encoding='utf8', newline='') as output_file:
    output_writer = csv.writer(output_file, dialect='myDialect')
    output_writer.writerow(output_header)
    for raw_row in full_data_rows_list:
        # Rows whose first column (artist) is empty are left out.
        if raw_row[0] != '':
            output_writer.writerow(tuple(raw_row[position] for position in kept_positions))


### Checking the number of rows in the csv file and viewing the column names

In [24]:
# Load the consolidated file and report its size and column names.
df_events = pd.read_csv(CSV_EVENTS_FILE, sep=',')

row_total = len(df_events)
print(f"Number of lines {row_total}")
print(df_events.columns)


Number of lines 6820
Index(['artist', 'firstName', 'gender', 'iteminsession', 'lastName', 'length',
       'level', 'location', 'sessionid', 'song', 'userid'],
      dtype='object')


## 2. Analysing data

#### Support functions

In [25]:
def executeSafe(func):
    """Call ``func`` with no arguments and hand back whatever it returns.

    Any ``Exception`` raised by the call is printed instead of propagated,
    and ``None`` is returned in that case, so callers have to be prepared
    for a ``None`` result.
    """
    try:
        outcome = func()
    except Exception as error:
        print(error)
        return None
    return outcome


def execute_query_without_return(session, query):
    """Run ``query`` on ``session`` through ``executeSafe``, discarding the result.

    Because the call goes through ``executeSafe``, a failing statement is
    printed rather than raised.
    """
    def run_statement():
        return session.execute(query)

    executeSafe(run_statement)


def insert_rows_from_file(session, insert_query, positions_mapper, file=CSV_EVENTS_FILE):
    """Insert every data row of a CSV file through a parameterised query.

    Parameters
    ----------
    session
        Object exposing ``execute(query, parameters)``.
    insert_query : str
        Statement with one placeholder per entry of ``positions_mapper``.
    positions_mapper : dict
        Maps a zero-based CSV column position to a converter callable, or to
        a falsy value (e.g. ``None``) to pass the raw text through. Values
        are bound in the dict's iteration order, so that order has to match
        the column order of ``insert_query``.
    file
        Path of the CSV file; its first line is treated as a header and
        skipped.

    Exceptions raised by a converter or by ``session.execute`` propagate to
    the caller.
    """
    # Resolve the (position, converter) pairs once instead of on every row.
    column_converters = list(positions_mapper.items())

    # newline='' leaves newline handling to the csv module, as its
    # documentation requires; otherwise line breaks embedded in quoted
    # fields can be altered while reading.
    with open(file, encoding='utf8', newline='') as f:
        csvreader = csv.reader(f)
        # Skip the header; the default keeps an empty file from raising
        # StopIteration.
        next(csvreader, None)
        for line in csvreader:
            if not line:
                # csv.reader yields [] for blank lines; there is nothing to insert.
                continue
            values = tuple(
                convert(line[position]) if convert else line[position]
                for position, convert in column_converters
            )
            session.execute(insert_query, values)


def validate_insert(session, table_name):
    """Print the number of rows stored in ``table_name``.

    Runs ``SELECT count(*)`` on ``session`` through ``executeSafe`` and
    prints one line per returned row between two banner lines. When the
    query fails, ``executeSafe`` has already printed the error and returned
    ``None``; in that case only the banners are printed instead of raising
    ``TypeError`` while iterating ``None``.
    """
    validate_first_insert_query = f"SELECT count(*) FROM {table_name}"
    rows = executeSafe(lambda: session.execute(validate_first_insert_query))
    print("############## INSERTED LINES ##############")
    # Compare against None explicitly rather than relying on the truthiness
    # of the result object.
    for item in (rows if rows is not None else ()):
        print(f"Number of lines in {table_name}: {item.count}")
    print("############################################")


def view_schema(cluster, table_name, keyspace=KEYSPACE):
    """Print the exported definition of ``table_name`` between two banner lines.

    The table is looked up in ``cluster.metadata`` under ``keyspace``.
    """
    print("############## SCHEMA ##############")
    # The lookup happens after the opening banner, so the banner is printed
    # even if the keyspace or table is missing from the metadata.
    keyspace_metadata = cluster.metadata.keyspaces[keyspace]
    table_metadata = keyspace_metadata.tables[table_name]
    print(table_metadata.export_as_string())
    print("####################################")


#### Connecting to local Cassandra

In [26]:
from cassandra.cluster import Cluster

# Build the driver's Cluster handle for the node listening on localhost.
# executeSafe prints the exception and yields None if this fails.
local_contact_points = ["127.0.0.1"]
cluster = executeSafe(lambda: Cluster(local_contact_points))

# connect() is invoked inside the lambda so that any failure, including
# cluster being None, is reported by executeSafe instead of raised here.
session = executeSafe(lambda: cluster.connect())


#### Creating and setting keyspace

In [27]:
# Create the keyspace only when it is missing, with a single replica.
keyspace_create_query = (
    f"CREATE KEYSPACE IF NOT EXISTS {KEYSPACE}"
    " WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}"
)

execute_query_without_return(session, keyspace_create_query)

# Make the keyspace the session default so later statements can use
# unqualified table names.
executeSafe(lambda: session.set_keyspace(KEYSPACE))


### 2.1 Getting artist, song title and song length, filtering by `sessionid` and `iteminsession`

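Since we want to filter by `sessionid` and `iteminsession`, together they form the primary key, in that order: `sessionid` is the partition key and `iteminsession` is the clustering column. As a result, the rows of each session are stored ordered by `iteminsession`.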

#### Creating table 'song_info'

In [28]:
first_table_name = 'song_info'

# sessionid is the partition key and iteminsession the clustering column.
# IF NOT EXISTS keeps the cell re-runnable: if the table is still there
# (for example because an earlier run stopped before the drop cell), the
# statement is a no-op instead of failing with an "already exists" error.
create_table_query1 = (
    f"CREATE TABLE IF NOT EXISTS {first_table_name} ("
    "sessionid int, iteminsession int, "
    "artist text, song_title text, song_length float, "
    "PRIMARY KEY(sessionid, iteminsession))"
)
execute_query_without_return(session, create_table_query1)

view_schema(cluster, first_table_name)


############## SCHEMA ##############
CREATE TABLE anobrega.song_info (
    sessionid int,
    iteminsession int,
    artist text,
    song_length float,
    song_title text,
    PRIMARY KEY (sessionid, iteminsession)
) WITH CLUSTERING ORDER BY (iteminsession ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';
####################################


#### Mapping and inserting data into the new table

In [29]:
# The placeholders are bound in the same order as the column list.
insert_query = (
    f"INSERT INTO {first_table_name} (sessionid, iteminsession, artist, song_title, song_length)"
    " VALUES (%s, %s, %s, %s, %s);"
)

# CSV column position -> converter (None keeps the raw text); the entry
# order follows the column order of the INSERT statement.
mapper_table1 = {
    8: int,    # sessionid
    3: int,    # iteminsession
    0: None,   # artist
    9: None,   # song
    5: float,  # length
}
insert_rows_from_file(session, insert_query, mapper_table1)

validate_insert(session, first_table_name)


############## INSERTED LINES ##############
Number of lines in song_info: 6820
############################################


### 2.1.1 The first analysis finds the records with `sessionid` = 338 and `iteminsession` = 4

In [30]:
# Both primary-key columns are constrained, so the lookup targets one row
# of one partition.
first_query = (
    f"SELECT artist, song_title, song_length FROM {first_table_name}"
    " WHERE sessionid = 338 and iteminsession = 4;"
)


def _run_first_query():
    """Execute the first analysis query on the shared session."""
    return session.execute(first_query)


rows = executeSafe(_run_first_query)
for item in rows:
    print(f"{item.artist}, {item.song_title}, {item.song_length}")

Faithless, Music Matters (Mark Knight Dub), 495.30731201171875


### 2.2 Getting artist, song and user information by `userid` and `sessionid`, sorted by `iteminsession`

To filter by `userid` and `sessionid`, the two columns together form the composite partition key. Because the results must be sorted by `iteminsession`, that column is the clustering key.

#### Modeling 'song_user_info'

In [31]:
second_table_name = 'song_user_info'

# (userid, sessionid) is the composite partition key and iteminsession the
# clustering column. IF NOT EXISTS keeps the cell re-runnable: if the table
# is still there (for example because an earlier run stopped before the drop
# cell), the statement is a no-op instead of failing with an "already
# exists" error.
create_table_query2 = (
    f"CREATE TABLE IF NOT EXISTS {second_table_name} ("
    "userid int, iteminsession int, sessionid int, "
    "artist text, song_title text, user_first_name text, user_last_name text, "
    "PRIMARY KEY((userid, sessionid), iteminsession))"
)
execute_query_without_return(session, create_table_query2)
view_schema(cluster, second_table_name)


############## SCHEMA ##############
CREATE TABLE anobrega.song_user_info (
    userid int,
    sessionid int,
    iteminsession int,
    artist text,
    song_title text,
    user_first_name text,
    user_last_name text,
    PRIMARY KEY ((userid, sessionid), iteminsession)
) WITH CLUSTERING ORDER BY (iteminsession ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';
####################################

#### Mapping and inserting data into the new table

In [32]:
# The placeholders are bound in the same order as the column list.
insert_query2 = (
    f"INSERT INTO {second_table_name} (artist, song_title, userid, user_first_name, user_last_name, sessionid, iteminsession) "
    " VALUES (%s, %s, %s, %s, %s, %s, %s);"
)

# CSV column position -> converter; the entry order follows the column
# order of the INSERT statement.
mapper_table2 = {
    0: str,   # artist
    9: str,   # song
    10: int,  # userid
    1: str,   # firstName
    4: str,   # lastName
    8: int,   # sessionid
    3: int,   # iteminsession
}
insert_rows_from_file(session, insert_query2, mapper_table2)

validate_insert(session, second_table_name)

############## INSERTED LINES ##############
Number of lines in song_user_info: 6820
############################################


### 2.2.1 The second analysis returns the `artist`, `song_title` and user first and last name for `userid` = 10 and `sessionid` = 182

In [33]:
# Both partition-key columns are constrained, so the query reads a single
# partition.
second_query = (
    f"SELECT artist, song_title, user_first_name, user_last_name FROM {second_table_name}"
    " WHERE userid = 10 AND sessionid = 182"
)


def _run_second_query():
    """Execute the second analysis query on the shared session."""
    return session.execute(second_query)


rows = executeSafe(_run_second_query)

for item in rows:
    print(f"{item.artist}, {item.song_title}, {item.user_first_name} {item.user_last_name}")


Down To The Bone, Keep On Keepin' On, Sylvie Cruz
Three Drives, Greece 2000, Sylvie Cruz
Sebastien Tellier, Kilometer, Sylvie Cruz
Lonnie Gordon, Catch You Baby (Steve Pitron & Max Sanna Radio Edit), Sylvie Cruz


### 2.3 Getting the `song title` and the first and last name of every user who listened to a given song

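In this case we need to filter by the song title, so `song_title` must be the partition key. Several users can listen to the same song, so to keep one user's row from overwriting another's we add `userid` as a clustering column; it makes each (song, user) pair unique, and we want the user information anyway. Repeated plays of the same song by the same user collapse into a single row, which is why this table ends up with fewer rows than the csv file.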

#### Modeling 'user_song_listen_history' to retrieve this information

In [34]:
third_table_name = 'user_song_listen_history'

# song_title is the partition key and userid the clustering column.
# IF NOT EXISTS keeps the cell re-runnable: if the table is still there
# (for example because an earlier run stopped before the drop cell), the
# statement is a no-op instead of failing with an "already exists" error.
create_table_query3 = (
    f"CREATE TABLE IF NOT EXISTS {third_table_name} ("
    "song_title text, userid int, user_first_name text, user_last_name text, "
    "PRIMARY KEY((song_title), userid));"
)
execute_query_without_return(session, create_table_query3)

view_schema(cluster, third_table_name)

############## SCHEMA ##############
CREATE TABLE anobrega.user_song_listen_history (
    song_title text,
    userid int,
    user_first_name text,
    user_last_name text,
    PRIMARY KEY (song_title, userid)
) WITH CLUSTERING ORDER BY (userid ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';
####################################


#### Inserting data from the csv file into 'user_song_listen_history'

In [35]:
# The placeholders are bound in the same order as the column list.
insert_query3 = (
    f"INSERT INTO {third_table_name} (song_title, userid, user_first_name, user_last_name) "
    " VALUES (%s, %s, %s, %s);"
)

# CSV column position -> converter; the entry order follows the column
# order of the INSERT statement.
mapper_table3 = {
    9: str,   # song
    10: int,  # userid
    1: str,   # firstName
    4: str,   # lastName
}
insert_rows_from_file(session, insert_query3, mapper_table3)

validate_insert(session, third_table_name)


############## INSERTED LINES ##############
Number of lines in user_song_listen_history: 6618
############################################


### 2.3.1 Listing all users who listened to the song 'All Hands Against His Own'

In [36]:
# The partition key (song_title) is constrained, so the query reads a
# single partition.
third_query = (
    f"SELECT user_first_name, user_last_name FROM {third_table_name}"
    " WHERE song_title = 'All Hands Against His Own'"
)


def _run_third_query():
    """Execute the third analysis query on the shared session."""
    return session.execute(third_query)


rows = executeSafe(_run_third_query)

for item in rows:
    print(f"{item.user_first_name} {item.user_last_name}")


Jacqueline Lynch
Tegan Levine
Sara Johnson


## Dropping tables

In [37]:
# Remove the three analysis tables; IF EXISTS makes each drop a no-op when
# the table is already gone.
tables_to_drop = (first_table_name, second_table_name, third_table_name)
for table_name in tables_to_drop:
    drop_statement = f"DROP TABLE IF EXISTS {table_name};"
    execute_query_without_return(session, drop_statement)

## Shutting down session and cluster

In [38]:
# Release driver resources: the session first, then the cluster.
for connection in (session, cluster):
    connection.shutdown()