# Pre-processing the files

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import os
import csv

In [2]:
# delete data/processed_data.csv if it already exists (-f keeps rm quiet when the file is absent)
# NOTE(review): the cell that writes the CSV further down opens 'processed_data.csv' without the
# data/ prefix — confirm that data/ is the location that is meant to be cleaned here
! rm -f data/processed_data.csv

In [3]:
# gather the full path of every file found under ./data (sub-folders included)
def collect_files():
    """Return a list with the path of each file below the ``data`` folder of the working directory."""
    datapath = os.path.join(os.getcwd(), 'data')
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(datapath)
        for name in names
    ]
files = collect_files()

In [4]:
# read every collected file and gather all of its non-header rows into the data list
data = []
# header row of the most recent non-empty file; stays None when nothing could be read,
# so the print below no longer fails with a NameError on an empty file list
header = None
for file in files:
    with open(file, 'r', newline='') as f:
        reader = csv.reader(f)
        # next() with a default avoids StopIteration on a completely empty file
        file_header = next(reader, None)
        if file_header is None:
            continue
        header = file_header
        # everything after the header row is data
        data.extend(reader)
print(header) 
print('\nnumber of lines in data list:', len(data))

['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'method', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userId']

number of lines in data list: 4965


In [5]:
# write the selected columns of every row that has a non-empty artist into processed_data.csv
output_columns = ['artist','firstName','gender','itemInSession','lastName',
                  'length', 'level','location','sessionId','song','userId']
# positions of those columns inside a raw row; -1 picks the last field of the row
source_positions = [0, 2, 3, 4, 5, 6, 7, 8, 12, 13, -1]
with open('processed_data.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(output_columns)
    writer.writerows(
        [line[i] for i in source_positions]
        for line in data
        if line[0] != ''  # rows whose first field (artist) is empty are left out
    )

In [6]:
# count the lines of processed_data.csv (every line of the file is counted, the header line too)
with open('processed_data.csv', 'r', newline='') as f:
    line_count = len(f.readlines())
print(line_count)

4224


## Create Apache Cassandra database

In [7]:
# open a connection to the cluster (Cluster() is called without arguments)
from cassandra.cluster import Cluster

try:
    cluster = Cluster()
    session = cluster.connect()
except Exception as err:
    print('Error in cluster connection')
    print(err)

# make sure the sparkify keyspace exists
create_keyspace_cql = """CREATE KEYSPACE IF NOT EXISTS sparkify 
                    WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}"""
try:
    session.execute(create_keyspace_cql)
except Exception as err:
    print('Error in creating keyspace')
    print(err)

# use sparkify as the keyspace for the rest of the session
try:
    session.set_keyspace('sparkify')
except Exception as err:
    print('Error in setting keyspace')
    print(err)

## Build the database based on following queries

1. Query the artist, song title and song length in the music app history that was heard during sessionId = 338 and itemInSession = 4.

2. Query the name of the artist, the song (sorted by itemInSession) and the user (first and last name) for userId = 10, sessionId = 182.

3. Query every user name (first and last) in the music app history who listened to the song 'All Hands Against His Own'.

#### Query 1: the artist, song title and song length in the music app history that was heard during sessionId = 338 and itemInSession = 4

In [8]:
# load the processed CSV into a DataFrame; the insert cells further down iterate over df.values
df = pd.read_csv('processed_data.csv')
# show the first two rows as a quick sanity check
df.head(2)

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,The Killers,Jayden,M,32,Graves,246.80444,paid,"Marinette, WI-MI",594,Read My Mind,25
1,Tamia,Jayden,M,33,Graves,243.09506,paid,"Marinette, WI-MI",594,Officially Missing You (Radio Version),25


In [9]:
# create table for query 1
# In PRIMARY KEY(session_id, item_in_session) the first column is the partition key and the
# second a clustering column; together they are the two equality filters used by query 1.
# NOTE(review): CQL FLOAT is a 32-bit type, so song_length is stored with single precision —
# confirm whether DOUBLE is wanted here.

query = """CREATE TABLE IF NOT EXISTS 
                table1 (
                artist TEXT, song TEXT, song_length FLOAT, session_id INT, item_in_session INT,
                PRIMARY KEY(session_id, item_in_session)
                );"""
try:
    session.execute(query)
except Exception as e:
    # the error is printed rather than raised, so later cells still run
    print('Error with creating table')
    print(e)

In [10]:
# print the positional index next to each column name of df
# (these positions are what the insert cell uses to pick values out of a row)

for position, column_name in enumerate(df.columns):
    print(position, column_name)

0 artist
1 firstName
2 gender
3 itemInSession
4 lastName
5 length
6 level
7 location
8 sessionId
9 song
10 userId


In [11]:
# insert every row of df into table1

# the INSERT text is identical for every row, so it is built once before the loop
query = """INSERT INTO table1 (
                    artist, song, song_length, session_id, item_in_session
                    )
                VALUES (%s, %s, %s, %s, %s);"""
for row in df.values:
    try:
        # positional indexes: 0 artist, 9 song, 5 length, 8 sessionId, 3 itemInSession
        # passed as a real tuple (the driver documents a sequence or dict, not a generator)
        session.execute(query, (row[0], row[9], row[5], row[8], row[3]))
    except Exception as e:
        # report the failing row's error and continue with the remaining rows
        print(e)

In [12]:
# run query 1 against table1 to validate the table

try:
    rows = session.execute("""
                    SELECT artist, song, song_length 
                    FROM table1
                    WHERE session_id = 338 AND item_in_session = 4;
                    """)
except Exception as e:
    # 'Exception' (not the undefined name 'Except') so a driver error is actually caught
    print(e)
else:
    # only print results when the query succeeded; otherwise rows would be unbound or stale
    for row in rows:
        print(row)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', song_length=495.30731201171875)


#### Query 2: the name of the artist, the song (sorted by itemInSession) and the user (first and last name) for userId = 10, sessionId = 182

In [13]:
# create table for query 2
# In PRIMARY KEY(user_id, session_id, item_in_session) user_id is the partition key; session_id
# and item_in_session are clustering columns, so the rows of one user are kept ordered by
# session and then by item_in_session — the ordering query 2 asks for within a session.
# The user column holds the first and last name joined into one string by the insert cell.

query = """CREATE TABLE IF NOT EXISTS 
            table2 (
                artist TEXT, song TEXT, session_id INT, item_in_session INT, user TEXT, user_id INT,
                PRIMARY KEY(user_id, session_id, item_in_session)
                );"""
try:
    session.execute(query)
except Exception as e:
    # the error is printed rather than raised, so later cells still run
    print('Error with creating table')
    print(e)
                    

In [14]:
# print the positional index next to each column name of df

for position in range(len(df.columns)):
    print(position, df.columns[position])

0 artist
1 firstName
2 gender
3 itemInSession
4 lastName
5 length
6 level
7 location
8 sessionId
9 song
10 userId


In [15]:
# insert every row of df into table2

# the INSERT text is identical for every row, so it is built once before the loop
query = """INSERT INTO table2 (
                    artist, song, session_id, item_in_session, user, user_id
                    )
                VALUES (%s, %s, %s, %s, %s, %s);"""
for row in df.values:
    try:
        # positional indexes: 0 artist, 9 song, 8 sessionId, 3 itemInSession,
        # 1 firstName and 4 lastName (joined with a space), 10 userId
        session.execute(query, (row[0], row[9], row[8], row[3], row[1]+' '+row[4], row[10]))
    except Exception as e:
        # report the failing row's error and continue with the remaining rows
        print(e)

In [16]:
# run query 2 against table2 to validate the table

try:
    rows = session.execute("""
                    SELECT artist, song, user
                    FROM table2
                    WHERE user_id = 10 AND session_id = 182;
                    """)
except Exception as e:
    print(e)
else:
    # print results only when this query succeeded, so a failure can never
    # show the rows left over from an earlier cell
    for row in rows:
        print(row)

Row(artist='Down To The Bone', song="Keep On Keepin' On", user='Sylvie Cruz')
Row(artist='Three Drives', song='Greece 2000', user='Sylvie Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', user='Sylvie Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', user='Sylvie Cruz')


#### query 3: every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [17]:
# create table for query 3
# In PRIMARY KEY(song, user_id) song is the partition key, which is the column query 3 filters
# on; user_id is a clustering column, so a song partition holds one row per user_id.
# The user column holds the first and last name joined into one string by the insert cell.

query = """CREATE TABLE IF NOT EXISTS 
                table3 (
                song TEXT, user TEXT, user_id INT,
                PRIMARY KEY(song, user_id)
                );"""
try:
    session.execute(query)
except Exception as e:
    # the error is printed rather than raised, so later cells still run
    print('Error with creating table')
    print(e)
                    

In [18]:
# print the positional index next to each column name of df

column_names = list(df.columns)
for position, column_name in zip(range(len(column_names)), column_names):
    print(position, column_name)

0 artist
1 firstName
2 gender
3 itemInSession
4 lastName
5 length
6 level
7 location
8 sessionId
9 song
10 userId


In [19]:
# insert every row of df into table3

# the INSERT text is identical for every row, so it is built once before the loop
query = """INSERT INTO table3 (
                    song, user, user_id
                    )
                VALUES (%s, %s, %s);"""
for row in df.values:
    try:
        # positional indexes: 9 song, 1 firstName and 4 lastName (joined with a space), 10 userId
        session.execute(query, (row[9], row[1]+' '+row[4], row[10]))
    except Exception as e:
        # report the failing row's error and continue with the remaining rows
        print(e)

In [20]:
# run query 3 against table3 to validate the table

try:
    rows = session.execute("""
                    SELECT user 
                    FROM table3
                    WHERE song='All Hands Against His Own';
                    """)
except Exception as e:
    # 'Exception' (not the undefined name 'Except') so a driver error is actually caught
    print(e)
else:
    # only print results when the query succeeded; otherwise rows would be stale
    for row in rows:
        print(row)

Row(user='Jacqueline Lynch')
Row(user='Tegan Levine')
Row(user='Sara Johnson')


### Drop the tables before closing out the sessions

In [21]:
# drop each of the three query tables; a failure on one table is reported and the loop
# moves on to the next one
for table in ['table1', 'table2', 'table3']:
    try:
        # the result of a DROP is not needed, so it is no longer bound to `rows`
        # (which would overwrite the result of the last validation query)
        session.execute(f"""
                        DROP TABLE {table};
                        """)
    except Exception as e:
        print(f'Error with dropping {table} table')
        print(e)


### Close the session and cluster connection

In [22]:
# close the session first, then the cluster object it was created from
session.shutdown()
cluster.shutdown()