In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import matplotlib.pyplot as plt

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .master("spark://spark-master:7077") \
                    .appName("Ass4-Q3") \
                    .config("spark.executor.memory", "6g") \
                    .getOrCreate()

24/12/09 01:25:50 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
24/12/09 01:25:50 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
24/12/09 01:25:50 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master


In [5]:
import cv2
import tempfile
import json
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout, Activation, Conv2D, MaxPooling2D

BUFFER_SIZE = 10000

def make_test_datasets(path, batch_size=64):
    X = []
    y = []
    #convert = lambda category : int(category == 'dog')
    
    for p in os.listdir(path):
        #category = p.split(".")[0]
        #category = convert(category)
        img_array = cv2.imread(os.path.join(path,p), cv2.IMREAD_COLOR)
        new_img_array = cv2.resize(img_array, dsize=(150,150,3)) / 255
        X.append(new_img_array)
        #y.append(category)

    dataset = tf.data.Dataset.from_tensor_slices((
        tf.cast(X, tf.float32),
        #tf.cast(y, tf.int64)
        )
    )
    return dataset
    

def build_and_compile_cnn_model():
    model = Sequential()
    # Adds a densely-connected layer with 64 units to the model:
    model.add(Conv2D(64,(3,3), activation = 'relu', input_shape = (80,80,3)))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(128, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))
    model.add(Conv2D(128, (3, 3), activation='relu'))
    model.add(MaxPooling2D((2, 2)))
    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))
    
    model.compile(optimizer="adam",
                  loss='binary_crossentropy',
                  metrics=['binary_accuracy'])
    return model

2024-12-09 01:25:56.705670: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-09 01:25:57.026948: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [6]:
def train(batch_size=64):
    import cv2
    import tempfile
    import json
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Flatten, Dropout, Activation, Conv2D, MaxPooling2D
    

    BUFFER_SIZE = 10000

    path="/mnt/data_file/train_data/train"
    checkpoint_path = "/mnt/data_file/ass4.model/"
    random_path = tempfile.TemporaryDirectory()

    from tensorflow.keras.callbacks import ModelCheckpoint

    chief_callback = ModelCheckpoint(checkpoint_path, 
                                     monitor='binary_accuracy', 
                                     save_best_only=True,
                                     mode='max')
    dummy_callback = ModelCheckpoint(random_path.name, monitor='binary_accuracy', save_best_only=True)

    def make_datasets():
        X = []
        y = []
        convert = lambda category : int(category == 'dog')
        
        for p in os.listdir(path):
            category = p.split(".")[0]
            category = convert(category)
            img_array = cv2.imread(os.path.join(path,p),cv2.IMREAD_GRAYSCALE)
            new_img_array = cv2.resize(img_array, dsize=(80,80)) / 255.0
            X.append(new_img_array)
            y.append(category)

        dataset = tf.data.Dataset.from_tensor_slices((
            tf.cast(X, tf.float32),
            tf.cast(y, tf.int64))
        )
        dataset = dataset.repeat().shuffle(BUFFER_SIZE).batch(batch_size)
        print(f"Data size: {len(y)}")
        print(f"Dog pictures #: {sum(y)}")
        return dataset
        

    def build_and_compile_cnn_model():
        model = Sequential()
        # Adds a densely-connected layer with 64 units to the model:
        model.add(Conv2D(64,(3,3), activation = 'relu', input_shape = (80,80,1)))
        model.add(MaxPooling2D(pool_size = (2,2)))
        # Add another:
        model.add(Conv2D(64,(3,3), activation = 'relu'))
        model.add(MaxPooling2D(pool_size = (2,2)))
        
        model.add(Flatten())
        model.add(Dense(64, activation='relu'))
        # Add a softmax layer with 10 output units:
        model.add(Dense(1, activation='sigmoid'))
        
        model.compile(optimizer="adam",
                      loss='binary_crossentropy',
                      metrics=['binary_accuracy'])
        return model

    train_datasets = make_datasets()
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    train_datasets = train_datasets.with_options(options)
    multi_worker_model = build_and_compile_cnn_model()
    if 'TF_CONFIG' in os.environ:    
        tf_config = json.loads(os.environ['TF_CONFIG'])    
        node_index = tf_config['task']['index']    
        is_chief = node_index == 0    
        print(f"Node Index: {node_index}, Is Chief: {is_chief}")
    #callback = [chief_callback if is_chief else dummy_callback]
    callback = chief_callback
    multi_worker_model.fit(x=train_datasets, epochs=3, steps_per_epoch=5, callbacks=callback)
    print(max(multi_worker_model.predict(train_datasets, steps=5)))
    random_path.cleanup()
    return multi_worker_model

In [7]:
from spark_tensorflow_distributor import MirroredStrategyRunner
 
BATCH_SIZE_PER_REPLICA = 64
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
runner = MirroredStrategyRunner(num_slots=2, local_mode=False, use_gpu=False)
runner.run(train, batch_size=BATCH_SIZE_PER_REPLICA)

INFO:MirroredStrategyRunner:Doing CPU training...
INFO:MirroredStrategyRunner:Will run with 2 Spark tasks.
INFO:MirroredStrategyRunner:Distributed training in progress...
INFO:MirroredStrategyRunner:View Spark executor stderr logs to inspect training...


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.NullPointerException
java.lang.NullPointerException
	at org.apache.spark.api.python.PythonAccumulatorV2.<init>(PythonRDD.scala:706)
	at org.apache.spark.api.python.PythonAccumulatorV2.copyAndReset(PythonRDD.scala:726)
	at org.apache.spark.api.python.PythonAccumulatorV2.copyAndReset(PythonRDD.scala:698)
	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:171)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1570)
	at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1397)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1332)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2991)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1397)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1332)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2991)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
	at org.apache.spark.api.python.PythonAccumulatorV2.<init>(PythonRDD.scala:706)
	at org.apache.spark.api.python.PythonAccumulatorV2.copyAndReset(PythonRDD.scala:726)
	at org.apache.spark.api.python.PythonAccumulatorV2.copyAndReset(PythonRDD.scala:698)
	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:171)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1244)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1570)
	at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1397)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1332)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2991)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)


In [25]:
checkpoint_path = "/mnt/data_file/ass4.model/"
test_path="/mnt/data_file/train_data/test1"


In [26]:
model = build_and_compile_cnn_model()

In [68]:

model = tf.keras.models.load_model(checkpoint_path)

In [69]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 78, 78, 64)        640       
                                                                 
 max_pooling2d (MaxPooling2  (None, 39, 39, 64)        0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 37, 37, 64)        36928     
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 18, 18, 64)        0         
 g2D)                                                            
                                                                 
 flatten (Flatten)           (None, 20736)             0         
                                                                 
 dense (Dense)               (None, 64)                1

In [29]:
test_data = make_test_datasets(test_path)

In [70]:
test_data

<_TensorSliceDataset element_spec=(TensorSpec(shape=(80, 80), dtype=tf.float32, name=None),)>

In [71]:
predict = model.predict(test_data.batch(64))



In [56]:
y_pred = (predict > 0.5).astype(np.float32)

In [73]:
tf.math.sigmoid(predict)

<tf.Tensor: shape=(12500, 1), dtype=float32, numpy=
array([[0.5],
       [0.5],
       [0.5],
       ...,
       [0.5],
       [0.5],
       [0.5]], dtype=float32)>

In [38]:
model.predict()


(<tf.Tensor: shape=(80, 80), dtype=float32, numpy=
array([[0.23921569, 0.21960784, 0.23529412, ..., 0.4       , 0.42745098,
        0.4       ],
       [0.21568628, 0.23529412, 0.24705882, ..., 0.38431373, 0.41960785,
        0.3764706 ],
       [0.19215687, 0.22745098, 0.22745098, ..., 0.3647059 , 0.39607844,
        0.36078432],
       ...,
       [0.7647059 , 0.7372549 , 0.74509805, ..., 0.42352942, 0.47843137,
        0.49411765],
       [0.7490196 , 0.7647059 , 0.7647059 , ..., 0.79607844, 0.60784316,
        0.45490196],
       [0.7019608 , 0.7372549 , 0.74509805, ..., 0.5058824 , 0.7058824 ,
        0.34117648]], dtype=float32)>,)


In [43]:
len(predict)

12500

In [45]:
X_test = []
id_line = []
def create_test1_data(test_path):
    for p in os.listdir(test_path):
        id_line.append(p.split(".")[0])
        img_array = cv2.imread(os.path.join(test_path,p),cv2.IMREAD_GRAYSCALE)
        new_img_array = cv2.resize(img_array, dsize=(80, 80))
        X_test.append(new_img_array)
create_test1_data(test_path)
X_test = np.array(X_test).reshape(-1,80,80,1)
X_test = X_test/255

In [61]:
predictions = model.predict(X_test)



In [49]:
predicted_val = [int(round(p[0])) for p in predictions]

In [51]:
max(predicted_val)

0

In [63]:
def predict_class(y_pred, thresh=0.5):
  # Return a tensor with  `1` if `y_pred` > `0.5`, and `0` otherwise
  return tf.cast(y_pred > thresh, tf.float32)

def accuracy(y_pred, y):
  # Return the proportion of matches between `y_pred` and `y`
  y_pred = tf.math.sigmoid(y_pred)
  y_pred_class = predict_class(y_pred)
  check_equal = tf.cast(y_pred_class == y,tf.float32)
  acc_val = tf.reduce_mean(check_equal)
  return acc_val

In [80]:
model = train()

Data size: 25000
Dog pictures #: 12500
Epoch 1/3


INFO:tensorflow:Assets written to: /mnt/data_file/ass4.model/assets


Epoch 2/3
Epoch 3/3
[9.635336e-13]


In [3]:
def make_datasets():
    X = []
    y = []
    convert = lambda category : int(category == 'dog')
    train_path = "/mnt/data_file/train_data/train"
    for p in os.listdir(train_path):
        category = p.split(".")[0]
        category = convert(category)
        img_array = cv2.imread(os.path.join(train_path,p),cv2.IMREAD_COLOR)
        new_img_array = cv2.resize(img_array, dsize=(150,150)) / 255.0
        X.append(new_img_array)
        y.append(category)

    dataset = tf.data.Dataset.from_tensor_slices((
        tf.cast(X, tf.float32),
        tf.cast(y, tf.int64))
    )
    #dataset = dataset.repeat().shuffle(10000).batch(64)
    print(f"Data size: {len(y)}")
    print(f"Dog pictures #: {sum(y)}")
    return dataset

In [None]:
train_dataset = make_datasets()

In [None]:
train_dataset.save("/mnt/data_file/train_data/saved_data/")

In [None]:
model = build_and_compile_cnn_model()

In [None]:
tf.config.run_functions_eagerly(True)
model.fit(train_dataset.batch(64).shuffle(10000), epochs=10)

In [19]:
model.evaluate(train_dataset.batch(128))



[4.7014031410217285, 0.5]