# Overview

#### Objective: Move the CSVs into a denormalized Apache Cassandra database that we can run queries on.


## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
print(f"Current working directory : {os.getcwd()}") 

filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))
    print(file_path_list)

Current working directory : /home/workspace
['/home/workspace/event_data/2018-11-27-events.csv', '/home/workspace/event_data/2018-11-04-events.csv', '/home/workspace/event_data/2018-11-07-events.csv', '/home/workspace/event_data/2018-11-09-events.csv', '/home/workspace/event_data/2018-11-19-events.csv', '/home/workspace/event_data/2018-11-05-events.csv', '/home/workspace/event_data/2018-11-22-events.csv', '/home/workspace/event_data/2018-11-16-events.csv', '/home/workspace/event_data/2018-11-26-events.csv', '/home/workspace/event_data/2018-11-24-events.csv', '/home/workspace/event_data/2018-11-29-events.csv', '/home/workspace/event_data/2018-11-15-events.csv', '/home/workspace/event_data/2018-11-20-events.csv', '/home/workspace/event_data/2018-11-06-events.csv', '/home/workspace/event_data/2018-11-18-events.csv', '/home/workspace/event_data/2018-11-21-events.csv', '/home/workspace/event_data/2018-11-10-events.csv', '/home/workspace/event_data/2018-11-23-events.csv', '/home/workspace/ev

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
full_data_rows_list = [] 
    
for f in file_path_list:

    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        for line in csvreader:
            full_data_rows_list.append(line) 
            

print(f"Total rows : {len(full_data_rows_list)}")
 
print(f"Sample data:\n {full_data_rows_list[:5]}")

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)


with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))



Total rows : 8056
Sample data:
 [['Barry Tuckwell/Academy of St Martin-in-the-Fields/Sir Neville Marriner', 'Logged In', 'Mohammad', 'M', '0', 'Rodriguez', '277.15873', 'paid', 'Sacramento--Roseville--Arden-Arcade, CA', 'PUT', 'NextSong', '1.54051E+12', '961', 'Horn Concerto No. 4 in E flat K495: II. Romance (Andante cantabile)', '200', '1.54328E+12', '88'], ['Jimi Hendrix', 'Logged In', 'Mohammad', 'M', '1', 'Rodriguez', '239.82975', 'paid', 'Sacramento--Roseville--Arden-Arcade, CA', 'PUT', 'NextSong', '1.54051E+12', '961', 'Woodstock Inprovisation', '200', '1.54328E+12', '88'], ['Building 429', 'Logged In', 'Mohammad', 'M', '2', 'Rodriguez', '300.61669', 'paid', 'Sacramento--Roseville--Arden-Arcade, CA', 'PUT', 'NextSong', '1.54051E+12', '961', 'Majesty (LP Version)', '200', '1.54328E+12', '88'], ["The B-52's", 'Logged In', 'Gianna', 'F', '0', 'Jones', '321.54077', 'free', 'New York-Newark-Jersey City, NY-NJ-PA', 'PUT', 'NextSong', '1.54087E+12', '107', 'Love Shack', '200', '1.54328E

In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Apache Cassandra code is in the cells below

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster
cluster = Cluster()

try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
    print("Connection Established !!")
except Exception as e:
    print(f"Connection Failed !! Error : {e}")

Connection Established !!


#### Create Keyspace
##### In a relational data model, this would be a database. In Cassandra, databases are known as keyspaces.

In [6]:
keyspace_query = """CREATE KEYSPACE IF NOT EXISTS sparkify 
                    with REPLICATION = 
                    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
                """

try:
    session.execute(keyspace_query)
except Exception as e:
    print(f"Failed to create keyspace!! Error : {e}")

#### Set Keyspace

In [7]:
session.set_keyspace('sparkify')

### Now we'll need to create tables to run the following queries.

## We will create queries to ask the following three questions of the data:

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


============================================================================================================================

### Query 1: Question

#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

### Query 1: Description

#### A) This question expects the artist name, song title, and song length based on a specified sessionId and itemInSession.

#### B) For this query, we will create a new table for the query and select the artist name, song name, and song length from it. Then we'll filter by sessionId and itemInSession.

#### C) When creating the table, we will add NOT EXIST to the CREATE statement to check if the table exists and only create the table if it does not exist.

#### D) When creating the table, we include the column names and their accompanying data types. The sequence of these columns should also follow the order of the COMPOSITE PRIMARY KEY and CLUSTERING columns. This means that sessionId and itemInSession are ordered first in our CREATE and INSERT statements. Our columns in the table will include sessionId, itemInSession, artist song, and length.

#### E) Our PRIMARY KEY uniquely identifies each row in the newly created session_songs table. To filter results by sessionId and itemInSession, we'll need to use both as the primary key. If we were to select only one, there would be a filtering error when run.

============================================================================================================================

In [8]:
createtable_query1 = """CREATE TABLE IF NOT EXISTS session_songs (sessionId int, itemInSession int, artist text, song text, length float, PRIMARY KEY (sessionId, itemInSession))"""

try: 
    session.execute(createtable_query1)
    print("Table Created!!")
except Exception as e:
    print(f"Table creation failed!! Error : {e}")

Table Created!!


In [9]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO session_songs (sessionId, itemInSession, artist, song, length)" 
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        artist_name, user_name, gender, itemInSession, user_last_name, length, level, location, sessionId, song, userId = line
        session.execute(query, (int(sessionId), int(itemInSession), artist_name, song, float(length)))

In [10]:
select_query1 = "SELECT artist, song, length FROM  session_songs where sessionId = 338 and itemInSession = 4"
try:
    rows = session.execute(select_query1)
except Exception as e:
    print(e)
    
for row in rows:
    print(row)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)


============================================================================================================================

### Query 2: Question

#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

### Query 2: Description

#### A) This question expects the artist name, song title, and user based on a specified userId and sessionId.

#### B) For this query, we will create a new table for the query and select the artist name, song name, and user first and last name from it. Then we'll filter by sessionId and itemInSession.

#### C) When creating the table, we will add NOT EXIST to the CREATE statement to check if the table exists and only create the table if it does not exist.

#### D) When creating the table, we include the column names and their accompanying data types. The sequence of these columns should also follow the order of the COMPOSITE PRIMARY KEY and CLUSTERING columns. This means that sessionId and userid are ordered first in our CREATE and INSERT statements. Our columns in the table will include sessionId, userId, artist, song, firstName, lastName, and itemInSession.

#### E) Our PRIMARY KEY uniquely identifies each row in the newly created user_session table. To filter results by sessionId and userid, we'll need to use both as the primary key. If we were to select only one, there would be a filtering error when run.

#### F) We'll also use the clause WITH CLUSTERING ORDER BY to sort our data on itemInSession.

============================================================================================================================

In [11]:
create_query2 = """CREATE TABLE IF NOT EXISTS user_session (sessionId int, userId int, artist text, 
song text, firstName text, lastName text, itemInSession int, PRIMARY KEY ((sessionId, userId), 
itemInSession)) WITH CLUSTERING ORDER BY (itemInSession ASC) """

try: 
    session.execute(create_query2)
    print("Table Created!!")
except Exception as e:
    print(f"Table creation failed!! Error : {e}")
                    

Table Created!!


In [14]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO user_session (sessionId, userId, artist, song, firstName, lastName, itemInSession) "
        query = query + " VALUES (%s, %s, %s, %s, %s, %s, %s) "
        session.execute(query, (int(line[8]), int(line[10]), line[0], line[9], line[1], line[4], int(line[3])  ) )

In [15]:
select_query2 = "SELECT artist, song, firstName, lastName FROM  user_session where sessionId = 182 and userId = 10"
try:
    rows = session.execute(select_query2)
except Exception as e:
    print(e)

for row in rows:
    print(row)

Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


============================================================================================================================

### Query 3: Question

#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### Query 3: Description

#### A) This question expects the first and last name of every user who listened to the specified song of 'All Hands Against His Own'

#### B) For this query, we will create a new table for the query and select the song name, and user first and last name from it. Then we'll filter by song name.

#### C) When creating the table, we will add NOT EXIST to the CREATE statement to check if the table exists and only create the table if it does not exist.

#### D) When creating the table, we include the column names and their accompanying data types. The sequence of these columns should also follow the order of the COMPOSITE PRIMARY KEY and CLUSTERING columns. This means that sessionId and itemInSession are ordered first in our CREATE and INSERT statements. Our columns in the table will include song, userId, firstName, and lastName.

#### E) Our PRIMARY KEY uniquely identifies each row in the newly created session_songs table. To filter results by sessionId and itemInSession, we'll need to use both as the primary key. If we were to select only one, there would be a filtering error when run.

============================================================================================================================

In [16]:
create_query3 = """CREATE TABLE IF NOT EXISTS user_song_ken (song text, userId int, firstName text, lastName text, PRIMARY KEY ((song), userId))"""

try: 
    session.execute(create_query3)
    print("Table Created!!")
except Exception as e:
    print(f"Table creation failed!! Error : {e}")

Table Created!!


In [17]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO user_song_ken (song, userId, firstName, lastName) "
        query = query + " VALUES (%s, %s, %s, %s) "
        session.execute(query, (line[9], int(line[10]), line[1], line[4] ) )

In [19]:
select_query3 = "SELECT song, firstName, lastName FROM user_song_ken where song = 'All Hands Against His Own'"
try:
    rows = session.execute(select_query3)
except Exception as e:
    print(e)

for row in rows:
    print(row)

Row(song='All Hands Against His Own', firstname='Jacqueline', lastname='Lynch')
Row(song='All Hands Against His Own', firstname='Tegan', lastname='Levine')
Row(song='All Hands Against His Own', firstname='Sara', lastname='Johnson')


### We will now drop the tables before closing out the sessions.

In [20]:
session.execute("DROP TABLE IF EXISTS sparkify.session_songs")
session.execute("DROP TABLE IF EXISTS sparkify.user_session")
session.execute("DROP TABLE IF EXISTS sparkify.user_song_ken")

<cassandra.cluster.ResultSet at 0x7f0c1841c208>

### We'll now close the session and cluster connection.

In [21]:
session.shutdown()
cluster.shutdown()