# Data Modeling with Apache Cassandra

## <font color=blue>Part I. ETL Pipeline for Pre-Processing the Files</font>

In [1]:
# Import Python packages 
import re
import os
import glob
import json
import csv

In [2]:
import numpy as np
import pandas as pd

In [3]:
import cassandra
from cassandra.cluster import Cluster

In [4]:
from helper import *

#### Creating list of filepaths to process original event csv data files

In [5]:
# Print the current working directory
os.getcwd()

'/home/workspace'

In [6]:
os.path.join(os.getcwd(), '/event_data',)

'/event_data'

In [7]:
# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'
filepath

'/home/workspace/event_data'

In [8]:
# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    # join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root, '*'))

In [9]:
len(file_path_list)

30

In [10]:
# Print sample path to the csv files.
file_path_list[:3]

['/home/workspace/event_data/2018-11-27-events.csv',
 '/home/workspace/event_data/2018-11-04-events.csv',
 '/home/workspace/event_data/2018-11-07-events.csv']

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [11]:
# Initiate an empty list of rows that will be generated from each file.
full_data_rows_list = [] 
    
for fp in file_path_list:
    # Read csv file.
    with open(fp, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        # Extract each data row one by one and append it.      
        for line in csvreader:
            full_data_rows_list.append(line) 

In [12]:
# Get total number of rows 
len(full_data_rows_list)

8056

In [13]:
# Check what the list of event data look like
full_data_rows_list[:2]

[['Barry Tuckwell/Academy of St Martin-in-the-Fields/Sir Neville Marriner',
  'Logged In',
  'Mohammad',
  'M',
  '0',
  'Rodriguez',
  '277.15873',
  'paid',
  'Sacramento--Roseville--Arden-Arcade, CA',
  'PUT',
  'NextSong',
  '1.54051E+12',
  '961',
  'Horn Concerto No. 4 in E flat K495: II. Romance (Andante cantabile)',
  '200',
  '1.54328E+12',
  '88'],
 ['Jimi Hendrix',
  'Logged In',
  'Mohammad',
  'M',
  '1',
  'Rodriguez',
  '239.82975',
  'paid',
  'Sacramento--Roseville--Arden-Arcade, CA',
  'PUT',
  'NextSong',
  '1.54051E+12',
  '961',
  'Woodstock Inprovisation',
  '200',
  '1.54328E+12',
  '88']]

In [14]:
# Create a smaller event data csv file called event_datafile_full csv
##that will be used to insert data into the Apache Cassandra tables.
csv.register_dialect('myDialect', quoting= csv.QUOTE_ALL, skipinitialspace= True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect= 'myDialect')
    # Write the header row (column names) first.
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
                     'length', 'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6],
                         row[7], row[8], row[12], row[13], row[16]))

In [15]:
# # Check the number of rows in the generated csv file.
# with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
#     print(sum(1 for line in f))

In [16]:
# Preview the data.
df = pd.read_csv('event_datafile_new.csv')
df.shape

(6820, 11)

In [17]:
df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Barry Tuckwell/Academy of St Martin-in-the-Fie...,Mohammad,M,0,Rodriguez,277.15873,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Horn Concerto No. 4 in E flat K495: II. Romanc...,88
1,Jimi Hendrix,Mohammad,M,1,Rodriguez,239.82975,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Woodstock Inprovisation,88
2,Building 429,Mohammad,M,2,Rodriguez,300.61669,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Majesty (LP Version),88
3,The B-52's,Gianna,F,0,Jones,321.54077,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Love Shack,38
4,Die Mooskirchner,Gianna,F,1,Jones,169.29914,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Frisch und g'sund,38


In [18]:
# Make sure the generated columns are in expected format.
# e.g., `length` is float ...
df.dtypes

artist            object
firstName         object
gender            object
itemInSession      int64
lastName          object
length           float64
level             object
location          object
sessionId          int64
song              object
userId             int64
dtype: object

## <font color=blue>Part II. Complete the Apache Cassandra coding portion of your project.</font>

Now we are ready to work with the CSV file generated under the title <font color=red>event_datafile_new.csv</font>.

#### Creating a Cluster

In [19]:
# Make a connection to a Cassandra instance on local machine (127.0.0.1)
cluster = Cluster()

# Establish connection and begin executing queries; need a session.
session = cluster.connect()

#### Create Keyspace

In [20]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [21]:
# Set KEYSPACE to the keyspace specified above.
def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

try:
    session.set_keyspace('udacity')
    session.row_factory = pandas_factory
except Exception as e:
    print(e)

Now we need to create tables to run the following queries. Remember, __with Apache Cassandra you model the database tables on the queries you want to run.__

#### 1. Return the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [24]:
# Distributed database design is all about queries.
# Especifically, the focus is on the WHERE clause.
# In the first query, first and foremost we want to filter by `session_id`
# this will become our partition key.
# Then, the filter is based on `itemInSession`, which will become our clustering key.
# Together, the partition key & the clustering key make the composite PRIMARY KEY.
query = ("CREATE TABLE IF NOT EXISTS sessions "
         "(session_id int, item_in_session int, artist text, song text, length float, "
         "PRIMARY KEY (session_id, item_in_session))")

drop_create_table(session, table_name= 'sessions', query= query)

>>> Dropped sessions with success.
>>> Table sessions created!


In [26]:
query = ("INSERT INTO sessions "
         "(session_id, item_in_session, artist, song, length) "
         "VALUES (%s, %s, %s, %s, %s);")
    
populate_table(session,
               file_path = 'event_datafile_new.csv',
               query= query,
               columns= (8, 3, 0, 9, 5))

In [29]:
# Fetch some query to make sure data is populated successfully.
query = ("SELECT artist, song, length FROM sessions "
         "WHERE session_id = 338 AND item_in_session = 4;")

try:
    rows = session.execute(query)
    df = rows._current_rows[['artist', 'song', 'length']]
    print('Query result: ' + '=' * 45)
    print(df)
except Exception as e:
    print(e)

      artist                             song      length
0  Faithless  Music Matters (Mark Knight Dub)  495.307312


#### 2. Return only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [61]:
## Here the Primary Key has three fields: userid and sessionid are the partition key, and itemInSession are clustering keys. 
## Partitioning is done by userid and sessionid, and within that partition, rows are ordered by the itemInSession.
query = ("CREATE TABLE IF NOT EXISTS users "
         "(user_id int, session_id int, item_in_session int, "
         "artist text, song text, first_name text, last_name text, "
         "PRIMARY KEY (user_id, session_id, item_in_session));"
        )

drop_create_table(session, table_name= 'users', query= query)

>>> Dropped users with success.
>>> Table users created!


In [None]:


try:
    session.execute(query)
except Exception as e:
    print(e)

#### 3. Return every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### Drop the tables before closing out the sessions

In [42]:
query = "DROP TABLE IF EXISTS sessions"
try:
    session.execute(query)
except Exception as e:
    print(e)

query = "DROP TABLE IF EXISTS users"
try:
    session.execute(query)
except Exception as e:
    print(e)

query = "DROP TABLE IF EXISTS songs"
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Close the session and cluster connection¶

In [None]:
session.shutdown()
cluster.shutdown()