<a href="https://colab.research.google.com/github/flaviochess/robocop_ai/blob/main/robocop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Atenção
Esse playbook usa a api do Gemini e por isso dele deve ser executado em um país que o Gemini está disponível: [available regions](https://ai.google.dev/gemini-api/docs/available-regions).

Se estiver rodando esse playbook a partir do **Colab**, é importante se certificar que a instância na qual está rodando encontra-se em um destes países, do contrário não conseguirá usar esse playbook.

O comando a seguir indica onde está rodando o Colab. Caso esteja fora da área de disponibilidade do Gemini (muitos países da Europa ainda não estão disponíveis), é necessário iniciar uma nova instância do Colab até cair em um servidor em um país onde o Gemini está disponível.

In [None]:
!curl ipinfo.io

### Inicializando o ambiente
Instala as bibliotecas do Gemini

In [None]:
!pip install -U -q google-generativeai

### Configurando o modelo
Configura o modelo do Gemini. Como o projeto utiliza analise de vídeos, é necessário utilizar o Gemini 1.5 Pro, que permite o input de vídeos.

Uma vez que essa versão ainda não permite costumizações como temperatura do modelo, está sendo utilizado as configurações padrão.

Para executar é necessário incluir sua api_key do Gemini, executando a partir do Colab basta incluir ela em secrets com o nome `geminiApiKey`. Caso esteja executando localmente é necessário incluir sua api_key na variável `api_key`.

In [None]:
from pathlib import Path
import hashlib
import google.generativeai as genai

from google.colab import userdata
api_key = userdata.get('geminiApiKey')
genai.configure(api_key=api_key)

# Set the model to Gemini 1.5 Pro to analyze the video.
model = genai.GenerativeModel(model_name="models/gemini-1.5-pro-latest")

In [None]:
class Grafo:
  def __init__(self, vertices):
    self.vertices = vertices
    self.grafo = [[] for i in range(self.vertices)]

  def adiciona_aresta(self, u, v):
    self.grafo[u-1].append(v)
    self.grafo[v-1].append(u)

  def nexts(self, vertice):
    return self.grafo[vertice-1]

  def mostra_lista(self):
    for i in range(self.vertices):
      print(f'{i+1}:', end=' ')
      for j in self.grafo[i]:
        print(f'{j} |', end=' ')
      print('')

Incializa o mapa da cidade, inteligando as ruas e assim saber todas as opções de rotas.

(Posteriormente utilizar uma api como a do Google Maps para obter as opções de rotas)

In [None]:
zip_codes = {
    "0300-100": 1,
    "0300-200": 2,
    "0300-300": 3,
    "0300-400": 4,
    "0300-500": 5,
    "0300-600": 6,
    "0300-700": 7,
    "0300-800": 8,
    "0300-900": 9,
    "0400-100": 10,
    "0400-200": 11,
    "0400-300": 12,
    "0400-400": 13
}

streetsGraph = Grafo(13)

streetsGraph.adiciona_aresta(1, 2)
streetsGraph.adiciona_aresta(1, 6)
streetsGraph.adiciona_aresta(2, 3)
streetsGraph.adiciona_aresta(2, 4)
streetsGraph.adiciona_aresta(2, 8)
streetsGraph.adiciona_aresta(2, 9)
streetsGraph.adiciona_aresta(3, 5)
streetsGraph.adiciona_aresta(4, 5)
streetsGraph.adiciona_aresta(4, 6)
streetsGraph.adiciona_aresta(4, 9)
streetsGraph.adiciona_aresta(6, 7)
streetsGraph.adiciona_aresta(6, 8)
streetsGraph.adiciona_aresta(6, 9)
streetsGraph.adiciona_aresta(7, 10)
streetsGraph.adiciona_aresta(7, 11)
streetsGraph.adiciona_aresta(8, 9)
streetsGraph.adiciona_aresta(8, 10)
streetsGraph.adiciona_aresta(8, 12)
streetsGraph.adiciona_aresta(9, 10)
streetsGraph.adiciona_aresta(9, 11)
streetsGraph.adiciona_aresta(9, 12)
streetsGraph.adiciona_aresta(9, 13)
streetsGraph.adiciona_aresta(10, 11)

Testando videos:
(continuar:
- subir dois ou 3 videos, uma para uma metade das ruas e outro para outra)
- mudar a pasta de frames para streets
- para cada video mandar o path, pois cada um vai mudar a sub-pasta incluindo o nome da rua

In [None]:
street_one_video_cam = "http://onsave.com.br/cdn/streets/1/video.mp4"
street_two_video_cam = "http://onsave.com.br/cdn/streets/1/video.mp4"
street_three_video_cam = "http://onsave.com.br/cdn/streets/1/video.mp4"
street_four_video_cam = "http://onsave.com.br/cdn/streets/4/video.mp4"
street_five_video_cam = "http://onsave.com.br/cdn/streets/1/video.mp4"
street_six_video_cam = "http://onsave.com.br/cdn/streets/4/video.mp4"
street_seven_video_cam = "http://onsave.com.br/cdn/streets/9/video.mp4"
street_eight_video_cam = "http://onsave.com.br/cdn/streets/4/video.mp4"
street_nive_video_cam = "http://onsave.com.br/cdn/streets/9/video.mp4"
street_ten_video_cam = "http://onsave.com.br/cdn/streets/9/video.mp4"
street_eleven_video_cam = "http://onsave.com.br/cdn/streets/9/video.mp4"
street_twelve_video_cam = "http://onsave.com.br/cdn/streets/9/video.mp4"
street_thirteen_video_cam = "http://onsave.com.br/cdn/streets/9/video.mp4"

import cv2
import os
import shutil

# Create or cleanup existing extracted image frames directory.
FRAME_EXTRACTION_DIRECTORY = "/content/streets/"
FRAME_PREFIX = "_frame"
STREET_ONE = "1"
STREET_TWO = "2"
STREET_THREE = "3"
STREET_FOUR = "4"
STREET_FIVE = "5"
STREET_SIX = "6"
STREET_SEVEN = "7"
STREET_EIGTH = "8"
STREET_NINE = "9"
STREET_TEN = "10"
STREET_ELEVEN = "11"
STREET_TWELVE = "12"
STREET_THIRTEEN = "13"

def create_frame_output_dir(output_dir):
  if not os.path.exists(output_dir):
    os.makedirs(output_dir)
  else:
    shutil.rmtree(output_dir)
    os.makedirs(output_dir)

def extract_frame_from_video(video_file_path, street_index):
  print(f"Extracting {video_file_path} at 1 frame per second. This might take a bit...")
  street_directory = FRAME_EXTRACTION_DIRECTORY + street_index
  create_frame_output_dir(street_directory)
  vidcap = cv2.VideoCapture(video_file_path)
  fps = vidcap.get(cv2.CAP_PROP_FPS)
  frame_duration = 1 / fps  # Time interval between frames (in seconds)
  output_file_prefix = os.path.basename(video_file_path).replace('.', '_')
  frame_count = 0
  count = 0
  while vidcap.isOpened():
      success, frame = vidcap.read()
      if not success: # End of video
          break
      if int(count / fps) == frame_count: # Extract a frame every second
          min = frame_count // 60
          sec = frame_count % 60
          time_string = f"{min:02d}:{sec:02d}"
          image_name = f"{output_file_prefix}{FRAME_PREFIX}{time_string}.jpg"
          output_filename = os.path.join(street_directory, image_name)
          cv2.imwrite(output_filename, frame)
          frame_count += 1
      count += 1
  vidcap.release() # Release the capture object\n",
  print(f"Completed video frame extraction!\n\nExtracted: {frame_count} frames")

def extract_frame_from_video_multiple_streets(video_file_path, street_indexes):
  print(f"Extracting {video_file_path} at 1 frame per second. This might take a bit...")

  for street_index in street_indexes:
    street_directory = FRAME_EXTRACTION_DIRECTORY + street_index
    create_frame_output_dir(street_directory)

  vidcap = cv2.VideoCapture(video_file_path)
  fps = vidcap.get(cv2.CAP_PROP_FPS)
  frame_duration = 1 / fps  # Time interval between frames (in seconds)
  output_file_prefix = os.path.basename(video_file_path).replace('.', '_')
  frame_count = 0
  count = 0
  while vidcap.isOpened():
      success, frame = vidcap.read()
      if not success: # End of video
          break
      if int(count / fps) == frame_count: # Extract a frame every second
          min = frame_count // 60
          sec = frame_count % 60
          time_string = f"{min:02d}:{sec:02d}"
          image_name = f"{output_file_prefix}{FRAME_PREFIX}{time_string}.jpg"

          for street_index in street_indexes:
            street_directory = FRAME_EXTRACTION_DIRECTORY + street_index
            output_filename = os.path.join(street_directory, image_name)
            cv2.imwrite(output_filename, frame)
          frame_count += 1
      count += 1
  vidcap.release() # Release the capture object\n",
  print(f"Completed video frame extraction!\n\nExtracted: {frame_count} frames")

# When a different video cam by street
# extract_frame_from_video(street_one_video_cam, STREET_ONE)
# extract_frame_from_video(street_two_video_cam, STREET_TWO)
# extract_frame_from_video(street_three_video_cam, STREET_THREE)
# extract_frame_from_video(street_four_video_cam, STREET_FOUR)
# extract_frame_from_video(street_five_video_cam, STREET_FIVE)
# extract_frame_from_video(street_six_video_cam, STREET_SIX)
# extract_frame_from_video(street_seven_video_cam, STREET_SEVEN)
# extract_frame_from_video(street_eight_video_cam, STREET_EIGTH)
# extract_frame_from_video(street_nive_video_cam, STREET_NINE)
# extract_frame_from_video(street_ten_video_cam, STREET_TEN)
# extract_frame_from_video(street_eleven_video_cam, STREET_ELEVEN)
# extract_frame_from_video(street_twelve_video_cam, STREET_TWELVE)
# extract_frame_from_video(street_thirteen_video_cam, STREET_THIRTEEN)

extract_frame_from_video_multiple_streets(street_one_video_cam, [STREET_ONE, STREET_TWO, STREET_THREE, STREET_FIVE])
extract_frame_from_video_multiple_streets(street_four_video_cam, [STREET_FOUR, STREET_SIX, STREET_EIGTH])
extract_frame_from_video_multiple_streets(street_seven_video_cam, [STREET_SEVEN, STREET_NINE, STREET_TEN, STREET_ELEVEN, STREET_TWELVE, STREET_THIRTEEN])

In [None]:
import os

class File:
  def __init__(self, file_path: str, display_name: str = None):
    self.file_path = file_path
    if display_name:
      self.display_name = display_name
    self.timestamp = get_timestamp(file_path)

  def set_file_response(self, response):
    self.response = response

def get_timestamp(filename):
  """Extracts the frame count (as an integer) from a filename with the format
     'output_file_prefix_frame00:00.jpg'.
  """
  parts = filename.split(FRAME_PREFIX)
  if len(parts) != 2:
      return None  # Indicates the filename might be incorrectly formatted
  return parts[1].split('.')[0]

Procura o carro em uma rua em específico:

In [None]:
import json

# Upload the files to the API
# Change full_video to False to only upload a 10 second slice of files to reduce upload time.
# Change full_video to True to upload the whole video.
full_video = True

# Uploaded files at Gemini
uploaded_files = []

def clean_gemini_videos():
  try:
    print(f'Limpando Gemini videos ({len(uploaded_files)} arquivos)...')
    for file in uploaded_files:
      genai.delete_file(file.response.name)
      print(f'Arquivo {file.file_path} deletado')
    print("Limpeza finalizada!")
  except:
    print("Limpeza não finalizada, mas o processo continuará...")

def load_street_video_can_to_gemini(street_index):
  #clean_gemini_videos()
  uploaded_files = []

  street_directory = FRAME_EXTRACTION_DIRECTORY + str(street_index)
  files = os.listdir(street_directory)
  files = sorted(files)
  files_to_upload = []
  for file in files:
    files_to_upload.append(
        File(file_path=os.path.join(street_directory, file)))

  print(f'Carregando {len(files_to_upload) if full_video else 10} arquivos da rua {street_index}. Isso pode demorar um pouco...')

  for file in files_to_upload if full_video else files_to_upload[40:50]:
    print(f'Uploading: {file.file_path}...')
    response = genai.upload_file(path=file.file_path)
    file.set_file_response(response)
    uploaded_files.append(file)

  print("Video carregado!")


def make_gemini_request(prompt, files):
  request = [prompt]
  for file in files:
    request.append(file.timestamp)
    request.append(file.response)
  return request

#TODO: checar se tem videos para aquela rua
def is_car_on_street(street_index, car_tag):
  print(f"Procurando carro {car_tag} na rua {street_index}...")
  load_street_video_can_to_gemini(street_index)

  # Create the prompt.
  prompt = f'''No vídeo procure por um carro com a placa "{car_tag.replace("-", " ")}" ou similar.
               O resultado deve ser em formato JSON (sem marcações markdown) contendo um atributo "similaridade" indicando de 0 a 100
               qual é a similaridade da placa encontrada com o valor "{car_tag.replace("-", " ")}".
               Se a similaridade for maior que zero, inclua também os atributos "cor" indicando a cor do carro encontrado,
               "fabricante" indicando o fabricante do carro encontrado, "placa" indicando a placa do carro encontrado e "instante" indicando
               o último instante em que o carro aparece no vídeo'''

  request = make_gemini_request(prompt, uploaded_files)
  response = model.generate_content(request,
                                    request_options={"timeout": 600})

  print(f'Debug, Gemini answer: {response.text.replace("`", "").replace("json", "")}')

  return json.loads(response.text.replace("`", "").replace("json", ""))


In [None]:
import time

def remove_when_exists(arr, values):
  if len(values) == 0:
    return arr

  for value in values:
    try:
      arr.remove(value)
    except:
      None

  return arr


def find_my_car(car_tag, last_found, next_possibilities, checked_streets, last_found_result):
  if len(next_possibilities) > 0:
    street_index = next_possibilities[0]
    try:
      find_result = is_car_on_street(street_index, car_tag)
      checked_streets.append(street_index)

      if find_result["similaridade"] > 60:
        print(f"#### carro visto na rua {street_index} ####")
        print("verificando próximas ruas...")
        time.sleep(6) # para evitar exaustão do modelo por muitos requests no uso free
        next_streets = streetsGraph.nexts(street_index)
        return find_my_car(car_tag, street_index, remove_when_exists(next_streets, checked_streets), checked_streets, find_result)
      elif len(next_possibilities) > 1:
        next_possibilities.remove(street_index)
        return find_my_car(car_tag, last_found, next_possibilities, checked_streets, last_found_result)
      else:
        return (last_found, last_found_result)
    except:
      print("##### parece que muitas requisições foram feitas, se está usando uma versão free aguarde um pouco ou reinicie a busca a partir da última rua encontrada #####")
      return (last_found, last_found_result)



def find(car_tag, ref_street):
  try:
    last_found, last_result = find_my_car(car_tag, None, [ref_street], [], {})
    if last_found == None:
      print(f'Não foi possível encontrar com ajuda de IA o carro na rua {ref_street}')
    else:
      print(f'Probabilidade de {last_result["similaridade"]}% do carro estar na rua {last_found}')
  except:
    print("Processo interrompido inesperadamente, por favor, continue a busca da última rua encontrada.")


In [None]:
find("5660-LMW", 1)