# 1. Import dependencies 

In [38]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import shutil
import plotly.graph_objects as go
from PIL import Image
from skimage import measure
from skimage.filters import threshold_otsu

from tensorflow.keras.preprocessing import image
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.metrics import AUC
from tensorflow.keras.callbacks import EarlyStopping

np.set_printoptions(precision=6, suppress=True, threshold=10000, linewidth=np.inf)
pd.options.display.max_rows = 999

# 2. Load data

## 2.1 Create directories

In [2]:
BASE_DIR = os.path.dirname(os.getcwd())
dataset = os.path.join('data', 'dataset')
data_dir = os.path.join('data', 'images')

if not os.path.exists(os.path.join(BASE_DIR, data_dir)):
    os.makedirs(os.path.join(BASE_DIR, data_dir))

train_dir = os.path.join(data_dir, 'train')
valid_dir = os.path.join(data_dir, 'valid')
test_dir = os.path.join(data_dir, 'test')

for directory in (train_dir, valid_dir, test_dir):
    if not os.path.exists(os.path.join(BASE_DIR, directory)):
        os.makedirs(os.path.join(BASE_DIR, directory))

In [3]:
image_fnames = os.listdir(os.path.join(BASE_DIR, 'data/dataset'))
image_fnames = [fname for fname in image_fnames if fname.split('.')[1].lower() in ['jpg', 'png', 'jpeg']]
print(len(image_fnames))

9996


## 2.2 Split into train, valid, test

In [4]:
size = len(image_fnames)

train_size = int(np.floor(0.7 * size))
valid_size = int(np.floor(0.2 * size))
test_size = size - train_size - valid_size

train_idx = train_size
valid_idx = train_size + valid_size
test_idx = train_size + valid_size + test_size

In [5]:
for i, fname in enumerate(image_fnames):
    if i <= train_idx:
        src = os.path.join(BASE_DIR, 'data', 'dataset', fname)
        dst = os.path.join(BASE_DIR, train_dir, fname)
        shutil.copyfile(src, dst)
    elif train_idx < i <= valid_idx:
        src = os.path.join(BASE_DIR, 'data', 'dataset', fname)
        dst = os.path.join(BASE_DIR, valid_dir, fname)
        shutil.copyfile(src, dst)
    elif valid_idx < i < test_idx:
        src = os.path.join(BASE_DIR, 'data', 'dataset', fname)
        dst = os.path.join(BASE_DIR, test_dir, fname)
        shutil.copyfile(src, dst)


In [6]:
print('train set', len(os.listdir(os.path.join(BASE_DIR, train_dir))))
print('validation set', len(os.listdir(os.path.join(BASE_DIR, valid_dir))))
print('test set', len(os.listdir(os.path.join(BASE_DIR, test_dir))))

train set 6998
validation set 1999
test set 999


## 2.3 Load images and coordinates


In [7]:
def load_images(directory):
    images = []
    labels = []
    for filename in os.listdir(directory):
        image = Image.open(os.path.join(directory, filename))
        image_array = np.array(image) / 255.0
        images.append(image_array)
        coordinates = filename.split('.')[0].split('_')
        x1, y1, x2, y2 = __builtins__.map(int, coordinates)
        label = [x1, y1, x2, y2]
        labels.append(label)
    return np.array(images), np.array(labels)

def coord_map(coords):
    heat_maps = []
    for coord in coords:
        heat_map = np.zeros((64, 64, 1))
        heat_map[coord[1], coord[0], 0] = 1
        heat_map[coord[3], coord[2], 0] = 1
        heat_maps.append(heat_map)
    heat_maps = np.array(heat_maps)
    return heat_maps

train_images, train_labels = load_images(os.path.join(BASE_DIR, train_dir))
valid_images, valid_labels = load_images(os.path.join(BASE_DIR, valid_dir))
test_images, test_labels = load_images(os.path.join(BASE_DIR, test_dir))

train_coords_maps = coord_map(train_labels)
valid_coords_maps = coord_map(valid_labels)
test_coords_maps = coord_map(test_labels)

# 3. Create model

## 3.1 Create model architecture

In [60]:

def create_model():
    inputs = Input((64, 64, 1))
    conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(pool1)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(pool2)

    up1 = UpSampling2D(size=(2, 2))(conv3)
    conv4 = Conv2D(64, (3, 3), activation='relu', padding='same')(up1)

    up2 = UpSampling2D(size=(2, 2))(conv4)
    conv5 = Conv2D(32, (3, 3), activation='relu', padding='same')(up2)

    output = Conv2D(1, (1, 1), activation='sigmoid')(conv5)

    model = Model(inputs=[inputs], outputs=[output])

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', AUC()])

    return model

model = create_model()
early_stopping = EarlyStopping(monitor='val_loss', patience=5, verbose=1)


## 3.2 Train model

In [61]:
model.fit(train_images, train_coords_maps, validation_data=(valid_images, valid_coords_maps), epochs=1000, batch_size=128, callbacks=[early_stopping])

Epoch 1/1000
Epoch 2/1000
Epoch 3/1000
Epoch 4/1000
Epoch 5/1000
Epoch 6/1000
Epoch 7/1000
Epoch 8/1000
Epoch 9/1000
Epoch 10/1000
Epoch 11/1000
Epoch 12/1000
Epoch 13/1000
Epoch 14/1000
Epoch 15/1000
Epoch 16/1000
Epoch 17/1000
Epoch 18/1000
Epoch 19/1000
Epoch 20/1000
Epoch 21/1000
Epoch 22/1000
Epoch 23/1000
Epoch 24/1000
Epoch 25/1000
Epoch 26/1000
Epoch 27/1000
Epoch 28/1000
Epoch 29/1000
Epoch 30/1000
Epoch 31/1000
Epoch 32/1000
Epoch 33/1000
Epoch 34/1000
Epoch 35/1000
Epoch 36/1000
Epoch 37/1000
Epoch 38/1000
Epoch 39/1000
Epoch 40/1000
Epoch 41/1000
Epoch 41: early stopping


<keras.callbacks.History at 0x28665820190>

In [62]:
model.save(os.path.join(BASE_DIR, 'model', 'saved'))



INFO:tensorflow:Assets written to: c:\Users\Dell\Documents\Git\line_recognition\model\saved\assets


INFO:tensorflow:Assets written to: c:\Users\Dell\Documents\Git\line_recognition\model\saved\assets


In [63]:
my_model = load_model(os.path.join(BASE_DIR, 'model', 'saved'))

# 4. Predict

## 4.1. Predict heatmaps on test set

In [65]:
predicted_maps = model.predict(test_images)



## 4.2. Predict coordinates

In [66]:
def predict_coords(predicted_maps):
    predicted_coords = []
    for map_argmax in predicted_maps:
        thresh = threshold_otsu(map_argmax)
        binary_map = map_argmax > thresh
        labels = measure.label(binary_map)
        centroids = [np.round(prop.centroid).astype(int) for prop in measure.regionprops(labels)]

        if len(centroids) < 1:
            print("Warning: no clusters identified in the heatmap. Adding dummy coordinates.")
            centroids = [np.array([0, 0]), np.array([0, 0])]
        elif len(centroids) == 1:
            print("Warning: only one cluster identified in the heatmap. Duplicating coordinates.")
            centroids = [centroids[0], centroids[0]]
        elif len(centroids) > 2:
            areas = [prop.area for prop in measure.regionprops(labels)]
            centroids = [centroids[i] for i in np.argsort(areas)[-2:]]

        start_y = centroids[0][0]
        start_x = centroids[0][1]
        end_y = centroids[1][0]
        end_x = centroids[1][1]

        predicted_coords.append(np.array([start_x, start_y, end_x, end_y]))
    return predicted_coords

pred_coords = predict_coords(predicted_maps)



In [67]:
print(len(pred_coords))
print(len(test_labels))

999
999


## 4.3. Sort coordinates by x descending

In [68]:
def sort_by_x(coords):
    if coords[0] > coords[2]:
        coords = np.array([coords[2], coords[3], coords[0], coords[1]])
    return coords

sorted_pred_coords = [sort_by_x(coords) for coords in pred_coords]
sorted_test_coords = [sort_by_x(coords) for coords in test_labels]

In [69]:
def create_dataframe(pred_coords, test_labels):
    x_start_pred = [coord[0] for coord in pred_coords]
    y_start_pred = [coord[1] for coord in pred_coords]
    x_end_pred = [coord[2] for coord in pred_coords]
    y_end_pred = [coord[3] for coord in pred_coords]
    
    x_start_test = [coord[0] for coord in test_labels]
    y_start_test = [coord[1] for coord in test_labels]
    x_end_test = [coord[2] for coord in test_labels]
    y_end_test = [coord[3] for coord in test_labels]
    
    data = {'x_start_test': x_start_test, 'x_start_pred': x_start_pred, 'y_start_test': y_start_test, 'y_start_pred': y_start_pred, 
            'x_end_test': x_end_test, 'x_end_pred': x_end_pred, 'y_end_test': y_end_test, 'y_end_pred': y_end_pred}

    df = pd.DataFrame(data)

    return df
df = create_dataframe(sorted_pred_coords, sorted_test_coords)

In [70]:
df.head()

Unnamed: 0,x_start_test,x_start_pred,y_start_test,y_start_pred,x_end_test,x_end_pred,y_end_test,y_end_pred
0,22,22,33,33,61,61,55,55
1,26,26,14,14,61,61,55,55
2,58,58,19,19,61,62,55,55
3,58,58,62,62,61,62,55,55
4,23,23,29,29,61,61,56,56


# 5. Evaluate

In [72]:
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error, r2_score

def compute_mae(predicted_coords, true_coords):
    mae_values = []
    for pred, true in zip(predicted_coords, true_coords):
        mae_values.append(mean_absolute_error(true, pred))
    return np.mean(mae_values)

mae = compute_mae(sorted_pred_coords, test_labels)
mse = mean_squared_error(sorted_test_coords, sorted_pred_coords)
r2 = r2_score(sorted_test_coords, sorted_pred_coords)
print(f"MAE: {mae:.2f}")
print(f"MSE: {mse:.2f}")
print(f"R2: {r2:.2f}")

MAE: 9.63
MSE: 20.31
R2: 0.94
