# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Collect the path of every event CSV found under `filepath`.
# The list is created up front so it exists (empty) even when the directory is
# missing, and it is extended per directory instead of being reassigned, so the
# files found in one directory are not discarded by the next os.walk iteration.
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # Prune hidden directories (e.g. .ipynb_checkpoints) in place so os.walk
    # does not descend into them and pick up stray copies of the data files.
    dirs[:] = [d for d in dirs if not d.startswith('.')]

    # join the current directory with the CSV pattern using glob; restricting
    # the pattern to *.csv keeps sub-directories and unrelated files out of the
    # list, and sorting gives a deterministic processing order
    file_path_list.extend(sorted(glob.glob(os.path.join(root, '*.csv'))))
    #print(file_path_list)

/workspace/home


#### Processing the files to create the data file csv that will be used for Apache Cassandra tables

In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = []

# for every filepath in the file path list
for path in file_path_list:

    # reading csv file
    with open(path, 'r', encoding='utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        # skip the header row; the None default keeps an empty file from
        # raising StopIteration
        next(csvreader, None)

        # append every remaining data row of this file
        full_data_rows_list.extend(csvreader)

# uncomment the code below if you would like to get total number of rows
# print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
# print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_new.csv that will be used to insert data into the
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

# positions, within a raw event row, of the columns copied to the output file
# (listed in the same order as the header row written below)
KEPT_COLUMNS = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as outfile:
    writer = csv.writer(outfile, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
                     'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        # skip blank lines (csv.reader yields an empty list for them) and rows
        # whose first column (written out as 'artist') is empty
        if not row or row[0] == '':
            continue
        writer.writerow(tuple(row[i] for i in KEPT_COLUMNS))


In [4]:
# print how many lines the generated csv file contains (the header line is counted too)
with open('event_datafile_new.csv', 'r', encoding='utf8') as csv_handle:
    print(len(csv_handle.readlines()))

6821


# Part II. Apache Cassandra Coding

## The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should look like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster

# Connect to the Cassandra node at 127.0.0.1. `cluster` and `session` are the
# handles used by the cells that follow; a connection error is only printed.
try:
    cluster = Cluster(contact_points=['127.0.0.1'])
    session = cluster.connect()
except Exception as err:
    print(err)

#### Create Keyspace

In [6]:
# Create a Keyspace
# The CQL text is built first, then executed; any error is printed, not raised.
create_keyspace_cql = """
    CREATE KEYSPACE IF NOT EXISTS practice 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""

try:
    session.execute(create_keyspace_cql)
except Exception as err:
    print(err)

#### Set Keyspace

In [7]:
# Point the session at the 'practice' keyspace created above;
# a failure is printed rather than raised
keyspace_name = 'practice'
try:
    session.set_keyspace(keyspace_name)
except Exception as err:
    print(err)

In [8]:
# name of the CSV written in Part I; each query cell below opens it to load its table
file = 'event_datafile_new.csv'

## Develop Queries to Address Key Data Insights

### 1. Retrieve the artist, song title, and song length from the music app history for a specific session and item:
   - Query the data to identify the artist, song title, and song duration listened to during `sessionId = 338` and `itemInSession = 4`.

### 2. Retrieve user-specific song playback information sorted by session activity:
   - For `userId = 10` and `sessionId = 182`, obtain the artist's name, song title (sorted by `itemInSession`), and the user’s first and last name, providing a clear view of the listening history within the session.

### 3. Retrieve the list of users who listened to a specific song:
   - Identify every user (first and last name) in the app history who listened to the song 'All Hands Against His Own', enabling analysis of listener demographics for this particular track.




In [9]:
## Query 1: Retrieve the artist, song title, and song length from the music app history 
## for the session with sessionId = 338 and itemInSession = 4

## Define the table as session_song_history
query = "CREATE TABLE IF NOT EXISTS session_song_history "
## Using sessionId and itemInSession as the primary key since they uniquely identify each row.
## Place the primary key columns first in the table structure.
query = query + "(sessionId int, itemInSession int, artist text, song text, length float, PRIMARY KEY (sessionId, itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

## The INSERT statement is the same for every row, so it is built once, outside the loop
insert_query = "INSERT INTO session_song_history (sessionId, itemInSession, artist, song, length)"
insert_query = insert_query + " VALUES (%s, %s, %s, %s, %s)"

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # Skip the header row (no error if the file is empty)
    for line in csvreader:
        ## Map each CSV column to the corresponding table column in the INSERT statement
        session.execute(insert_query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

## Execute a SELECT statement to confirm data was correctly inserted into the table
## Query by the specified sessionId and itemInSession values, requesting only the
## columns the question asks for
query = "SELECT artist, song, length FROM session_song_history WHERE sessionId = 338 and itemInSession = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    ## Print results only when the SELECT succeeded; otherwise `rows` would be undefined here
    for row in rows:
        print("artist: " + row.artist + " || title: " + row.song + " || length: " + str(row.length))

artist: Faithless || title: Music Matters (Mark Knight Dub) || length: 495.30731201171875


In [10]:
## Query 2: Retrieve the artist's name, song title (sorted by itemInSession), 
## and user's first and last name for userId = 10 and sessionId = 182

query = "CREATE TABLE IF NOT EXISTS user_song_playlist "
## Since song titles need to be sorted by itemInSession, itemInSession is stored in the table
## and added to the primary key as the clustering column to support sorting.
## userId and sessionId together form the composite partition key, matching the WHERE clause below.
query = query + "(userId int, sessionId int, itemInSession int, artist text, song text, firstName text, lastName text, PRIMARY KEY ((userId, sessionId), itemInSession))"
try:
    session.execute(query)
except Exception as e:
    print(e)

## The INSERT statement is the same for every row, so it is built once, outside the loop
insert_query = "INSERT INTO user_song_playlist (userId, sessionId, itemInSession, artist, song, firstName, lastName)"
insert_query = insert_query + " VALUES (%s, %s, %s, %s, %s, %s, %s)"

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # Skip the header row (no error if the file is empty)
    for line in csvreader:
        ## Map each CSV column to the corresponding table column in the INSERT statement
        session.execute(insert_query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

## Execute a SELECT statement to confirm data was correctly inserted and sorted by itemInSession,
## requesting only the columns the question asks for
query = "SELECT artist, song, firstName, lastName FROM user_song_playlist WHERE userId = 10 and sessionId = 182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    ## Print results only when the SELECT succeeded, so a failed query cannot
    ## fall through to `rows` left over from an earlier cell
    for row in rows:
        print("artist: " + row.artist + " || title: " + row.song + " || user: " + row.firstname + " " + row.lastname)

artist: Down To The Bone || title: Keep On Keepin' On || user: Sylvie Cruz
artist: Three Drives || title: Greece 2000 || user: Sylvie Cruz
artist: Sebastien Tellier || title: Kilometer || user: Sylvie Cruz
artist: Lonnie Gordon || title: Catch You Baby (Steve Pitron & Max Sanna Radio Edit) || user: Sylvie Cruz


In [11]:
## Query 3: Retrieve every user's first and last name in the music app history 
## who listened to the song 'All Hands Against His Own'

query = "CREATE TABLE IF NOT EXISTS who_listened "
## song is the partition key because the query filters on it; userId is the clustering
## column so that each user who played the song gets a separate row.
## First and last names are stored as the values to report.
query = query + "(song text, userId int, firstName text, lastName text, PRIMARY KEY (song, userId))"
try:
    session.execute(query)
except Exception as e:
    print(e)

## The INSERT statement is the same for every row, so it is built once, outside the loop
insert_query = "INSERT INTO who_listened (song, userId, firstName, lastName)"
insert_query = insert_query + " VALUES (%s, %s, %s, %s)"

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader, None) # Skip the header row (no error if the file is empty)
    for line in csvreader:
        ## Map each CSV column to the corresponding table column in the INSERT statement
        session.execute(insert_query, (line[9], int(line[10]), line[1], line[4]))

## Execute a SELECT statement to retrieve users who listened to 'All Hands Against His Own',
## requesting only the columns the question asks for
query = "SELECT firstName, lastName FROM who_listened WHERE song = 'All Hands Against His Own'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    ## Print results only when the SELECT succeeded, so a failed query cannot
    ## fall through to `rows` left over from an earlier cell
    for row in rows:
        print("user: " + row.firstname + " " + row.lastname)

user: Jacqueline Lynch
user: Tegan Levine
user: Sara Johnson


### Drop the tables before closing out the sessions

In [12]:
## Drop each of the three query tables before closing out the sessions.
## Every statement is attempted; an error on one is printed and the loop moves on.
drop_statements = (
    "DROP table session_song_history",
    "DROP table user_song_playlist",
    "DROP table who_listened",
)
for query in drop_statements:
    try:
        rows = session.execute(query)
    except Exception as err:
        print(err)

### Close the session and cluster connection

In [13]:
# shut down the session first, then the cluster object it was obtained from
session.shutdown()
cluster.shutdown()