# Project Overview

This project addresses a hypothetical challenge: providing tables that can answer queries posed by a music streaming app company that wants insight into song and user activity. The data currently lives in a directory of CSV files, which makes it hard to analyze. We'll set up an ETL process to move the data into an Apache Cassandra database, where we can create tables tailored to the queries of interest.

## Part I. Let's build an ETL pipeline to preprocess our files

#### We'll start with some imports

In [None]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Next I'll create filepaths to process the original event csv data files

In [None]:
# checking the current working directory
print(os.getcwd())

# Get the current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Walk the event data folder and collect the path of every CSV file,
# accumulating across subdirectories rather than keeping only the last one
file_path_list = []
for root, dirs, files in os.walk(filepath):
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))
# uncomment to see file_path_list
# print(file_path_list)
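
As an alternative to the `os.walk` loop above, here is a minimal sketch that collects CSV files at any depth with `pathlib`. The helper name `collect_csv_paths` and the throwaway demo directory are my own illustration and are not part of the project files.

```python
import tempfile
from pathlib import Path


def collect_csv_paths(base_dir):
    """Return a sorted list of every .csv file under base_dir, at any depth."""
    return sorted(str(p) for p in Path(base_dir).rglob('*.csv') if p.is_file())


# Small self-contained demo on a temporary directory tree (hypothetical files).
with tempfile.TemporaryDirectory() as tmp:
    nested = Path(tmp) / 'event_data' / '2018-11'
    nested.mkdir(parents=True)
    (Path(tmp) / 'event_data' / 'a.csv').write_text('x\n')
    (nested / 'b.csv').write_text('y\n')
    (nested / 'notes.txt').write_text('ignored\n')  # not a CSV, so it is skipped
    demo_paths = collect_csv_paths(Path(tmp) / 'event_data')
    demo_names = [Path(p).name for p in demo_paths]

print(demo_names)
```

In the notebook this would be called as `collect_csv_paths(filepath)`, assuming every raw event file ends in `.csv`.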

#### Then I'll process the files to create the data file csv that will be used for my Apache Cassandra tables

In [None]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list
for f in file_path_list:

    # reading csv file
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        next(csvreader)  # skip the header row

        # extracting each data row one by one and appending it
        for line in csvreader:
            # uncomment to see each row that's being appended
            # print(line)
            full_data_rows_list.append(line)
            
print(len(full_data_rows_list))
#uncomment to see full_data_rows_list print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_new.csv that will be used
# to insert data into the Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))
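
The positional indexes above (`row[0]`, `row[2]`, ...) are easy to get wrong, so here is a hedged sketch of the same projection done by column name with `csv.DictReader`. It assumes the raw files' header names match the output header names; the `page` column and the two sample rows are hypothetical stand-ins, and `project_rows` is a name I made up for illustration.

```python
import csv
import io

# Columns kept in event_datafile_new.csv, in output order.
KEEP = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
        'level', 'location', 'sessionId', 'song', 'userId']


def project_rows(csvfile, keep=KEEP):
    """Yield only the `keep` columns of each row that has a non-empty artist."""
    for record in csv.DictReader(csvfile):
        if record['artist'] == '':
            continue  # same filter as the notebook: skip rows with no artist
        yield [record[name] for name in keep]


# Hypothetical sample; 'page' stands in for the raw columns that get dropped.
sample = io.StringIO(
    'artist,firstName,gender,itemInSession,lastName,length,level,location,'
    'page,sessionId,song,userId\n'
    'Artist A,Ann,F,0,Lee,200.5,free,"Town, ST",NextSong,1,Song A,7\n'
    ',Bob,M,1,Ray,,paid,"City, ST",Home,2,,8\n'
)
projected = list(project_rows(sample))
print(projected)
```

Selecting by name keeps the code correct even if the column order of the raw files changes, at the cost of depending on the header text.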


In [None]:
# check the number of lines in the csv file (this count includes the header row)
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

## Part II. Here we'll take care of the Apache Cassandra coding portion of the project.

Now we are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory. The event_datafile_new.csv contains the following columns:
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level of the user's subscription (paid or free)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Apache Cassandra Code

#### We begin our session by creating a cluster and connection 

In [None]:
# Make a connection to a Cassandra instance on your local machine
from cassandra.cluster import Cluster
try:
    cluster = Cluster(['127.0.0.1'])
    # To establish a connection and begin executing queries, we need a session
    session = cluster.connect()
except Exception as e:
    print(e)

#### Let's create a keyspace to work in

In [None]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS music_app_history 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

#### Now to set our keyspace

In [None]:
try:
    session.set_keyspace('music_app_history')
except Exception as e:
    print(e)

## Here I'll create separate tables designed to answer the following queries:

### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### To answer the first query, let's create a table with sessionId and itemInSession as our composite primary key, since these are the columns we were given selection criteria for. Here sessionId is the partition key and itemInSession is the clustering column. For the rest of the table we'll only use the columns relevant to the query: artist, song and length. Lastly, I'll iterate over the csv file, extract the values for the columns included in the table and insert them into the session_track table.

In [None]:
query = "CREATE TABLE IF NOT EXISTS session_track "
query = query + "(sessionId int, itemInSession int, artist text, song text, length float, PRIMARY KEY (sessionId, itemInSession))"
try:    
    session.execute(query)
except Exception as e:
    print(e)

In [None]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO session_track (sessionId, itemInSession, artist, song, length)"
        query = query + " VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
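
The loop above rebuilds and re-parses the same INSERT string for every row. A hedged alternative is to prepare the statement once with `session.prepare`, which uses `?` placeholders instead of `%s`. The function name `load_session_track` is my own, and the sketch assumes the same column positions as the cell above.

```python
import csv


def load_session_track(session, csv_path):
    """Insert every data row of csv_path into session_track; return the row count."""
    # Prepared statements are parsed once and use '?' placeholders rather than '%s'.
    insert = session.prepare(
        "INSERT INTO session_track (sessionId, itemInSession, artist, song, length) "
        "VALUES (?, ?, ?, ?, ?)"
    )
    inserted = 0
    with open(csv_path, encoding='utf8', newline='') as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        for line in reader:
            # Same column positions as the notebook: 8=sessionId, 3=itemInSession,
            # 0=artist, 9=song, 5=length
            session.execute(
                insert,
                (int(line[8]), int(line[3]), line[0], line[9], float(line[5])),
            )
            inserted += 1
    return inserted
```

With a live session this would be called as `load_session_track(session, 'event_datafile_new.csv')`.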

#### Let's run a select statement for the first query on the session_track table to validate that the values were properly inserted. We want to select the artist, song, and length using a WHERE clause on our composite primary key with the values requested in the query.

In [None]:
query = "select artist, song, length from session_track WHERE sessionId=338 and itemInSession=4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # only iterate when the query succeeded, so rows is always defined here
    for row in rows:
        print(row.artist, row.song, row.length)

### Here I repeat the above processes for additional queries

#### To answer the second query, let's create a table with userId and sessionId as our composite partition key, since these are the columns we were given selection criteria for. itemInSession will be our clustering column because we want the songs sorted by this column. For the rest of the table we'll only use the columns relevant to the query: artist, song, firstName and lastName. Lastly, I'll iterate over the csv file, extract the values for the columns included in the table and insert them into the user_track table.

In [None]:
query1 = "CREATE TABLE IF NOT EXISTS user_track "
query1 = query1 + "(userId int, sessionId int, itemInSession int, artist text, song text, firstName text, \
         lastName text, PRIMARY KEY ((userId, sessionId), itemInSession))"
try:    
    session.execute(query1)
except Exception as e:
    print(e)
    
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query1 = "INSERT INTO user_track (userId, sessionId, itemInSession, artist, song, firstName, lastName)"
        query1 = query1 + " VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query1, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))
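
To illustrate how the composite partition key and the clustering column work together, here is a toy in-memory model in plain Python. It is a simplification, not how Cassandra is implemented; the `insert` and `select` helpers and the sample rows are hypothetical.

```python
from collections import defaultdict

# Toy model of user_track: each (userId, sessionId) partition maps
# itemInSession (the clustering column) to the remaining columns.
user_track = defaultdict(dict)


def insert(table, user_id, session_id, item_in_session, artist, song):
    """Store a row in its (userId, sessionId) partition, keyed by itemInSession."""
    table[(user_id, session_id)][item_in_session] = (artist, song)


def select(table, user_id, session_id):
    """Return one partition's rows ordered by itemInSession."""
    partition = table.get((user_id, session_id), {})
    return [(item, *partition[item]) for item in sorted(partition)]


# Hypothetical rows, inserted out of order on purpose.
insert(user_track, 10, 182, 2, 'Artist C', 'Song C')
insert(user_track, 10, 182, 0, 'Artist A', 'Song A')
insert(user_track, 10, 182, 1, 'Artist B', 'Song B')
insert(user_track, 10, 183, 0, 'Artist D', 'Song D')  # lands in a different partition

ordered = select(user_track, 10, 182)
print(ordered)
```

The lookup touches a single partition and the rows come back in itemInSession order regardless of insertion order, which is the behavior the second query relies on.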

#### To answer the third query, let's create a table with song as our partition key, since this is the column we were given search criteria for. The rest of the composite primary key will include userId, firstName and lastName, so that each listener gets a unique row and no existing row is overwritten, while the information requested in the query stays available. Lastly, I'll iterate over the csv file, extract the values for the columns included in the table and insert them into the song_track table.

In [None]:
query2 = "CREATE TABLE IF NOT EXISTS song_track "
query2 = query2 + "(song text, userId int, firstName text, lastName text, \
         PRIMARY KEY (song, userId, firstName, lastName))"
try:    
    session.execute(query2)
except Exception as e:
    print(e)
    
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query2 = "INSERT INTO song_track (song, userId, firstName, lastName)"
        query2 = query2 + " VALUES (%s, %s, %s, %s)"
        session.execute(query2, (line[9], int(line[10]), line[1], line[4]))
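
Because a Cassandra INSERT overwrites any existing row with the same primary key, the choice of key columns decides how many listeners survive in song_track. The sketch below imitates that behavior with a plain dictionary; `unique_rows` and the sample events are hypothetical and only meant to show the effect of each key choice.

```python
def unique_rows(events, key_columns):
    """Keep one row per distinct key, with later events overwriting earlier ones."""
    table = {}
    for event in events:
        table[tuple(event[col] for col in key_columns)] = event
    return list(table.values())


# Hypothetical listens: user 1 plays the song twice; users 1 and 2 share a name.
events = [
    {'song': 'Song A', 'userId': 1, 'firstName': 'Ann', 'lastName': 'Lee'},
    {'song': 'Song A', 'userId': 1, 'firstName': 'Ann', 'lastName': 'Lee'},
    {'song': 'Song A', 'userId': 2, 'firstName': 'Ann', 'lastName': 'Lee'},
    {'song': 'Song A', 'userId': 3, 'firstName': 'Bob', 'lastName': 'Ray'},
]

by_song_only = unique_rows(events, ['song'])
by_song_and_user = unique_rows(events, ['song', 'userId'])
by_song_and_name = unique_rows(events, ['song', 'firstName', 'lastName'])
print(len(by_song_only), len(by_song_and_user), len(by_song_and_name))
```

Keying on song alone keeps a single listener, and keying on names alone merges different users who share a name, which is why userId belongs in the key.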

#### Let's run another select statement, this time for the second query on the user_track table, to validate that the values were properly inserted. We want to select the artist, song, firstName and lastName using a WHERE clause on our composite partition key with the values requested in the query.

In [None]:
query1 = "select artist, song, firstName, lastName from user_track WHERE userId=10 and sessionId=182"
try:
    rows = session.execute(query1)
except Exception as e:
    print(e)
else:
    # only iterate when the query succeeded, so rows is always defined here
    for row in rows:
        print(row.artist, row.song, row.firstname, row.lastname)

#### Let's run a final select statement for the third query on the song_track table to validate that the values were properly inserted. We want to select the firstName and lastName using a WHERE clause on our partition key with the value requested in the query.

In [None]:
query2 = "select firstName, lastName from song_track WHERE song='All Hands Against His Own'"
try:
    rows = session.execute(query2)
except Exception as e:
    print(e)
else:
    # only iterate when the query succeeded, so rows is always defined here
    for row in rows:
        print(row.firstname, row.lastname)

### I'll drop the tables and keyspace now that we're done with them

In [None]:
drop = "DROP TABLE IF EXISTS session_track"
try:
    rows = session.execute(drop)
except Exception as e:
    print(e)
drop1 = "DROP TABLE IF EXISTS user_track"
try:
    rows = session.execute(drop1)
except Exception as e:
    print(e)
drop2 = "DROP TABLE IF EXISTS song_track"
try:
    rows = session.execute(drop2)
except Exception as e:
    print(e)    
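
The three drop blocks above follow the same pattern, so they could be folded into a loop. This is a minimal sketch with a hypothetical helper name, `drop_tables`; it interpolates table names directly into the statement, so it should only be given trusted, hard-coded names.

```python
def drop_tables(session, table_names):
    """Run DROP TABLE IF EXISTS for each name; return the names that failed."""
    failed = []
    for name in table_names:
        try:
            session.execute(f"DROP TABLE IF EXISTS {name}")
        except Exception as e:
            # Report the failure and keep going with the remaining tables
            print(f"Could not drop {name}: {e}")
            failed.append(name)
    return failed
```

With a live session it would be called as `drop_tables(session, ['session_track', 'user_track', 'song_track'])`.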

In [None]:
dropK = "DROP KEYSPACE IF EXISTS music_app_history"
try:
    rows = session.execute(dropK)
except Exception as e:
    print(e)

### Finally, I'll close the connection and shut down the cluster

In [None]:
session.shutdown()
cluster.shutdown()