In [1]:
!pip install pyspark medmnist tensorflow findspark scikit-learn matplotlib

Collecting medmnist
  Downloading medmnist-3.0.2-py3-none-any.whl.metadata (14 kB)
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Collecting fire (from medmnist)
  Downloading fire-0.7.0.tar.gz (87 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.2/87.2 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading medmnist-3.0.2-py3-none-any.whl (25 kB)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: fire
  Building wheel for fire (setup.py) ... [?25l[?25hdone
  Created wheel for fire: filename=fire-0.7.0-py3-none-any.whl size=114249 sha256=326d7a13c0f3227a82c9c0e3b2b735bf3e2e344b723be1977e719d6676118647
  Stored in directory: /root/.cache/pip/wheels/19/39/2f/2d3cadc408a8804103f1c34ddd4b9f6a93497b11fa96fe738e
Successfully built fire
Installing collected packages: findspark, fire, medmnist
Successfully installed fi

In [None]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from medmnist import INFO, ChestMNIST
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from sklearn.utils import resample

# Start (or reuse) the Spark session that backs every DataFrame created below
spark = SparkSession.builder.appName("ChestMNIST with PySpark").getOrCreate()
print("spark created")

# Look up the MedMNIST metadata entry for the chest x-ray dataset
data_flag = 'chestmnist'
info = INFO[data_flag]
print("data chosed")

# Load the three predefined splits; download=True lets MedMNIST fetch the archive when needed
train_dataset = ChestMNIST(split='train', download=True)
val_dataset = ChestMNIST(split='val', download=True)
test_dataset = ChestMNIST(split='test', download=True)
print("dataset loaded")

# Pull the raw image and label arrays out of each split
X_train, y_train = train_dataset.imgs, train_dataset.labels
X_val, y_val = val_dataset.imgs, val_dataset.labels
X_test, y_test = test_dataset.imgs, test_dataset.labels
print("data extracted")

# ** Data Exploration **

# Divide pixel values by 255 (assumes 8-bit images, so the result lies in [0, 1])
X_train = X_train / 255.0
X_val = X_val / 255.0
X_test = X_test / 255.0
print("data normalized")

# Append a trailing channel axis so each image has the (height, width, channels) layout a CNN expects
X_train = np.expand_dims(X_train, axis=-1)
X_val = np.expand_dims(X_val, axis=-1)
X_test = np.expand_dims(X_test, axis=-1)
print("added dimension")
# Convert to Spark DataFrame
def convert_to_spark_df(X, y):
    """Flatten image samples and pair each one with a single label in a Spark DataFrame.

    Args:
        X: NumPy array whose first axis indexes samples. All remaining axes are
            flattened into the columns ``feature_0`` .. ``feature_{k-1}``.
        y: NumPy array of labels. A 1-D array ``(n,)`` or a single column
            ``(n, 1)`` is used as-is. A matrix ``(n, m)`` with ``m > 1`` is
            treated as multi-label indicator vectors and reduced to one binary
            label per sample: 1 if any entry is positive, otherwise 0.

    Returns:
        A Spark DataFrame with one row per sample, the ``feature_i`` columns and a
        trailing ``label`` column.
    """
    # Flatten the image data to a single vector per sample
    X_flattened = X.reshape(X.shape[0], -1)
    print("image flattened")

    y = np.asarray(y)
    if y.ndim > 1 and y.shape[1] > 1:
        # argmax would turn an all-zero row into class 0 and otherwise return the index of
        # the first positive label, which is not a binary target. The classifier trained
        # later in this notebook has one sigmoid output (Negative / Positive), so collapse
        # the indicator vector to "any positive label present".
        y = (y > 0).any(axis=1).astype(int)
    # Always end up with an (n, 1) column so it can be stacked next to the features;
    # this also keeps an existing (n, 1) label column intact and accepts 1-D labels.
    y = y.reshape(-1, 1)

    # Create a Spark DataFrame with features and labels
    data = np.hstack((X_flattened, y))  # Combine flattened features and labels
    columns = [f"feature_{i}" for i in range(X_flattened.shape[1])] + ["label"]
    print("spark dataframe created")
    return spark.createDataFrame(data.tolist(), columns)

# Build one Spark DataFrame per split (flattened pixel columns plus a "label" column)
train_data = convert_to_spark_df(X_train, y_train)
val_data = convert_to_spark_df(X_val, y_val)
test_data = convert_to_spark_df(X_test, y_test)

# ** Feature Engineering with PySpark **

# VectorAssembler to combine all features into a single vector column for scaling.
# The column names are taken from the DataFrame itself: X_train is a 4-D image tensor
# at this point, so X_train.shape[1] is only the image height and would select just the
# first few pixel columns instead of all of them.
feature_cols = [name for name in train_data.columns if name != "label"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
print("combined features into vector")

# StandardScaler for feature scaling (centre on the mean and scale to unit standard deviation)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
print("applied standard scaler")

# ** Pipeline for Data Preprocessing **

# Create pipeline for transformations
pipeline = Pipeline(stages=[assembler, scaler])
print("pipeline created")

# Fit the pipeline on the training split only, then apply the same fitted
# statistics to every split so validation/test data do not leak into the scaler
scaler_model = pipeline.fit(train_data)
train_data = scaler_model.transform(train_data)
val_data = scaler_model.transform(val_data)
test_data = scaler_model.transform(test_data)
print("fit and transformed training data")
# ** Data Rebalancing with PySpark **

def rebalance_data_spark(X, y):
    """Over-sample the positive class so both classes are roughly equally represented.

    Args:
        X: NumPy array whose first axis indexes samples; remaining axes are
            flattened into ``feature_0`` .. ``feature_{k-1}`` columns.
        y: NumPy array of labels. ``(n,)`` or ``(n, 1)`` labels are used as-is;
            an ``(n, m)`` matrix with ``m > 1`` is treated as multi-label
            indicators and reduced to 1 if any entry is positive, else 0.

    Returns:
        A Spark DataFrame with the ``feature_i`` columns and a ``label`` column,
        containing all negative rows plus a with-replacement sample of the
        positive rows, in random order. Spark's ``sample`` is probabilistic, so
        the class sizes are only approximately equal. If one of the two classes
        is empty there is nothing to balance and the data is returned shuffled
        but otherwise unchanged.
    """
    # Local import: only needed for the shuffle below
    from pyspark.sql.functions import rand

    # One row of pixels per sample; image tensors cannot be stacked next to a label column
    X_flattened = np.asarray(X).reshape(len(X), -1)

    y = np.asarray(y)
    if y.ndim > 1 and y.shape[1] > 1:
        # Collapse multi-label indicator vectors to a single binary target
        y = (y > 0).any(axis=1).astype(int)
    y = y.reshape(-1, 1)

    # Convert to Spark DataFrame
    data = np.hstack((X_flattened, y))
    columns = [f"feature_{i}" for i in range(X_flattened.shape[1])] + ["label"]
    df = spark.createDataFrame(data.tolist(), columns)
    print("spark dataframe created after rebalancing")

    # Split data into positive and negative classes
    negative_df = df.filter(df["label"] == 0)
    positive_df = df.filter(df["label"] == 1)
    print("data split after rebalancing")

    # Count each class once; each count() triggers a Spark job
    negative_count = negative_df.count()
    positive_count = positive_df.count()
    if negative_count == 0 or positive_count == 0:
        # Avoid a division by zero / empty sample when a class is missing
        return df.orderBy(rand(seed=42))

    # Over-sample the positive class
    positive_oversampled = positive_df.sample(
        withReplacement=True, fraction=negative_count / positive_count, seed=42
    )
    print("oversampled data")

    # Combine and shuffle the dataset. sample(False, 1.0) keeps the rows in their
    # original order, so sort by a random key to actually shuffle.
    balanced_df = negative_df.union(positive_oversampled)
    print("combined and shuffled rebalanced dataset")
    return balanced_df.orderBy(rand(seed=42))

# The rebalanced frame only holds raw pixel columns, so run it through the fitted
# preprocessing pipeline to obtain the "scaled_features" column that spark_to_numpy reads.
balanced_train_data = scaler_model.transform(rebalance_data_spark(X_train, y_train))

# ** Convert Data to Numpy for TensorFlow Training **

def spark_to_numpy(df):
    """Collect the scaled feature vectors and labels of a Spark DataFrame as NumPy arrays.

    Args:
        df: Spark DataFrame with a ``scaled_features`` vector column and a
            ``label`` column.

    Returns:
        ``(features, labels)`` where ``features[i]`` and ``labels[i]`` come from
        the same row of ``df``.
    """
    # Collect both columns in a single job: two separate collects would run the plan twice
    # and, for plans that involve sampling or random ordering, could return rows in a
    # different order each time, silently pairing features with the wrong labels.
    rows = df.select("scaled_features", "label").collect()
    features = np.array([row[0].toArray() for row in rows])
    labels = np.array([row[1] for row in rows])
    print("converted features and labels into numpy arrays")
    return features, labels

# Convert the rebalanced data back to Numpy arrays for TensorFlow.
# X_train still holds the (n, height, width, 1) image tensors, so its trailing
# dimensions give the per-sample shape the CNN below is built for.
image_shape = X_train.shape[1:]
X_train_balanced, y_train_balanced = spark_to_numpy(balanced_train_data)
X_val, y_val = spark_to_numpy(val_data)
X_test, y_test = spark_to_numpy(test_data)

# spark_to_numpy returns one flat vector per sample; Conv2D needs the image layout back
X_train_balanced = X_train_balanced.reshape((-1, *image_shape))
X_val = X_val.reshape((-1, *image_shape))
X_test = X_test.reshape((-1, *image_shape))

# ** CNN Model Definition with TensorFlow **

# Define the CNN model using keras
# Assemble the network layer by layer
model = tf.keras.Sequential()

# Convolutional stage 1: 32 filters of 3x3 on (28, 28, 1) inputs, then 2x2 pooling and batch norm
model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))
model.add(tf.keras.layers.BatchNormalization())

# Convolutional stage 2: 64 filters of 3x3, then 2x2 pooling and batch norm
model.add(tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu'))
model.add(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))
model.add(tf.keras.layers.BatchNormalization())

# Classifier head: flatten, dense layer with dropout, single sigmoid output for the binary target
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))
print("cnn model defined")

# Compile the model for binary classification, tracking accuracy
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)
print("cnn model compiled")

# Train on the rebalanced training set, validating on the held-out validation split each epoch
history = model.fit(
    x=X_train_balanced,
    y=y_train_balanced,
    batch_size=32,
    epochs=100,
    validation_data=(X_val, y_val),
)
print("cnn model trained")

# ** Model Evaluation **

# Evaluate the Model
# Loss and accuracy on the held-out test split
test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)
print(f"Test Accuracy: {test_acc:.2f}")

# Confusion Matrix: threshold the sigmoid outputs at 0.5 to get hard class predictions
y_pred = model.predict(X_test)
y_pred_binary = (y_pred > 0.5).astype(int)
ConfusionMatrixDisplay.from_predictions(
    y_test,
    y_pred_binary,
    display_labels=['Negative', 'Positive'],
)
plt.show()

# Classification Report: per-class precision, recall and F1
print("Classification Report:")
report = classification_report(y_test, y_pred_binary, target_names=['Negative', 'Positive'])
print(report)

# Training curves: one panel per metric, training vs. validation over the epochs
panels = [
    ('accuracy', 'val_accuracy', 'Train Accuracy', 'Validation Accuracy', 'Model Accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Train Loss', 'Validation Loss', 'Model Loss', 'Loss'),
]
plt.figure(figsize=(12, 4))
for position, (train_key, val_key, train_label, val_label, title, y_label) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[train_key], label=train_label)
    plt.plot(history.history[val_key], label=val_label)
    plt.title(title)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.legend()

plt.tight_layout()
plt.show()


spark created
data chosed
Downloading https://zenodo.org/records/10519652/files/chestmnist.npz?download=1 to /root/.medmnist/chestmnist.npz


100%|██████████| 82.8M/82.8M [00:07<00:00, 11.1MB/s]


Using downloaded and verified file: /root/.medmnist/chestmnist.npz
Using downloaded and verified file: /root/.medmnist/chestmnist.npz
dataset loaded
data extracted
data normalized
added dimension
image flattened
spark dataframe created
image flattened
spark dataframe created
image flattened
spark dataframe created
combined features into vector
applied standard scaler
pipeline created
