In [139]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [140]:
!pip install pyspark



In [141]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [142]:
spark

In [143]:
pip install -U scikit-learn



In [144]:
pip install findspark



In [145]:
import findspark

findspark.init()

In [176]:
# Use Spark to read in the dataset file.
data = spark.read.csv("/content/drive/MyDrive/BigDataAnalyticsAndVisualization/ObesityDataSet_raw_and_data_sinthetic.csv",inferSchema=True,header=True)

data.show()

+------+----+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+
|Gender| Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|      CAEC|SMOKE|CH2O|SCC|FAF|TUE|      CALC|              MTRANS|         NObeyesdad|
+------+----+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+
|Female|21.0|  1.62|  64.0|                           yes|  no| 2.0|3.0| Sometimes|   no| 2.0| no|0.0|1.0|        no|Public_Transporta...|      Normal_Weight|
|Female|21.0|  1.52|  56.0|                           yes|  no| 3.0|3.0| Sometimes|  yes| 3.0|yes|3.0|0.0| Sometimes|Public_Transporta...|      Normal_Weight|
|  Male|23.0|   1.8|  77.0|                           yes|  no| 2.0|3.0| Sometimes|   no| 2.0| no|2.0|1.0|Frequently|Public_Transporta...|      Normal_Weight|
|  Male|27.0|   1.8|  87.0|                   

In [148]:
# Print the Schema of the DataFrame
data.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- family_history_with_overweight: string (nullable = true)
 |-- FAVC: string (nullable = true)
 |-- FCVC: double (nullable = true)
 |-- NCP: double (nullable = true)
 |-- CAEC: string (nullable = true)
 |-- SMOKE: string (nullable = true)
 |-- CH2O: double (nullable = true)
 |-- SCC: string (nullable = true)
 |-- FAF: double (nullable = true)
 |-- TUE: double (nullable = true)
 |-- CALC: string (nullable = true)
 |-- MTRANS: string (nullable = true)
 |-- NObeyesdad: string (nullable = true)



In [149]:
data.columns

['Gender',
 'Age',
 'Height',
 'Weight',
 'family_history_with_overweight',
 'FAVC',
 'FCVC',
 'NCP',
 'CAEC',
 'SMOKE',
 'CH2O',
 'SCC',
 'FAF',
 'TUE',
 'CALC',
 'MTRANS',
 'NObeyesdad']

In [179]:
data_no_duplicates = data.dropDuplicates()

print('Total observations in Dataset: ', data.count())
print('Remaining obsercations after removing duplicates form Dataset: ', data_no_duplicates.count())

Total observations in Dataset:  2111
Remaining obsercations after removing duplicates form Dataset:  2087


In [177]:
# Identify duplicate rows
duplicate_rows = data.groupBy(data.columns).count().where(col("count") > 1)

# Identify missing values
missing_values = data.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in data.columns])

print("\nSummary statistics of the dataset:")
data.describe().show()

print("\nDuplicate rows:")
duplicate_rows.show(truncate=False)

print("\nMissing values:")
missing_values.show()


Summary statistics of the dataset:
+-------+------+-----------------+-------------------+------------------+------------------------------+----+------------------+------------------+------+-----+------------------+----+------------------+------------------+------+----------+-------------------+
|summary|Gender|              Age|             Height|            Weight|family_history_with_overweight|FAVC|              FCVC|               NCP|  CAEC|SMOKE|              CH2O| SCC|               FAF|               TUE|  CALC|    MTRANS|         NObeyesdad|
+-------+------+-----------------+-------------------+------------------+------------------------------+----+------------------+------------------+------+-----+------------------+----+------------------+------------------+------+----------+-------------------+
|  count|  2111|             2111|               2111|              2111|                          2111|2111|              2111|              2111|  2111| 2111|              2111|21

In [151]:
from pyspark.ml.feature import StringIndexer

# List of categorical columns
categorical_cols = ['Gender', 'family_history_with_overweight', 'FAVC', 'CAEC', 'SMOKE', 'SCC', 'CALC', 'MTRANS', 'NObeyesdad']

# Initialize StringIndexers for each categorical column
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") for col in categorical_cols]

# Fit and transform the DataFrame with StringIndexers
indexed_data = data
for indexer in indexers:
    indexed_data = indexer.fit(indexed_data).transform(indexed_data)

indexed_data.show(5)

+------+----+------+------+------------------------------+----+----+---+---------+-----+----+---+---+---+----------+--------------------+-------------------+------------+------------------------------------+----------+----------+-----------+---------+----------+------------+----------------+
|Gender| Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|     CAEC|SMOKE|CH2O|SCC|FAF|TUE|      CALC|              MTRANS|         NObeyesdad|Gender_index|family_history_with_overweight_index|FAVC_index|CAEC_index|SMOKE_index|SCC_index|CALC_index|MTRANS_index|NObeyesdad_index|
+------+----+------+------+------------------------------+----+----+---+---------+-----+----+---+---+---+----------+--------------------+-------------------+------------+------------------------------------+----------+----------+-----------+---------+----------+------------+----------------+
|Female|21.0|  1.62|  64.0|                           yes|  no| 2.0|3.0|Sometimes|   no| 2.0| no|0.0|1.0|        no|Publi

In [152]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# List of input feature columns
feature_columns = ['Age', 'Height', 'Weight', 'FCVC', 'NCP', 'CH2O', 'FAF', 'TUE'] + [col+"_index" for col in categorical_cols]

# Initialize VectorAssembler
assembler = VectorAssembler(
                  inputCols=feature_columns,
                  outputCol="features")

# Transform the DataFrame with VectorAssembler
final_data = assembler.transform(indexed_data)

# Show the transformed DataFrame
final_data.show(10)


+------+----+------+------+------------------------------+----+----+---+---------+-----+----+---+---+---+----------+--------------------+-------------------+------------+------------------------------------+----------+----------+-----------+---------+----------+------------+----------------+--------------------+
|Gender| Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|     CAEC|SMOKE|CH2O|SCC|FAF|TUE|      CALC|              MTRANS|         NObeyesdad|Gender_index|family_history_with_overweight_index|FAVC_index|CAEC_index|SMOKE_index|SCC_index|CALC_index|MTRANS_index|NObeyesdad_index|            features|
+------+----+------+------+------------------------------+----+----+---+---------+-----+----+---+---+---+----------+--------------------+-------------------+------------+------------------------------------+----------+----------+-----------+---------+----------+------------+----------------+--------------------+
|Female|21.0|  1.62|  64.0|                           yes|

In [153]:
target_variable = final_data.select('NObeyesdad', 'NObeyesdad_index').distinct()
target_variable.show()

+-------------------+----------------+
|         NObeyesdad|NObeyesdad_index|
+-------------------+----------------+
|Insufficient_Weight|             6.0|
|      Normal_Weight|             5.0|
| Overweight_Level_I|             3.0|
|    Obesity_Type_II|             2.0|
|Overweight_Level_II|             4.0|
|     Obesity_Type_I|             0.0|
|   Obesity_Type_III|             1.0|
+-------------------+----------------+



In [154]:
final_data.select("features").show(10)

+--------------------+
|            features|
+--------------------+
|[21.0,1.62,64.0,2...|
|[21.0,1.52,56.0,3...|
|[23.0,1.8,77.0,2....|
|[27.0,1.8,87.0,3....|
|(17,[0,1,2,3,4,5,...|
|(17,[0,1,2,3,4,5,...|
|(17,[0,1,2,3,4,5,...|
|(17,[0,1,2,3,4,5,...|
|(17,[0,1,2,3,4,5,...|
|(17,[0,1,2,3,4,5,...|
+--------------------+
only showing top 10 rows



In [155]:
final_data_new = final_data.select("features",'NObeyesdad_index')

In [156]:
final_data_new.printSchema()

root
 |-- features: vector (nullable = true)
 |-- NObeyesdad_index: double (nullable = false)



In [157]:
final_data_new.show()

+--------------------+----------------+
|            features|NObeyesdad_index|
+--------------------+----------------+
|[21.0,1.62,64.0,2...|             5.0|
|[21.0,1.52,56.0,3...|             5.0|
|[23.0,1.8,77.0,2....|             5.0|
|[27.0,1.8,87.0,3....|             3.0|
|(17,[0,1,2,3,4,5,...|             4.0|
|(17,[0,1,2,3,4,5,...|             5.0|
|(17,[0,1,2,3,4,5,...|             5.0|
|(17,[0,1,2,3,4,5,...|             5.0|
|(17,[0,1,2,3,4,5,...|             5.0|
|(17,[0,1,2,3,4,5,...|             5.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|[21.0,1.72,80.0,2...|             4.0|
|(17,[0,1,2,3,4,5,...|             5.0|
|[41.0,1.8,99.0,2....|             0.0|
|(17,[0,1,2,3,4,5,...|             5.0|
|[22.0,1.7,66.0,3....|             5.0|
|(17,[0,1,2,3,4,5,...|             4.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|[30.0,1.71,82.0,3...|             4.0|
|(17,[0,1,2,3,4,5,...|             3.0|
+--------------------+----------------+
only showing top 20 rows



In [158]:
# Pass in the split between training/test as a list.
train_data,test_data = final_data_new.randomSplit([0.7,0.3], seed=25)

In [159]:
print(train_data.show(10))

+--------------------+----------------+
|            features|NObeyesdad_index|
+--------------------+----------------+
|(17,[0,1,2,3,4,5]...|             0.0|
|(17,[0,1,2,3,4,5]...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
+--------------------+----------------+
only showing top 10 rows

None


In [160]:
test_data.show(10)

+--------------------+----------------+
|            features|NObeyesdad_index|
+--------------------+----------------+
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
|(17,[0,1,2,3,4,5,...|             0.0|
+--------------------+----------------+
only showing top 10 rows



In [161]:
# @title LINEAR REGRESSION (LR)

In [162]:
spark = SparkSession.builder.appName('lr_model').getOrCreate()

from pyspark.ml.regression import LinearRegression

In [163]:
# Linear Regression Model object
lr = LinearRegression(labelCol='NObeyesdad_index', featuresCol="features")

# Fitting the model
lrModel = lr.fit(train_data)

# Coefficients and intercept for linear regression
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [-8.464590966251455e-16,-2.3898715131031593e-13,4.1556299129365935e-16,-4.180908323253981e-15,7.716401596859655e-16,8.16620538845909e-17,3.9323840696852124e-15,-1.907102997197993e-15,-2.1369059621835945e-14,-6.07494093339919e-15,-9.39532443355072e-15,-6.580915248440873e-16,6.215984699599609e-15,-2.5837449528363142e-15,-1.4954046495902473e-15,3.749107840190424e-15,1.0000000000000029] Intercept: 4.0132088757335286e-13


In [164]:
test_results = lrModel.evaluate(test_data)

In [165]:
# Make predictions on unlabled data
unlabeled_data = test_data.select('features')

lr_predictions = lrModel.transform(unlabeled_data)

In [166]:
lr_predictions.show()
print("Linear Regression RMSE: {}".format(test_results.rootMeanSquaredError))
print("Linear Regression MSE: {}".format(test_results.meanSquaredError))

+--------------------+--------------------+
|            features|          prediction|
+--------------------+--------------------+
|(17,[0,1,2,3,4,5,...|-4.18338590440547...|
|(17,[0,1,2,3,4,5,...|-1.62746783041124...|
|(17,[0,1,2,3,4,5,...|-2.26904561918509...|
|(17,[0,1,2,3,4,5,...|-3.35414553227793...|
|(17,[0,1,2,3,4,5,...|-8.92552384323848...|
|(17,[0,1,2,3,4,5,...|-9.28032294177675...|
|(17,[0,1,2,3,4,5,...|-2.33961402542932...|
|(17,[0,1,2,3,4,5,...|-2.02892534635006...|
|(17,[0,1,2,3,4,5,...|-4.22520957441559...|
|(17,[0,1,2,3,4,5,...|-7.6267040484531E-15|
|(17,[0,1,2,3,4,5,...|-1.8226363683256E-14|
|(17,[0,1,2,3,4,5,...|3.234557726477541...|
|(17,[0,1,2,3,4,5,...|-2.80080805054379...|
|(17,[0,1,2,3,4,5,...|-3.62913140276437...|
|(17,[0,1,2,3,4,5,...|9.897886670191211...|
|(17,[0,1,2,3,4,5,...|5.100086896271596...|
|(17,[0,1,2,3,4,5,...|-1.44903291643486...|
|(17,[0,1,2,3,4,5,...|1.018328240284919...|
|(17,[0,1,2,3,4,5,...|-2.75625347361304...|
|(17,[0,1,2,3,4,5,...|-2.0230408

In [167]:
# @title RANDOM FOREST (RF)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create a Random Forest Classifier object
rf_classifier = RandomForestClassifier(labelCol="NObeyesdad_index", featuresCol="features")

# Fit the model
rf_model_classifier = rf_classifier.fit(train_data)

# Make predictions
rf_predictions_classifier = rf_model_classifier.transform(test_data)

# Make predictions on unlabled data
rf_unlabeled_data = test_data.select('features')
rf_predictions = rf_model_classifier.transform(unlabeled_data)

In [168]:
rf_predictions.show()

# Evaluate accuracy
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_accuracy.evaluate(rf_predictions_classifier)
print("Random Forest Accuracy:", accuracy)

# Evaluate precision
evaluator_precision = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_precision.evaluate(rf_predictions_classifier)
print("Random Forest Precision:", precision)

# Evaluate recall
evaluator_recall = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator_recall.evaluate(rf_predictions_classifier)
print("Random Forest Recall:", recall)

# Evaluate F1-score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(rf_predictions_classifier)
print("Random Forest F1-score:", f1_score)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(17,[0,1,2,3,4,5,...|[17.6491733829502...|[0.88245866914751...|       0.0|
|(17,[0,1,2,3,4,5,...|[17.0998389179632...|[0.85499194589816...|       0.0|
|(17,[0,1,2,3,4,5,...|[17.0998389179632...|[0.85499194589816...|       0.0|
|(17,[0,1,2,3,4,5,...|[17.1387023226523...|[0.85693511613261...|       0.0|
|(17,[0,1,2,3,4,5,...|[17.6491733829502...|[0.88245866914751...|       0.0|
|(17,[0,1,2,3,4,5,...|[17.6491733829502...|[0.88245866914751...|       0.0|
|(17,[0,1,2,3,4,5,...|[17.4099942249828...|[0.87049971124914...|       0.0|
|(17,[0,1,2,3,4,5,...|[16.8602110120199...|[0.84301055060099...|       0.0|
|(17,[0,1,2,3,4,5,...|[12.9773972450199...|[0.64886986225099...|       0.0|
|(17,[0,1,2,3,4,5,...|[15.5857755705249...|[0.77928877852624...|       0.0|
|(17,[0,1,2,

In [169]:
# @title Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lrr_classifier = LogisticRegression(labelCol="NObeyesdad_index", featuresCol="features")

# Fit the model
lrr_model_classifier = lrr_classifier.fit(train_data)

# Make predictions
lrr_predictions_classifier = lrr_model_classifier.transform(test_data)

# Make predictions on unlabeled data
unlabeled_data = test_data.select('features')
lrr_predictions = lrr_model_classifier.transform(unlabeled_data)


In [170]:
lrr_predictions.show()

# Evaluate the model
evaluator_accuracy_LogisticRegression = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_accuracy_LogisticRegression.evaluate(lrr_predictions_classifier)
print("LogisticRegression Accuracy:", accuracy)

# Evaluate precision
evaluator_precision_LogisticRegression = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", predictionCol="prediction", metricName="weightedPrecision")
precision = evaluator_precision_LogisticRegression.evaluate(lrr_predictions_classifier)
print("LogisticRegression Precision:", precision)

# Evaluate recall
evaluator_recall_LogisticRegression = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator_recall_LogisticRegression.evaluate(lrr_predictions_classifier)
print("LogisticRegression Recall:", recall)

# Evaluate F1-score
evaluator_f1_LogisticRegression = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1_LogisticRegression.evaluate(lrr_predictions_classifier)
print("LogisticRegression F1-score:", f1_score)

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(17,[0,1,2,3,4,5,...|[324.130833308336...|[1.0,1.3444956339...|       0.0|
|(17,[0,1,2,3,4,5,...|[326.494862031673...|[1.0,1.1422528236...|       0.0|
|(17,[0,1,2,3,4,5,...|[312.880785725742...|[1.0,8.3279602540...|       0.0|
|(17,[0,1,2,3,4,5,...|[309.419977987617...|[1.0,5.5457074056...|       0.0|
|(17,[0,1,2,3,4,5,...|[299.180877974745...|[1.0,4.2949866815...|       0.0|
|(17,[0,1,2,3,4,5,...|[306.384621293440...|[1.0,8.2173583691...|       0.0|
|(17,[0,1,2,3,4,5,...|[310.333576663647...|[1.0,7.9729193984...|       0.0|
|(17,[0,1,2,3,4,5,...|[303.030783727810...|[1.0,1.8122335138...|       0.0|
|(17,[0,1,2,3,4,5,...|[326.922709980687...|[1.0,3.7527307842...|       0.0|
|(17,[0,1,2,3,4,5,...|[346.835513087790...|[1.0,9.9282878333...|       0.0|
|(17,[0,1,2,

In [171]:

import pandas as pd
# Export predictions to CSV files
#lr_predictions.toPandas().to_csv("lr_predictions.csv", index=False)
#rf_predictions.toPandas().to_csv("rf_predictions.csv", index=False)
#lrr_predictions.toPandas().to_csv("lrr_predictions.csv", index=False)


In [172]:
data_with_predictions = data.join(lr_predictions.select(lr_predictions["prediction"].alias("prediction_lr")), how='inner').join(rf_predictions.select(rf_predictions["prediction"].alias("prediction_rf")), how='inner').join(lrr_predictions.select(lrr_predictions["prediction"].alias("prediction_lrr")), how='inner')
#final_data_with_rf_predictions = final_data.join(rf_predictions.select('prediction'), how='inner')
#final_data_with_lrr_predictions = final_data.join(lrr_predictions.select('prediction'), how='inner')

In [173]:
data_with_predictions.show()

+------+----+------+------+------------------------------+----+----+---+---------+-----+----+---+---+---+----+--------------------+-------------+--------------------+-------------+--------------+
|Gender| Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|     CAEC|SMOKE|CH2O|SCC|FAF|TUE|CALC|              MTRANS|   NObeyesdad|       prediction_lr|prediction_rf|prediction_lrr|
+------+----+------+------+------------------------------+----+----+---+---------+-----+----+---+---+---+----+--------------------+-------------+--------------------+-------------+--------------+
|Female|21.0|  1.62|  64.0|                           yes|  no| 2.0|3.0|Sometimes|   no| 2.0| no|0.0|1.0|  no|Public_Transporta...|Normal_Weight|-4.18338590440547...|          0.0|           0.0|
|Female|21.0|  1.62|  64.0|                           yes|  no| 2.0|3.0|Sometimes|   no| 2.0| no|0.0|1.0|  no|Public_Transporta...|Normal_Weight|-4.18338590440547...|          0.0|           0.0|
|Female|21.0|  1.62|

In [174]:
#data_prediction = data_with_predictions.toPandas()
#data_prediction.to_csv('final_data.csv', header = True)

In [175]:
#data_prediction.to_csv('final_data_1.csv', header = True)