# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [3]:

import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [4]:
# Show the working directory so the location of ./event_data is visible.
print(os.getcwd())

# Folder that holds the raw event CSV files.
filepath = os.getcwd() + '/event_data'

# Collect the CSV files from every directory under `filepath`.
# The list is created up front so it exists even when the folder is missing
# or empty, and it is extended (not reassigned) so the result of one directory
# never replaces the result of another.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Pruning `dirs` in place stops os.walk from descending into hidden
    # folders (e.g. notebook checkpoint directories) that may hold stale copies.
    dirs[:] = [d for d in dirs if not d.startswith('.')]
    # Only *.csv entries are kept; sorting makes the processing order stable.
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))


/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [5]:
# Every data row (header lines excluded) from all event files ends up here.
full_data_rows_list = []

for source_path in file_path_list:
    # newline='' leaves line-ending handling to the csv module.
    with open(source_path, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # discard this file's header line
        full_data_rows_list.extend(csvreader)

# Dialect for the output file: every field quoted, spaces after delimiters ignored.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Output header and, in the same order, the source-column position of each field.
output_header = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                 'level', 'location', 'sessionId', 'song', 'userId']
kept_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(output_header)
    for row in full_data_rows_list:
        # Rows whose first column (artist) is empty are left out of the output.
        if row[0] != '':
            writer.writerow(tuple(row[position] for position in kept_columns))


In [6]:
# Print how many lines the generated file contains (the header line is counted too).
with open('event_datafile_new.csv', 'r', encoding='utf8') as f:
    line_total = 0
    for _ in f:
        line_total += 1
    print(line_total)

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

#### Creating a Cluster

In [7]:
# Cluster is the connection entry point of the cassandra driver package.
from cassandra.cluster import Cluster
# No contact points are passed, so the driver's defaults apply
# (presumably a node on the local machine - confirm against the driver docs).
cluster = Cluster()

# Open a session on the cluster; the later cells send all their CQL through it.
session = cluster.connect()

#### Create Keyspace

In [8]:
# Create the 'udacity' keyspace unless it already exists
# (SimpleStrategy, replication factor 1). A failure is printed, not raised.
try:
    session.execute(
        "\n    CREATE KEYSPACE IF NOT EXISTS udacity "
        "\n    WITH REPLICATION = "
        "\n    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
    )
except Exception as error:
    print(error)

#### Set Keyspace

In [9]:
# Make 'udacity' the session's keyspace so the table names used in the
# following cells do not need a keyspace prefix.
try:
    session.set_keyspace('udacity')
except Exception as e:
    # A failure is only printed, so execution continues past this cell.
    print(e)


### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data








### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

#### Since I am looking up data for a specific sessionId and itemInSession, the most efficient way to model this table is with the primary key ('sessionId', 'itemInSession'). 'sessionId' is the partition key, so rows are distributed across the nodes by the hash value generated from 'sessionId', and 'itemInSession' is a clustering column that uniquely identifies each row inside a session's partition. Filtering on both columns reads a single row from a single partition, so we avoid a full cluster scan.


In [10]:
# Table for query 1. In PRIMARY KEY (sessionId, itemInSession) the first column
# is the partition key and the second a clustering column.
query = (
    "CREATE TABLE IF NOT EXISTS app_history_artist_song "
    "        (sessionId int, itemInSession int, artist text, song text, length float, "
    "        PRIMARY KEY (sessionId, itemInSession))"
)

# A failure is printed instead of raised.
try:
    session.execute(query)
except Exception as error:
    print(error)
                    

In [11]:
file = 'event_datafile_new.csv'

# The statement text is the same for every row, so it is built and prepared
# once, outside the loop, instead of being rebuilt and re-parsed per row.
# Prepared statements use '?' placeholders.
query = (
    "INSERT INTO app_history_artist_song (sessionId, itemInSession, artist, song, length) "
    "VALUES (?, ?, ?, ?, ?)"
)
insert_statement = session.prepare(query)

# newline='' is what the csv module expects, so quoted fields that contain
# line breaks are read back exactly as they were written.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip the header row
    for line in csvreader:
        # Columns of event_datafile_new.csv used here:
        # 8 = sessionId, 3 = itemInSession, 0 = artist, 9 = song, 5 = length
        session.execute(
            insert_statement,
            (int(line[8]), int(line[3]), line[0], line[9], float(line[5])),
        )

#### Do a SELECT to verify that the data have been inserted into each table

In [12]:
# Query 1: artist, song title and length for sessionId = 338, itemInSession = 4.
query = "SELECT artist, song, length FROM app_history_artist_song \
         WHERE sessionId = 338 and itemInSession = 4"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded. Outside the `else`, a failed query
    # would either raise NameError (`rows` unset) or silently print the rows
    # left over from a previously executed cell.
    for row in rows:
        print("artist: {} | song title: {} | length: {}".format(row.artist, row.song, row.length))

artist: Faithless | song title: Music Matters (Mark Knight Dub) | length: 495.30731201171875


### COPY AND REPEAT THE ABOVE THREE CELLS FOR EACH OF THE THREE QUESTIONS

### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### Since I am looking up data for a specific userId and sessionId, the most efficient way to model this table is with the composite partition key ('userId', 'sessionId'). This way, rows are distributed across the nodes by the hash value generated from both 'userId' and 'sessionId', and we avoid a full cluster scan. I have also been asked to sort the data by the 'itemInSession' column. This can be done by defining 'itemInSession' as a clustering key in the primary key.

#### PRIMARY KEY ((partition key_1, partition key_2), clustering key_1, clustering key_2)

#### The name of the artist, the song, and the user's first name and last name are the data I have been asked to display. Thus, I choose them in the SELECT statement.


In [13]:
# Table for query 2. (userId, sessionId) is the composite partition key and
# itemInSession the clustering column, which is what the rows are ordered by.
query = (
    "CREATE TABLE IF NOT EXISTS music_app_history_session "
    "        (userId int, sessionId int, itemInSession int, artist text, song text, firstName text, lastName text, "
    "        PRIMARY KEY ((userId, sessionId), itemInSession))"
)

# A failure is printed instead of raised.
try:
    session.execute(query)
except Exception as error:
    print(error)
                    

In [14]:
file = 'event_datafile_new.csv'

# The statement text is the same for every row, so it is built and prepared
# once, outside the loop, instead of being rebuilt and re-parsed per row.
# Prepared statements use '?' placeholders.
query = (
    "INSERT INTO music_app_history_session "
    "(userId, sessionId, itemInSession, artist, song, firstName, lastName) "
    "VALUES (?, ?, ?, ?, ?, ?, ?)"
)
insert_statement = session.prepare(query)

# newline='' is what the csv module expects, so quoted fields that contain
# line breaks are read back exactly as they were written.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip header
    for line in csvreader:
        # Columns of event_datafile_new.csv used here:
        # 10 = userId, 8 = sessionId, 3 = itemInSession, 0 = artist,
        # 9 = song, 1 = firstName, 4 = lastName
        session.execute(
            insert_statement,
            (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]),
        )

In [15]:
# Query 2: artist, song and user name for userId = 10, sessionId = 182,
# ordered by itemInSession (DESC: highest itemInSession first).
query = "SELECT artist, song, firstName, lastName FROM music_app_history_session \
         WHERE userId = 10 and sessionId = 182 \
         ORDER BY itemInSession DESC"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded. Outside the `else`, a failed query
    # would either raise NameError (`rows` unset) or silently print the rows
    # left over from a previously executed cell.
    for row in rows:
        # The result attributes are accessed in lower case (firstname, lastname).
        print("artist: {} | song: {} | user name: {} {}".format(
              row.artist, row.song, row.firstname, row.lastname))

artist: Lonnie Gordon | song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) | user name: Sylvie Cruz
artist: Sebastien Tellier | song: Kilometer | user name: Sylvie Cruz
artist: Three Drives | song: Greece 2000 | user name: Sylvie Cruz
artist: Down To The Bone | song: Keep On Keepin' On | user name: Sylvie Cruz




### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### "...who listened to the song..." this is the key for writing the correct query. 
#### There are two specifications in the request based on which we have to fetch the data.
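The most efficient data model partitions the data across the nodes by 'song', so that everyone who listened to a given song is stored in the same partition and can be fetched with a single lookup. 'userId' is added as a clustering column so that each user who listened to the song is kept as a separate row, even when two users share the same name.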

In [16]:
# Table for query 3. In PRIMARY KEY (song, userId) the song is the partition
# key and userId a clustering column.
query = (
    "CREATE TABLE IF NOT EXISTS music_app_history_song "
    "        (song text, userId int, firstName text, lastName text, "
    "        PRIMARY KEY (song, userId))"
)

# A failure is printed instead of raised.
try:
    session.execute(query)
except Exception as error:
    print(error)
                    

In [17]:
file = 'event_datafile_new.csv'

# The statement text is the same for every row, so it is built and prepared
# once, outside the loop, instead of being rebuilt and re-parsed per row.
# Prepared statements use '?' placeholders.
query = (
    "INSERT INTO music_app_history_song (song, userId, firstName, lastName) "
    "VALUES (?, ?, ?, ?)"
)
insert_statement = session.prepare(query)

# newline='' is what the csv module expects, so quoted fields that contain
# line breaks are read back exactly as they were written.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader)  # skip the header row
    for line in csvreader:
        # Columns of event_datafile_new.csv used here:
        # 9 = song, 10 = userId, 1 = firstName, 4 = lastName
        # Rows with the same (song, userId) share a primary key, so a repeated
        # listen by the same user overwrites the earlier row for that song.
        session.execute(
            insert_statement,
            (line[9], int(line[10]), line[1], line[4]),
        )

In [18]:
# Query 3: first and last name of every user who listened to the given song.
query = "SELECT firstName, lastName FROM music_app_history_song \
         WHERE song = 'All Hands Against His Own'"

try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Only print when the query succeeded. Outside the `else`, a failed query
    # would either raise NameError (`rows` unset) or silently print the rows
    # left over from a previously executed cell.
    for row in rows:
        # The result attributes are accessed in lower case (firstname, lastname).
        print("user name: {} {}".format(row.firstname, row.lastname))

user name: Jacqueline Lynch
user name: Tegan Levine
user name: Sara Johnson


### Drop the tables before closing out the sessions

In [19]:
# Tear-down: remove the three query tables.
# IF EXISTS keeps this cell re-runnable and lets every DROP execute even when
# one of the tables was never created; a plain DROP TABLE raises on a missing
# table and would abort the remaining statements.
session.execute("DROP TABLE IF EXISTS app_history_artist_song")
session.execute("DROP TABLE IF EXISTS music_app_history_session")
session.execute("DROP TABLE IF EXISTS music_app_history_song")

<cassandra.cluster.ResultSet at 0x7f2610674a90>

### Close the session and cluster connection

In [20]:
# Shut down the session first, then the cluster object it was created from.
session.shutdown()
cluster.shutdown()