In [1]:
import torch
import numpy as np
import ffmpeg,subprocess
from PIL import Image, ImageDraw

In [None]:
!pip install ffmpeg-python imageio

In [2]:
def get_current_signal(bbox_tensor, image_size):
  # bbox_tensor[N, [x1,x2,y1,y2,confdence,class]]
  h,w = image_size
  w_center = w/2
  x_center = bbox_tensor[:,1] - bbox_tensor[:,0]
  deltas = torch.abs(w_center-x_center) 
  min_delta, min_idx = torch.min(deltas,0)
  return min_idx

In [3]:
{"stop": 0, "warning": 1, "go": 2, "ambiguous": 3}
word_to_color = {"stop": "red", "warning": "yellow", "go": "green", "ambiguous": "white"}
mapping = {0:"stop", 1:'warning', 2:'go', 3:'ambiguous'}

def create_json(labels, image_size, path):
  """
    labels_tensor : (N, 10, 5)
    N - number of frames
    10 - max number pf traffic signals
    5 - number of label values
    path : path to file
  """
  dicts = []
  for i, label_tensor in enumerate(labels):
    frame_dict = {str(i):{}}
    min_idx = get_current_signal(label_tensor, (256,256))
    for j, labeled_frame in enumerate(label_tensor):
      frame_dict[str(i)][str(j)]={}
      frame_dict[str(i)][str(j)]['coords'] = list(np.array(labeled_frame[0:4].numpy(), dtype=int))
      frame_dict[str(i)][str(j)]['state'] = mapping[labeled_frame[4].item()]
      frame_dict[str(i)][str(j)]['affect'] = False
      if j == min_idx:
        frame_dict[str(i)][str(j)]['affect'] = True
    dicts.append(frame_dict)
  dict_str = ""
  for d in dicts:
      dict_str += str(d)+'\n'
  import json
  with open(path, "w") as f:
    json.dump(dict_str, f, indent=4, sort_keys=True)
  print(dict_str)
  return dicts

In [4]:
A = [
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
     [[0,0,10,10,1],
     [0,10,10,20,2],
     [0,10,20,20,2],],
]

A = torch.Tensor(A)
predict_path = 'jsoned.json'
frame_labels = create_json(A, (256,256), predict_path)




In [8]:
frame_labels_ = {}
for i,d in enumerate(frame_labels):
    frame_labels_[i] = d[str(i)]

In [17]:
video_file = 'video (10).avi'

probe = ffmpeg.probe(video_file)
video_info = next(s for s in probe['streams'] if s['codec_type'] == 'video')
old_width = int(video_info['width'])
old_height = int(video_info['height'])
num_frames = int(eval(video_info['duration'])*eval(video_info['avg_frame_rate']))
fps = eval(video_info['avg_frame_rate'])

frame_size = old_width*old_height*3
#print(frame_idx)
process = (
    ffmpeg
    .input(video_file)
    .filter('scale', old_width, old_height)
    .output('pipe:', format='rawvideo', pix_fmt='rgb24')
    .run_async(pipe_stdout=True)
)

for i in range(num_frames):
    frame = (
        np
        .frombuffer(process.stdout.read(frame_size), np.uint8)
        .reshape(old_height, old_width, 3)
    )

    frame = np.uint8(frame)
    image = Image.fromarray(frame)

    label = frame_labels_.get(i)
    if label:
        for traffic_light, state in label.items():
            color = word_to_color[state['state']]
            coords = state['coords']
            
            img1 = ImageDraw.Draw(image)  
            img1.rectangle(coords, outline = color)
    image.save(f"sample_data/{i}.jpg")
subprocess.call(["ffmpeg","-y","-r",str(fps),"-i", "sample_data/%d.jpg","-vcodec","mpeg4", "-qscale","1", "-r", str(fps), "sample_data/video.avi"])
dir_name = "sample_data/"
import os
test = os.listdir(dir_name)

for item in test:
    if item.endswith(".jpg"):
        os.remove(os.path.join(dir_name, item))