# Part I. ETL Pipeline for Pre-Processing the Files

#### Importing Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
print(os.getcwd())
filepath = os.getcwd() + '/event_data'
for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
full_data_rows_list = [] 
for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)        
        for line in csvreader:
            full_data_rows_list.append(line) 
            
print('Total number of rows:', len(full_data_rows_list))

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)
with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

Total number of rows: 8056


In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Querying the Cassandra Database

The `event_datafile_new.csv` file contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

<img src="images/image_event_datafile_new.jpg">

In [5]:
# Defining columns' ordering and types in one place to reduce code duplication.
columns = [
    ('artist', 'text'),
    ('firstName', 'text'),
    ('gender', 'text'),
    ('itemInSession', 'int'),
    ('lastName', 'text'),
    ('length', 'float'),
    ('level', 'text'),
    ('location', 'text'),
    ('sessionId', 'int'),
    ('song', 'text'),
    ('userId', 'int')
]

#### Creating a Cluster

In [6]:
try:
    from cassandra.cluster import Cluster
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

#### Creating Keyspace

In [7]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)
except Exception as e:
    print(e)

#### Setting Keyspace

In [8]:
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)

#### Defining Helper Functions

Below are helper function to reduce code duplication.

In [9]:
from typing import List, Union, Iterable

FILE = 'event_datafile_new.csv'  # The input file.
CREATED_TABLES = []  # Newly created tables are appended here to make sure that all are dropped at the end.
# Columns and their ordering and types will be referred from here:
COLUMN_TO_IDX = {column: (idx, data_type) for idx, (column, data_type) in enumerate(columns)}

def value(row: List[str], column_name: str) -> Union[str, int, float]:
    """Returns the row's value corresponding to the column name."""
    index, data_type = COLUMN_TO_IDX[column_name]
    assert index < len(row)
    if data_type == 'int':
        return int(row[index])
    if data_type == 'float':
        return float(row[index])
    return row[index]

def create_table(name: str, columns: List[str], primary_key: str) -> str:
    """
    Creates a Cassandra table with the given name, columns and primary key.
    Returns the utilized CQL query.
    """
    query = f'CREATE TABLE IF NOT EXISTS {name} '
    query += f'({", ".join(column_and_type(column) for column in columns)}, PRIMARY KEY ({primary_key}))'
    try:
        session.execute(query)
        CREATED_TABLES.append(name)
        return query
    except Exception as e:
        print(e)
        
def column_and_type(column_name: str) -> str:
    """Returns column name and its type, e.g. `artist text`."""
    _, data_type = COLUMN_TO_IDX[column_name]
    return column_name + ' ' + data_type

        
def insert_into_table(name: str, columns: List[str], lines: Iterable[str]) -> str:
    """Inserts lines into a Cassandra table with the given name."""
    query = f'INSERT INTO {name} ({", ".join(columns)})'
    query += f' VALUES ({", ".join(["%s"] * len(columns))})'
    try:
        for line in lines:
            session.execute(query, (value(line, column) for column in columns))
        return query
    except Exception as e:
        print(e)
        
def display_query_results(query: str, columns: List[str]):
    """Pretty prints the result of the query for the given columns."""
    try:
        rows = session.execute(query)
        for row in rows:
            print([getattr(row, column.lower()) for column in columns])
    except Exception as e:
        print(e)

## Queries of interest

1. Give me the **artist, song title and song's length** in the music app history that was heard during **sessionId = 338, and itemInSession  = 4**.
2. Give me only the following: **name of artist, song (sorted by itemInSession) and user (first and last name)** for **userid = 10, sessionid = 182**.    
3. Give me every **user name (first and last)** in my music app history who listened to the **song 'All Hands Against His Own'**.

### Query 1

> Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4.

In [10]:
session_table_name = 'song_by_session_item'
session_columns = ['sessionId', 'itemInSession', 'artist', 'song', 'length']
session_create_query = create_table(session_table_name, session_columns, '(sessionId, itemInSession)')
print(session_create_query)

CREATE TABLE IF NOT EXISTS song_by_session_item (sessionId int, itemInSession int, artist text, song text, length float, PRIMARY KEY ((sessionId, itemInSession)))


In [11]:
with open(FILE, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    session_insert_query = insert_into_table(session_table_name, session_columns, csvreader)

print(session_insert_query)

INSERT INTO song_by_session_item (sessionId, itemInSession, artist, song, length) VALUES (%s, %s, %s, %s, %s)


#### Data Verification

In [12]:
query = f"SELECT * FROM {session_table_name} WHERE sessionId=338 AND itemInSession=4"
display_query_results(query, session_columns)

[338, 4, 'Faithless', 'Music Matters (Mark Knight Dub)', 495.30731201171875]


### Query 2

> Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182. 

In [13]:
user_session_table_name = 'song_by_user_session'
user_session_columns = ['userId', 'sessionId', 'artist', 'song', 'itemInSession', 'firstName', 'lastName']

# Create table.
user_session_create_query = create_table(user_session_table_name, user_session_columns, 'userId, sessionId, itemInSession')
print(user_session_create_query)

# Insert rows.
with open(FILE, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    user_session_insert_query = insert_into_table(user_session_table_name, user_session_columns, csvreader)
print(user_session_insert_query)

CREATE TABLE IF NOT EXISTS song_by_user_session (userId int, sessionId int, artist text, song text, itemInSession int, firstName text, lastName text, PRIMARY KEY (userId, sessionId, itemInSession))
INSERT INTO song_by_user_session (userId, sessionId, artist, song, itemInSession, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s)


#### Data Verification

In [14]:
query = f"SELECT artist, song, firstName, lastName FROM {user_session_table_name} WHERE userId=10 AND sessionId=182"
display_query_results(query, ['artist', 'song', 'firstName', 'lastName'])

['Down To The Bone', "Keep On Keepin' On", 'Sylvie', 'Cruz']
['Three Drives', 'Greece 2000', 'Sylvie', 'Cruz']
['Sebastien Tellier', 'Kilometer', 'Sylvie', 'Cruz']
['Lonnie Gordon', 'Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', 'Sylvie', 'Cruz']


### Query 3

> Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [15]:
song_table_name = 'users_by_song'
song_columns = ['song', 'artist', 'firstName', 'lastName']

# Create table. Artist name is added to make sure that the primary key is unique.
song_create_query = create_table(song_table_name, song_columns, 'song, artist')
print(song_create_query)

# Insert rows.
with open(FILE, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    song_insert_query = insert_into_table(song_table_name, song_columns, csvreader)
print(song_insert_query)

CREATE TABLE IF NOT EXISTS users_by_song (song text, artist text, firstName text, lastName text, PRIMARY KEY (song, artist))
INSERT INTO users_by_song (song, artist, firstName, lastName) VALUES (%s, %s, %s, %s)


#### Data Verification

In [16]:
query = f"SELECT firstName, lastName FROM {song_table_name} WHERE song='All Hands Against His Own'"
display_query_results(query, ['firstName', 'lastName'])

['Sara', 'Johnson']


### Dropping the tables

In [17]:
def drop_table(name: str) -> str:
    query = f'DROP TABLE {name}'
    try:
        rows = session.execute(query)
        return query
    except Exception as e:
        print(e)
    
for table_name in CREATED_TABLES:
    print(drop_table(table_name))

DROP TABLE song_by_session_item
DROP TABLE song_by_user_session
DROP TABLE users_by_song


### Closing the session and cluster connection¶

In [18]:
session.shutdown()
cluster.shutdown()