In [None]:
# Preprocessing code for our images and labels

In [None]:
# Mount Google Drive at /content/drive; the Shareddrives paths used throughout this notebook live under it.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Unpack the HUST-OBS archive from the shared drive; stdout is discarded to keep the cell output short.
!unzip /content/drive/Shareddrives/DL_Final_Project/HUST-OBS.zip > /dev/null

In [None]:
# Needed for the `tensorflow_models` import below (presumably provided by this package - confirm).
# NOTE(review): the version is not pinned, so a re-run may pull a different release.
!pip install tf-models-official

In [None]:
import json
import os
from tqdm import tqdm
from PIL import Image

In [None]:
# !pip install --upgrade tensorflow keras

In [None]:
import random
from datetime import datetime
import cv2
import numpy as np
import math
import argparse
import pandas as pd
import tensorflow as tf
import csv
import matplotlib.pyplot as plt
import keras
import tensorflow_models as tfm

Datasplit:

1: TRAIN: ['H', 'X', 'L', 'Y'] TEST: ['G']

2: TRAIN: ['H', 'G', 'L', 'Y'] TEST: ['X']

3: TRAIN: ['H', 'X', 'G'] TEST: ['L', 'Y']

In [None]:
def salt_and_pepper_noise(image):
  '''
  Randomly add salt & pepper noise to an image (applied with probability 0.5).
  CREDIT: Wang et al. HUST-OBS data preprocessing

  NOTE: In order to create comparable results, we need to
  do the same preprocessing steps on the dataset

  image: PIL image (anything np.array() turns into a 3-D array works);
         assumes 3 channels, since the pixel budget divides the element count by 3
  returns: the input object unchanged when noise is skipped, otherwise a new
           PIL image with white ("salt") and black ("pepper") pixels added
  '''
  # Half of the time the image passes through untouched.
  if np.random.random() >= 0.5:
      return image

  pixels = np.array(image)

  # Share of salt among the noisy pixels (0-40%) and overall noise fraction (0-0.6%).
  salt_share = np.random.uniform(0, 0.4)
  noise_fraction = np.random.uniform(0, 0.006)
  pixel_budget = noise_fraction * pixels.size / 3
  salt_count = int(np.ceil(pixel_budget * salt_share))
  pepper_count = int(np.ceil(pixel_budget * (1.0 - salt_share)))

  # One index array per axis. Only the row and column arrays are used; the third
  # draw is kept so the random stream is consumed exactly as before.
  # NOTE(review): randint's upper bound is exclusive, so `dim - 1` never selects the
  # last row/column. Presumably inherited from the credited implementation - confirm
  # before changing, since it affects comparability.
  salt_idx = [np.random.randint(0, dim - 1, salt_count) for dim in pixels.shape]
  pepper_idx = [np.random.randint(0, dim - 1, pepper_count) for dim in pixels.shape]

  # Paint every channel of the chosen pixels white (salt) or black (pepper).
  pixels[salt_idx[0], salt_idx[1], :] = 255
  pixels[pepper_idx[0], pepper_idx[1], :] = 0

  return Image.fromarray(pixels)

In [None]:
def erode_and_dialate(image):
  '''
  Randomly erode or dilate an image (noise removal, segmentation, feature extraction).
  CREDIT: Wang et al. HUST-OBS data preprocessing

  NOTE: In order to create comparable results, we need to
  do the same preprocessing steps on the dataset

  image: image array accepted by cv2.erode / cv2.dilate
  returns: the eroded image (1/3 chance), the dilated image (1/3 chance),
           or the input object unchanged (1/3 chance)
  '''
  # Uniform draw in [0, 3): [0, 1) -> erode, [1, 2) -> dilate, [2, 3) -> leave as is.
  draw = random.random() * 3
  if draw >= 2:
      return image

  # Square structuring element with a random side length of 1, 2 or 3 pixels.
  side = random.randint(1, 3)
  kernel = np.ones((side, side), np.uint8)

  morph = cv2.erode if draw < 1 else cv2.dilate
  return morph(image, kernel, iterations=1)

In [None]:
class RandomGaussianBlur(object):
    """Callable augmentation that applies a Gaussian blur with probability `p`.

    The kernel size is drawn from min_kernel_size..max_kernel_size in steps of 2
    (so it keeps the parity of min_kernel_size) and sigma is drawn uniformly from
    [min_sigma, max_sigma]. No blur is applied unless
    min_kernel_size < max_kernel_size.
    """

    def __init__(self, p=0.5, min_kernel_size=3, max_kernel_size=15, min_sigma=0.1, max_sigma=1.0):
        self.p = p
        self.min_kernel_size, self.max_kernel_size = min_kernel_size, max_kernel_size
        self.min_sigma, self.max_sigma = min_sigma, max_sigma

    def __call__(self, img):
        """Return `img` blurred via tfm's gaussian_filter2d, or `img` itself when skipped."""
        # The probability draw always happens first, exactly as in the combined condition.
        should_blur = random.random() < self.p and self.min_kernel_size < self.max_kernel_size
        if not should_blur:
            return img

        kernel_size = random.randrange(self.min_kernel_size, self.max_kernel_size + 1, 2)
        sigma = random.uniform(self.min_sigma, self.max_sigma)
        return tfm.vision.augment.gaussian_filter2d(img, kernel_size, sigma)

In [None]:
def pad_with_white(image, xl, yl, xr, yr):
  """Pad an image laid out as (height, width, channels) with white pixels.

  xl / xr: number of columns added on the left / right
  yl / yr: number of rows added at the top / bottom
  returns: the padded tensor produced by tf.pad
  """
  # One [before, after] pair per axis; the channel axis is left untouched.
  row_padding = [yl, yr]
  column_padding = [xl, xr]
  channel_padding = [0, 0]
  # White is RGB (255, 255, 255), hence the constant fill value of 255.
  return tf.pad(
      image,
      [row_padding, column_padding, channel_padding],
      mode='CONSTANT',
      constant_values=255,
  )

In [None]:
class ColorJitter(object):
  """Callable augmentation that randomly perturbs brightness, contrast, saturation and hue.

  brightdelta:   max_delta passed to tf.image.random_brightness
  contrastdelta: contrast factor is drawn from [max(0, 1 - d), 1 + d]
  satdelta:      saturation factor is drawn from [max(0, 1 - d), 1 + d]
  huedelta:      max_delta passed to tf.image.random_hue
  """

  def __init__(self, brightdelta, contrastdelta, satdelta, huedelta):
    self.brightdelta = brightdelta
    self.contrastdelta = contrastdelta
    self.satdelta = satdelta
    self.huedelta = huedelta

  def __call__(self, image):
    """Apply the four random adjustments in a fixed order and return the result."""
    # Lower bounds are clamped at 0 so a delta above 1 cannot give a negative factor.
    contrast_range = (max(0, 1 - self.contrastdelta), 1 + self.contrastdelta)
    saturation_range = (max(0, 1 - self.satdelta), 1 + self.satdelta)

    jittered = tf.image.random_brightness(image, self.brightdelta)
    jittered = tf.image.random_contrast(jittered, *contrast_range)
    jittered = tf.image.random_saturation(jittered, *saturation_range)
    return tf.image.random_hue(jittered, self.huedelta)

In [None]:
def random_apply(image, transforms, p):
  """With probability `p`, run `image` through every callable in `transforms`, in order.

  image:      value handed to the first transform
  transforms: iterable of one-argument callables
  p:          probability of applying the whole chain
  returns: the transformed value, or `image` unchanged when the chain is skipped
  """
  # A single draw decides for the whole chain, not per transform.
  if not (random.random() <= p):
    return image

  result = image
  for apply_step in transforms:
    result = apply_step(result)
  return result

In [None]:
def tensorflow_normalize(image, mean, std):
    """Convert `image` to a float32 tensor and standardize it as (image - mean) / std.

    mean / std broadcast against the image, e.g. one value per channel on a
    channels-last image.
    """
    as_float = tf.convert_to_tensor(image, dtype=tf.float32)
    return (as_float - mean) / std

In [None]:
def process_image_train(image):
  '''
  process_image_train converts a PIL image into an augmented, normalized training tensor.

  Pipeline: force RGB -> draw a random content size (16..128 per side) and a random
  position inside a 128 x 128 canvas -> salt & pepper noise -> erode/dilate ->
  Gaussian blur -> resize -> random horizontal flip -> white padding to 128 x 128 ->
  random rotation (up to 15 degrees either way) -> scale to [0, 1] -> per-channel
  normalization.

  image: PIL image
  returns: float32 tensor of shape (128, 128, 3)

  Fixes compared with the previous version:
    * pixel values are divided by 255 before normalization - the mean/std below are
      defined for values in [0, 1], but the tensor was still in 0..255;
    * the rotation factor is 15/360 - Keras interprets `factor` as a fraction of a
      full turn, so 15/(180*pi) only allowed about 9.5 degrees;
    * every non-RGB mode (not just 'L') is converted to RGB.
  NOTE: datasets saved with the previous version must be regenerated.
  '''
  # The rest of the pipeline assumes 3 channels.
  if image.mode != 'RGB':
    image = image.convert('RGB')

  # Longer side -> 72, shorter side scaled with the aspect ratio. (x, y) are only
  # used as the means of the random size draws below.
  w, h = image.size
  if w > h:
    x = 72
    y = round(h / w * 72)
  else:
    y = 72
    x = round(w / h * 72)

  # Draw the content size: Gaussian around (x, y), rejected until within 16..128.
  # 129 is an out-of-range sentinel that forces at least one draw.
  sizey, sizex = 129, 129
  if y < 128:
    while sizey > 128 or sizey < 16:
      sizey = round(random.gauss(y, 30))
  if x < 128:
    while sizex > 128 or sizex < 16:
      sizex = round(random.gauss(x, 30))

  # Free space around the content on the 128 x 128 canvas.
  dx = 128 - sizex
  dy = 128 - sizey

  # Left/top padding: Gaussian around the centered position, rejected until within 0..d.
  if dx > 0:
    xl = -1
    while xl > dx or xl < 0:
      xl = round(random.gauss(round(dx / 2), 10))
  else:
    xl = 0
  if dy > 0:
    yl = -1
    while yl > dy or yl < 0:
      yl = round(random.gauss(round(dy / 2), 10))
  else:
    yl = 0

  # The remaining space goes to the right/bottom.
  yr = dy - yl
  xr = dx - xl

  # Image processing
  image = salt_and_pepper_noise(image)
  # erode_and_dialate runs on a BGR array; convert there and back
  image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
  image = erode_and_dialate(image)
  image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
  random_gaussian_blur = RandomGaussianBlur()
  image = random_gaussian_blur(image)
  image = tf.image.resize(image, (sizey, sizex))
  image = tf.image.random_flip_left_right(image)
  image = pad_with_white(image, xl, yl, xr, yr)
  # `factor` is a fraction of a full turn: 15/360 -> rotations within +/- 15 degrees.
  image = tf.keras.layers.RandomRotation(factor=15 / 360, fill_mode='constant', fill_value=255)(image)

  # Up to here pixel values are in 0..255 (white == 255). The statistics below are
  # for values in [0, 1], so rescale first.
  image = image / 255.0
  mean = [0.85233593, 0.85246795, 0.8517555]
  std = [0.31232414, 0.3122127, 0.31273854]
  image = tensorflow_normalize(image, mean, std)
  return image

In [None]:
def process_image_test(image):
  '''
  process_image_test converts a PIL image into a normalized evaluation tensor.

  Pipeline (deterministic, no augmentation): force RGB -> pad the shorter side with
  white so the image becomes square -> resize to 128 x 128 -> scale to [0, 1] ->
  per-channel normalization.

  image: PIL image
  returns: float32 tensor of shape (128, 128, 3)

  Fixes compared with the previous version:
    * pixel values are divided by 255 before normalization - the mean/std below are
      defined for values in [0, 1], but the tensor was still in 0..255;
    * every non-RGB mode (not just 'L') is converted to RGB.
  NOTE: datasets saved with the previous version must be regenerated.
  '''
  # The normalization below assumes 3 channels.
  if image.mode != 'RGB':
    image = image.convert('RGB')

  # Pad the shorter side with white, split as evenly as possible, to get a square image.
  w, h = image.size
  if w > h:
    # Wider than tall: add rows at the top and bottom.
    dy = w - h
    yl = round(dy / 2)
    yr = dy - yl
    image = pad_with_white(image, 0, yl, 0, yr)
  else:
    # Taller than wide (or already square): add columns on the left and right.
    dx = h - w
    xl = round(dx / 2)
    xr = dx - xl
    image = pad_with_white(image, xl, 0, xr, 0)

  image = tf.image.resize(image, (128, 128))

  # The resized tensor is still in 0..255. The statistics below are for values in
  # [0, 1], so rescale first.
  image = image / 255.0
  mean = [0.85233593, 0.85246795, 0.8517555]
  std = [0.31232414, 0.3122127, 0.31273854]
  image = tensorflow_normalize(image, mean, std)
  return image

In [None]:
from tqdm import tqdm

def preprocess(path: str, process_type: str):
    """Build a tf.data.Dataset of (image tensor, label) pairs from a CSV file.

    The CSV must start with a header row; each following row holds the integer
    label in column 0 and the image path in column 1.

    path:         path to the CSV file
    process_type: "train" (random augmentation via process_image_train) or
                  "test" (deterministic via process_image_test)
    returns: dataset created with tf.data.Dataset.from_tensor_slices((inputs, labels))

    Rows whose label cannot be parsed or whose image cannot be opened/decoded are
    reported and skipped. Previously the loop carried on with the image and label
    of the preceding row (or failed with a NameError on the first row).
    """
    assert process_type in ["train", "test"], f"process_type must be 'train' or 'test', got {process_type!r}"

    process_image = process_image_train if process_type == "train" else process_image_test

    inputs = []
    labels = []
    skipped = 0

    with open(path, newline='') as csvfile:
        # First pass: count the data rows (header excluded) for the progress bar.
        total_rows = max(sum(1 for _ in csv.reader(csvfile, delimiter=',')) - 1, 0)
        csvfile.seek(0)  # Reset the file pointer for the real pass

        reader = csv.reader(csvfile, delimiter=',')
        next(reader, None)  # skip the header so tqdm counts data rows only

        for row in tqdm(reader, total=total_rows, desc=f'Processing {process_type} data'):
            try:
                label = int(row[0])
                image_path = row[1]
                # copy() forces the pixel data to be decoded while the file is open
                # and returns an in-memory image, so the handle is closed straight
                # away and corrupt files are caught here.
                with Image.open(image_path) as opened:
                    raw_image = opened.copy()
            except Exception as error:
                skipped += 1
                print("error in", row, ":", repr(error))
                continue

            # Errors raised by the processing itself are deliberately not swallowed.
            labels.append(label)
            inputs.append(process_image(raw_image))

    if skipped:
        print("skipped", skipped, "row(s) that could not be loaded")

    labels = tf.convert_to_tensor(labels)
    print(labels.shape)

    dataset = tf.data.Dataset.from_tensor_slices((inputs, labels))
    return dataset


In [None]:
# Every dataset key handled by this notebook, in processing order.
_DATASET_KEYS = [
    "mock", "wangtest", "wangtrain",
    "train1", "train2", "train3",
    "test1", "test2", "test3",
    "train1a", "train1b", "train1c",
    "train2a", "train2b", "train2c",
    "train3a", "train3b", "train3c",
    "train4a", "train4b", "train4c", "train4d",
]

# Preprocessing mode per key: keys that start or end with "test" use the
# deterministic "test" pipeline, everything else (including "mock") uses "train".
process_type = {
    key: "test" if key.startswith("test") or key.endswith("test") else "train"
    for key in _DATASET_KEYS
}

In [None]:
# CSV files (label, image path) for every train & test dataset, keyed by dataset name.
_DATA_DIR = "/content/drive/Shareddrives/DL_Final_Project/DATA"

_CSV_FILES = {
    "mock": "mock.csv",
    "wangtest": "wang_test.csv",
    "wangtrain": "wang_train.csv",
    "train1": "train1-HXLY.csv",
    "train2": "train2-HGLY.csv",
    "train3": "train3-HXG.csv",
    "test1": "test1-G.csv",
    "test2": "test2-X.csv",
    "test3": "test3-LY.csv",

    "train1a": "TRAIN1/partition_1.csv",
    "train1b": "TRAIN1/partition_2.csv",
    "train1c": "TRAIN1/partition_3.csv",

    "train2a": "TRAIN2/partition_1.csv",
    "train2b": "TRAIN2/partition_2.csv",
    "train2c": "TRAIN2/partition_3.csv",

    "train3a": "TRAIN3/partition_1.csv",
    "train3b": "TRAIN3/partition_2.csv",
    # NOTE(review): "partition_3c.csv" breaks the partition_<n>.csv pattern used by
    # every other entry - confirm the file really has this name.
    "train3c": "TRAIN3/partition_3c.csv",

    "train4a": "TRAIN4/partition_1.csv",
    "train4b": "TRAIN4/partition_2.csv",
    "train4c": "TRAIN4/partition_3.csv",
    "train4d": "TRAIN4/partition_4.csv",
}

file_paths = {key: f"{_DATA_DIR}/{relative}" for key, relative in _CSV_FILES.items()}

In [None]:
# Output directory of each processed dataset: <tf_datasets dir>/<dataset key>.
_TF_DATASETS_DIR = "/content/drive/Shareddrives/DL_Final_Project/tf_datasets"

saved_data_paths = {
    name: f"{_TF_DATASETS_DIR}/{name}"
    for name in (
        "mock", "wangtest", "wangtrain",
        "train1", "train2", "train3",
        "test1", "test2", "test3",
        "train1a", "train1b", "train1c",
        "train2a", "train2b", "train2c",
        "train3a", "train3b", "train3c",
        "train4a", "train4b", "train4c", "train4d",
    )
}

In [None]:
# Build and save one dataset per run of this cell.
# `key` selects the entry in file_paths / process_type / saved_data_paths;
# change it and re-run for each dataset (e.g. train1a ... train3c).
key = "train4c"
print("==========================", key, "--------------------------", sep="\n")

# preprocess(csv path, "train" | "test") -> tf.data.Dataset
dataset = preprocess(file_paths[key], process_type[key])
save_path = saved_data_paths[key]

try:
    dataset.save(save_path)
except Exception as e:
    print("Error saving dataset:", e)
else:
    print("Dataset", key, "saved successfully")


train4c
--------------------------


Processing train data: 15415it [11:43, 21.92it/s]


(15414,)
Dataset train4c saved successfully


## Testing
This section checks that our TensorFlow reimplementation returns the same results as the original PyTorch preprocessing.
Our preprocessing must match Wang et al. (2024) for our results to be comparable.

In [None]:
# Set-up for the PyTorch-vs-TensorFlow comparison below: repeats the size/offset
# sampling and the noise + erode/dilate steps of process_image_train on one sample
# image. The globals sizey, sizex, xl, yl, xr, yr and image are read by test().
test_image = "/content/drive/Shareddrives/DL_Final_Project/HUST-OBS/deciphered/0005/G_0005_甲1170合31823無名組.png"
image = Image.open(test_image)
# Convert grayscale input to 3-channel RGB
if image.mode == 'L':
    image = image.convert('RGB')
image_width, image_height = image.size
# Longer side -> 72, shorter side scaled with the aspect ratio; (x, y) are only the
# means of the random size draws below.
if image_width > image_height:
    x = 72
    y = round(image_height / image_width * 72)
# x, y = 72,72
else:
    y = 72
    x = round(image_width / image_height * 72)
# Draw the content size: Gaussian around (x, y), rejected until within 16..128.
# 129 is an out-of-range starting value that forces at least one draw.
sizey, sizex = 129, 129
if y < 128:
    while sizey > 128 or sizey < 16:
        sizey = round(random.gauss(y, 30))
if x < 128:
    while sizex > 128 or sizex < 16:
        sizex = round(random.gauss(x, 30))
# Free space around the content on a 128 x 128 canvas
dx = 128 - sizex  
dy = 128 - sizey
# Left/top padding: Gaussian around the centered position, rejected until within 0..d
if dx > 0:
    xl = -1
    while xl > dx or xl < 0:
        xl = round(dx / 2)
        xl = round(random.gauss(xl, 10))
else:
    xl = 0
if dy > 0:
    yl = -1
    while yl > dy or yl < 0:
        yl = round(dy / 2)
        yl = round(random.gauss(yl, 10))
else:
    yl = 0
# The remaining space goes to the right/bottom
yr = dy - yl
xr = dx - xl
# Shared augmentations applied once, so both frameworks start from the same image.
# erode_and_dialate runs on a BGR array, hence the conversion there and back.
image = salt_and_pepper_noise(image)
image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
image = erode_and_dialate(image)
image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

In [None]:
from torchvision import transforms
def test(img, a=None, b=None, c=None, d=None):
  """Run the same preprocessing steps through torchvision and TensorFlow.

  Reads the globals sizey, sizex, xl, yl, xr, yr set by the preceding cell.
  Steps compared: resize -> white padding -> grayscale. Rotation, blur and colour
  jitter are currently disabled on both sides.

  img: PIL image
  a, b, c, d: unused
  returns: (torch_img, tf_img) - the torchvision result and the TensorFlow tensor
  """
  # --- PyTorch reference ---
  torch_steps = [
      transforms.Resize((sizey, sizex)),
      transforms.Pad([xl, yl, xr, yr], fill=(255, 255, 255), padding_mode='constant'),
      # disabled: transforms.RandomRotation(degrees=(15, 15), center=(round(64), round(64)), fill=(255, 255, 255))
      transforms.RandomGrayscale(p=1),
  ]
  torch_img = img
  for step in torch_steps:
    torch_img = step(torch_img)

  # --- TensorFlow reimplementation ---
  blur = RandomGaussianBlur()  # only needed when the blur step below is re-enabled
  # disabled: tf_img = blur(img)
  tf_img = tf.image.resize(img, (sizey, sizex))
  tf_img = pad_with_white(tf_img, xl, yl, xr, yr)
  # disabled: tf_img = tf.keras.layers.RandomRotation(factor=(15/(180 * math.pi), 15/(180 * math.pi)), fill_mode='constant', fill_value=255)(tf_img)
  # disabled: tf_img = random_apply(tf_img, [ColorJitter(0.4, 0.4, 0.4, 0.1)], 0.8)
  # p=1 -> grayscale is always applied, mirroring RandomGrayscale(p=1) above
  tf_img = random_apply(tf_img, [tf.image.rgb_to_grayscale], 1)

  return torch_img, tf_img

In [None]:
# from PIL import ImageChops

In [None]:
# ImageChops.difference is used below, but its import was commented out, so this
# cell raised a NameError on a fresh kernel.
from PIL import ImageChops


def _dump_array(array, path):
    """Write `array` to `path` as text so the two outputs can be diffed.

    The first line is a "# Array shape: ..." header (lines starting with "#" are
    ignored by numpy.loadtxt). Iterating over an n-dimensional array yields slices
    along the FIRST axis (array[i, ...]); each slice is written with np.savetxt in
    left-justified columns, 7 characters wide with 2 decimal places, followed by a
    "# New slice" marker line.
    """
    with open(path, 'w') as outfile:
        outfile.write('# Array shape: {0}\n'.format(array.shape))
        for data_slice in array:
            np.savetxt(outfile, data_slice, fmt='%-7.2f')
            outfile.write('# New slice\n')


# Run both pipelines on the sample image and look at the results side by side.
a, b = test(image)
b = tf.keras.utils.array_to_img(b.numpy())
print(a)
display(a)
display(b)
print(b.size)

# Pixel-wise absolute difference between the PyTorch (a) and TensorFlow (b) images.
diff = ImageChops.difference(a, b)
print(a.size)
print(b.size)
display(diff)

if diff.getbbox():
    # The images are inherently going to be a bit different (converting to and from
    # torch and tf seems to change them), but make sure the diff image (the black
    # one with white bits) is mostly similar.
    print("images are different")
else:
    print("images are the same")

a = np.array(a)
b = np.array(b)

# Write both arrays to disk for an offline diff.
_dump_array(a, '/content/drive/Shareddrives/DL_Final_Project/torchout.txt')
_dump_array(b, '/content/drive/Shareddrives/DL_Final_Project/tfout.txt')

In [None]:
# TEST NORMALIZATION
import torch
import torchvision.transforms as transforms
import tensorflow as tf
import numpy as np


def pytorch_normalize(image):
    """Reference normalization: torchvision ToTensor followed by per-channel Normalize.

    returns: the tensor produced by transforms.Normalize applied to ToTensor()(image)
    """
    channel_mean = [0.85233593, 0.85246795, 0.8517555]
    channel_std = [0.31232414, 0.3122127, 0.31273854]

    to_tensor = transforms.ToTensor()
    normalize = transforms.Normalize(channel_mean, channel_std)
    return normalize(to_tensor(image))

def tensorflow_normalize(image, mean, std):
    """Convert `image` to a float32 tensor and standardize it as (image - mean) / std.

    Same definition as the tensorflow_normalize defined earlier in this notebook.
    """
    as_float = tf.convert_to_tensor(image, dtype=tf.float32)
    return (as_float - mean) / std

# Synthetic 100 x 100 colour image with float values in [0, 1].
# NOTE(review): because the input is already float in [0, 1], this only compares
# the (x - mean) / std arithmetic; it does not exercise 0..255 integer input, where
# ToTensor presumably rescales to [0, 1] - confirm and consider adding that case.
width = 100
height = 100
pixel_count = width * height

# One gradient per colour channel: red rises 0 -> 1, green falls 1 -> 0, blue is constant 0.
channel_r = np.linspace(0, 1, pixel_count).reshape(height, width)
channel_g = np.linspace(1, 0, pixel_count).reshape(height, width)
channel_b = np.zeros((height, width))

# Stack the channels last: (height, width, 3)
image = np.stack((channel_r, channel_g, channel_b), axis=-1)

# Normalize with PyTorch
torch_normalized = pytorch_normalize(image)

# Normalize with TensorFlow
mean = [0.85233593, 0.85246795, 0.8517555]
std = [0.31232414, 0.3122127, 0.31273854]
tf_normalized = tensorflow_normalize(image, mean, std)

# The PyTorch result is channels-first; move the channels last before comparing.
torch_channels_last = torch_normalized.permute(1, 2, 0).numpy()
assert np.allclose(torch_channels_last, tf_normalized.numpy(), atol=1e-5, rtol=1e-3)



In [None]:
import torch
import tensorflow as tf
import numpy as np

# Sanity check: convert one NumPy array with both frameworks and eyeball the values.
np_array = np.array([1, 2, 3, 4, 5])

torch_tensor = torch.from_numpy(np_array)   # PyTorch tensor
tf_tensor = tf.convert_to_tensor(np_array)  # TensorFlow tensor

# Print both (TensorFlow first) so they can be compared by eye.
print(tf_tensor, torch_tensor, sep="\n")

