#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [2]:
def event_data_path_list():
    # Get your current folder and subfolder event data
    filepath = os.getcwd() + '/event_data'

    # Create a for loop to create a list of files and collect each filepath
    for root, dirs, files in os.walk(filepath):
        file_path_list = glob.glob(os.path.join(root,'*'))
    return file_path_list

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [3]:
def combine_to_processed_data(file_path_list, output_path):
    # initiating an empty list of rows that will be generated from each file
    full_data_rows_list = [] 
        
    # for every filepath in the file path list 
    for f in file_path_list:

    # reading csv file 
        with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
            # creating a csv reader object 
            csvreader = csv.reader(csvfile) 
            next(csvreader)

            for line in csvreader:
                full_data_rows_list.append(line) 
                
    # creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
    # Apache Cassandra tables
    csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

    if not os.path.exists(os.path.dirname(output_path)):
        os.makedirs(os.path.dirname(output_path))

    with open(output_path, 'w', encoding = 'utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect')
        writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                    'level','location','sessionId','song','userId'])
        for row in full_data_rows_list:
            if (row[0] == ''):
                continue
            writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

    # check the number of rows in your csv file
    with open(file, 'r', encoding = 'utf8') as f:
        print('Rows written in file: ', sum(1 for line in f))

In [4]:
file = 'processed_data/event_datafile_new.csv'
combine_to_processed_data(event_data_path_list(), file)

Rows written in file:  6821


#### Creating a Cluster

In [5]:
from cassandra.cluster import Cluster
cluster = Cluster()

session = cluster.connect()

#### Create Keyspace

In [6]:
def create_and_set_keyspace(keyspace_name):    
    cql_command = f"""
    CREATE KEYSPACE IF NOT EXISTS {keyspace_name} 
    WITH REPLICATION = 
    {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}"""
    session.execute(cql_command)
    session.set_keyspace(keyspace_name)

#### Set Keyspace

In [7]:
create_and_set_keyspace('streaming_app')

## Following queries will answer the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [8]:
# Show the data
file = 'processed_data/event_datafile_new.csv'
df = pd.read_csv(file)
df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Harmonia,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",583,Sehr kosmisch,26
1,The Prodigy,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",583,The Big Gundown,26
2,Train,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",583,Marry Me,26
3,Sony Wonder,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",597,Blackbird,61
4,Van Halen,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",602,Best Of Both Worlds (Remastered Album Version),80


In [9]:
# Check column uniqueness
{column: len(df[col].unique()) for column,col in zip(df, df.columns)}

{'artist': 3148,
 'firstName': 84,
 'gender': 2,
 'itemInSession': 123,
 'lastName': 86,
 'length': 3994,
 'level': 2,
 'location': 63,
 'sessionId': 776,
 'song': 5190,
 'userId': 96}

In [11]:
# Check uniqueness of PRIMARY KEY
unique_count_df = df[['sessionId', 'itemInSession']].drop_duplicates()
unique_count_df.shape[0]

6820

In [12]:
drop_query = 'drop table if exists session_history'
create_query = """
create table if not exists session_history
(
    session_id text, 
    item_in_session text,
    artist text,
    song_title text,
    song_length text,
    PRIMARY KEY (session_id, item_in_session)
)
"""
session.execute(drop_query)
session.execute(create_query)

<cassandra.cluster.ResultSet at 0x1177b7e48>

In [14]:
file = 'processed_data/event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "insert into session_history (session_id, item_in_session, artist, song_title, song_length)"
        query = query + " values (%s, %s, %s, %s, %s)"
        session.execute(query, (line[8], line[3], line[0], line[9], line[5]))

#### SELECT to verify that the data have been inserted into each table

In [15]:
## Query 1:  Give me the artist, song title and song's length in the music app history that was heard during \
## sessionId = 338, and itemInSession = 4
query_1 = """
select artist, song_title, song_length
from session_history
where session_id = '338'
and item_in_session = '4'
"""       

rows = session.execute(query_1)
for row in rows:
    print(row.artist, row.song_title, row.song_length)

Faithless Music Matters (Mark Knight Dub) 495.3073


In [16]:
# Check uniqueness of PRIMARY KEY
unique_count_df = df[['userId', 'sessionId', 'itemInSession']].drop_duplicates()
unique_count_df.shape[0]

6820

In [18]:
drop_query = 'drop table if exists user_history'
create_query = """
create table if not exists user_history
(
    user_id text,
    session_id text, 
    item_in_session text,
    artist text,
    first_name text,
    last_name text,
    song_title text,
    PRIMARY KEY (user_id, session_id, item_in_session)
)
"""
session.execute(drop_query)
session.execute(create_query)

<cassandra.cluster.ResultSet at 0x1170f6be0>

In [19]:
file = 'processed_data/event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "insert into user_history (user_id, session_id, item_in_session, artist, first_name, last_name, song_title)"
        query = query + " values (%s, %s, %s, %s, %s, %s, %s)"
        session.execute(query, (line[10], line[8], line[3], line[0], line[1], line[4], line[9]))

In [20]:
## Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)\
## for userid = 10, sessionid = 182
query_2 = """
select artist, song_title, first_name, last_name
from user_history
where session_id = '182'
and user_id = '10'
"""

rows = session.execute(query_2)
for row in rows:
    print(row.artist, row.song_title, row.first_name, row.last_name)

Down To The Bone Keep On Keepin' On Sylvie Cruz
Three Drives Greece 2000 Sylvie Cruz
Sebastien Tellier Kilometer Sylvie Cruz
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


In [21]:
# Check uniqueness of PRIMARY KEY
unique_count_df = df[['song', 'sessionId', 'itemInSession']].drop_duplicates()
unique_count_df.shape[0]

6820

In [22]:
drop_query = 'drop table if exists song_history'
create_query = """
create table if not exists song_history
(
    song_title text,
    session_id text, 
    item_in_session text,
    first_name text,
    last_name text,
    PRIMARY KEY (song_title, session_id, item_in_session)
)
"""
session.execute(drop_query)
session.execute(create_query)

<cassandra.cluster.ResultSet at 0x1177b7c50>

In [23]:
file = 'processed_data/event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "insert into song_history (song_title, session_id, item_in_session, first_name, last_name)"
        query = query + " values (%s, %s, %s, %s, %s)"
        session.execute(query, (line[9], line[8], line[3], line[1], line[4]))

In [24]:
## Query 3: Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
query_3 = """
select first_name, last_name
from song_history
where song_title = 'All Hands Against His Own'
"""

rows = session.execute(query_3)
for row in rows:
    print (row.first_name, row.last_name)

Sara Johnson
Jacqueline Lynch
Tegan Levine


### Drop the tables before closing out the sessions

In [25]:
tables = [
    'session_history',
    'user_history',
    'song_history'
]
for table in tables:
    session.execute(f'drop table if exists {table}')

### Close the session and cluster connection¶

In [26]:
session.shutdown()
cluster.shutdown()