# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [7]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
import pandas as pd

#### Creating list of filepaths to process original event csv data files

In [2]:
print(os.getcwd())
filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))
    print(file_path_list)

/home/workspace
['/home/workspace/event_data/2018-11-27-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-07-events.csv', '/home/workspace/event_data/2018-11-09-events.csv', '/home/workspace/event_data/2018-11-19-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-22-events.csv', '/home/workspace/event_data/2018-11-16-events.csv', '/home/workspace/event_data/2018-11-26-events.csv', '/home/workspace/event_data/2018-11-24-events.csv', '/home/workspace/event_data/2018-11-29-events.csv', '/home/workspace/event_data/2018-11-15-events.csv', '/home/workspace/event_data/2018-11-20-events.csv', '/home/workspace/event_data/2018-11-06-events.csv', '/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-21-events.csv', '/home/workspace/event_data/2018-11-10-events.csv', '/home/workspace/event_data/2018-11-23-events.csv', '/home/workspace/event_data/2018-11-02-events.c

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
full_data_rows_list = [] 
for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
             
        for line in csvreader:
            full_data_rows_list.append(line) 
            
# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [22]:
from cassandra.cluster import Cluster
import pandas as pd
cluster = Cluster()
session = cluster.connect()

#### Create Keyspace

In [23]:
query = """
    CREATE KEYSPACE IF NOT EXISTS sparkify
    WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}
"""
session.execute(query)

<cassandra.cluster.ResultSet at 0x7f52bc2aa128>

#### Set Keyspace

In [24]:
session.set_keyspace('sparkify')

# Query 1 Table:
* session_id
* item_in_session
* artist_name
* song_title
* song_length
* primary key (session_id and item_in_session)

This table is going to answer what artist, song and length of the song were listened on X session in the app history, so that, we use session_id and item_in_session as the primary key to be able to not duplicate the information and to differentiate between sessions.

### Creating the table

In [25]:
query = """
    create table if not exists sessions (
        session_id int,
        item_in_session int,
        artist_name text,
        song_title text,
        song_length float,
        PRIMARY KEY(session_id, item_in_session))
"""
session.execute(query)

<cassandra.cluster.ResultSet at 0x7f52bc2ab7f0>

### Inserting the data

In [26]:

file  = 'event_datafile_new.csv'
df_event = pd.read_csv('event_datafile_new.csv')
df_event.head(3)
for i, row in df_event.iterrows():
    row = list(row)
    query = """insert into sessions (
                                    session_id,
                                    item_in_session,
                                    artist_name,
                                    song_title,
                                    song_length)
    """
    query = query + "values (%s, %s, %s, %s, %s)"
    session_id = row[8]
    item_in_session = row[3]
    artist_name = row[0]
    song_title = row[9]
    song_length = row[5]
    
    session.execute(query, (int(session_id), 
                            int(item_in_session),
                            artist_name, song_title, 
                            float(song_length)))

### Querying for a specific case where session_id = 338 and item_in_session = 4

In [27]:
query_rows = session.execute('select artist_name, song_title, song_length from sessions where session_id = 338 and item_in_session = 4')
for row in query_rows:
    print(row.artist_name + ' |', row.song_title + ' |', row.song_length)                    

Faithless | Music Matters (Mark Knight Dub) | 495.30731201171875


### Querying for a general SELECT In order to check if the data was uploaded succesfully

In [28]:
query_rows = session.execute('select artist_name, song_title, song_length from sessions')
for row in query_rows:
    print(row.artist_name + ' |', row.song_title + ' |', row.song_length)                    

Regina Spektor | The Calculation (Album Version) | 191.08526611328125
Octopus Project | All Of The Champs That Ever Lived | 250.95791625976562
Tegan And Sara | So Jealous | 180.06158447265625
Dragonette | Okay Dolores | 153.39056396484375
Lil Wayne / Eminem | Drop The World | 229.58975219726562
Soulja Boy Tell'em | Let Me Get Em | 201.1162872314453
Bodo Wartke | Liebeslied (Sprachen: Deutsch_ Englisch_ FranzÃÂ¶sisch_ Italienisch_ Spanisch_ HollÃÂ¤ndisch_ Japanisch_ Russisch_ Griechisch_ Klingonisch_ Hessisch) | 645.2763061523438
Evanescence | Bring Me To Life | 237.11302185058594
Van Halen | Good Enough | 243.1734161376953
The Academy Is... | Paper Chase (Album Version) | 209.7628173828125
Dwight Yoakam | You're The One | 239.3072967529297
The Far East Movement featuring Wiz Khalifa and Bionik | Lowridin | 265.50811767578125
Amon Amarth | The Hero | 238.5236358642578
Modest Mouse | The Good Times Are Killing Me | 259.47381591796875
Bad Company | Morning Sun | 246.961181640625
Bodyrox

# Query 2 Table: 
* user_id
* session_id
* artist_name
* song_title
* first_name
* last_name
* primary key (user_id and session_id)

The primary key composed by user_id and session_id were chosen to answer a specific query which answers the question what artist this user listen's to, or who is listening to X artist, in that way we focus our attention in not duplicating user_id and also having session_id as clustering column allow us to know how much does X user listens to Y artist.

### Creating the table

In [29]:
query = """
    create table if not exists user_songs(
        user_id int,
        session_id int,
        artist_name text,
        song_title text,
        first_name text,
        last_name text,
        item_in_session int,
        PRIMARY KEY ((user_id, session_id), item_in_session))
        with clustering order by(item_in_session desc)
"""
session.execute(query)

                    

<cassandra.cluster.ResultSet at 0x7f52bc4ee668>

### Inserting the data

In [None]:
file  = 'event_datafile_new.csv'
df_event = pd.read_csv('event_datafile_new.csv')

for i, row in df_event.iterrows():
    row = list(row)
    query = """insert into user_songs (
                                    user_id,
                                    session_id,
                                    artist_name,
                                    song_title,
                                    first_name,
                                    last_name,
                                    item_in_session)
    """
    query = query + "values (%s, %s, %s, %s, %s, %s, %s)"
    user_id = row[10]
    session_id = row[8]
    artist_name = row[0]
    song_title = row[9]
    first_name = row[1]
    last_name = row[4]
    item_in_session = int(row[3])
    
    session.execute(query, (int(user_id),
                            int(session_id),
                            artist_name,
                            song_title,
                            first_name,
                            last_name,
                            int(item_in_session)))

### Querying for a specific case where user_id = 10 and session_id = 182

In [12]:
query_rows = session.execute('select artist_name, song_title, first_name, last_name from user_songs where user_id = 10 and session_id = 182')
for row in query_rows:
    print(row.artist_name + ' |', row.song_title + ' |', row.first_name, row.last_name)  

Lonnie Gordon | Catch You Baby (Steve Pitron & Max Sanna Radio Edit) | Sylvie Cruz
Sebastien Tellier | Kilometer | Sylvie Cruz
Three Drives | Greece 2000 | Sylvie Cruz
Down To The Bone | Keep On Keepin' On | Sylvie Cruz


### Querying for a general SELECT In order to check if the data was uploaded succesfully

In [13]:
query_rows = session.execute('select artist_name, song_title, first_name, last_name from user_songs')
for row in query_rows:
    print(row.artist_name + ' |', row.song_title + ' |', row.first_name, row.last_name)                    

Evergreen Terrace | Zero | Emily Benson
Ghostland Observatory | Stranger Lover | Emily Benson
System of a Down | Sad Statue | Emily Benson
Denise Jannah | 'Round Midnight | Kinsley Young
Common / Vinia Mojica / Roy Hargrove / Femi Kuti | Time Traveling (A Tribute To Fela) | Kinsley Young
Machine Head | Halo (Explicit Album Version) | Kinsley Young
Karsh Kale | Break of Dawn | Kinsley Young
Abydos | Green's Guidance For A Stategy Adventure Game | Kinsley Young
Robert Calvert | The Making Of Midgard (2007 Digital Remaster) | Kinsley Young
Nine Inch Nails | Dead Souls (LP Version) | Kinsley Young
Yonder Mountain String Band | Boatman's Dance | Kinsley Young
Barry Tuckwell/Academy of St Martin-in-the-Fields/Sir Neville Marriner | Horn Concerto No. 4 in E flat K495: II. Romance (Andante cantabile) | Kinsley Young
Jesse Cook | Byzantium Underground | Kinsley Young
Leona Lewis | Forgive Me | Kinsley Young
Rufus Wainwright | Hallelujah | Kinsley Young
Base Ball Bear | Sayonara-Nostalgia | Kins

# Query 3 Table: 
* user_id
* song_title
* first_name
* last_name
* primary key (song_title and user_id)

This table will answer every user that has listened X song or songs, it actually can be useful to model some clustering algorithm for recommendations, so that our partition key starts with song_title to distribute the data across the nodes and user_id to let us know which user listened X song.

### Creating the table

In [16]:
query = """
    create table if not exists songplays(
        user_id int,
        song_title text,
        first_name text,
        last_name text,
        PRIMARY KEY (song_title, user_id))
"""
session.execute(query)                

<cassandra.cluster.ResultSet at 0x7f52bc444860>

### Inserting the data

In [17]:
file  = 'event_datafile_new.csv'
df_event = pd.read_csv('event_datafile_new.csv')
for i, row in df_event.iterrows():
    row = list(row)
    query = """insert into songplays (
                                    user_id,
                                    song_title,
                                    first_name,
                                    last_name)
    """
    query = query + "values (%s, %s, %s, %s)"
    user_id = row[10]
    song_title = row[9]
    first_name = row[1]
    last_name = row[4]
    
    session.execute(query, (int(user_id),
                            song_title,
                            first_name,
                            last_name))

### Querying for a specific case where song_title  = All Hands Against His Own

In [18]:
query_rows = session.execute("select first_name, last_name, song_title from songplays where song_title = 'All Hands Against His Own'")
for row in query_rows:
    print(row.first_name, row.last_name + ' | ', row.song_title)  

Jacqueline Lynch |  All Hands Against His Own
Tegan Levine |  All Hands Against His Own
Sara Johnson |  All Hands Against His Own


### Querying for a general SELECT In order to check if the data was uploaded succesfully

In [19]:
query_rows = session.execute('select first_name, last_name, song_title from songplays')
for row in query_rows:
    print(row.first_name, row.last_name + ' | ', row.song_title)                    

Chloe Cuevas |  Wonder What's Next
Chloe Cuevas |  In The Dragon's Den
Aleena Kirby |  Too Tough (1994 Digital Remaster)
Chloe Cuevas |  Rio De Janeiro Blue (Album Version)
Lily Koch |  My Place
Jacob Klein |  My Place
Layla Griffin |  The Lucky Ones
Tegan Levine |  I Want You Now
Mohammad Rodriguez |  Why Worry
Kate Harrell |  TvÃÂ¡rÃÂ­ v TvÃÂ¡r
Kinsley Young |  Lord Chancellor's Nightmare Song
Jayden Graves |  Misfit Love
Sara Johnson |  Eat To Live (Amended Version)
Wyatt Scott |  Hey_ Soul Sister
Ryan Smith |  Hey_ Soul Sister
Carlos Carter |  Hey_ Soul Sister
Jacqueline Lynch |  Hey_ Soul Sister
Harper Barrett |  Hey_ Soul Sister
Aleena Kirby |  Hey_ Soul Sister
Hayden Brock |  Hey_ Soul Sister
Jacob Klein |  Hey_ Soul Sister
Magdalene Herman |  Hey_ Soul Sister
Tegan Levine |  Hey_ Soul Sister
Kate Harrell |  Hey_ Soul Sister
Avery Watkins |  You Never Let Go
Cecilia Owens |  I Found That Essence Rare
Kate Harrell |  My Missing
Chloe Cuevas |  To Them These Streets Belong
Kate

### Drop the tables before closing out the sessions

In [20]:
session.execute('drop table sessions')
session.execute('drop table user_songs')
session.execute('drop table songplays')

InvalidRequest: Error from server: code=2200 [Invalid query] message="unconfigured table sessions"

### Close the session and cluster connection¶

In [61]:
session.shutdown()
cluster.shutdown()