In [15]:
# Echo the SparkContext to confirm that Spark is reachable from this kernel.
# NOTE(review): `sc` is not defined in any visible cell; presumably it is
# injected by the PySpark kernel/shell — confirm before running elsewhere.
sc

In [16]:
# Load the libraries
import os
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, StringType
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer

In [17]:
# Due the different version of python can be found and each machine or libriries 
# Avoid warnigs
import warnings
warnings.filterwarnings("ignore")

# 2. Data Understanding Phase

Practical Big Data (PySparkSQL) - Part 1

In [41]:
# Import PySpark SQL
import pyspark
from pyspark.sql import SparkSession

# Create the SparkSession (getOrCreate() reuses an already-active session).
spark = (SparkSession
  .builder
  .appName("SparkSQL")
  .getOrCreate())

# Path to the dataset. Set the MATERNAL_HEALTH_CSV environment variable to
# point at a different copy; otherwise the original hard-coded path is used.
csv_file = os.environ.get("MATERNAL_HEALTH_CSV", "/user/user1/Maternal_Health_Risk.csv")

In [42]:
# Load the CSV (first line is the header, column types are inferred from the
# data) and expose it to Spark SQL as the temporary view "rawdata".
df = spark.read.csv(csv_file, header=True, inferSchema=True)
df.createOrReplaceTempView("rawdata")

                                                                                

In [43]:
# Fetch every row of the "rawdata" view and print the schema Spark inferred.
data = spark.table("rawdata")
data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- SystolicBP: integer (nullable = true)
 |-- DiastolicBP: integer (nullable = true)
 |-- BS: double (nullable = true)
 |-- BodyTemp: double (nullable = true)
 |-- HeartRate: integer (nullable = true)
 |-- RiskLevel: string (nullable = true)



In [44]:
# Display the first 5 rows
data.show(n=5)

+---+----------+-----------+----+--------+---------+---------+
|Age|SystolicBP|DiastolicBP|  BS|BodyTemp|HeartRate|RiskLevel|
+---+----------+-----------+----+--------+---------+---------+
| 25|       130|         80|15.0|    98.0|       86|high risk|
| 35|       140|         90|13.0|    98.0|       70|high risk|
| 29|        90|         70| 8.0|   100.0|       80|high risk|
| 30|       140|         85| 7.0|    98.0|       70|high risk|
| 35|       120|         60| 6.1|    98.0|       76| low risk|
+---+----------+-----------+----+--------+---------+---------+
only showing top 5 rows



In [72]:
# Remove the 'person_id' column if it is present.
# NOTE(review): the printed schema has no 'person_id' column, so this is
# currently a no-op (DataFrame.drop ignores column names it does not find).
if "person_id" in data.columns:
    data = data.drop("person_id")

In [73]:
# Expose the DataFrame to Spark SQL under the view name "RiskLevel".
# NOTE(review): the view shares its name with the RiskLevel column, which makes
# queries such as "GROUP BY RiskLevel ... FROM RiskLevel" harder to read.
data.createOrReplaceTempView("RiskLevel")

# Total number of rows, counted with SQL
row_count = spark.sql("SELECT COUNT(1) FROM RiskLevel")
row_count.show()

[Stage 86:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|    1014|
+--------+



                                                                                

In [74]:
# Count the records in each RiskLevel class.
# ORDER BY makes the row order deterministic: Spark returns GROUP BY results in
# no guaranteed order, so without it re-running the notebook could display the
# classes differently. Ties on the count are broken by the class label.
result = spark.sql(
    "SELECT RiskLevel, COUNT(*) AS count "
    "FROM RiskLevel "
    "GROUP BY RiskLevel "
    "ORDER BY COUNT(*) DESC, RiskLevel"
)

# Display the class distribution
result.show()

                                                                                

+---------+-----+
|RiskLevel|count|
+---------+-----+
| low risk|  406|
| mid risk|  336|
|high risk|  272|
+---------+-----+



                                                                                

In [75]:
# Find the maximum and minimum age with a single aggregation, which runs one
# Spark job instead of two separate collect() calls.
# The aggregate functions are referenced through the `F` alias instead of
# `from pyspark.sql.functions import max, min`, which would shadow Python's
# built-in max()/min() in the notebook namespace.
age_bounds = data.agg(
    F.max("Age").alias("max_age"),
    F.min("Age").alias("min_age"),
).first()
max_age = age_bounds["max_age"]
min_age = age_bounds["min_age"]

# Display the results
print("Maximum Age:", max_age)
print("Minimum Age:", min_age)

Maximum Age: 70
Minimum Age: 10


In [76]:
from pyspark.sql.functions import max, min

# Range (max/min) of systolic and diastolic blood pressure within each RiskLevel.
# NOTE(review): the import above shadows Python's built-in max()/min() for every
# later cell; F.max / F.min would avoid that.
bp_range_aggs = [
    max("SystolicBP").alias("MaxSystolicBP"),
    min("SystolicBP").alias("MinSystolicBP"),
    max("DiastolicBP").alias("MaxDiastolicBP"),
    min("DiastolicBP").alias("MinDiastolicBP"),
]
risk_stats = data.groupBy("RiskLevel").agg(*bp_range_aggs)

risk_stats.show()

                                                                                

+---------+-------------+-------------+--------------+--------------+
|RiskLevel|MaxSystolicBP|MinSystolicBP|MaxDiastolicBP|MinDiastolicBP|
+---------+-------------+-------------+--------------+--------------+
| low risk|          129|           70|           100|            49|
| mid risk|          140|           70|           100|            50|
|high risk|          160|           83|           100|            60|
+---------+-------------+-------------+--------------+--------------+



In [77]:
from pyspark.sql.functions import col

# Keep only the rows labelled "high risk".
high_risk_data = data.where(col("RiskLevel") == "high risk")

# Show age, both blood-pressure readings and the risk label for those rows
# (show() prints the first 20 rows by default).
high_risk_columns = ["Age", "SystolicBP", "DiastolicBP", "RiskLevel"]
high_risk_data.select(*high_risk_columns).show()

+---+----------+-----------+---------+
|Age|SystolicBP|DiastolicBP|RiskLevel|
+---+----------+-----------+---------+
| 25|       130|         80|high risk|
| 35|       140|         90|high risk|
| 29|        90|         70|high risk|
| 30|       140|         85|high risk|
| 23|       140|         80|high risk|
| 35|        85|         60|high risk|
| 42|       130|         80|high risk|
| 50|       140|         90|high risk|
| 25|       140|        100|high risk|
| 40|       140|        100|high risk|
| 48|       140|         90|high risk|
| 25|       140|        100|high risk|
| 23|       140|         90|high risk|
| 34|        85|         60|high risk|
| 50|       140|         90|high risk|
| 25|       140|        100|high risk|
| 42|       140|        100|high risk|
| 32|       140|        100|high risk|
| 50|       140|         95|high risk|
| 38|       135|         60|high risk|
+---+----------+-----------+---------+
only showing top 20 rows



# 3. Data Preparation Phase

Check for missing values

In [78]:
from pyspark.sql.functions import col

# Null count per column: each isNull() flag is cast to 0/1 and summed in one
# global aggregation whose output columns keep the original column names.
missing_values = data.select(
    *(F.sum(col(name).isNull().cast("int")).alias(name) for name in data.columns)
)

# Show the number of missing values in each column
missing_values.show()

+---+----------+-----------+---+--------+---------+---------+
|Age|SystolicBP|DiastolicBP| BS|BodyTemp|HeartRate|RiskLevel|
+---+----------+-----------+---+--------+---------+---------+
|  0|         0|          0|  0|       0|        0|        0|
+---+----------+-----------+---+--------+---------+---------+



In [79]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame
# (the row count above shows the dataset is small, ~1,000 rows).
pandas_df = data.toPandas()

# Preview the first rows
preview = pandas_df.head()
print(preview)

   Age  SystolicBP  DiastolicBP    BS  BodyTemp  HeartRate  RiskLevel
0   25         130           80  15.0      98.0         86  high risk
1   35         140           90  13.0      98.0         70  high risk
2   29          90           70   8.0     100.0         80  high risk
3   30         140           85   7.0      98.0         70  high risk
4   35         120           60   6.1      98.0         76   low risk


In [80]:
# 4. Modeling Phase