Please verify that you have pyspark and the required java versions installed.

In [27]:
%pip install pyspark==3.0.0 && java -version

Collecting pyspark==3.0.0
  Using cached pyspark-3.0.0-py2.py3-none-any.whl
Installing collected packages: pyspark
Note: you may need to restart the kernel to use updated packages.


ERROR: Could not install packages due to an OSError: [WinError 32] The process cannot access the file because it is being used by another process: 'c:\\Users\\ethan\\AppData\\Local\\Programs\\Python\\Python310\\Lib\\site-packages\\pyspark\\jars\\HikariCP-2.5.1.jar'
Consider using the `--user` option or check the permissions.



In [28]:
import os
import pathlib
import pickle

os.environ["PYSPARK_SUBMIT_ARGS"] = "pyspark-shell"

data_path = pathlib.Path().cwd().parent / "spark-analytics" / "1m_health_events_dataset.csv"
model_path = pathlib.Path().cwd().parent / "spark-analytics" / "pyspark_models"

# PySpark KMeans Clustering

## Imports and data loading

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import month
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# create a SparkSession
spark = SparkSession.builder.appName("HealthRiskPrediction").getOrCreate()

data = spark.read.csv("1m_health_events_dataset.csv", header=True, inferSchema=True)
data = data.withColumn("Timestamp", data["Timestamp"].cast("timestamp"))
data = data.withColumn("Month", month(data["Timestamp"]))

# do a string indexing on categorical columns
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ["EventType", "Location", "Severity"]]

# create a feature vector for risk prediction
riskFeatures = ["EventType_index", "Location_index", "Severity_index", "Month"]
riskAssembler = VectorAssembler(inputCols=riskFeatures, outputCol="riskFeatures")

# create a feature vector for anomaly detection
anomalyFeatures = ["EventType_index", "Location_index", "Severity_index"]
anomalyAssembler = VectorAssembler(inputCols=anomalyFeatures, outputCol="anomalyFeatures")

(trainingData, testData) = data.randomSplit([0.8, 0.2], seed=42)

## Model Creation & Evaluation #1
KMeans Clustering - Risk prediction


In [30]:
# create a MLP classifier for risk prediction
riskLayers = [len(riskFeatures), 10, 5, 2]  # Input features, hidden units, output classes
riskMLP = MultilayerPerceptronClassifier(labelCol="Is_Anomaly", featuresCol="riskFeatures", layers=riskLayers, seed=42)

# create a pipeline for risk prediction
riskPipeline = Pipeline(stages=indexers + [riskAssembler, riskMLP])

riskModel = riskPipeline.fit(trainingData)
riskPredictions = riskModel.transform(testData)

# evaluate the risk prediction model
riskEvaluator = BinaryClassificationEvaluator(labelCol="Is_Anomaly")
riskAccuracy = riskEvaluator.evaluate(riskPredictions)
print("Risk Prediction Accuracy:", riskAccuracy)

Risk Prediction Accuracy: 0.9474703434020741


## Model Creation & Evaluation #2
KMeans Clustering - Anomaly detection


In [31]:
# create a KMeans model for anomaly detection
kmeans = KMeans(featuresCol="anomalyFeatures", k=2, seed=42)

anomalyPipeline = Pipeline(stages=indexers + [anomalyAssembler, kmeans])
anomalyModel = anomalyPipeline.fit(trainingData)

anomalyPredictions = anomalyModel.transform(testData)

# evaluate the anomaly detection model
anomalyEvaluator = ClusteringEvaluator(featuresCol="anomalyFeatures")
anomalyScore = anomalyEvaluator.evaluate(anomalyPredictions)
print("Anomaly Detection Score:", anomalyScore)

Anomaly Detection Score: 0.5764826823551313


## Model Saving
Currently does not work

In [32]:
%%script pass

# save our models, and feel free to change to your desired local path!
riskModel.write().overwrite().save(f'{model_path / "risk_model"}')
anomalyModel.write().overwrite().save(f'{model_path / "anomaly_model"}')

Couldn't find program: 'pass'


In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, datediff, current_timestamp
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# create a SparkSession
spark = SparkSession.builder \
    .appName("HealthRiskPrediction") \
    .getOrCreate()

# load the CSV data
data = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f'{data_path}')

# preprocess the data
data = data.withColumn("days_since_event", datediff(current_timestamp(), col("Timestamp")))
data = data.withColumn("Timestamp", unix_timestamp("Timestamp", "yyyy-MM-dd HH:mm:ss"))

# encode categorical variables
event_indexer = StringIndexer(inputCol="EventType", outputCol="EventTypeIndex")
location_indexer = StringIndexer(inputCol="Location", outputCol="LocationIndex")
severity_indexer = StringIndexer(inputCol="Severity", outputCol="SeverityIndex")

# one-hot encode categorical variables
event_encoder = OneHotEncoder(inputCol="EventTypeIndex", outputCol="EventTypeVec")
location_encoder = OneHotEncoder(inputCol="LocationIndex", outputCol="LocationVec")
severity_encoder = OneHotEncoder(inputCol="SeverityIndex", outputCol="SeverityVec")

# create feature vector
assembler = VectorAssembler(
    inputCols=["Timestamp", "days_since_event", "EventTypeVec", "LocationVec", "SeverityVec"],
    outputCol="features"
)

# split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [34]:
# create a Random Forest classifier
rf = RandomForestClassifier(labelCol="Is_Anomaly", featuresCol="features", numTrees=100)

# create a pipeline
pipeline = Pipeline(stages=[
    event_indexer, location_indexer, severity_indexer,
    event_encoder, location_encoder, severity_encoder,
    assembler, rf
])

# set up parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.maxBins, [16, 32, 64]) \
    .build()

# create a binary classification evaluator
evaluator = BinaryClassificationEvaluator(labelCol="Is_Anomaly")

# create a cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3, parallelism=4)

In [35]:
model = cv.fit(train_data)
predictions = model.transform(test_data)

# evaluate our model
accuracy = evaluator.evaluate(predictions)
print(f"Model accuracy: {accuracy:.8f}")

Model accuracy: 0.95055473


In [36]:
%%script pass

model.write().overwrite().save(f'{model_path}/randomforest_cv_model_1')

Couldn't find program: 'pass'


In [37]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, datediff, current_timestamp
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# create a SparkSession
spark = SparkSession.builder \
    .appName("HealthRiskPrediction") \
    .getOrCreate()

# load the CSV data
data = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f'{data_path}')

# sample a subset of the data for training and testing
sampled_data = data.sample(fraction=1.0, seed=42)

# preprocess the data
sampled_data = sampled_data.withColumn("days_since_event", datediff(current_timestamp(), col("Timestamp")))
sampled_data = sampled_data.withColumn("Timestamp", unix_timestamp("Timestamp", "yyyy-MM-dd HH:mm:ss"))

# encode categorical variables
event_indexer = StringIndexer(inputCol="EventType", outputCol="EventTypeIndex")
location_indexer = StringIndexer(inputCol="Location", outputCol="LocationIndex")
severity_indexer = StringIndexer(inputCol="Severity", outputCol="SeverityIndex")

# one-hot encode categorical variables
event_encoder = OneHotEncoder(inputCol="EventTypeIndex", outputCol="EventTypeVec")
location_encoder = OneHotEncoder(inputCol="LocationIndex", outputCol="LocationVec")
severity_encoder = OneHotEncoder(inputCol="SeverityIndex", outputCol="SeverityVec")

# create feature vector
assembler = VectorAssembler(
    inputCols=["Timestamp", "days_since_event", "EventTypeVec", "LocationVec", "SeverityVec"],
    outputCol="features"
)

# split the data into training and test sets
train_data, test_data = sampled_data.randomSplit([0.8, 0.2], seed=42)

In [38]:
# create a Random Forest classifier
rf = RandomForestClassifier(labelCol="Is_Anomaly", featuresCol="features", numTrees=50)

# create a pipeline
pipeline = Pipeline(stages=[
    event_indexer, location_indexer, severity_indexer,
    event_encoder, location_encoder, severity_encoder,
    assembler, rf
])

# set up parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5, 10]) \
    .addGrid(rf.maxBins, [16, 32]) \
    .build()

# create a binary classification evaluator
evaluator = BinaryClassificationEvaluator(labelCol="Is_Anomaly")

# create a cross-validator with fewer folds
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=2, parallelism=4)

In [39]:
model = cv.fit(train_data)

predictions = model.transform(test_data)

# evaluate our model
accuracy = evaluator.evaluate(predictions)
print(f"Model accuracy: {accuracy:.8f}")

Model accuracy: 0.95087880


In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, datediff, current_timestamp, hour, dayofweek, month
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# create a SparkSession
spark = SparkSession.builder \
    .appName("HealthRiskPrediction") \
    .getOrCreate()

# load the CSV data
data = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(f'{data_path}')

# sample a subset of the data for training and testing
sampled_data = data.sample(fraction=1.0, seed=42)

# preprocess the data
sampled_data = sampled_data.withColumn("days_since_event", datediff(current_timestamp(), col("Timestamp")))
sampled_data = sampled_data.withColumn("hour", hour(col("Timestamp")))
sampled_data = sampled_data.withColumn("day_of_week", dayofweek(col("Timestamp")))
sampled_data = sampled_data.withColumn("month", month(col("Timestamp")))
sampled_data = sampled_data.withColumn("Timestamp", unix_timestamp("Timestamp", "yyyy-MM-dd HH:mm:ss"))

# encode categorical variables
event_indexer = StringIndexer(inputCol="EventType", outputCol="EventTypeIndex")
location_indexer = StringIndexer(inputCol="Location", outputCol="LocationIndex")
severity_indexer = StringIndexer(inputCol="Severity", outputCol="SeverityIndex")

# one-hot encode categorical variables
event_encoder = OneHotEncoder(inputCol="EventTypeIndex", outputCol="EventTypeVec")
location_encoder = OneHotEncoder(inputCol="LocationIndex", outputCol="LocationVec")
severity_encoder = OneHotEncoder(inputCol="SeverityIndex", outputCol="SeverityVec")

# create feature vector
assembler = VectorAssembler(
    inputCols=["Timestamp", "days_since_event", "hour", "day_of_week", "month", "EventTypeVec", "LocationVec", "SeverityVec"],
    outputCol="features"
)

# scale features
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# split the data into training and test sets
train_data, test_data = sampled_data.randomSplit([0.8, 0.2], seed=42)

In [44]:
# create classifiers
rf = RandomForestClassifier(labelCol="Is_Anomaly", featuresCol="scaledFeatures", numTrees=100)
gbt = GBTClassifier(labelCol="Is_Anomaly", featuresCol="scaledFeatures", maxIter=50)

# create a pipeline
pipeline = Pipeline(stages=[
    event_indexer, location_indexer, severity_indexer,
    event_encoder, location_encoder, severity_encoder,
    assembler, scaler, rf
])

# set up parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.maxBins, [16, 32, 64]) \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .build()

# create evaluators
binary_evaluator = BinaryClassificationEvaluator(labelCol="Is_Anomaly")
multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="Is_Anomaly", metricName="accuracy")

# create a cross-validator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=binary_evaluator, numFolds=3, parallelism=4)

In [None]:
model = cv.fit(train_data)

predictions = model.transform(test_data)

# evaluate our model
binary_metrics = binary_evaluator.evaluate(predictions)
multiclass_accuracy = multiclass_evaluator.evaluate(predictions)
print(f"Binary Classification Metrics: {binary_metrics:.4f}")
print(f"Multiclass Classification Accuracy: {multiclass_accuracy:.4f}")

## May 4th

Enhanced model training pipeline by adding StandardScaler and GBTClassifier options, and implemented model versioning for zero-downtime deployment

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, datediff, current_timestamp, hour, dayofweek, month
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# constants
DATA_PATH = "../data/1m_health_events_dataset.csv"
MODEL_NAME = "health_risk_model_notebook"
MODEL_DIR = "../models"

# create SparkSession
spark = SparkSession.builder.appName("HealthRiskPrediction").getOrCreate()

data = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# preprocess the data
preprocessed_data = data \
    .withColumn("days_since_event", datediff(current_timestamp(), col("Timestamp"))) \
    .withColumn("hour", hour(col("Timestamp"))) \
    .withColumn("day_of_week", dayofweek(col("Timestamp"))) \
    .withColumn("month", month(col("Timestamp"))) \
    .withColumn("Timestamp", unix_timestamp("Timestamp", "yyyy-MM-dd HH:mm:ss"))

# split the data into training and testing sets
train_data, test_data = preprocessed_data.randomSplit([0.8, 0.2], seed=42)

# create the pipeline
event_indexer = StringIndexer(inputCol="EventType", outputCol="EventTypeIndex")
location_indexer = StringIndexer(inputCol="Location", outputCol="LocationIndex")
severity_indexer = StringIndexer(inputCol="Severity", outputCol="SeverityIndex")

event_encoder = OneHotEncoder(inputCol="EventTypeIndex", outputCol="EventTypeVec")
location_encoder = OneHotEncoder(inputCol="LocationIndex", outputCol="LocationVec")
severity_encoder = OneHotEncoder(inputCol="SeverityIndex", outputCol="SeverityVec")

assembler = VectorAssembler(
    inputCols=["Timestamp", "days_since_event", "hour", "day_of_week", "month", "EventTypeVec", "LocationVec", "SeverityVec"],
    outputCol="features"
)

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

classifier = RandomForestClassifier(labelCol="Is_Anomaly", featuresCol="scaledFeatures", numTrees=100)

pipeline = Pipeline(stages=[
    event_indexer, location_indexer, severity_indexer,
    event_encoder, location_encoder, severity_encoder,
    assembler, scaler, classifier
])

# train the pipeline
param_grid = ParamGridBuilder() \
    .addGrid(classifier.maxDepth, [5, 10, 15]) \
    .addGrid(classifier.maxBins, [16, 32, 64]) \
    .addGrid(classifier.impurity, ["gini", "entropy"]) \
    .build()

binary_evaluator = BinaryClassificationEvaluator(labelCol="Is_Anomaly")

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=param_grid, evaluator=binary_evaluator, numFolds=3, parallelism=4)
model = cv.fit(train_data)

# evaluate the model
predictions = model.transform(test_data)

binary_evaluator = BinaryClassificationEvaluator(labelCol="Is_Anomaly")
multiclass_evaluator = MulticlassClassificationEvaluator(labelCol="Is_Anomaly", metricName="accuracy")

binary_metrics = binary_evaluator.evaluate(predictions)
multiclass_accuracy = multiclass_evaluator.evaluate(predictions)

print(f"Binary Classification Metrics: {binary_metrics:.4f}")
print(f"Multiclass Classification Accuracy: {multiclass_accuracy:.4f}")

# save the model
model_path = f"{MODEL_DIR}/{MODEL_NAME}"
model.write().overwrite().save(model_path)