In [1]:
import csv
import glob
import os

import numpy as np
import pandas as pd

### Retrieve file paths of the CSV files in `/event_data`

In [2]:
# Event CSVs live in ./event_data relative to the notebook's working directory
directory = os.path.join(os.getcwd(), 'event_data')
print(f"Path of '/event_data': '{directory}'")

# Collect every CSV file in the directory. glob returns matches in arbitrary,
# filesystem-dependent order, so sort them to make the concatenation order
# (and therefore the row order of event_data_new.csv) reproducible.
files = sorted(glob.glob(os.path.join(directory, '*.csv')))

Path of '/event_data': '/home/ubuntu/github-repos/data-modeling-with-cassandra/event_data'


### Concatenate the DataFrames read from the CSV files and save the result as `event_data_new.csv`

In [3]:
# Subset of columns to use (camelCase, exactly as they appear in the CSV headers)
cols = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length',
        'level', 'location', 'sessionId', 'song', 'userId']

# Lower-cased names, matching the column names of the Cassandra tables below
cols_lower = [x.lower() for x in cols]

# Nullable integer dtype so id columns stay integers even if values are missing
id_dtypes = {
    'userId': 'Int64',
    'sessionId': 'Int64',
    'itemInSession': 'Int64',
}

# Read every file with the same options and concatenate into one DataFrame.
# csv.QUOTE_ALL is the named form of quoting=1.
df = pd.concat(
    (
        pd.read_csv(csv_path,
                    skipinitialspace=True,
                    quoting=csv.QUOTE_ALL,
                    usecols=cols,
                    dtype=id_dtypes)
        for csv_path in files
    ),
    ignore_index=True,
)
# Drop records without an artist, as reflected in the template project
df = df.dropna(subset=['artist'])
# Rename by name, not by position: read_csv ignores the order of `usecols`
# and returns columns in file order, so assigning `cols_lower` positionally
# would silently mislabel columns whenever a file's order differs from `cols`.
# Selecting [cols_lower] afterwards fixes the column order to match `cols`.
df = df.rename(columns=dict(zip(cols, cols_lower)))[cols_lower]
# Print out number of rows
print(f'Processed {len(df)} rows')

Processed 6820 rows


In [4]:
# Save the combined, cleaned events as event_data_new.csv, quoting every field
# (csv.QUOTE_ALL is the named form of quoting=1).
df.to_csv('event_data_new.csv', index=False, encoding='utf8', quoting=csv.QUOTE_ALL)

### Create a connection to the Cassandra instance

In [5]:
from cassandra.cluster import Cluster

# Open a session against the Cassandra node running on localhost
cluster = Cluster(contact_points=['127.0.0.1'])
session = cluster.connect()

In [6]:
# Create the sparkify keyspace (a no-op if it already exists) using
# SimpleStrategy with replication factor 1.
keyspace_ddl = """
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
session.execute(keyspace_ddl)

# Make sparkify the session's default keyspace so unqualified table names resolve there
session.set_keyspace('sparkify')

### Create tables and insert rows based on the queries we want to run:

- #### Query 1:  Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [7]:
# Table for Query 1: the composite partition key (sessionid, iteminsession)
# is exactly the pair of columns the query filters on.
create_stmt = '''
    CREATE TABLE IF NOT EXISTS music_app_history_sessions
    (sessionid int, iteminsession int, artist text, song text, length decimal,
        PRIMARY KEY ((sessionid, iteminsession))
        );
'''
session.execute(create_stmt)

# Query 1: artist, song and length for one item of one session
query = '''
    SELECT 
        artist, 
        song, 
        length 
    FROM music_app_history_sessions
    WHERE sessionid = 338
    AND iteminsession = 4
'''

# Parameterised INSERT; values are bound positionally to the %s placeholders
insert_stmt = '''
    INSERT INTO music_app_history_sessions (sessionid, iteminsession, artist, song, length)
    VALUES (%s, %s, %s, %s, %s)
'''

# Source columns, in the same order as the INSERT's column list
query1_cols = ['sessionid', 'iteminsession', 'artist', 'song', 'length']
df_select = df[query1_cols]

In [8]:
# Turn every row of df_select into a tuple of plain values. to_dict('records')
# yields one dict per row whose keys follow df_select's column order, which is
# also the column order of insert_stmt, so each tuple binds positionally.
# Background on row-iteration approaches:
# https://towardsdatascience.com/heres-the-most-efficient-way-to-iterate-through-your-pandas-dataframe-4dad88ac92ee
df_dict = df_select.to_dict('records')
df_values = [tuple(row_dict.values()) for row_dict in df_dict]

# One INSERT per row
for params in df_values:
    session.execute(insert_stmt, params)

# Validate: run the query and print every row it returns
result = session.execute(query)
for row in result:
    print(row)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=Decimal('495.3073'))


- #### Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [9]:
# Table for Query 2: (userid, sessionid) is the partition key the query filters
# on; iteminsession is the clustering column, so rows inside a partition are
# kept in iteminsession order (Query 2 wants songs sorted by itemInSession).
create_stmt = '''
    CREATE TABLE IF NOT EXISTS music_app_history_user_activity
    (userid int, sessionid int, iteminsession int, artist text, song text, firstname text, lastname text,
        PRIMARY KEY ((userid, sessionid), iteminsession)
        );
'''
session.execute(create_stmt)

# Query 2: artist, song and user name for one session of one user
query = '''
    SELECT 
        artist, 
        song, 
        firstname,
        lastname
    FROM music_app_history_user_activity
    WHERE userid = 10
    AND sessionid = 182
'''

# Parameterised INSERT; values are bound positionally to the %s placeholders
insert_stmt = '''
    INSERT INTO music_app_history_user_activity (userid, sessionid, iteminsession, artist, song, firstname, lastname)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
'''

# Source columns, in the same order as the INSERT's column list
query2_cols = ['userid', 'sessionid', 'iteminsession', 'artist', 'song', 'firstname', 'lastname']
df_select = df[query2_cols]

In [10]:
# Turn every row of df_select into a tuple of plain values. to_dict('records')
# yields one dict per row whose keys follow df_select's column order, which is
# also the column order of insert_stmt, so each tuple binds positionally.
# Background on row-iteration approaches:
# https://towardsdatascience.com/heres-the-most-efficient-way-to-iterate-through-your-pandas-dataframe-4dad88ac92ee
df_dict = df_select.to_dict('records')
df_values = [tuple(row_dict.values()) for row_dict in df_dict]

# One INSERT per row
for params in df_values:
    session.execute(insert_stmt, params)

# Validate: run the query and print every row it returns
result = session.execute(query)
for row in result:
    print(row)

Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


- #### Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [11]:
# Table for Query 3: song is the partition key the query filters on.
# userid (not first/last name) is the clustering column, so the primary key
# (song, userid) keeps one row per listener of a song, and two users who
# happen to share a name are still stored separately.
create_stmt = '''
    CREATE TABLE IF NOT EXISTS music_app_history_songplays
    (song text, userid int, firstname text, lastname text,
        PRIMARY KEY (song, userid)
        );
'''
session.execute(create_stmt)

# Query 3: every user who listened to one particular song
query = '''
    SELECT 
        song,
        firstname,
        lastname
    FROM music_app_history_songplays
    WHERE song = 'All Hands Against His Own'
'''

# Parameterised INSERT; values are bound positionally to the %s placeholders
insert_stmt = '''
    INSERT INTO music_app_history_songplays (song, userid, firstname, lastname)
    VALUES (%s, %s, %s, %s)
'''

# Source columns, in the same order as the INSERT's column list
query3_cols = ['song', 'userid', 'firstname', 'lastname']
df_select = df[query3_cols]

In [12]:
# Turn every row of df_select into a tuple of plain values. to_dict('records')
# yields one dict per row whose keys follow df_select's column order, which is
# also the column order of insert_stmt, so each tuple binds positionally.
# Background on row-iteration approaches:
# https://towardsdatascience.com/heres-the-most-efficient-way-to-iterate-through-your-pandas-dataframe-4dad88ac92ee
df_dict = df_select.to_dict('records')
df_values = [tuple(row_dict.values()) for row_dict in df_dict]

# One INSERT per row
for params in df_values:
    session.execute(insert_stmt, params)

# Validate: run the query and print every row it returns
result = session.execute(query)
for row in result:
    print(row)

Row(song='All Hands Against His Own', firstname='Jacqueline', lastname='Lynch')
Row(song='All Hands Against His Own', firstname='Tegan', lastname='Levine')
Row(song='All Hands Against His Own', firstname='Sara', lastname='Johnson')


### Drop the tables and close the connection

In [13]:
# Clean up: drop the three tables created above, then release the session
# and the cluster connection.
for table_name in ('music_app_history_sessions',
                   'music_app_history_user_activity',
                   'music_app_history_songplays'):
    session.execute('drop table ' + table_name)

session.shutdown()
cluster.shutdown()