In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

# Data Source Preparation

### Extract data from CSV

In [2]:
# CSV file name of our data source
# Note: We'll extract data from various CSV files and load it into a single (destination) CSV file.
filename = 'event_datafile_new.csv'

In [20]:
def merge_csv(filename):
    """Extracts and load data from various CSV files into a single (destination) CSV file.
    
    Args:
        filename: The filename of a destination CSV file. 
    
    Returns:
        None
    """        
    
    print('Extracting data from CSV files started...')
    
    # Get the current folder and subfolder event data
    filepath = os.path.join(os.getcwd(), 'event_data')
    
    print(f' > filepath: {filepath}')
   
    # Create a for loop to create a list of files and collect each filepath
    for root, dirs, files in os.walk(filepath):
        # join the file path and roots with the subdirectories using glob
        csv_list = glob.glob(os.path.join(root, '*'))
       
    print(f' > number of csv files: {len(csv_list)}')
    
    # Initiating an empty list of rows that will be generated from each file
    rows = [] 
    
    # Extract each row in every file and append it to the rows list
    for f in csv_list:
        with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
            csvreader = csv.reader(csvfile) 
            next(csvreader)      
    
            # Append all rows
            for line in csvreader:
                rows.append(line) 
            
    print(f' > total rows: {len(rows)}')

    # creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
    # Apache Cassandra tables
    csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

    # Create new CSV file that will contain all rows from event_data CSV files
    with open(filename, 'w', encoding = 'utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect')
        writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', \
                         'level', 'location', 'sessionId', 'song', 'userId'])
        
        # Write rows    
        for row in rows:
            if (row[0] == ''):
                continue
            writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

    print(f' > destination CSV file: {filename}')        
            
    # Check total number of rows in newly created CSV file
    with open(filename, 'r', encoding = 'utf8') as f:
        print(f' > total rows in destination CSV file: {sum(1 for line in f)}')
            
    print('Extracting data from CSV files finished.')            


In [21]:
merge_csv(filename)

Extracting data from CSV files started...
 > filepath: D:\udacity\Data Engineering\project 2\event_data
 > number of csv files: 30
 > total rows: 8056
 > destination CSV file: event_datafile_new.csv
 > total rows in destination CSV file: 6821
Extracting data from CSV files finished.


### Load data from CSV into pandas dataframe

In [22]:
filepath = os.path.join(os.getcwd(), filename)
df = pd.read_csv(filename)
df.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Des'ree,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",139,You Gotta Be,8
1,Mr Oizo,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",139,Flat 55,8
2,Tamba Trio,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",139,Quem Quiser Encontrar O Amor,8
3,The Mars Volta,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",139,Eriatarka,8
4,Infected Mushroom,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",139,Becoming Insane,8


In [23]:
# Check shape
# Note: The dataset rowcount is equal to CSV rowcount minus 1 (header row)!
df.shape

(6820, 11)

In [24]:
# Check data types
df.dtypes

artist            object
firstName         object
gender            object
itemInSession      int64
lastName          object
length           float64
level             object
location          object
sessionId          int64
song              object
userId             int64
dtype: object

# Data Modeling by Apache Cassandra

### Common functions

In [97]:
def execute_query(session, query):
    """Execute query.
    
    Args:
        session: The Apache Cassandra session object.
        query (str): The query to execute.
    
    Returns:
        result: The cassandra.cluster.ResultSet object.
    """           
    try: 
        return session.execute(query)
    except Exception as e:
        print(e)    

        
def get_rowcount(session, table_name):
    """Returns COUNT of a given table.
    
    Args:
        session: The Apache Cassandra session object.
        table_name (str): The target table name.
    
    Returns:
        int: The rowcount value.
    """           
    query = f'SELECT COUNT(*) FROM {table_name};'
    result = execute_query(session, query)
    
    return result.current_rows[0].count


### Create cluster

In [93]:
from cassandra.cluster import Cluster
try: 
    cluster = Cluster(['127.0.0.1', '9042'])
    session = cluster.connect()
except Exception as e:
    print(e)

### Create and set keyspace (sparkify)

In [94]:
try:
    query = """
        CREATE KEYSPACE IF NOT EXISTS sparkify 
        WITH REPLICATION = 
        { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
    """
    session.execute(query)
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)

### Queries

We want to create a model that will be suited to the following quieries:

[Table #1 - *session_items*]

1. Give me the **artist**, **song** title and song's **length** 
   that was heard during **sessionId** = 338 and **itemInSession** = 4.
   
[Table #2 - *user_sessions*]

2. Give me only the following: *artist*, *song* (sorted by *itemInSession*) and *user* (*first* and *last name*) 
   for **userid** = 10, **sessionid** = 182.
   
[Table #3 - *song_users*]

3. Give me every *user name* (*first* and *last*) 
   who listened to the **song** = 'All Hands Against His Own'.

### Create tables and insert data

We'll use the naming convention as describe on the apacha cassandra official site (https://cassandra.apache.org/doc/latest/data_modeling/data_modeling_logical.html).

#### Table #1: *songs_by_session_item*

Query description:

> Give me the **artist**, **song** title and song's **length** that was heard during **sessionId** = 338 and **itemInSession** = 4.

Query:

> ```SELECT artist, song, length FROM songs_by_session_item WHERE session_id = 338 and item_session = 4```

Explanation:

> The query returns the information about the *song* and its *artist* filtered by the *session item*. Since the "artist" information is included in the **song entity** we'll denominate the table as "songs" table. We'll also use the suffix "_by_session_item" to refer to the session item which is uniquelly identified by *session_id* and *session_item* column. Since one session item represents one song listened by user the PRIMARY KEY will be composed by **session_id** and **session_item** where session_id will be the PARTITION KEY and the session_item will be the CLUSTERING KEY.

Columns:
 - *session_id* (PK: PARTITION KEY, int)
 - *session_item* (PK: CLUSTERING KEY, int)
 - *artist* (text)
 - *song* (text)
 - *length* (float)

##### PRIMARY KEY check

In [67]:
# Use dataframe to check uniqueness of the PRIMARY KEY(session_id, session_item) - OK
df[['sessionId', 'itemInSession']].drop_duplicates().shape

(6820, 2)

##### Create

In [72]:
query = """
    CREATE TABLE IF NOT EXISTS songs_by_session_item (
        session_id int,
        session_item int,
        artist text,
        song text,
        length float,
        PRIMARY KEY ((session_id), session_item)
    );
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

##### Insert

In [84]:
query = """
    INSERT INTO songs_by_session_item
    (session_id, session_item, artist, song, length)
    VALUES (%s, %s, %s, %s, %s);
"""
# define progress variables
total = df.shape[0]          # total rows
percentage = 10;             # show progress on every 10% completed
portion = int(total * 0.1)   # 10% portion

print('Insert into songs_by_session_item table started. Please wait...')

try:
    # loop through all dataframe rows
    for ix, item in df.iterrows():
        # insert
        session.execute(query, \
                        (item.sessionId, item.itemInSession, item.artist, item.song, item.length))
    
        # show progress
        if (ix+1) % portion == 0:
            print(f' > {percentage}% done.')
            percentage += 10

    print('Insert completed.') 
except Exception as e:
    print(e)

Insert into songs_by_session_item table started. Please wait...
 > 10% done.
 > 20% done.
 > 30% done.
 > 40% done.
 > 50% done.
 > 60% done.
 > 70% done.
 > 80% done.
 > 90% done.
 > 100% done.
Insert completed.


##### Validate

In [74]:
# Check row count - should be 6820!
count = get_rowcount(session, 'songs_by_session_item')
total = len(df)
if count == total:
    status = 'OK'
else:
    status = 'NOT OK'
    
print(f"{status}: {count} of {total}")

OK: 6820 of 6820


In [75]:
# Test query #1
query1 = """
    SELECT artist, song, length 
    FROM songs_by_session_item 
    WHERE session_id = 338 and session_item = 4;
"""
data = []
result = execute_query(session, query1)
for row in result:
    data.append(row)
    
pd.DataFrame(data)

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


In [76]:
# Check it using pandas dataframe - OK!
df[(df.sessionId == 338) & (df.itemInSession == 4)] \
    .loc[:, ['artist', 'song', 'length']]

Unnamed: 0,artist,song,length
964,Faithless,Music Matters (Mark Knight Dub),495.3073


#### Table #2: *songs_by_session*

Query description:

> Give me only the following: *artist*, *song* (sorted by *itemInSession*) and *user* (*first* and *last name*) for **userid** = 10, **sessionid** = 182.

Query:

> ```SELECT artist, song, first_name, last_name FROM songs_by_session WHERE user_id = 10 and session_id = 182```

Explanation:

> The query returns the information about the *song*, its *artist* and the *user* filtered by the *session*. Since the "artist" information and the "user" information is included in the **song entity** we'll denominate the second table as "songs" table. Will use the suffix "_by_session" to refer to the session  which is uniquelly identified by *user_id* and *session_id* column. Since the songs must be ordered by *itemInSession* (=session_item) column we'll also include the *session_item* column as CLUSTERING KEY in the primary key. 

Columns:
 - *user_id* (PK: PARTITION KEY, int)
 - *session_id* (PK: PARTITION KEY, int)
 - *session_item* (PK: CLUSTERING KEY, int)
 - *artist* (text)
 - *song* (text)
 - *first_name* (text)
 - *last_name* (text)

##### PRIMARY KEY check

In [78]:
# Use dataframe to check the uniqueness of PRIMARY KEY(user_id, session_id, session_item) - OK
df[['userId', 'sessionId', 'itemInSession']].drop_duplicates().shape

(6820, 3)

We could have skipped this check since already the compound key (*session_id*, *session_item*) is unique in our dataset.

##### Create

In [79]:
query = """
    CREATE TABLE IF NOT EXISTS songs_by_session (
        user_id int,
        session_id int,
        session_item int,
        artist text,
        song text,
        first_name text,
        last_name text,
        PRIMARY KEY ((user_id, session_id), session_item)
    );
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

##### Insert

In [80]:
query = """
    INSERT INTO songs_by_session
    (user_id, session_id, session_item, artist, song, first_name, last_name)
    VALUES (%s, %s, %s, %s, %s, %s, %s);
"""
# define progress variables
total = df.shape[0]          # total rows
percentage = 10;             # show progress on every 10% completed
portion = int(total * 0.1)   # 10% portion

print('Insert into songs_by_session table started. Please wait...')

try:
    # loop through all dataframe rows
    for ix, item in df.iterrows():
        # insert
        session.execute(query, \
            (item.userId, item.sessionId, item.itemInSession, \
             item.artist, item.song, item.firstName, item.lastName))
    
        # show progress
        if (ix+1) % portion == 0:
            print(f' > {percentage}% done.')
            percentage += 10

    print('Insert completed.') 
except Exception as e:
    print(e)

Insert into songs_by_session table started. Please wait...
 > 10% done.
 > 20% done.
 > 30% done.
 > 40% done.
 > 50% done.
 > 60% done.
 > 70% done.
 > 80% done.
 > 90% done.
 > 100% done.
Insert completed.


##### Validate

In [81]:
# Check row count - should be 6820!
count = get_rowcount(session, 'songs_by_session')
total = len(df)
if count == total:
    status = 'OK'
else:
    status = 'NOT OK'
    
print(f"{status}: {count} of {total}")

OK: 6820 of 6820


In [82]:
# Test query #2
# Note: We'll include session_item column to check the ordering!
query2 = """
    SELECT session_item, artist, song, first_name, last_name 
    FROM songs_by_session
    WHERE user_id = 10 and session_id = 182;
"""
data = []
result = execute_query(session, query2)
for row in result:
    data.append(row)

pd.DataFrame(data)

Unnamed: 0,session_item,artist,song,first_name,last_name
0,0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,1,Three Drives,Greece 2000,Sylvie,Cruz
2,2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


In [83]:
# Check it using pandas dataframe - OK!
df[(df.userId == 10) & (df.sessionId == 182)].sort_values(by='itemInSession') \
    .loc[:, ['artist', 'song', 'firstName', 'lastName']]

Unnamed: 0,artist,song,firstName,lastName
151,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
152,Three Drives,Greece 2000,Sylvie,Cruz
153,Sebastien Tellier,Kilometer,Sylvie,Cruz
154,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


#### Table #3: *users_by_song*

Query description:

> Give me all users (by *first* and *last name*) who listened to the **song** = 'All Hands Against His Own'.

##### Check user uniqueness

Since the query selects the user by *first name* and *last name* (if we read it and follow it literally) columns we must check if our dataset guarantee the user uniqueness using these two keys (*first name*, *last name*). Generally, it is not a good idea to bound the identity of a person to these two keys since two people can share the same last name and first name.

In [54]:
# Let us check if there are any duplicates by (lastName, firstName)
df_users = df[['userId', 'lastName', 'firstName']].drop_duplicates()
df_users.loc[df_users.duplicated(subset=['lastName', 'firstName']), ['lastName', 'firstName']]

Unnamed: 0,lastName,firstName
1343,Robinson,Ava


In [55]:
# Which users share the same names?
df_users[(df_users.lastName == 'Robinson') & (df_users.firstName == 'Ava')]

Unnamed: 0,userId,lastName,firstName
143,50,Robinson,Ava
1343,13,Robinson,Ava


So there are duplicates by user names in our dataset. Obviously, our dataset does not guarantee the uniqueness of a user by his/her first name and last name. It can only be guaranteed by the **userId** column, therefore, this column should always accompany the **firstName** and the **lastName** columns to provide the *unique user*.

Query:

> ```SELECT user_id, first_name, last_name FROM users_by_song WHERE song = 'All Hands Against His Own'```

Explanation:

> The query returns the information about *users* filtered by *song*, thus we'll use the name "users_by_song" for our table. The PRIMARY KEY will consist of the *song* column (PARTITION KEY) and the *user_id* column (CLUSTERING KEY). 

Columns:
 - *song* (PK: PARTITION KEY, text)
 - *user_id* (PK: CLUSTERING KEY, int)
 - *first_name* (text)
 - *last_name* (text)

##### Create

In [85]:
query = """
    CREATE TABLE IF NOT EXISTS users_by_song (
        song text,
        user_id int,
        first_name text,
        last_name text,
        PRIMARY KEY ((song), user_id)
    );
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

##### Insert

In [86]:
# Check how many unique rows do we obtain by the key (song, userId, firstName, lastName)
df_users_by_song = df[['song', 'userId', 'firstName', 'lastName']].drop_duplicates()
df_users_by_song.shape

(6618, 4)

Although it does not make much difference we prefer to use only the unique rows from *df_users_by_song*.

In [88]:
query = """
    INSERT INTO users_by_song
    (song, user_id, first_name, last_name)
    VALUES (%s, %s, %s, %s);
"""
# define progress variables
total = df_users_by_song.shape[0]   # total rows
percentage = 10;                 # show progress on every 10% completed
portion = int(total * 0.1)       # 10% portion

print('Insert into users_by_song table started. Please wait...')

try:
    # loop through the (unique) dataframe rows
    for ix, item in df_users_by_song.iterrows():
        # insert
        session.execute(query, \
            (item.song, item.userId, item.firstName, item.lastName))
    
        # show progress
        if (ix+1) % portion == 0:
            print(f' > {percentage}% done.')
            percentage += 10

    print('Insert completed.') 
except Exception as e:
    print(e)

Insert into users_by_song table started. Please wait...
 > 10% done.
 > 20% done.
 > 30% done.
 > 40% done.
 > 50% done.
 > 60% done.
 > 70% done.
 > 80% done.
 > 90% done.
 > 100% done.
Insert completed.


##### Validate

In [98]:
# Check row count - should be 6618!
count = get_rowcount(session, 'users_by_song')
total = len(df_users_by_song)
if count == total:
    status = 'OK'
else:
    status = 'NOT OK'
    
print(f"{status}: {count} of {total}")

OK: 6618 of 6618


In [95]:
# Test query #3
# Note: We'll include session_item column to check the ordering!
query3 = """
    SELECT user_id, first_name, last_name 
    FROM users_by_song
    WHERE song = 'All Hands Against His Own';
"""
data = []
result = execute_query(session, query3)
for row in result:
    data.append(row)

pd.DataFrame(data)

Unnamed: 0,user_id,first_name,last_name
0,29,Jacqueline,Lynch
1,80,Tegan,Levine
2,95,Sara,Johnson


In [91]:
# Check it using pandas dataframe - OK!
df[df.song == 'All Hands Against His Own'].sort_values(by='userId') \
    .loc[:, ['userId', 'firstName', 'lastName']] \
    .drop_duplicates()

Unnamed: 0,userId,firstName,lastName
2442,29,Jacqueline,Lynch
2646,80,Tegan,Levine
219,95,Sara,Johnson


### Drop tables and close session

In [206]:
# Drop all tables
try:
    session.execute('DROP TABLE IF EXISTS sessions;')
    session.execute('DROP TABLE IF EXISTS user_sessions;')
    session.execute('DROP TABLE IF EXISTS songs;')
except Exception as e:
    print(e)

In [92]:
session.shutdown()
cluster.shutdown()