# Project: Data Modeling with Cassandra  
A startup called Sparkify wants to analyze the data they've been collecting on songs and user activity on their new music streaming app. The analysis team is particularly interested in understanding what songs users are listening to. Currently, there is no easy way to query the data to generate the results, since the data reside in a directory of CSV files on user activity on the app.

## Project Overview  
In this project, data models are created with Apache Cassandra, and an ETL pipeline is built in Python:  
- a keyspace and tables are created in Apache Cassandra to run the queries;  
- the ETL pipeline merges a set of CSV files from a directory into a single streamlined CSV file, which is then used to model and insert data into the Apache Cassandra tables.

# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# check the current working directory
print(os.getcwd())

# get the event data folder inside the current working directory
filepath = os.path.join(os.getcwd(), 'event_data')

# walk the folder and its subfolders, collecting the path of every CSV file
# (extend, rather than reassign, so files from every subfolder are kept)
file_path_list = []
for root, dirs, files in os.walk(filepath):
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))
# print(file_path_list)

/c/Users/sdelo/SynologyDrive/data_science/mooc/udacity/01_data_engineering/01_data_modeling/02_udacity_data_engineering_project_nosql
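The same file list can also be built with `pathlib`. This is a minimal sketch that assumes every event file ends in `.csv`. `collect_csv_paths` is a hypothetical helper, and sorting the paths is an added choice that makes the processing order reproducible.

```python
from pathlib import Path


def collect_csv_paths(root):
    """Return sorted paths of every *.csv file under root, including subdirectories."""
    # rglob walks the whole tree, so files from every subfolder are kept
    return sorted(str(p) for p in Path(root).rglob("*.csv") if p.is_file())
```

Usage: `file_path_list = collect_csv_paths(os.path.join(os.getcwd(), 'event_data'))`.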


#### Processing the files to create the CSV data file that will be used for the Apache Cassandra tables

In [3]:
# initiate an empty list of rows that will be generated from each file
full_data_rows_list = []

# for every filepath in the file path list
for f in file_path_list:

    # read the csv file
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        # create a csv reader object
        csvreader = csv.reader(csvfile)
        # skip the header row
        next(csvreader)

        # extract each data row one by one and append it
        for line in csvreader:
            # print(line)
            full_data_rows_list.append(line)

# uncomment the code below to get the total number of rows
# print(len(full_data_rows_list))
# uncomment the code below to see what the list of event data rows looks like
# print(full_data_rows_list)

# create a smaller event data csv file called event_datafile_new.csv that will be used
# to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # skip rows with an empty artist field
        if row[0] == '':
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8],
                         row[12], row[13], row[16]))
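The cell above holds every raw row in memory before it writes anything. Below is a sketch of a streaming alternative. It assumes the same source column positions as the cell above (0, 2 to 8, 12, 13 and 16), and `merge_event_rows`, `write_event_file`, `OUTPUT_COLUMNS` and `SOURCE_INDICES` are hypothetical names. Rows are written as they are read, with `csv.QUOTE_ALL` quoting like the `myDialect` dialect registered above.

```python
import csv

# Header of the merged file and the source-row positions each column is taken from
OUTPUT_COLUMNS = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                  'level', 'location', 'sessionId', 'song', 'userId']
SOURCE_INDICES = [0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16]


def merge_event_rows(paths):
    """Yield the selected columns of every data row in paths, skipping rows with no artist."""
    for path in paths:
        with open(path, 'r', encoding='utf8', newline='') as src:
            reader = csv.reader(src)
            next(reader, None)  # skip this file's header row
            for row in reader:
                if not row or row[0] == '':
                    continue  # same filter as the cell above: empty artist field
                yield [row[i] for i in SOURCE_INDICES]


def write_event_file(paths, out_path):
    """Write the merged file with a header; return the number of data rows written."""
    count = 0
    with open(out_path, 'w', encoding='utf8', newline='') as out:
        writer = csv.writer(out, quoting=csv.QUOTE_ALL)
        writer.writerow(OUTPUT_COLUMNS)
        for projected in merge_event_rows(paths):
            writer.writerow(projected)
            count += 1
    return count
```

Usage: `write_event_file(file_path_list, 'event_datafile_new.csv')` writes the same eleven columns as the cell above.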


In [4]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821
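The line count includes the header row, so it is one more than the 6820 entries pandas reports below. The sketch below counts data records instead and assumes a single header row. `count_data_rows` is a hypothetical helper. Because it uses `csv.reader`, a quoted field that contains a newline is counted as one record rather than two lines.

```python
import csv


def count_data_rows(path, encoding='utf8'):
    """Count CSV records excluding the header; quoted fields spanning lines count once."""
    with open(path, 'r', encoding=encoding, newline='') as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row
        return sum(1 for _ in reader)
```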


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist [0]
- firstName of user [1]
- gender of user [2]
- item number in session [3]
- last name of user [4]
- length of the song [5]
- level (paid or free song) [6]
- location of the user [7]
- sessionId [8]
- song title [9]
- userId [10]

The image below is a screenshot of what the denormalized data should look like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

In [5]:
# a pandas dataframe offers easier data processing features than parsing the raw CSV file
# load the denormalized event file into the pandas dataframe df_events
# this denormalized dataframe df_events will be used to insert data into the Cassandra tables

df_events = pd.read_csv('event_datafile_new.csv', encoding='utf8')
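Here is a standard-library sketch of the same load, for cases where pandas is not available. `load_events` and `CONVERTERS` are hypothetical names. The numeric conversions mirror the dtypes that `df_events.info()` reports below: int64 for `itemInSession`, `sessionId` and `userId`, and float64 for `length`. This matters because the prepared INSERT statements later bind these values to `int` and `float` columns.

```python
import csv

# Column -> converter, mirroring the numeric dtypes pandas infers for this file
CONVERTERS = {'itemInSession': int, 'sessionId': int, 'userId': int, 'length': float}


def load_events(path, encoding='utf8'):
    """Read event_datafile_new.csv into a list of dicts with numeric columns converted."""
    rows = []
    with open(path, 'r', encoding=encoding, newline='') as f:
        for record in csv.DictReader(f):
            for column, convert in CONVERTERS.items():
                record[column] = convert(record[column])
            rows.append(record)
    return rows
```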

In [6]:
# view info and column names of the events dataframe
df_events.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6820 entries, 0 to 6819
Data columns (total 11 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   artist         6820 non-null   object 
 1   firstName      6820 non-null   object 
 2   gender         6820 non-null   object 
 3   itemInSession  6820 non-null   int64  
 4   lastName       6820 non-null   object 
 5   length         6820 non-null   float64
 6   level          6820 non-null   object 
 7   location       6820 non-null   object 
 8   sessionId      6820 non-null   int64  
 9   song           6820 non-null   object 
 10  userId         6820 non-null   int64  
dtypes: float64(1), int64(3), object(7)
memory usage: 586.2+ KB


In [7]:
# view random sampling of events data to understand better the denormalized event dataset
df_events.sample(20)

| | artist | firstName | gender | itemInSession | lastName | length | level | location | sessionId | song | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 2849 | D.R.I. | Harper | M | 16 | Barrett | 66.61179 | paid | New York-Newark-Jersey City, NY-NJ-PA | 404 | Stupid_ Stupid War (Dealing With It) | 42 |
| 4007 | Miami Horror | Lily | F | 37 | Koch | 386.5073 | paid | Chicago-Naperville-Elgin, IL-IN-WI | 716 | Sometimes (Hook N Sling Remix) | 15 |
| 1423 | Sunny Day Real Estate | Ryan | M | 2 | Smith | 198.97424 | free | San Jose-Sunnyvale-Santa Clara, CA | 431 | Red Elephant | 26 |
| 4487 | Rick James | Kevin | M | 1 | Arellano | 263.60118 | free | Harrisburg-Carlisle, PA | 714 | Ghetto Life | 66 |
| 5826 | AFI | Wyatt | M | 6 | Scott | 256.86159 | free | Eureka-Arcata-Fortuna, CA | 922 | The Interview | 9 |
| 5802 | Nirvana | Tegan | F | 59 | Levine | 257.01832 | paid | Portland-South Portland, ME | 992 | Lithium | 80 |
| 4094 | The Killers | Aleena | F | 31 | Kirby | 220.89098 | paid | Waterloo-Cedar Falls, IA | 639 | When You Were Young | 44 |
| 5854 | Metric | Avery | F | 1 | Martinez | 171.25832 | paid | Atlanta-Sandy Springs-Roswell, GA | 140 | Gimme Sympathy | 82 |
| 2511 | Tony Bennett & k.d. lang | Kynnedi | F | 0 | Sanchez | 188.26404 | free | Cedar Rapids, IA | 334 | That's My Home | 89 |
| 6369 | Reel Big Fish | Rylan | M | 49 | George | 185.80853 | paid | Birmingham-Hoover, AL | 983 | She Has A Girlfriend Now | 16 |


## ETL pipeline to create Cassandra Cluster, Keyspace, tables, queries

You must run a local Cassandra cluster with Docker before running the cells below on a local computer:
```shell
docker-compose up -d
```
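`docker-compose up -d` returns as soon as the container has started, but Cassandra can take a while longer before it accepts connections. Running `cluster.connect()` immediately may therefore fail. The polling sketch below assumes that the compose file (not shown here) publishes Cassandra's default CQL native port 9042 on 127.0.0.1. `wait_for_port` is a hypothetical helper. An open port is a good sign, but it does not guarantee that CQL requests will succeed right away.

```python
import socket
import time


def wait_for_port(host='127.0.0.1', port=9042, timeout=60.0, interval=1.0):
    """Poll until a TCP connection to host:port succeeds or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True  # something is accepting connections on the port
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```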

#### Creating a Cluster

In [8]:
# This will make a connection to a Cassandra instance on your local machine
# (127.0.0.1)

from cassandra.cluster import Cluster


# To establish connection and begin executing queries, connect session to Cassandra local cluster (Docker)
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [9]:
# Create a Keyspace called sparkify
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)


In [10]:
# Set KEYSPACE to the keyspace sparkify above
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)


### Drop any existing tables before creating them

Three queries need to be answered, so there will be three data models, one table per query.

In [None]:
# if necessary, use the following DROP TABLE queries to delete and reset existing tables before testing the ETL pipeline

drop_table_query_1 = "DROP TABLE IF EXISTS session_details;"
session.execute(drop_table_query_1)

drop_table_query_2 = "DROP TABLE IF EXISTS user_activity_in_session;"
session.execute(drop_table_query_2)

drop_table_query_3 = "DROP TABLE IF EXISTS songs_listeners;"
session.execute(drop_table_query_3)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.
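As a mental model only (this is not how Cassandra stores data internally), a table can be pictured as a map from partition key to rows kept sorted by clustering key. A query has to supply the whole partition key. `ToyTable` is a hypothetical in-memory illustration of that idea. Writing a row whose full primary key already exists replaces the earlier row, just as an INSERT with the same primary key does in Cassandra.

```python
from collections import defaultdict


class ToyTable:
    """Toy model of a Cassandra table: partition key -> rows sorted by clustering key."""

    def __init__(self, partition_cols, clustering_cols):
        self.partition_cols = partition_cols
        self.clustering_cols = clustering_cols
        self.partitions = defaultdict(dict)  # partition key -> {clustering key: row}

    def insert(self, row):
        pk = tuple(row[c] for c in self.partition_cols)
        ck = tuple(row[c] for c in self.clustering_cols)
        # same full primary key -> the later write replaces the earlier one (upsert)
        self.partitions[pk][ck] = row

    def select(self, **partition_values):
        # every partition key column must be given, otherwise a KeyError is raised
        pk = tuple(partition_values[c] for c in self.partition_cols)
        rows = self.partitions.get(pk, {})
        return [rows[ck] for ck in sorted(rows)]  # clustering order, ascending
```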


## Creating queries to ask the following three questions from the event data

### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




### Creating and testing Data Model for the following query
### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [11]:
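## Data Model for the Cassandra database to answer the query
#
# the query looks up artist and song information for a specific sessionId and itemInSession
# session_id is the partition key and item_in_session the clustering column;
# together they identify each event in a session, matching the WHERE clause of the query
# note: CQL float is a 32-bit type; double would store song_length at full precision
# here is the corresponding CREATE TABLE query -> create session_details table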

create_table_query_1 = "CREATE TABLE IF NOT EXISTS session_details "
create_table_query_1 = create_table_query_1 + "(session_id int, item_in_session int, artist_name text, song_title text, song_length float, PRIMARY KEY (session_id, item_in_session))"
try:
    session.execute(create_table_query_1)
except Exception as e:
    print(e)
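An INSERT with an existing primary key overwrites the earlier row, so it is worth checking that the chosen key is unique in the source data before loading. Here is a sketch with a hypothetical `duplicate_keys` helper that works over a list of row dicts:

```python
from collections import Counter


def duplicate_keys(rows, key_columns):
    """Return {primary-key tuple: count} for keys that occur more than once in rows."""
    counts = Counter(tuple(row[c] for c in key_columns) for row in rows)
    return {key: n for key, n in counts.items() if n > 1}
```

With the dataframe already loaded, pandas gives the same check as `df_events.duplicated(['sessionId', 'itemInSession']).sum()`. A non-zero result would mean that some events are silently collapsed in `session_details`.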
                    

In [12]:
# load event data pandas dataframe to cassandra table using PreparedStatements 
# here is the INSERT INTO query to load the data to Cassandra database to session_details table

insert_query_1 = "INSERT INTO session_details (session_id, item_in_session, artist_name, song_title, song_length)"
insert_query_1 = insert_query_1 + " VALUES (?, ?, ?, ?, ?)"

prepared_query_1 = session.prepare(insert_query_1)

for index, row in df_events.iterrows():
    session.execute(prepared_query_1, (row['sessionId'], row['itemInSession'], row['artist'],
                                        row['song'], row['length']))
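The loop above waits for each INSERT to finish before sending the next one. The DataStax driver also provides `cassandra.concurrent.execute_concurrent_with_args`, which keeps several requests in flight. The sketch below assumes that function and uses hypothetical helpers `build_params` and `load_concurrently`. The driver import sits inside the function, so `build_params` can be used without a cluster.

```python
def build_params(rows, columns):
    """Turn row dicts into bind-parameter tuples in the statement's column order."""
    return [tuple(row[c] for c in columns) for row in rows]


def load_concurrently(session, prepared, rows, columns, concurrency=50):
    # Imported lazily so build_params can be used without the driver installed
    from cassandra.concurrent import execute_concurrent_with_args
    params = build_params(rows, columns)
    return execute_concurrent_with_args(session, prepared, params, concurrency=concurrency)
```

Usage: `load_concurrently(session, prepared_query_1, df_events.to_dict('records'), ['sessionId', 'itemInSession', 'artist', 'song', 'length'])` would load the same rows as the loop above. The `concurrency` value of 50 is an arbitrary starting point, not a tuned setting.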


In [13]:
## Here is the SELECT statement to verify the data was entered into the table
## Query 1: Give me the artist, song title and song's length in the music app history that was heard during
## sessionId = 338, and itemInSession = 4

sessionId = 338
itemInSession = 4

select_query_1 = "SELECT artist_name, song_title, song_length \
                    FROM session_details \
                    WHERE session_id = %s \
                    AND item_in_session = %s"

# the select query is executed and results are returned to a pandas dataframe

rows = session.execute(select_query_1, (sessionId, itemInSession))

# define a results dictionary
query_results_1 = {'artist_name':[],
                    'song_title':[],
                    'song_length' :[]
                    }

# append the results row by row to the dictionary
for row in rows:
    query_results_1['artist_name'].append(row.artist_name)     
    query_results_1['song_title'].append(row.song_title)
    query_results_1['song_length'].append(row.song_length)

# create a pandas dataframe from the query results
df_query_1 = pd.DataFrame(query_results_1)

# view query results dataframe
df_query_1

| | artist_name | song_title | song_length |
|---|---|---|---|
| 0 | Faithless | Music Matters (Mark Knight Dub) | 495.307312 |
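The driver's default row factory returns each row as a named tuple. The per-column dictionary can therefore be built from the row's own field names instead of listing every column by hand. Here is a sketch with a hypothetical `rows_to_columns` helper:

```python
def rows_to_columns(rows):
    """Collect named-tuple rows into a {column: [values]} dict, keeping column order."""
    columns = {}
    for row in rows:
        for field, value in row._asdict().items():
            columns.setdefault(field, []).append(value)
    return columns
```

Usage: `pd.DataFrame(rows_to_columns(session.execute(select_query_1, (sessionId, itemInSession))))` would give the same frame as above. One difference is that an empty result yields a DataFrame with no columns, whereas the hand-written dictionary keeps the column names.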


### Creating and testing Data Model for the following query
### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [14]:
## Data Model for the Cassandra database to answer the query
#
# the query looks up a specific userid and sessionid
# (user_id, session_id) is used as a composite partition key
# item_in_session is added as a clustering column so the rows come back sorted by it
# here is the corresponding CREATE TABLE query -> create user_activity_in_session table

create_table_query_2 = "CREATE TABLE IF NOT EXISTS user_activity_in_session "
create_table_query_2 = create_table_query_2 + "(user_id int, session_id int, item_in_session int, artist_name text, song_title text, user_firstname text, user_lastname text, PRIMARY KEY ((user_id, session_id), item_in_session))"
try:
    session.execute(create_table_query_2)
except Exception as e:
    print(e)
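In `PRIMARY KEY ((user_id, session_id), item_in_session)`, the inner parentheses make `user_id` and `session_id` a composite partition key. By contrast, `PRIMARY KEY (user_id, session_id, item_in_session)` would make only `user_id` the partition key and the other two columns clustering columns. Clustering columns are sorted ascending by default, so rows within a partition come back ordered by `item_in_session`. Below is a hypothetical helper, `create_table_cql`, that builds such statements and spells the order out explicitly:

```python
def create_table_cql(table, columns, partition_key, clustering=(), order="ASC"):
    """Build a CREATE TABLE statement; columns maps column name -> CQL type."""
    column_defs = ", ".join(f"{name} {cql_type}" for name, cql_type in columns.items())
    # Inner parentheses group the partition key columns when there is more than one
    pk = ("(" + ", ".join(partition_key) + ")") if len(partition_key) > 1 else partition_key[0]
    key = ", ".join([pk, *clustering])
    cql = f"CREATE TABLE IF NOT EXISTS {table} ({column_defs}, PRIMARY KEY ({key}))"
    if clustering:
        ordering = ", ".join(f"{c} {order}" for c in clustering)
        cql += f" WITH CLUSTERING ORDER BY ({ordering})"
    return cql
```

If you call it with the columns of `user_activity_in_session` in the same order, it yields the statement used above plus an explicit `WITH CLUSTERING ORDER BY (item_in_session ASC)` clause.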

In [15]:
# load event data pandas dataframe to cassandra table using PreparedStatements 
# here is the INSERT INTO query to load the data to Cassandra database to user_activity_in_session table


insert_query_2 = "INSERT INTO user_activity_in_session (user_id, session_id, item_in_session, artist_name, song_title, user_firstname, user_lastname)"
insert_query_2 = insert_query_2 + " VALUES (?, ?, ?, ?, ?, ?, ?)"

prepared_query_2 = session.prepare(insert_query_2)

for index, row in df_events.iterrows():
    session.execute(prepared_query_2, (row['userId'], row['sessionId'], row['itemInSession'],
                                        row['artist'], row['song'], row['firstName'], row['lastName']))

In [16]:
## Here is the SELECT statement to verify the data was entered into the table
## Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)
## for userid = 10, sessionid = 182

userId = 10
sessionId = 182

select_query_2 = "SELECT artist_name, song_title, user_firstname, user_lastname \
                    FROM user_activity_in_session \
                    WHERE user_id = %s \
                    AND session_id = %s"

# the select query is executed and results are returned to a pandas dataframe
rows = session.execute(select_query_2, (userId, sessionId))

# define a results dictionary
query_results_2 = {'artist_name':[],
                'song_title' :[],
                'user_firstname' :[],
                'user_lastname' : []
                }

# append the results row by row to the dictionary
for row in rows:   
    query_results_2['artist_name'].append(row.artist_name)
    query_results_2['song_title'].append(row.song_title)
    query_results_2['user_firstname'].append(row.user_firstname)
    query_results_2['user_lastname'].append(row.user_lastname)

# create a pandas dataframe from the query results
df_query_2 = pd.DataFrame(query_results_2)

# view query results dataframe and check that songs are sorted by item_in_session
df_query_2


| | artist_name | song_title | user_firstname | user_lastname |
|---|---|---|---|---|
| 0 | Down To The Bone | Keep On Keepin' On | Sylvie | Cruz |
| 1 | Three Drives | Greece 2000 | Sylvie | Cruz |
| 2 | Sebastien Tellier | Kilometer | Sylvie | Cruz |
| 3 | Lonnie Gordon | Catch You Baby (Steve Pitron & Max Sanna Radio... | Sylvie | Cruz |


### Creating and testing Data Model for the following query
### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [17]:
## Data Model for the Cassandra database to answer the query
#
# the query looks up the users who listened to a specific song
# song_title is the partition key and user_id the clustering column,
# so each listener appears once per song even if they played it several times
# here is the corresponding CREATE TABLE query -> create songs_listeners table

create_table_query_3 = "CREATE TABLE IF NOT EXISTS songs_listeners "
create_table_query_3 = create_table_query_3 + "(song_title text, user_id int, user_firstname text, user_lastname text, PRIMARY KEY (song_title, user_id))"
try:
    session.execute(create_table_query_3)
except Exception as e:
    print(e)
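Why is `user_id` part of the key? With `song_title` alone as the primary key, each new listener would overwrite the previous one, and the query would return a single user. Adding `user_id` as a clustering column keeps one row per distinct listener, while repeated plays of the same song by the same user collapse into one row. Here is a hypothetical in-memory emulation of that behavior, `listeners_by_song`:

```python
def listeners_by_song(events):
    """Emulate PRIMARY KEY (song_title, user_id): one entry per (song, user), last write wins."""
    table = {}
    for e in events:
        # repeated plays of the same song by the same user hit the same primary key
        table.setdefault(e['song'], {})[e['userId']] = (e['firstName'], e['lastName'])
    return table
```

Looking up `table.get(song, {})` and sorting its items by user id mimics the SELECT on `songs_listeners`, which returns listeners in `user_id` order within the partition.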
                    

In [18]:
# load event data pandas dataframe to cassandra table using PreparedStatements 
# here is the INSERT INTO query to load the data to Cassandra database to songs_listeners table

insert_query_3 = "INSERT INTO songs_listeners (song_title, user_id, user_firstname, user_lastname)"
insert_query_3 = insert_query_3 + " VALUES (?, ?, ?, ?)"

prepared_query_3 = session.prepare(insert_query_3)

for index, row in df_events.iterrows():
    session.execute(prepared_query_3, (row['song'], row['userId'], row['firstName'], row['lastName']))

In [19]:
## Here is the SELECT statement to verify the data was entered into the table
# Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

song = 'All Hands Against His Own'

select_query_3 = "SELECT user_id, user_firstname, user_lastname \
                    FROM songs_listeners \
                    WHERE song_title = %s"

# the select query is executed and results are returned to a pandas dataframe
rows = session.execute(select_query_3, (song,))

# define a results dictionary
query_results_3 = {"user_firstname":[],
                "user_lastname" : []
            }

# append the results row by row to the dictionary
for row in rows:  
    query_results_3['user_firstname'].append(row.user_firstname)
    query_results_3['user_lastname'].append(row.user_lastname)

# create a pandas dataframe from the query results
df_query_3 = pd.DataFrame(query_results_3)


# view query results dataframe (one row per distinct listener of the song)
df_query_3

| | user_firstname | user_lastname |
|---|---|---|
| 0 | Jacqueline | Lynch |
| 1 | Tegan | Levine |
| 2 | Sara | Johnson |


### Drop the tables before closing out the sessions

In [20]:
drop_table_query_1 = "DROP TABLE IF EXISTS session_details;"
session.execute(drop_table_query_1)

drop_table_query_2 = "DROP TABLE IF EXISTS user_activity_in_session;"
session.execute(drop_table_query_2)

drop_table_query_3 = "DROP TABLE IF EXISTS songs_listeners;"
session.execute(drop_table_query_3)

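The same three DROP statements appear twice: once in the reset cell before table creation and once here. Keeping the table names in one list prevents the two copies from drifting apart. Here is a sketch with hypothetical `TABLES` and `drop_statements` names:

```python
# Tables created in this notebook, in creation order
TABLES = ["session_details", "user_activity_in_session", "songs_listeners"]


def drop_statements(tables):
    """Return one DROP TABLE IF EXISTS statement per table name."""
    return [f"DROP TABLE IF EXISTS {name};" for name in tables]
```

Usage: `for statement in drop_statements(TABLES): session.execute(statement)`.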

### Close the session and cluster connection

In [21]:
session.shutdown()
cluster.shutdown()
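If a cell raises an error before this point, the connection stays open. Below is a sketch of a context manager, built with `contextlib.contextmanager`, that always shuts down both the session and the cluster. `cassandra_session` is a hypothetical name. `cluster` is expected to be a `cassandra.cluster.Cluster`, or anything with `connect(keyspace)` and `shutdown()`.

```python
from contextlib import contextmanager


@contextmanager
def cassandra_session(cluster, keyspace=None):
    """Yield a session from cluster and always shut both down, even if the body raises."""
    session = cluster.connect(keyspace)
    try:
        yield session
    finally:
        session.shutdown()
        cluster.shutdown()
```

Usage: `with cassandra_session(Cluster(['127.0.0.1']), 'sparkify') as session: ...` assumes the `sparkify` keyspace already exists, since passing a keyspace name to `connect` sets it on the new session.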