# <center>Data Modelling with Apache Cassandra</center>

## Part I. ETL Pipeline for Pre-Processing the Files

### Import Python packages 

In [1]:
# Import Python packages 
from cassandra.cluster import Cluster
import os
import glob
import csv

### Create a list of file paths for each original event CSV data file

In [2]:
# Check the current working directory
print('Current Directory:', os.getcwd())

# Print the name of files sitting in current working directory
print('\nThe follwing files are included:')
for file in os.listdir():
    print(file)

Current Directory: /workspace/home

The follwing files are included:
event_data
Project_1B_ Project_Template.ipynb
images
.workspace-config.json
event_datafile_new.csv
.ipynb_checkpoints


In [3]:
# Get the file path to event data
filepath = os.path.join(os.getcwd(), 'event_data')

# Collect and print the file path of each event CSV file
files_list = glob.glob(os.path.join(filepath, '*.csv'))
            
print(files_list)

['/workspace/home/event_data/2018-11-13-events.csv', '/workspace/home/event_data/2018-11-23-events.csv', '/workspace/home/event_data/2018-11-19-events.csv', '/workspace/home/event_data/2018-11-16-events.csv', '/workspace/home/event_data/2018-11-20-events.csv', '/workspace/home/event_data/2018-11-27-events.csv', '/workspace/home/event_data/2018-11-11-events.csv', '/workspace/home/event_data/2018-11-05-events.csv', '/workspace/home/event_data/2018-11-03-events.csv', '/workspace/home/event_data/2018-11-09-events.csv', '/workspace/home/event_data/2018-11-01-events.csv', '/workspace/home/event_data/2018-11-30-events.csv', '/workspace/home/event_data/2018-11-29-events.csv', '/workspace/home/event_data/2018-11-22-events.csv', '/workspace/home/event_data/2018-11-17-events.csv', '/workspace/home/event_data/2018-11-24-events.csv', '/workspace/home/event_data/2018-11-26-events.csv', '/workspace/home/event_data/2018-11-10-events.csv', '/workspace/home/event_data/2018-11-02-events.csv', '/workspace

### Process the individual event files to consolidate data into a single streamlined CSV file for modelling in Apache Casssandra

In [4]:
# Check a sample CSV file to see the first 5 rows
!head -n 5 '/workspace/home/event_data/2018-11-13-events.csv'

artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
,Logged In,Kevin,M,0,Arellano,,free,"Harrisburg-Carlisle, PA",GET,Home,1.54001E+12,514,,200,1.54207E+12,66
Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1.54001E+12,514,Ja I Ty,200,1.54207E+12,66
,Logged In,Maia,F,0,Burke,,free,"Houston-The Woodlands-Sugar Land, TX",GET,Home,1.54068E+12,510,,200,1.54207E+12,51
All Time Low,Logged In,Maia,F,1,Burke,177.84118,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1.54068E+12,510,A Party Song (The Walk of Shame),200,1.54207E+12,51


In [5]:
# Initialize an empty list to hold all data rows
data_rows = []
# Initialize an empty list to hold the headers
headers = []

# Iterate through each file in the list of files
for file in files_list:
    # Open the current file in read mode with UTF-8 encoding
    with open(file, 'r', encoding='utf-8') as csvfile:
        # Read all rows from the CSV file into a list
        rows = list(csv.reader(csvfile, delimiter=',', quotechar='"'))
        # If headers have not been set yet, set them to the first row of the current file
        if not headers:
            headers = rows[0]
        # Iterate through all rows except the header row
        for row in rows[1:]:
            # Check if the first element of the row is not an empty string
            if row[0]:
                # If not empty, append the row to the data_rows list
                data_rows.append(row)

# Open a new file in write mode with UTF-8 encoding to save the combined data
with open(
    'event_datafile_new.csv',
    'w',
    newline='',
    encoding='utf-8',
) as newfile:
    # Create a CSV writer object
    writer = csv.writer(
        newfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL
    )
    # Write the headers to the new file
    writer.writerow(headers)
    # Write each data row to the new file
    for row in data_rows:
        writer.writerow(row)

In [6]:
# Preview the first 5 rows of the consolidated event file
!head -n 5 'event_datafile_new.csv'

artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
Fu,Logged In,Kevin,M,1,Arellano,280.05832,free,"Harrisburg-Carlisle, PA",PUT,NextSong,1.54001E+12,514,Ja I Ty,200,1.54207E+12,66
All Time Low,Logged In,Maia,F,1,Burke,177.84118,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1.54068E+12,510,A Party Song (The Walk of Shame),200,1.54207E+12,51
Nik & Jay,Logged In,Wyatt,M,0,Scott,196.51873,free,"Eureka-Arcata-Fortuna, CA",PUT,NextSong,1.54087E+12,379,Pop-Pop!,200,1.54208E+12,9
Quad City DJ's,Logged In,Chloe,F,0,Cuevas,451.44771,free,"San Francisco-Oakland-Hayward, CA",PUT,NextSong,1.54094E+12,506,C'mon N' Ride It (The Train) (LP Version),200,1.54208E+12,49


In [7]:
# Check the headers of the CSV file
headers

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userId']

In [8]:
# Check the number of rows in the consolidated event file
with open('event_datafile_new.csv', 'r', encoding='utf-8') as file:
    print(
        len(file.readlines())
    )

6821


## Part II. Data Modelling in Apache Cassandra

**Now the CSV file titled <font color=red>event_datafile_new.csv</font> is ready to work with, located within the current working directory.  The <font color=red>event_datafile_new.csv</font> contains the following columns:**
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data appears like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

![image](https://github.com/user-attachments/assets/5a2cd3d4-b1d1-4052-837b-8e7ade421e31)

### Create a Cluster

In [9]:
try:
    # Connect to the Cassandra instance on the local machine
    cluster = Cluster(['127.0.0.1'])
    # Establish a session with the Cassandra cluster
    session = cluster.connect()
except Exception as e:
    print(e)

### Create Keyspace

In [10]:
try:
    # Execute a CQL command to create a new keyspace called "sparkifydb"
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS sparkifydb
        WITH REPLICATION = {
            'class': 'SimpleStrategy',
            'replication_factor': 1
        }
    """)
except Exception as e:
    print(e)

### Set Keyspace

In [11]:
try:
    # Set the active keyspace for the session to 'sparkifydb'
    session.set_keyspace('sparkifydb')
except Exception as e:
    print(e)

### Create queries to ask the following three questions of the data

**1. Get the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession = 4**

```SQL
SELECT artist, song, length 
FROM song_table 
WHERE sessionId = 338 AND itemInSession = 4
```


**2. Get only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182**
    
```SQL
SELECT artist, song, user
FROM artist_table 
WHERE userId = 10 AND sessionId = 182
```

**3. Get every user name (first and last) in the music app history who listened to the song 'All Hands Against His Own'**

```SQL
SELECT user
FROM user_table
WHERE song = 'All Hands Against His Own'
```

### Data Modelling for Query 1
Get the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

```SQL
SELECT artist, song, length 
FROM song_table 
WHERE sessionId = 338 AND itemInSession = 4
```

The data will be **partitioned** by `sessionId` with a **clustering** column of `itemInSession`

#### Create a Table named `song_table` based on the Query 1

In [12]:
# Create song_table for Query 1
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS song_table (
            sessionId            int,
            itemInSession        int,
            artist               text,
            song                 text,
            length               float,
            PRIMARY KEY ((sessionId), itemInSession)
        )
    """)
except Exception as e:
    print(e)

#### Load Data into `song_table`

In [13]:
# Specify the columns we want to extract from the CSV file
target_cols = ['sessionId', 'itemInSession', 'artist', 'song', 'length']

# Define the SQL INSERT query to insert data into the 'song_table'
insert_query = """
    INSERT INTO song_table (sessionId, itemInSession, 
                            artist, song, length)
    VALUES
    (%s, %s, %s, %s, %s)
"""

# Open the consolidated CSV file in read mode with UTF-8 encoding
with open('event_datafile_new.csv', 'r', encoding='utf-8') as file:
    # Read all rows from the CSV file into a list
    rows = list(csv.reader(file, delimiter=',', quotechar='"'))
    
    # Extract the headers (column names) from the first row
    headers = rows[0]

    # Find the indices of the target columns within the headers
    target_col_indices = [headers.index(col) for col in target_cols]

    # Iterate over each row in the CSV file, starting from the second row
    for row in rows[1:]:
        # Extract the relevant column values from each row using the indices
        sessionId, itemInSession, artist, song, length = [
            row[i] for i in target_col_indices
        ]

        # Execute the SQL INSERT query with the extracted values
        session.execute(
            insert_query,(
                int(sessionId), 
                int(itemInSession), 
                artist, 
                song, 
                float(length),
            )
        )

    # Print a message indicating that data loading is complete
    print(f'Data successfully loaded to the song_table!')

Data successfully loaded to the song_table!


#### Run a SELECT query to confirm that the data has been successfully inserted into the `song_table`

In [14]:
# Run a SELECT query to verify the data was loaded into the table
try:
    rows = session.execute("""
        SELECT artist, song, length
        FROM song_table
        WHERE sessionId = 338
        AND itemInSession = 4
    """)
except Exception as e:
    print(e)
    
for row in rows:
    print((row.artist, row.song, row.length))

('Faithless', 'Music Matters (Mark Knight Dub)', 495.30731201171875)


### Data Modelling for Query 2

Get only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    
```SQL
SELECT artist, song, user 
FROM artist_table 
WHERE userId = 10 AND sessionId = 182
```

Since the `song` column is to be sorted by `itemInSession`, we should **partition** data using a **composite partition key** of `userId` and `sessionId`. This way, within each partition, the data will be sorted by the **clustering** column `itemInSession`.

#### Create a Table named `artist_table` based on the Query 2

In [15]:
# Create artist_table for Query 2
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS artist_table (
            userId            int,
            sessionId         int,
            itemInSession     int,
            song              text,
            artist            text,
            user              text,
            PRIMARY KEY ((userId, sessionId), itemInSession)
        )
    """)
except Exception as e:
    print(e)     

#### Load Data into `artist_table`

In [16]:
# Specify the columns we want to extract from the CSV file
target_cols = ['userId', 'sessionId', 'itemInSession', 
               'song', 'artist', 'firstName', 'lastName']

# Define the SQL INSERT query to insert data into the 'artist_table'
insert_query = """
    INSERT INTO artist_table (userId, sessionId, itemInSession, 
                              song, artist, user)
    VALUES
    (%s, %s, %s, %s, %s, %s)
"""

# Open the consolidated CSV file in read mode with UTF-8 encoding
with open('event_datafile_new.csv', 'r', encoding='utf-8') as file:
    # Read all rows from the CSV file into a list
    rows = list(csv.reader(file, delimiter=',', quotechar='"'))
    
    # Extract the headers (column names) from the first row
    headers = rows[0]

    # Find the indices of the target columns within the headers
    target_col_indices = [headers.index(col) for col in target_cols]

    # Iterate over each row in the CSV file, starting from the second row
    for row in rows[1:]:
        # Extract the relevant column values from each row using the indices
        userId, sessionId, itemInSession, song, artist, firstName, lastName = [
            row[i] for i in target_col_indices
        ]

        # Execute the SQL INSERT query with the extracted values
        session.execute(
            insert_query,(
                int(userId),
                int(sessionId),
                int(itemInSession),
                song,
                artist,
                firstName + ' ' + lastName,  # Concatenate the first name and last name together to form the complete name
            )
        )

    # Print a message indicating that data loading is complete
    print(f'Data successfully loaded to the artist_table!')

Data successfully loaded to the artist_table!


#### Run a SELECT query to confirm that the data has been successfully inserted into the `artist_table`

In [17]:
# Run a SELECT query to verify the data was loaded into the table
try:
    rows = session.execute("""
        SELECT artist, song, user 
        FROM artist_table 
        WHERE userId = 10 AND sessionId = 182
    """)
except Exception as e:
    print(e)
    
for row in rows:
    print((row.artist, row.song, row.user))

('Down To The Bone', "Keep On Keepin' On", 'Sylvie Cruz')
('Three Drives', 'Greece 2000', 'Sylvie Cruz')
('Sebastien Tellier', 'Kilometer', 'Sylvie Cruz')
('Lonnie Gordon', 'Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', 'Sylvie Cruz')


### Data Modelling for Query 3

Get every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

```SQL
SELECT user 
FROM user_table
WHERE song = 'All Hands Against His Own'
```

The data will be **partitioned** by `song`, with `userId` as the **clustering** column, to uniquely identify the users who listen to the same song.

#### Create a Table named `user_table` based on the Query 3

In [18]:
# Create user_table for Query 3
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS user_table (
            song            text,
            userId          int,
            user            text,
            PRIMARY KEY ((song), userId)
        )
    """)
except Exception as e:
    print(e)     

#### Load Data into `user_table`

In [19]:
# Specify the columns we want to extract from the CSV file
target_cols = ['song', 'userId', 'firstName', 'lastName']

# Define the SQL INSERT query to insert data into the 'user_table'
insert_query = """
    INSERT INTO user_table (song, userId, user)
    VALUES
    (%s, %s, %s)
"""

# Open the consolidated CSV file in read mode with UTF-8 encoding
with open('event_datafile_new.csv', 'r', encoding='utf-8') as file:
    # Read all rows from the CSV file into a list
    rows = list(csv.reader(file, delimiter=',', quotechar='"'))
    
    # Extract the headers (column names) from the first row
    headers = rows[0]

    # Find the indices of the target columns within the headers
    target_col_indices = [headers.index(col) for col in target_cols]

    # Iterate over each row in the CSV file, starting from the second row
    for row in rows[1:]:
        # Extract the relevant column values from each row using the indices
        song, userId, firstName, lastName = [
            row[i] for i in target_col_indices
        ]

        # Execute the SQL INSERT query with the extracted values
        session.execute(
            insert_query,(
                song,
                int(userId),
                firstName + ' ' + lastName,   # Concatenate the first name and last name together to form the complete name
            )
        )

    # Print a message indicating that data loading is complete
    print(f'Data successfully loaded to the user_table!')

Data successfully loaded to the user_table!


#### Run a SELECT query to confirm that the data has been successfully inserted into the `user_table`

In [20]:
# Run a SELECT query to verify the data was loaded into the table
try:
    rows = session.execute("""
        SELECT user
        FROM user_table
        WHERE song = 'All Hands Against His Own'
    """)
except Exception as e:
    print(e)
    
for row in rows:
    print((row.user))

Jacqueline Lynch
Tegan Levine
Sara Johnson


### Drop the Tables Before Closing Out the Sessions

In [21]:
# Drop all tables before closing out the sessions
try:
    session.execute("""DROP TABLE IF EXISTS song_table""")
except Exception as e:
    print(e)
    
try:
    session.execute("""DROP TABLE IF EXISTS artist_table""")
except Exception as e:
    print(e)
    
try:
    session.execute("""DROP TABLE IF EXISTS user_table""")
except Exception as e:
    print(e)

### Close the Session and Cluster Connection¶

In [22]:
# Shut down the session to close the connection to the Cassandra database
session.shutdown()
# Shut down the cluster to close all connections associated with it
cluster.shutdown()