# Project: Data Modeling with Cassandra

## Context

A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analysis team is particularly interested in understanding what songs users are listening to. Currently, there is no easy way to query the data to generate the results, since the data reside in a directory of CSV files on user activity on the app.

They'd like a data engineer to create an Apache Cassandra database that can answer these questions through queries on the song play data, and wish to bring you on the project. Your role is to create a database for this analysis. You'll be able to test your database by running queries given to you by the analytics team from Sparkify to create the results.

# Part I. ETL Pipeline for Pre-Processing the Files

In this first part, we'll focus on ingesting the CSV files that reside inside the `event_data` folder, and create a single "compiled" file that we'll use to load the data into the Cassandra database.

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
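print(os.getcwd())

# Folder containing the raw event CSV files
filepath = os.getcwd() + '/event_data'

file_path_list = []
for root, dirs, files in os.walk(filepath):
    # ignore notebook temp files
    if '.ipynb_checkpoints' in root:
        continue

    # collect the CSV files of every directory instead of keeping only the last one
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))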

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
full_data_rows_list = []

# Read every event file, skipping its header row
for f in file_path_list:
    print(f"Processing file {f}")

    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)

        for line in csvreader:
            full_data_rows_list.append(line)

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Write the compiled file, keeping only the columns needed by the Cassandra tables
with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Rows without an artist do not record a song play, so they are skipped
        if row[0] == '':
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8],
                         row[12], row[13], row[16]))
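The cell above selects columns by position (`row[0]`, `row[2]`, ..., `row[16]`), which only works while every event file keeps the same column order. A header-based alternative is sketched below with `csv.DictReader`. It assumes the raw files use the 17-column header shown in the sample (`artist,auth,firstName,...,userId`); the function name `project_rows` and the sample values are hypothetical.

```python
import csv
import io

# Output columns, in the same order as the header written to event_datafile_new.csv
OUTPUT_COLUMNS = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                  'level', 'location', 'sessionId', 'song', 'userId']

def project_rows(csvfile):
    """Yield the OUTPUT_COLUMNS values of every row whose artist field is non-empty."""
    reader = csv.DictReader(csvfile)
    for record in reader:
        # Rows without an artist are skipped, as in the cell above
        if record['artist'] == '':
            continue
        # Select by header name, so the column order of the raw file does not matter
        yield [record[col] for col in OUTPUT_COLUMNS]

# Tiny in-memory sample with made-up values, standing in for one raw event file
# (assumed header layout; the second row has no artist and is dropped)
sample = io.StringIO(
    'artist,auth,firstName,gender,itemInSession,lastName,length,level,location,'
    'method,page,registration,sessionId,song,status,ts,userId\n'
    'Some Artist,Logged In,Ann,F,0,Lee,200.5,free,"City, ST",PUT,NextSong,1.0,7,Some Song,200,1,42\n'
    ',Logged In,Ann,F,1,Lee,,free,"City, ST",GET,Home,1.0,7,,200,2,42\n'
)
projected = list(project_rows(sample))
print(projected)
```

Selecting by name costs a dict per row, which is negligible at this data size, and a reordered or extended raw file no longer silently shifts the output columns.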

Processing file /home/workspace/event_data/2018-11-27-events.csv
Processing file /home/workspace/event_data/2018-11-04-events.csv
Processing file /home/workspace/event_data/2018-11-07-events.csv
Processing file /home/workspace/event_data/2018-11-09-events.csv
Processing file /home/workspace/event_data/2018-11-19-events.csv
Processing file /home/workspace/event_data/2018-11-05-events.csv
Processing file /home/workspace/event_data/2018-11-22-events.csv
Processing file /home/workspace/event_data/2018-11-16-events.csv
Processing file /home/workspace/event_data/2018-11-26-events.csv
Processing file /home/workspace/event_data/2018-11-24-events.csv
Processing file /home/workspace/event_data/2018-11-29-events.csv
Processing file /home/workspace/event_data/2018-11-15-events.csv
Processing file /home/workspace/event_data/2018-11-20-events.csv
Processing file /home/workspace/event_data/2018-11-06-events.csv
Processing file /home/workspace/event_data/2018-11-18-events.csv
Processing file /home/wor

In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print("There are %d rows in the resulting CSV" % sum(1 for line in f))

There are 6821 rows in the resulting CSV
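The count above sums the physical lines of the file, so it includes the header row written at the top of `event_datafile_new.csv`; the file therefore holds 6821 - 1 = 6820 event records, assuming no field contains an embedded newline. A minimal sketch that counts CSV records instead of lines (the helper name `count_data_rows` is hypothetical):

```python
import csv
import io

def count_data_rows(csvfile):
    """Count CSV records, excluding the header row."""
    reader = csv.reader(csvfile)
    next(reader, None)  # skip the header, if present
    return sum(1 for _ in reader)

# A header plus two records, one with a quoted field that spans two lines
sample = io.StringIO('artist,song\n"A","Song one"\n"B","Line one\nline two"\n')
print(count_data_rows(sample))  # 2 records, although the text has 4 physical lines
```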


# Part II. Data Modeling

For the next part, we will work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory, that was generated in Part I.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free user tier)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster

try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [6]:
try:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS sparkify
        WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}
    """)
except Exception as e:
    print(e)



#### Set Keyspace

In [7]:
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)


## Questions asked by the Sparkify team


### Question 1: 
#### Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

The team needs the name of the artist, the title, and the length of the song played at itemInSession "4" during sessionId "338". To answer this question, we have to approach the data from a session's perspective.

First, we should identify the data involved in the requirement:
1. `artist` (varchar): Name of the artist
2. `song` (varchar): Title of the song
3. `length` (decimal): Length of the song in seconds, including fractions of a second
4. `sessionId` (int): Id of the session during which the song was played
5. `itemInSession` (int): Position of the song in the session's history of songs played

_Please remember that the names above refer to the column headers in the CSV file, while the types are the Cassandra types used in the table._
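The loading cell below passes `float(line["length"])` into the `decimal` column. A `float` is a binary approximation, so whether the stored value keeps exactly the digits from the CSV depends on how the value is encoded on its way to Cassandra. If exact decimal values matter, one option is to parse the field with Python's `decimal.Decimal`, which the driver is expected to accept for `decimal` columns. A minimal sketch (the helper name `parse_length` is hypothetical):

```python
from decimal import Decimal

def parse_length(raw):
    """Parse the CSV 'length' string into a Decimal, matching the table's decimal column."""
    return Decimal(raw)

print(parse_length('495.3073'))                             # keeps the digits written in the CSV
print(Decimal(float('495.3073')) == Decimal('495.3073'))    # False: the float is only an approximation
```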

Since we are talking about the songs **played** during a **session**, we can name the table `session_plays`.

Then, we must define the attributes (columns) that we will use to identify each row and partition the data. In this case, the search is done by `sessionId` and `itemInSession`, so together they form the composite partition key, which is also the full primary key of the table.
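Keyed this way, each (`sessionId`, `itemInSession`) pair maps to exactly one row, and both values are needed to locate it. A Python dict keyed by a tuple is a rough stand-in for that lookup; the sketch below only illustrates the idea (it is not how Cassandra stores data), and the table contents and the `lookup` helper are hypothetical.

```python
# Hypothetical in-memory stand-in for session_plays, keyed by the composite
# partition key (sessionid, iteminsession); the values are made-up placeholders
session_plays = {
    (338, 3): ('Artist A', 'Song A', 180.0),
    (338, 4): ('Artist B', 'Song B', 240.5),
    (339, 4): ('Artist C', 'Song C', 200.0),
}

def lookup(table, sessionid, iteminsession):
    """Return the row for one (sessionid, iteminsession) pair, or None if absent."""
    return table.get((sessionid, iteminsession))

print(lookup(session_plays, 338, 4))   # ('Artist B', 'Song B', 240.5)
print(lookup(session_plays, 338, 99))  # None
```

In Cassandra the same idea shows up as a rule: a query normally has to restrict every partition key column, which is why the query filters on both `sessionid` and `iteminsession`.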


If we translate this requirement into a CQL query, we get the following:

`SELECT artist, song, length FROM session_plays WHERE sessionid = 338 and iteminsession = 4`




In [8]:
query = "CREATE TABLE IF NOT EXISTS session_plays \
            (sessionid int, iteminsession int, artist varchar, song varchar, length decimal, \
            PRIMARY KEY ((sessionid, iteminsession)))"
try:
    session.execute(query)
except Exception as e:
    print(e)
                    

In [9]:
file = 'event_datafile_new.csv'

# The INSERT statement is the same for every row, so build it once
query = "INSERT INTO session_plays (sessionid, iteminsession, artist, song, length) "
query = query + "VALUES (%s, %s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.DictReader(f)

    for line in csvreader:
        session.execute(query, (int(line["sessionId"]), int(line["itemInSession"]),
                                line["artist"], line["song"], float(line["length"])))
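Concatenating INSERT fragments by hand makes it easy to drop the space before `VALUES` or to mismatch the number of columns and `%s` placeholders. A small helper can derive both from a single column list. The sketch below assumes the `%s` placeholder style used by the loading cells; the name `build_insert` is hypothetical.

```python
def build_insert(table, columns):
    """Build a parameterized INSERT statement using %s placeholders."""
    column_list = ", ".join(columns)
    # One placeholder per column, so the counts can never drift apart
    placeholders = ", ".join(["%s"] * len(columns))
    return f"INSERT INTO {table} ({column_list}) VALUES ({placeholders})"

query = build_insert("session_plays", ["sessionid", "iteminsession", "artist", "song", "length"])
print(query)
```

Built once, before the loop, the statement string is not recreated for every CSV row.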

In [10]:
query = "SELECT artist, song, length FROM session_plays WHERE sessionid = 338 and iteminsession = 4"

try:
    rows = session.execute(query)
    for row in rows:
        print ("Artist: %s\nSong: %s\nLength: %f seconds (%.2f minutes)" % (row.artist, row.song, row.length, row.length/60))
except Exception as e:
    print(e)

Artist: Faithless
Song: Music Matters (Mark Knight Dub)
Length: 495.307300 seconds (8.26 minutes)


#### Answer to Question 1:

The song that was played during session id "338", associated with itemInSession "4", was "Music Matters (Mark Knight Dub)" by Faithless with a duration of 8.26 minutes.
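The minutes value comes from dividing the stored length by 60: 495.3073 / 60 ≈ 8.255, which rounds to 8.26 minutes, or 8 minutes and 15 seconds. A minimal sketch of that conversion (the helper name `format_duration` is hypothetical):

```python
def format_duration(seconds):
    """Format a duration in seconds as decimal minutes and as m:ss."""
    minutes, secs = divmod(seconds, 60)
    return f"{seconds / 60:.2f} min ({int(minutes)}:{int(secs):02d})"

print(format_duration(495.3073))  # 8.26 min (8:15)
```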

### Question 2: 
#### Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

We are asked to return the artist names and song titles, ordered by `itemInSession`, along with the user's full name, for a specific user's session. Here we focus on the **user's activity during a session**, so we name the table that holds the data to answer the question `user_activity`.

For this requirement we'll use the following columns:
1. `userId` (int): User's id
2. `sessionId` (int): Session's id
3. `itemInSession` (int): Position of the song in the session's history of songs played
4. `artist` (varchar): Artist's name
5. `song` (varchar): Song's title
6. `firstName` (varchar): First name of the user associated with the session
7. `lastName` (varchar): Last name of the user associated with the session

To partition the data, we'll use `userId` and `sessionId` as a composite partition key, since those are the attributes used to filter the data. To order the rows within each partition, we'll use `itemInSession` as the clustering key; together, these columns form the primary key.
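Cassandra keeps the rows of a partition sorted by the clustering key (ascending by default), so a query that restricts `userid` and `sessionid` reads one partition and gets its rows back in `itemInSession` order without an `ORDER BY`. The sketch below mimics that layout with plain Python on made-up rows; it illustrates the idea only and does not reproduce Cassandra's storage engine.

```python
from collections import defaultdict

# Made-up rows: (userid, sessionid, iteminsession, song), inserted out of order
rows = [
    (1, 100, 2, 'Song C'),
    (1, 100, 0, 'Song A'),
    (1, 100, 1, 'Song B'),
    (1, 101, 0, 'Song D'),
]

# Group rows into partitions by the composite partition key (userid, sessionid),
# then sort each partition by the clustering key iteminsession (ascending)
partitions = defaultdict(list)
for userid, sessionid, item, song in rows:
    partitions[(userid, sessionid)].append((item, song))
for key in partitions:
    partitions[key].sort()

# WHERE userid = 1 AND sessionid = 100 reads one partition, already in item order
print([song for _, song in partitions[(1, 100)]])  # ['Song A', 'Song B', 'Song C']
```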

The CQL query that we'll use for this is the following:

```
SELECT artist, song, firstname, lastname
FROM user_activity WHERE userid = 10 and sessionid = 182
```

In [11]:
query = "CREATE TABLE IF NOT EXISTS user_activity \
            (userid int, sessionid int, iteminsession int, artist varchar, song varchar, firstname varchar, lastname varchar, \
            PRIMARY KEY ((userid, sessionid), iteminsession))"
try:
    session.execute(query)
except Exception as e:
    print(e)           

In [12]:
file = 'event_datafile_new.csv'

# The INSERT statement is the same for every row, so build it once
query = "INSERT INTO user_activity (userid, sessionid, iteminsession, artist, song, firstname, lastname) "
query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.DictReader(f)

    for line in csvreader:
        session.execute(query, (int(line["userId"]), int(line["sessionId"]), int(line["itemInSession"]),
                                line["artist"], line["song"], line["firstName"], line["lastName"]))

In [13]:
query = "SELECT artist, song, firstname, lastname \
            FROM user_activity WHERE userid = 10 and sessionid = 182"

try:
    rows = session.execute(query)
    for row in rows:
        print ("Artist: %s | Song: %s | User: %s %s" % (row.artist, row.song, row.firstname, row.lastname,))
except Exception as e:
    print(e)          

Artist: Down To The Bone | Song: Keep On Keepin' On | User: Sylvie Cruz
Artist: Three Drives | Song: Greece 2000 | User: Sylvie Cruz
Artist: Sebastien Tellier | Song: Kilometer | User: Sylvie Cruz
Artist: Lonnie Gordon | Song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) | User: Sylvie Cruz


#### Answer to Question 2:

The user Sylvie Cruz (id: 10) played the following songs during session id "182":
- "Keep On Keepin' On" by Down To The Bone
- "Greece 2000" by Three Drives
- "Kilometer" by Sebastien Tellier
- "Catch You Baby (Steve Pitron & Max Sanna Radio Edit)" by Lonnie Gordon

### Question 3: 
#### Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

Finally, we need to list all the users who listened to the song "All Hands Against His Own". This requires modeling the relationship between songs and the users who listened to them. For this, the table will be called `song_plays` and we'll use the following columns:

1. `song` (varchar): Song title
2. `userId` (int): User's id
3. `firstName` (varchar): User's first name
4. `lastName` (varchar): User's last name

Since we'll be searching by song title, `song` will be the partition key of the table. Additionally, we include `userId` as a clustering key so that each user who listened to the song gets their own row. Otherwise, every song would keep just one user, because in Cassandra an INSERT into an existing primary key overwrites the existing row (upsert behaviour).
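The effect of the primary key choice can be mimicked with Python dicts, where assigning to an existing key overwrites it, much like a Cassandra upsert. The sketch uses made-up plays and only illustrates the semantics. It also shows a side effect of this design: a user who played the song several times still appears once.

```python
# Made-up plays of one song by three users; user 1 appears twice
plays = [('Song X', 1, 'Ann'), ('Song X', 2, 'Ben'), ('Song X', 1, 'Ann'), ('Song X', 3, 'Cy')]

# Primary key = song only: every insert for the same song overwrites the previous row
by_song = {}
for song, userid, name in plays:
    by_song[song] = name

# Primary key = (song, userid): one row per distinct listener of the song
by_song_user = {}
for song, userid, name in plays:
    by_song_user[(song, userid)] = name

print(by_song)                        # {'Song X': 'Cy'}
print(sorted(by_song_user.values()))  # ['Ann', 'Ben', 'Cy']
```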

Translating this requirement into a CQL query results in the following:

`SELECT firstname, lastname FROM song_plays WHERE song = 'All Hands Against His Own'`


In [14]:
query = "CREATE TABLE IF NOT EXISTS song_plays \
            (song varchar, userid int, firstname varchar, lastname varchar, \
            PRIMARY KEY((song), userid))"

try:
    session.execute(query)
except Exception as e:
    print(e)

In [15]:
file = 'event_datafile_new.csv'

# The INSERT statement is the same for every row, so build it once
query = "INSERT INTO song_plays (song, userid, firstname, lastname) "
query = query + "VALUES (%s, %s, %s, %s)"

with open(file, encoding='utf8') as f:
    csvreader = csv.DictReader(f)

    for line in csvreader:
        session.execute(query, (line["song"], int(line["userId"]), line["firstName"], line["lastName"]))
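The three loading cells each read `event_datafile_new.csv` from the start. Since every table is fed from the same rows, one pass could produce the statements for all three. The sketch below describes each table with a hypothetical `TABLES` mapping (CQL column, CSV column, converter) that mirrors the conversions used in the notebook, and a hypothetical `plan_inserts` generator yields `(query, params)` pairs. Only the generator is exercised here, on an in-memory sample; running the pairs through `session.execute` is left as a comment.

```python
import csv
import io

# Hypothetical mapping: table -> list of (CQL column, CSV column, converter)
TABLES = {
    'session_plays': [('sessionid', 'sessionId', int), ('iteminsession', 'itemInSession', int),
                      ('artist', 'artist', str), ('song', 'song', str), ('length', 'length', float)],
    'user_activity': [('userid', 'userId', int), ('sessionid', 'sessionId', int),
                      ('iteminsession', 'itemInSession', int), ('artist', 'artist', str),
                      ('song', 'song', str), ('firstname', 'firstName', str),
                      ('lastname', 'lastName', str)],
    'song_plays': [('song', 'song', str), ('userid', 'userId', int),
                   ('firstname', 'firstName', str), ('lastname', 'lastName', str)],
}

def plan_inserts(csvfile):
    """Read the CSV once and yield (query, params) pairs for every table."""
    # Build each INSERT statement once, with one %s placeholder per column
    queries = {
        table: "INSERT INTO {} ({}) VALUES ({})".format(
            table, ", ".join(c for c, _, _ in cols), ", ".join(["%s"] * len(cols)))
        for table, cols in TABLES.items()
    }
    for line in csv.DictReader(csvfile):
        for table, cols in TABLES.items():
            params = tuple(convert(line[src]) for _, src, convert in cols)
            yield queries[table], params

# Usage with a one-row in-memory sample (made-up values). In the notebook this would be:
#     for query, params in plan_inserts(f): session.execute(query, params)
sample = io.StringIO(
    'artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId\n'
    '"Some Artist","Ann","F","0","Lee","200.5","free","City, ST","7","Some Song","42"\n'
)
plans = list(plan_inserts(sample))
for query, params in plans:
    print(query, params)
```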

In [16]:
query = "SELECT firstname, lastname FROM song_plays WHERE song = 'All Hands Against His Own'"

try:
    rows = session.execute(query)
    for row in rows:
        print ("User: %s %s" % (row.firstname, row.lastname,))
except Exception as e:
    print(e)

User: Jacqueline Lynch
User: Tegan Levine
User: Sara Johnson


#### Answer to Question 3

The song "All Hands Against His Own" was listened to by Jacqueline Lynch, Tegan Levine and Sara Johnson.

### Drop the tables before closing out the sessions

In [17]:
query = "DROP TABLE session_plays"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [18]:
query = "DROP TABLE user_activity"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [19]:
query = "DROP TABLE song_plays"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

### Close the session and cluster connection

In [20]:
session.shutdown()
cluster.shutdown()