# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [2]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [6]:
# Show the current working directory so the relative data path can be verified
print(os.getcwd())

# Event data lives in the "event_data" folder under the current working directory
filepath = os.getcwd() + '/event_data'

# Collect the path of every file found under event_data.
# The list is created up front and extended for each directory visited, so that
#   (a) it exists even when event_data is missing or empty, and
#   (b) files from earlier directories are not discarded when os.walk moves on
#       to the next directory.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Do not descend into hidden folders (names starting with "."),
    # e.g. notebook checkpoint folders that hold copies of the data files
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # glob also returns sub-directories; keep regular files only, in a
    # deterministic (sorted) order
    matches = sorted(glob.glob(os.path.join(root, '*')))
    file_path_list.extend(p for p in matches if os.path.isfile(p))

E:\Projects\data_engineer_nanodegree\data_modeling_with_cassandra


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [7]:
# Accumulates the data rows (header excluded) of every source file
full_data_rows_list = []

# Read each collected event file and gather its rows
for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the None default keeps an empty file from
        # raising StopIteration and aborting the whole run
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Write the reduced data set, event_datafile_new.csv, that is used to insert
# data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# Positions, in the source files, of the columns listed in the header below
source_columns = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # Skip blank lines (csv yields an empty list for them) and rows whose
        # first column (artist) is empty
        if not row or row[0] == '':
            continue
        writer.writerow([row[i] for i in source_columns])


In [8]:
# Count the lines of the generated csv file (the header line is included)
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    line_count = 0
    for _ in f:
        line_count += 1
    print(line_count)

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now we are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [9]:
# Cluster is provided by the cassandra driver package imported at the top
from cassandra.cluster import Cluster
# No contact points are passed, so the driver's defaults are used
# (presumably a Cassandra node on the local machine -- confirm for your setup)
cluster = Cluster()

# To establish connection and begin executing queries, we need a session
session = cluster.connect()

#### Create Keyspace

In [10]:
# Creating a Keyspace named sparkify
# CQL that creates the sparkify keyspace unless it already exists.
# It uses SimpleStrategy with a replication factor of 1.
create_keyspace_cql = (
    " \n"
    "                         create keyspace if not exists sparkify \n"
    "                         with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 1}\n"
    "                     "
)

try:
    session.execute(create_keyspace_cql)
except Exception as err:
    # Failures are reported but do not stop the notebook
    print(err)

#### Set Keyspace

In [11]:
# Setting KEYSPACE to sparkify in order to use it
try:
    # Make sparkify the session's keyspace so the statements that follow can
    # refer to tables by their unqualified names
    session.set_keyspace('sparkify')
except Exception as e:
    # Any failure is only printed; nothing is re-raised
    print(e)

### Query #1 
### Give me the artist, song title and song's length in the music app history
### that was heard during  sessionId = 338, and itemInSession  = 4

#### To answer this question we will need to get the artist name, song title, and song length 
#### from our table and we will need to filter by sessionId and itemInSession 
#### In CQL our query looks like:

``` SELECT artist, song_title, song_length from sessions_data where sessionId = 338 and itemInSession = 4 ```

- We will name our table sessions_data
- Our primary key will consist of partition key sessionId, and clustering key itemInSession so that we can filter by these attributes later on.
- The columns of our table will be: sessionId, itemInSession, artist, song_title and song_length.

#### the results should look like this:


| |artist   |song_title                     |song_length|
|-|---------|-------------------------------|-----------|
|0|Faithless|Music Matters (Mark Knight Dub)|495.307312 |

In [12]:
# DDL for the table behind query 1: one partition per sessionId, with rows
# inside a partition identified by itemInSession
session_query = (
    "create table if not exists sessions_data\n"
    "                       (sessionId int, itemInSession int, artist varchar, song_title varchar, song_length float,\n"
    "                       PRIMARY KEY (sessionId, itemInSession))"
)

try:
    session.execute(session_query)
except Exception as err:
    # Report the problem and carry on
    print(err)

In [13]:
file = 'event_datafile_new.csv'

# The INSERT text is identical for every row, so it is built once, outside
# the loop
query = "insert into sessions_data (sessionId, itemInSession, artist, song_title, song_length) "
query = query + "values (%s, %s, %s, %s, %s)"

# newline='' is what the csv module expects for files handed to csv.reader;
# without it, line breaks inside quoted fields are not parsed correctly
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip the header (tolerates an empty file)
    for line in csvreader:
        # Column positions in event_datafile_new.csv:
        # artist[0], firstName[1], gender[2], itemInSession[3], lastName[4], length[5],
        # level[6], location[7], sessionId[8], song[9], userId[10]
        try:
            session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
        except Exception as e:
            # A row that cannot be converted or inserted is reported and skipped
            print(e)

In [14]:
# Query 1: artist, song title and song length of the item that was heard
# during sessionId = 338 at itemInSession = 4

query = "select artist, song_title, song_length from sessions_data where sessionId = 338 and itemInSession = 4"
try:
    # Materialise the result set first, then show it as a DataFrame
    result_rows = list(session.execute(query))
    df = pd.DataFrame(result_rows)
    print(df)
except Exception as err:
    print(err)

      artist                       song_title  song_length
0  Faithless  Music Matters (Mark Knight Dub)   495.307312


### Query #2
### Give me only the following: name of artist, song (sorted by itemInSession)
### and user (first and last name) for userid = 10, sessionid = 182

#### To answer this question we will need to get the artist name, song name, user name and user lastname from our table, 
#### we will need to filter by userId and sessionId, and order by itemInSession.

In CQL our query looks like:
```
SELECT itemInSession, artist, song_title, first_name, last_name from song_data where user_id = 10
and session_id = 182
```

- We will name our table song_data
- Our primary key will consist of composite partition key user_id, session_id.   
  The reason for this is that if we only used user_id as the partition key, all sessions belonging to the same user would be stored in a single partition,   
  which can grow very large and cause performance issues when the volume of data is large.
- Our clustering key will be itemInSession so that our results are ordered by it.
- The columns of our table will be: user_id, session_id, itemInSession, artist, song_title, first_name and last_name.

#### the results should look like this:


|iteminsession|artist           |song_title                                          |first_name|last_name|
|-------------|-----------------|----------------------------------------------------|----------|---------|
|0            |Down To The Bone |Keep On Keepin' On                                  |Sylvie    |Cruz     |
|1            |Three Drives     |Greece 2000                                         |Sylvie    |Cruz     |
|2            |Sebastien Tellier|Kilometer                                           |Sylvie    |Cruz     |
|3            |Lonnie Gordon    |Catch You Baby (Steve Pitron & Max Sanna Radio Edit)|Sylvie    |Cruz     |

In [15]:
# DDL for the table behind query 2.
# Partition key: (user_id, session_id); clustering column: itemInSession,
# which is what orders the rows inside each partition.
# Fix: the column list previously contained an empty entry ("session_id int, , artist"),
# which is not valid CQL.
songs_query = """create table if not exists song_data
                       (user_id int, session_id int, artist varchar, song_title varchar, first_name varchar, last_name varchar, itemInSession int,
                       PRIMARY KEY ((user_id, session_id), itemInSession))"""
try:
    session.execute(songs_query)
except Exception as e:
    # Report the failure; nothing is re-raised
    print(e)

In [16]:
file = 'event_datafile_new.csv'

# The INSERT text is identical for every row, so it is built once, outside
# the loop
query = "insert into song_data (user_id, session_id, artist, song_title, first_name, last_name, itemInSession)"
query = query + "values(%s, %s, %s, %s, %s, %s, %s)"

# newline='' is what the csv module expects for files handed to csv.reader;
# without it, line breaks inside quoted fields are not parsed correctly
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip the header (tolerates an empty file)
    for line in csvreader:
        # Column positions in event_datafile_new.csv:
        # artist[0], firstName[1], gender[2], itemInSession[3], lastName[4], length[5],
        # level[6], location[7], sessionId[8], song[9], userId[10]
        try:
            session.execute(query, (int(line[10]), int(line[8]), line[0], line[9], line[1], line[4], int(line[3])))
        except Exception as e:
            # A row that cannot be converted or inserted is reported and skipped
            print(e)

In [17]:
# Query 2: artist, song and user (first and last name) for userid = 10,
# sessionid = 182; itemInSession is selected as well but not printed

query = "select itemInSession, artist, song_title, first_name, last_name from song_data where session_id = 182 and user_id = 10"
try:
    result_rows = list(session.execute(query))
    df = pd.DataFrame(result_rows)
    # Only these columns are shown
    shown_columns = ['artist', 'song_title', 'first_name', 'last_name']
    print(df[shown_columns])
except Exception as err:
    print(err)

              artist                                         song_title  \
0   Down To The Bone                                 Keep On Keepin' On   
1       Three Drives                                        Greece 2000   
2  Sebastien Tellier                                          Kilometer   
3      Lonnie Gordon  Catch You Baby (Steve Pitron & Max Sanna Radio...   

  first_name last_name  
0     Sylvie      Cruz  
1     Sylvie      Cruz  
2     Sylvie      Cruz  
3     Sylvie      Cruz  


### Query #3
### Give me every user name (first and last) in my music app history
### who listened to the song 'All Hands Against His Own'

#### To answer this question we will need to get the user first name and last name from our table,   
#### and we will need to filter by song name. 
#### Because first and last names are not unique in large datasets, we will add the column userId to uniquely identify users.

In CQL our query looks like:

```
SELECT first_name, last_name from history_data where song_title = 'All Hands Against His Own'
```
- We will name our table history_data
- Our primary key will consist of partition key song_title, and clustering key user_id. This uniquely identifies our rows.
- The columns of our table will be: song_title, first_name, last_name and user_id.

the results should look like this:


| |first_name|last_name|
|-|----------|---------|
|0|Jacqueline|Lynch    |
|1|Tegan     |Levine   |
|2|Sara      |Johnson  |

In [18]:
# DDL for the table behind query 3: partitioned by song_title, with user_id
# as the clustering column
user_history_query = (
    "create table if not exists history_data\n"
    "                       (song_title varchar, first_name varchar, last_name varchar, user_id int,\n"
    "                       PRIMARY KEY (song_title, user_id))"
)

try:
    session.execute(user_history_query)
except Exception as err:
    # Report the problem and carry on
    print(err)

In [19]:
file = 'event_datafile_new.csv'

# The INSERT text is identical for every row, so it is built once, outside
# the loop
query = "insert into history_data (song_title, first_name, last_name, user_id)"
query = query + "values(%s, %s, %s, %s)"

# newline='' is what the csv module expects for files handed to csv.reader;
# without it, line breaks inside quoted fields are not parsed correctly
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip the header (tolerates an empty file)
    for line in csvreader:
        # Column positions in event_datafile_new.csv:
        # artist[0], firstName[1], gender[2], itemInSession[3], lastName[4], length[5],
        # level[6], location[7], sessionId[8], song[9], userId[10]
        try:
            session.execute(query, (line[9], line[1], line[4], int(line[10])))
        except Exception as e:
            # A row that cannot be converted or inserted is reported and skipped
            print(e)

In [20]:
# Query 3: first and last name of every user who listened to the song
# 'All Hands Against His Own'

query = "select first_name, last_name from history_data where song_title = 'All Hands Against His Own'"

try:
    # Materialise the result set first, then show it as a DataFrame
    result_rows = list(session.execute(query))
    df = pd.DataFrame(result_rows)
    print(df)
except Exception as err:
    print(err)

   first_name last_name
0  Jacqueline     Lynch
1       Tegan    Levine
2        Sara   Johnson


### Dropping the tables before closing out the sessions

In [21]:
# Drop the tables before closing out the session
query1 = "drop table sessions_data"
query2 = "drop table song_data"
query3 = "drop table history_data"

# Each drop is attempted on its own and failures are printed, matching the
# error handling used for every other statement in this notebook. This way a
# table that is missing (or any other error) does not prevent the remaining
# tables from being dropped.
for drop_query in (query1, query2, query3):
    try:
        session.execute(drop_query)
    except Exception as e:
        print(e)

<cassandra.cluster.ResultSet at 0xe20e997610>

### Closing the session and cluster connection

In [22]:
# Shut down the session first, then the cluster object it was created from
session.shutdown()
cluster.shutdown()