# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [5]:
filepath = r'C:\Users\DELL\P2_Cassandra_Data_Modeling_and_ETL\event_data'

# store data files pathes into a list
for root, dirs, files in os.walk(filepath):
    file_list = glob.glob(os.path.join(root,'*.csv'))

#test the
len(file_list)   

30

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [17]:
# create a dataframe with all data in the file 
df_final = pd.DataFrame()
for file in file_list:
    df = pd.read_csv(file)
    df_final = df_final.append(df)

# test the process
df_final.head()


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
0,,Logged In,Walter,M,0,Frye,,free,"San Francisco-Oakland-Hayward, CA",GET,Home,1540920000000.0,38,,200,1541110000000.0,39.0
1,,Logged In,Kaylee,F,0,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Home,1540340000000.0,139,,200,1541110000000.0,8.0
2,Des'ree,Logged In,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540340000000.0,139,You Gotta Be,200,1541110000000.0,8.0
3,,Logged In,Kaylee,F,2,Summers,,free,"Phoenix-Mesa-Scottsdale, AZ",GET,Upgrade,1540340000000.0,139,,200,1541110000000.0,8.0
4,Mr Oizo,Logged In,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",PUT,NextSong,1540340000000.0,139,Flat 55,200,1541110000000.0,8.0


In [20]:
# filter the empty cells in artist column
df_final = df_final[df_final.artist.notna()]

# keep the columns in need
df_final = df_final[['artist','firstName','gender','itemInSession','lastName','length','level','location','sessionId','song','userId']]

# test the process
df_final.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
2,Des'ree,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",139,You Gotta Be,8.0
4,Mr Oizo,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",139,Flat 55,8.0
5,Tamba Trio,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",139,Quem Quiser Encontrar O Amor,8.0
6,The Mars Volta,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",139,Eriatarka,8.0
7,Infected Mushroom,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",139,Becoming Insane,8.0


In [21]:
# save the final resulr into  a csv file
df_final.to_csv("event_datafile_new.csv",index=False)

# Part II. Create Apache Cassandra Database Modelling 

### Now we are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [5]:
# connect to cassandra instance
from cassandra.cluster import Cluster
try:
    cluster = Cluster(['127.0.0.1'])

# create session
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace

In [6]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """)

<cassandra.cluster.ResultSet at 0x7f61694b3320>

#### Set Keyspace

In [7]:
session.set_keyspace('sparkify')


## Create queries to ask the following three questions of the data

#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




# Query #1

### Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4


In CQL the query will:

```
SELECT artist, song_title, song_length 
FROM song_sessions 
WHERE sessionId = 338 AND itemInSession = 4
```

- Create a table with primary key consists of partition key sessionId, and clustering key itemInSession.
- The columns will be: sessionId, itemInSession, artist, song_title and song_length.


In [9]:
# create song_sessions table
session.execute("""
    CREATE TABLE IF NOT EXISTS song_sessions
    (sessionId int, itemInSession int, artist text, song_title text, song_length float,
    PRIMARY KEY(sessionId, itemInSession))
    """)


<cassandra.cluster.ResultSet at 0x7f6143fe4ef0>

In [13]:
# process the file to extract data 
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csv_file = csv.reader(f)
    next(csvreader) 
    for line in csv_file:
        query = "INSERT INTO song_sessions (sessionId, itemInSession, artist, song_title, song_length)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        artist_name, user_name, gender, itemInSession, user_last_name, length, level, location, sessionId, song, userId = line
        session.execute(query, (int(sessionId), int(itemInSession), artist_name, song, float(length)))

#### SELECT to verify that the data have been inserted into each table

In [15]:
out = session.execute("""SELECT artist, song_title, song_length FROM session_songs WHERE sessionId = 338 AND itemInSession = 4""")

for row in out:
    print(out.artist, out.song_title, out.song_length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


# Query #2

### Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In CQL query will be:

```
SELECT itemInSession, artist, song, firstName, lastName 
FROM users
WHERE userId = 10 AND sessionId = 182*
```

- Create table with primary consists of composite partition key userId, sessionId in order to avoid large data to be the same node (performance issue).
- Choose clustering key to be itemInSession to sort data by it.
- The columns will be: userId, sessionId, itemInSession, artist, song and firstName and lastName.


In [16]:
# create users table
session.execute("""
    CREATE TABLE IF NOT EXISTS users
    (userId int, sessionId int, artist text, song text, firstName text, lastName text, itemInSession int,
    PRIMARY KEY((userId, sessionId), itemInSession))
    """)


<cassandra.cluster.ResultSet at 0x7f6169480ef0>

In [18]:
# process the file to extract data 
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO users (userId, sessionId, artist, song, firstName, lastName, itemInSession)"
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        artist, firstName, gender, itemInSession, lastName, length, level, location, sessionId, song, userId = line
        session.execute(query, (int(userId), int(sessionId), artist, song, firstName, lastName, int(itemInSession)))

In [24]:
out = session.execute("""SELECT itemInSession, artist, song, firstName, lastName FROM users WHERE userId = 10 AND sessionId = 182""")

for out in rows:
    print(out.iteminsession, out.artist, out.song, out.firstname, out.lastname)

0 Down To The Bone Keep On Keepin' On Sylvie Cruz
1 Three Drives Greece 2000 Sylvie Cruz
2 Sebastien Tellier Kilometer Sylvie Cruz
3 Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


# Query #3

### Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In CQL query will bw:

```
SELECT firstName, lastName 
FROM history 
WHERE song = 'All Hands Against His Own'
```

- Create table with primary key consists of partition key song, and clustering key userId. 
- The columns will be: song, firstName, lastName and userId.


In [26]:
# create history table
session.execute("""
    CREATE TABLE IF NOT EXISTS history
    (song text, firstName text, lastName text, userId int,
    PRIMARY KEY(song, userId))
    """)


<cassandra.cluster.ResultSet at 0x7f616949f7b8>

In [27]:
# process the file to extract data 
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO app_history (song, firstName, lastName, userId)"
        query = query + " VALUES (%s, %s, %s, %s)"
        artist, firstName, gender, itemInSession, lastName, length, level, location, sessionId, song, userId = line
        session.execute(query, (song, firstName, lastName, int(userId)))

In [28]:
rows = session.execute("""SELECT firstName, lastName FROM app_history WHERE song = 'All Hands Against His Own'""")

for row in rows:
    print(row.firstname, row.lastname)

Jacqueline Lynch
Tegan Levine
Sara Johnson


In [29]:
# drop all created tables
session.execute("""DROP TABLE history""")

session.execute("""DROP TABLE users""")

session.execute("""DROP TABLE song_sessions""")

<cassandra.cluster.ResultSet at 0x7f613da7c400>

In [30]:
# close connection with cassandra
session.shutdown()
cluster.shutdown()