In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek, dayofmonth, month, year
import requests
from pyspark.sql.functions import col, hour, dayofweek, dayofmonth, month, year, unix_timestamp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import Window
from pyspark.sql import functions as F

import pickle
import os

In [22]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek, dayofmonth, month, year

# Download the file
url = "https://langdon.fedorapeople.org/1m_health_events_dataset.csv"
local_file_path = "1m_health_events_dataset.csv"

response = requests.get(url)
with open(local_file_path, 'wb') as file:
    file.write(response.content)

# Initialize a SparkSession
spark = SparkSession.builder \
    .appName("HealthRiskPrediction") \
    .config("spark.executor.cores", "4") \
    .config("spark.task.cpus", "4") \
    .config("spark.rapids.sql.enabled", "true") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()


# Load the CSV directly into a PySpark DataFrame from the local file path
df = spark.read.csv(local_file_path, header=True, inferSchema=True)

# Confirm DataFrame loading by showing the schema (optional)
df.printSchema()


root
 |-- EventType: string (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- Details: string (nullable = true)
 |-- Is_Anomaly: integer (nullable = true)



In [23]:
# Convert Timestamp to a datetime type and extract useful features
df = df.withColumn("Hour", hour(col("Timestamp"))) \
       .withColumn("DayOfWeek", dayofweek(col("Timestamp"))) \
       .withColumn("DayOfMonth", dayofmonth(col("Timestamp"))) \
       .withColumn("Month", month(col("Timestamp"))) \
       .withColumn("Year", year(col("Timestamp"))) \
       .withColumn("TimestampUnix", unix_timestamp(col("Timestamp")))

df.cache()

# Encode categorical columns using StringIndexer
event_indexer = StringIndexer(inputCol="EventType", outputCol="EventTypeIndex")
location_indexer = StringIndexer(inputCol="Location", outputCol="LocationIndex")
severity_indexer = StringIndexer(inputCol="Severity", outputCol="SeverityIndex")

# Assemble features into a single vector column
feature_columns = ["EventTypeIndex", "LocationIndex", "SeverityIndex", "Hour", "DayOfWeek", "DayOfMonth", "Month", "Year", "TimestampUnix"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Define the classifier with additional GPU-specific configurations if needed
classifier = RandomForestClassifier(labelCol="Is_Anomaly", featuresCol="features", numTrees=100)

# Create a pipeline
pipeline = Pipeline(stages=[event_indexer, location_indexer, severity_indexer, assembler, classifier])

# Create a parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(classifier.maxDepth, [5, 10, 15]) \
    .addGrid(classifier.numTrees, [50, 100]) \
    .build()

# Define a cross-validator
cross_validator = CrossValidator(estimator=pipeline,
                                 estimatorParamMaps=param_grid,
                                 evaluator=MulticlassClassificationEvaluator(labelCol="Is_Anomaly", metricName="accuracy"),
                                 numFolds=5)

# Split data into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
cv_model = cross_validator.fit(train_data)

In [26]:
# Specify the path where the model will be saved
model_save_path = 'spark_model.pkl'

# Save the trained model using Spark's save method
cv_model.write().overwrite().save(model_save_path)

# Make predictions using the trained model
predictions = cv_model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="Is_Anomaly", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy * 100:.2f}%")

Model Accuracy: 99.99%


In [39]:
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the saved CrossValidatorModel using the correct load method
model_path = 'spark_model.pkl'
cv_model = CrossValidatorModel.load(model_path)

# Make predictions using the loaded model
predictions = cv_model.transform(test_data)

# Evaluate the predictions
evaluator = MulticlassClassificationEvaluator(labelCol="Is_Anomaly", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy (Loaded Model): {accuracy * 100:.2f}%")

# Additional Predictions (Risk Prediction Model)
predictions.show(100)


Model Accuracy (Loaded Model): 99.99%
+------------------+-------------------+-----------+--------+--------------------+----------+----+---------+----------+-----+----+-------------+-------------------+------------+--------------------+------------------+-------------+--------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|         EventType|          Timestamp|   Location|Severity|             Details|Is_Anomaly|Hour|DayOfWeek|DayOfMonth|Month|Year|TimestampUnix|      PrevTimestamp|PrevSeverity|       TimeDiffHours|SeverityEscalation|PotentialRisk|EventTypeIndex|LocationIndex|SeverityIndex|            features|       rawPrediction|         probability|prediction|
+------------------+-------------------+-----------+--------+--------------------+----------+----+---------+----------+-----+----+-------------+-------------------+------------+--------------------+------------------+-------------+--------------+-------------+----

In [35]:

# Add window-based lag calculations for escalation and rapidity analysis
window = Window.partitionBy("Location").orderBy("Timestamp")

# Calculate differences and severity escalation between consecutive events
df = df.withColumn("PrevTimestamp", F.lag("Timestamp", 1).over(window)) \
       .withColumn("PrevSeverity", F.lag("Severity", 1).over(window)) \
       .withColumn("TimeDiffHours", (F.unix_timestamp("Timestamp") - F.unix_timestamp("PrevTimestamp")) / 3600) \
       .withColumn("SeverityEscalation", F.when((F.col("PrevSeverity") == "low") & (F.col("Severity") != "low"), 1)
                   .when((F.col("PrevSeverity") == "medium") & (F.col("Severity") == "high"), 1).otherwise(0))

# Create the "PotentialRisk" column based on escalation and rapidity
df = df.withColumn("PotentialRisk", F.when((F.col("SeverityEscalation") == 1) & (F.col("TimeDiffHours") <= 24), 1).otherwise(0))

# Fill nulls created during lagging
df = df.fillna({"TimeDiffHours": 0, "SeverityEscalation": 0})

# Encode categorical columns using StringIndexer
event_indexer = StringIndexer(inputCol="EventType", outputCol="EventTypeIndex")
location_indexer = StringIndexer(inputCol="Location", outputCol="LocationIndex")
severity_indexer = StringIndexer(inputCol="Severity", outputCol="SeverityIndex")

# Define new feature columns
feature_columns = ["EventTypeIndex", "LocationIndex", "SeverityIndex", "TimeDiffHours", "SeverityEscalation"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Define the classifier for risk prediction
risk_classifier = RandomForestClassifier(labelCol="PotentialRisk", featuresCol="features", numTrees=100)

# Build the pipeline for risk prediction
pipeline = Pipeline(stages=[event_indexer, location_indexer, severity_indexer, assembler, risk_classifier])

# Create a parameter grid for tuning
param_grid = ParamGridBuilder().addGrid(risk_classifier.maxDepth, [5, 10, 15]).addGrid(risk_classifier.numTrees, [50, 100]).build()

# Define the cross-validator
cross_validator = CrossValidator(estimator=pipeline, 
                                 estimatorParamMaps=param_grid, 
                                 evaluator=MulticlassClassificationEvaluator(labelCol="PotentialRisk", metricName="accuracy"), 
                                 numFolds=5)

# Split data into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Train the risk model
cv_model = cross_validator.fit(train_data)




In [37]:
# Specify the path where the model will be saved
model_save_path = 'risk_model.pkl'

# Save the trained model using Spark's save method
cv_model.write().overwrite().save(model_save_path)

# Make predictions using the trained model
predictions = cv_model.transform(test_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="PotentialRisk", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Risk Prediction Model Accuracy: {accuracy * 100:.2f}%")

Risk Prediction Model Accuracy: 100.00%


In [42]:
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the saved CrossValidatorModel using the correct load method
model_path = 'risk_model.pkl'
cv_model = CrossValidatorModel.load(model_path)

# Make predictions using the loaded model
predictions = cv_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="PotentialRisk", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Risk Prediction Model Accuracy (Loaded Model): {accuracy * 100:.2f}%")

# Display the predictions
predictions.show(100)


Risk Prediction Model Accuracy (Loaded Model): 100.00%
+------------------+-------------------+-----------+--------+--------------------+----------+----+---------+----------+-----+----+-------------+-------------------+------------+--------------------+------------------+-------------+--------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|         EventType|          Timestamp|   Location|Severity|             Details|Is_Anomaly|Hour|DayOfWeek|DayOfMonth|Month|Year|TimestampUnix|      PrevTimestamp|PrevSeverity|       TimeDiffHours|SeverityEscalation|PotentialRisk|EventTypeIndex|LocationIndex|SeverityIndex|            features|       rawPrediction|         probability|prediction|
+------------------+-------------------+-----------+--------+--------------------+----------+----+---------+----------+-----+----+-------------+-------------------+------------+--------------------+------------------+-------------+--------------+-