# Creating Python ETL Data Pipeline And Modeling the data with Cassandra 

#### By Otto Kwon

<br> </br>
## I. ETL Pipeline by Python
#### Transfers data from a set of CSV files within a directory to create a streamlined CSV file


### PROCEDURE:

1. Create filepaths function 
2. Get access to the dataset's directory 
3. Create new CSV file 
4. Transfer the data with appropriate columns

<br>

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### 1. Creating filepaths to extract original csv data

In [2]:
# checking directory
print(os.getcwd())

/home/workspace


In [3]:
# Getting current folder and subfolder, "event_data"
filepath = os.getcwd() + '/event_data'
# checking for filepath
print(filepath)

/home/workspace/event_data


#### 2. Get accessible to the dataset by probing the directory

In [4]:
# Create a for loop to create a list of files and collect each filepath using glob
for root, dirs, files in os.walk(filepath):
    # setting new_root to avoide checkpoints files
    if '.' not in root:
        new_root = root
    file_path_list = glob.glob(os.path.join(new_root,'*'))
    
# checking for file_path_list
print(file_path_list)

['/home/workspace/event_data/2018-11-30-events.csv', '/home/workspace/event_data/2018-11-23-events.csv', '/home/workspace/event_data/2018-11-22-events.csv', '/home/workspace/event_data/2018-11-29-events.csv', '/home/workspace/event_data/2018-11-11-events.csv', '/home/workspace/event_data/2018-11-14-events.csv', '/home/workspace/event_data/2018-11-20-events.csv', '/home/workspace/event_data/2018-11-15-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-28-events.csv', '/home/workspace/event_data/2018-11-25-events.csv', '/home/workspace/event_data/2018-11-16-events.csv', '/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-24-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-19-events.csv', '/home/workspace/event_data/2018-11-26-events.csv', '/home/workspace/event_data/2018-11-12-events.csv', '/home/workspace/event_data/2018-11-27-events.csv', '/home/work

#### 3. Creating new csv file that will be used for ETL by Cassandra later

In [5]:
# initiating an empty list of rows that will be generated by each file later
full_data_rows_list = [] 
    
# for every dataset in the file path list 
for f in file_path_list:

# reading csv file of the datasets
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
     
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            full_data_rows_list.append(line)

# checking first 5 rows of full_data_rows_list
print(full_data_rows_list[:4])

[['Stephen Lynch', 'Logged In', 'Jayden', 'M', '0', 'Bell', '182.85669', 'free', 'Dallas-Fort Worth-Arlington, TX', 'PUT', 'NextSong', '1.54099E+12', '829', "Jim Henson's Dead", '200', '1.54354E+12', '91'], ['Manowar', 'Logged In', 'Jacob', 'M', '0', 'Klein', '247.562', 'paid', 'Tampa-St. Petersburg-Clearwater, FL', 'PUT', 'NextSong', '1.54056E+12', '1049', 'Shell Shock', '200', '1.54354E+12', '73'], ['Morcheeba', 'Logged In', 'Jacob', 'M', '1', 'Klein', '257.41016', 'paid', 'Tampa-St. Petersburg-Clearwater, FL', 'PUT', 'NextSong', '1.54056E+12', '1049', 'Women Lose Weight (Feat: Slick Rick)', '200', '1.54354E+12', '73'], ['Maroon 5', 'Logged In', 'Jacob', 'M', '2', 'Klein', '231.23546', 'paid', 'Tampa-St. Petersburg-Clearwater, FL', 'PUT', 'NextSong', '1.54056E+12', '1049', "Won't Go Home Without You", '200', '1.54354E+12', '73']]


#### Sample view of data in excel form

In [6]:
df_sample = pd.read_csv(file_path_list[0])
df_sample.columns

Index(['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName',
       'length', 'level', 'location', 'method', 'page', 'registration',
       'sessionId', 'song', 'status', 'ts', 'userId'],
      dtype='object')

#### 4. Transfer data to new CSV file with right columns
##### The columns :
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

In [7]:
# registering dialect that quotes just everything written on the csv.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
with open('loaded_data.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length', \
                     'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5],
                         row[6], row[7], row[8], row[12], row[13], row[16]))

        
# Source For all codes above : 
#    https://docs.python.org/3/library/csv.html
#    https://pymotw.com/2/csv/

In [8]:
# check the number of rows in your csv file
with open('loaded_data.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Performing Cassandra or CQL on the data

#### I will perform Cassandra on loaded_data.csv with 3 CQL questions.
#### I created loaded_data.csv on the directory and it has colums :
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of <font color=red>**loaded_data.csv**</font><br>

<img src="images/image_event_datafile_new.jpg">

### Creating Cluster and Keyspace

#### Creating a Cluster

In [9]:
# Creating cluster
from cassandra.cluster import Cluster
cluster = Cluster()

# Creating session for query later
session = cluster.connect()

#### Create Keyspace

In [10]:
try :
    session.execute("""CREATE KEYSPACE IF NOT EXISTS otto_kwon
                    WITH REPLICATION =
                    {'class' : 'SimpleStrategy', 'replication_factor' : 1}
                    """)
except Exception as e:
    print(e)

#### Set Keyspace

In [11]:
try:
    session.set_keyspace('otto_kwon')
except Exception as e:
    print(e)

## <font color='red'>Creating 3 CQL queries</font>

<br>

##### 1. Show artist, song, and length for session_id = 338, and item_in_session = 4


##### 2. Show name of artist, song (sorted by itemInSession) and user full name for user_id = 10, session_id = 182
    

##### 3. Show all user full name who listened to the song 'All Hands Against His Own'

<br>
<br>
<br>


### <font color='blue'>1st question : </font> 
##### Show artist, song and length for session_Id = 338, and item_in_session = 4

In [12]:
# create table song_session_item with appropriate types for each columns
session.execute("""
                CREATE TABLE IF NOT EXISTS song_session_item
                (session_id int, 
                item_in_session int, 
                artist text, 
                song text, 
                length float, 
                PRIMARY KEY(session_id, item_in_session))
                """)

# I set partition keys to be session_id and item_in_session
# To avoid WHERER ordering error in NoSQL

<cassandra.cluster.ResultSet at 0x7f9204609128>

In [13]:
# getting the loaded_data.csv
file = 'loaded_data.csv'

# reading the loaded_data.csv
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # to skip header columns
    for line in csvreader:
        # assigning CQL query : INSERT INTO ....
        query = "INSERT INTO song_session_item (session_id, item_in_session, artist, song, length)"
        query = query + "VALUES (%s,%s,%s,%s,%s)"
        
        # executing the query
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### Visualizing query with artist, song, and length for session_id = 338, and item_in_session = 4

In [14]:
# creating CQL query for 1st question
lines = session.execute("""
                        SELECT artist, 
                        song, 
                        length 
                        FROM song_session_item
                        WHERE session_id = 338 
                        AND item_in_session = 4
                        """)

In [15]:
# Visualizing by printing the SELECT execution
for line in lines:
    print(line.artist, line.song, line.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


### <font color='blue'>Second question :</font> 
##### Show name of artist, song (sorted by itemInSession) and user full name for user_id = 10, session_id = 182

In [16]:
# create table "song_play_list_session" with appropriate types for each columns
session.execute("""
                CREATE TABLE IF NOT EXISTS song_play_list_session
                (user_id int, 
                session_id int, 
                item_in_session int, 
                artist text, 
                song text, 
                user text, 
                PRIMARY KEY((user_id, session_id), item_in_session))
                WITH CLUSTERING ORDER BY (item_in_session DESC);
                """)
# The partition keys are user_id and session_id with same reason as QUESTION 1
# I used WITH CLUSTERING ORDER BY ... to sort song with item_in_session

#I got format of CLUSTERING ORDER BY from :
#https://www.datastax.com/blog/2015/03/we-shall-have-order

<cassandra.cluster.ResultSet at 0x7f9204605400>

In [17]:
# getting the loaded_data.csv
file = 'loaded_data.csv'

# reading the loaded_data.csv
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # assigning CQL query : INSERT INTO ....
        query = "INSERT INTO song_play_list_session (user_id, session_id, item_in_session, artist, song, user)"
        query = query + "VALUES (%s,%s,%s,%s,%s,%s)"
        
        # executing the query
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1]+' '+line[4]))

#### Visualizing query with artist, song (sorted by item_in_session, and user full name 
#### for user_id = 10 session_id = 182, and item_in_session = 4

In [18]:
# creating CQL query for 1st question
lines = session.execute("""
                        SELECT artist, song, user 
                        FROM song_play_list_session 
                        WHERE user_id = 10 
                        AND session_id = 182
                        """)

# visualizing the query
for line in lines:
    print(line.artist, line.song, line.user)

Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Down To The Bone Keep On Keepin' On Sylvie Cruz


### <font color = 'blue'> 3rd Question : </font>
##### Show all user full name who listened to the song 'All Hands Against His Own'

In [19]:
# create table song_one_title with appropriate types for each columns
session.execute("""
                CREATE TABLE IF NOT EXISTS song_one_title
                (song text, 
                user_id int, 
                name text, 
                PRIMARY KEY(song, user_id))
                """)
# I included user_id into partition so that the names will be in good order.

# getting the loaded_data.csv
file = 'loaded_data.csv'

# reading the loaded_data.csv
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        # assigning CQL query : INSERT INTO ....
        query = "INSERT INTO song_one_title (song, user_id, name)"
        query = query + "VALUES (%s,%s,%s)"
        
        # executing the query
        session.execute(query,(line[9],int(line[10]),line[1]+' '+line[4]))

#### Visualizing query with user full name for song = 'All Hands Against His Own'.

In [20]:
# query
lines = session.execute("""
                        SELECT name FROM song_one_title 
                        WHERE song = 'All Hands Against His Own'
                        """)

# visualizing
for line in lines:
    print(line.name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Dropping the table.

In [21]:
# Dropping all tables
session.execute("DROP TABLE song_session_item")
session.execute("DROP TABLE song_play_list_session")
session.execute("DROP TABLE song_one_title")

<cassandra.cluster.ResultSet at 0x7f92045f6588>

### Checking for the Drop.

In [22]:
# Checking for the Drop. If error occurs and says, "unconfigured table...", then it is dropped.
try :
    session.execute("""
                    SELECT * 
                    FROM song_session_item
                    LIMIT 1
                    """)
except :
    print("First Table Drop Confirmed")

First Table Drop Confirmed


In [23]:
# Checking for the Drop. If error occurs, then it is dropped.
try:
    session.execute("""
                    SELECT * 
                    FROM song_play_list_session
                    LIMIT 1
                    """)
except :
    print('Second Table Drop Confirmed')

Second Table Drop Confirmed


In [24]:
# Checking for the Drop. If error occurs, then it is dropped.
try:
    session.execute("""
                    SELECT * 
                    FROM song_one_title
                    LIMIT 1
                    """)
except :
    print('Third Table Drop Confirmed')

Third Table Drop Confirmed


### Close the session and cluster connection¶

In [25]:
session.shutdown()
cluster.shutdown()