In [7]:
import os
import numpy as np
import tensorflow as tf
from datetime import datetime
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Concatenate

# Define the dimensions of the input images
image_width = 128
image_height = 128
image_channels = 3

# Siamese Network
input_a = Input(shape=(image_width, image_height, image_channels))
input_b = Input(shape=(image_width, image_height, image_channels))

# Shared convolutional layers
conv1 = Conv2D(32, (3, 3), activation='relu')
maxpool1 = MaxPooling2D((2, 2))
conv2 = Conv2D(64, (3, 3), activation='relu')
maxpool2 = MaxPooling2D((2, 2))
flatten = Flatten()

# Process first input
x1 = conv1(input_a)
x1 = maxpool1(x1)
x1 = conv2(x1)
x1 = maxpool2(x1)
x1 = flatten(x1)

# Process second input
x2 = conv1(input_b)
x2 = maxpool1(x2)
x2 = conv2(x2)
x2 = maxpool2(x2)
x2 = flatten(x2)

# Concatenate the outputs of the Siamese network
merged = Concatenate()([x1, x2])

# Regression Head
dense1 = Dense(64, activation='relu')(merged)
output = Dense(1, activation='linear')(dense1)

# Create the model
model = Model(inputs=[input_a, input_b], outputs=output)

# Compile the model
model.compile(loss='mean_squared_error', optimizer='adam')

# Load and preprocess the training data
train_dir = 'root_data_low/train'
folders = os.listdir(train_dir)
num_samples = len(folders)
X1 = [] #np.zeros((num_samples, image_width, image_height, image_channels))
X2 = [] #np.zeros((num_samples, image_width, image_height, image_channels))
y =  [] #np.zeros((num_samples, 1))
removed_file = []

count = 0
for folder in os.listdir(train_dir):
    if(folder[0] == '.'):
        continue
    folder_path = os.path.join(train_dir, folder)
    image_files = os.listdir(folder_path)
    
    start_date_str = '20' + folder.split('_')[1]
    start_date_obj = datetime.strptime(start_date_str, '%Y%m%d')
    
    find_start_point = 0
    
    for j in range(0, len(image_files)):
      img_path = os.path.join(folder_path, image_files[j])
      try:
        if(find_start_point == 0):
          start_image = image_files[j]
          start_image_path = os.path.join(folder_path, start_image)
          start_image_array = img_to_array(load_img(start_image_path, target_size=(image_width, image_height))) / 255.0
          find_start_point = 1
          count += 1

        else:
          current_image = image_files[j]
          current_image_path = os.path.join(folder_path, current_image)
          current_image_array = img_to_array(load_img(current_image_path, target_size=(image_width, image_height))) / 255.0
          removed_extension_filename = image_files[j].split('.')[0]
          date_str = '20' + removed_extension_filename.split('_')[1]  # remove root num 
          date_obj = datetime.strptime(date_str, '%Y%m%d%H%M%S')
          hours_since_start = (date_obj - start_date_obj).total_seconds() / 3600.0   
          X1.append(start_image_array)
          X2.append(current_image_array)          
          y.append(hours_since_start)
          count +=1
      except Exception as e:
                print(f"Error loading image {img_path}: {e}")
                removed_file.append(img_path)


Error loading image root_data_low/train/root1_220919/readme.txt: cannot identify image file <_io.BytesIO object at 0x7fc16e3972e0>
Error loading image root_data_low/train/root2_220914/readme.txt: cannot identify image file <_io.BytesIO object at 0x7fc3f259cc20>
Error loading image root_data_low/train/root3_220919/readme.txt: cannot identify image file <_io.BytesIO object at 0x7fc16e6940e0>
Error loading image root_data_low/train/root2_220908/root2_220908121649(1).jpg: unconverted data remains: (1)
Error loading image root_data_low/train/root2_220908/root2_220908124028(1).jpg: unconverted data remains: (1)
Error loading image root_data_low/train/root3_221121/readme.txt: cannot identify image file <_io.BytesIO object at 0x7fc3ec787100>
Error loading image root_data_low/train/root2_220919/readme.txt: cannot identify image file <_io.BytesIO object at 0x7fc3e11bc130>
Error loading image root_data_low/train/root2_221005/readme.txt: cannot identify image file <_io.BytesIO object at 0x7fc3e11b

In [8]:
# Convert the input data to TensorFlow tensors
X1 = tf.convert_to_tensor(X1)
X2 = tf.convert_to_tensor(X2)
y = tf.convert_to_tensor(y)

# Train the model
model.fit([X1, X2], y, epochs=10, batch_size=1)



Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7fc16e6a8250>

In [39]:

# Test the model with new images
test_dir = 'root_data_low/test'
test_folders = os.listdir(test_dir)

X1_test = [] #np.zeros((num_samples, image_width, image_height, image_channels))
X2_test = [] #np.zeros((num_samples, image_width, image_height, image_channels))
y_test =  [] #np.zeros((num_samples, 1))
removed_test_file = []

test_count = 0
for folder in os.listdir(test_dir):
    if(folder[0] == '.'):
        continue
    folder_path = os.path.join(test_dir, folder)
    image_files = os.listdir(folder_path)
    
    start_date_str = '20' + folder.split('_')[1]
    start_date_obj = datetime.strptime(start_date_str, '%Y%m%d')
    
    find_start_point = 0
    
    for j in range(0, len(image_files)):
      img_path = os.path.join(folder_path, image_files[j])
      try:
        if(find_start_point == 0):
          start_image = image_files[j]
          start_image_path = os.path.join(folder_path, start_image)
          start_image_array = img_to_array(load_img(start_image_path, target_size=(image_width, image_height))) / 255.0
          start_image_array = np.expand_dims(start_image_array, axis=0)
          find_start_point = 1
          test_count += 1

        else:
          current_image = image_files[j]
          current_image_path = os.path.join(folder_path, current_image)
          current_image_array = img_to_array(load_img(current_image_path, target_size=(image_width, image_height))) / 255.0
          current_image_array = np.expand_dims(current_image_array, axis=0)

          removed_extension_filename = image_files[j].split('.')[0]
          date_str = '20' + removed_extension_filename.split('_')[1]  # remove root num 
          date_obj = datetime.strptime(date_str, '%Y%m%d%H%M%S')
          hours_since_start = (date_obj - start_date_obj).total_seconds() / 3600.0             

          X1_test.append(start_image_array)
          X2_test.append(current_image_array)
          y_test.append(hours_since_start)
          test_count +=1
          
      except Exception as e:
                print(f"Error loading image {img_path}: {e}")
                removed_test_file.append(img_path)


In [40]:

predicted_time_difference = []

count_correct_predict = 0
for i in range(test_count):
    try:
        predict = model.predict([X1_test[i], X2_test[i]])
        predicted_time_difference.append(predict)
        if (predict < 12):
            if(y_test[i] < 12):
                count_correct_predict +=1
        elif(predict<24):
            if(y_test[i] >= 12 and y_test[i]<24):
                count_correct_predict +=1
        elif(predict<36):
            if(y_test[i] >= 24 and y_test[i]<36):
                count_correct_predict +=1
        elif(predict<48):
            if(y_test[i] >= 36 and y_test[i]<48 ):
                count_correct_predict +=1
        elif(predict<60):
            if(y_test[i] >= 48 and y_test[i]<60):
                count_correct_predict +=1 
        elif(predict<72):
            if(y_test[i] >= 60 and y_test[i]<72):
                count_correct_predict +=1
        else:
            if(y_test[i]>=72):
                count_correct_predict +=1
    except Exception as e:
                print(f"Error predict: {e}")

s = len(predicted_time_difference)
print("test count : ", s)
print("count_correct_predict : ", count_correct_predict)
print("accuracy : " , count_correct_predict/ s)


Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
test count :  1839
count_correct_predict :  1224
accuracy :  0.6655791190864601


In [29]:
#for i in predicted_time_difference:
#    print(i)
print(len(predicted_time_difference))

1839


In [45]:

# Test the model with new images
test_dir = 'root_data_low/train'
test_folders = os.listdir(test_dir)

X1_test = [] #np.zeros((num_samples, image_width, image_height, image_channels))
X2_test = [] #np.zeros((num_samples, image_width, image_height, image_channels))
y_test =  [] #np.zeros((num_samples, 1))
removed_test_file = []

test_count = 0
for folder in os.listdir(test_dir):
    if(folder[0] == '.'):
        continue
    folder_path = os.path.join(test_dir, folder)
    image_files = os.listdir(folder_path)
    
    start_date_str = '20' + folder.split('_')[1]
    start_date_obj = datetime.strptime(start_date_str, '%Y%m%d')
    
    find_start_point = 0
    
    for j in range(0, len(image_files), 50):
      img_path = os.path.join(folder_path, image_files[j])
      try:
        if(find_start_point == 0):
          start_image = image_files[j]
          start_image_path = os.path.join(folder_path, start_image)
          start_image_array = img_to_array(load_img(start_image_path, target_size=(image_width, image_height))) / 255.0
          start_image_array = np.expand_dims(start_image_array, axis=0)
          find_start_point = 1
          test_count += 1

        else:
          current_image = image_files[j]
          current_image_path = os.path.join(folder_path, current_image)
          current_image_array = img_to_array(load_img(current_image_path, target_size=(image_width, image_height))) / 255.0
          current_image_array = np.expand_dims(current_image_array, axis=0)

          removed_extension_filename = image_files[j].split('.')[0]
          date_str = '20' + removed_extension_filename.split('_')[1]  # remove root num 
          date_obj = datetime.strptime(date_str, '%Y%m%d%H%M%S')
          hours_since_start = (date_obj - start_date_obj).total_seconds() / 3600.0             

          X1_test.append(start_image_array)
          X2_test.append(current_image_array)
          y_test.append(hours_since_start)
          test_count +=1
          
      except Exception as e:
                print(f"Error loading image {img_path}: {e}")
                removed_test_file.append(img_path)



predicted_time_difference = []

count_correct_predict = 0
for i in range(test_count):
    try:
        predict = model.predict([X1_test[i], X2_test[i]])
        predicted_time_difference.append(predict)
        if (predict < 12):
            if(y_test[i] < 12):
                count_correct_predict +=1
        elif(predict<24):
            if(y_test[i] >= 12 and y_test[i]<24):
                count_correct_predict +=1
        elif(predict<36):
            if(y_test[i] >= 24 and y_test[i]<36):
                count_correct_predict +=1
        elif(predict<48):
            if(y_test[i] >= 36 and y_test[i]<48 ):
                count_correct_predict +=1
        elif(predict<60):
            if(y_test[i] >= 48 and y_test[i]<60):
                count_correct_predict +=1 
        elif(predict<72):
            if(y_test[i] >= 60 and y_test[i]<72):
                count_correct_predict +=1
        else:
            if(y_test[i]>=72):
                count_correct_predict +=1
    except Exception as e:
                print(f"Error predict: {e}")

s = len(predicted_time_difference)
print("test count : ", s)
print("count_correct_predict : ", count_correct_predict)
print("accuracy : " , count_correct_predict/ s)



Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index out of range
Error predict: list index