# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
filepath ='D:\DEND\sparkifyNOSQL-DB\eventData'

for root, dirs, files in os.walk(filepath):

    file_path_list = glob.glob(os.path.join(root,'*'))
    print(file_path_list)

['D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-01-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-02-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-03-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-04-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-05-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-06-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-07-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-08-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-09-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-10-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-11-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-12-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-13-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-14-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\\2018-11-15-events.csv', 'D:\\DEND\\sparkifyNOSQL-DB\\eventData\

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
full_data_rows_list = [] 
    
for f in file_path_list:

    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        for line in csvreader:
            full_data_rows_list.append(line) 
            
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
            'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


#### Creating a Cluster

In [5]:

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'],port=19042, control_connection_timeout=10)

session = cluster.connect()

session.hosts

[<Host: 127.0.0.1:19042 datacenter1>]

#### Create Keyspace

In [6]:
"""
    The keyspace is same as a collection for Appache cassandra. 
"""
try:
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS sparkifydatabase
        WITH REPLICATION = 
        {'class' : 'SimpleStrategy', 'replication_factor': 1}
    """)
except Exception as e:
    print (e)

#### Set Keyspace

In [7]:

try:
    session.set_keyspace('sparkifydatabase')
except Exception as e:
    print (e)

## Create queries to ask the following three questions of the data



### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


This table has 5 columns session ID, item_in_session are integer attributes 
    and they both are primary keys, artist and song title are text attributes and
    a single float attribute that represent the duration of the song.  


In [8]:
create_table_1 = "CREATE TABLE IF NOT EXISTS song_info_by_SID_AND_UID"
create_table_1 = create_table_1 + (""" (session_id INT, item_in_session INT, artist TEXT, song_title TEXT,
                                        song_length FLOAT, PRIMARY KEY (session_id, item_in_session)
                                    )""")
try:
    session.execute(create_table_1)
except Exception as e:
    print(e)

We will use this file as a source file, and load into the table song_info_by_SID_AND_UID

In [9]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO song_info_by_SID_AND_UID(session_id, item_in_session, artist, song_title, song_length)"
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

#### Do a SELECT to verify that the data have been inserted into each table

This is the select query that will answer the first question, based on the session_ID and item_in_session that is mentioned in the question and return the artist name and song title and song duration.

In [10]:
select_query_1 = """ SELECT artist, song_title, song_length FROM song_info_by_SID_AND_UID
                     WHERE session_id = %s 
                     AND item_in_session = %s 
                    """                
try:
    rows = session.execute(select_query_1, (338, 4))
except Exception as e:
    print(e)
    
for row in rows:
    print("Artist: "+row.artist, "\nSong Title: "+row.song_title, "\nSong Length: "+str(row.song_length))

Artist: Faithless 
Song Title: Music Matters (Mark Knight Dub) 
Song Length: 495.30731201171875


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

To answer this question we will need to obtain (select) the artist name, song name, user first and last name from out table, we will need to filter by userId and sessionId, and order by itemInSession.

In [11]:
create_table_2 = "CREATE TABLE IF NOT EXISTS artist_info_by_SID_AND_UID"
create_table_2 = create_table_2 + (""" (user_id INT, session_id INT, item_in_session INT, 
                                        artist_name TEXT, song_title TEXT, user_first_name TEXT, 
                                        user_last_name TEXT, PRIMARY KEY ((user_id, session_id), item_in_session)
                )""")
try:
    session.execute(create_table_2)
except Exception as e:
    print(e)  

We will use this file as a source file, and load into the table artist_info_by_SID_AND_UID


In [12]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO artist_info_by_SID_AND_UID(user_id, session_id, item_in_session, artist_name, \
                                            song_title, user_first_name, user_last_name)"
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

This is the select query that will answer the second question 
    and return the artist name and song title and user first and last.

In [13]:
select_query_2 = """ SELECT artist_name, song_title, user_first_name, user_last_name FROM artist_info_by_SID_AND_UID
                     WHERE user_id = %s
                     AND session_id = %s
                     ORDER BY item_in_session ASC
                    """        
try:
    rows = session.execute(select_query_2, (10, 182))
except Exception as e:
    print(e)
    
for row in rows:
    print("Artist: "+row.artist_name, "\nSong Title: "+row.song_title, "\nUser First Name: "+row.user_first_name, "\nUser Last Name: "+row.user_last_name)

Artist: Down To The Bone 
Song Title: Keep On Keepin' On 
User First Name: Sylvie 
User Last Name: Cruz
Artist: Three Drives 
Song Title: Greece 2000 
User First Name: Sylvie 
User Last Name: Cruz
Artist: Sebastien Tellier 
Song Title: Kilometer 
User First Name: Sylvie 
User Last Name: Cruz
Artist: Lonnie Gordon 
Song Title: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) 
User First Name: Sylvie 
User Last Name: Cruz


### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


This table consists of four columns first column is song title and it's our filter column also, then the rest of columns are the user who listened to the song information.   

In [14]:
create_table_3 = "CREATE TABLE IF NOT EXISTS user_info_by_sonyTitle"
create_table_3 = create_table_3 + (""" (song_title TEXT, user_id INT, user_first_name TEXT, user_last_name TEXT, 
                                        PRIMARY KEY (song_title, user_id)
                                        )""")
try:
    session.execute(create_table_3)
except Exception as e:
    print(e)
                    

We will use this file as a source file, and load into the table user_info_by_sonyTitle

In [15]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = "INSERT INTO user_info_by_sonyTitle(song_title, user_id, user_first_name, user_last_name)"
        query = query + "VALUES (%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

This is the select query that will answer the third question and return the user first and last.

In [16]:
select_query_3 = "SELECT user_first_name, user_last_name FROM user_info_by_sonyTitle WHERE song_title = %s;"
try:
    rows = session.execute(select_query_3, ('All Hands Against His Own'))
except Exception as e:
    print(e)

for row in rows:
    print("User First Name: "+row.user_first_name, "\nUser Last Name: "+row.user_last_name)

not all arguments converted during string formatting


### Drop the tables before closing out the sessions

In [17]:
# session.execute("DROP TABLE IF EXISTS song_details")
# session.execute("DROP TABLE IF EXISTS artist_details")
# session.execute("DROP TABLE IF EXISTS user_details")

### Close the session and cluster connectionÂ¶

In [18]:
# session.shutdown()
# cluster.shutdown()