In [1]:
import os
from tensorflow.config import list_physical_devices

# Root of the project; the train/test data directories are derived from it.
project_base_dir = 'big-data-project/'
train_data_dir, test_data_dir = (
    os.path.join(project_base_dir, "data", split) for split in ("train", "test")
)

# Image geometry (pixels) and number of colour channels.
img_width = img_height = 1024
channel = 3

# Number of output classes: [pneumonia, normal] [bacterial, viral]
num_classes = 2

# Report how many GPUs TensorFlow can see.
print("Num GPUs Available: ", len(list_physical_devices('GPU')))

Num GPUs Available:  1


2021-12-09 17:48:18.219170: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-12-09 17:48:18.232026: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-12-09 17:48:18.232765: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero


In [2]:
import io
import pandas as pd
from PIL import Image
from tensorflow import expand_dims
from tensorflow.keras import applications
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, GlobalMaxPooling2D
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

# Start (or reuse) the Spark session and keep a handle on its SparkContext.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Select the base model, here I have used a custom ResNet50 that was trained on detecting "Normal" vs "Pneumonia"
# Keras expects input_shape as (height, width, channels) for channels-last data.
# The original passed (width, height, channels), which only worked because the
# images are square.
base_model = applications.ResNet50(weights=None, include_top=False, input_shape=(img_height, img_width, channel))

# Adding the top layer.
# The backbone is called as a nested model in inference mode (training=False);
# this layout is kept as-is so that the saved weights below map onto it.
x = base_model(base_model.input, training=False)
x = GlobalAveragePooling2D(name="global_average_pooling2d_last")(x)
predictions = Dense(num_classes, activation='softmax', name='predictions')(x)
source_model = Model(inputs=base_model.input, outputs=predictions)

# Loading trained model for predicting "Normal" vs "Pneumonia"
source_model.load_weights('big-data-project/weights_fine_tune/model_1.07-0.9644.hdf5') # change this to your path and model weights

## Removing top dense layer and leaving just GlobalAveragePooling2D
# The resulting `model` outputs the pooled feature vector instead of class probabilities.
last_layer_name = "global_average_pooling2d_last"
model = Model(inputs=source_model.input, outputs=source_model.get_layer(last_layer_name).output)
model.summary()
# Drop the name for the full classifier; only the feature extractor is used from here on.
del source_model

def preprocess(content):
    """
    Turn the raw bytes of an encoded image into a model-ready array.

    :param content: encoded image file contents (bytes)
    :return: result of ``preprocess_input`` applied to the RGB image array
    """
    rgb_image = Image.open(io.BytesIO(content)).convert('RGB')
    return preprocess_input(img_to_array(rgb_image))

def featurize_content(model, content):
    """
    Featurize a single raw image using the input model.

    :param model: model exposing a Keras-style ``predict`` method
    :param content: raw (encoded) bytes of one image
    :return: the model output for that image, flattened to one dimension
    """
    # Add a leading batch axis: the model is fed a batch containing one image.
    batch = expand_dims(preprocess(content), axis=0)
    # verbose=0 stops Keras from printing a progress bar for every single image
    # on versions where predict() is not silent by default.
    preds = model.predict(batch, verbose=0)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage.
    return preds.flatten()

def process_example(model, row):
    """
    Compute the feature vector and the class label for one image row.

    The label is derived from the file path:
    0 = NORMAL, 1 = PNEUMONIA (virus), 2 = PNEUMONIA (bacteria).
    A path containing "PNEUMONIA" takes precedence over "NORMAL".

    :param model: model forwarded to ``featurize_content``
    :param row: object with ``path`` (str) and ``content`` (raw image bytes) attributes
    :return: tuple ``(features, label)``; ``features`` is a list, ``label`` an int
    :raises ValueError: if no label can be derived from ``row.path``
    """
    path = row.path

    # Resolve the label first so an unlabelable file fails fast, before the
    # expensive model inference is run.
    if "PNEUMONIA" in path:
        lowered = path.lower()
        if "bacteria" in lowered:
            label = 2
        elif "virus" in lowered:
            label = 1
        else:
            raise ValueError(f"Couldn't label image: {path}")
    elif "NORMAL" in path:
        label = 0
    else:
        # Previously this case left `label` unbound and crashed at the return.
        raise ValueError(f"Couldn't label image: {path}")

    features = featurize_content(model, row.content).tolist()
    return features, label

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/09 17:48:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-12-09 17:48:21.731497: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-12-09 17:48:21.732519: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:939] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-12-09 17:48:21.733296: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:93

Model: "model_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 1024, 1024, 3)]   0         
                                                                 
 resnet50 (Functional)       (None, 32, 32, 2048)      23587712  
                                                                 
 global_average_pooling2d_la  (None, 2048)             0         
 st (GlobalAveragePooling2D)                                     
                                                                 
Total params: 23,587,712
Trainable params: 23,534,592
Non-trainable params: 53,120
_________________________________________________________________


In [3]:
# Base path of the *new* directory that holds this notebook's outputs
# (the per-split feature CSV files written by later cells).
BASE_PATH = os.path.join(project_base_dir, "dataset2")

# Output directories for the training and test splits.
TRAIN_PATH = os.path.join(BASE_PATH, "train")
TEST_PATH = os.path.join(BASE_PATH, "test")

# exist_ok=True makes this cell safe to re-run and avoids the
# check-then-create race of a separate os.path.exists() test.
for out_dir in (TRAIN_PATH, TEST_PATH):
    os.makedirs(out_dir, exist_ok=True)


In [4]:
# Load every *.png found recursively under the training directory as binary-file rows
images_df = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.png")
    .option("recursiveFileLookup", "true")
    .load(train_data_dir)
)

In [None]:
# Writing features and labels for Training Set
# One CSV line per image: "<label>,<f0>,<f1>,...".
# NOTE(review): the file name says "max_pool" but `model` ends in a
# GlobalAveragePooling2D layer; the name is kept unchanged for downstream readers.
with open(os.path.join(TRAIN_PATH, "data_train_max_pool.csv"), 'w', encoding="utf-8") as file:
    # The original iterated over `images_df.rdd.collect` (missing call parentheses),
    # which is a bound method and not iterable. toLocalIterator() yields the same
    # rows while fetching one partition at a time, instead of materialising every
    # image on the driver at once.
    for row in images_df.toLocalIterator():
        features, label = process_example(model, row)
        vec = ",".join(map(str, features))
        file.write(f"{label},{vec}\n")

2021-12-09 17:06:57.539930: I tensorflow/stream_executor/cuda/cuda_dnn.cc:366] Loaded cuDNN version 8200
[Stage 280:>                                                        (0 + 1) / 1]

In [4]:
# Load every *.png found recursively under the test directory as binary-file rows
images_df = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.png")
    .option("recursiveFileLookup", "true")
    .load(test_data_dir)
)

In [5]:
# Writing features and labels for Test Set
# One CSV line per image: "<label>,<f0>,<f1>,...".
# NOTE(review): the file name says "max_pool" but `model` ends in a
# GlobalAveragePooling2D layer; the name is kept unchanged for downstream readers.
with open(os.path.join(TEST_PATH, "data_test_max_pool.csv"), 'w', encoding="utf-8") as file:
    # The original iterated over `images_df.rdd.collect` (missing call parentheses),
    # which is a bound method and not iterable. toLocalIterator() yields the same
    # rows while fetching one partition at a time, instead of materialising every
    # image on the driver at once.
    for row in images_df.toLocalIterator():
        features, label = process_example(model, row)
        vec = ",".join(map(str, features))
        file.write(f"{label},{vec}\n")

2021-12-09 17:48:39.246939: I tensorflow/stream_executor/cuda/cuda_dnn.cc:366] Loaded cuDNN version 8200
[Stage 24:>                                                         (0 + 1) / 1]