# Data Modeling with Apache Cassandra

## Introduction 
This project covers data modeling for a NoSQL database, using Apache Cassandra as the data store.

This notebook creates an Apache Cassandra keyspace, `sparkifydb`, for the music app Sparkify. The data is modeled on the following three queries:

- artist, song title and song's length for sessionId=338, and itemInSession=4

- artist, song (sorted by itemInSession) and user (firstName and lastName) for userId=10, sessionId=182

- every user name (firstName and lastName) for the song 'All Hands Against His Own'

## Data

For this project, I will be working with a dataset called 'event_data': a directory of CSV files partitioned by date.



### ETL Pipeline for Pre-Processing the Files

Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
from cassandra.cluster import Cluster

Create a list of file paths for the original event CSV data files

In [2]:
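# Collect every CSV file in the event_data directory.
# (Looping over os.walk would overwrite the list for each subdirectory visited.)
filepath = os.path.join(os.getcwd(), 'event_data')
file_path_list = glob.glob(os.path.join(filepath, '*.csv'))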

Process the files to create a smaller event data CSV file called `event_datafile_new.csv`, which will be used to populate the Apache Cassandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

    # reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)  # skip the header row
        
        for line in csvreader:
            full_data_rows_list.append(line) 
            

# creating event_datafile_new.csv 
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# check the number of rows in the new csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821
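
The count above includes the header line, so, assuming no field contains an embedded newline, the file holds 6820 event rows. The filtering step can be sketched on in-memory data as follows. This is only an illustration: the four-column sample and the helper name `keep_song_rows` are assumptions, not part of the notebook.

```python
import csv
import io

def keep_song_rows(reader, keep_indexes):
    """Yield the selected columns of every row whose first field (artist) is non-empty."""
    for row in reader:
        if row[0] == '':
            continue  # skip rows with an empty artist field, as the cell above does
        yield [row[i] for i in keep_indexes]

# Hypothetical 4-column sample; the real raw files have more columns.
raw = io.StringIO(
    "artist,other,firstName,gender\n"
    "Jimi Hendrix,x,Mohammad,M\n"
    ",x,Gianna,F\n"
    "Building 429,x,Mohammad,M\n"
)
reader = csv.reader(raw)
next(reader)  # skip the header row
rows = list(keep_song_rows(reader, keep_indexes=(0, 2, 3)))

# Write a header plus the kept rows, quoting every field like 'myDialect' does.
out = io.StringIO()
writer = csv.writer(out, quoting=csv.QUOTE_ALL)
writer.writerow(['artist', 'firstName', 'gender'])
writer.writerows(rows)
line_count = len(out.getvalue().splitlines())
print(line_count)  # header + 2 kept rows
```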


In [5]:
df = pd.read_csv('event_datafile_new.csv',encoding='utf8')
df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Barry Tuckwell/Academy of St Martin-in-the-Fie...,Mohammad,M,0,Rodriguez,277.15873,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Horn Concerto No. 4 in E flat K495: II. Romanc...,88
1,Jimi Hendrix,Mohammad,M,1,Rodriguez,239.82975,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Woodstock Inprovisation,88
2,Building 429,Mohammad,M,2,Rodriguez,300.61669,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Majesty (LP Version),88
3,The B-52's,Gianna,F,0,Jones,321.54077,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Love Shack,38
4,Die Mooskirchner,Gianna,F,1,Jones,169.29914,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Frisch und g'sund,38


## Data Modeling with Apache Cassandra

We now have the CSV file titled `event_datafile_new.csv`. It contains the following columns: 
- [0] artist 
- [1] firstName of user
- [2] gender of user
- [3] item number in session
- [4] last name of user
- [5] length of the song
- [6] level (paid or free)
- [7] location of the user
- [8] sessionId
- [9] song title
- [10] userId
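
The INSERT cells later in the notebook address these columns by position. As a hedged alternative, the columns can be read by header name with `csv.DictReader`, which avoids index mistakes. The sample below reuses one row from the preview above; the helper name `typed_event` is hypothetical.

```python
import csv
import io

# One header line plus one data row copied from the preview above.
sample = io.StringIO(
    '"artist","firstName","gender","itemInSession","lastName","length",'
    '"level","location","sessionId","song","userId"\n'
    '"Jimi Hendrix","Mohammad","M","1","Rodriguez","239.82975","paid",'
    '"Sacramento--Roseville--Arden-Arcade, CA","961","Woodstock Inprovisation","88"\n'
)

def typed_event(row):
    """Hypothetical helper: convert the numeric columns from strings."""
    event = dict(row)
    for name in ('itemInSession', 'sessionId', 'userId'):
        event[name] = int(event[name])
    event['length'] = float(event['length'])
    return event

events = [typed_event(row) for row in csv.DictReader(sample)]
print(events[0]['sessionId'], events[0]['itemInSession'], events[0]['length'])
```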

Our next task is to model our database. Unlike in PostgreSQL, tables in Apache Cassandra are modeled on the queries we want to run.
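
A rough way to picture query-first modeling is one lookup structure per query, each keyed by the columns that query filters on, with the same event written into all of them. The sketch below uses plain Python dicts as stand-ins for tables. It is not how Cassandra stores data, and the variable names are assumptions made for illustration.

```python
from collections import defaultdict

# Three events copied from the preview of event_datafile_new.csv above.
events = [
    {'artist': 'Jimi Hendrix', 'song': 'Woodstock Inprovisation', 'length': 239.82975,
     'sessionId': 961, 'itemInSession': 1, 'userId': 88,
     'firstName': 'Mohammad', 'lastName': 'Rodriguez'},
    {'artist': 'Building 429', 'song': 'Majesty (LP Version)', 'length': 300.61669,
     'sessionId': 961, 'itemInSession': 2, 'userId': 88,
     'firstName': 'Mohammad', 'lastName': 'Rodriguez'},
    {'artist': "The B-52's", 'song': 'Love Shack', 'length': 321.54077,
     'sessionId': 107, 'itemInSession': 0, 'userId': 38,
     'firstName': 'Gianna', 'lastName': 'Jones'},
]

# One "table" per query; each is keyed by the columns that query filters on.
by_session_item = {}                 # query 1: sessionId + itemInSession
by_user_session = defaultdict(list)  # query 2: userId + sessionId
by_song = defaultdict(set)           # query 3: song

for e in events:
    by_session_item[(e['sessionId'], e['itemInSession'])] = (e['artist'], e['song'], e['length'])
    by_user_session[(e['userId'], e['sessionId'])].append((e['itemInSession'], e['artist'], e['song']))
    by_song[e['song']].add((e['firstName'], e['lastName']))

print(by_session_item[(961, 2)])
print(sorted(by_user_session[(88, 961)]))
print(by_song['Love Shack'])
```

The cost of this design is duplication: every event is stored once per query, which is the usual trade-off for fast key-based reads.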

#### Creating a Cluster

In [6]:
# Connect to a Cassandra instance on the local machine (127.0.0.1)
from cassandra.cluster import Cluster
try:
    cluster = Cluster()

    # A session is needed to establish the connection and execute queries
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

A keyspace in Apache Cassandra is similar to a schema in PostgreSQL.

In [7]:
#Creating a Keyspace 
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkifydb 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [8]:
#Set KEYSPACE
try:
    session.set_keyspace('sparkifydb')
except Exception as e:
    print(e)


## Queries

There are three business requests from the analytics team at Sparkify that we need to create tables for. We will:

- Create a table for each business request
- Populate each table with data from `event_datafile_new.csv`
- Finally, validate each table by running a SELECT statement

### Query 1

Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [9]:
query = """CREATE TABLE IF NOT EXISTS music_app_history_artist_and_song(
            sessionId int,
            itemInSession int,
            artist text,
            song text,
            length float,
            PRIMARY KEY (sessionId, itemInSession))"""

try:
    session.execute(query)
except Exception as e:
    print(e)                 

In [10]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = """INSERT INTO music_app_history_artist_and_song 
                (sessionId, itemInSession, artist, song, length)"""
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        # Assign which column element should be assigned for each column in the INSERT statement.
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5]), ))
        

Verify that the data has been inserted into the table

In [11]:
query = """SELECT artist, song, length 
            FROM music_app_history_artist_and_song
            WHERE sessionId = 338 AND itemInSession = 4"""

try:
    result = session.execute(query)
except Exception as e:
    print(e) 

for row in result:
    print("artist: {} | song title: {} | length: {}".format(row.artist, row.song, row.length))

artist: Faithless | song title: Music Matters (Mark Knight Dub) | length: 495.30731201171875
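
The long decimal tail in the printed length is most likely a side effect of the column type: a CQL `float` is a 32-bit IEEE-754 value, and Python prints it back as a 64-bit float with every digit. The sketch below reproduces the effect with the standard library. The input `495.3073` is an assumed source value, since the CSV entry for this row is not shown above.

```python
import struct

def to_float32(value):
    """Round a Python float (64-bit) to the nearest 32-bit float and back."""
    return struct.unpack('f', struct.pack('f', value))[0]

# Assumed source value; the actual CSV entry for this row is not shown above.
stored = to_float32(495.3073)
print(stored)            # 495.30731201171875
print(round(stored, 4))  # 495.3073
```

Declaring the column as `double` (64-bit) would avoid this rounding, at the cost of a few more bytes per value.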


Let's read the data into pandas [hint](https://stackoverflow.com/questions/41247345/python-read-cassandra-data-into-pandas)

In [12]:
def pandas_factory(col, rows):
    """Returns a pandas dataframe given data = rows, and column=col
    """
    return pd.DataFrame(rows, columns=col)

session.row_factory = pandas_factory

rslt = session.execute("""
    SELECT artist, song, length
    FROM music_app_history_artist_and_song
    WHERE sessionId=338 AND itemInSession=4;
""")

df = rslt._current_rows 
df

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


For sessionId=338 and itemInSession=4, the artist was 'Faithless' and the song was 'Music Matters (Mark Knight Dub)', with a length of 495.307312 seconds.

### Query 2

Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userId = 10, sessionId = 182

In [13]:
query = """CREATE TABLE IF NOT EXISTS music_app_history_session(
        userId int, 
        sessionId int,
        itemInSession int, 
        artist text, 
        song text, 
        firstName text, 
        lastName text,
        PRIMARY KEY ((userId, sessionId), itemInSession))"""

try:
    session.execute(query)
except Exception as e:
    print(e)  

                    

In [14]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = """INSERT INTO music_app_history_session (userId, sessionId, 
                    itemInSession,artist, song, firstName, lastName)"""
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        # Assign which column element should be assigned for each column in the INSERT statement.
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

Verify that the table has been populated

In [15]:
query = """SELECT userId, sessionId,itemInSession,artist, song, firstName, lastName 
            FROM music_app_history_session
            WHERE userId = 10 AND sessionId = 182"""


session.row_factory = pandas_factory

rslt = session.execute(query)
df = rslt._current_rows 
df

Unnamed: 0,userid,sessionid,iteminsession,artist,song,firstname,lastname
0,10,182,0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,10,182,1,Three Drives,Greece 2000,Sylvie,Cruz
2,10,182,2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,10,182,3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


In sessionId=182, user Sylvie Cruz (userId=10) listened to four songs: 
- 'Keep On Keepin' On' by 'Down To The Bone', 
- 'Greece 2000' by 'Three Drives', 
- 'Kilometer' by 'Sebastien Tellier', and 
- 'Catch You Baby (Steve Pitron & Max Sanna Radio Edit)' by 'Lonnie Gordon'.
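
The rows come back ordered by itemInSession because it is the clustering column: within a partition, Cassandra keeps rows sorted by their clustering columns, so no ORDER BY is needed here. A minimal in-memory sketch of that behavior, with a hypothetical `insert` helper standing in for the CQL INSERT:

```python
import bisect
from collections import defaultdict

# partition key (userId, sessionId) -> rows kept sorted by itemInSession
partitions = defaultdict(list)

def insert(user_id, session_id, item_in_session, artist, song):
    """Hypothetical stand-in for an INSERT: keep each partition sorted on write."""
    bisect.insort(partitions[(user_id, session_id)], (item_in_session, artist, song))

# The four rows from the result above, written out of order on purpose.
insert(10, 182, 2, 'Sebastien Tellier', 'Kilometer')
insert(10, 182, 0, 'Down To The Bone', "Keep On Keepin' On")
insert(10, 182, 3, 'Lonnie Gordon', 'Catch You Baby (Steve Pitron & Max Sanna Radio Edit)')
insert(10, 182, 1, 'Three Drives', 'Greece 2000')

items = [row[0] for row in partitions[(10, 182)]]
print(items)  # [0, 1, 2, 3]
```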


### Query 3

Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

Create a table called `music_app_history_user`

In [16]:
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS music_app_history_user (
            song TEXT,
            firstName TEXT,
            lastName TEXT,
            userId INT,
            PRIMARY KEY (song,userId));
    """)
except Exception as e:
    print(e)

Populate the table with data from the `event_datafile_new.csv` file

In [17]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # Assign the INSERT statements into the `query` variable
        query = "INSERT INTO music_app_history_user (song, firstName, lastName, userId)"
        query = query + " VALUES (%s, %s, %s, %s)"
        # Assign which column element should be assigned for each column in the INSERT statement.
        session.execute(query, (line[9], line[1], line[4], int(line[10])))


Verify

In [18]:
# Add in the SELECT statement to verify the data was entered into the table

query = """SELECT firstName,
                    lastName
            FROM music_app_history_user
            WHERE song = 'All Hands Against His Own'"""

session.row_factory = pandas_factory

rslt = session.execute(query)
df = rslt._current_rows 
df

Unnamed: 0,firstname,lastname
0,Jacqueline,Lynch
1,Tegan,Levine
2,Sara,Johnson


Three users listened to the song 'All Hands Against His Own': Jacqueline Lynch, Tegan Levine, and Sara Johnson.
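
Including userId in the primary key matters because a Cassandra INSERT with an existing primary key replaces the row. With `song` alone as the key, each listener would overwrite the previous one. With `(song, userId)`, repeat listens by the same user collapse into a single row. The sketch below models this with a dict. The user IDs are made up for illustration, and only the names come from the result above.

```python
# (song, userId) -> (firstName, lastName); a dict models upsert-on-key semantics
table = {}

def insert(song, user_id, first_name, last_name):
    """Hypothetical stand-in for an INSERT: a repeated key replaces the old row."""
    table[(song, user_id)] = (first_name, last_name)

song = 'All Hands Against His Own'
# User IDs below are made up for illustration.
insert(song, 1, 'Jacqueline', 'Lynch')
insert(song, 2, 'Tegan', 'Levine')
insert(song, 2, 'Tegan', 'Levine')  # a second listen by the same user
insert(song, 3, 'Sara', 'Johnson')

# Rows for one song, ordered by userId as the clustering column would order them.
listeners = [name for (s, _), name in sorted(table.items()) if s == song]
print(len(listeners))  # 3
```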

### Drop the tables before closing the session

In [19]:
session.execute("DROP TABLE music_app_history_artist_and_song")
session.execute("DROP TABLE music_app_history_session")
session.execute("DROP TABLE music_app_history_user")

<cassandra.cluster.ResultSet at 0x7fcc4780b2e8>

### Close the session and cluster connection

In [20]:
session.shutdown()
cluster.shutdown()