In [None]:
#! pip install timm==0.4.5 transformers==4.6
#! pip install matplotlib scikit-image
#! pip install opencv-python


In [None]:
import torch
print(torch.cuda.is_available())
from PIL import Image
import requests
import torchvision.transforms as T
import matplotlib.pyplot as plt
from collections import defaultdict
import torch.nn.functional as F
import numpy as np
from skimage.measure import find_contours

from matplotlib import patches,  lines
from matplotlib.patches import Polygon

torch.set_grad_enabled(False)

In [None]:
# standard PyTorch mean-std input image normalization
# (resize, convert to a tensor, then normalize each channel with the given mean / std)
transform = T.Compose([
    T.Resize(size=800),
    T.ToTensor(),
    T.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# for output bounding box post-processing
def box_cxcywh_to_xyxy(x):
    """Convert boxes from (center_x, center_y, width, height) to corner format.

    Args:
        x: tensor whose last dimension holds ``(cx, cy, w, h)``. Any number of
            leading dimensions is accepted, e.g. ``(4,)``, ``(N, 4)`` or
            ``(B, N, 4)``.

    Returns:
        Tensor of the same shape holding ``(x_min, y_min, x_max, y_max)``.
    """
    # Operate on the last axis so a single box or a batched tensor works as
    # well as the usual (N, 4) input; for (N, 4) this is identical to axis 1.
    x_c, y_c, w, h = x.unbind(-1)
    half_w, half_h = 0.5 * w, 0.5 * h
    corners = [x_c - half_w, y_c - half_h, x_c + half_w, y_c + half_h]
    return torch.stack(corners, dim=-1)

def rescale_bboxes(out_bbox, size):
    """Turn (cx, cy, w, h) boxes into (x0, y0, x1, y1) boxes scaled by the image size.

    Args:
        out_bbox: tensor of boxes in (cx, cy, w, h) form; it is converted with
            ``box_cxcywh_to_xyxy`` before scaling.
        size: ``(width, height)`` pair of the target image.

    Returns:
        Tensor of corner-format boxes with x values multiplied by the width
        and y values multiplied by the height.
    """
    img_w, img_h = size
    corners = box_cxcywh_to_xyxy(out_bbox)
    # Create the scale on the same device as the boxes: a CPU-only scale tensor
    # fails with a device mismatch as soon as the boxes live on the GPU.
    scale = torch.tensor([img_w, img_h, img_w, img_h],
                         dtype=torch.float32, device=corners.device)
    return corners * scale

In [None]:
# colors for visualization: one [r, g, b] triple per entry, components in [0, 1]
COLORS = [
    [0.000, 0.447, 0.741],
    [0.850, 0.325, 0.098],
    [0.929, 0.694, 0.125],
    [0.494, 0.184, 0.556],
    [0.466, 0.674, 0.188],
    [0.301, 0.745, 0.933],
]

def apply_mask(image, mask, color, alpha=0.5):
    """Blend ``color`` into ``image`` wherever ``mask`` equals 1.

    Args:
        image: array indexed as ``image[:, :, channel]``; its first three
            channels are overwritten in place.
        mask: array compared against 1 to select the pixels to tint.
        color: sequence of three components; each is multiplied by 255.
        alpha: weight of the color in the blend (``1 - alpha`` goes to the image).

    Returns:
        The same ``image`` object, after modification.
    """
    for channel in range(3):
        plane = image[:, :, channel]
        tinted = plane * (1 - alpha) + alpha * color[channel] * 255
        # Pixels outside the mask keep their original value.
        image[:, :, channel] = np.where(mask == 1, tinted, plane)
    return image

import cv2

def add_rectangle_on_frame(frame, coordinates, title):
    """Draw a green rectangle outline on ``frame`` with OpenCV.

    Args:
        frame: image handed to ``cv2.rectangle`` as the drawing target.
        coordinates: box given as ``(x0, y0, w, h)``; every value is truncated
            to ``int`` before drawing.
        title: currently unused; the text rendering that consumed it is
            disabled (kept as a comment at the end of the function).

    Returns:
        The ``frame`` object that was passed in.
    """
    rectangle_color = (0, 255, 0)  # green
    rectangle_thickness = 2

    # cv2.rectangle needs integer pixel coordinates for both corners.
    x0, y0, width, height = [int(value) for value in coordinates][:4]
    cv2.rectangle(frame, (x0, y0), (x0 + width, y0 + height),
                  rectangle_color, rectangle_thickness)

    # Disabled: draw the title just above the box and preview the frame.
    # title_position = (x0, y0 - 2)
    # title_font = cv2.FONT_HERSHEY_SIMPLEX
    # title_font_scale = 1
    # title_color = (0, 255, 0)  # green
    # cv2.putText(frame, title, title_position, title_font, title_font_scale,
    #             title_color, rectangle_thickness, cv2.LINE_AA)
    # plt.imshow(frame)
    # plt.axis('off')
    # plt.show()
    return frame
  
def plot_results2(pil_img, scores, boxes, labels, masks=None):
    """Show the image with the detection boxes drawn by ``add_rectangle_on_frame``.

    Args:
        pil_img: image converted to an array with ``np.array``.
        scores: per-detection scores; only used to pace the loop.
        boxes: object whose ``tolist()`` yields ``(xmin, ymin, xmax, ymax)`` rows.
        labels: per-detection labels; only used to pace the loop.
        masks: optional per-detection masks; not drawn.
    """
    canvas = np.array(pil_img)
    palette = COLORS * 100
    mask_list = [None] * len(scores) if masks is None else masks

    # zip stops at the shortest input, so a box is drawn only while every
    # parallel sequence still has an entry.
    detections = zip(scores, boxes.tolist(), labels, mask_list, palette)
    for _score, corners, _label, _mask, _color in detections:
        left, top, right, bottom = corners
        # add_rectangle_on_frame expects (x0, y0, w, h)
        box_xywh = [left, top, right - left, bottom - top]
        canvas = add_rectangle_on_frame(canvas, box_xywh, ": ")

    plt.figure(figsize=(16, 10))
    plt.imshow(canvas)
    plt.axis('off')
    plt.show()
    
def plot_results(pil_img, scores, boxes, labels, masks=None):
    """Show the image with one outlined, captioned box per detection (matplotlib).

    Args:
        pil_img: image converted to an array with ``np.array`` and displayed.
        scores: per-detection scores, formatted with two decimals in the caption.
        boxes: object whose ``tolist()`` yields ``(xmin, ymin, xmax, ymax)`` rows.
        labels: per-detection caption prefixes.
        masks: optional per-detection masks. Only the length is checked; the
            overlay code is disabled, so masks are not rendered.

    Raises:
        AssertionError: if scores, boxes, labels and masks differ in length.
    """
    plt.figure(figsize=(16, 10))
    image_array = np.array(pil_img)
    axes = plt.gca()
    palette = COLORS * 100
    if masks is None:
        masks = [None] * len(scores)
    assert len(scores) == len(boxes) == len(labels) == len(masks)

    detections = zip(scores, boxes.tolist(), labels, masks, palette)
    for score, (left, top, right, bottom), label, _mask, box_color in detections:
        outline = plt.Rectangle((left, top), right - left, bottom - top,
                                fill=False, color=box_color, linewidth=3)
        axes.add_patch(outline)
        caption = f'{label}: {score:0.2f}'
        axes.text(left, top, caption, fontsize=15, bbox=dict(facecolor='white', alpha=0.8))

        # Disabled mask overlay (kept for reference):
        # if mask is None:
        #   continue
        # np_image = apply_mask(np_image, mask, c)
        #
        # padded_mask = np.zeros((mask.shape[0] + 2, mask.shape[1] + 2), dtype=np.uint8)
        # padded_mask[1:-1, 1:-1] = mask
        # contours = find_contours(padded_mask, 0.5)
        # for verts in contours:
        #   # Subtract the padding and flip (y, x) to (x, y)
        #   verts = np.fliplr(verts) - 1
        #   p = Polygon(verts, facecolor="none", edgecolor=c)
        #   ax.add_patch(p)

    plt.imshow(image_array)
    plt.axis('off')
    plt.show()


def add_res(results, ax, color='green'):
    """Draw the boxes in ``results`` on ``ax`` with a 'label: score' caption each.

    Args:
        results: mapping with ``'boxes'``, ``'labels'`` and ``'scores'`` entries
            that are iterated in parallel; each box is indexed as
            ``(x0, y0, x1, y1)``.
        ax: object providing ``add_patch`` and ``text`` (e.g. a matplotlib Axes).
        color: unused; kept so existing calls stay valid. Outline colors come
            from a fixed palette instead.
    """
    bboxes = results['boxes']
    labels = results['labels']
    scores = results['scores']

    palette = ['purple', 'yellow', 'red', 'green', 'orange', 'pink']

    for i, (b, ll, ss) in enumerate(zip(bboxes, labels, scores)):
        # Wrap around the palette: indexing it directly raised IndexError as
        # soon as there were more detections than palette entries.
        outline_color = palette[i % len(palette)]
        ax.add_patch(plt.Rectangle((b[0], b[1]), b[2] - b[0], b[3] - b[1],
                                   fill=False, color=outline_color, linewidth=3))
        # NOTE(review): CLASSES is not defined in the visible part of this
        # notebook; non-string labels need it to exist -- confirm.
        cls_name = ll if isinstance(ll, str) else CLASSES[ll]
        text = f'{cls_name}: {ss:.2f}'
        ax.text(b[0], b[1], text, fontsize=15, bbox=dict(facecolor='white', alpha=0.8))

In [None]:
# Load the 'mdetr_efficientnetB5' entry point from the ashkamath/mdetr hub repo,
# requesting pretrained weights and the matching postprocessor.
model, postprocessor = torch.hub.load(
    'ashkamath/mdetr:main',
    'mdetr_efficientnetB5',
    pretrained=True,
    return_postprocessor=True,
)
#model = model.cuda()
# Switch to inference mode; the trailing semicolon hides the module repr in the cell output.
model.eval();

In [None]:
! pip install nms

In [None]:
from nms import nms , malisiewicz, fast



def plot_inference(im, caption):
    """Run the text-conditioned detector on one image and plot the kept boxes.

    Args:
        im: PIL image; ``im.size`` is used as ``(width, height)`` when the
            boxes are rescaled.
        caption: text query. The character spans predicted for each box are
            sliced out of this string to build the box label.

    Returns:
        None. The detections are displayed through ``plot_results`` and
        ``plot_results2``.
    """
    # mean-std normalize the input image (batch-size: 1)
    img = transform(im).unsqueeze(0)  # .cuda()

    # propagate through the model: the first call encodes and caches,
    # the second call reuses that cache to produce the predictions
    memory_cache = model(img, [caption], encode_and_save=True)
    outputs = model(img, [caption], encode_and_save=False, memory_cache=memory_cache)

    # keep only predictions with 0.2+ confidence, where confidence is
    # 1 - p(last class); the last slot is presumably "no object" -- confirm
    logits = outputs['pred_logits'].cpu()[0]
    probas = 1 - logits.softmax(-1)[:, -1]
    keep = probas > 0.2

    # Disabled NMS experiment (kept for reference):
    # bboxes_scaled = rescale_bboxes(outputs['pred_boxes'].cpu()[0, :], im.size)
    # bboxes_scaled_tlw = [[int(box[0]), int(box[1]), int(box[2]-box[0]), int(box[3]-box[1])] for box in bboxes_scaled]
    # best_keep_nms = nms.boxes(bboxes_scaled_tlw, probas, nms_algorithm=fast.nms, nms_threshold=0.4)
    # best_keep_nms = np.array([True if (idx in best_keep_nms) else False for idx, i in enumerate(keep)])
    # keep = keep & best_keep_nms

    # convert boxes from [0; 1] to image scales
    bboxes_scaled = rescale_bboxes(outputs['pred_boxes'].cpu()[0, keep], im.size)

    # Extract the text spans predicted by each box
    positive_tokens = (logits[keep].softmax(-1) > 0.1).nonzero().tolist()
    predicted_spans = defaultdict(str)
    for item, pos in positive_tokens:
        # index 255 is skipped; presumably it is the non-text slot -- confirm
        if pos < 255:
            span = memory_cache["tokenized"].token_to_chars(0, pos)
            predicted_spans[item] += " " + caption[span.start:span.end]

    # Build exactly one label per kept box, in box order. Iterating only over
    # the boxes that received a span dropped span-less boxes, which shifted the
    # remaining labels onto the wrong boxes and tripped the length check below.
    labels = [predicted_spans.get(box_idx, "") for box_idx in range(len(bboxes_scaled))]

    scores = probas[keep]
    assert len(scores) == len(bboxes_scaled) == len(labels)
    plot_results(im, scores, bboxes_scaled, labels)
    plot_results2(im, scores, bboxes_scaled, labels)

In [None]:
#plot_inference(im, "5 people each holding an umbrella")

import cv2
video_file = "/home/sophie/aggression_detection/annotated/2019_11_22/000010/color.mp4"

processed_frames = []
video_capture = cv2.VideoCapture(video_file)  # Replace with your video file path
frame_id = 0
try:
    while True:
        ret, frame = video_capture.read()

        # Stop at the end of the stream or once frame 30 is reached.
        if not ret or frame_id == 30:
            break

        # Only frames 26-29 are run through the detector.
        if frame_id > 25:
            # OpenCV decodes frames as BGR, while PIL and the ImageNet-style
            # normalization in `transform` assume RGB: swap the channels first.
            frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            processed_frame = plot_inference(frame, "pigs")
            print("*****", frame_id)
        frame_id += 1
finally:
    # Always free the video handle, even if inference raises.
    video_capture.release()
    
""" processed_frames.append(processed_frame)
    frame_id+=1

def key_function(item):
    return list(item.keys())[0]

# Use argsort with the custom key function to sort the array
sorted_indices = np.argsort(np.array([key_function(item) for item in processed_frames]))
sorted_processed_frames = np.array(processed_frames)[sorted_indices]
sorted_processed_frames= sorted_processed_frames.tolist()"""


In [None]:
# Exploratory cell: import BYTETracker and print its built-in help text.
from bytetracker import BYTETracker
help(BYTETracker)