# Project: Data Modeling with Cassandra

**Author:** Dahi Nemutlu\
**Date:** April 14 2024

## Table of Contents
<ul>
<li><a href="#introduction">Introduction</a></li>
<li><a href="#pre-processing">ETL Pipeline for Pre-Processing the Files</a></li>
<li><a href="#cassandra">Modeling the Data with Apache Cassandra</a></li>
   <ul>
      <li><a href="#query1">Modeling the Table for the 1st Query</a></li>
      <li><a href="#query2">Modeling the Table for the 2nd Query</a></li>
      <li><a href="#query3">Modeling the Table for the 3rd Query</a></li>
   </ul>
</ul>

<a id='introduction'></a>
## Introduction
A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analysis team is particularly interested in understanding what songs users are listening to. The data reside in a directory of CSV files (event_data).

The purpose of the project is to build an Apache Cassandra database that can answer the analytics team's questions about song play data. To do so, the project creates an ETL pipeline that consolidates a directory of CSV files into a single streamlined CSV file, which is then used to model and populate the Apache Cassandra tables.

#### Import Python packages

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
from cassandra.cluster import Cluster
import os

<a id='pre-processing'></a>
## ETL Pipeline for Pre-Processing the Files

#### Processing the files to create the CSV data file that will be used for the Apache Cassandra tables

In [2]:
# define the file path and file extension for the input files
path = './event_data/'
extension = '.csv'

# create a list of CSV file names
files = [file for file in os.listdir(path) if file.endswith(extension)]

# import the CSV files into Pandas
dfs = []
for file in files:
    df = pd.read_csv(os.path.join(path, file))
    # append each DataFrame to a list to concatenate them later
    # exclude the rows where 'artist' column is null
    dfs.append(df[~df['artist'].isnull()])

# concatenate the DataFrames into one
df = pd.concat(dfs, ignore_index=True)

# drop unnecessary columns
df.drop(['auth', 'method', 'page', 'registration', 'status', 'ts'], axis=1, inplace=True)

# write the DataFrame to a CSV file
df.to_csv('event_datafile_new.csv', index=False)
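
To make the filtering step concrete, here is a minimal sketch of the same idea on two tiny in-memory CSV files. It uses the standard-library `csv` module rather than pandas, and the sample rows, the `keep_song_rows` helper, and the reduced column set are made up for illustration; they are not part of the project data.

```python
import csv
import io

# Hypothetical miniature event files; the real files have many more columns.
FILE_A = "artist,song,sessionId,page\nFaithless,Music Matters,338,NextSong\n,,338,Home\n"
FILE_B = "artist,song,sessionId,page\nThree Drives,Greece 2000,182,NextSong\n"

DROP_COLUMNS = {"page"}  # stand-in for the columns dropped in the notebook


def keep_song_rows(csv_text):
    """Yield rows that have an artist, without the dropped columns."""
    for row in csv.DictReader(io.StringIO(csv_text)):
        # Assumption: an empty artist marks an event that was not a song play.
        if row["artist"]:
            yield {k: v for k, v in row.items() if k not in DROP_COLUMNS}


# Concatenate the filtered rows from every "file" into one list.
combined = [row for text in (FILE_A, FILE_B) for row in keep_song_rows(text)]
print(len(combined))
print(combined[0])
```

Only the two rows with an artist survive, and the `page` column is gone from both, which mirrors what the pandas cell does at full scale.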

<a id='cassandra'></a>
## Modeling the Data with Apache Cassandra

Now we are ready to work with the CSV file `event_datafile_new.csv`, located in the workspace directory. It contains the following columns:
- artist 
- first name of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

#### Creating a Cluster

In [3]:
# create a connection to the database
try: 
    cluster = Cluster(['localhost']) # if you have a locally installed Apache Cassandra instance
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [4]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)
except Exception as e:
    print(e)

#### Set Keyspace

In [5]:
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

Now we need to create tables to run the following queries. (With Apache Cassandra we model the database tables on the queries we want to run.)
1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

<a id='query1'></a>
### Modeling the Table for the 1st Query
#### Query
Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

#### Model
To query efficiently by a specific `session_id` and `item_in_session`, as this query requires:
- `session_id` and `item_in_session` together form the compound primary key.
  - `session_id` is the partition key, so all rows for a session are stored together.
  - `item_in_session` is the clustering column; it makes each row within a session unique and keeps the rows sorted by their position in the session.

<img src="images/table1.png">
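
As a rough mental model (not Cassandra's actual storage engine), this primary key behaves like a dictionary of partitions, each holding rows keyed by the clustering column. The sketch below uses plain Python with made-up sample rows; `partitions` and `lookup` are hypothetical names introduced only for illustration.

```python
from collections import defaultdict

# Made-up sample rows: (session_id, item_in_session, artist, song, length)
rows = [
    (338, 4, "Artist A", "Song A", 200.0),
    (338, 1, "Artist B", "Song B", 180.5),
    (12, 0, "Artist C", "Song C", 240.25),
]

# Partition key -> {clustering column -> remaining columns}
partitions = defaultdict(dict)
for session_id, item_in_session, artist, song, length in rows:
    partitions[session_id][item_in_session] = (artist, song, length)


def lookup(session_id, item_in_session):
    """Mimic WHERE session_id = ? AND item_in_session = ?."""
    return partitions[session_id].get(item_in_session)


print(lookup(338, 4))
# Within a partition, rows are ordered by the clustering column.
print(sorted(partitions[338]))
```

The lookup touches a single partition and then a single row inside it, which is the access pattern the table is designed for.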

#### Create the Cassandra Table

In [6]:
query = "CREATE TABLE IF NOT EXISTS udacity.session_song_history "
query = query + "(session_id int, item_in_session int, artist text, song text, length float, PRIMARY KEY (session_id, item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Insert Data into Table

In [7]:
query = "INSERT INTO udacity.session_song_history(session_id, item_in_session, artist, song, length) VALUES (?, ?, ?, ?, ?)"
prepared = session.prepare(query)

for index, row in df.iterrows(): 
    session.execute(prepared, (row.sessionId, row.itemInSession, row.artist, row.song, row.length))

#### Verify that the data have been inserted into the table

In [8]:
query = "SELECT COUNT(*) FROM udacity.session_song_history;"
try:
    row_count = session.execute(query)
except Exception as e:
    print(e)

print ('Number of records in DataFrame:', len(df))
print ('Number of records in Cassandra table:', row_count.one().count)

Number of records in DataFrame: 6820
Number of records in Cassandra table: 6820


#### Execute the 1st query

In [9]:
# Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
query = "select artist, song, length from udacity.session_song_history where session_id = 338 and item_in_session = 4;"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist, row.song, row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


<a id='query2'></a>
### Modeling the Table for the 2nd Query
#### Query
Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

#### Model

The query filters by both `user_id` and `session_id` and sorts the results by `item_in_session`. Therefore:
- `user_id` and `session_id` together form the composite partition key, so all rows for a given user session are stored together.
- `item_in_session` is the clustering column, so rows within each partition are stored, and returned, in the order of items in the session.

<img src="images/table2.png">
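
The effect of a composite partition key plus a clustering column can be sketched in plain Python, assuming made-up sample rows. This is only an analogy for how the table behaves, and `songs_in_session` is a hypothetical helper, not part of the Cassandra driver.

```python
from collections import defaultdict

# Made-up sample rows: (user_id, session_id, item_in_session, song)
rows = [
    (10, 182, 2, "Song C"),
    (10, 182, 0, "Song A"),
    (10, 99, 0, "Song X"),
    (10, 182, 1, "Song B"),
]

# The composite partition key is modeled as a (user_id, session_id) tuple.
partitions = defaultdict(list)
for user_id, session_id, item_in_session, song in rows:
    partitions[(user_id, session_id)].append((item_in_session, song))


def songs_in_session(user_id, session_id):
    """Mimic WHERE user_id = ? AND session_id = ?, ordered by item_in_session."""
    return [song for _, song in sorted(partitions[(user_id, session_id)])]


print(songs_in_session(10, 182))
```

Both key components are needed to find the partition, and the songs come back in session order regardless of the order in which they were inserted.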

#### Create the Cassandra Table

In [10]:
query = "CREATE TABLE IF NOT EXISTS udacity.user_session_song_history "
query = query + "(user_id float, session_id int, item_in_session int, artist text, song text, first_name text, last_name text, PRIMARY KEY ((user_id, session_id), item_in_session))"
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Insert Data into Table

In [11]:
query = "INSERT INTO udacity.user_session_song_history(user_id, session_id, item_in_session, artist, song, first_name, last_name) VALUES (?, ?, ?, ?, ?, ?, ?)"
prepared = session.prepare(query)

for index, row in df.iterrows(): 
    session.execute(prepared, (row.userId, row.sessionId, row.itemInSession, row.artist, row.song, row.firstName, row.lastName))

#### Verify that the data have been inserted into the table

In [12]:
query = "SELECT COUNT(*) FROM udacity.user_session_song_history;"
try:
    row_count = session.execute(query)
except Exception as e:
    print(e)

print ('Number of records in DataFrame:', len(df))
print ('Number of records in Cassandra table:', row_count.one().count)

Number of records in DataFrame: 6820
Number of records in Cassandra table: 6820


#### Execute the 2nd query

In [13]:
# Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
query = "select artist, song, first_name, last_name from udacity.user_session_song_history where user_id = 10 and session_id = 182;"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist, row.song, row.first_name, row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


<a id='query3'></a>
### Modeling the Table for the 3rd Query
#### Query
Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### Model
Since the query searches for users who listened to a specific song, `song` is the partition key and `user_id` is a clustering column.

- `song` is the partition key, so all rows for a particular song are stored together.
- `user_id` is the clustering column, so each (song, user) pair gets its own row. Because Cassandra inserts overwrite rows that share a primary key, repeat plays of a song by the same user collapse into a single row, and this table can hold fewer rows than the source DataFrame.

This lets us efficiently retrieve the first and last names of all users who listened to a specific song.

<img src="images/table3.png">
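
Because rows with the same primary key overwrite one another, a table keyed on `(song, user_id)` keeps one row per user per song. The plain-Python sketch below imitates that behavior with a dictionary and made-up events; it is likely why the verification step for this table reports fewer rows than the DataFrame.

```python
# Made-up listening events: (song, user_id, first_name, last_name)
events = [
    ("Song A", 1, "User", "One"),
    ("Song A", 2, "User", "Two"),
    ("Song A", 1, "User", "One"),  # the same user plays the song again
    ("Song B", 1, "User", "One"),
]

table = {}
for song, user_id, first_name, last_name in events:
    # Same primary key -> the earlier row is overwritten, not duplicated.
    table[(song, user_id)] = (first_name, last_name)

# Mimic WHERE song = 'Song A'
listeners = sorted(name for (song, _), name in table.items() if song == "Song A")

print(len(events), len(table))
print(listeners)
```

Four events produce three rows, and each listener of "Song A" appears exactly once, which is what the query needs.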

#### Create the Cassandra Table

In [14]:
query = "CREATE TABLE IF NOT EXISTS udacity.user_song_history "
query = query + "(song text, user_id float, first_name text, last_name text, PRIMARY KEY (song, user_id))"
try:
    session.execute(query)
except Exception as e:
    print(e)

#### Insert Data into Table

In [15]:
query = "INSERT INTO udacity.user_song_history(song, user_id, first_name, last_name) VALUES (?, ?, ?, ?)"
prepared = session.prepare(query)

for index, row in df.iterrows(): 
    session.execute(prepared, (row.song, row.userId, row.firstName, row.lastName))

#### Verify that the data have been inserted into the table

In [16]:
query = "SELECT COUNT(*) FROM udacity.user_song_history;"
try:
    row_count = session.execute(query)
except Exception as e:
    print(e)

print ('Number of records in DataFrame:', len(df))
print ('Number of records in Cassandra table:', row_count.one().count)

Number of records in DataFrame: 6820
Number of records in Cassandra table: 6618


#### Execute the 3rd query

In [17]:
# Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
query = "select first_name, last_name from udacity.user_song_history where song = 'All Hands Against His Own';"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.first_name, row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


#### Drop the tables before closing out the sessions

In [18]:
# drop each of the three tables created above
for table in ('session_song_history', 'user_session_song_history', 'user_song_history'):
    try:
        session.execute(f"drop table udacity.{table}")
    except Exception as e:
        print(e)

#### Drop the keyspace

In [19]:
query = "drop keyspace udacity;"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

#### Close the session and cluster connection

In [20]:
session.shutdown()
cluster.shutdown()