# Part I. ETL Pipeline for Pre-Processing the Files

In [14]:
# Import Python packages
import pandas as pd
import os
import glob
import csv
from cassandra.cluster import Cluster

## Setup predefined SQL statements

In [15]:
# DROP TABLES
drop_session_song_play = "DROP TABLE IF EXISTS session_song_play"
drop_session_song_play_list = "DROP TABLE IF EXISTS session_song_play_list"
drop_session_play_artist = "DROP TABLE IF EXISTS session_play_artist"

## Session Song Play Table
The session song play table will have a primary key of session_id and item_in_session to ensure the records are unique. It will also enable us to tailor this to the request of knowing song information based on a session id and an item in session value.

In [16]:
# CREATE TABLE
create_session_song_play = ("CREATE TABLE IF NOT EXISTS session_song_play \
(session_id int, item_in_session int, artist text, song_title text, \
song_length float, PRIMARY KEY (session_id, item_in_session))")

## Session Song Play List Table
The session song play list table will have a primary key of user_id and session_id to ensure the records are unique. The item_in_session value will be a clustering column to sort the data. This is also to tailor the table to the requests of knowing data based on a provided user id and session id.

In [17]:
# CREATE TABLE
create_session_song_play_list = ("CREATE TABLE IF NOT EXISTS \
session_song_play_list \
(user_id int, session_id int, item_in_session int, artist text, \
song_title text, first_name text, last_name text, \
PRIMARY KEY ((user_id, session_id), item_in_session))")

## Session Play Artist Table
The session play artist table will have a primary key of song_title and user_id to ensure the records are unique. It will also enable us to tailor this to the request of knowing song information based on a song title.

In [18]:
# CREATE TABLE
create_session_play_artist = ("CREATE TABLE IF NOT EXISTS session_play_artist \
(song_title text, user_id int, first_name text, last_name text, \
PRIMARY KEY (song_title, user_id))")

In [19]:
# INSERT RECORDS
insert_session_song_play = ("INSERT INTO session_song_play \
(session_id, item_in_session, artist, song_title, song_length) \
VALUES (%s, %s, %s, %s, %s)")

insert_session_song_play_list = ("INSERT INTO session_song_play_list \
(user_id, session_id, item_in_session, artist, song_title, first_name, \
last_name) VALUES (%s, %s, %s, %s, %s, %s, %s)")

insert_session_play_artist = ("INSERT INTO session_play_artist \
(song_title, user_id, first_name, last_name) \
VALUES (%s, %s, %s, %s)")

# SELECT RECORDS
select_session_song_play = ("SELECT artist, song_title, song_length \
FROM session_song_play WHERE session_id = 338 and item_in_session = 4;")

select_session_song_play_list = ("SELECT artist, song_title, first_name, \
last_name FROM session_song_play_list WHERE user_id = 10 and \
session_id = 182;")

select_session_play_artist = ("SELECT first_name, last_name \
FROM session_play_artist WHERE song_title = 'All Hands Against His Own';")

# QUERY LISTS
create_table_queries = [create_session_play_artist, create_session_song_play,
                        create_session_song_play_list]
drop_table_queries = [drop_session_play_artist, drop_session_song_play,
                      drop_session_song_play_list]


## Setup database helper methods

In [20]:
def create_db_connection(_db_name):
    """
    Creates a database connection
    :param _db_name: Name of database to connect too
    :return: Database session
    """
    _cluster = Cluster(['localhost'])
    _session = _cluster.connect()

    _session.execute("CREATE KEYSPACE IF NOT EXISTS " + _db_name +
                     " WITH REPLICATION = { 'class' : 'SimpleStrategy', \
                     'replication_factor' : 1 }")
    _session.set_keyspace(_db_name)
    return _session, _cluster


def drop_tables(_session):
    """
    Drop pre-defined tables
    :param _session: Session to database
    """
    print('Dropping tables . . .')
    for _query in drop_table_queries:
        _session.execute(_query)


def create_tables(_session):
    """
    Create a pre-defined list of tables
    :param _session: Session to database
    """
    print('Creating tables. . .')
    for _query in create_table_queries:
        _session.execute(_query)


def query_table(_session, _query):
    """
    Method to run a query
    :param _session: Session to database
    :param _query: Query to run
    :return: Database rows from the result set
    """
    _rows = _session.execute(_query)
    return _rows


def insert_record_into_table(_session, _query, _values):
    """
    Method to insert records into a table
    :param _session: Session to database
    :param _query: Query to run
    :param _values: Values to be inserted
    """
    _session.execute(_query, _values)

## Setup helper methods

In [21]:
def get_file_path_list(_filepath):
    """
    Generates a list of file paths for a given directory
    :param _filepath: Folder directory path
    :return: List of file paths
    """
    _file_path_list = []
    # Create a for loop to create a list of files and collect each filepath
    for root, dirs, files in os.walk(_filepath):
        # join the file path and roots with the subdirectories using glob
        _file_path_list = glob.glob(os.path.join(root, '*'))
    return _file_path_list

def get_full_file_path_data(_file_path_list):
    """
    Create a list of data rows from a list of file paths
    :param _file_path_list: List of file paths
    :return: List of data rows
    """
    # initiating an empty list of rows that will be generated from each file
    _full_data_rows_list = []

    # for every filepath in the file path list
    for _f in _file_path_list:

        # reading csv file
        with open(_f, 'r', encoding='utf8', newline='') as _csvfile:
            # creating a csv reader object
            _csvreader = csv.reader(_csvfile)
            next(_csvreader)

            # extracting each data row one by one and append it
            for _line in _csvreader:
                _full_data_rows_list.append(_line)
    return _full_data_rows_list

def get_event_data_for_import(_full_data_rows_list, _event_data_file_path):
    """
    Parses the event data rows and produces a new list with only the needed
    columns for inserting into cassandra
    :param _full_data_rows_list: List of data rows
    :param _event_data_file_path: File path that the data will be written too
    """
    # creating a smaller event data csv file called event_datafile_full csv
    # that will be used to insert data into the \
    # Apache Cassandra tables
    csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL,
                         skipinitialspace=True)
    if os.path.exists(_event_data_file_path):
        print('Removing existing file: ', _event_data_file_path)
        os.remove(_event_data_file_path)

    with open(_event_data_file_path, 'w+', encoding='utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect')
        writer.writerow(
            ['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
             'length', 'level', 'location', 'sessionId', 'song', 'userId'])
        for row in full_data_rows_list:
            if row[0] == '':
                continue
            writer.writerow(
                (row[0], row[2], row[3], row[4], row[5], row[6], row[7],
                 row[8], row[12], row[13], row[16]))
        f.close()
    
    # check the number of rows in your csv file
    with open(_event_data_file_path, 'r', encoding='utf8') as f:
        print('Created file: ', _event_data_file_path, ' Total rows: ',
              sum(1 for line in f))
    f.close()

## Prepare event data file

In [22]:
# checking your current working directory
print('Starting etl script. . .')
print('\nPreparing data file. . .')
print('Current directory: ', os.getcwd())

# Get your current folder and subfolder event data
# filepath = os.getcwd() + '/event_data'
filepath = 'event_data'

file_path_list = get_file_path_list(filepath)
full_data_rows_list = get_full_file_path_data(file_path_list)

# uncomment the code below if you would like to get total number of rows
print('Data row length: ', len(full_data_rows_list))

event_data_file = '/home/workspace/event_datafile_new.csv'
get_event_data_for_import(full_data_rows_list, event_data_file)

Starting etl script. . .

Preparing data file. . .
Current directory:  /home/workspace
Data row length:  8056
Removing existing file:  /home/workspace/event_datafile_new.csv
Created file:  /home/workspace/event_datafile_new.csv  Total rows:  6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

In [23]:
def parse_data_file_insert_db(_session, _event_data_file):
    """
    Loop through data file and insert records into the database
    :param _session: Session to database
    :param _event_data_file: Data file
    """
    df = pd.read_csv(_event_data_file)
    # df.info()
    for index, row in df.iterrows():
        # insert records into the session_song_play table
        _session_song_play_values = (
            row['sessionId'], row['itemInSession'], row['artist'], row['song'],
            row['length'])
        insert_record_into_table(
            _session,
            insert_session_song_play,
            _session_song_play_values)

        # insert records into the session_song_play_list table
        _session_song_play_list_values = (
            row['userId'], row['sessionId'], row['itemInSession'],
            row['artist'],
            row['song'], row['firstName'], row['lastName'])
        insert_record_into_table(
            _session,
            insert_session_song_play_list,
            _session_song_play_list_values)

        # insert records into the session_play_artist table
        _session_play_artist_values = (
            row['song'], row['userId'], row['firstName'], row['lastName'])
        insert_record_into_table(
            _session,
            insert_session_play_artist,
            _session_play_artist_values)

def query_data(_session, _query, _question):
    _rows = query_table(_session, _query)

    # print('Printing rows. . . ')
    for _row in _rows:
        if _question == 1:
            print(
                "The song played during sessionId = 338, and itemInSession = 4 was - artist: ",
                _row.artist, " song title: ", _row.song_title,
                "  song length:",
                _row.song_length)
        elif _question == 2:
            print(_row.first_name, _row.last_name, 'played the song:',
                  _row.song_title, 'from artist:', _row.artist,
                  'during session id 182')
        elif _question == 3:
            print(_row.first_name, _row.last_name,
                  " listened to the song 'All Hands Against His Own'.")

## Create a cluster, keyspace, drop/create tables

In [24]:
print('\nSetting up database connection to sparkifydb')
session, cluster = create_db_connection('sparkifydb')
drop_tables(session)
create_tables(session)


Setting up database connection to sparkifydb
Dropping tables . . .
Creating tables. . .


## Insert data into tables

In [None]:
print('Parsing data to tables. . .')
parse_data_file_insert_db(session, event_data_file)

Parsing data to tables. . .


### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

In [9]:
query_data(session, select_session_song_play, 1)

The song played during sessionId = 338, and itemInSession = 4 was - artist:  Faithless  song title:  Music Matters (Mark Knight Dub)   song length: 495.30731201171875


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182


In [10]:
query_data(session, select_session_song_play_list, 2)

Sylvie Cruz played the song: Keep On Keepin' On from artist: Down To The Bone during session id 182
Sylvie Cruz played the song: Greece 2000 from artist: Three Drives during session id 182
Sylvie Cruz played the song: Kilometer from artist: Sebastien Tellier during session id 182
Sylvie Cruz played the song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) from artist: Lonnie Gordon during session id 182


### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


In [11]:
query_data(session, select_session_play_artist, 3)

Jacqueline Lynch  listened to the song 'All Hands Against His Own'.
Tegan Levine  listened to the song 'All Hands Against His Own'.
Sara Johnson  listened to the song 'All Hands Against His Own'.


### Drop the tables before closing out the sessions

In [12]:
drop_tables(session)

Dropping tables . . .


### Close the session and cluster connection¶

In [13]:
session.shutdown()
cluster.shutdown()