# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [26]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [27]:
# checking your current working directory
print("Current directory:" + os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# list of all CSV files to be consumed and exported to Cassandra
file_path_list = []

# Create a for loop to create a list of files and collect each filepath
for file in glob.glob(os.path.join(filepath, '*')):
    if file.endswith(".csv"):
        file_path_list.append(file)
        
print("CSV files to be consumed:")
for f in file_path_list:
    print(f)

Current directory:/home/workspace
CSV files to be consumed:
/home/workspace/event_data/2018-11-30-events.csv
/home/workspace/event_data/2018-11-23-events.csv
/home/workspace/event_data/2018-11-22-events.csv
/home/workspace/event_data/2018-11-29-events.csv
/home/workspace/event_data/2018-11-11-events.csv
/home/workspace/event_data/2018-11-14-events.csv
/home/workspace/event_data/2018-11-20-events.csv
/home/workspace/event_data/2018-11-15-events.csv
/home/workspace/event_data/2018-11-05-events.csv
/home/workspace/event_data/2018-11-28-events.csv
/home/workspace/event_data/2018-11-25-events.csv
/home/workspace/event_data/2018-11-16-events.csv
/home/workspace/event_data/2018-11-18-events.csv
/home/workspace/event_data/2018-11-24-events.csv
/home/workspace/event_data/2018-11-04-events.csv
/home/workspace/event_data/2018-11-19-events.csv
/home/workspace/event_data/2018-11-26-events.csv
/home/workspace/event_data/2018-11-12-events.csv
/home/workspace/event_data/2018-11-27-events.csv
/home/wor

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [28]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:
    # reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        # extracting each data row one by one and append it        
        for line in csvreader:
            full_data_rows_list.append(line) 
            
# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# create file event_datafile_new with data from all the CSV files in file_path_list
with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [29]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print("Number of rows in event_datafile_new.csv:", sum(1 for line in f))

Number of rows in event_datafile_new.csv: 6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [30]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster()

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspace

In [31]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Set Keyspace

In [32]:
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

#### Utility function to run queries

In [33]:
def run_query(session, query):
    """
    Runs CQL query in an open Cassandra session
    """
    try:
        session.execute(query)
    except Exception as e:
        print(e)

### Creating tables

#### Table 1: Create table songplays_by_session_item
This table allows us to retrieve data such as query 1 in the requirements: "Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4".

This table provides artist, song, song length information to be retrieved by session_id or item_in_session.

Data Modeling decisions explained:
- Columns item_in_session and session_id are part of the primary key because a combination of (session_id, item_in_session) identifies a unique row
- `session_id` and `item_in_session` need to be part of the WHERE clause in our CQL queries so they are both part of the primary key
- There is a hierarchy in the data in this order (from the least to the most granular): user > session > item_in_session 
- The relationship between `session_id` and `user_id` is "there is only one user for one given session and there are many sessions for one given user"
- `session_id` is the partition key because it's highest level of granularity we will have in our WHERE clauses. As we'll always query by session_id and item_in_session, it's sufficient to have data partitioned by session_id. In other words, all the data for a given session will be part of one partition.




In [1]:
# Table 1: Create table songplays_by_session_item: allows 
query = "CREATE TABLE IF NOT EXISTS songplays_by_session_item"
query = query + """
(
    session_id int, 
    item_in_session int, 
    artist text, 
    song text, 
    length decimal, 
    PRIMARY KEY (session_id, item_in_session) 
)
"""
run_query(session, query)

NameError: name 'run_query' is not defined

#### Table 2: Create table songplays_by_user_session
This table allows us to retrieve data such as query 2 in the requirements: "Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182".


This table provides artist, song, first and last name of the user for a given user and session (specified by `user_id` and `session_id`)

The relationship between `session_id` and `user_id` is "there is only one user for one given session and there are many sessions for one given user". 

Data modeling decisions explained:
- Column `user_id` is part of the primary because we need `user_id` to be part of the WHERE clause.
- There is a hierarchy in the data in this order (from the least to the most granular): user > session > item_in_session
- The relationship between `session_id` and `user_id` is "there is only one user for one given session and there are many sessions for one given user"
- Column `user_id` is the partition key here as it is the least granular (i.e. highest level) field we need in our WHERE clause. Partitioning data by user is interesting as we'll reach the desired partition simply by specifying one user_id.
- Columns `item_in_session` and `session_id` are part of the primary key because a combination of (`session_id`, `item_in_session`) identifies a unique row 
- Column `item_in_session` is a clustering key because we need results sorted by `item_in_session` 

In [49]:
# Table 2: Create table songplays_by_user_session
query = "CREATE TABLE IF NOT EXISTS songplays_by_user_session"
query = query + """
(
    user_id int, 
    session_id int, 
    item_in_session int, 
    artist text, 
    song text,
    first_name text, 
    last_name text, 
    PRIMARY KEY ((user_id, session_id), item_in_session)
)"""
run_query(session, query)

#### Table 3: Create table users_by_song
Allows users to retrieve data such as in query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

Data Modeling decisions explained:
- Column `song` is the partition key because that's the only field in our WHERE clause. 
- We also need columns `first_name` and `last_name` because they need to be retrieved and a `song` would not be unique. In other words, one song may have been heard by multiple users so we need to reflect that in our primary key.

In [50]:
# Create table users_by_song
query = "CREATE TABLE IF NOT EXISTS users_by_song"
query = query + "(song text, user_id int, first_name text, last_name text, PRIMARY KEY (song, user_id))"
run_query(session, query)


## Populating tables

In [51]:
# We have provided part of the code to set up the CSV file. Please complete the Apache Cassandra code below#
file = 'event_datafile_new.csv'

# column indexes
ARTIST_COLUMN_INDEX = 0
FIRST_NAME_COLUMN_INDEX = 1
ITEMINSESSION_COLUMN_INDEX = 3
LAST_NAME_COLUMN_INDEX = 4
LENGTH_COLUMN_INDEX = 5
SESSIONID_COLUMN_INDEX = 8
SONG_COLUMN_INDEX = 9
USERID_COLUMN_INDEX = 10


In [52]:
# Populate table 1: songplays_by_session_item
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO songplays_by_session_item (session_id, item_in_session, artist, song, length) "
        query = query +  " VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (
                                int(line[SESSIONID_COLUMN_INDEX]), 
                                int(line[ITEMINSESSION_COLUMN_INDEX]),
                                line[ARTIST_COLUMN_INDEX], 
                                line[SONG_COLUMN_INDEX], 
                                float(line[LENGTH_COLUMN_INDEX])
                               ))

In [53]:
# Populate table 2: songplays_by_user_session
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO songplays_by_user_session (user_id, session_id, item_in_session, artist, song, first_name, last_name) "
        query = query +  " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (
                                int(line[USERID_COLUMN_INDEX]), 
                                int(line[SESSIONID_COLUMN_INDEX]), 
                                int(line[ITEMINSESSION_COLUMN_INDEX]),
                                line[ARTIST_COLUMN_INDEX], 
                                line[SONG_COLUMN_INDEX], 
                                line[FIRST_NAME_COLUMN_INDEX], 
                                line[LAST_NAME_COLUMN_INDEX]
                               ))

In [54]:
# Populate table 3: users_by_song
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO users_by_song (song, user_id, first_name, last_name) "
        query = query +  " VALUES (%s, %s, %s, %s)"
        session.execute(query, (
                                line[SONG_COLUMN_INDEX],
                                int(line[USERID_COLUMN_INDEX]),
                                line[FIRST_NAME_COLUMN_INDEX], 
                                line[LAST_NAME_COLUMN_INDEX]                               
                               ))

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


In [55]:
query = "SELECT artist, song, length FROM songplays_by_session_item WHERE session_id=338 AND item_in_session=4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

data = {'artist': [], 'song': [], 'length': []} 
for row in rows:
    data['artist'].append(row.artist)
    data['song'].append(row.song) 
    data['length'].append(row.length)   
    
# visualize results as a pandas dataframe    
pd.DataFrame(data)

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.3073


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [56]:
query = "SELECT artist, song, first_name, last_name FROM songplays_by_user_session WHERE user_id=10 AND session_id=182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
data = {'artist': [], 'song': [], 'first_name': [], 'last_name': []} 

for row in rows:
    data['artist'].append(row.artist)
    data['song'].append(row.song) 
    data['first_name'].append(row.first_name)
    data['last_name'].append(row.last_name)       

# visualize results as a pandas dataframe
pd.DataFrame(data)    

Unnamed: 0,artist,song,first_name,last_name
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


In [57]:
query = "SELECT song, user_id, first_name, last_name FROM users_by_song WHERE song='All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
data = {'song': [], 'user_id': [], 'first_name': [], 'last_name': []} 

for row in rows:
    data['song'].append(row.song) 
    data['user_id'].append(row.user_id)     
    data['first_name'].append(row.first_name)
    data['last_name'].append(row.last_name)    
    
# visualize results as a pandas dataframe
pd.DataFrame(data)    

Unnamed: 0,song,user_id,first_name,last_name
0,All Hands Against His Own,29,Jacqueline,Lynch
1,All Hands Against His Own,80,Tegan,Levine
2,All Hands Against His Own,95,Sara,Johnson


### Drop the tables before closing out the sessions

In [59]:
tables = ['songplays_by_session_item', 'songplays_by_user_session', 'users_by_song']
for t in tables:
    print('Dropping table', t)
    run_query(session, "DROP TABLE " + t)


Dropping table songplays_by_session_item
Error from server: code=2200 [Invalid query] message="unconfigured table songplays_by_session_item"
Dropping table songplays_by_user_session
Error from server: code=2200 [Invalid query] message="unconfigured table songplays_by_user_session"
Dropping table users_by_song
Error from server: code=2200 [Invalid query] message="unconfigured table users_by_song"


### Close the session and cluster connection¶

In [25]:
session.shutdown()
cluster.shutdown()