# 1. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# Check the current working directory
print(os.getcwd())

# Root folder holding the raw event data (it may contain subfolders)
filepath = os.path.join(os.getcwd(), 'event_data')

# Collect every CSV file under `filepath`. The list is accumulated across all
# directories visited by os.walk: reassigning it inside the loop would keep
# only the files of whichever directory happened to be walked last. It is
# initialised up front so the name exists even if the folder is missing.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden folders (e.g. .ipynb_checkpoints) in place so os.walk does
    # not descend into them and pick up duplicate copies of the event files.
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # Match only CSV files; a bare '*' would also return subdirectory entries,
    # which cannot be opened as files.
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))

# Sort for a deterministic processing order
file_path_list.sort()

/workspace/home


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# Accumulate the data rows (header excluded) from every event file
full_data_rows_list = []

for event_path in file_path_list:
    with open(event_path, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the default avoids StopIteration on an empty file
        next(csvreader, None)
        # csv.reader yields one list of strings per data row
        full_data_rows_list.extend(csvreader)

# uncomment the code below if you would like to get total number of rows
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# Write a smaller, denormalized event file, event_datafile_new.csv, containing
# only the columns used to insert data into the Apache Cassandra tables.
# Every field is quoted (QUOTE_ALL).
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',
                     'level','location','sessionId','song','userID'])
    for row in full_data_rows_list:
        # Skip blank lines (csv.reader yields them as empty lists) and rows with
        # an empty artist field (presumably non-song events -- confirm).
        if not row or row[0] == '':
            continue
        # Raw column positions, in the same order as the header written above
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
# Count the lines of the generated file; this total includes the header row.
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    line_total = len(f.readlines())
print(line_total)

6821


# 2. Apache Cassandra Modeling

## Data Inputs

The event_datafile_new.csv contains the following columns: 

- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userID

The image below is a screenshot of what the denormalized data should look like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Apache Cassandra Connection and Setup

#### Creating a Cluster

In [5]:
# Connect to a Cassandra instance on the local machine (127.0.0.1).
# Cluster() is called without contact points, so the driver's defaults are
# used. `session` is the handle through which every later cell executes CQL,
# and `cluster` is kept so it can be shut down at the end of the notebook.

from cassandra.cluster import Cluster
cluster = Cluster()
session = cluster.connect()

#### Create Keyspace

In [6]:
# Create the 'udacity' keyspace unless it already exists (safe to re-run).
# SimpleStrategy with a replication factor of 1 keeps a single copy of each
# row, which is enough for the single local node this notebook connects to.
qry = '''
CREATE KEYSPACE IF NOT EXISTS udacity
WITH REPLICATION = {
    'class' : 'SimpleStrategy',
    'replication_factor' : 1
    }
'''
session.execute(qry)

<cassandra.cluster.ResultSet at 0x7f1630e4f7f0>

#### Set Keyspace

In [7]:
# Make 'udacity' the default keyspace, so later statements can use bare table names
session.set_keyspace('udacity')

### Helpers

We define a few helper functions that will be used later on to remove some of the redundant work needed when modeling each query.

In [8]:
# Input data: the denormalized event file produced by the ETL step
file = 'event_datafile_new.csv'

# Names of that file's columns, in file order. They are used to locate fields
# within each CSV row by position. Note that some casings differ from the
# file's own header row (e.g. 'sessionID' here vs. 'sessionId' in the header).
fcols = ("artist firstName gender itemInSession lastName length "
         "level location sessionID song userID").split()

In [9]:
# Construct an insert query
def get_insert_query_base(table, cols):
    """
    Build a parameterized CQL INSERT statement for a table.

    Parameters:
    table (str): Name of the table to insert into.
    cols (sequence of str): Column names, in the same order as the values that
        will later be bound to the statement. Any sequence (tuple or list) works.

    Returns:
    str: An INSERT statement with one ``%s`` placeholder per column. Bind the
         actual values when executing it, e.g. ``session.execute(query, values)``.

    Raises:
    ValueError: if `cols` is empty.

    Example:
    >>> get_insert_query_base("employees", ["id", "name", "salary"])
    'INSERT INTO employees (id, name, salary) VALUES (%s, %s, %s)'

    Note:
    - ``%s`` is the placeholder style used by the Cassandra Python driver for
      simple statements; adapt it if you use a different database library.
    """
    cols = list(cols)
    if not cols:
        raise ValueError("cols must contain at least one column name")
    # Join the names explicitly instead of using str(cols): the repr of a list
    # uses brackets and the repr of a one-element tuple has a trailing comma,
    # and both of those produce invalid CQL.
    column_list = ', '.join(cols)
    placeholders = ', '.join(['%s'] * len(cols))
    return f'INSERT INTO {table} ({column_list}) VALUES ({placeholders})'
    

In [10]:
# Create an insert query for a table based on columns and data types
def get_insert_query(file, table, cols, dtypes):
    """
    Insert every data row of a CSV file into a Cassandra table.

    Despite its name, this function executes the inserts; it does not return a
    query. It relies on three module-level names: `fcols` (the ordered CSV
    column names used to locate each field), `session` (the connected
    Cassandra session) and `get_insert_query_base` (builds the INSERT text).

    Parameters:
    - file (str): Path to the CSV file. Its first row is treated as a header
      and skipped.
    - table (str): Name of the table to insert into.
    - cols (sequence of str): Columns to insert. Each name must appear in
      `fcols`. It is used both as the table column name and to find the
      matching field in every CSV row.
    - dtypes (sequence of callable): One converter per entry in `cols`
      (e.g. int, str, float), applied to the raw CSV string before insertion.

    Returns:
    None

    Raises:
    ValueError: if `cols` and `dtypes` differ in length, or a name in `cols`
    is not present in `fcols`.

    Example:
    get_insert_query('event_datafile_new.csv', 'session_songs',
                     ('sessionID', 'itemInSession', 'artist'), (int, int, str))
    """
    cols = tuple(cols)
    dtypes = tuple(dtypes)
    # zip() would silently truncate a mismatch, leaving the statement with more
    # placeholders than bound values; fail early with a clear message instead.
    if len(cols) != len(dtypes):
        raise ValueError(
            f"cols and dtypes must have the same length ({len(cols)} != {len(dtypes)})"
        )
    missing = [c for c in cols if c not in fcols]
    if missing:
        raise ValueError(f"columns not found in fcols: {missing}")

    query = get_insert_query_base(table, cols)
    # Resolve each column's CSV position once, instead of on every row
    converters = [(fcols.index(c), d) for c, d in zip(cols, dtypes)]
    # newline='' is the documented way to open files for the csv module
    with open(file, encoding='utf8', newline='') as f:
        csvreader = csv.reader(f)
        next(csvreader, None)  # skip the header row (no-op on an empty file)
        for line in csvreader:
            data = tuple(convert(line[idx]) for idx, convert in converters)
            session.execute(query, data)

In [11]:
# use pretty table to display query results
from prettytable import PrettyTable
def display_rows(rows,cols):
    """
    Print query results as a bordered ASCII table.

    Parameters
    ----------
    rows : iterable of sequences
        The rows to show, for example the result of ``session.execute``.
        Each row must hold one value per header in `cols`.
    cols : sequence of str
        Column headers, in the same order as the values within each row.

    Returns
    -------
    None
        The table is written to standard output; nothing is returned.

    Examples
    --------
    >>> display_rows([["Alice", 25], ["Bob", 30]], ["Name", "Age"])  # doctest: +SKIP

    Notes
    -----
    Formatting is delegated to the third-party PrettyTable package
    (``pip install prettytable``).
    """
    table = PrettyTable(cols)
    for record in rows:
        table.add_row(record)
    print(table)

### Query 1:  

The goal of query 1 is to get information about songs (artist, title, length) for songs that have been heard.  It's important for us to be able to isolate this information for particular sessions (`sessionID`), and when they happened in that session (`itemInSession`).

In order to accomplish this, there are some query-specific things to consider when data modeling:
+ The columns that are necessary are at minimum artist, song title, length, sessionID, and itemInSession.  The requirements use those in either the output or filter criteria, so we know those must be in the table.
+ sessionID and itemInSession must be part of the primary key (as the partition key or clustering columns) because they are used as filters in the target query.

With those requirements in mind we can construct a create table statement.  Once we have that, we need to verify that the primary key (partition key plus clustering columns) uniquely identifies each row.  In this case it does, so we do not have to do anything more.

In [12]:
# Query 1 table: partitioned by sessionID, with itemInSession as the
# clustering column, so the query can filter on both.
# IF NOT EXISTS (as already used for the keyspace) lets this cell be re-run
# without failing because the table is already there.
qry = '''
CREATE TABLE IF NOT EXISTS session_songs (
    sessionID int,
    itemInSession int,
    artist text,
    song text,
    length decimal,
    PRIMARY KEY(sessionID,itemInSession)
)
'''
session.execute(qry)

<cassandra.cluster.ResultSet at 0x7f1630ec1e80>

In [13]:
# Load session_songs from the CSV; each entry in `dtypes` converts the field
# for the column at the same position in `cols`.
cols = ('sessionID','itemInSession','artist','song','length')
dtypes = (int, int, str, str, float)

get_insert_query(file=file, table='session_songs', cols=cols, dtypes=dtypes)

In [14]:
# Query 1: artist, song title and length for sessionId 338, itemInSession 4
qry = 'select artist, song, length from session_songs where sessionId = 338 and itemInSession = 4;'
rows = session.execute(qry)
display_rows(rows=rows, cols=('artist', 'song', 'length'))

+-----------+---------------------------------+----------+
|   artist  |               song              |  length  |
+-----------+---------------------------------+----------+
| Faithless | Music Matters (Mark Knight Dub) | 495.3073 |
+-----------+---------------------------------+----------+


### Query 2


The goal of query 2 is to get information about the songs users listened to (artist, song, user name).  It's important for us to be able to isolate this information for particular users (`userID`) and a given session (`sessionID`).

In order to accomplish this, there are some query-specific things to consider when data modeling:
+ The columns that are necessary are at minimum artist, song, firstName, lastName, userID, sessionID, and itemInSession.  The requirements use those in either the output, filter, or sort criteria, so we know those must be in the table.
+ userID and sessionID must be part of the primary key because they are used as filters in the target query; here they form a composite partition key.
+ The query should be sorted by itemInSession.  For optimal performance, we can make itemInSession a clustering column so that the data is sorted by default.

With those requirements in mind we can construct a create table statement.  Once we have that, we need to verify that the primary key (partition key plus clustering columns) uniquely identifies each row.  In this case it does, so we do not have to do anything more.

In [15]:
# Query 2 table: composite partition key (userID, sessionID), with
# itemInSession as the clustering column that orders rows within a partition.
# IF NOT EXISTS (as already used for the keyspace) lets this cell be re-run
# without failing because the table is already there.
qry = '''
CREATE TABLE IF NOT EXISTS user_artists (
    userID int,
    sessionID int,
    itemInSession int,
    artist text,
    song text,
    firstname text,
    lastname text,
    PRIMARY KEY((userID, sessionID), itemInSession)
)
'''
session.execute(qry)

<cassandra.cluster.ResultSet at 0x7f1630e93a90>

In [16]:
# Load user_artists from the CSV; each entry in `dtypes` converts the field
# for the column at the same position in `cols`.
cols = ('userID', 'sessionID', 'itemInSession', 'artist','song','firstName','lastName')
dtypes = (int, int, int, str, str, str, str)

get_insert_query(file=file, table='user_artists', cols=cols, dtypes=dtypes)

In [17]:
# Query 2: artist, song and user name for userID 10 in sessionID 182
qry = 'select artist, song, firstName, lastName from user_artists where userID = 10 and sessionID = 182'
rows = session.execute(qry)
display_rows(rows=rows, cols=('artist', 'song', 'firstName', 'lastName'))

+-------------------+------------------------------------------------------+-----------+----------+
|       artist      |                         song                         | firstName | lastName |
+-------------------+------------------------------------------------------+-----------+----------+
|  Down To The Bone |                  Keep On Keepin' On                  |   Sylvie  |   Cruz   |
|    Three Drives   |                     Greece 2000                      |   Sylvie  |   Cruz   |
| Sebastien Tellier |                      Kilometer                       |   Sylvie  |   Cruz   |
|   Lonnie Gordon   | Catch You Baby (Steve Pitron & Max Sanna Radio Edit) |   Sylvie  |   Cruz   |
+-------------------+------------------------------------------------------+-----------+----------+


### Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

The goal of query 3 is to get users that have listened to a particular song at any point.

In order to accomplish this, there are some query-specific things to consider when data modeling:
+ The columns that are necessary are at minimum firstName, lastName, and song.  The requirements use those in either the output or filter criteria, so we know those must be in the table.
+ song must be the partition key (the first part of the primary key), as it is used as a filter requirement.

With those requirements in mind we can construct a create table statement.  Once we have that, we need to verify that the primary key uniquely identifies each row.  In this case song alone is not sufficient, since many users can listen to the same song, so we add userID as a clustering column to make the primary key unique.

In [18]:
# Query 3 table: partitioned by song, with userID as the clustering column.
# Cassandra INSERTs are upserts, so repeated plays of the same song by the
# same user collapse into one row, which is what a "who listened" query needs.
# IF NOT EXISTS (as already used for the keyspace) lets this cell be re-run
# without failing because the table is already there.
qry = '''
CREATE TABLE IF NOT EXISTS song_users (
    song text,
    userID int,
    firstName text,
    lastName text,
    itemInSession int,
    sessionID int,
    PRIMARY KEY(song, userID)
)
'''
session.execute(qry)

<cassandra.cluster.ResultSet at 0x7f1630db9f60>

In [19]:
# Load song_users from the CSV; each entry in `dtypes` converts the field
# for the column at the same position in `cols`.
cols = ('song','userID','firstName','lastName','itemInSession','sessionID')
dtypes = (str, int, str, str, int, int)

get_insert_query(file=file, table='song_users', cols=cols, dtypes=dtypes)

In [20]:
# Query 3: first and last name of every user who listened to the given song
qry = "select firstName, lastName from song_users where song = 'All Hands Against His Own'"
rows = session.execute(qry)
display_rows(rows=rows, cols=('firstName', 'lastName'))

+------------+----------+
| firstName  | lastName |
+------------+----------+
| Jacqueline |  Lynch   |
|   Tegan    |  Levine  |
|    Sara    | Johnson  |
+------------+----------+


### Drop the tables before closing out the sessions

In [21]:
# Clean up: drop each table created above (IF EXISTS makes a missing one a no-op)
tables_to_drop = ('session_songs','user_artists','song_users')
for table in tables_to_drop:
    session.execute("DROP TABLE IF EXISTS " + table)

### Close the session and cluster connection

In [22]:
# Release resources: shut down the session first, then the cluster object it came from
session.shutdown()
cluster.shutdown()