In [None]:
# import dependencies
from IPython.display import display, Javascript, Image
from google.colab.output import eval_js
from google.colab.patches import cv2_imshow
from google.colab import drive
from base64 import b64decode, b64encode
import cv2
import numpy as np
import PIL
import io
import html
import time
import matplotlib.pyplot as plt
%matplotlib inline
!pip install ultralytics
from ultralytics import YOLO
import base64



In [None]:
# JavaScript to properly create our live video stream using our webcam as input
def video_stream():
  """
  Inject the browser-side webcam helper into the notebook output.

  Displays a JavaScript snippet that defines `stream_frame(label, imgData)`.
  On its first call the JavaScript builds a bordered <div> holding a status
  label, a <video> fed by `navigator.mediaDevices.getUserMedia` (requesting
  the "environment"-facing camera), an absolutely positioned <img> used as an
  overlay on top of the video, and a red instruction line. Clicking the video,
  the overlay image or the instruction line sets the `shutdown` flag.

  Each `stream_frame` call:
    * if `shutdown` is already set, tears the DOM down and resolves to '';
    * otherwise writes `label` into the status label (when non-empty), sets
      `imgData` as the overlay image source (when non-empty), then waits for
      the next animation frame, which draws the video onto a 640x480 canvas
      and returns it as a JPEG data URL (quality 0.8);
    * resolves to an object with keys 'create', 'show', 'capture' (elapsed
      milliseconds of each phase) and 'img' (the data URL, or "" when the
      shutdown flag was set while waiting for the frame).

  Returns:
          None. The function is only called for its display side effect.
  """
  js = Javascript('''
    var video;
    var div = null;
    var stream;
    var captureCanvas;
    var imgElement;
    var labelElement;

    var pendingResolve = null;
    var shutdown = false;

    function removeDom() {
       stream.getVideoTracks()[0].stop();
       video.remove();
       div.remove();
       video = null;
       div = null;
       stream = null;
       imgElement = null;
       captureCanvas = null;
       labelElement = null;
    }

    function onAnimationFrame() {
      if (!shutdown) {
        window.requestAnimationFrame(onAnimationFrame);
      }
      if (pendingResolve) {
        var result = "";
        if (!shutdown) {
          captureCanvas.getContext('2d').drawImage(video, 0, 0, 640, 480);
          result = captureCanvas.toDataURL('image/jpeg', 0.8)
        }
        var lp = pendingResolve;
        pendingResolve = null;
        lp(result);
      }
    }

    async function createDom() {
      if (div !== null) {
        return stream;
      }

      div = document.createElement('div');
      div.style.border = '2px solid black';
      div.style.padding = '3px';
      div.style.width = '100%';
      div.style.maxWidth = '600px';
      document.body.appendChild(div);

      const modelOut = document.createElement('div');
      modelOut.innerHTML = "<span>Status:</span>";
      labelElement = document.createElement('span');
      labelElement.innerText = 'No data';
      labelElement.style.fontWeight = 'bold';
      modelOut.appendChild(labelElement);
      div.appendChild(modelOut);

      video = document.createElement('video');
      video.style.display = 'block';
      video.width = div.clientWidth - 6;
      video.setAttribute('playsinline', '');
      video.onclick = () => { shutdown = true; };
      stream = await navigator.mediaDevices.getUserMedia(
          {video: { facingMode: "environment"}});
      div.appendChild(video);

      imgElement = document.createElement('img');
      imgElement.style.position = 'absolute';
      imgElement.style.zIndex = 1;
      imgElement.onclick = () => { shutdown = true; };
      div.appendChild(imgElement);

      const instruction = document.createElement('div');
      instruction.innerHTML =
          '<span style="color: red; font-weight: bold;">' +
          'When finished, click here or on the video to stop this demo</span>';
      div.appendChild(instruction);
      instruction.onclick = () => { shutdown = true; };

      video.srcObject = stream;
      await video.play();

      captureCanvas = document.createElement('canvas');
      captureCanvas.width = 640; //video.videoWidth;
      captureCanvas.height = 480; //video.videoHeight;
      window.requestAnimationFrame(onAnimationFrame);

      return stream;
    }
    async function stream_frame(label, imgData) {
      if (shutdown) {
        removeDom();
        shutdown = false;
        return '';
      }

      var preCreate = Date.now();
      stream = await createDom();

      var preShow = Date.now();
      if (label != "") {
        labelElement.innerHTML = label;
      }

      if (imgData != "") {
        var videoRect = video.getClientRects()[0];
        imgElement.style.top = videoRect.top + "px";
        imgElement.style.left = videoRect.left + "px";
        imgElement.style.width = videoRect.width + "px";
        imgElement.style.height = videoRect.height + "px";
        imgElement.src = imgData;
      }

      var preCapture = Date.now();
      var result = await new Promise(function(resolve, reject) {
        pendingResolve = resolve;
      });
      shutdown = false;

      return {'create': preShow - preCreate,
              'show': preCapture - preShow,
              'capture': Date.now() - preCapture,
              'img': result};
    }
    ''')

  # Displaying the Javascript object is what makes the definitions above
  # available to later eval_js('stream_frame(...)') calls.
  display(js)

def video_frame(label, bbox):
  """
  Push a status label and overlay to the browser and fetch the next frame.

  Params:
          label: text/HTML to show in the status label ('' leaves it unchanged)
          bbox: data-URL string of the overlay image ('' leaves it unchanged)
  Returns:
          data: whatever the JavaScript `stream_frame` call evaluates to
                (an object holding the captured frame under 'img', or an
                empty string once the stream was shut down)
  """
  # Local import keeps this cell self-contained.
  import json

  # json.dumps yields a correctly quoted and escaped JavaScript string literal,
  # so quotes, backslashes or newlines inside the label can no longer break
  # (or inject code into) the generated call. For plain text the generated
  # call is identical to simply wrapping the value in double quotes.
  call = 'stream_frame({}, {})'.format(json.dumps(str(label)), json.dumps(str(bbox)))
  data = eval_js(call)
  return data

# function to convert the JavaScript object into an OpenCV image
def js_to_image(js_reply):
  """
  Decode a webcam data URL received from the browser into an OpenCV image.

  Params:
          js_reply: data-URL string; everything after the first comma is the
                    base64-encoded image payload
  Returns:
          img: image produced by cv2.imdecode with flags=1 (colour, BGR order)
  """
  # strip the "data:...;base64" header and keep only the payload
  encoded_payload = js_reply.split(',')[1]
  # raw JPEG bytes viewed as a flat uint8 buffer, which is what imdecode expects
  raw_buffer = np.frombuffer(b64decode(encoded_payload), dtype=np.uint8)
  return cv2.imdecode(raw_buffer, flags=1)

# function to convert an OpenCV bounding-box overlay image into a base64 data-URL string to be overlaid on the video stream
def bbox_to_bytes(bbox_array):
  """
  Serialise an RGBA overlay array as a PNG data URL.

  Params:
          bbox_array: Numpy array (pixels) holding the overlay to draw on top
                      of the video stream; interpreted as an 'RGBA' image.
  Returns:
        bytes: 'data:image/png;base64,...' string containing the PNG
  """
  overlay_image = PIL.Image.fromarray(bbox_array, 'RGBA')
  # render the PNG into memory rather than onto disk
  png_buffer = io.BytesIO()
  overlay_image.save(png_buffer, format='png')
  encoded_png = b64encode(png_buffer.getvalue()).decode('utf-8')
  return f'data:image/png;base64,{encoded_png}'

In [None]:
# Load the YOLO models
# The weight files are read from Google Drive, so the drive has to be mounted first.
drive.mount('/content/drive')
# NOTE(review): both paths are relative ('./drive/...'), so they only resolve when
# the working directory is the parent of the mount point — presumably /content; confirm.
# model_hand runs on every frame to find the hand gestures (indices map to class_names);
# model_face is only used to locate faces (cheater masking and the final winner box).
model_hand = YOLO('./drive/MyDrive/Colab Notebooks/yolov11n-hands.pt')
model_face = YOLO('./drive/MyDrive/Colab Notebooks/yolov11n-face.pt')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from os import stat
from datetime import datetime
import time

# Maps the class index reported by the hand model (`detection.cls`) to a gesture
# label: 'P' = paper, 'R' = rock, 'S' = scissors.
# NOTE(review): the order must match the class order the hand model was trained with — confirm.
class_names = ['P', 'R', 'S']

class InitialHandler:
  """First stage of a hand: clears the previous choices and shows a banner."""

  def handle(self, detections, state):
    """
    Params:
            detections: ignored by this stage
            state: game state; its message and both player actions are reset
    Returns:
            True always, i.e. this stage finishes after a single frame
    """
    state.message = 'Preparing...'
    state.player1_action = state.player2_action = None
    return True


class WaitingForRockHandler:
  """Stage that waits until exactly two rock gestures are visible."""

  # number of consecutive frames with exactly two rocks needed to move on
  ACCEPTED_ROCK_FRAME_COUNT = 1

  def __init__(self):
    # length of the current streak of frames showing exactly two rocks
    self.double_rock_count = 0

  def handle(self, detections, state):
    """
    Params:
            detections: hand detections of the current frame
            state: game state; only its message is updated
    Returns:
            True on the frame where the streak of "two rocks" frames reaches
            ACCEPTED_ROCK_FRAME_COUNT, False otherwise
    """
    state.message = 'Waiting for rock...'

    rock_total = sum(
        1 for found in detections
        if class_names[int(found.cls.item())] == "R"
    )

    if rock_total == 2:
      self.double_rock_count += 1
      return self.double_rock_count == self.ACCEPTED_ROCK_FRAME_COUNT

    # any other number of rocks breaks the streak
    self.double_rock_count = 0
    return False


class CountingDownHandler:
  """Countdown stage that also records how each player's hand moves vertically."""

  # length of the countdown in seconds
  COUNTDOWN_TIME = 5
  # minimum vertical travel (in pixels of the detection box top) that counts as a shake
  MOVMENT_THRESHOLD = 15

  def __init__(self):
    self.initiated_time = datetime.now()
    # top-edge y coordinates seen for the left (player 1) and right (player 2) hand
    self.palyer1_rock_y = []
    self.palyer2_rock_y = []

  def handle(self, detections, state):
    """
    Params:
            detections: hand detections of the current frame
            state: game state; message is updated every frame, and a player
                   whose hand never moved enough is marked as "Cheated"
    Returns:
            False while the countdown is running, True once it has elapsed
    """
    seconds = int((datetime.now() - self.initiated_time).total_seconds())

    tracked = []
    for found in detections:
      label = class_names[int(found.cls.item())]
      left, top, _right, _bottom = found.xyxy[0].tolist()
      tracked.append((left, top, label))

    # positions are only recorded on frames with exactly two hands:
    # the hand further left belongs to player 1
    if len(tracked) == 2:
      first, second = tracked
      if first[0] < second[0]:
        left_hand, right_hand = first, second
      else:
        left_hand, right_hand = second, first
      self.palyer1_rock_y.append(left_hand[1])
      self.palyer2_rock_y.append(right_hand[1])

    if seconds < self.COUNTDOWN_TIME:
      state.message = f'Counting down: {self.COUNTDOWN_TIME - seconds}'
      return False

    state.message = 'Counting down: ENDED'

    player1_moved = self._check_valid_rock_movment(self.palyer1_rock_y)
    if not player1_moved:
      state.player1_action = "Cheated"
      state.player1_cheated = True

    player2_moved = self._check_valid_rock_movment(self.palyer2_rock_y)
    if not player2_moved:
      state.player2_action = "Cheated"
      state.player2_cheated = True

    return True

  @classmethod
  def _check_valid_rock_movment(cls, rock_positions):
    """Return True if any sample lies at least MOVMENT_THRESHOLD away from the smallest one."""
    if len(rock_positions) == 0:
      return False
    lowest = min(rock_positions)
    return any(abs(sample - lowest) >= cls.MOVMENT_THRESHOLD for sample in rock_positions)


class WaitingForActionHandler:
  """Stage that reads both players' gestures over several frames."""

  # number of frames with at least two detected hands that are sampled
  VALID_FRAME_THRESHOLD = 3

  def __init__(self):
    self.valid_frames = 0
    # gesture labels sampled per player, one entry per valid frame
    self.player1_actions = []
    self.player2_actions = []

  def handle(self, detections, state):
    """
    Params:
            detections: hand detections of the current frame
            state: game state; receives each player's action (or "Cheated"
                   when the sampled gestures were not all the same)
    Returns:
            True once VALID_FRAME_THRESHOLD frames were sampled, or at once
            when a player is already marked "Cheated"; False otherwise
    """
    if "Cheated" in (state.player1_action, state.player2_action):
      return True

    state.message = 'HOLD YOUR CHOICE!'

    seen = []
    for found in detections:
      label = class_names[int(found.cls.item())]
      left, _top, _right, _bottom = found.xyxy[0].tolist()
      seen.append((left, label))

    if len(seen) < 2:
      # raise Exception("TODO: Handle Someone act out of R S P")
      return False

    # only the first two detections are considered; the one further left is player 1
    (first_x, first_label), (second_x, second_label) = seen[0], seen[1]
    if first_x < second_x:
      left_label, right_label = first_label, second_label
    else:
      left_label, right_label = second_label, first_label
    self.player1_actions.append(left_label)
    self.player2_actions.append(right_label)

    self.valid_frames += 1
    if self.valid_frames != self.VALID_FRAME_THRESHOLD:
      return False

    samples_by_player = (
        ("player1", self.player1_actions),
        ("player2", self.player2_actions),
    )
    for prefix, samples in samples_by_player:
      distinct = set(samples)
      if len(distinct) == 1:
        setattr(state, f"{prefix}_action", distinct.pop())
      else:
        # the gesture changed between frames
        setattr(state, f"{prefix}_action", "Cheated")
        setattr(state, f"{prefix}_cheated", True)

    return True


class CalculateScoresHandler:
  """Final stage of a hand: updates the scores and reports the outcome."""

  def handle(self, detections, state):
    """
    Params:
            detections: ignored by this stage
            state: game state; its scores and message are updated from
                   player1_action / player2_action
    Returns:
            True always, i.e. this stage finishes after a single frame
    """
    first, second = state.player1_action, state.player2_action

    first_cheated = first == "Cheated"
    second_cheated = second == "Cheated"
    if first_cheated or second_cheated:
      # a cheater loses a point; nobody gains one
      report = []
      if first_cheated:
        report.append("Player1 cheated ")
        state.player1_score -= 1
      if second_cheated:
        report.append("Player2 cheated ")
        state.player2_score -= 1
      state.message = "".join(report)
      return True

    # each gesture mapped to the gesture it defeats
    beats = {"R": "S", "S": "P", "P": "R"}

    summary = f"Player1 {first} Player2 {second} "
    if first in beats and beats[first] == second:
      summary += "| Player1 won "
      state.player1_score += 1
    elif second in beats and beats[second] == first:
      summary += "| Player2 won "
      state.player2_score += 1
    else:
      summary += "| Tie "

    state.message = summary
    return True

class State:
  """Mutable game state shared between the handlers and the streaming loop."""

  def __init__(self):
    # number of hands to play and the status text shown next to the scores
    self.hands = 0
    self.message = ''
    # running scores
    self.player1_score = self.player2_score = 0
    # gesture label of the current hand, "Cheated", or None while undecided
    self.player1_action = self.player2_action = None
    # set to True by the handlers when a player is caught cheating
    self.player1_cheated = self.player2_cheated = False

# Outline colour used for each gesture label when drawing hand boxes.
# The boxes are drawn onto the overlay array that is later read as an 'RGBA'
# image, so the tuples are in (R, G, B) order: rock = red, scissors = green,
# paper = blue.
class_colors = {
    "R": (255, 0, 0),
    "S": (0, 255, 0),
    "P": (0, 0, 255),
}

class RockPaperScissors:
  """
  Drives a webcam rock-paper-scissors game.

  Every hand runs a chain of handler stages over the live video stream; the
  player whose hand/face is further left in the frame is player 1. After the
  last hand the winner's face is highlighted for a short while.
  """

  # size of the transparent overlay drawn on top of the video
  FRAME_WIDTH = 640
  FRAME_HEIGHT = 480
  # how long (seconds) the final result is shown
  KING_DISPLAY_SECONDS = 20
  # pause (seconds) after each hand
  PAUSE_BETWEEN_HANDS = 5

  def __init__(self, hands=3):
    """
    Params:
            hands: number of hands (rounds) to play
    """
    self.hands = hands
    self.current_handler = None
    # True when the previous stage reported completion, i.e. the next stage is due
    self.prev_handler_breaked = True
    self.player1_overall_score = 0
    self.player2_overall_score = 0

  def get_handler(self, handler_classes):
    """
    Return the stage that should process the current frame.

    Params:
            handler_classes: remaining stage classes of this hand; the first
                             one is popped and instantiated when the previous
                             stage has finished
    Returns:
            the active handler instance, or None when the previous stage has
            finished and no stage is left
    """
    if self.prev_handler_breaked:
      if len(handler_classes) == 0:
        return None
      self.current_handler = handler_classes.pop(0)()

    return self.current_handler

  @classmethod
  def _new_overlay(cls):
    """Create a fully transparent RGBA overlay."""
    return np.zeros([cls.FRAME_HEIGHT, cls.FRAME_WIDTH, 4], dtype=np.uint8)

  @staticmethod
  def _draw_box(overlay, box, color, thickness):
    """Draw one (x1, y1, x2, y2) box onto the overlay; thickness -1 fills it."""
    x1, y1, x2, y2 = box
    cv2.rectangle(overlay, (int(x1), int(y1)), (int(x2), int(y2)), color, thickness)

  @staticmethod
  def _overlay_to_data_url(overlay):
    """Make every drawn pixel opaque and encode the overlay for the browser."""
    overlay[:, :, 3] = (overlay.max(axis=2) > 0).astype(int) * 255
    return bbox_to_bytes(overlay)

  @staticmethod
  def _two_faces_left_to_right(frame):
    """Return (left_face, right_face) boxes from the first two face detections, or None."""
    face_boxes = model_face.predict(source=frame, verbose=False)[0].boxes
    faces = [tuple(found.xyxy[0].tolist()) for found in face_boxes]
    if len(faces) < 2:
      return None
    if faces[0][0] < faces[1][0]:
      return faces[0], faces[1]
    return faces[1], faces[0]

  def _start_streaming(self, handler_classes, state):
    """
    Play one hand: feed frames to the stages until all of them have finished.

    Params:
            handler_classes: list of stage classes for this hand (consumed)
            state: shared game state, updated by the stages
    Returns:
            True when every stage finished, False when the user stopped the
            video stream before that
    """
    # A hand always begins with its first stage, even if an earlier hand was
    # interrupted in the middle of a stage.
    self.prev_handler_breaked = True

    # initialze bounding box to empty
    bbox = ''
    while True:
      label_html = f'Player 1: {state.player1_score}, Player 2: {state.player2_score} | {state.message}'
      js_reply = video_frame(label_html, bbox)
      # The reply is '' after a shutdown, and its 'img' is '' when the user
      # clicked "stop" while the frame was being captured.
      if not js_reply or not js_reply["img"]:
        return False

      handler = self.get_handler(handler_classes)
      if not handler:
        return True

      # convert JS response to OpenCV Image
      frame = js_to_image(js_reply["img"])

      hand_detections = model_hand.predict(source=frame, verbose=False)[0].boxes
      overlay = self._new_overlay()
      for detection in hand_detections:
        label = class_names[int(detection.cls.item())]
        self._draw_box(overlay, detection.xyxy[0].tolist(), class_colors[label], 2)

      # cover the face of any player caught cheating with a filled red box
      if state.player1_cheated or state.player2_cheated:
        face_pair = self._two_faces_left_to_right(frame)
        if face_pair is not None:
          left_face, right_face = face_pair
          if state.player1_cheated:
            self._draw_box(overlay, left_face, (255, 0, 0), -1)
          if state.player2_cheated:
            self._draw_box(overlay, right_face, (255, 0, 0), -1)

      # Convert the overlay to base64 for display with the next frame
      bbox = self._overlay_to_data_url(overlay)

      self.prev_handler_breaked = handler.handle(hand_detections, state)

  def _show_king(self, state):
    """
    Show the final result for KING_DISPLAY_SECONDS seconds.

    The winner's face (left face for player 1, right face for player 2) gets
    a green outline; on a tie only the label is shown. Returns early when the
    user stops the video stream.

    Params:
            state: game state holding the final scores
    """
    if state.player2_score > state.player1_score:
      winner = 2
    elif state.player2_score < state.player1_score:
      winner = 1
    else:
      winner = -1

    if winner == -1:
      label_html = 'NOONE WIN! TIE'
    else:
      label_html = f'player {winner} wins.'

    bbox = ''
    initiated_time = datetime.now()
    # the loop condition is the only time limit, so the method always ends
    while (datetime.now() - initiated_time).total_seconds() < self.KING_DISPLAY_SECONDS:
      js_reply = video_frame(label_html, bbox)
      if not js_reply or not js_reply["img"]:
        return

      if winner == -1:
        # nobody to highlight; keep the label on screen until time is up
        continue

      frame = js_to_image(js_reply["img"])
      face_pair = self._two_faces_left_to_right(frame)
      if face_pair is None:
        continue

      overlay = self._new_overlay()
      self._draw_box(overlay, face_pair[winner - 1], (0, 255, 0), 2)
      bbox = self._overlay_to_data_url(overlay)

  def start(self):
    """
    Run the whole game: `self.hands` hands followed by the result screen.

    The game ends immediately (without the result screen) when the user stops
    the video stream.
    """
    video_stream()

    i = 0
    state = State()
    state.hands = self.hands
    while i < self.hands:
      handler_classes = [
          InitialHandler,
          WaitingForRockHandler,
          CountingDownHandler,
          WaitingForActionHandler,
          CalculateScoresHandler,
      ]

      # state keeps running totals across hands, so only what this hand
      # changed is added to the overall scores
      player1_before = state.player1_score
      player2_before = state.player2_score
      completed = self._start_streaming(handler_classes, state)
      self.player1_overall_score += state.player1_score - player1_before
      self.player2_overall_score += state.player2_score - player2_before
      if not completed:
        return
      time.sleep(self.PAUSE_BETWEEN_HANDS)

      i += 1
    self._show_king(state)



In [None]:
# Play a one-hand game; this opens the webcam stream in the cell output.
RockPaperScissors(hands=1).start()