# Part I: ETL pipeline for preprocessing the files

Importing python packages 

In [1]:
import os
import csv
import glob
import pandas as pd
import cassandra
from cassandra.cluster import Cluster

Creating list of filepaths to process original event csv data files

In [2]:
filepath = os.getcwd() + '/event_data'

for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))

Processing the files to create the data file csv that will be used for Casssandra tables

In [3]:
full_data_rows_list = [] 
    
for f in file_path_list:
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
        for line in csvreader:
            full_data_rows_list.append(line)


csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                     'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

Checking the number of rows in the csv file

In [4]:
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


# Part II: Data modeling with Cassandra

## Creating the keyspace

In [5]:
# Connecting to local cluster
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

In [6]:
session.execute('''
    CREATE KEYSPACE IF NOT EXISTS music_library
    WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}
''')

<cassandra.cluster.ResultSet at 0x7f7b4ad9a630>

In [7]:
session.set_keyspace('music_library')

## Creating the tables

Firstly, let's check the columns and the rows in the generated dataset

In [8]:
gen_df = pd.read_csv('event_datafile_new.csv')
gen_df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Barry Tuckwell/Academy of St Martin-in-the-Fie...,Mohammad,M,0,Rodriguez,277.15873,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Horn Concerto No. 4 in E flat K495: II. Romanc...,88
1,Jimi Hendrix,Mohammad,M,1,Rodriguez,239.82975,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Woodstock Inprovisation,88
2,Building 429,Mohammad,M,2,Rodriguez,300.61669,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Majesty (LP Version),88
3,The B-52's,Gianna,F,0,Jones,321.54077,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Love Shack,38
4,Die Mooskirchner,Gianna,F,1,Jones,169.29914,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Frisch und g'sund,38


In [9]:
gen_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6820 entries, 0 to 6819
Data columns (total 11 columns):
artist           6820 non-null object
firstName        6820 non-null object
gender           6820 non-null object
itemInSession    6820 non-null int64
lastName         6820 non-null object
length           6820 non-null float64
level            6820 non-null object
location         6820 non-null object
sessionId        6820 non-null int64
song             6820 non-null object
userId           6820 non-null int64
dtypes: float64(1), int64(3), object(7)
memory usage: 586.2+ KB


### Query 1
Let's create a table to respond to this question:

> Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4

It's clear that we need the sessionId and the itemInSession in the primary key, and beacuse they're enough to keep every row unique we'll keep it just these ones. The partition key will be sessionId, and the clusterin column will be itemInSession.

**Table creation**

In [10]:
query = '''
    CREATE TABLE IF NOT EXISTS session_library (
        session_id bigint, 
        item_in_session int, 
        artist_name text, 
        song_title text, 
        song_length float,
        PRIMARY KEY (session_id, item_in_session)
    )  
'''

session.execute(query)

<cassandra.cluster.ResultSet at 0x7f7b4ac99e80>

**Data insertion**

In [11]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = """
            INSERT INTO session_library (
                session_id, 
                item_in_session, 
                artist_name, 
                song_title, 
                song_length
            )
        """
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

**Query test**

In [12]:
testquery = '''
    SELECT artist_name, song_title, song_length 
    FROM session_library 
    WHERE session_id=338 AND item_in_session=4;
'''

try:
    rows = session.execute(testquery)
except Exception as e:
    print(e)

for row in rows:
    print(row.artist_name, '--', row.song_title, '--', row.song_length)

Faithless -- Music Matters (Mark Knight Dub) -- 495.30731201171875


### Query 2
The second question to answer with a query is:

> Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In this case our partition key will be composed by two columns: userId and sessionId because the query will filter out using these ones. Besides, we'll have a itemInSession as clustering column because we need to sort by this column too.

**Table creation**

In [13]:
query = '''
    CREATE TABLE IF NOT EXISTS user_library (
        user_id bigint,
        session_id bigint, 
        item_in_session int,
        artist_name text,
        song_title text, 
        user_first_name text,
        user_last_name text,
        PRIMARY KEY ((user_id, session_id), item_in_session))
        WITH CLUSTERING ORDER BY (item_in_session ASC);
'''

session.execute(query)

<cassandra.cluster.ResultSet at 0x7f7b2b3666d8>

**Data insertion**

In [14]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = """
            INSERT INTO user_library (
                user_id, 
                session_id, 
                item_in_session, 
                artist_name,
                song_title, 
                user_first_name, 
                user_last_name
            )
        """
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

**Query test**

In [15]:
testquery = '''
    SELECT artist_name, song_title, user_first_name, user_last_name 
    FROM user_library 
    WHERE user_id = 10 AND session_id = 182
    ORDER BY item_in_session ASC;
'''

try:
    rows = session.execute(testquery)
except Exception as e:
    print(e)

for row in rows:
    print(row.artist_name, '--', row.song_title, '--', row.user_first_name, row.user_last_name)

Down To The Bone -- Keep On Keepin' On -- Sylvie Cruz
Three Drives -- Greece 2000 -- Sylvie Cruz
Sebastien Tellier -- Kilometer -- Sylvie Cruz
Lonnie Gordon -- Catch You Baby (Steve Pitron & Max Sanna Radio Edit) -- Sylvie Cruz


### Query 3
Finally, the last question is:

> Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

The song column will go in the partition key since it's the filter criteria, but to ensure the uniqueness of each record we need to add userId as clustering column since multiple users can listen to the same song.

**Table creation**

In [16]:
query = '''
    CREATE TABLE IF NOT EXISTS song_library (
        song_title text, 
        user_id bigint,
        session_id bigint, 
        item_in_session int,
        user_first_name text,
        user_last_name text,
        PRIMARY KEY (song_title, user_id)
    );
'''

session.execute(query)

<cassandra.cluster.ResultSet at 0x7f7b203f6e10>

**Data insertion**

In [17]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) 
    for line in csvreader:
        query = """
            INSERT INTO song_library (
                song_title, 
                user_id,
                session_id, 
                item_in_session,
                user_first_name,
                user_last_name
            )
        """
        query = query + "VALUES (%s, %s, %s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), int(line[8]), int(line[3]), line[1], line[4]))

**Query test**

In [18]:
testquery = '''
    SELECT user_first_name, user_last_name 
    FROM song_library 
    WHERE song_title = 'All Hands Against His Own';
'''

try:
    rows = session.execute(testquery)
except Exception as e:
    print(e)

for row in rows:
    print(row.user_first_name, '--', row.user_last_name)

Jacqueline -- Lynch
Tegan -- Levine
Sara -- Johnson


## Droping tables and closing connection

In [19]:
try:
    query = "DROP TABLE IF EXISTS session_library"
    rows = session.execute(query)
except Exception as e:
    print(e)

try:
    query = "DROP TABLE IF EXISTS user_library"
    rows = session.execute(query)
except Exception as e:
    print(e)
    
try:
    query = "DROP TABLE IF EXISTS song_library"
    rows = session.execute(query)
except Exception as e:
    print(e)

In [20]:
session.shutdown()
cluster.shutdown()