In [None]:
import rasterio
import os
# from rasterio.plot import show
from rasterio.windows import Window
import cv2
import gdal
import numpy as np
from matplotlib import pyplot as plt
import torch
import argparse
import time
import json
from pathlib import Path
import torch.backends.cudnn as cudnn
from numpy import random
from icecream import ic

# import sys
# sys.path.append('yolov7')


from models.experimental import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.general import check_img_size, check_requirements, check_imshow, non_max_suppression, apply_classifier, \
    scale_coords, xyxy2xywh, strip_optimizer, set_logging, increment_path
from utils.plots import plot_one_box
from utils.torch_utils import select_device, load_classifier, time_synchronized, TracedModel

In [None]:
class detect_piles():
  def __init__(self, weights):
    self.device = select_device('')

    # load FP32 model
    self.model = attempt_load(weights, map_location=self.device)
    print("Model Loaded")


  def detect(self, source_path, pred_saving_path, imgsize = 640, view_img_flag = False, save_img=False, trace_flag = False):

    all_boxes = []

    conf_thres = 0.5
    iou_thres = 0.45
    classes = None
    agnostic_nms = False
    save_conf = False
    save_txt = True
    nosave = False
    save_dir = pred_saving_path

    source, view_img, imgsz, trace = source_path, view_img_flag, imgsize, trace_flag
    
    save_img = not nosave and not source.endswith('.txt')
    # webcam = source.isnumeric() or source.endswith('.txt') or source.lower().startswith(('rtsp://', 'rtmp://', 'http://', 'https://'))
    webcam = False
    set_logging()
    half = self.device.type != 'cpu'

      
    stride = int(self.model.stride.max())  # model stride
    imgsz = check_img_size(imgsz, s=stride) 
    img_size = 640
    
    if trace:
      self.model = TracedModel(self.model, self.device, img_size)
    if half:
      self.model.half()

    classify = False

    if classify:
      modelc = load_classifier(name='resnet101', n=2)  # initialize
      modelc.load_state_dict(torch.load('weights/resnet101.pt', map_location=self.device)['model']).to(self.device).eval()

    vid_path, vid_writer = None, None

    if webcam:
      view_img = check_imshow()
      cudnn.benchmark = True  # set True to speed up constant image size inference
      dataset = LoadStreams(source, img_size=imgsz, stride=stride)

    else:
      dataset = LoadImages(source, img_size=imgsz, stride=stride)
    
    names = self.model.module.names if hasattr(self.model, 'module') else self.model.names
    # colors = [[random.randint(0, 255) for _ in range(3)] for _ in names]
    colors = [[0,0,255]]

    if self.device.type != 'cpu':
      self.model(torch.zeros(1, 3, imgsz, imgsz).to(self.device).type_as(next(self.model.parameters())))  # run once
    old_img_w = old_img_h = imgsz
    old_img_b = 1
    augment = False
    t0 = time.time()

    for path, img, im0s, vid_cap in dataset:
      img = torch.from_numpy(img).to(self.device)
      img = img.half() if half else img.float()  # uint8 to fp16/32
      img /= 255.0  # 0 - 255 to 0.0 - 1.0

      if img.ndimension() == 3:
        img = img.unsqueeze(0)

      if self.device.type != 'cpu' and (old_img_b != img.shape[0] or old_img_h != img.shape[2] or old_img_w != img.shape[3]):
        old_img_b = img.shape[0]
        old_img_h = img.shape[2]
        old_img_w = img.shape[3]

        for i in range(3):
          self.model(img, augment=augment)[0]

      t1 = time_synchronized()

      with torch.no_grad():     # Calculating gradients would cause a GPU memory leak
        pred = self.model(img, augment=augment)[0]
      t2 = time_synchronized()
      pred = non_max_suppression(pred, conf_thres, iou_thres, classes=classes, agnostic=agnostic_nms)
      t3 = time_synchronized()

      if classify:
        pred = apply_classifier(pred, modelc, img, im0s)

      for i, det in enumerate(pred):
        if webcam:
          p, s, im0, frame = path[i], '%g: ' % i, im0s[i].copy(), dataset.count
        else:
          p, s, im0, frame = path, '', im0s, getattr(dataset, 'frame', 0)
        p = Path(p)
        save_path = save_dir + "/" + p.name
        # txt_path = str(save_dir + "/" + p.stem) + ('' if dataset.mode == 'image' else f'_{frame}')
        gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]  # normalization gain whwh
        if len(det):
          det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
          for c in det[:, -1].unique():
            n = (det[:, -1] == c).sum()
            s += f"{n} {names[int(c)]}{'s' * (n > 1)}, "
          for *xyxy, conf, cls in reversed(det):
            if save_txt:
              xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()
              all_boxes.append([xywh[0], xywh[1], xywh[2], xywh[3]])
              line = (cls, *xywh, conf) if save_conf else (cls, *xywh)
              # with open(txt_path + '.txt', 'a') as f:
              #   f.write(('%g ' * len(line)).rstrip() % line + '\n')

            if save_img or view_img:
              label = f'{names[int(cls)]} {conf:.2f}'
              plot_one_box(xyxy, im0, label=label, color=colors[int(cls)], line_thickness=1)
        # print(f'{s}Done. ({(1E3 * (t2 - t1)):.1f}ms) Inference, ({(1E3 * (t3 - t2)):.1f}ms) NMS')
        if view_img:
          cv2.imshow(str(p), im0)
          cv2.waitKey(1)
        ww, hh = im0.shape[1], im0.shape[0]  
        if save_img:
          if dataset.mode == 'image':
            cv2.imwrite(save_path, im0)
            print(f" The image with the result is saved in: {save_path}")
          else: 
            if vid_path != save_path:
              vid_path = save_path
              if isinstance(vid_writer, cv2.VideoWriter):
                vid_writer.release()
              if vid_cap:
                fps = vid_cap.get(cv2.CAP_PROP_FPS)
                w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
              else: 
                fps, w, h = 30, im0.shape[1], im0.shape[0]
                save_path += '.mp4'
              vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            vid_writer.write(im0)  
    return all_boxes, ww, hh

In [None]:
class get_predictions():
  def __init__(self, w_path):
    self.pile_class = detect_piles(w_path)

  def convert_img_array(self, data):
    data = torch.from_numpy(data)
    data=data.permute(1, 2, 0)
    data = data.numpy()
    data = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
    return data

  def yolo2bbox(self, x, y, w, h, img_height, img_width):
    x = float(x)
    y = float(y)
    w = float(w)
    h = float(h)
    img_height = float(img_height)
    img_width = float(img_width)
    height = h * img_height
    width = w * img_width
    left = (x * img_width) - (width // 2)
    top = (y * img_height) - (height // 2)
    return round(top), round(left), round(height), round(width)

  def resetXY(self, list_of_box, img_h, img_w, org_x, org_y):
    convert_x_y = []
    for i in list_of_box:
      y,x,h,w = self.yolo2bbox(i[0], i[1], i[2], i[3], img_h, img_w)
      convert_x_y.append([x+org_x,y+org_y,w,h])
    return convert_x_y


  def generate_points(self, cor_list):
    cnt = 0
    final_data = []
    for i in cor_list:
      tmp = {"id": "",
            "coord": ""}
      x,y,w,h = i
      p = [x,y]
      sp = [x+w, y]
      tp = [x+w, y+h]
      fp = [x, y+h]
      tmp["id"] = cnt
      tmp["coord"] = [p, sp, tp, fp]
      final_data.append(tmp)
      cnt += 1
    return final_data


  def convertUTM(self, file_path, list_of_coord):
    ds = gdal.Open(file_path)

    cnt = 0
    final_data = []
    gt = ds.GetGeoTransform()
    c, a, b, f, d, e = gt
    for i in list_of_coord:
      tmp_list = []
      tmp = {"id": "",
            "coord": ""}
      for k in i["coord"]:
        x, y = k[0], k[1]
        east = c+(a*x)
        north = f+(e*y)
        tmp_list.append([east, north])
      tmp["id"], tmp["coord"] = cnt, tmp_list
      final_data.append(tmp)
      cnt += 1
    return final_data


  def generate_crops(self, file_path, pred_path, json_save_path, tmp_folder, crop_size=2500):

    if not os.path.exists(json_save_path):
      print("Not Found GeoJson Saving Path.. \n Creating GeoJson File Saving Path...")
      os.makedirs(json_save_path)

    if not os.path.exists(tmp_folder):
      print("not found tmp folder.. \n creating tmp folder...")
      os.makedirs(tmp_folder)

    if not os.path.exists(pred_path):
      print("not found directory for saving pedicted images.. \n creating directory...")
      os.makedirs(pred_path)

    

    file_name = file_path.split("/")[-1].split(".")[0]

    geoJson = {
          "type": "FeatureCollection",
          "features": []
        }

    id = 0


    img = rasterio.open(file_path)
    profile = img.profile

    width = profile["width"]
    height = profile["height"]

    print("Total Width = ", width, "Total Height = ", height)

    width_cut = crop_size
    height_cut = crop_size
    step = crop_size
    cnt_ = 0
    

    for col in range(0, height, step):
      if col + step >= height:
        height_cut = height - col

      for row in range(0, width, step):

        if row+width_cut >= width:
          final_cut = width - row
          data = img.read([1,2,3], window=Window(row,col, final_cut, height_cut))
          final_img = self.convert_img_array(data)
          cv2.imwrite(f'{tmp_folder}/img_{cnt_}.png', final_img)
          sam = f'{tmp_folder}/img_{cnt_}.png'
          pred_boxes, img_w, img_h = self.pile_class.detect(sam, pred_path)
          reset_cord = self.resetXY(pred_boxes, img_h, img_w, row, col)
          generate_cord = self.generate_points(reset_cord)
          converted_cord = self.convertUTM(file_path, generate_cord)
          if len(converted_cord) > 0:
            # ic(len(converted_cord))
            for i in converted_cord:
              tmp = {"id": "",
                    "coordinates": ""}
              tmp["id"] = id
              tmp["coordinates"] = i["coord"]
              geoJson["features"].append(tmp)
              id += 1


          os.remove(sam)
          cnt_ += 1

        else:
          data = img.read([1,2,3], window=Window(row,col, width_cut, height_cut))
          final_img = self.convert_img_array(data)
          cv2.imwrite(f'{tmp_folder}/img_{cnt_}.png', final_img)
          sam = f'{tmp_folder}/img_{cnt_}.png'
          pred_boxes, img_w, img_h = self.pile_class.detect(sam, pred_path)
          reset_cord = self.resetXY(pred_boxes, img_h, img_w, row, col)
          generate_cord = self.generate_points(reset_cord)
          converted_cord = self.convertUTM(file_path, generate_cord)
          if len(converted_cord) > 0:
            for i in converted_cord:
              tmp = {"id": "",
                    "coordinates": ""}
              tmp["id"] = id
              tmp["coordinates"] = i["coord"]
              geoJson["features"].append(tmp)
              id += 1


          os.remove(sam)
          cnt_ += 1
    
    with open(f"{json_save_path}/{file_name}.geojson", "a") as outfile:
      json.dump(geoJson, outfile)


In [None]:
tiff_file1 = "tiff_files/NamarDaharat2_Orthomosaic_export_TueAug23092831065915.tif"
weights_path = "best.pt"

save_pred_path = "free_test/yolo_detected"
json_saving_path  = "free_test/generated_geo_files"
tmp_file_path = "free_test/tmp" # use for Store images which will go to yolo model for prediction and then remove it

In [None]:
obj = get_predictions(weights_path)


Fusing layers... 
RepConv.fuse_repvgg_block
RepConv.fuse_repvgg_block
RepConv.fuse_repvgg_block
IDetect.fuse
Model Loaded


In [None]:
obj.generate_crops(tiff_file1, save_pred_path, json_saving_path, tmp_file_path)