# Sparkify Cassandra database

### Import the required modules

In [1]:
import os
import glob
import pandas as pd
from cassandra.cluster import Cluster

### Define all functions

In [2]:
def get_all_data(filenames):
    """
    Load every file and stack the results into one DataFrame
    filenames = iterable of CSV paths to read
    Returns: pandas df with the unfiltered rows of all files plus a 'source' column
             holding the path each row was read from (per-file row indices are kept)
    """
    # Tag each frame with its origin while reading, then stack them in input order
    frames = [pd.read_csv(path).assign(source=path) for path in filenames]
    return pd.concat(frames)

def insert_data(df, target_table, cols, session):
    """
    Insert the selected columns of df into the target table, one INSERT per row
    df (pandas DataFrame): rows must be filtered (where applicable)
    target_table (string): name of table into which data must be inserted
    cols (list): a list of the columns (in order) matching the target_table
    session (cassandra.cluster.Session): Cassandra session to be used - keyspace must be set

    The first failing insert stops the load: the error is printed and the
    remaining rows are not inserted. A column missing from df raises KeyError.
    """
    # Table and column names cannot be bound as parameters, so they are
    # interpolated into the statement text; pass trusted identifiers only.
    placeholders = ', '.join(['%s'] * len(cols))
    query = f"INSERT INTO {target_table} ({', '.join(cols)}) VALUES ({placeholders})"

    # Select outside the try block so a bad column name is not swallowed below
    selected = df[cols]
    try:
        # itertuples keeps every column's own type. Going through `.values`
        # would coerce the whole selection to one dtype, e.g. turning int keys
        # into floats when all selected columns are numeric and one is a float.
        for row in selected.itertuples(index=False, name=None):
            session.execute(query, list(row))
    except Exception as e:
        print(e)

def drop_table(table_name, session):
    """
    Remove one table from the keyspace; a table that does not exist is not an error
    table_name (string): name of the table to be dropped
    session (cassandra.cluster.Session): Cassandra session to be used - keyspace must be set
    Returns: status message (string) telling whether the DROP statement succeeded
    """
    try:
        session.execute(f'DROP TABLE IF EXISTS {table_name}')
    except Exception as err:
        # Report the problem and signal failure through the return value
        print(err)
        return f'{table_name} not dropped!'
    else:
        return f'{table_name} dropped!'
        

def drop_tables(table_list, session):
    """
    Drop several tables from the keyspace in the order given
    table_list (list): table names (string) to drop; a single string is treated as one table name
    session (cassandra.cluster.Session): Cassandra session to be used - keyspace must be set
    Returns: list with one status message per table, as produced by drop_table
    """
    # Wrap a bare string so it is not iterated character by character
    names = [table_list] if isinstance(table_list, str) else table_list
    return [drop_table(name, session) for name in names]

### Working directories

In [3]:
# Get the data directory
data_dir = os.path.join(os.getcwd(), 'event_data')

# Get all the CSV files to be processed, including those in sub-directories.
# `recursive=True` only has an effect together with a `**` component, and a bare
# '*' would also return directories and non-CSV entries that pd.read_csv cannot
# load. Hidden folders (e.g. notebook checkpoints) are not matched by `**`.
# Sorting makes the processing order reproducible across runs.
filenames = sorted(glob.glob(os.path.join(data_dir, '**', '*.csv'), recursive=True))

### Get all the data from the raw files

In [4]:
# Read every file in `filenames` into one DataFrame; each row keeps the path it
# came from in the 'source' column
all_data = get_all_data(filenames)

### Create the filtered dataset

In [5]:
# Build the events dataset: keep only rows that have an artist, restricted to
# the columns below
cols = ['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
        'length', 'level', 'location', 'sessionId', 'song', 'userId']
has_artist = all_data['artist'].notna()
data = all_data.loc[has_artist, cols].copy()

# userId is stored in int columns by the tables created later on
data['userId'] = data['userId'].astype(int)

# Save the filtered events next to the notebook (current working directory)
data.to_csv('event_datafile_new.csv', index=False)

### Create connection to cluster

In [6]:
# No contact points are passed, so the driver's defaults are used
# (presumably a node on the local machine - confirm for your environment)
cluster = Cluster()
# Open a session on the cluster; no keyspace is selected at this point
session = cluster.connect()

### Create Keyspace

In [7]:
# Create the sparkifydb keyspace unless it already exists.
# SimpleStrategy with a replication factor of 1 keeps a single copy of each row
# (presumably chosen for a single-node development cluster - confirm before
# using elsewhere).
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkifydb 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """)
except Exception as e:
    # The error is only printed; later cells still run
    print(e)

### Set Keyspace

In [8]:
# Select sparkifydb for this session; the statements in the following cells
# refer to their tables without a keyspace prefix
try:
    session.set_keyspace('sparkifydb')
except Exception as e:
    # The error is only printed; later cells still run
    print(e)

## Create queries to ask the following three questions of the data

### Query 1:
#### Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4
```
SELECT artist, song as songTitle, length as songLength
FROM song_play_session_items
WHERE sessionId=338 AND itemInSession=4
```

### Query 2:
#### Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
```
SELECT artist, song as songTitle, firstName, lastName
FROM artist_song_session_plays
WHERE userId=10 AND sessionId=182
ORDER BY userId, itemInSession
```

### Query 3:
#### Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
```
SELECT firstName, lastName
FROM user_song_plays
WHERE song='All Hands Against His Own'
```

### Store the queries in variables and identify relevant columns for table

In [9]:
# Any column used in a WHERE or ORDER BY clause must be part of the table's
# primary key (as partition key or clustering column).
# Each *_cols list names the DataFrame columns to insert, in the column order of
# the matching table.

# Query 1
# A row is uniquely identified by the combination of sessionId and itemInSession,
# so these two columns form the composite primary key of the table.
# We want the data distributed as evenly as possible, so sessionId is the better
# choice for the partition key: itemInSession is expected to be skewed towards 0
# and is used as the clustering column instead.
query1 = """ 
SELECT artist, song as songTitle, length as songLength
FROM song_play_session_items
WHERE sessionId=338 AND itemInSession=4
"""
spsi_cols = ['sessionId','itemInSession','artist','song','length']


# Query 2
# The same reasoning applies as in Query 1.
# sessionId is the partition key; userId and itemInSession are the clustering columns.
# The WHERE clause restricts both userId and sessionId.
# The result must be sorted by itemInSession; the ORDER BY lists the clustering
# columns in their declared order, so userId comes first.
query2 = """ 
SELECT artist, song as songTitle, firstName, lastName
FROM artist_song_session_plays
WHERE userId=10 AND sessionId=182
ORDER BY userId, itemInSession 
"""
assp_cols = ['sessionId','userId','itemInSession','artist','song','firstName','lastName']

# Query 3
# This query asks which users listened to a given song.
# Two users can share the same name, so userId is needed together with song to
# identify a row uniquely, even though the query itself never mentions userId.
# song is therefore the partition key and userId the clustering column.
query3 = """ 
SELECT firstName, lastName
FROM user_song_plays
WHERE song='All Hands Against His Own'
"""
usp_cols = ['song', 'userId', 'firstName', 'lastName']

### Create the song_play_session_items table and insert the data

In [10]:
# Drop the table if it already exists so the cell can be re-run from a clean state
# (the returned status message is not used here)
drop_table('song_play_session_items', session)

# Create new table
# sessionId is the partition key and itemInSession the clustering column,
# matching the WHERE clause of query1
create_song_play_session_items = """
CREATE TABLE IF NOT EXISTS song_play_session_items (
sessionId int, itemInSession int, artist text, song text, length decimal,
PRIMARY KEY (sessionId, itemInSession)
)
"""
try:
    session.execute(create_song_play_session_items)
except Exception as e:
    # The error is only printed; the insert below is still attempted
    print(e)

# Insert the data: one INSERT per row of the filtered events, using the
# columns listed in spsi_cols
insert_data(data, 'song_play_session_items',spsi_cols,session)

### Run test query to see if it worked

In [11]:
# Run query1 and print every returned row.
# The loop lives in the `else` branch so it only runs when the query succeeded;
# otherwise a failed query would hit an undefined (or stale) `rows`.
try:
    rows = session.execute(query1)
except Exception as e:
    print(e)
else:
    for row in rows:
        print(row)

Row(artist='Faithless', songtitle='Music Matters (Mark Knight Dub)', songlength=Decimal('495.3073'))


In [12]:
# Cross-check with pandas: the same session/item filter on the source DataFrame,
# showing the non-key columns of the table
data[(data['sessionId'] == 338) & (data['itemInSession'] == 4)][spsi_cols[2:]]

Unnamed: 0,artist,song,length
54,Faithless,Music Matters (Mark Knight Dub),495.3073


### create the artist_song_session_plays table and insert the data

In [13]:
# Drop the table if it already exists so the cell can be re-run from a clean state
# (the returned status message is not used here)
drop_table('artist_song_session_plays', session)

# Create new table
# sessionId is the partition key; userId and itemInSession are the clustering
# columns, which is what allows query2 to order by userId, itemInSession
create_artist_song_session_plays = """
CREATE TABLE IF NOT EXISTS artist_song_session_plays (
sessionId int, userId int, itemInSession int, artist text, song text, firstName text, lastName text,
PRIMARY KEY (sessionId, userId, itemInSession)
)
"""
try:
    session.execute(create_artist_song_session_plays)
except Exception as e:
    # The error is only printed; the insert below is still attempted
    print(e)
# Insert the data: one INSERT per row of the filtered events, using the
# columns listed in assp_cols
insert_data(data, 'artist_song_session_plays', assp_cols, session)

### Run test query to see if it worked

In [14]:
# Run query2 and print every returned row.
# The loop lives in the `else` branch so it only runs when the query succeeded;
# otherwise a failed query would iterate an undefined or stale `rows`
# left over from an earlier cell.
try:
    rows = session.execute(query2)
except Exception as e:
    print(e)
else:
    for row in rows:
        print(row)

Row(artist='Down To The Bone', songtitle="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', songtitle='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', songtitle='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', songtitle='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


In [15]:
# Cross-check with pandas: sort the source DataFrame by itemInSession, then keep
# the rows of the same session and user, showing the non-key columns of the table
by_item = data.sort_values('itemInSession')
by_item[(by_item['sessionId'] == 182) & (by_item['userId'] == 10)][assp_cols[3:]]

Unnamed: 0,artist,song,firstName,lastName
155,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
156,Three Drives,Greece 2000,Sylvie,Cruz
158,Sebastien Tellier,Kilometer,Sylvie,Cruz
159,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


### Query 3:
#### Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### create the user_song_plays table and insert the data

In [16]:
# Drop the table if it already exists so the cell can be re-run from a clean state
# (the returned status message is not used here)
drop_table('user_song_plays', session)

# Create new table
# song is the partition key (query3 filters on it); userId is the clustering
# column, so the table holds one row per (song, userId) pair
create_user_song_plays = """
CREATE TABLE IF NOT EXISTS user_song_plays (
song text, userId int, firstName text, lastName text, 
PRIMARY KEY (song, userId)
)
"""
try:
    session.execute(create_user_song_plays)
except Exception as e:
    # The error is only printed; the insert below is still attempted
    print(e)
# Insert the data: one INSERT per row of the filtered events, using the
# columns listed in usp_cols
insert_data(data, 'user_song_plays', usp_cols, session)

### Run test query to see if it worked

In [17]:
# Run query3 and print every returned row.
# The loop lives in the `else` branch so it only runs when the query succeeded;
# otherwise a failed query would iterate an undefined or stale `rows`
# left over from an earlier cell.
try:
    rows = session.execute(query3)
except Exception as e:
    print(e)
else:
    for row in rows:
        print(row)

Row(firstname='Jacqueline', lastname='Lynch')
Row(firstname='Tegan', lastname='Levine')
Row(firstname='Sara', lastname='Johnson')


In [18]:
# Cross-check with pandas: rows of the source DataFrame for the same song,
# showing the user name columns
data[data['song'] == 'All Hands Against His Own'][usp_cols[2:]]

Unnamed: 0,firstName,lastName
57,Sara,Johnson
100,Tegan,Levine
315,Jacqueline,Lynch


We see that our tables were created and yield the correct results for the given queries!

## Drop the tables before closing out the sessions

In [19]:
# Clean up: drop the three tables created in this notebook.
# The call returns one status message per table.
drop_tables(['user_song_plays',
             'artist_song_session_plays',
             'song_play_session_items'], session)

['user_song_plays dropped!',
 'artist_song_session_plays dropped!',
 'song_play_session_items dropped!']

### Close the session and cluster connection

In [20]:
# Shut down the session first, then the cluster it was created from
session.shutdown()
cluster.shutdown()