In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import zipfile
import os


os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

In [None]:
import numpy as np
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.metrics import TruePositives, TrueNegatives, FalsePositives, FalseNegatives

In [None]:
path_API_autentification_token= '/content/drive/MyDrive/BigData Pneumonia Project'

In [None]:
import zipfile
import os

os.environ['KAGGLE_CONFIG_DIR'] = path_API_autentification_token

!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia

zip_ref = zipfile.ZipFile('chest-xray-pneumonia.zip', 'r')
zip_ref.extractall('/tmp')
zip_ref.close()

In [None]:
train_dir = '/tmp/chest_xray/train'
val_dir = '/tmp/chest_xray/val'
test_dir = '/tmp/chest_xray/test'

In [None]:
img_height = 128
img_width = 128
batch_size = 32

In [None]:
train_df = tf.keras.preprocessing.image_dataset_from_directory(
    train_dir,
    color_mode = 'grayscale',
    image_size = (img_height,img_width),
    batch_size = batch_size
)

val_df = tf.keras.preprocessing.image_dataset_from_directory(
    val_dir,
    color_mode = 'grayscale',
    image_size = (img_height,img_width),
    batch_size = batch_size
)

test_df = tf.keras.preprocessing.image_dataset_from_directory(
    test_dir,
    color_mode = 'grayscale',
    image_size = (img_height,img_width),
    batch_size = batch_size
)

In [None]:
train_labels = []
test_labels = []
val_labels = []

for images, labels in train_df.unbatch():
  train_labels.append(labels.numpy())

for images, labels in test_df.unbatch():
  test_labels.append(labels.numpy())

for images, labels in val_df.unbatch():
  val_labels.append(labels.numpy())



In [None]:
print("Count values of instances per label in train dataset\n")
print(pd.DataFrame(np.unique(train_labels, return_counts = True), index=['Label', 'count'], columns=['Normal','Pneumonia']))
print("\n\nCount values of instances per label in test dataset\n")
print(pd.DataFrame(np.unique(test_labels, return_counts = True), index=['Label', 'count'], columns=['Normal','Pneumonia']))
print("\n\nCount values of instances per label in validation dataset\n")
print(pd.DataFrame(np.unique(val_labels, return_counts = True), index=['Label', 'count'], columns=['Normal','Pneumonia']))

In [None]:
plt.figure(figsize=(10, 10))
for images, labels in train_df.take(1):
    for i in range(9):
        plt.subplot(3, 3, i + 1)
        plt.imshow(np.squeeze(images[i].numpy().astype("uint8")))
        plt.title(train_df.class_names[labels[i]])
        plt.axis("off")

In [None]:
AUTOTUNE = tf.data.experimental.AUTOTUNE

train_df = train_df.cache().prefetch(buffer_size=AUTOTUNE)
val_df = val_df.cache().prefetch(buffer_size=AUTOTUNE)
test_df = test_df.cache().prefetch(buffer_size=AUTOTUNE)

In [None]:
appname = "Predicting pneumonia"

spark_mirror = "https://archive.apache.org/dist/spark"
spark_version = "3.3.1"
hadoop_version = "3"

! apt-get update
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

! rm -rf spark-{spark_version}-bin-hadoop{hadoop_version}.tgz spark-{spark_version}-bin-hadoop{hadoop_version}
! wget -q {spark_mirror}/spark-{spark_version}/spark-{spark_version}-bin-hadoop{hadoop_version}.tgz
! tar xzf spark-{spark_version}-bin-hadoop{hadoop_version}.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{spark_version}-bin-hadoop{hadoop_version}"

! pip install -q findspark
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(appname).master("local[*]").getOrCreate()

In [None]:
from pyspark.sql.functions import lit
import pyspark.sql.functions as F
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.ml.image import ImageSchema
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import rand # shuffling
import cv2 # image preprocessing
import numpy as np

In [None]:
import glob2

train_normal_cases = glob2.glob(train_dir + '/NORMAL/' + '*jpeg')
train_pneumonia_cases = glob2.glob(train_dir + '/PNEUMONIA/' + '*jpeg')

test_normal_cases = glob2.glob(test_dir + '/NORMAL/' + '*jpeg')
test_pneumonia_cases = glob2.glob(test_dir + '/PNEUMONIA/'+ '*jpeg')

val_normal_cases = glob2.glob(val_dir + '/NORMAL/' + '*jpeg')
val_pneumonia_cases = glob2.glob(val_dir + '/PNEUMONIA/' + '*jpeg')

columns = ["path", "label"]

In [None]:
data_tr_normal = [[path_train_normal, 0.0] for path_train_normal in train_normal_cases]
data_tr_pneumonia = [[path_train_pneumonia, 1.0] for path_train_pneumonia in train_pneumonia_cases]

train_normal_df = spark.createDataFrame(data_tr_normal, columns)
train_pneumonia_df = spark.createDataFrame(data_tr_pneumonia, columns)

train_df = train_normal_df.unionAll(train_pneumonia_df)

train_df = train_df.repartition(100)

In [None]:
data_te_normal = [[path_test_normal, 0.0] for path_test_normal in test_normal_cases]
data_te_pneumonia = [[path_test_pneumonia, 1.0] for path_test_pneumonia in test_pneumonia_cases]

test_normal_df = spark.createDataFrame(data_te_normal, columns)
test_pneumonia_df = spark.createDataFrame(data_te_pneumonia, columns)

test_df = test_normal_df.unionAll(test_pneumonia_df)

test_df = test_df.repartition(100)

In [None]:
data_v_normal = [[path_val_normal, 0.0] for path_val_normal in val_normal_cases]
data_v_pneumonia = [[path_val_pneumonia, 1.0] for path_val_pneumonia in val_pneumonia_cases]

val_normal_df = spark.createDataFrame(data_v_normal, columns)
val_pneumonia_df = spark.createDataFrame(data_v_pneumonia, columns)

val_df = val_normal_df.unionAll(val_pneumonia_df)

val_df = val_df.repartition(100)

In [None]:
train_df = train_df.orderBy(rand())
test_df = test_df.orderBy(rand())
val_df = val_df.orderBy(rand())

In [None]:
val_df.show()

In [None]:
val_df.printSchema()

In [None]:
train_df.groupBy('label').count().show()
test_df.groupBy('label').count().show()
val_df.groupBy('label').count().show()

In [None]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

@F.udf(VectorUDT())
def image_to_vector_udf(imagepath: StringType):
  img = cv2.imread(imagepath) #read the image
  img = cv2.resize(img, (32, 32)) #resize
  img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) #change color scale to gray
  img = img/255.0
  img = np.reshape(img, (32,32,1)).flatten().tolist()
  #flat_list = [item for sublist in img for item in sublist]
  return Vectors.dense(img)


train_df = train_df.withColumn("features", image_to_vector_udf(train_df["path"]))
test_df = test_df.withColumn("features", image_to_vector_udf(test_df["path"]))
val_df = val_df.withColumn("features", image_to_vector_udf(val_df["path"]))


list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

train_df = train_df.select(
    train_df["label"],
    list_to_vector_udf(train_df["features"]).alias("features")
)

test_df = test_df.select(
    test_df["label"],
    list_to_vector_udf(test_df["features"]).alias("features")
)

val_df = val_df.select(
    val_df["label"],
    list_to_vector_udf(val_df["features"]).alias("features")
)

In [None]:
val_df.show()

In [None]:
val_df.printSchema()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

estimator = RandomForestClassifier(labelCol='label')

evaluator = MulticlassClassificationEvaluator(labelCol='label')

rfc = estimator
params = ParamGridBuilder() \
    .addGrid(rfc.numTrees, [ 20, 50, 100 ]) \ #tạo một lưới để tìm kiếm numTrees tốt nhất trong số 20, 50 và 100
    .addGrid(rfc.maxDepth, [ 3, 5, 7 ]) \ #tìm kiếm maxDepth tốt nhất trong số 3, 5 và 7
    .build()

crossval = CrossValidator(estimator=estimator,
                          estimatorParamMaps=params,
                          evaluator=evaluator,
                          numFolds=4) #Tạo một phiên bản CrossValidator chỉ định công cụ ước tính, lưới tham số, bộ đánh giá và số lần gấp để xác thực chéo

model_rf = crossval.fit(train_df) #Huấn luyện mô hình Rừng ngẫu nhiên bằng cách sử dụng xác thực chéo

In [None]:
cvModel_rf = model_rf.bestModel #rích xuất mô hình tốt nhất từ CrossValidatorModel​​

In [None]:
print("Random Forest best model parameters: \n----")

print('Best Param (numTrees): ', cvModel_rf._java_obj.getNumTrees())

print('Best Param (maxDepth): ', cvModel_rf._java_obj.getMaxDepth())

In [None]:
print("\nBest Model accuracy : " , cvModel_rf.summary.accuracy) #Độ chính xác tổng thể của mô hình tốt nhất trên tập dữ liệu được đánh giá là khoảng 98%
print("-------------\nBest Model recall by label [0 - normal ,1 - pneumonia] \n\n " , cvModel_rf.summary.recallByLabel) #tỉ lệ dương thực theo nhãn biểu thị việc thu hồi nhãn 0 ('bình thường') và nhãn 1 ('viêm phổi') tương ứng. Các giá trị này biểu thị tỷ lệ phần trăm các phiên bản được dự đoán chính xác của từng lớp
print("-------------\nBest Model precision by label [0 - normal ,1 - pneumonia] \n\n " , cvModel_rf.summary.precisionByLabel) #số liệu đo lường độ chính xác của các dự đoán tích cực được đưa ra, tương ứng với độ chính xác tương ứng cho nhãn 0 (bình thường) và nhãn 1 (viêm phổi)
print("-------------\nBest Model False positive rate by label [0 - normal ,1 - pneumonia] \n\n " , cvModel_rf.summary.falsePositiveRateByLabel) #biểu thị tỷ lệ dương tính giả tương ứng cho nhãn 0 (bình thường) và nhãn 1 (viêm phổi)

In [None]:
results_rf = model_rf.transform(test_df) #đánh giá mô hình Rừng ngẫu nhiên tốt nhất của mình trên tập dữ liệu thử nghiệm (test_df)

In [None]:
Met_rf = evaluator.evaluate(results_rf)
print("Test F1 score:", Met_rf)

In [None]:
results_rf.show(5)

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics #đánh giá thêm hiệu suất của mô hình trên dữ liệu thử nghiệm
from pyspark import SparkContext
sc=spark.sparkContext

predictionAndLabels_rf = sc.parallelize(results_rf.select('label','prediction').toPandas().values.tolist()) #song hóa các nhãn dự đoán và nhãn thực tế từ dữ liệu thử nghiệm

metrics_rf = MulticlassMetrics(predictionAndLabels_rf)

In [None]:
# Tính toán độ chính xác, thu hồi và điểm F1 cho mỗi lớp (lớp 0,0 và lớp 1,0)
labels = [0.0, 1.0]
for label in sorted(labels):
    print("Class %s precision = %s" % (label, metrics_rf.precision(label))) #các trường hợp được dự đoán chính xác của loại 0/1 trong số tất cả các trường hợp của lớp 0/1 được dự đoán
    print("Class %s recall = %s" % (label, metrics_rf.recall(label))) #các trường hợp được dự đoán chính xác của lớp 0/1 trong số tất cả các trường hợp lớp 0/1 thực tế
    print("Class %s F1 Measure = %s" % (label, metrics_rf.fMeasure(label, beta=1.0))) #trung bình hài hòa của độ chính xác và độ thu hồi cho loại 0/1
    print()
print()

# Các số liệu có trọng số được tính toán, có tính đến sự mất cân bằng của lớp. Độ chính xác có trọng số, mức thu hồi và điểm F1
print("Weighted recall = %s" % metrics_rf.weightedRecall)
print("Weighted precision = %s" % metrics_rf.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics_rf.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics_rf.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics_rf.weightedFalsePositiveRate)

In [None]:
print("FP Metrics per label (normal, pneumonia): (" , metrics_rf.falsePositiveRate(0.0), ",",  metrics_rf.falsePositiveRate(1.0), ")")
#Tỷ lệ dương tính giả (FPR) cho mỗi nhãn lớp bằng cách sử dụng MulticlassMetrics

In [None]:
conf_m_rf=metrics_rf.confusionMatrix().toArray()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
# Plotting the confussion matrix as a heatmap
plt.figure(figsize=(5,3))
sns.set(font_scale=1.2)
ax = sns.heatmap(conf_m_rf, annot=True,xticklabels=['H', 'P'], yticklabels=['H', 'P'], cbar=False, cmap='Blues',linewidths=1, linecolor='black', fmt='.0f')
plt.yticks(rotation=0)
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
ax.xaxis.set_ticks_position('top')
plt.title('Confusion matrix - test data\n(H - healthy/normal, P - pneumonia)')
plt.show()
# Ma trận nhầm lẫn cho thấy dự đoán của mô hình của bạn phù hợp như thế nào với các nhãn thực tế trên các lớp khác nhau.

In [None]:
model.save('mlibmodel.h5')