In [1]:
import cassandra
from cassandra.cluster import Cluster
from dmc import CassandraConnector
import pandas as pd
import os
from tqdm import tqdm

### Preprocess the Flatfiles

In [2]:
def process_file(df, filename, folder_path) -> pd.DataFrame:
    """
        Processes one csv file, tags its rows with the file's date and appends them to a DataFrame.
        df: Original DataFrame. It is not modified; a new DataFrame is returned.
        filename: Name of the csv file. Its first 10 characters are stored in a new
            "date" column (presumably a YYYY-MM-DD prefix; TODO confirm the file naming).
        folder_path: Path to the folder with the csv files. A trailing separator is optional.
        Returns: df with the rows of the csv appended (the original indices are kept).
    """
    # os.path.join works whether or not folder_path ends with a separator,
    # unlike plain string concatenation.
    subdf = pd.read_csv(os.path.join(folder_path, filename))
    subdf["date"] = filename[0:10]
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported equivalent.
    return pd.concat([df, subdf])

In [3]:
def parse_csv_folder(df, folder_path) -> pd.DataFrame:
    """
        Reads a folder, looks for csv files and appends them to a DataFrame.
        df: Original DataFrame.
        folder_path: Path to the folder that contains the csv files.
        Returns: df with the rows of every file whose name ends in ".csv" appended,
            processed in sorted file-name order.
    """
    # Match on the extension rather than a substring (which would also pick up
    # e.g. "x.csv.bak"), and sort because os.listdir returns entries in arbitrary
    # order; a fixed order keeps the resulting row order reproducible.
    csvs = sorted(name for name in os.listdir(folder_path) if name.endswith(".csv"))
    for csv in tqdm(csvs, total=len(csvs)):
        df = process_file(df, csv, folder_path)
    return df

### Generate the DataFrame with the data

In [4]:
# Load every event csv, keep rows that have an artist and only the columns the
# tables below need, then normalise the dtypes (round-tripping through str first).
df = parse_csv_folder(pd.DataFrame(), "../event_data/")
df = df.loc[
    df["artist"].notna(),
    ["artist", "firstName", "itemInSession", "lastName", "length", "level", "location",
     "sessionId", "song", "userId"],
]
df = df.astype(str).astype({"sessionId": int, "itemInSession": int, "length": float})
# userId goes through float first, since its string form may look like "26.0".
df["userId"] = df["userId"].astype(float).astype(int)
df = df.reset_index(drop=True)
df.head()

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 30/30 [00:00<00:00, 141.65it/s]


Unnamed: 0,artist,firstName,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Harmonia,Ryan,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",583,Sehr kosmisch,26
1,The Prodigy,Ryan,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",583,The Big Gundown,26
2,Train,Ryan,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",583,Marry Me,26
3,Sony Wonder,Samuel,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",597,Blackbird,61
4,Van Halen,Tegan,2,Levine,289.38404,paid,"Portland-South Portland, ME",602,Best Of Both Worlds (Remastered Album Version),80


In [5]:
# Sanity check: 6820 rows (events with an artist) and the 10 selected columns.
assert df.shape == (6820, 10), f"unexpected shape: {df.shape}"
df.shape

(6820, 10)

### Save the csv

In [6]:
# Persist the cleaned events; the DataFrame index is written as the first column.
df.to_csv(path_or_buf="../event_datafile_new.csv", index=True)

### Create a cluster and connector to Cassandra

In [7]:
# Constructing CassandraConnector connects to the given host/port, creating a
# cluster and opening a session (exposed below as conn.cluster / conn.session).
conn = CassandraConnector(port=9042, host="127.0.0.1")

In [8]:
# Create the keyspace that will hold the three query tables below.
# NOTE(review): replication settings are decided inside dmc's create_keyspace — confirm there.
KEYSPACE = "sparkifydb"
conn.create_keyspace(KEYSPACE)

In [9]:
# Make the new keyspace the session's default so the unqualified table names
# used in the CQL below resolve to it.
conn.session.set_keyspace(KEYSPACE)

### Model, create and populate the tables based on the queries

- 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
- 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
- 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### 1. Artist, song title and length in the app history heard under sessionId 338, itemInSession 4

Since we are asked for the songs played under a specific (sessionId, itemInSession) pair, I think it is best to use that pair as the table's composite primary key. sessionId is the partition key and itemInSession is the clustering column, which also gives a natural sort order within a session. I'm going to name this table songs_by_session.

In [10]:
# Model and create the table for query 1.
# session_id is the partition key and item_in_session the clustering column.
table_name = "songs_by_session"
# Drop the table first so re-running this cell starts from a fresh table.
conn.drop_table(table_name)
create_query = """
    CREATE TABLE IF NOT EXISTS %s 
        (
            session_id INT, 
            item_in_session INT, 
            artist TEXT, 
            song_title TEXT,
            length DOUBLE, 
            PRIMARY KEY(session_id, item_in_session)
        )
""" % (table_name,)
conn.session.execute(create_query)

<cassandra.cluster.ResultSet at 0x11c3a3310>

In [11]:
# Populate the table.
# The INSERT is prepared once and reused for every row instead of being rebuilt
# and re-sent as CQL text on each iteration; the driver binds the values.
insert_query = """
    INSERT INTO songs_by_session
        (
            session_id,
            item_in_session,
            artist,
            song_title,
            length
        )
        VALUES (?, ?, ?, ?, ?)
"""
prepared_insert = conn.session.prepare(insert_query)
# itertuples is much faster than iterrows and still allows access by column name.
for row in tqdm(df.itertuples(index=False), total=df.shape[0]):
    values = (row.sessionId, row.itemInSession, row.artist, row.song, row.length)
    try:
        conn.session.execute(prepared_insert, values)
    except Exception as e:
        # Best-effort load: report the failing statement and row, then continue.
        print(e)
        print(insert_query)
        print(values)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6820/6820 [00:15<00:00, 434.63it/s]


In [12]:
# Select to check data (query 1). The filter values are bound as parameters
# instead of being spliced into the CQL text.
select_query = """
    SELECT artist, song_title, length FROM songs_by_session
    WHERE session_id = %s AND item_in_session = %s
"""
rows = conn.session.execute(select_query, (338, 4))
for row in rows:
    print(row)

Row(artist='Faithless', song_title='Music Matters (Mark Knight Dub)', length=495.3073)


#### 2. Name of artist, song (sorted by itemInSession) and user (first and last name) for userId = 10, sessionId = 182

For this query I'm going to use a composite partition key formed by user_id and session_id, so that all songs of one user's session live in the same partition. item_in_session is the clustering column, so the songs come back sorted by itemInSession. The only other difference from the previous table is that this one also stores the user's first and last name. This table will be named songs_by_session_user.

In [13]:
# Model and create the table for query 2.
# (user_id, session_id) is a composite partition key; item_in_session is the
# clustering column, so rows within a partition come back ordered by it.
table_name = "songs_by_session_user"
# Drop the table first so re-running this cell starts from a fresh table.
conn.drop_table(table_name)
create_query = """
    CREATE TABLE IF NOT EXISTS %s 
        (
            user_id INT, 
            session_id INT,
            item_in_session INT,
            artist TEXT, 
            song_title TEXT,
            first_name TEXT,
            last_name TEXT,
            PRIMARY KEY((user_id, session_id), item_in_session)
        )
""" % (table_name,)
conn.session.execute(create_query)

<cassandra.cluster.ResultSet at 0x11c4075d0>

In [14]:
# Populate the table.
# The INSERT is prepared once and reused for every row instead of being rebuilt
# and re-sent as CQL text on each iteration; the driver binds the values.
insert_query = """
    INSERT INTO songs_by_session_user
        (
            user_id,
            session_id,
            item_in_session,
            artist,
            song_title,
            first_name,
            last_name
        )
        VALUES (?, ?, ?, ?, ?, ?, ?)
"""
prepared_insert = conn.session.prepare(insert_query)
# itertuples is much faster than iterrows and still allows access by column name.
for row in tqdm(df.itertuples(index=False), total=df.shape[0]):
    values = (row.userId, row.sessionId, row.itemInSession, row.artist, row.song,
              row.firstName, row.lastName)
    try:
        conn.session.execute(prepared_insert, values)
    except Exception as e:
        # Best-effort load: report the failing statement and row, then continue.
        print(e)
        print(insert_query)
        print(values)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6820/6820 [00:14<00:00, 478.96it/s]


In [15]:
# Select to check data (query 2). The filter values are bound as parameters;
# rows come back ordered by the item_in_session clustering column.
select_query = """
    SELECT artist, song_title, first_name, last_name FROM songs_by_session_user
    WHERE user_id = %s AND session_id = %s
"""
rows = conn.session.execute(select_query, (10, 182))
for row in rows:
    print(row)

Row(artist='Down To The Bone', song_title="Keep On Keepin' On", first_name='Sylvie', last_name='Cruz')
Row(artist='Three Drives', song_title='Greece 2000', first_name='Sylvie', last_name='Cruz')
Row(artist='Sebastien Tellier', song_title='Kilometer', first_name='Sylvie', last_name='Cruz')
Row(artist='Lonnie Gordon', song_title='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', first_name='Sylvie', last_name='Cruz')


#### 3. Names (first and last) of users who listened to the song 'All Hands Against His Own'

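For this one the primary key is formed by song_title as the partition key, so the query can filter by song, and user_id as the clustering column. As a result each user is stored once per song; repeated plays by the same user overwrite the same row. The name of this table will be "users_who_listened_song".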

In [16]:
# Model and create the table for query 3.
# song_title is the partition key and user_id the clustering column.
table_name = "users_who_listened_song"
# Drop the table first so re-running this cell starts from a fresh table.
conn.drop_table(table_name)
create_query = """
    CREATE TABLE IF NOT EXISTS %s 
        (
            song_title TEXT,
            user_id INT,
            first_name TEXT,
            last_name TEXT,
            PRIMARY KEY(song_title, user_id)
        )
""" % (table_name,)
conn.session.execute(create_query)

<cassandra.cluster.ResultSet at 0x11c3d2d90>

In [17]:
# Populate the table.
# The INSERT is prepared once and reused for every row instead of being rebuilt
# and re-sent as CQL text on each iteration; the driver binds the values.
insert_query = """
    INSERT INTO users_who_listened_song
        (
            song_title,
            user_id,
            first_name,
            last_name
        )
        VALUES (?, ?, ?, ?)
"""
prepared_insert = conn.session.prepare(insert_query)
# itertuples is much faster than iterrows and still allows access by column name.
for row in tqdm(df.itertuples(index=False), total=df.shape[0]):
    values = (row.song, row.userId, row.firstName, row.lastName)
    try:
        conn.session.execute(prepared_insert, values)
    except Exception as e:
        # Best-effort load: report the failing statement and row, then continue.
        print(e)
        print(insert_query)
        print(values)

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 6820/6820 [00:14<00:00, 470.46it/s]


In [18]:
# Select to check data (query 3). The song title is bound as a parameter so the
# driver quotes it, which also works for titles that contain an apostrophe.
select_query = """
    SELECT first_name, last_name FROM users_who_listened_song
    WHERE song_title = %s
"""
rows = conn.session.execute(select_query, ("All Hands Against His Own",))
for row in rows:
    print(row)

Row(first_name='Jacqueline', last_name='Lynch')
Row(first_name='Tegan', last_name='Levine')
Row(first_name='Sara', last_name='Johnson')


### Drop the tables

In [19]:
# Clean up: drop every table created above, in creation order.
for table_name in ("songs_by_session", "songs_by_session_user", "users_who_listened_song"):
    conn.drop_table(table_name)

### Close the connection

In [20]:
# Release Cassandra resources: shut the session down first, then the cluster
# object the connector created it from.
conn.session.shutdown()
conn.cluster.shutdown()