Mount Google Drive to access files

In [1]:
from google.colab import drive
# Mount Google Drive under /content/drive; force_remount=True is passed so the
# mount is redone even when a previous run of this cell already mounted it.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


Install Java

In [2]:
# Install the headless OpenJDK 8 JDK (JAVA_HOME is pointed at it in a later cell).
# -qq plus the redirect to /dev/null keep apt's output out of the notebook.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Downloading and unpacking Apache Spark and Hadoop:

In [3]:
# Download the Spark 3.5.3 build for Hadoop 3 from the Apache archive and unpack it
# into the current working directory (SPARK_HOME later points at
# /content/spark-3.5.3-bin-hadoop3, so this presumably runs from /content).
!wget https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!rm spark-3.5.3-bin-hadoop3.tgz   # Tidying up: the archive is not needed once extracted

--2025-03-23 21:12:09--  https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400864419 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.3-bin-hadoop3.tgz’


2025-03-23 21:12:24 (26.7 MB/s) - ‘spark-3.5.3-bin-hadoop3.tgz’ saved [400864419/400864419]



Install mysql

In [4]:
! apt-get update
! apt-get install mysql-server

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.81)] [1 InRelease 14.2 kB/129 kB 11%] [Connected t                                                                                                    Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http:

import modules, set up environment

In [5]:
# import random
import numpy as np

# tracemalloc and time are used by the benchmarking cell near the end of the notebook
import tracemalloc
import time

import os
# MYSQL_PWD is the environment variable the MySQL command-line clients read the
# password from; presumably set so the `mysql` / `mysqldump` shell calls below
# run without a password prompt.
# NOTE(review): this hardcodes the root password in the notebook - only
# acceptable for a throwaway Colab VM.
os.environ['MYSQL_PWD'] = "root"
# Locations of the JDK installed above and of the unpacked Spark distribution
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

# MySQL driver used by the connection helpers defined later
!pip install mysql-connector-python
import mysql.connector

!pip install pandas
import pandas as pd

Collecting mysql-connector-python
  Downloading mysql_connector_python-9.2.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (6.0 kB)
Downloading mysql_connector_python-9.2.0-cp311-cp311-manylinux_2_28_x86_64.whl (34.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.0/34.0 MB[0m [31m20.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.2.0


Install findspark to locate Spark on system

In [6]:
!pip install -q findspark
import findspark
# Makes pyspark importable in this kernel; presumably locates Spark through the
# SPARK_HOME environment variable set in the previous cell.
findspark.init()

We can now import SparkSession from `pyspark.sql` to create our entry point to Spark.

**Note:** If we were running our session on a cluster, we would pass the cluster manager (e.g. `yarn`) as the argument to `master()`. Here we run Spark in local mode on a single machine, so we use `local[x]`, where x is an integer > 0 giving the number of worker threads Spark uses to run tasks (which also determines the default parallelism for RDDs, DataFrames, etc.). Ideally this matches the number of CPU cores we have, so we use `local[*]` to indicate that we want to use all cores.  

In [7]:
from pyspark.sql import SparkSession

# Stop a session left over from an earlier run of this cell, so the builder
# below creates a new session instead of returning the old one.
try:
  spark.stop()
except NameError:
  # First run in a fresh kernel: `spark` does not exist yet, nothing to stop.
  pass
except Exception as stop_error:
  # Still best effort, but report the problem instead of hiding it
  # (a bare `except:` would also have swallowed KeyboardInterrupt).
  print(f"Could not stop the previous Spark session: {stop_error}")

spark = SparkSession.builder.master("local[*]") \
  .config("spark.driver.memory", "15g") \
  .getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) #  This will format our output tables a bit nicer when not using the show() method
# NOTE(review): spark.storage.memoryFraction belongs to Spark's legacy memory
# manager; confirm it still has any effect on Spark 3.5 before relying on it.
spark.conf.set("spark.storage.memoryFraction", "0.6")
# Last expression: display the session summary as the cell output
spark

Start mysql server

In [8]:
# Print the installed client version, then start the MySQL server service
!mysql --version
!service mysql start

mysql  Ver 8.0.41-0ubuntu0.22.04.1 for Linux on x86_64 ((Ubuntu))
 * Starting MySQL database server mysqld
   ...done.


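If your output above contains "su: warning: cannot change directory to /nonexistent: No such file or directory", please run the code below and rerun the code above. Note that the cell below also sets the MySQL root password to `root`, which `connect_to_mysql()` relies on later, so it should be run at least once even if the warning does not appear.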

In [9]:
!sudo service mysql stop
# Give the `mysql` system user /var/lib/mysql/ as its home directory
# (the warning mentioned above complains about /nonexistent)
!sudo usermod -d /var/lib/mysql/ mysql
!sudo service mysql start
# Force root to authenticate with mysql_native_password and set its password to "root".
# FLUSH PRIVILEGES makes the change take effect immediately.
!mysql -e "ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'root'; FLUSH PRIVILEGES;"

 * Stopping MySQL database server mysqld
   ...done.
 * Starting MySQL database server mysqld
   ...done.


Helper methods / global variables

In [10]:
# Name of the MySQL database created and used by the cells below
db_name = "big_five"
# Seed passed to DataFrame.sample() so the same rows are sampled on every run
# (the sample size itself is set in the cell that reads the csv)
seed = 1

def connect_to_mysql():
  """Open and return a new connection to the local MySQL server as root.

  No database is selected here; use connect_to_db() on a cursor for that.
  """
  return mysql.connector.connect(user="root", host="localhost", password="root")

def create_cursor(conn):
  """Return a new cursor on ``conn`` for running statements against MySQL."""
  return conn.cursor()

# instantiate db
def connect_to_db(cursor, db_name):
  """Create database ``db_name`` if it is missing and make it the current one.

  Args:
    cursor: open cursor; its ``execute`` method is called twice.
    db_name: name of the database to create (if needed) and select.
  """
  # Identifiers cannot be passed as query parameters, so quote the name with
  # backticks (doubling any embedded backtick) instead of interpolating it raw.
  # This keeps names containing hyphens, spaces or reserved words working.
  quoted_name = "`" + str(db_name).replace("`", "``") + "`"
  cursor.execute(f"CREATE DATABASE IF NOT EXISTS {quoted_name}")
  cursor.execute(f"USE {quoted_name}")

Change to directory containing important files

In [11]:
# Work from the shared-drive project folder so that the relative paths used below
# (data-final.csv, big_five.sql) resolve there.
%cd /content/drive/Shareddrives/big-data-drive/big-data-project/

/content/drive/Shareddrives/big-data-drive/big-data-project


Create tables

In [12]:
# Schema: one row per test taker (user), one row per questionnaire item (question),
# and one row per (user, question) answer with its value and response time (response).
table_creation_queries= [
    """
    CREATE TABLE IF NOT EXISTS user (
        user_id INT AUTO_INCREMENT PRIMARY KEY,
        dateload DATETIME,
        screenw INT,
        screenh INT,
        introelapse INT,
        testelapse INT,
        endelapse INT,
        IPC INT,
        country VARCHAR(2),
        latitude FLOAT,
        longitude FLOAT
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS question (
        question_id INT AUTO_INCREMENT PRIMARY KEY,
        question_text VARCHAR(255),
        trait_category VARCHAR(255)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS response (
        user_id INT,
        question_id INT,
        response_value TINYINT,
        response_time INT,
        PRIMARY KEY (user_id, question_id),
        FOREIGN KEY (user_id) REFERENCES user(user_id),
        FOREIGN KEY (question_id) REFERENCES question(question_id)
    )
    """
]

# must connect and create cursor each time
# because at end of cell we close connections
conn = connect_to_mysql()
cursor = create_cursor(conn)
try:
  connect_to_db(cursor, db_name)

  # execute table creation queries (order matters: response references the other two)
  for query in table_creation_queries:
    cursor.execute(query)

  # Not strictly necessary here because data definition commands (like CREATE TABLE)
  # are automatically committed in MySQL; kept so the cell follows the same
  # connect / work / commit / close pattern as the others.
  conn.commit()
finally:
  # close cursor and connection to free memory, prevent limit errors, unintended locks, etc.
  cursor.close()
  # Previously written as `conn.close` (attribute access without a call),
  # which left the connection open.
  conn.close()

Insert into question table

In [13]:
# setup question table, question text is not provided with csv
# so creating a dictionary instead to insert proper data
# Keys are the question codes that also appear as csv column names (EXT1 ... OPN10).
questions = {
    "EXT1": "I am the life of the party.",
    "EXT2": "I don't talk a lot.",
    "EXT3": "I feel comfortable around people.",
    "EXT4": "I keep in the background.",
    "EXT5": "I start conversations.",
    "EXT6": "I have little to say.",
    "EXT7": "I talk to a lot of different people at parties.",
    "EXT8": "I don't like to draw attention to myself.",
    "EXT9": "I don't mind being the center of attention.",
    "EXT10": "I am quiet around strangers.",
    "EST1": "I get stressed out easily.",
    "EST2": "I am relaxed most of the time.",
    "EST3": "I worry about things.",
    "EST4": "I seldom feel blue.",
    "EST5": "I am easily disturbed.",
    "EST6": "I get upset easily.",
    "EST7": "I change my mood a lot.",
    "EST8": "I have frequent mood swings.",
    "EST9": "I get irritated easily.",
    "EST10": "I often feel blue.",
    "AGR1": "I feel little concern for others.",
    "AGR2": "I am interested in people.",
    "AGR3": "I insult people.",
    "AGR4": "I sympathize with others' feelings.",
    "AGR5": "I am not interested in other people's problems.",
    "AGR6": "I have a soft heart.",
    "AGR7": "I am not really interested in others.",
    "AGR8": "I take time out for others.",
    "AGR9": "I feel others' emotions.",
    "AGR10": "I make people feel at ease.",
    "CSN1": "I am always prepared.",
    "CSN2": "I leave my belongings around.",
    "CSN3": "I pay attention to details.",
    "CSN4": "I make a mess of things.",
    "CSN5": "I get chores done right away.",
    "CSN6": "I often forget to put things back in their proper place.",
    "CSN7": "I like order.",
    "CSN8": "I shirk my duties.",
    "CSN9": "I follow a schedule.",
    "CSN10": "I am exacting in my work.",
    "OPN1": "I have a rich vocabulary.",
    "OPN2": "I have difficulty understanding abstract ideas.",
    "OPN3": "I have a vivid imagination.",
    "OPN4": "I am not interested in abstract ideas.",
    "OPN5": "I have excellent ideas.",
    "OPN6": "I do not have a good imagination.",
    "OPN7": "I am quick to understand things.",
    "OPN8": "I use difficult words.",
    "OPN9": "I spend time reflecting on things.",
    "OPN10": "I am full of ideas.",
}

# The question code goes into trait_category, the sentence into question_text
insert_stmt = """INSERT INTO question (trait_category, question_text)
            VALUES (%s, %s)"""

conn = connect_to_mysql()
cursor = create_cursor(conn)
try:
  connect_to_db(cursor, db_name)

  # Seed the table only when it is empty: the table is created with
  # IF NOT EXISTS, so re-running this cell would otherwise append a second
  # copy of all questions under new ids.
  cursor.execute("SELECT COUNT(*) FROM question")
  (existing_question_count,) = cursor.fetchone()
  if existing_question_count == 0:
    for key, value in questions.items():
      data = (key, value)
      cursor.execute(insert_stmt, data)
    conn.commit()
finally:
  # release the cursor and connection even if an insert fails
  cursor.close()
  conn.close()

Read csv file and filter for Null, NONE, NaN, etc. values

In [14]:
# read the tab-separated csv file into a pandas dataframe
df = pd.read_csv('data-final.csv', delimiter="\t")

# elementwise comparison: True where a cell is not the literal string "NONE"
boolean_df = df != "NONE"
# each row represented by True iff none of its values were "NONE"
row_mask = boolean_df.all(axis=1)
# filter the dataframe, keep only the rows whose mask is True
df = df[row_mask]

# np.where(condition, value_if_true, value_if_false):
# negative entries in the EXT1_E..OPN10_E columns are replaced by NaN, all other
# values are kept (a later cell melts these *_E columns into response_time)
df.loc[:, "EXT1_E":"OPN10_E"] = np.where(df.loc[:, "EXT1_E":"OPN10_E"] < 0, np.nan, df.loc[:, "EXT1_E":"OPN10_E"])

# remove rows with any NA, Null, NaN values (this includes the NaNs introduced just above)
df = df.dropna(axis=0, how="any")

# take a sample of 300000 records for testing; random_state=seed makes it reproducible
# NOTE(review): sampling without replacement needs at least 300000 rows to be left here
df = df.sample(n=300000, random_state=seed)

Update data types to match database definition

In [15]:
# Coerce the user-level columns to the types the `user` table expects.

# DATETIME column: normalise to 'YYYY-MM-DD HH:MM:SS' text
df['dateload'] = pd.to_datetime(df['dateload']).dt.strftime('%Y-%m-%d %H:%M:%S')

# INT columns
for int_column in ('screenw', 'screenh', 'introelapse', 'testelapse', 'endelapse', 'IPC'):
  df[int_column] = df[int_column].astype(int)

# VARCHAR column
df['country'] = df['country'].astype(str)

# FLOAT columns (stored as latitude / longitude in the database)
for float_column in ('lat_appx_lots_of_err', 'long_appx_lots_of_err'):
  df[float_column] = pd.to_numeric(df[float_column], downcast='float')

Add users to user table in batches

In [16]:
# extract columns dateload to longitude from dataframe as a list of tuples
# df.loc[:, "dateload":"long_appx_lots_of_err"] selects all rows from these columns
# itertuples(index=False, name=None) returns plain tuples without the index element
user_values = list(df.loc[:, 'dateload':'long_appx_lots_of_err'].itertuples(index=False, name=None))
# print(len(user_values))
user_insert = """
  INSERT INTO user (dateload, screenw, screenh, introelapse, testelapse, endelapse, IPC, country, latitude, longitude)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# insert in smaller batches to prevent errors
batch_size = 10000

conn = connect_to_mysql()
cursor = create_cursor(conn)
try:
  connect_to_db(cursor, db_name)
  for i in range(0, len(user_values), batch_size):
    batch = user_values[i : i + batch_size]
    # recreate cursor to reset state
    batch_cursor = create_cursor(conn)
    try:
      batch_cursor.executemany(user_insert, batch)
      conn.commit() # commit after each batch
    finally:
      batch_cursor.close()
finally:
  # The original cell never closed these, leaving a connection open for the
  # rest of the session.
  cursor.close()
  conn.close()

Add user_id column to dataframe

In [17]:
conn = connect_to_mysql()
cursor = create_cursor(conn)
try:
  connect_to_db(cursor, db_name)
  # The ids are matched to dataframe rows purely by position, so the order must
  # be explicit: without ORDER BY the server may return rows in any order.
  # Assumes the AUTO_INCREMENT ids were handed out in insertion (= dataframe) order.
  cursor.execute("SELECT user_id FROM user ORDER BY user_id")
  user_tuples = cursor.fetchall()
finally:
  cursor.close()
  conn.close()

# fetchall returns a list of tuples, the id is the first element in every tuple
user_ids = [user[0] for user in user_tuples]
# print(user_ids)

# Guard against a stale or partially filled user table (e.g. the insert cell was run twice)
if len(user_ids) != len(df):
  raise ValueError(
    f"user table has {len(user_ids)} rows but the dataframe has {len(df)}; "
    "ids cannot be matched to rows by position"
  )
df["user_id"] = user_ids

Create a dictionary which maps trait to question_id

In [18]:
conn = connect_to_mysql()
cursor = create_cursor(conn)
connect_to_db(cursor, db_name)

# Pull (question_id, trait_category) pairs from the question table into a dataframe
cursor.execute("SELECT question_id, trait_category FROM question")
question_data = cursor.fetchall()
question_df = pd.DataFrame(data=question_data, columns=["question_id", "trait_category"])
# print(question_df.head(5))

# Lookup table: trait code (key) -> question_id (value)
question_dict = {
  trait: question_id
  for trait, question_id in zip(question_df["trait_category"], question_df["question_id"])
}

conn.commit()
cursor.close()
conn.close()

Create a melted dataframe for response value and question id

In [19]:
# This cell only reshapes dataframes, so no database connection is opened here.

# new dataframe corresponding to response values;
# assign() returns a new frame, so no column is written into a slice of `df`
# (which is what triggered pandas' SettingWithCopyWarning before)
df_responses = df.loc[:, 'EXT1':'OPN10'].assign(user_id=df['user_id'])
# print(df_responses.head(5))

# flatten our dataframe
# user_id stays the same in each row
# create a new column trait_category whose values will be the columns of the original df
# create a new column response_value whose values are the entries for each column
# then add question_id by looking the trait code up in question_dict,
# and drop trait_category because it is no longer needed
melted_responses = (
  df_responses
  .melt(id_vars=["user_id"], var_name="trait_category", value_name="response_value")
  .assign(question_id=lambda long_df: long_df["trait_category"].map(question_dict))
  .drop(columns=["trait_category"])
)
# print(melted_responses.head(5))

# map() yields NaN for codes missing from question_dict; fail here rather than
# at insert time with a less obvious error
if melted_responses["question_id"].isna().any():
  raise ValueError("Some response columns have no matching entry in question_dict")

Create a melted dataframe for question_id and response_time

In [20]:
# This cell only reshapes dataframes, so no database connection is opened here.

# now do the same for response time;
# assign() returns a new frame instead of writing into a slice of `df`
df_response_time = df.loc[:, 'EXT1_E':'OPN10_E'].assign(user_id=df['user_id'])

# keys of the dict are in form 'EXT1', 'OPN1', etc. so the '_E' suffix must be removed.
# str.rstrip('_E') is not used for this: it strips any trailing '_' / 'E'
# characters (a character set, not a suffix) and only happened to work because
# every code ends in a digit. The anchored regex removes exactly the suffix.
melted_times = (
  df_response_time
  .melt(id_vars=["user_id"], var_name="trait_time", value_name="response_time")
  .assign(
    question_id=lambda long_df: long_df["trait_time"]
      .str.replace(r"_E$", "", regex=True)
      .map(question_dict)
  )
  .drop(columns=["trait_time"])
)
# print(melted_times.head(5))

Merge response value and time dataframes together. Result is a dataframe in format needed to insert into response table.

In [21]:
# Join answer values and answer times on (user_id, question_id).
# The result holds everything needed for one row of the response table.
response_df = melted_responses.merge(melted_times, how="inner", on=["user_id", "question_id"])

# Put the columns in the order of the INSERT statement's column list
response_columns = ["user_id", "question_id", "response_value", "response_time"]
df_response_final = response_df.reindex(columns=response_columns)

print(df_response_final.shape)
print(df_response_final.head(5))

(15000000, 4)
   user_id  question_id  response_value  response_time
0        1            1             1.0        10264.0
1        2            1             3.0        10708.0
2        3            1             3.0         3154.0
3        4            1             4.0        20878.0
4        5            1             4.0        22060.0


In [22]:
# One parameter tuple (user_id, question_id, response_value, response_time) per response row
response_values = list(df_response_final.itertuples(index=False, name=None))

insert_stmt = """
  INSERT INTO response (user_id, question_id, response_value, response_time)
    VALUES (%s, %s, %s, %s)
"""

# The rows are handed to the database driver as parameters for insert_stmt.
# Previously each tuple was pasted into the SQL text with str(), and the
# resulting statement strings were later passed to executemany() as if they
# were parameter tuples, which cannot work. Keeping the tuples also avoids
# holding a second copy of every row in memory as a long string.
insert_set = response_values

In [23]:
import gc

# deleting unused variables to free up memory before Spark execution
# globals().pop(name, None) is used instead of `del` so the cell can be re-run
# without raising NameError for names that are already gone.
# response_values, df_response_time and user_tuples were previously left behind.
for unused_name in (
    "boolean_df",
    "row_mask",
    "df",
    "user_values",
    "user_insert",
    "user_ids",
    "user_tuples",
    "question_data",
    "question_df",
    "question_dict",
    "df_responses",
    "df_response_time",
    "melted_responses",
    "melted_times",
    "response_df",
    "df_response_final",
    "response_values",
):
  globals().pop(unused_name, None)

# ask the garbage collector to reclaim the released objects right away
gc.collect()
del unused_name

In [24]:
# Improved execution function to perform insert statements in batches
def execute_sql_partition(statements, batch_size=10000):
    """Insert one partition of response rows into MySQL in batches.

    Intended as the callback for ``rdd.foreachPartition``. Opens its own
    connection, selects ``db_name`` and commits after every batch.

    Args:
        statements: iterable of rows for the partition. Each item is either a
            parameter tuple for the global ``insert_stmt`` or a complete SQL
            statement string; the kind is detected per batch from its first item.
        batch_size: number of rows sent per batch / commit.

    Errors are printed rather than raised: a failing batch is rolled back and
    the remaining batches are still attempted.
    """
    # local import so the function is self-contained when it runs in a Spark task
    from itertools import islice

    conn = None
    try:
        conn = connect_to_mysql()
        setup_cursor = create_cursor(conn)
        try:
            connect_to_db(setup_cursor, db_name)
        finally:
            setup_cursor.close()

        # Consume the partition lazily, batch by batch, instead of copying the
        # whole partition into a list first.
        remaining = iter(statements)
        while True:
            batch = list(islice(remaining, batch_size))
            if not batch:
                break

            batch_cursor = create_cursor(conn)
            try:
                if isinstance(batch[0], str):
                    # Ready-made SQL text: these are statements, not parameters,
                    # so they must be executed one by one.
                    for statement in batch:
                        batch_cursor.execute(statement)
                else:
                    batch_cursor.executemany(insert_stmt, batch)
                conn.commit()
            except Exception as e:
                # Report only the size and first row; printing the whole batch
                # would flood the output with thousands of rows.
                print(f"Error executing batch of {len(batch)} rows starting with {batch[0]!r}\nError: {e}")
                conn.rollback()  # Roll back only the failed batch, earlier batches stay committed
            finally:
                batch_cursor.close()
    except Exception as e:
        print(f"Critical error: {e}")
    finally:
        if conn is not None:
            conn.close()

In [None]:
from pyspark import StorageLevel

# Start tracing memory allocation
# NOTE(review): tracemalloc traces Python allocations of this process only;
# confirm that is the figure of interest if Spark runs the partitions elsewhere.
tracemalloc.start()
# Start execution timer
start_time = time.time()

# create connection to db and mysql cursor
# NOTE(review): execute_sql_partition opens its own connection per partition, so
# this connection is not what performs the inserts; it is only committed and closed below.
conn = connect_to_mysql()
cursor = create_cursor(conn)
connect_to_db(cursor, db_name)

sc = spark.sparkContext

# create an rdd from the insert set of responses, partition into 16 and store in memory and disk
rdd_insert = sc.parallelize(insert_set, 16).persist(StorageLevel.MEMORY_AND_DISK)

# run execute_sql_partition once per partition to populate the response table
# NOTE: that function prints errors instead of raising them, so this call
# finishing without an exception does not by itself prove rows were inserted.
rdd_insert.foreachPartition(execute_sql_partition)

conn.commit()
cursor.close()
conn.close()

# Get memory usage details: (current, peak) traced sizes in bytes
current, peak = tracemalloc.get_traced_memory()
end_time = time.time()  # Stop execution timer

# Print memory stats (bytes converted to MB) and wall-clock time
print(f"Current memory usage: {current / 1024 ** 2:.2f} MB")
print(f"Peak memory usage: {peak / 1024 ** 2:.2f} MB")
print(f"Execution time: {end_time - start_time:.2f} seconds")

# Stop tracing
tracemalloc.stop()

In [None]:
# Dump the big_five database into big_five.sql in the current working directory
# (the shared-drive project folder, after the earlier %cd).
# No -p flag is passed: the root password ("root") is presumably picked up from
# the MYSQL_PWD environment variable set in the setup cell.
!mysqldump -u root big_five > big_five.sql