# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [21]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [22]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    print(json.dumps(file_path_list, indent=4))

/home/workspace
[
    "/home/workspace/event_data/2018-11-27-events.csv",
    "/home/workspace/event_data/2018-11-04-events.csv",
    "/home/workspace/event_data/2018-11-07-events.csv",
    "/home/workspace/event_data/2018-11-09-events.csv",
    "/home/workspace/event_data/2018-11-19-events.csv",
    "/home/workspace/event_data/2018-11-05-events.csv",
    "/home/workspace/event_data/2018-11-22-events.csv",
    "/home/workspace/event_data/2018-11-16-events.csv",
    "/home/workspace/event_data/2018-11-26-events.csv",
    "/home/workspace/event_data/2018-11-24-events.csv",
    "/home/workspace/event_data/2018-11-29-events.csv",
    "/home/workspace/event_data/2018-11-15-events.csv",
    "/home/workspace/event_data/2018-11-20-events.csv",
    "/home/workspace/event_data/2018-11-06-events.csv",
    "/home/workspace/event_data/2018-11-18-events.csv",
    "/home/workspace/event_data/2018-11-21-events.csv",
    "/home/workspace/event_data/2018-11-10-events.csv",
    "/home/workspace/event_dat

In [23]:
# Print the headers
file = open(file_path_list[0])
csvreader = csv.reader(file)
columns = next(csvreader)
print("header=", json.dumps(["row[" + str(index) + "] = " + column for index, column in enumerate(columns)], indent=4))

header= [
    "row[0] = artist",
    "row[1] = auth",
    "row[2] = firstName",
    "row[3] = gender",
    "row[4] = itemInSession",
    "row[5] = lastName",
    "row[6] = length",
    "row[7] = level",
    "row[8] = location",
    "row[9] = method",
    "row[10] = page",
    "row[11] = registration",
    "row[12] = sessionId",
    "row[13] = song",
    "row[14] = status",
    "row[15] = ts",
    "row[16] = userId"
]


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [24]:
def create_single_event_data(event_datafile):
    full_data_rows_list = [] 

    for f in file_path_list:
        with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
            csvreader = csv.reader(csvfile) 
            next(csvreader)

            for line in csvreader:
                full_data_rows_list.append(line) 

    print("A row", json.dumps([str(index) + ") " + columns[index] + " = " + value for index, value in enumerate(full_data_rows_list[0])], indent=4))
    print("Total Number of rows", len(full_data_rows_list))

    # Write single file
    csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)
    
    with open(event_datafile, 'w', encoding = 'utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect')
        writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                    'level','location','sessionId','song','userId'])
        for row in full_data_rows_list:
            if (row[0] == ''):
                continue
            # We are collecting: artist, firstName, gender, itemInSession, lastName, length, level, location, sessionId, song, userId
            writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [25]:
event_datafile = 'event_datafile_new.csv'
create_single_event_data(event_datafile)

A row [
    "0) artist = Barry Tuckwell/Academy of St Martin-in-the-Fields/Sir Neville Marriner",
    "1) auth = Logged In",
    "2) firstName = Mohammad",
    "3) gender = M",
    "4) itemInSession = 0",
    "5) lastName = Rodriguez",
    "6) length = 277.15873",
    "7) level = paid",
    "8) location = Sacramento--Roseville--Arden-Arcade, CA",
    "9) method = PUT",
    "10) page = NextSong",
    "11) registration = 1.54051E+12",
    "12) sessionId = 961",
    "13) song = Horn Concerto No. 4 in E flat K495: II. Romance (Andante cantabile)",
    "14) status = 200",
    "15) ts = 1.54328E+12",
    "16) userId = 88"
]
Total Number of rows 8056


In [26]:
df = pd.read_csv(event_datafile)
print("Total:\n", df.shape[0])

Total:
 6820


It means we have:
8056 - 6820 = 1236 empty "artist" rows on the "event_data" which we have traversed.

In [27]:
print("\nUnique value counts:\n", df.nunique())


Unique value counts:
 artist           3148
firstName          84
gender              2
itemInSession     123
lastName           86
length           3994
level               2
location           63
sessionId         776
song             5190
userId             96
dtype: int64


In [28]:
print("\nUnique value count for group by ['sessionId', 'userId']: ", \
      df.groupby(['sessionId', 'userId']).agg('count').shape[0])


Unique value count for group by ['sessionId', 'userId']:  776


It means that we do not have same sessionIds for distinct users:

['sessionId', 'userId'] group count (776) == Unique values of sessionId (776)

In [29]:
print("\nUnique value count for group by ['artist', 'song']: ", \
      df.groupby(['artist', 'song']).agg('count').shape[0])


Unique value count for group by ['artist', 'song']:  5296


It means that we have some logs that the same songs belong to different artists:

['artist', 'song'] group count (5296) != Unique values of song (5190)

In [30]:
print("\nUnique value count for group by ['firstName', 'lastName']: ", \
      df.groupby(['firstName', 'lastName']).agg('count').shape[0])


Unique value count for group by ['firstName', 'lastName']:  95


We have 2 users whith the same firstName & lastName:

['firstName', 'lastName'] group count (95) != Unique values of userId (96)

In [31]:
print("\nUnique value count for group by ['sessionId', 'itemInSession']: ", \
      df.groupby(['sessionId', 'itemInSession']).agg('count').shape[0])


Unique value count for group by ['sessionId', 'itemInSession']:  6820


It means combination of ['sessionId', 'itemInSession'] can be uniquely identify all the rows.

['firstName', 'lastName'] group count (6820) == Total Row count

In [32]:
print("\nUnique value count for group by ['song', 'firstName', 'lastName']: ", \
      df.groupby(['song', 'firstName', 'lastName']).agg('count').shape[0])


Unique value count for group by ['song', 'firstName', 'lastName']:  6618


In [33]:
df2 = df.sort_values(['firstName', 'lastName']).groupby(['song', 'firstName', 'lastName'])
df2.head(20)

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
826,Explosions In The Sky,Adelyn,F,0,Jordan,497.47546,free,"Chicago-Naperville-Elgin, IL-IN-WI",458,Your Hand In Mine,7
4296,Method Man,Adelyn,F,1,Jordan,204.64281,free,"Chicago-Naperville-Elgin, IL-IN-WI",391,The Motto,7
4297,The Stanley Brothers,Adelyn,F,2,Jordan,179.69587,free,"Chicago-Naperville-Elgin, IL-IN-WI",391,I'm A Man Of Constant Sorrow,7
4298,Dexter Freebish,Adelyn,F,3,Jordan,210.54649,free,"Chicago-Naperville-Elgin, IL-IN-WI",391,Deeper,7
6600,Static-X,Adelyn,F,0,Jordan,183.69261,free,"Chicago-Naperville-Elgin, IL-IN-WI",6,Dirthouse (Album Version),7
357,Finger Eleven,Adler,M,1,Barrera,259.68281,free,"New York-Newark-Jersey City, NY-NJ-PA",198,Obvious Heart,100
358,Shaggy,Adler,M,2,Barrera,212.97587,free,"New York-Newark-Jersey City, NY-NJ-PA",198,Intoxication,100
360,Klaus Doldinger's Passport,Adler,M,3,Barrera,460.43383,free,"New York-Newark-Jersey City, NY-NJ-PA",198,Fairy Tale,100
438,Jem,Adler,M,0,Barrera,242.18077,free,"New York-Newark-Jersey City, NY-NJ-PA",301,Amazing Life,100
439,Jagged Edge featuring Run of Run DMC,Adler,M,1,Barrera,248.37179,free,"New York-Newark-Jersey City, NY-NJ-PA",301,Let's Get Married,100


In [34]:
## Query 3: Give me every user name (first and last) in my music app history 
## who listened to the song 'All Hands Against His Own'

print("\nUnique value count for group by ['song', 'sessionId', 'itemInSession']: ", \
      df.groupby(['song', 'sessionId', 'itemInSession']).agg('count').shape[0])


Unique value count for group by ['song', 'sessionId', 'itemInSession']:  6820


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Table Defitions

In [35]:
column_definitions = {
    "artist": {"line_order": 0, "type": "text"},
    "first_name": {"line_order": 1, "type": "text"},
    "gender": {"line_order": 2, "type": "text"},
    "item_in_session": {"line_order": 3, "type": "int"},
    "last_name": {"line_order": 4, "type": "text"},
    "length": {"line_order": 5, "type": "float"},
    "level": {"line_order": 6, "type": "text"},
    "location": {"line_order": 7, "type": "text"},
    "session_id": {"line_order": 8, "type": "int"},
    "song": {"line_order": 9, "type": "text"},
    "user_id": {"line_order": 10, "type": "int"}
}

#### Creating a Cluster

In [36]:
from cassandra.cluster import Cluster
cluster = Cluster()
session = cluster.connect()

#### Create Keyspace

In [37]:
# Create a Keyspace 
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS music_app_project 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [38]:
# Set KEYSPACE to the keyspace specified above
try:
    session.set_keyspace('music_app_project')
except Exception as e:
    print(e)

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [39]:
## We need to prepare a table for the following kind of queries:

## Query 1:  Give me the artist, song title and song's length in the music app history that was heard during \
## sessionId = 338, and itemInSession = 4

# On this query where clause values can be changed but we want mainly song related data. 
# Partition key: session_id
# Clustering key: item_in_session
# Group of both matches all rows as I described above.

table_1_definitions = {
    "table_name": 'songs_log_data',
    "fields": ['session_id', 'item_in_session', 'artist', 'length', 'song'],
    "partition_keys": ['session_id'],
    "clustering_keys": ['item_in_session']
}

query_1 = f"select artist, song, length from {table_1_definitions['table_name']} WHERE session_id=338 and item_in_session=4"

In [40]:
# Drop Table
def drop_table(table_name):
    query = f"DROP TABLE {table_name}"
    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)

In [41]:
# drop_table(table_1_definitions['table_name'])

In [42]:
def get_create_table_query(table_definition):
    # Get Create Table cQL
    query_create = f"CREATE TABLE IF NOT EXISTS {table_definition['table_name']} "
    table_columns = [f"{field} {column_definitions[field]['type']}" for field in table_definition['fields']]
    query_create = query_create + \
                   f"({', '.join(table_columns)}, " + \
                   f"PRIMARY KEY (({', '.join(table_definition['partition_keys'])}), " + \
                                 f"{', '.join(table_definition['clustering_keys'])}))"
    return query_create


def create_table(table_definition):
    # Create Table
    query_create = get_create_table_query(table_definition)
    try:
        session.execute(query_create)
    except Exception as e:
        print(e)

In [43]:
create_table(table_1_definitions)

In [44]:
def get_field_value(field, line):
    # Get field value by using filed defitions
    column_def = column_definitions[field]
    if column_def['type'] == 'text':
        return line[column_def['line_order']]
    elif column_def['type'] == 'int':
        return int(line[column_def['line_order']])
    elif column_def['type'] == 'float':
        return float(line[column_def['line_order']])
    
    raise NotImplementedError("Undefined Field Type")

def insert_into_table(table_definition):
    # Insert logs to table
    fields = table_definition['fields']
    query = f"INSERT INTO {table_definition['table_name']} ({', '.join(fields)})" + \
            f" VALUES ({', '.join(['%s'] * len(fields))})"

    with open(event_datafile, 'r', encoding='utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader)
        for line in csvreader:
            session.execute(query, tuple([get_field_value(field, line) for field in fields]))


In [45]:
insert_into_table(table_1_definitions)

In [46]:
def print_table_row_count(table_name):
    # Print Table Row Count
    try:
        rows = session.execute(f"select count(*) from {table_name}")
    except Exception as e:
        print(e)

    for row in rows:
        print (f"Table {table_name} has: {str(row)}")

In [47]:
print_table_row_count(table_1_definitions['table_name'])

Table songs_log_data has: Row(count=6820)


#### Do a SELECT to verify that the data have been inserted into each table

In [48]:
def execute_query(query_to_execute):
    try:
        rows = session.execute(query_to_execute)
    except Exception as e:
        print(e)

    for row in rows:
        print (row)

In [49]:
execute_query(query_1)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)


### COPY AND REPEAT THE ABOVE THREE CELLS FOR EACH OF THE THREE QUESTIONS

In [50]:
## Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)
## for userid = 10, sessionid = 182

# On this query where clause values can be changed but we want mainly songs & users related data. 
# Partition key: session_id
# Composite clustering key is: (user_id, item_in_session, first_name, last_name)
#   Since user_id is required on where clause we need to put it in "clustering columns"

table_2_definitions = {
    "table_name": 'users_and_songs_log_data',
    "fields": ['user_id', 'session_id', 'item_in_session', \
               'artist', 'first_name', 'last_name', 'gender', 'length', 'level', 'location', 'song'],
    "partition_keys": ['user_id', 'session_id'],
    "clustering_keys": ['item_in_session']
}

query_2 = f"select artist, song, first_name, last_name from {table_2_definitions['table_name']} \
              WHERE user_id=10 and session_id=182"
                    

In [51]:
# drop_table(table_2_definitions['table_name'])

In [52]:
create_table(table_2_definitions)

In [53]:
insert_into_table(table_2_definitions)

In [54]:
print_table_row_count(table_2_definitions['table_name'])

Table users_and_songs_log_data has: Row(count=6820)


In [55]:
execute_query(query_2)

Row(artist='Down To The Bone', song="Keep On Keepin' On", first_name='Sylvie', last_name='Cruz')
Row(artist='Three Drives', song='Greece 2000', first_name='Sylvie', last_name='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', first_name='Sylvie', last_name='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', first_name='Sylvie', last_name='Cruz')


In [56]:
## Query 3: Give me every user name (first and last) in my music app history 
## who listened to the song 'All Hands Against His Own'

# On this query where clause value can be changed but we want mainly user and listened music related data. 
# Partition key: song
# Composite clustering key is: (first_name, last_name)


table_3_definitions = {
    "table_name": 'users_song_log_data',
    "fields": ['song', 'session_id', 'item_in_session', \
               'first_name', 'last_name', 'user_id', 'artist', 'gender', 'length', 'level', 'location'],
    "partition_keys": ['song'],
    "clustering_keys": ['session_id', 'item_in_session']
}

query_3 = f"select first_name, last_name from {table_3_definitions['table_name']} \
              WHERE song='All Hands Against His Own'"                    

In [57]:
# drop_table(table_3_definitions['table_name'])

In [58]:
create_table(table_3_definitions)

In [59]:
insert_into_table(table_3_definitions)

In [60]:
print_table_row_count(table_3_definitions['table_name'])

Table users_song_log_data has: Row(count=6820)


In [61]:
execute_query(query_3)

Row(first_name='Sara', last_name='Johnson')
Row(first_name='Jacqueline', last_name='Lynch')
Row(first_name='Tegan', last_name='Levine')


### Drop the tables before closing out the sessions

In [254]:
## TO-DO: Drop the table before closing out the sessions

In [63]:
drop_table(table_1_definitions['table_name'])
drop_table(table_2_definitions['table_name'])
drop_table(table_3_definitions['table_name'])

### Close the session and cluster connection¶

In [64]:
session.shutdown()
cluster.shutdown()