# **COVENTRY UNIVERSITY**
CODE: CHARLES NWANKPA

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz


In [3]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz


In [4]:
!pip install -q findspark
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=827bb11b9ce128f9c4a5b3c40b157dd4a191709275c902688391d1a1aa63bbc1
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


Dataset Link [https://www.kaggle.com/datasets/ydalat/lifestyle-and-wellbeing-data]

## **Loading the data from the Orb using PySpark.**

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Lifestle & Wellbeing Analysis").getOrCreate()
orb = spark.read.csv('/content/lifestyle_wellbeing.csv', header=True, inferSchema=True)


In [6]:
orb.show()


+---------+--------------+------------+--------------+-----------+-----------------+--------------+-----------+--------+---------+--------------+----+-----------+-----------+-----------+-------------+--------------+-----------------+---------------+----------------+-----------------+----------+------+-----------------------+
|Timestamp|FRUITS_VEGGIES|DAILY_STRESS|PLACES_VISITED|CORE_CIRCLE|SUPPORTING_OTHERS|SOCIAL_NETWORK|ACHIEVEMENT|DONATION|BMI_RANGE|TODO_COMPLETED|FLOW|DAILY_STEPS|LIVE_VISION|SLEEP_HOURS|LOST_VACATION|DAILY_SHOUTING|SUFFICIENT_INCOME|PERSONAL_AWARDS|TIME_FOR_PASSION|WEEKLY_MEDITATION|       AGE|GENDER|WORK_LIFE_BALANCE_SCORE|
+---------+--------------+------------+--------------+-----------+-----------------+--------------+-----------+--------+---------+--------------+----+-----------+-----------+-----------+-------------+--------------+-----------------+---------------+----------------+-----------------+----------+------+-----------------------+
|   7/7/15|        

In [7]:
print(orb.count())


15972


In [8]:
#calculating the number of null values in each column
from pyspark.sql.functions import col, sum

orb.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in orb.columns)).show()


+---------+--------------+------------+--------------+-----------+-----------------+--------------+-----------+--------+---------+--------------+----+-----------+-----------+-----------+-------------+--------------+-----------------+---------------+----------------+-----------------+---+------+-----------------------+
|Timestamp|FRUITS_VEGGIES|DAILY_STRESS|PLACES_VISITED|CORE_CIRCLE|SUPPORTING_OTHERS|SOCIAL_NETWORK|ACHIEVEMENT|DONATION|BMI_RANGE|TODO_COMPLETED|FLOW|DAILY_STEPS|LIVE_VISION|SLEEP_HOURS|LOST_VACATION|DAILY_SHOUTING|SUFFICIENT_INCOME|PERSONAL_AWARDS|TIME_FOR_PASSION|WEEKLY_MEDITATION|AGE|GENDER|WORK_LIFE_BALANCE_SCORE|
+---------+--------------+------------+--------------+-----------+-----------------+--------------+-----------+--------+---------+--------------+----+-----------+-----------+-----------+-------------+--------------+-----------------+---------------+----------------+-----------------+---+------+-----------------------+
|        0|             0|           0| 

In [9]:
#computing basic statistics for numeric and string columns
orb.describe().show()


+-------+-----------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+------------+------+-----------------------+
|summary|  Timestamp|    FRUITS_VEGGIES|      DAILY_STRESS|    PLACES_VISITED|       CORE_CIRCLE|SUPPORTING_OTHERS|   SOCIAL_NETWORK|       ACHIEVEMENT|          DONATION|         BMI_RANGE|   TODO_COMPLETED|              FLOW|       DAILY_STEPS|      LIVE_VISION|       SLEEP_HOURS|     LOST_VACATION|    DAILY_SHOUTING|  SUFFICIENT_INCOME|   PERSONAL_AWARDS|  TIME_FOR_PASSION|WEEKLY_MEDITATION|         AGE|GENDER|WORK_LIFE_BALANCE_SCORE|
+-------+-----------+------------------+------------------+------------------+------------------+-----------------+-

In [10]:
#counting occurrences of unique values in the specified columns
orb.groupBy("GENDER").count().show()
orb.groupBy("AGE").count().show()


+------+-----+
|GENDER|count|
+------+-----+
|Female| 9858|
|  Male| 6114|
+------+-----+

+------------+-----+
|         AGE|count|
+------------+-----+
|  51 or more| 3390|
|    36 to 50| 4655|
|    21 to 35| 6108|
|Less than 20| 1819|
+------------+-----+



# Display the Orb's structure in a Schema

In [11]:
#displaying the schema in a tree format showing information on column names, data types, and nullability
orb.printSchema()


root
 |-- Timestamp: string (nullable = true)
 |-- FRUITS_VEGGIES: integer (nullable = true)
 |-- DAILY_STRESS: string (nullable = true)
 |-- PLACES_VISITED: integer (nullable = true)
 |-- CORE_CIRCLE: integer (nullable = true)
 |-- SUPPORTING_OTHERS: integer (nullable = true)
 |-- SOCIAL_NETWORK: integer (nullable = true)
 |-- ACHIEVEMENT: integer (nullable = true)
 |-- DONATION: integer (nullable = true)
 |-- BMI_RANGE: integer (nullable = true)
 |-- TODO_COMPLETED: integer (nullable = true)
 |-- FLOW: integer (nullable = true)
 |-- DAILY_STEPS: integer (nullable = true)
 |-- LIVE_VISION: integer (nullable = true)
 |-- SLEEP_HOURS: integer (nullable = true)
 |-- LOST_VACATION: integer (nullable = true)
 |-- DAILY_SHOUTING: integer (nullable = true)
 |-- SUFFICIENT_INCOME: integer (nullable = true)
 |-- PERSONAL_AWARDS: integer (nullable = true)
 |-- TIME_FOR_PASSION: integer (nullable = true)
 |-- WEEKLY_MEDITATION: integer (nullable = true)
 |-- AGE: string (nullable = true)
 |-- GE

# Checking for Unique Values

In [12]:
from pyspark.sql.types import StringType

#extracting the names of columns that are of string data type
string_columns = [f.name for f in orb.schema.fields if isinstance(f.dataType, StringType)]
print(string_columns)


['Timestamp', 'DAILY_STRESS', 'AGE', 'GENDER']


In [13]:
#Printing potential categorical columns based on their distinct values.

potential_categorical_columns = ['DAILY_STRESS', 'AGE', 'GENDER']

for col in potential_categorical_columns:
    print(f"Unique values for {col}:")
    orb.select(col).distinct().show(truncate=False)


Unique values for DAILY_STRESS:
+------------+
|DAILY_STRESS|
+------------+
|3           |
|0           |
|5           |
|1/1/00      |
|1           |
|4           |
|2           |
+------------+

Unique values for AGE:
+------------+
|AGE         |
+------------+
|51 or more  |
|36 to 50    |
|21 to 35    |
|Less than 20|
+------------+

Unique values for GENDER:
+------+
|GENDER|
+------+
|Female|
|Male  |
+------+



# Ordinal Encoding for the age range

In [14]:
from pyspark.sql.functions import when

#Encoding the 'AGE' column ordinally based on the age range
#Transform categorical string data into ordinal numerical data
#Added a new column to the orb called AGE_encoded

orb = orb.withColumn("AGE_encoded",
                   when(orb["AGE"] == "Less than 20", 0)  # if 'AGE' is "Less than 20", assign 0
                   .when(orb["AGE"] == "21 to 35", 1)     # else if 'AGE' is "21 to 35", assign 1
                   .when(orb["AGE"] == "36 to 50", 2)     # else if 'AGE' is "36 to 50", assign 2
                   .otherwise(3))                         # for all other cases (i.e., "51 or more"), assign 3


In [15]:
#viewing the old and new AGE column
#viewing the schema of the selected columns

age_check = orb.select("AGE", "AGE_encoded")
age_check.show()

age_check.printSchema()


+----------+-----------+
|       AGE|AGE_encoded|
+----------+-----------+
|  36 to 50|          2|
|  36 to 50|          2|
|  36 to 50|          2|
|51 or more|          3|
|51 or more|          3|
|51 or more|          3|
|51 or more|          3|
|  21 to 35|          1|
|  21 to 35|          1|
|51 or more|          3|
|  36 to 50|          2|
|  21 to 35|          1|
|  21 to 35|          1|
|  36 to 50|          2|
|  21 to 35|          1|
|51 or more|          3|
|  21 to 35|          1|
|  21 to 35|          1|
|  36 to 50|          2|
|  36 to 50|          2|
+----------+-----------+
only showing top 20 rows

root
 |-- AGE: string (nullable = true)
 |-- AGE_encoded: integer (nullable = false)



# Handling Outliers

In [16]:
from pyspark.sql.functions import mode

#If the value in the "DAILY_STRESS" column is "1/1/00", replace it with mode_value

# Calculate the mode
mode_value = orb.groupBy("DAILY_STRESS").count().orderBy("count", ascending=False).first()[0]

# Replace "1/1/00" with the mode
orb = orb.withColumn("DAILY_STRESS",
                   when(orb["DAILY_STRESS"] == "1/1/00", mode_value).otherwise(orb["DAILY_STRESS"]))

print("Unique values for DAILY_STRESS:")
orb.select("DAILY_STRESS").distinct().show(truncate=False)


Unique values for DAILY_STRESS:
+------------+
|DAILY_STRESS|
+------------+
|3           |
|0           |
|5           |
|1           |
|4           |
|2           |
+------------+



# String Indexing and Label Encoding

In [17]:
# Import necessary libraries and functions
from pyspark.ml.feature import StringIndexer

# Creating StringIndexers for columns that need to be encoded
stress_indexer = StringIndexer(inputCol="DAILY_STRESS", outputCol="DAILY_STRESS_encoded")
gender_indexer = StringIndexer(inputCol="GENDER", outputCol="GENDER_encoded")


# Applying the indexers, fit and transform the orb
orb_encoded = stress_indexer.fit(orb).transform(orb)
orb_encoded = gender_indexer.fit(orb_encoded).transform(orb_encoded)


# Dropping irrelevant features

In [18]:
# Dropping the irrelevant columns: DAILY_STRESS, GENDER, AGE, Timestamp
orb_encoded = orb_encoded.drop("DAILY_STRESS", "GENDER", "AGE", "Timestamp")


# Verifying the changes
orb_encoded.show()

+--------------+--------------+-----------+-----------------+--------------+-----------+--------+---------+--------------+----+-----------+-----------+-----------+-------------+--------------+-----------------+---------------+----------------+-----------------+-----------------------+-----------+--------------------+--------------+
|FRUITS_VEGGIES|PLACES_VISITED|CORE_CIRCLE|SUPPORTING_OTHERS|SOCIAL_NETWORK|ACHIEVEMENT|DONATION|BMI_RANGE|TODO_COMPLETED|FLOW|DAILY_STEPS|LIVE_VISION|SLEEP_HOURS|LOST_VACATION|DAILY_SHOUTING|SUFFICIENT_INCOME|PERSONAL_AWARDS|TIME_FOR_PASSION|WEEKLY_MEDITATION|WORK_LIFE_BALANCE_SCORE|AGE_encoded|DAILY_STRESS_encoded|GENDER_encoded|
+--------------+--------------+-----------+-----------------+--------------+-----------+--------+---------+--------------+----+-----------+-----------+-----------+-------------+--------------+-----------------+---------------+----------------+-----------------+-----------------------+-----------+--------------------+--------------

# Feature Correlation Coefficients with the target variable

In [19]:
# Dropping the target variable - WORK_LIFE_BALANCE_SCORE from the features
# Calculating the correlation of each feature with the target variable using the stat.corr
features = orb_encoded.drop("WORK_LIFE_BALANCE_SCORE").columns
correlations = {feature: orb_encoded.stat.corr(feature, "WORK_LIFE_BALANCE_SCORE") for feature in features}

# Features with correlation greater than 0.05 with the target variable is considered relevant and is stored in the selected_features list
selected_features = [feature for feature, corr in correlations.items() if abs(corr) > 0.05]

for feature in selected_features:
    print(f"{feature}: {correlations[feature]}")


FRUITS_VEGGIES: 0.45225543341770175
PLACES_VISITED: 0.5296154261850013
CORE_CIRCLE: 0.5075409316031967
SUPPORTING_OTHERS: 0.5488506172519293
SOCIAL_NETWORK: 0.41258642200121615
ACHIEVEMENT: 0.5612442258186958
DONATION: 0.4588286737240075
BMI_RANGE: -0.251987625295981
TODO_COMPLETED: 0.5455026588978805
FLOW: 0.4781992550765471
DAILY_STEPS: 0.4229812298773749
LIVE_VISION: 0.47131104364063336
SLEEP_HOURS: 0.19639496277423
LOST_VACATION: -0.2662432369602547
DAILY_SHOUTING: -0.27315294943990065
SUFFICIENT_INCOME: 0.40356119216507597
PERSONAL_AWARDS: 0.5042236290010405
TIME_FOR_PASSION: 0.5169669717583426
WEEKLY_MEDITATION: 0.41617097563152766
AGE_encoded: 0.11994167016979483


In [20]:
orb_encoded.show()

+--------------+--------------+-----------+-----------------+--------------+-----------+--------+---------+--------------+----+-----------+-----------+-----------+-------------+--------------+-----------------+---------------+----------------+-----------------+-----------------------+-----------+--------------------+--------------+
|FRUITS_VEGGIES|PLACES_VISITED|CORE_CIRCLE|SUPPORTING_OTHERS|SOCIAL_NETWORK|ACHIEVEMENT|DONATION|BMI_RANGE|TODO_COMPLETED|FLOW|DAILY_STEPS|LIVE_VISION|SLEEP_HOURS|LOST_VACATION|DAILY_SHOUTING|SUFFICIENT_INCOME|PERSONAL_AWARDS|TIME_FOR_PASSION|WEEKLY_MEDITATION|WORK_LIFE_BALANCE_SCORE|AGE_encoded|DAILY_STRESS_encoded|GENDER_encoded|
+--------------+--------------+-----------+-----------------+--------------+-----------+--------+---------+--------------+----+-----------+-----------+-----------+-------------+--------------+-----------------+---------------+----------------+-----------------+-----------------------+-----------+--------------------+--------------

# Splitting the data for Training and Testing purposes

In [21]:
# Splitting the data into training and test sets (70% training, 30% test)
train_data, test_data = orb_encoded.randomSplit([0.7, 0.3], seed=1234)


# Linear Regression Model

In [22]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Using VectorAssembler to create a feature vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# Linear Regression Model
lr = LinearRegression(featuresCol="features", labelCol="WORK_LIFE_BALANCE_SCORE")

# Training the model
lr_model = lr.fit(train_data)

# Evaluating on the test set
test_results = lr_model.evaluate(test_data)



# Model Evaluation Metrics

In [23]:
# Summary statistics
print("Number of Instances:", train_data.count())
print("Root Mean Squared Error (RMSE):", test_results.rootMeanSquaredError)
print("R2:", test_results.r2)
print("Mean Absolute Error:", test_results.meanAbsoluteError)
print("Explained Variance:", test_results.explainedVariance)

# Coefficients and Intercept
print("\nIntercept:", lr_model.intercept)
print("Coefficients:")
for coef, feature in zip(lr_model.coefficients, selected_features):
    print(feature, ":", coef)


# Residuals Analysis
residuals = test_results.residuals
residuals.show()



Number of Instances: 11189
Root Mean Squared Error (RMSE): 4.22861904828392
R2: 0.9910959824034354
Mean Absolute Error: 3.4526600412562765
Explained Variance: 2000.9061602983868

Intercept: 531.0772282260863
Coefficients:
FRUITS_VEGGIES : 3.388952281300163
PLACES_VISITED : 1.7273192824152128
CORE_CIRCLE : 1.8045110780532192
SUPPORTING_OTHERS : 1.6124784294380312
SOCIAL_NETWORK : 1.6052872507483962
ACHIEVEMENT : 1.7666624184752893
DONATION : 3.361357008118315
BMI_RANGE : -17.376283582497326
TODO_COMPLETED : 1.7535070334888643
FLOW : 1.7922319502690915
DAILY_STEPS : 1.8943385233467975
LIVE_VISION : 1.764564399343618
SLEEP_HOURS : 2.2252460415096866
LOST_VACATION : -1.8614612057076418
DAILY_SHOUTING : -2.1231262378639295
SUFFICIENT_INCOME : 18.01316378203299
PERSONAL_AWARDS : 1.6440096349217956
TIME_FOR_PASSION : 1.8101734784865224
WEEKLY_MEDITATION : 1.9010496620009825
AGE_encoded : -0.036176157389726606
+-------------------+
|          residuals|
+-------------------+
|  12.274835481690

# Cross Validation

In [24]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Creating a ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 1.0]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) # Elastic Net Parameter (Ridge = 0)
             .build())

# Creating 5-fold CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="WORK_LIFE_BALANCE_SCORE"),
                          numFolds=5)

# Running cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train_data)

# Predictions using the model selected from cross-validation
prediction = cvModel.transform(test_data)

# Computing metrics on test data
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="WORK_LIFE_BALANCE_SCORE", metricName="rmse")
rmse_test = evaluator.evaluate(prediction)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse_test}")

evaluator_r2 = RegressionEvaluator(predictionCol="prediction", labelCol="WORK_LIFE_BALANCE_SCORE", metricName="r2")
r2_test = evaluator_r2.evaluate(prediction)
print(f"R2 on test data: {r2_test}")


Root Mean Squared Error (RMSE) on test data: 4.228502610751527
R2 on test data: 0.9910964727514907


In [25]:
train_data

DataFrame[FRUITS_VEGGIES: int, PLACES_VISITED: int, CORE_CIRCLE: int, SUPPORTING_OTHERS: int, SOCIAL_NETWORK: int, ACHIEVEMENT: int, DONATION: int, BMI_RANGE: int, TODO_COMPLETED: int, FLOW: int, DAILY_STEPS: int, LIVE_VISION: int, SLEEP_HOURS: int, LOST_VACATION: int, DAILY_SHOUTING: int, SUFFICIENT_INCOME: int, PERSONAL_AWARDS: int, TIME_FOR_PASSION: int, WEEKLY_MEDITATION: int, WORK_LIFE_BALANCE_SCORE: double, AGE_encoded: int, DAILY_STRESS_encoded: double, GENDER_encoded: double, features: vector]

# Random Forest Model and Evaluation

In [26]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Initializing the Random Forest Regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="WORK_LIFE_BALANCE_SCORE")

# Training the model on training data
rf_model = rf.fit(train_data)

# Predictions on test data
predictions_rf = rf_model.transform(test_data)


# 1. Number of Instances
num_instances = predictions_rf.count()
print("Number of Instances:", num_instances)

# 2. RMSE
evaluator_rmse = RegressionEvaluator(labelCol="WORK_LIFE_BALANCE_SCORE", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions_rf)
print("Root Mean Squared Error (RMSE):", rmse)

# 3. R2
evaluator_r2 = RegressionEvaluator(labelCol="WORK_LIFE_BALANCE_SCORE", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions_rf)
print("R2:", r2)

# 4. MAE
evaluator_mae = RegressionEvaluator(labelCol="WORK_LIFE_BALANCE_SCORE", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predictions_rf)
print("Mean Absolute Error (MAE):", mae)

Number of Instances: 4783
Root Mean Squared Error (RMSE): 22.478697759053222
R2: 0.74838809725237
Mean Absolute Error (MAE): 18.039114871912577


In [27]:
train_data

DataFrame[FRUITS_VEGGIES: int, PLACES_VISITED: int, CORE_CIRCLE: int, SUPPORTING_OTHERS: int, SOCIAL_NETWORK: int, ACHIEVEMENT: int, DONATION: int, BMI_RANGE: int, TODO_COMPLETED: int, FLOW: int, DAILY_STEPS: int, LIVE_VISION: int, SLEEP_HOURS: int, LOST_VACATION: int, DAILY_SHOUTING: int, SUFFICIENT_INCOME: int, PERSONAL_AWARDS: int, TIME_FOR_PASSION: int, WEEKLY_MEDITATION: int, WORK_LIFE_BALANCE_SCORE: double, AGE_encoded: int, DAILY_STRESS_encoded: double, GENDER_encoded: double, features: vector]

# Gradient Boosted Tree Model

In [28]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator


In [29]:
gbt = GBTRegressor(featuresCol="features", labelCol="WORK_LIFE_BALANCE_SCORE", maxIter=10)
gbt_model = gbt.fit(train_data)


In [30]:
gbt_predictions = gbt_model.transform(test_data)


In [31]:
feature_importance = gbt_model.featureImportances
for i in range(len(selected_features)):
    print(selected_features[i], ":", feature_importance[i])


FRUITS_VEGGIES : 0.026134649683816857
PLACES_VISITED : 0.0612996789828739
CORE_CIRCLE : 0.07222044342548321
SUPPORTING_OTHERS : 0.12233311138662055
SOCIAL_NETWORK : 0.04070903216839264
ACHIEVEMENT : 0.026793061163484743
DONATION : 0.07693557463894003
BMI_RANGE : 0.13368348788216922
TODO_COMPLETED : 0.053394984462204136
FLOW : 0.014041869523987432
DAILY_STEPS : 0.06034053552173583
LIVE_VISION : 0.023650211505585227
SLEEP_HOURS : 0.0
LOST_VACATION : 0.08630511844928927
DAILY_SHOUTING : 0.05067301485105004
SUFFICIENT_INCOME : 0.01961892840552255
PERSONAL_AWARDS : 0.02993623735530808
TIME_FOR_PASSION : 0.07225620718318697
WEEKLY_MEDITATION : 0.029673853410349074
AGE_encoded : 0.0


# GBT Model Evaluation