# Data Modeling with Apache Cassandra

This notebook adapts the template that was supplied with the project. Deviations from the original notebook are noted below.

In [1]:
import main
import cql
import config
import os
import glob
import pandas as pd
import re
import matplotlib.pyplot as plt
from utils import convert_list_to_string

from IPython.core.interactiveshell import InteractiveShell
from IPython.display import display, Markdown
InteractiveShell.ast_node_interactivity = "all"
%matplotlib inline

## Part 1 - Pre-processing

In the initial notebook, pre-processing was done over several steps:

1. Creating a list of files from a filepath
2. Iterating over each file and appending each row to a list
3. Creating a larger CSV file from that list

The new adaptation does the following:

1. Construct the path to the directory where the files reside
2. Create a list of files from that directory
3. Construct a pandas dataframe from the list of files

The code below (without docstrings) is taken from `main.py` and implements the steps above.

In [2]:
def get_file_directory(filepath):
    cwd = os.getcwd()
    cwd += filepath
    return cwd

def get_files(file_directory, format="csv"):
    return glob.glob(file_directory + f"/*.{format}")


def construct_dataframe_from_files(files):
    df = pd.concat((pd.read_csv(f) for f in files))
    df.dropna(subset=['artist'], inplace=True)
    df[['itemInSession', 'sessionId', 'userId']] = \
        df[['itemInSession', 'sessionId', 'userId']].apply(
            pd.to_numeric, downcast='integer')
    return df
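To make the cleaning steps concrete, here is a minimal, self-contained sketch of the same pattern (glob the files, concatenate them, drop rows without an artist, downcast the ID columns). The two CSV files and their rows are made up for illustration and are not part of the project data.

```python
import glob
import os
import tempfile

import pandas as pd

id_columns = ["itemInSession", "sessionId", "userId"]

with tempfile.TemporaryDirectory() as tmp_dir:
    # Two tiny CSV files with made-up rows; one row has no artist.
    pd.DataFrame({
        "artist": ["Nirvana", None],
        "itemInSession": [0.0, 1.0],
        "sessionId": [237.0, 237.0],
        "userId": [44.0, 44.0],
    }).to_csv(os.path.join(tmp_dir, "a.csv"), index=False)
    pd.DataFrame({
        "artist": ["NOFX"],
        "itemInSession": [3.0],
        "sessionId": [237.0],
        "userId": [44.0],
    }).to_csv(os.path.join(tmp_dir, "b.csv"), index=False)

    # List the files, then concatenate them into one dataframe.
    files = sorted(glob.glob(os.path.join(tmp_dir, "*.csv")))
    df = pd.concat((pd.read_csv(f) for f in files), ignore_index=True)

# Rows without an artist are not song plays, so they are dropped.
df = df.dropna(subset=["artist"])

# The ID columns are read as floats; downcast them to the smallest integer type.
df[id_columns] = df[id_columns].apply(pd.to_numeric, downcast="integer")

print(df.dtypes)
```

After the downcast the three ID columns hold integers rather than floats, which matches the `int` column types used for the Cassandra tables later on.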

The code above is used inside a `run` function. The `run` function receives a filepath from a command-line argument or, if none is provided, defaults to `/event_data`. The first part of the function is as follows:

In [3]:
file_directory = get_file_directory('/event_data')
files = get_files(file_directory)
dataframe = construct_dataframe_from_files(files)
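The command-line handling itself is not shown in this notebook. One way the default could be wired up is sketched below with `argparse`; the function name `parse_filepath` is hypothetical, and the real `main.py` may parse its arguments differently.

```python
import argparse


def parse_filepath(argv=None):
    """Return the filepath given on the command line, or '/event_data'."""
    parser = argparse.ArgumentParser(description="Load event data into Cassandra")
    # nargs="?" makes the positional argument optional so the default applies.
    parser.add_argument("filepath", nargs="?", default="/event_data")
    return parser.parse_args(argv).filepath


print(parse_filepath([]))
print(parse_filepath(["/other_data"]))
```

Passing `argv` explicitly keeps the function easy to call from a notebook, where `sys.argv` belongs to the kernel rather than to the script.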

In [4]:
display(Markdown("There are **{}** rows in the dataframe".format(len(dataframe))))
dataframe.head(5)

There are **6820** rows in the dataframe

|   | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | A Fine Frenzy | Logged In | Anabelle | F | 0 | Simpson | 267.91138 | free | Philadelphia-Camden-Wilmington, PA-NJ-DE-MD | PUT | NextSong | 1541040000000.0 | 256 | Almost Lover (Album Version) | 200 | 1541380000000.0 | 69 |
| 1 | Nirvana | Logged In | Aleena | F | 0 | Kirby | 214.77832 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | Serve The Servants | 200 | 1541380000000.0 | 44 |
| 2 | Television | Logged In | Aleena | F | 1 | Kirby | 238.49751 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | See No Evil (Remastered LP Version) | 200 | 1541380000000.0 | 44 |
| 3 | JOHN COLTRANE | Logged In | Aleena | F | 2 | Kirby | 346.43546 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | Blues To Bechet (LP Version) | 200 | 1541380000000.0 | 44 |
| 4 | NOFX | Logged In | Aleena | F | 3 | Kirby | 80.79628 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | It's My Job To Keep Punk Rock Elite | 200 | 1541380000000.0 | 44 |


One of the project's requirements is to create an `event_datafile_new.csv` file. I won't use it for my ETL process, but the code below creates the file.

In [5]:
dataframe.to_csv('event_datafile_new.csv', index=False)

## Part 2 - Apache Cassandra

The second part of the project is to create tables in an Apache Cassandra cluster and insert data into them. To reduce code duplication, I created two classes: `Cassandra` and `ETL`. `Cassandra` manages the connection to the cluster and is the base object of the `ETL` class. `ETL` executes the queries (CREATE and INSERT). It relies on a dictionary, which can be found in the `config.py` file, and on a dataframe, which holds the data to be inserted into the database.

In [6]:
# Initialize ETL class
etl = cql.ETL(config.sparkify_dictionary, dataframe)

# Inspect Class
etl.cluster
etl.session
etl.dictionary
etl.dataframe.head(5)

<cassandra.cluster.Cluster at 0x7fdf68511940>

<cassandra.cluster.Session at 0x7fdf684d2860>

{'item': {'table': 'item_detail',
  'columns': ['sessionId', 'itemInSession', 'artist', 'song', 'length'],
  'column_types': ['int', 'int', 'text', 'text', 'float'],
  'primary_key': ['sessionId', 'itemInSession'],
  'select': {'columns': ['artist', 'song', 'length'],
   'where': ['sessionId = 338', 'itemInSession = 4'],
   'description': 'This query aims to get the detail of a song for                 one specific item in a session'}},
 'session': {'table': 'session_detail',
  'columns': ['sessionId',
   'itemInSession',
   'artist',
   'song',
   'userId',
   'firstName',
   'lastName'],
  'column_types': ['int', 'int', 'text', 'text', 'int', 'text', 'text'],
  'primary_key': [('userId', 'sessionId'), 'itemInSession'],
  'select': {'columns': ['artist', 'song', 'firstname', 'lastname'],
   'where': ['userId = 10', 'sessionId = 182'],
   'description': 'This query aims to find all songs a user listened                 to in a particular session'}},
 'song': {'table': 'song_detail',
  

|   | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | A Fine Frenzy | Logged In | Anabelle | F | 0 | Simpson | 267.91138 | free | Philadelphia-Camden-Wilmington, PA-NJ-DE-MD | PUT | NextSong | 1541040000000.0 | 256 | Almost Lover (Album Version) | 200 | 1541380000000.0 | 69 |
| 1 | Nirvana | Logged In | Aleena | F | 0 | Kirby | 214.77832 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | Serve The Servants | 200 | 1541380000000.0 | 44 |
| 2 | Television | Logged In | Aleena | F | 1 | Kirby | 238.49751 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | See No Evil (Remastered LP Version) | 200 | 1541380000000.0 | 44 |
| 3 | JOHN COLTRANE | Logged In | Aleena | F | 2 | Kirby | 346.43546 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | Blues To Bechet (LP Version) | 200 | 1541380000000.0 | 44 |
| 4 | NOFX | Logged In | Aleena | F | 3 | Kirby | 80.79628 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | It's My Job To Keep Punk Rock Elite | 200 | 1541380000000.0 | 44 |


The `ETL` class also has helper methods that take a key of the dictionary and return the corresponding table details and CQL statements.

In [7]:
key = list(etl.dictionary.keys())[0]
f"Columns for {key}"
etl._columns(key)
etl._column_types(key)
etl._column_placeholders(key)
etl._primary_key(key)
etl._create_statement(key)
etl._insert_statement(key)

'Columns for item'

['sessionId', 'itemInSession', 'artist', 'song', 'length']

['int', 'int', 'text', 'text', 'float']

'%s, %s, %s, %s, %s'

'sessionId, itemInSession'

'\n            CREATE TABLE IF NOT EXISTS item_detail\n            (sessionId int, itemInSession int, artist text, song text, length float,\n            PRIMARY KEY (sessionId, itemInSession))\n        '

'\n            INSERT INTO item_detail (sessionId, itemInSession, artist, song, length)\n            VALUES (%s, %s, %s, %s, %s)\n        '
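The helpers themselves live in `cql.py` and are not reproduced here. As a rough sketch of how statements like the ones above could be derived from a single dictionary entry, the following uses hypothetical function names (`format_primary_key`, `build_create_statement`, `build_insert_statement`); it is not the actual `ETL` implementation. The entry is the `session` entry printed earlier, chosen because its primary key contains a composite partition key.

```python
def format_primary_key(primary_key):
    """Render a primary key list; a nested tuple becomes a composite partition key."""
    parts = []
    for part in primary_key:
        if isinstance(part, (tuple, list)):
            parts.append("(" + ", ".join(part) + ")")
        else:
            parts.append(part)
    return ", ".join(parts)


def build_create_statement(entry):
    """Build a CREATE TABLE statement from one dictionary entry."""
    column_defs = ", ".join(
        f"{name} {cql_type}"
        for name, cql_type in zip(entry["columns"], entry["column_types"])
    )
    key = format_primary_key(entry["primary_key"])
    return (
        f"CREATE TABLE IF NOT EXISTS {entry['table']} "
        f"({column_defs}, PRIMARY KEY ({key}))"
    )


def build_insert_statement(entry):
    """Build an INSERT statement with one %s placeholder per column."""
    columns = ", ".join(entry["columns"])
    placeholders = ", ".join(["%s"] * len(entry["columns"]))
    return f"INSERT INTO {entry['table']} ({columns}) VALUES ({placeholders})"


session_entry = {
    "table": "session_detail",
    "columns": ["sessionId", "itemInSession", "artist", "song",
                "userId", "firstName", "lastName"],
    "column_types": ["int", "int", "text", "text", "int", "text", "text"],
    "primary_key": [("userId", "sessionId"), "itemInSession"],
}

print(build_create_statement(session_entry))
print(build_insert_statement(session_entry))
```

Keeping the partition key as a nested tuple in the configuration lets the same code emit both simple keys such as `(sessionId, itemInSession)` and composite ones such as `((userId, sessionId), itemInSession)`.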

The main access point into the `ETL` class is through the `run` method.  This method will take care of creating the table (if it doesn't exist) and then inserting the appropriate data.
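The body of `run` is not shown in this notebook. A minimal sketch of the create-then-insert flow, under the assumption that rows are inserted one at a time, might look like the following. `RecordingSession` is a stand-in that only records calls so the example runs without a cluster, and `run_etl` is a hypothetical name; a real session from the Cassandra driver would be passed in its place.

```python
import pandas as pd


class RecordingSession:
    """Stand-in for a Cassandra session: records statements instead of running them."""

    def __init__(self):
        self.calls = []

    def execute(self, statement, parameters=None):
        self.calls.append((statement, parameters))


def run_etl(session, create_statement, insert_statement, dataframe, columns):
    """Create the table if needed, then insert one row per dataframe record."""
    session.execute(create_statement)
    # Select only the configured columns, in the configured order, so the
    # values line up with the placeholders in the INSERT statement.
    for row in dataframe[columns].itertuples(index=False, name=None):
        session.execute(insert_statement, row)


# Made-up sample rows; 'auth' is present in the dataframe but not inserted.
sample = pd.DataFrame({
    "sessionId": [237, 237],
    "itemInSession": [0, 1],
    "artist": ["Nirvana", "Television"],
    "auth": ["Logged In", "Logged In"],
})

session = RecordingSession()
run_etl(
    session,
    "CREATE TABLE IF NOT EXISTS demo "
    "(sessionId int, itemInSession int, artist text, "
    "PRIMARY KEY (sessionId, itemInSession))",
    "INSERT INTO demo (sessionId, itemInSession, artist) VALUES (%s, %s, %s)",
    sample,
    ["sessionId", "itemInSession", "artist"],
)
print(len(session.calls))
```

Selecting `dataframe[columns]` before iterating is what ties the insert to the configuration dictionary: the dataframe may carry extra columns, but only the configured ones are sent.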

### Important Caveat
The column names specified in the dictionary must match the column names in the supplied dataframe **exactly**, including capitalization.

In [8]:
etl.dictionary['song']

{'table': 'song_detail',
 'columns': ['song', 'userId', 'firstName', 'lastName'],
 'column_types': ['text', 'int', 'text', 'text'],
 'primary_key': ['song', 'userId'],
 'select': {'columns': ['firstname', 'lastname'],
  'where': ["song = 'All Hands Against His Own'"],
  'description': 'This query aims to find all users who have                 listened to a specific song'}}

In [9]:
etl.dataframe.head()

|   | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | A Fine Frenzy | Logged In | Anabelle | F | 0 | Simpson | 267.91138 | free | Philadelphia-Camden-Wilmington, PA-NJ-DE-MD | PUT | NextSong | 1541040000000.0 | 256 | Almost Lover (Album Version) | 200 | 1541380000000.0 | 69 |
| 1 | Nirvana | Logged In | Aleena | F | 0 | Kirby | 214.77832 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | Serve The Servants | 200 | 1541380000000.0 | 44 |
| 2 | Television | Logged In | Aleena | F | 1 | Kirby | 238.49751 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | See No Evil (Remastered LP Version) | 200 | 1541380000000.0 | 44 |
| 3 | JOHN COLTRANE | Logged In | Aleena | F | 2 | Kirby | 346.43546 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | Blues To Bechet (LP Version) | 200 | 1541380000000.0 | 44 |
| 4 | NOFX | Logged In | Aleena | F | 3 | Kirby | 80.79628 | paid | Waterloo-Cedar Falls, IA | PUT | NextSong | 1541020000000.0 | 237 | It's My Job To Keep Punk Rock Elite | 200 | 1541380000000.0 | 44 |
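The column-matching caveat can be checked before any data is inserted. The sketch below is an assumption about how such a guard could look; `missing_columns` is a hypothetical helper, not part of the `ETL` class. The entry is the `song` entry shown above, and the empty dataframe only mimics the relevant column names.

```python
import pandas as pd


def missing_columns(entry, dataframe):
    """Return the configured columns that are absent from the dataframe."""
    return [column for column in entry["columns"] if column not in dataframe.columns]


song_entry = {
    "table": "song_detail",
    "columns": ["song", "userId", "firstName", "lastName"],
}

# An empty dataframe is enough here: only the column names matter.
sample = pd.DataFrame(columns=["artist", "firstName", "lastName", "song", "userId"])

print(missing_columns(song_entry, sample))

# The comparison is case-sensitive, so 'userid' does not match 'userId'.
misspelled = dict(song_entry, columns=["song", "userid", "firstName", "lastName"])
print(missing_columns(misspelled, sample))
```

Raising an error when the returned list is non-empty would surface a configuration typo early instead of partway through the inserts.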


In [10]:
for key in etl.dictionary.keys():
    etl.run(key)
    print(f"Data for {etl._table(key)} inserted")

Data for item_detail inserted
Data for session_detail inserted
Data for song_detail inserted


In [11]:
display(Markdown("---"))
for key in etl.dictionary.keys():
    table = etl.dictionary[key]['table']
    columns = etl.dictionary[key]['select']['columns']
    columns_string = convert_list_to_string(columns, ", ")
    where = convert_list_to_string(etl.dictionary[key]['select']['where'], " AND ")
    select = f"SELECT {columns_string}\nFROM {table}\nWHERE {where}"
    rows = etl.session.execute(select)
    ls = []
    for row in rows:
        ls.append([getattr(row, column) for column in columns])
    display(Markdown(f'#### SELECT statement for {table}'))
    display(Markdown(f'`{select}`'))
    display(Markdown(f'#### Query Description for {table}'))
    display(Markdown(etl.dictionary[key]['select']['description']))
    display(Markdown(f'#### Data for {table}'))
    pd.DataFrame(ls, columns=columns)
    display(Markdown("---"))

---

#### SELECT statement for item_detail

`SELECT artist, song, length
FROM item_detail
WHERE sessionId = 338 AND itemInSession = 4`

#### Query Description for item_detail

This query aims to get the detail of a song for one specific item in a session

#### Data for item_detail

|   | artist | song | length |
|---|---|---|---|
| 0 | Faithless | Music Matters (Mark Knight Dub) | 495.307312 |


---

#### SELECT statement for session_detail

`SELECT artist, song, firstname, lastname
FROM session_detail
WHERE userId = 10 AND sessionId = 182`

#### Query Description for session_detail

This query aims to find all songs a user listened to in a particular session

#### Data for session_detail

|   | artist | song | firstname | lastname |
|---|---|---|---|---|
| 0 | Down To The Bone | Keep On Keepin' On | Sylvie | Cruz |
| 1 | Three Drives | Greece 2000 | Sylvie | Cruz |
| 2 | Sebastien Tellier | Kilometer | Sylvie | Cruz |
| 3 | Lonnie Gordon | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie | Cruz |


---

#### SELECT statement for song_detail

`SELECT firstname, lastname
FROM song_detail
WHERE song = 'All Hands Against His Own'`

#### Query Description for song_detail

This query aims to find all users who have listened to a specific song

#### Data for song_detail

|   | firstname | lastname |
|---|---|---|
| 0 | Jacqueline | Lynch |
| 1 | Tegan | Levine |
| 2 | Sara | Johnson |


---
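The SELECT statements above are assembled with `convert_list_to_string` from `utils.py`, which is not shown in this notebook. Assuming it simply joins the items with the given separator, a stand-in could look like the sketch below; the real helper may differ. The values are the `session` entry's `select` configuration.

```python
def convert_list_to_string(items, separator):
    """Assumed behavior: join the items into one string with the separator."""
    return separator.join(str(item) for item in items)


table = "session_detail"
columns = ["artist", "song", "firstname", "lastname"]
where = ["userId = 10", "sessionId = 182"]

columns_string = convert_list_to_string(columns, ", ")
where_string = convert_list_to_string(where, " AND ")
select = f"SELECT {columns_string}\nFROM {table}\nWHERE {where_string}"
print(select)
```

Because the WHERE conditions are joined with `AND`, each table's conditions have to cover that table's partition key for Cassandra to accept the query without filtering, which is why the `where` lists mirror the `primary_key` entries.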

In [12]:
# Drop tables before closing out session
tables = [etl.dictionary[key]['table'] for key in etl.dictionary.keys()]

for table in tables:
    try:
        query = f'drop table if exists {table}'
        etl.session.execute(query)
    except Exception as e:
        print(e)

<cassandra.cluster.ResultSet at 0x7fdf72ef9278>

<cassandra.cluster.ResultSet at 0x7fdf685a2e80>

<cassandra.cluster.ResultSet at 0x7fdf67348c50>

In [13]:
# Shutdown session and cluster
etl.session.shutdown()
etl.cluster.shutdown()