# Documentation
In this notebook, we provide all the code required to create the Railway recognition pipeline, which you can explore in the accompanying demo notebook located in this folder. This document serves as a comprehensive reference, presenting methods and their brief documentation to offer insights into our development process. Please note that this file does not contain any logical sequence for execution, and running the code within it will not yield meaningful results. Its sole purpose is documentation.

# All Imports

In [None]:
# Install necessary packages
# NOTE(review): the mmcv-full wheel index below targets cu117/torch1.13.0 while the
# detectron2 wheel index targets cu113/torch1.10 — confirm both match the runtime.
!pip install mmcv-full==1.7.0 -f https://download.openmmlab.com/mmcv/dist/cu117/torch1.13.0/index.html
!pip install mmdet==2.26.0
!pip install detectron2 -f https://dl.fbaipublicfiles.com/detectron2/wheels/cu113/torch1.10/index.html
!pip install transformers timm
# NOTE(review): mmdet is already pinned to 2.26.0 above; this unpinned install looks redundant.
!pip install mmdet
!pip install -U openmim
# NOTE(review): `-U torch` may replace the torch build the wheels above were chosen for — verify.
!pip install -U torch sahi
!pip install ultralytics==8.0.202
!pip install tqdm --upgrade

# Standard library imports
import os
from tqdm.notebook import tqdm  # Progress bar for loops
import PIL  # Python Imaging Library for image processing

# Custom package imports
from sahi.utils.yolov8 import download_yolov8s_model  # YOLOv8 model download utility
from sahi import AutoDetectionModel  # Auto Detection Model from Sahi
from sahi.utils.cv import read_image  # Read image utility
from sahi.utils.file import download_from_url  # Download file from URL utility
from sahi.predict import get_prediction, get_sliced_prediction, predict  # Prediction utilities
from IPython.display import Image  # Display images in Jupyter Notebooks

# Image processing imports
from PIL import ImageEnhance, ImageFilter  # Image enhancement and filtering
from IPython import display  # Display utilities

# Deep learning framework and computer vision libraries
from ultralytics import YOLO  # YOLO object detection from Ultralytics
import shutil  # File operations utility
import random  # Random number generation
import cv2  # OpenCV for computer vision tasks
import math  # Mathematical functions

# Google Colab specific imports
from google.colab import drive  # Google Drive integration
import pandas as pd  # Data manipulation library
import numpy as np  # Numerical computing library

# Record the current working directory in HOME and print it
# (the directory itself is not changed here)
HOME = os.getcwd()
print(HOME)

# Data preprocessing


## Resizing


In [None]:
# Takes a folder of full-size images and a destination folder for the resized copies
def resizeImags(path, destin, scale_percent=20):
    """Downscale every ``.tif`` image in a folder and write the copies elsewhere.

    Args:
        path: Folder containing the full-size images.
        destin: Folder that receives the resized images (file names are kept).
        scale_percent: Output size relative to the input, in percent. The
            default of 20 is the value the pipeline originally hard-coded.
    """
    for name in tqdm(os.listdir(path)):
        if not name.endswith(".tif"):
            continue

        img = cv2.imread(os.path.join(path, name))
        if img is None:
            # cv2.imread reports an unreadable file by returning None
            print('Could not read image : ', name)
            continue

        print('Original Dimensions : ', img.shape)
        # img.shape is (rows, cols, ...) while cv2.resize expects (width, height)
        width = int(img.shape[1] * scale_percent / 100)
        height = int(img.shape[0] * scale_percent / 100)

        resized = cv2.resize(img, (width, height), interpolation=cv2.INTER_AREA)
        print('Resized Dimensions : ', resized.shape)
        cv2.imwrite(os.path.join(destin, name), resized)

## Pairing labels with images

In [None]:
# Takes the folder with annotations and the folder with images
def pairImages(annot, destin):
    """Write next to each annotation the image that belongs to it.

    For every entry in ``annot`` the first four characters of the file name
    are dropped and the last four (the extension) are replaced by ``.tif``.
    If an image with that name exists in ``destin`` it is read and written
    into ``annot`` under the same name.

    Args:
        annot: Folder holding the annotation files; also receives the images.
        destin: Folder holding the candidate images.

    Returns:
        The list of image file names that were paired and written.
    """
    # List the image folder once; a set gives O(1) membership tests
    available_images = set(os.listdir(destin))
    goodlist = []

    for anno in tqdm(os.listdir(annot)):
        newname = anno[4:][:-4] + ".tif"
        if newname not in available_images:
            continue

        img = cv2.imread(os.path.join(destin, newname))
        if img is None:
            # cv2.imread reports an unreadable file by returning None
            print('Could not read image : ', newname)
            continue

        cv2.imwrite(os.path.join(annot, newname), img)
        goodlist.append(newname)

    return goodlist

## Splitting Train/Validation

In [None]:
# Define folder paths for datasets
# train_test_split reads the four train/val paths below as module-level names
# and creates the folders if they do not exist.
# NOTE(review): train_path_img has no "images/" segment while val_path_img does
# ("augmented/train/" vs "augmented/images/val/") — confirm this asymmetry is intended.
train_path_img ="/content/drive/MyDrive/Railway/correct_stuff/augmented/train/"
train_path_label = "/content/drive/MyDrive/Railway/correct_stuff/augmented/labels/train/"
val_path_img = "/content/drive/MyDrive/Railway/correct_stuff/augmented/images/val/"
val_path_label = "/content/drive/MyDrive/Railway/correct_stuff/augmented/labels/val/"
# Unlike the paths above, this one has no trailing slash.
test_path = "/content/drive/MyDrive/Railway/correct_stuff/augmented/test"

In [None]:
def train_test_split(path, neg_path=None, split=0.2):
    """Copy an image/label folder into train and validation folders.

    Files in ``path`` are grouped by name without the 4-character extension;
    each group is expected to have a ``.tif`` image and a ``.txt`` label.
    The destination folders are the module-level ``train_path_img``,
    ``train_path_label``, ``val_path_img`` and ``val_path_label``.

    Args:
        path: Folder holding the images and their label files.
        neg_path: Optional folder of ``.jpg`` negative (label-free) images
            that are all added to the training images.
        split: Fraction of the samples that goes to the validation set.
    """
    print("------ PROCESS STARTED -------")

    # Sorting before shuffling is what makes the seeded shuffle reproducible:
    # a set has no stable iteration order from one interpreter run to the next.
    # 'classes' (the class-name file) is not a sample, so it is left out here.
    files = sorted({name[:-4] for name in os.listdir(path)} - {'classes'})

    print(f"--- This folder has a total number of {len(files)} images---")
    random.seed(42)
    random.shuffle(files)

    test_size = int(len(files) * split)
    train_size = len(files) - test_size
    train_files = files[:train_size]
    val_files = files[train_size:]

    # creating required directories
    for folder in (train_path_img, train_path_label, val_path_img, val_path_label):
        os.makedirs(folder, exist_ok=True)

    # copying images and labels to the train folders
    for filex in tqdm(train_files):
        shutil.copy2(os.path.join(path, filex + '.tif'),
                     os.path.join(train_path_img, filex + '.tif'))
        shutil.copy2(os.path.join(path, filex + '.txt'),
                     os.path.join(train_path_label, filex + '.txt'))

    print(f"------ Training data created with {1 - split:.0%} split {len(train_files)} images -------")

    if neg_path:
        neg_images = sorted({name[:-4] for name in os.listdir(neg_path)})
        for filex in tqdm(neg_images):
            shutil.copy2(os.path.join(neg_path, filex + '.jpg'),
                         os.path.join(train_path_img, filex + '.jpg'))

        print(f"------ Total  {len(neg_images)} negative images added to the training data -------")
        print(f"------ TOTAL Training data created with {len(train_files) + len(neg_images)} images -------")

    # copying images and labels to the validation folders
    for filex in tqdm(val_files):
        shutil.copy2(os.path.join(path, filex + '.tif'),
                     os.path.join(val_path_img, filex + '.tif'))
        shutil.copy2(os.path.join(path, filex + '.txt'),
                     os.path.join(val_path_label, filex + '.txt'))

    print(f"------ Validation data created with a total of {len(val_files)} images ----------")

    print("------ TASK COMPLETED -------")


## Augmentation

### Images reflection

In [None]:
# Mirror the bounding boxes
def mirror_labels(array):
    """Return one label row with its second field flipped to ``1 - value``.

    The first field is passed through untouched; fields 1-4 are converted
    to float. Any fields beyond the fifth are ignored.
    """
    label = array[0]
    flipped_x = 1 - float(array[1])
    remaining = [float(array[position]) for position in (2, 3, 4)]
    return [label, flipped_x, *remaining]

def mirror(filename, read_from, save_to, test=None):
    """Write a horizontally flipped copy of an image and of its label file.

    The label file is expected next to the image, with the same name and a
    ``.txt`` extension. Both outputs go to ``save_to`` with a ``mirrored_``
    prefix.

    Args:
        filename: Name of the image file inside ``read_from``.
        read_from: Folder holding the image and its label file.
        save_to: Folder that receives the mirrored image and label file.
        test: Unused; kept so existing calls keep working.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    file_path = os.path.join(read_from, filename)
    image = cv2.imread(file_path)
    if image is None:
        # cv2.imread reports an unreadable file by returning None
        raise FileNotFoundError(f"Could not read image: {file_path}")

    # Flip around the vertical axis (flipCode=1) and save
    cv2.imwrite(os.path.join(save_to, "mirrored_" + filename), cv2.flip(image, 1))

    # Mirror every non-blank label row
    label_path = os.path.splitext(file_path)[0] + '.txt'
    with open(label_path) as file:
        mirrored_rows = [mirror_labels(line.split()) for line in file if line.strip()]

    # One row per line, fields separated by single spaces
    stem = os.path.splitext(filename)[0]
    with open(os.path.join(save_to, 'mirrored_' + stem + '.txt'), "w") as file:
        file.writelines(
            " ".join(str(value) for value in row) + "\n" for row in mirrored_rows
        )


### Brightness, Contrast, Gaussian Blur augmentation

In [None]:
def _save_augmented(image, data, destination, prefix, stem):
    """Save one image variant and copy the unchanged label file next to it."""
    image.save(f"{destination}/{prefix}_{stem}.tif")
    shutil.copy2(f"{data}/{stem}.txt", f"{destination}/{prefix}_{stem}.txt")


def augmentData(data, destination, skip=0):
    """Create brightness, contrast and blur variants of every ``.tif`` image.

    For each image the original plus two brightness, two contrast and three
    Gaussian-blur variants are written to ``destination``. None of these
    operations moves a bounding box, so the label file is copied as-is for
    every variant.

    Args:
        data: Folder holding ``<name>.tif`` images and ``<name>.txt`` labels.
        destination: Existing folder that receives the augmented files.
        skip: Number of images (in sorted name order) to skip, e.g. to resume
            an interrupted run. An earlier revision always skipped the first
            89 entries of an unordered listing; pass ``skip=89`` for a similar
            effect on a sorted listing.
    """
    # The module-level name `Image` is IPython.display.Image, which has no
    # open(); the Pillow module is imported locally under a distinct name.
    from PIL import Image as PILImage

    # Only real images are processed, so files such as classes.txt are ignored
    stems = sorted(name[:-4] for name in os.listdir(data) if name.endswith(".tif"))

    for i in tqdm(stems[skip:]):
        with PILImage.open(f"{data}/{i}.tif") as original_image:
            _save_augmented(original_image, data, destination, "original", i)

            # Brightness augmentation
            for brightness_factor in [0.5, 1.5]:
                brightened = ImageEnhance.Brightness(original_image).enhance(brightness_factor)
                _save_augmented(brightened, data, destination, f"bright_{brightness_factor}", i)

            # Contrast augmentation
            for contrast_factor in [0.5, 1.5]:
                contrasted = ImageEnhance.Contrast(original_image).enhance(contrast_factor)
                _save_augmented(contrasted, data, destination, f"contrast_{contrast_factor}", i)

            # Gaussian Blur augmentation
            for blur_radius in [1, 2, 3]:
                blurred = original_image.filter(ImageFilter.GaussianBlur(blur_radius))
                _save_augmented(blurred, data, destination, f"blur_{blur_radius}", i)
    print('---Data Augmented---')


## Cropping training tracks

In [None]:


# Returns a list of predictions; takes as arguments the path to the folder with images and the path to the YOLO model
def predictAndExport(folder, path):
    """Run a YOLO model on every ``.tif`` image in a folder.

    Only the first box returned for an image is kept; images without any
    detection produce no entry.

    Args:
        folder: Folder holding the images.
        path: Path to the YOLO weights file.

    Returns:
        A list of dicts with the keys ``name`` (file name), ``label``
        (class index as int), ``conf`` (confidence), ``bBox0`` and ``bBox1``
        (elements 0 and 2 of the normalised ``xywhn`` box).
    """
    # The module-level name `Image` is IPython.display.Image, which has no
    # open(); the Pillow module is imported locally under a distinct name.
    from PIL import Image as PILImage

    model = YOLO(path)
    results = []

    for n in tqdm(os.listdir(folder)):
        if not n.endswith(".tif"):
            continue

        with PILImage.open(os.path.join(folder, n)) as img:
            boxes = model.predict(img, imgsz=1024, conf=0.5)[0].boxes

        classes = boxes.cls.cpu().numpy()
        if len(classes) == 0:
            continue

        first_box = boxes.xywhn.cpu().numpy()[0]
        results.append({
            'name': n,
            'label': int(math.floor(classes[0])),
            'conf': boxes.conf.cpu().numpy()[0],
            'bBox0': first_box[0],
            'bBox1': first_box[2],
        })
    return results




### Writes a csv with the predictions

In [None]:
def writeACSV(destin, data, filename):
    """Write prediction dicts to ``<destin>/<filename>.csv``.

    Args:
        destin: Folder that receives the file.
        data: Iterable of dicts with the keys ``name``, ``conf``, ``label``,
            ``bBox0`` and ``bBox1``.
        filename: File name without the ``.csv`` extension.
    """
    # csv is not imported at the top of this notebook, so import it here
    import csv

    columns = ["name", "conf", "label", "bBox0", "bBox1"]
    with open(os.path.join(destin, filename + ".csv"), mode="w", newline="") as file:
        writer = csv.DictWriter(file, fieldnames=columns)
        writer.writeheader()
        writer.writerows(tqdm(data))

## Actual cropping

In [None]:
def crop_images(annots, imagesDir, outputfolderdir):
    """Crop every listed image to the horizontal extent of its bounding box.

    Args:
        annots: CSV file with at least the columns ``name``, ``bBox0`` and
            ``bBox1``.
        imagesDir: Folder holding the images named in the CSV.
        outputfolderdir: Folder that receives the ``cropped_<name>`` files.
    """
    df = pd.read_csv(annots)

    # NOTE(review): this drops the first data row (read_csv has already
    # consumed the header). Kept as in the original pipeline — confirm
    # whether the first prediction should really be skipped.
    df = df.tail(-1)

    # A set gives O(1) membership tests for every row
    available_images = set(os.listdir(imagesDir + "/"))

    for _, row in tqdm(df.iterrows(), total=len(df)):
        if row["name"] not in available_images:
            continue

        name = str(row["name"])
        image_path = os.path.join(imagesDir, name)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread reports an unreadable file by returning None
            print('Could not read image : ', image_path)
            continue

        cropped_image = crop_image(row['bBox0'], row['bBox1'], image)
        if cropped_image.size == 0:
            print('Empty crop, skipped : ', image_path)
            continue

        try:
            cv2.imwrite(os.path.join(outputfolderdir, 'cropped_' + name), cropped_image)
        except cv2.error as err:
            # Best effort: report the failing image and carry on with the rest
            print('Could not write crop for : ', image_path, err)


def crop_image(bBox0, bBox1, image):
    """Crop an image to a padded vertical strip, keeping its full height.

    Args:
        bBox0: Horizontal centre of the strip as a fraction of the width.
        bBox1: Width of the strip as a fraction of the image width; it is
            enlarged by 20% as padding before cropping.
        image: Array whose first two axes are (rows, columns).

    Returns:
        The slice ``image[:, x1:x2]``, with ``x1`` and ``x2`` clamped to the
        image bounds.
    """
    # Adds padding to the cropping
    padded_width = bBox1 * 1.2

    # Only the first two axes are needed, so grayscale images work too
    height, width = image.shape[:2]

    # Bounding box edges in pixels, clamped so the slice stays inside the image
    left = int((bBox0 - padded_width / 2) * width)
    right = int((bBox0 + padded_width / 2) * width)
    x1 = min(max(left, 0), width)
    x2 = max(x1, min(right, width))

    return image[0:height, x1:x2]


# Training

In [None]:
# Example training on existing model
# `YOLO` is imported directly (`from ultralytics import YOLO`); the bare
# `ultralytics` module name is never bound in this notebook.
model = YOLO('yolov8n.pt').load("/content/drive/MyDrive/Railway/modelv2_26/modelAugmentations/weights/last.pt")
model.train(resume=True, data='/content/drive/MyDrive/Railway/correct_stuff/dataset.yaml', epochs=25, imgsz=1024, device=0, workers=0)

In [None]:
# Example training with the YOLOv8 CLI
!yolo task=detect mode=train model=yolov8n.pt data=/content/drive/MyDrive/Railway/BetterStationsIMG/dataset.yaml epochs=25 imgsz=1024 batch=8 project=/content/drive/MyDrive/Railway/bettermodel/ name=trainsmodelOutput

# Predicting

In [None]:
# Method for predicting images from a folder using Slicing Aided Hyper Inference (SAHI)
def SahiPredictionExport(test_path, model_path, export_dir):
    """Run sliced prediction on every file in a folder and export the visuals.

    Args:
        test_path: Folder prefix of the input images; file names are appended
            to it directly.
        model_path: Path to the YOLOv8 weights.
        export_dir: Folder that receives ``predicted_image_<n>`` visuals,
            numbered in listing order.
    """
    image_paths = [test_path + entry for entry in tqdm(os.listdir(test_path))]

    sahi_model = AutoDetectionModel.from_pretrained(
        model_type="yolov8",
        model_path=model_path,
        confidence_threshold=0.1,
        device='cuda:0',
    )

    for index, image_path in enumerate(image_paths):
        sliced_result = get_sliced_prediction(image_path, sahi_model)
        print(sliced_result)
        sliced_result.export_visuals(
            export_dir=export_dir,
            file_name=f"predicted_image_{index}",
        )

# Evaluation

In [None]:
# Evaluate the model on its validation dataset and print the box mAP scores
model = YOLO(modelPath)  # load a custom model

# Validate the model; no dataset argument is needed, dataset and settings are remembered
metrics = model.val(imgsz=1024, save_json=True)

# Printed one per line, in this order: map50-95, map50, map75, maps
print(
    metrics.box.map,
    metrics.box.map50,
    metrics.box.map75,
    metrics.box.maps,
    sep="\n",
)

# OCR

In [None]:

# Creating OCR Reader for English Language
# easyocr is used here but is not imported (or pip-installed) in the cells
# above, so it is imported in this cell.
import easyocr

reader = easyocr.Reader(['en'])

def recognize_sign_number(image_path):
    """Read the digits on a sign image with the module-level OCR ``reader``.

    Args:
        image_path: Either a path to an image file or an already loaded
            image array (which is what ``cv2.resize`` requires).

    Returns:
        The text of the first OCR result as a string, or ``'no result'``
        when nothing was recognised.

    Raises:
        FileNotFoundError: If a path is given and the image cannot be read.
    """
    if isinstance(image_path, (str, os.PathLike)):
        image = cv2.imread(os.fspath(image_path))
        if image is None:
            # cv2.imread reports an unreadable file by returning None
            raise FileNotFoundError(f"Could not read image: {image_path}")
    else:
        image = image_path

    # Make the picture bigger
    new_image = cv2.resize(image, (800, 800))
    # apply ocr method from easyocr library, allowing only numbers from 0 to 9
    ocr_result = reader.readtext(new_image, allowlist='0123456789')

    if not ocr_result:
        return 'no result'

    # Element 1 of a result entry is taken as the recognised text
    # (presumably entries are (box, text, confidence) — see the EasyOCR docs)
    return str(ocr_result[0][1])

# Triangulation

#### Open and read the location data and the camera settings

In [None]:

def getLocation(file):
    """Load the location CSV at ``file`` and return it as a DataFrame."""
    location_table = pd.read_csv(file)
    return location_table

def readSettings(file):
    """Parse the camera-settings ``.txt`` files found in a folder.

    For each ``.txt`` file, characters 19-23 of the first line are used as
    the key of the result. Every line that starts with ``-`` is read as
    ``- <word> <name>: <value>`` and stored as ``{<name>: <value>}``.

    Args:
        file: Folder holding the settings files (despite the parameter name).

    Returns:
        A dict mapping each key to a dict of setting names and string values.
    """
    cam_settings = {}

    for entry in os.listdir(file):
        if not entry.endswith('.txt'):
            continue

        with open(os.path.join(file, entry)) as handle:
            settings = handle.readlines()
        if not settings:
            # An empty file has no header line to take the key from
            continue

        foldname = settings[0][19:24]
        folds = {}
        for line in settings:
            if not line.startswith('-'):
                continue

            # Drop the two leading characters and the line ending only, so a
            # final line without a newline keeps its last character
            parts = line[2:].rstrip('\r\n').split(':')
            key_words = parts[0].split(' ')
            if len(parts) < 2 or len(key_words) < 2:
                # Not a "<word> <name>: <value>" line (e.g. a "----" separator)
                continue
            folds[key_words[1]] = parts[1][1:]

        cam_settings[foldname] = folds

    return cam_settings



### Functions

In [None]:
# Returns the rotation matrix
def get_rot_mat(omg, phi, kap):
    """Build the 3x3 rotation matrix ``Rz @ Ry @ Rx`` from three angles in degrees.

    Each angle is converted to radians and negated before its axis rotation
    is built (``omg`` about x, ``phi`` about y, ``kap`` about z).
    """
    # Rotations, note: minus sign!
    ang_x, ang_y, ang_z = (-np.deg2rad(angle) for angle in (omg, phi, kap))

    cos_x, sin_x = np.cos(ang_x), np.sin(ang_x)
    cos_y, sin_y = np.cos(ang_y), np.sin(ang_y)
    cos_z, sin_z = np.cos(ang_z), np.sin(ang_z)

    rot_x = np.array([[1, 0, 0], [0, cos_x, -sin_x], [0, sin_x, cos_x]])
    rot_y = np.array([[cos_y, 0, sin_y], [0, 1, 0], [-sin_y, 0, cos_y]])
    rot_z = np.array([[cos_z, -sin_z, 0], [sin_z, cos_z, 0], [0, 0, 1]])
    return rot_z @ rot_y @ rot_x



# Returns the intrinsic matrix
def get_int_mat(f, img_w, img_h, pix_size=3.76e-6):
    """Build the 3x4 intrinsic matrix of a camera.

    Args:
        f: Focal length; it is divided by 1000 to obtain metres
            (presumably given in millimetres — confirm against the settings files).
        img_w: Image width in pixels.
        img_h: Image height in pixels.
        pix_size: Physical size of one sensor pixel in metres. The default
            is the value the pipeline originally hard-coded.

    Returns:
        ``[[-fc, 0, u0, 0], [0, fc, v0, 0], [0, 0, 1, 0]]`` where ``fc`` is
        the focal length in pixel units and ``(u0, v0)`` is the image centre.

    Raises:
        ValueError: If ``pix_size`` is not positive.
    """
    if pix_size <= 0:
        raise ValueError("pix_size must be positive")

    f = (f / 10) / 100  # focal length in m
    fc = f / pix_size  # Normalised, dimensionless focal length

    # Image offsets
    u0, v0 = (img_w / 2), (img_h / 2)

    # Construct intrinsic matrix
    return np.array([[-fc, 0, u0, 0], [0, fc, v0, 0], [0, 0, 1, 0]])


# Returns the camera matrix
def get_cam_mat(R, cp, K):
    """Combine rotation, camera position and intrinsics into ``K @ M``.

    ``M`` is the 4x4 matrix whose upper-left 3x3 block is ``R``, whose last
    column holds ``-R @ cp[:, 0]`` and whose bottom-right element is 1.
    """
    extrinsic = np.zeros((4, 4))
    extrinsic[3, 3] = 1.0
    extrinsic[:3, :3] = R
    extrinsic[:3, 3] = (-R) @ cp[:, 0]
    return K @ extrinsic

# Returns the world position based on corresponding pixel pairs and the two camera matrices
def get_pos(pp1, pp2, P1, P2):
    """Triangulate one point from a pixel pair and two camera matrices.

    Two rows per view are stacked into a 4x4 system; the last row of ``Vh``
    from its SVD is divided by its final component and the first three
    components are returned.
    """
    rows = [
        pp1[1] * P1[2, :] - P1[1, :],
        P1[0, :] - pp1[0] * P1[2, :],
        pp2[1] * P2[2, :] - P2[1, :],
        P2[0, :] - pp2[0] * P2[2, :],
    ]
    system = np.array(rows, dtype=float)

    _, _, right_vectors = np.linalg.svd(system, full_matrices=True)
    homogeneous = right_vectors[-1]
    return (homogeneous / homogeneous[-1])[:3]



# creates the bounding boxes from the txt files obtained from model
def BoundingBox(dic):
    """Turn prediction dicts into centre/corner coordinates plus label.

    Args:
        dic: Iterable of dicts with a ``bbox`` entry ``[x, y, w, h]`` (x and y
            of the box origin, then width and height, as the centre formula
            below implies) and a ``category_name`` entry.

    Returns:
        A list of ``[coordinates, label]`` pairs, where ``coordinates`` is a
        5x2 array: row 0 is the centre, rows 1-4 are the corners
        ``(x+w, y)``, ``(x+w, y+h)``, ``(x, y)`` and ``(x, y+h)``.
    """
    coorList = []

    for prediction in dic:
        x, y, w, h = prediction['bbox'][:4]
        label = prediction['category_name']

        coordinates = np.zeros((5, 2))
        coordinates[0, :] = [x + w / 2, y + h / 2]  # center
        coordinates[1, :] = [x + w, y]              # far-x corner at y
        coordinates[2, :] = [x + w, y + h]          # far-x corner at y + h
        coordinates[3, :] = [x, y]                  # box origin
        coordinates[4, :] = [x, y + h]              # origin-x corner at y + h
        coorList.append([coordinates, label])

    return coorList


# for more info about this algorithm, check https://docs.opencv.org/3.4/d1/de0/tutorial_py_feature_homography.html
def find_matching_points(image1, image2):
    """Estimate the homography that maps points of image1 onto image2.

    ORB keypoints are matched with a brute-force Hamming matcher, filtered
    with a 0.75 ratio test, and passed to a RANSAC homography fit.

    Args:
        image1: Path to the first image.
        image2: Path to the second image.

    Returns:
        The 3x3 homography matrix.

    Raises:
        FileNotFoundError: If either image cannot be read.
        ValueError: If too few features or matches are found to fit a
            homography.
    """
    # Load images
    img1 = cv2.imread(image1, cv2.IMREAD_GRAYSCALE)
    img2 = cv2.imread(image2, cv2.IMREAD_GRAYSCALE)
    for image_path, img in ((image1, img1), (image2, img2)):
        if img is None:
            # cv2.imread reports an unreadable file by returning None
            raise FileNotFoundError(f"Could not read image: {image_path}")

    # SIFT could be used here as well, but ORB is computationally cheaper
    orb = cv2.ORB_create()

    # Detect keypoints and descriptors
    kp1, des1 = orb.detectAndCompute(img1, None)
    kp2, des2 = orb.detectAndCompute(img2, None)
    if des1 is None or des2 is None:
        raise ValueError("No ORB features found in at least one of the images")

    # A FLANN based matcher would be faster, but the brute-force matcher
    # returns the best results
    bf = cv2.BFMatcher(cv2.NORM_HAMMING)
    matches = bf.knnMatch(des1, des2, k=2)

    # Apply ratio test; entries with fewer than two neighbours cannot be tested
    good_matches = [
        pair[0]
        for pair in matches
        if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance
    ]
    if len(good_matches) < 4:
        # A homography needs at least four point correspondences
        raise ValueError(
            f"Only {len(good_matches)} good matches found; at least 4 are required"
        )

    # Get matching points that scored high on ratio test
    pts1 = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
    pts2 = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)

    # Find homography using RANSAC
    H, _ = cv2.findHomography(pts1, pts2, cv2.RANSAC)
    if H is None:
        raise ValueError("RANSAC could not estimate a homography from the matches")

    return H


# matches the centers of the bounding boxes of detected objects for triangulation
def find_matching_boxes(predictions1, predictions2, H):
    """Pair detections of two images that show the same object.

    The centre of every box of image 1 is projected into image 2 with ``H``
    and paired with the closest box of image 2 that has the same label. When
    several boxes of image 1 pick the same box of image 2, only the closest
    one is kept.

    Args:
        predictions1: Prediction dicts of image 1 (see ``BoundingBox``).
        predictions2: Prediction dicts of image 2.
        H: Homography mapping image 1 onto image 2.

    Returns:
        A list of tuples ``(index1, index2, center1, center2, distance)``
        ordered by ``index1``; empty when nothing can be paired.
    """
    boxes1 = BoundingBox(predictions1)
    boxes2 = BoundingBox(predictions2)
    if not boxes1 or not boxes2:
        return []

    # inf marks pairs with different labels, so a genuine distance of 0 is
    # never mistaken for "no candidate"
    distances = np.full((len(boxes1), len(boxes2)), np.inf)
    for i, (coords1, label1) in enumerate(boxes1):
        # The projection only depends on the box of image 1: compute it once
        projected = cv2.perspectiveTransform(coords1[0].reshape(-1, 1, 2), H).reshape(2,)
        for j, (coords2, label2) in enumerate(boxes2):
            if label1 == label2:
                distances[i, j] = math.dist(coords2[0], projected)

    # Keep, for every box of image 2, the closest box of image 1
    best_per_target = {}
    for i, row in enumerate(distances):
        j = int(np.argmin(row))
        if not np.isfinite(row[j]):
            # No box of image 2 shares this label
            continue
        current = best_per_target.get(j)
        if current is None or row[j] < current[4]:
            best_per_target[j] = (i, j, boxes1[i][0][0], boxes2[j][0][0], row[j])

    return sorted(best_per_target.values(), key=lambda match: match[0])

# gets real world position after finding the pairs
def _camera_matrix_for(image, positions, camSet):
    """Build the camera matrix of one image from its pose row and camera settings."""
    # Characters [-20:-4] of the image path identify the pose row and
    # characters [-20:-15] identify the camera-settings entry
    pose = positions[positions['Filename'] == image[-20:-4]].iloc[0]
    settings = camSet.get(image[-20:-15])
    if settings is None:
        raise KeyError(f"No camera settings found for {image[-20:-15]!r}")

    R = get_rot_mat(pose['Omega'], pose['Phi'], pose['Kappa'])
    cp = np.array([[pose['X/Long'], pose['Y/Lat'], pose['Z']]]).T
    K = get_int_mat(
        int(float(settings.get('focal'))),
        int(float(settings.get('width'))),
        int(float(settings.get('height'))),
    )
    return get_cam_mat(R, cp, K)


def get_pos_matching_boxes(boxpair, im1, im2, positions, camSet):
    """Triangulate the world position of every matched box pair.

    Args:
        boxpair: Matches as ``(index1, index2, center1, center2, distance)``.
        im1: Path of the first image.
        im2: Path of the second image.
        positions: DataFrame with the columns ``Filename``, ``Omega``,
            ``Phi``, ``Kappa``, ``X/Long``, ``Y/Lat`` and ``Z``.
        camSet: Dict of camera settings holding ``focal``, ``width`` and
            ``height`` per entry.

    Returns:
        A list of ``[x, y, index2]`` entries: the first two world coordinates
        and the index of the matched box in image 2.

    Raises:
        IndexError: If an image has no row in ``positions``.
        KeyError: If an image has no entry in ``camSet``.
    """
    P1 = _camera_matrix_for(im1, positions, camSet)
    P2 = _camera_matrix_for(im2, positions, camSet)

    final = []
    for match in boxpair:
        # match[2] is the pixel in image 1, match[3] its counterpart in image 2
        world = get_pos(match[2], match[3], P1, P2)
        # add RD coordinates and index of which box it matches in image2
        final.append([world[0], world[1], match[1]])

    return final


def geoloc(image1, image2, prediction1, prediction2, path):
    """Locate the objects detected in two overlapping images.

    Args:
        image1: Path of the first image.
        image2: Path of the second image.
        prediction1: Prediction dicts of the first image.
        prediction2: Prediction dicts of the second image.
        path: Either the folder holding the camera-settings ``.txt`` files
            and a ``positions.csv`` file, or the path of the positions CSV
            itself (the settings are then read from its folder).

    Returns:
        The list produced by ``get_pos_matching_boxes``.
    """
    # The positions are read from a CSV file while the settings are read from
    # a folder, so the single `path` argument is resolved into both.
    # NOTE(review): the file name 'positions.csv' is assumed — confirm it.
    if os.path.isdir(path):
        settings_dir = path
        positions_file = os.path.join(path, 'positions.csv')
    else:
        positions_file = path
        settings_dir = os.path.dirname(path) or '.'

    positions = getLocation(positions_file)
    camSet = readSettings(settings_dir)

    H = find_matching_points(image1, image2)
    boxpair = find_matching_boxes(prediction1, prediction2, H)
    results = get_pos_matching_boxes(boxpair, image1, image2, positions, camSet)

    return results

