# ETL Pipeline for Pre-Processing the Files

### Install Python libraries

In [1]:
!pip install pandas cassandra-driver numpy



### Import Python packages

In [20]:
import csv
import os

### Creating list of filepaths to process original event csv data files

In [3]:
# Relative path (assuming your initial project structure)
data_folder_path = os.path.join(
    os.path.dirname(os.getcwd()), "data", "raw", "event_data"
)
# print(f'The constructed path is: {data_folder_path}')

if not os.path.exists(data_folder_path):
    raise FileNotFoundError(
        f'The directory "{data_folder_path}" was not found. Check the path'
    )

# Your data processing code here (example)
file_paths = []
for root, _, files in os.walk(data_folder_path):
    for file in files:
        file_paths.append(os.path.join(root, file))

print(f"Found {len(file_paths)} files in the event data folder")

Found 30 files in the event data folder


### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [4]:
# Initiating an empty list of rows that will be generated from each file
data_rows = []

# For every file in the file path list
for file_path in file_paths:
    # Reading .csv file
    with open(file_path, "r", encoding="utf8", newline="") as csv_file:
        # Creating a CSV reader object
        csv_reader = csv.reader(csv_file)
        next(csv_reader)  # Skip header

        # Extracting each data row one by one and append it
        for line in csv_reader:
            data_rows.append(line)

print(f"Total rows: {len(data_rows)}")
print(f"Sample data:\n {data_rows[:5]}")

Total rows: 8056
Sample data:
 [['', 'Logged In', 'Walter', 'M', '0', 'Frye', '', 'free', 'San Francisco-Oakland-Hayward, CA', 'GET', 'Home', '1.54092E+12', '38', '', '200', '1.54111E+12', '39'], ['', 'Logged In', 'Kaylee', 'F', '0', 'Summers', '', 'free', 'Phoenix-Mesa-Scottsdale, AZ', 'GET', 'Home', '1.54034E+12', '139', '', '200', '1.54111E+12', '8'], ["Des'ree", 'Logged In', 'Kaylee', 'F', '1', 'Summers', '246.30812', 'free', 'Phoenix-Mesa-Scottsdale, AZ', 'PUT', 'NextSong', '1.54034E+12', '139', 'You Gotta Be', '200', '1.54111E+12', '8'], ['', 'Logged In', 'Kaylee', 'F', '2', 'Summers', '', 'free', 'Phoenix-Mesa-Scottsdale, AZ', 'GET', 'Upgrade', '1.54034E+12', '139', '', '200', '1.54111E+12', '8'], ['Mr Oizo', 'Logged In', 'Kaylee', 'F', '3', 'Summers', '144.03873', 'free', 'Phoenix-Mesa-Scottsdale, AZ', 'PUT', 'NextSong', '1.54034E+12', '139', 'Flat 55', '200', '1.54111E+12', '8']]


In [5]:
# Creating a smaller "events.csv" file that will be used to insert data into the Apache Cassandra tables
csv.register_dialect("myDialect", quoting=csv.QUOTE_ALL, skipinitialspace=True)

event_file = os.path.join(os.path.dirname(os.getcwd()), "data", "events.csv")

with open(event_file, "w", encoding="utf8", newline="") as new_file:
    writer = csv.writer(new_file, dialect="myDialect")
    writer.writerow(
        [
            "artist",
            "firstName",
            "gender",
            "itemInSession",
            "lastName",
            "length",
            "level",
            "location",
            "sessionId",
            "song",
            "userId",
        ]
    )

    for row in data_rows:
        if row[0] == "":
            continue

        writer.writerow(
            (
                row[0],
                row[2],
                row[3],
                row[4],
                row[5],
                row[6],
                row[7],
                row[8],
                row[12],
                row[13],
                row[16],
            )
        )

In [6]:
# Checking the number of rows in new "events.csv" file
with open(event_file, "r", encoding="utf8") as file:
    print(sum(1 for line in file))

6821


### Working with <font color=red>events.csv</font> file, which contains the following columns: 
- artist 
- firstName
- gender
- itemInSession
- lastName
- length
- level
- location
- sessionId
- song
- userId

### Creating a Cluster

In [7]:
from cassandra.cluster import Cluster

try:
    # Connect to the local Cassandra cluster
    cluster = Cluster(["127.0.0.1"])  # Replace with your Cassandra node IPs if needed
    session = cluster.connect()

    print("Successfully connected to Cassandra!")
except Exception as e:
    print(f"Could not connect to Cassandra: {e}")

Successfully connected to Cassandra!


### Creating Keyspace

In [8]:
# Creating Keyspace
keyspace_query = """
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    with REPLICATION = 
    {'class': 'SimpleStrategy', 'replication_factor': 1}
"""

try:
    session.execute(keyspace_query)
except Exception as e:
    print(f"Failed to create keyspace: {e}")

### Setting Keyspace

In [9]:
# Setting KEYSPACE to the keyspace specified above
session.set_keyspace("sparkify")

### Query 1
For query 1, we need a way to run query on sessionId and itemInSession. So, our primary key must have these columns. We can partition the data on sessionId.
Our Select query: SELECT artist, song, length FROM session_item WHERE sessionId = 338 AND itemInSession = 4;
Our Primary key will be (sessionId, itemInSession), where sessionId is the partition key and  itemInSession is the clustering column.

In [10]:
# Creating table for query_1
query_1 = """
    CREATE TABLE IF NOT EXISTS session_item(
        artist text,
        song text,
        length float,
        sessionId int,
        itemInSession int,
        PRIMARY KEY (sessionId, itemInSession)
    );
"""

try:
    session.execute(query_1)
except Exception as e:
    print(f"Table creation failed: {e}")

In [11]:
# Reading csv file and inserting rows into cassandra tables
with open(event_file, encoding="utf8") as csv_file:
    csv_reader = csv.reader(csv_file)
    next(csv_reader)  # Skip header

    for line in csv_reader:
        query = """
            INSERT INTO session_item
                (artist, song, length, sessionId, itemInSession) 
            VALUES
                (%s, %s, %s, %s, %s);
        """
        session.execute(
            query, (line[0], line[10], float(line[5]), int(line[8]), int(line[3]))
        )

#### Do a SELECT to verify that the data have been inserted into each table

In [12]:
# SELECT statement to verify the data was entered into the table
select_query_1 = """
    SELECT artist, song, length 
    FROM  session_item 
    WHERE sessionId = 338 
      AND itemInSession = 4
"""

try:
    rows = session.execute(select_query_1)
    for row in rows:
        print(row)
except Exception as e:
    print(e)

Row(artist='Faithless', song='50', length=495.30731201171875)


### Query 2
For query 2, we need a way to run query on sessionId and userId. Also, we need the data sorted on itemInSession. So, our primary key must have these columns. We can partition the data on a composite key (sessionId, userId).
Our Select query : SELECT artist, song, firstName, lastName FROM  user_session WHERE sessionId = 182 AND userId = 10;
Our Primary key will be ((sessionId, userId), itemInSession)), where (sessionId, userId) is the partition key and  itemInSession is the clustering column.
Also, we are using the clause - WITH CLUSTERING ORDER BY (itemInSession ASC), to sort our data based on itemInSession.

In [13]:
# Creating table for query2
query_2 = """
    CREATE TABLE IF NOT EXISTS user_session(
        sessionId int,
        userId int,
        artist text,
        song text,
        firstName text,
        lastName text,
        itemInSession int,
        PRIMARY KEY ((sessionId, userId), itemInSession)) WITH CLUSTERING ORDER BY (itemInSession ASC
    );
"""

try:
    session.execute(query_2)
except Exception as e:
    print(f"Table creation failed: {e}")

In [14]:
with open(event_file, encoding="utf8") as new_file:
    csv_reader = csv.reader(new_file)
    next(csv_reader)  # Skip header

    for line in csv_reader:
        query = """
            INSERT INTO user_session
                (sessionId, userId, artist, song, firstName, lastName, itemInSession)
            VALUES
                (%s, %s, %s, %s, %s, %s, %s);
        """
        session.execute(
            query,
            (
                int(line[8]),
                int(line[10]),
                line[0],
                line[9],
                line[1],
                line[4],
                int(line[3]),
            ),
        )

In [15]:
# SELECT statement to verify the data was entered into the table
select_query_2 = """
    SELECT artist, song, firstName, lastName 
    FROM user_session
    WHERE sessionId = 182
      AND userId = 10;
"""

try:
    rows = session.execute(select_query_2)
    for row in rows:
        print(row)
except Exception as e:
    print(e)

Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


## Query 3
For query 3, we need a way to run query on song. So, our primary key must have song. Also, the query should be such that it does not contain duplicate users for a song. So we need to model data in such a way that we don't allow duplicate users for a song in our table. This can be acheived by including userId in our primary key.
Our Select query: SELECT song, firstName, lastName FROM user_song WHERE song = 'All Hands Against His Own';
Our Primary key will be ((song), userId)), where song is the partition key and  userId is the clustering column.

In [16]:
# Creating table for query_3
query_3 = """
    CREATE TABLE IF NOT EXISTS user_song(
        song text,
        userId int,
        firstName text,
        lastName text,
        PRIMARY KEY ((song), userId)
    );
"""

try:
    session.execute(query_3)
except Exception as e:
    print(f"Table creation failed: {e}")

In [17]:
with open(event_file, encoding="utf8") as new_file:
    csv_reader = csv.reader(new_file)
    next(csv_reader)  # Skip header

    for line in csv_reader:
        query = """
            INSERT INTO user_song
                (song, userId, firstName, lastName)
            VALUES
                (%s, %s, %s, %s);
        """
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

In [18]:
# SELECT statement to verify the data was entered into the table
select_query_3 = """
    SELECT song, firstName, lastName
    FROM user_song
    WHERE song = 'All Hands Against His Own';
"""

try:
    rows = session.execute(select_query_3)
    for row in rows:
        print(row)
except Exception as e:
    print(e)

Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


### Drop the tables before closing out the sessions
If you want to use, change cell from "markdown" to "code"

session.execute("DROP TABLE IF EXISTS session_item")
session.execute("DROP TABLE IF EXISTS user_session")
session.execute("DROP TABLE IF EXISTS user_song")

### Close the session and cluster connection

In [19]:
session.shutdown()
cluster.shutdown()