# Part I. ETL Pipeline for Pre-Processing the Files


### Import Python packages


In [1]:
import pandas as pd
import cassandra
import re
import os
import sys
import glob
import numpy as np
import json
import csv
import logging
import warnings

from tqdm.rich import tqdm
from rich import traceback
from pathlib import Path


In [2]:
src_path: str = "../src"
sys.path.append(src_path)
_ = traceback.install()
logging.basicConfig(force=True)
logging.getLogger().setLevel(logging.ERROR)
warnings.filterwarnings("ignore")


In [3]:
from cql_queries import *


In [4]:
data_path: Path = Path(os.getcwd()).parent.joinpath("event_data")


### Creating list of filepaths to process original event csv data files


In [5]:
file_path_list = [f.resolve() for f in data_path.glob("**/*-events.csv")]
file_path_list[:2]


[PosixPath('/home/uziel/Development/sparkify_cassandra/event_data/2018-11-21-events.csv'),
 PosixPath('/home/uziel/Development/sparkify_cassandra/event_data/2018-11-15-events.csv')]

### Processing the files to create the data file csv that will be used for Apache Casssandra tables

Only relevant columns are used, as well as those records without missing values (similar to the pre-selected rows in the original project description).


In [6]:
relevant_cols = [
    "sessionId",
    "itemInSession",
    "userId",
    "firstName",
    "lastName",
    "gender",
    "level",
    "location",
    "artist",
    "song",
    "length",
]
pd.concat([pd.read_csv(f, dtype=object) for f in file_path_list])[
    relevant_cols
].dropna().reset_index(drop=True).to_csv(data_path.joinpath("combined_event_data.csv"))


### Load combined data and check shape


In [7]:
data_df = pd.read_csv(
    data_path.joinpath("combined_event_data.csv"),
    index_col=0,
    quoting=csv.QUOTE_ALL,
)
print(data_df.shape)
data_df.head()


(6820, 11)


Unnamed: 0,sessionId,itemInSession,userId,firstName,lastName,gender,level,location,artist,song,length
0,774,4,80,Tegan,Levine,F,paid,"Portland-South Portland, ME",Facto Delafe y las flores azules,Enero en la playa,315.81995
1,671,0,97,Kate,Harrell,F,paid,"Lansing-East Lansing, MI",Kings Of Leon,Manhattan,204.2771
2,671,1,97,Kate,Harrell,F,paid,"Lansing-East Lansing, MI",Franz Ferdinand,Michael,204.12036
3,774,5,80,Tegan,Levine,F,paid,"Portland-South Portland, ME",Blue October,Drilled A Wire Through My Cheek,272.32608
4,671,2,97,Kate,Harrell,F,paid,"Lansing-East Lansing, MI",Elisa,Almeno Tu Nell'Universo,248.97261


# Part II. Creating query-optimized tables


#### Creating a Cluster


In [8]:
from cassandra.cluster import Cluster

try:
    cluster = Cluster(
        ["127.0.0.1"]
    )  # If you have a locally installed Apache Cassandra instance
    session = cluster.connect()
except Exception as e:
    print(e)


#### Create Keyspace


In [9]:
try:
    session.execute(
        """
        CREATE KEYSPACE IF NOT EXISTS sparkify
        WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
        """
    )

except Exception as e:
    print(e)


#### Set Keyspace


In [10]:
try:
    session.set_keyspace("sparkify")
except Exception as e:
    print(e)


## Create queries to ask the following three questions of the data


In [11]:
common_columns = [
    "sessionId int",
    "itemInSession int",
    "userId int",
    "firstName text",
    "lastName text",
    "gender text",
    "level text",
    "location text",
    "artist text",
    "song text",
    "length decimal",
]


### 1. Get artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

In [12]:
pk_cols = ("sessionId", "itemInSession")
sorted_cols = [*pk_cols, *(c for c in data_df.columns if c not in pk_cols)]


#### 1.1. Create table


In [13]:
session.execute(
    get_create_table_query(
        "session_library",
        common_columns + [f"PRIMARY KEY ({', '.join(pk_cols)})"],
    )
)


<cassandra.cluster.ResultSet at 0x7ff4bb1b8ee0>

#### 1.2. Insert rows


In [14]:
for index, row in tqdm(data_df.iterrows(), total=len(data_df)):
    try:
        session.execute(
            get_simple_insert_query("session_library", sorted_cols),
            tuple(row[sorted_cols].values),
        )
    except Exception as e:
        print(e)


Output()

#### 1.3. Select query


In [15]:
select_cols = ("artist", "song", "length")

rows = session.execute(
    get_simple_select_query(
        "session_library",
        select_cols,
        {"sessionId": 338, "itemInSession": 4},
    )
)

for row in rows:
    print([getattr(row, c.lower()) for c in select_cols])


['Faithless', 'Music Matters (Mark Knight Dub)', Decimal('495.3073')]


### 2. Get only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182


In [16]:
pk_cols = ("userId", "sessionId", "itemInSession")
sorted_cols = [*pk_cols, *(c for c in data_df.columns if c not in pk_cols)]


#### 2.1. Create table


In [17]:
session.execute(
    get_create_table_query(
        "user_library",
        common_columns + [f"PRIMARY KEY ({', '.join(pk_cols)})"],
    )
)


<cassandra.cluster.ResultSet at 0x7ff4a1b510f0>

#### 2.2. Insert rows


In [18]:
for index, row in tqdm(data_df.iterrows(), total=len(data_df)):
    try:
        session.execute(
            get_simple_insert_query("user_library", sorted_cols),
            tuple(row[sorted_cols].values),
        )
    except Exception as e:
        print(e)


Output()

#### 2.3. Select query


In [19]:
select_cols = ("artist", "song", "firstName", "lastName")

rows = session.execute(
    get_simple_select_query(
        "user_library",
        select_cols,
        {"userId": 10, "sessionId": 182},
    )
)

for row in rows:
    print([getattr(row, c.lower()) for c in select_cols])


['Down To The Bone', "Keep On Keepin' On", 'Sylvie', 'Cruz']
['Three Drives', 'Greece 2000', 'Sylvie', 'Cruz']
['Sebastien Tellier', 'Kilometer', 'Sylvie', 'Cruz']
['Lonnie Gordon', 'Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', 'Sylvie', 'Cruz']


### 3. Get every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'


In [20]:
pk_cols = ("song", "sessionId", "itemInSession")
sorted_cols = [*pk_cols, *(c for c in data_df.columns if c not in pk_cols)]


#### 3.1. Create table


In [21]:
session.execute(
    get_create_table_query(
        "song_library",
        common_columns + [f"PRIMARY KEY ({', '.join(pk_cols)})"],
    )
)


<cassandra.cluster.ResultSet at 0x7ff42fa7d030>

#### 3.2. Insert rows


In [22]:
for index, row in tqdm(data_df.iterrows(), total=len(data_df)):
    try:
        session.execute(
            get_simple_insert_query("song_library", sorted_cols),
            tuple(row[sorted_cols].values),
        )
    except Exception as e:
        print(e)


Output()

#### 3.3. Select query


In [23]:
select_cols = ("firstName", "lastName")

rows = session.execute(
    get_simple_select_query(
        "song_library",
        select_cols,
        {"song": "'All Hands Against His Own'"},
    )
)

for row in rows:
    print([getattr(row, c.lower()) for c in select_cols])


['Sara', 'Johnson']
['Jacqueline', 'Lynch']
['Tegan', 'Levine']


### Drop the tables before closing out the sessions


In [24]:
session.execute(get_drop_table_query("session_library"))
session.execute(get_drop_table_query("user_library"))
session.execute(get_drop_table_query("song_library"))


<cassandra.cluster.ResultSet at 0x7ff42c472da0>

### Close the session and cluster connection¶


In [25]:
session.shutdown()
cluster.shutdown()
