# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import os
import glob
from tqdm import tqdm
import multiprocessing as mp
from cassandra.cluster import Cluster
from cql_queries import *

In [2]:
# for printing more rows/columns with pandas
pd.set_option("display.max_rows", 100)
pd.set_option("display.max_columns", None)

#### Creating list of filepaths to process original event csv data files

In [3]:
# Folder containing csv files
PATH = os.path.join(os.getcwd(), "event_data")

# List of csv files
list_files = glob.glob(os.path.join(PATH, "*.csv"))
print(f"{len(list_files)} csv files found!")

30 csv files found!


In [4]:
# helper function to load one csv as pandas dataframe
def _read_csv(p): return pd.read_csv(p)

In [5]:
%%time

# Load all csv files in parallel and concatenate in single dataframe

with mp.Pool() as pool:
    df_music = pd.concat(
        pool.map(_read_csv, tqdm(list_files)),
        ignore_index=True, copy=False
    )

100%|██████████| 30/30 [00:00<00:00, 45442.08it/s]


CPU times: user 75.4 ms, sys: 15.9 ms, total: 91.2 ms
Wall time: 319 ms


In [6]:
print(f"The dataset has {df_music.shape[0]} rows and {df_music.shape[1]} columns") 

The dataset has 8056 rows and 17 columns


In [7]:
print("Sample rows")
df_music.head()

Sample rows


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
0,,Logged In,Kevin,M,0,Arellano,,free,"Harrisburg-Carlisle, PA",GET,Home,1540010000000.0,514,,200,1542070000000.0,66.0
1,Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1540010000000.0,514,Ja I Ty,200,1542070000000.0,66.0
2,,Logged In,Maia,F,0,Burke,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1540680000000.0,510,,200,1542070000000.0,51.0
3,All Time Low,Logged In,Maia,F,1,Burke,177.84118,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540680000000.0,510,A Party Song (The Walk of Shame),200,1542070000000.0,51.0
4,Nik & Jay,Logged In,Wyatt,M,0,Scott,196.51873,free,"Eureka-Arcata-Fortuna, CA",PUT,NextSong,1540870000000.0,379,Pop-Pop!,200,1542080000000.0,9.0


In [11]:
# Save subset of columns of interest in a new csv file

df_music[['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId']].to_csv("event_datafile_new.csv", index=False)

# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now we'll load the previously saved csv file and try to insert the data in cassandra tables we'll create. The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

In [12]:
# Load the processed dataset
df_music = pd.read_csv(os.path.join(os.getcwd(), "event_datafile_new.csv"))

In [21]:
# Check # of missig values for each column
print("Number of missing values per column")
df_music.isnull().sum()[df_music.isnull().sum() > 0]

Number of missing values per column


artist       1236
firstName     286
gender        286
lastName      286
length       1236
location      286
song         1236
dtype: int64

In the above dataframe, some text columns have their missing values encoded as `NaN`. When trying to insert them as is into cassandra tables we will get some errors. To avoid this, we'll fill `NaN` for these colums with empty strings, and user id with `-9999`.
We'll also cast user id to int :

In [22]:
# dictionary for filling NA for string columns with empty string
dict_na = {
    "firstName": "",
    "gender": "",
    "lastName": "",
    "location": "",
    "song": "",
    "artist": "",
    "userId": -9999
}

df_music.fillna(dict_na, inplace=True)

In [24]:
df_music["userId"] = df_music.userId.astype(int)

#### Creating a Cluster

In [13]:
# Connect to cassandra cluster and create a session

try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print("Can't connect and/or open session")
    print(e)

#### Create Keyspace

In [14]:
try:
    session.execute(create_keyspace_query)
except Exception as e:
    print("can't create keyspace")
    print(e)

#### Set Keyspace

In [15]:
try:
    session.set_keyspace('big_sparkify')
except Exception as e:
    print("can't set keyspace to session")
    print(e)

#### Drop all tables if they exist 

In [25]:
for query in list_drop_queries:
    try:
        session.execute(query)
    except Exception as e:
        print(f"can't excute drop table query : {query}")
        print(e)

### Given a set of 3 queries we'll create 3 data models accordingly

### The first query :
1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

We'll create the table `music_play_sessions`, with a primary key composed of 'Session id' and 'item in session', so that we can identify uniquely our rows and reference these fields in WHERE clause.

But first let's verify that these fileds uniquely identify our rows :

In [26]:
print("Number of duplicate rows : ")
print(df_music.duplicated(["sessionId", "itemInSession"]).sum())

Number of duplicate rows : 
0


Create Table :

In [27]:
try:
    session.execute(create_music_play_sessions)
except Exception as e:
    print(f"Can't execute query {create_music_play_sessions}")
    print(e)

Insert data :

In [28]:
%%time

# insert data into 
try:
    for row_item in tqdm(df_music[["sessionId", "itemInSession", "artist", "song", "length", "userId", "firstName", "lastName", "gender", "level", "location"]]\
                         .itertuples(index=False, name=None)):
        session.execute(insert_music_play_sessions, row_item)
except Exception as e:
    print("can't execute insert statement")
    print(e)

8056it [00:12, 641.90it/s]

CPU times: user 3.38 s, sys: 527 ms, total: 3.9 s
Wall time: 12.6 s





#### Results of query 1

In [29]:
query = """
    SELECT artist, song, length from music_play_sessions WHERE session_id = 338 AND item_in_session = 4
"""

try:
    results = session.execute(select_query1)
    for res in results:
        print(res)
except Exception as e:
    print(e)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)


### The second query :
2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

We'll create the table `users_activities`, with a primary key composed of 'user id' and 'Session id' and 'item in session', so that we can identify uniquely our rows and reference 'user id' fields in WHERE clause.

#### Create table :

In [30]:
try:
    session.execute(create_users_activities)
except Exception as e:
    print(f"Can't execute query {create_users_activities}")
    print(e)

#### Insert data :

In [31]:
%%time
try:
    for row_item in tqdm(df_music[["userId", "sessionId", "itemInSession", "artist", "song", "length", "firstName", "lastName", "gender", "level", "location"]]\
                         .itertuples(index=False, name=None)):
        session.execute(insert_users_activities, row_item)
except Exception as e:
    print("can't execute insert statement")
    print(e)

8056it [00:11, 708.40it/s]

CPU times: user 3.38 s, sys: 516 ms, total: 3.89 s
Wall time: 11.4 s





#### Results from query 2 :

In [32]:
try:
    results = session.execute(select_query2)
    for res in results:
        print(res)
except Exception as e:
    print(e)

Row(artist='Down To The Bone', song="Keep On Keepin' On", first_name='Sylvie', last_name='Cruz')
Row(artist='Three Drives', song='Greece 2000', first_name='Sylvie', last_name='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', first_name='Sylvie', last_name='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', first_name='Sylvie', last_name='Cruz')


### The 3rd query

3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


Here we'll use song as primary key in addition to session id and item in session (that helps identify uniquely rows). Because some songs are empty strings and primary keys don't accept empty item, we'll filter the rows with empty song when inserting data.

#### Create table :

In [33]:
try:
    session.execute(create_songs_records)
except Exception as e:
    print(f"Can't execute query {create_songs_records}")
    print(e)        

#### Insert data :

In [34]:
%%time

try:
    for row_item in tqdm(df_music.loc[df_music.song != "", 
                                      ["song", "sessionId", "itemInSession", "artist", "length", "userId", "firstName", "lastName", "gender", "level", "location"]]\
                         .itertuples(index=False, name=None)):
        session.execute(insert_songs_records, row_item)
except Exception as e:
    print("can't execute insert statement")
    print(e)

6820it [00:09, 718.28it/s]

CPU times: user 2.75 s, sys: 531 ms, total: 3.28 s
Wall time: 9.51 s





In [35]:
try:
    results = session.execute(select_query3)
    for res in results:
        print(res)
except Exception as e:
    print(e)

Row(first_name='Sara', last_name='Johnson')
Row(first_name='Jacqueline', last_name='Lynch')
Row(first_name='Tegan', last_name='Levine')


### Drop the tables before closing out the sessions

In [38]:
for query in list_drop_queries:
    try:
        session.execute(query)
    except Exception as e:
        print(f"can't excute drop table query : {query}")
        print(e)

### Close the session and cluster connection¶

In [39]:
session.shutdown()
cluster.shutdown()