## Computer Vision Project 2 - Game of Blackjack
- Kacper Trębacz

- Jan Gruszczyński

# Notebook role:

Finally in this notebook, we use trained yolo models (for card and token detection) to analyze a Blackjack game.
In this notebook we've implemented the game logic. Results can be seen in the last block code, were we run the experiment.

In [2]:
import torch
import cv2
import numpy as np
from pathlib import Path
from  scipy.spatial.distance import cdist
import math
import shapely
from shapely.geometry import Polygon
from collections import defaultdict

In [3]:
CARDS_SHOW_THRESHOLD = 0.605
CARDS_DELETE_THRESHOLD = 0.01
CHIPS_SHOW_THRESHOLD = 0.28
CHIPS_DELETE_THRESHOLD = 0.01

In [4]:
cuda = True


device = torch.device('cuda:0' if cuda else 'cpu')

model_cards = torch.hub.load('ultralytics/yolov5', 'custom', path='weights/best.pt')
model_chips = torch.hub.load('ultralytics/yolov5', 'custom', path='weights/best_chips.pt')

Using cache found in /home/kacper/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2021-12-30 torch 1.10.0 CUDA:0 (GeForce GTX 1650, 3912MiB)

Fusing layers... 
Model Summary: 213 layers, 7150369 parameters, 0 gradients
Adding AutoShape... 
Using cache found in /home/kacper/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2021-12-30 torch 1.10.0 CUDA:0 (GeForce GTX 1650, 3912MiB)

Fusing layers... 
Model Summary: 213 layers, 7020913 parameters, 0 gradients
Adding AutoShape... 


In [5]:
CONFIDENCE_DECAY = 0.90

In [6]:
class Label():
    def __init__(self,row):
        self.label = row["name"]
        self.x1 = round(row["xmin"])
        self.y1 = round(row["ymin"])
        self.x2 = round(row["xmax"])
        self.y2 = round(row["ymax"])
        self.confidence = row["confidence"]
        self.confidences = {self.label:self.confidence}
        self.last_moved = 0
    
    @classmethod
    def distance(cls,label1,label2):
        center1 = label1.center()
        center2 = label2.center()
        return math.dist(center1,center2)
    
    def center(self):
        return np.array([(self.x1+self.x2)/2,(self.y1+self.y2)/2],dtype=np.int32)
    
    def shift(self,x,y):
        self.x1,self.x2 = self.x1+x,self.x2+x
        self.y1,self.y2 = self.y1+y,self.y2+y
        
    def to_shape(self):
        return Polygon(((self.x1,self.y1),(self.x2,self.y1),(self.x2,self.y2),(self.x1,self.y2)))
    
    def update(self,nlabel = None):
        self.update_confidences(CONFIDENCE_DECAY)
        if nlabel:
            if not(nlabel.label in self.confidences):
                self.confidences[nlabel.label] = 0
            self.confidences[nlabel.label] += (1-CONFIDENCE_DECAY)*nlabel.confidence
            self.label = max(self.confidences, key=self.confidences.get)
            self.x1,self.y1,self.x2,self.y2 = nlabel.x1,nlabel.y1,nlabel.x2,nlabel.y2
            if Label.distance(self,nlabel) > 10:
                self.last_moved = 0
        self.confidence = self.confidences[self.label]
        self.last_moved+=1
        
    def update_confidences(self,mult):
        self.confidences = {k:v*mult for k,v in self.confidences.items()}
        

In [7]:
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 2
COLOR = (90,255,0)
TEXT_COLOR_BG = (0,0,0)
FONT_THICKNESS = 2

def put_text(img, text,position, relative = "LB"):
    text_size, _ = cv2.getTextSize(text, FONT, FONT_SCALE, FONT_THICKNESS)
    if relative == "LB":
        cv2.rectangle(img, (position[0], position[1] - text_size[1]), (position[0] + text_size[0], position[1]), TEXT_COLOR_BG, -1)
        return cv2.putText(img, text, position, FONT, FONT_SCALE, COLOR, FONT_THICKNESS, cv2.LINE_AA)
    if relative == "RB":
        cv2.rectangle(img, (position[0] - text_size[0], position[1] - text_size[1]), (position[0], position[1]), TEXT_COLOR_BG, -1)
        return cv2.putText(img, text, (position[0]-text_size[0],position[1]), FONT, FONT_SCALE, COLOR, FONT_THICKNESS, cv2.LINE_AA)
    if relative == "CU":
        cv2.rectangle(img, (position[0]- text_size[0]//2, position[1] + text_size[1]), (position[0] + text_size[0]//2, position[1]), TEXT_COLOR_BG, -1)
        return cv2.putText(img, text, (position[0]- text_size[0]//2, position[1]+text_size[1]), FONT, FONT_SCALE, COLOR, FONT_THICKNESS, cv2.LINE_AA)

In [8]:
def get_splitpoints(frame,i,j,nx,ny,overlap = 0.2):
    h,w = frame.shape[:2]
    return int(i*w/nx*(1-overlap)),int(j*h/ny*(1-overlap)),min(int((i+1)*w/nx*(1+overlap)),w), min(int((j+1)*h/ny*(1+overlap)),h)

In [9]:
def get_ious(labels1,labels2,thld = 0.3):
    shapes1 = [label.to_shape() for label in labels1]
    shapes2 = [label.to_shape() for label in labels2]
    distances = np.array([[shape1.intersection(shape2).area/min(shape1.area,shape2.area) for shape2 in shapes2] for shape1 in shapes1])
    mask = distances > thld
    return mask,shapes1,shapes2

def join_labels(labels,thld=0.5, debug=False):
    if len(labels) == 0:
        return labels
    iterations =0
    previous_labels = labels
    while True:
        iterations += 1
        mask, shapes1, shapes2 = get_ious(previous_labels, previous_labels, thld)
        mask *= (np.ones(mask.shape)*(1-np.triu(np.ones(mask.shape)))).astype(bool)#upper triangular without diagonal
        index1,index2 = np.where(mask)
        output_labels = []
        for i,j in zip(index1,index2):
            if previous_labels[i] != None and previous_labels[j] != None:
                output_labels.append(previous_labels[i] if previous_labels[i].confidence*shapes1[i].area > previous_labels[j].confidence*shapes2[j].area else previous_labels[j])
                previous_labels[i] = None
                previous_labels[j] = None
        for label in previous_labels:
            if label:
                output_labels.append(label)
        if len(previous_labels) == len(output_labels):
            break
        previous_labels = output_labels
    if debug:
        print("Join label iterations:", iterations)
    return output_labels
    
def add_new_labels(old_labels, new_labels, thld = 0.5):
#     return new_labels
    output_labels = []
    if len(new_labels) == 0:
        return old_labels
    if len(old_labels) > 0:
        mask,shapes1,shapes2 = get_ious(old_labels,new_labels)
        for i,old_label in enumerate(old_labels):
            max_iou_id = np.argmax(mask[i,:])
            if mask[i,max_iou_id] > thld:
                old_label.update(new_labels[max_iou_id])
                mask[:,max_iou_id] = 0
                new_labels[max_iou_id] = None
            else:
                old_label.update()
            output_labels.append(old_label)
    for new_label in new_labels:
        if new_label:
            new_label.confidence*=(1-CONFIDENCE_DECAY)
            new_label.update_confidences(1-CONFIDENCE_DECAY)
            output_labels.append(new_label)
    return output_labels

In [10]:
def extract_labels(whole_frame, model, previous_labels, nx=2,ny=2, overlap=0.1, debug=False):
    w,h = whole_frame.shape[:2]
    all_labels = []
    for i in range(nx):
        for j in range(ny):
            sp = get_splitpoints(whole_frame,i,j,nx,ny)
            frame = cv2.cvtColor(whole_frame[sp[1]:sp[3],sp[0]:sp[2]],cv2.COLOR_BGR2RGB)
            detections = model(frame).pandas().xyxy[0]
            frame = cv2.cvtColor(frame,cv2.COLOR_RGB2BGR)
            labels = [Label(row) for index, row in detections.iterrows()]
            if debug:
                for label in labels:
                    cv2.rectangle(frame, (label.x1, label.y1),(label.x2,label.y2), COLOR, 4)
                    frame = put_text(frame,label.label,(label.x1,label.y1))
                cv2.imshow(f'video{i}{j}', cv2.resize(frame.copy(),(604,604)))#yolo size
            for label in labels:
                label.shift(sp[0],sp[1])
            all_labels.extend(labels)
    all_labels = join_labels(all_labels, debug=debug, thld = 0.3)
#     print("joined labels", sorted([label.label for label in all_labels]))
    new_labels = add_new_labels(previous_labels, all_labels, thld = 0.9)
#     print("matched labels", sorted([label.label for label in new_labels]))
    return new_labels

In [11]:
def extract_cards(game,whole_frame,nx=2,ny=2,overlap=0.1,debug=False):
    all_labels = extract_labels(whole_frame, model_cards, game.card_labels, nx, ny, overlap, debug= debug)
#     print([f"{label.label}:{label.confidence}" for label in sorted(all_labels, key = lambda x: x.confidence)])
    game.card_labels = [label for label in all_labels if label.confidence > CARDS_DELETE_THRESHOLD]
    
    cards = []
    all_labels = [label for label in all_labels if label.confidence > CARDS_SHOW_THRESHOLD]
    if len(all_labels) > 0:
        label_points = [label.center() for label in all_labels]
        distances = cdist(label_points,label_points)

        labels_comp = np.array([[ l1.label == l2.label for l2 in all_labels]for l1 in all_labels])
        different = np.array([[i <j for j in range(len(all_labels))] for i in range(len(all_labels))])
        mask = (distances < 500) & different & labels_comp
        index1,index2 = np.where(mask)
        for i,j in zip(index1,index2):
            if all_labels[i] != None and all_labels[j] != None:
                cards.append(Card([all_labels[i],all_labels[j]]))
                all_labels[i] = None
                all_labels[j] = None
        for label in all_labels:
            if label:
                cards.append(Card([label]))
        
    return cards
    

In [12]:
def extract_chips(game,whole_frame,nx=2,ny=2,overlap=0.1,debug=False,):
    all_labels = extract_labels(whole_frame, model_chips, game.chips_labels, nx, ny, overlap, debug= debug)
    all_labels = [label for label in all_labels if label.to_shape().area > 5000]
#     print([f"{label.label}:({round(label.confidence,3)}|{label.x1}|{label.y1}|{label.x2}|{label.y2})" for label in sorted(all_labels, key = lambda x: x.confidence)])
#     print([f"{label.label}:{label.to_shape().area}" for label in sorted(all_labels, key = lambda x: x.confidence)])
    for label in all_labels:
        cv2.rectangle(whole_frame, (label.x1, label.y1),(label.x2,label.y2), COLOR, 4)
        whole_frame = put_text(whole_frame,label.label,(label.x1,label.y1))
    game.chips_labels = [label for label in all_labels if label.confidence > CHIPS_DELETE_THRESHOLD]
    return [Chip([label]) for label in [label for label in all_labels if label.confidence > CHIPS_SHOW_THRESHOLD]]

In [13]:
class TrackedObject():
    def __init__(self,labels):
        self.labels = labels
        
    def get_label(self):
        return self.labels[0].label
    
    def get_position(self):
        pass
    
    def center(self):
        center = np.array([label.center() for label in self.labels])
        return np.mean(center,axis=0)


class Card(TrackedObject):
    @classmethod
    def get_score(cls, cards):
        score = 0
        aces = 0
        for card in cards:
            if card.is_fig():
                if card.is_ace():
                    aces+=1
                    score+=11
                else:
                    score+=10
            else:
                score += int(card.get_label()[:-1])
        while score > 21 and aces >  0:
            score -=10
        return score
    
    def is_ace(self):
        return "A" in self.get_label()
    
    def is_fig(self):
        return any([fig in self.get_label() for fig in ["A","J","K", "Q"]])
        
class Chip(TrackedObject):
    @classmethod
    def get_score(self, chips):
        return sum([int(chip.get_label()) for chip in chips])
    

In [14]:
class Player:
    def __init__(self,x,y, name, text_x, text_y, relative_text="LB"):
        self.name = name
        self.x = x
        self.y = y
        self.cards = []
        self.chips = []
        self.score = 0
        self.current_bet = 0
        self.relative_text = relative_text
        self.text_x = text_x
        self.text_y = text_y
        self.has_bet = False
        
    def update(self,cards,chips):
        self.cards = cards
        self.chips = chips
        self.update_score()
        self.update_bet()
        
    def update_score(self):
        previous_score = self.score
        self.score = Card.get_score(self.cards)
        if previous_score < self.score:
            if self.score == 21 and len(self.cards) == 2:
                print(f"EVENT: player {self.name} has blackjack")
            if self.score > 21:
                print(f"EVENT: player {self.name} lost")
            
    
    def update_bet(self):
        self.current_bet = Chip.get_score(self.chips)
            
    def info(self):
        return f"{self.name}:Score:{self.score} Bet:{self.current_bet}"#Cards:{[card.get_label() for card in self.cards]}
        
    

In [15]:
M = 999999

In [16]:
class Game:
    def __init__(self):
        self.cards = []
        self.chips = []
        self.card_labels = []
        self.chips_labels = []
        self.players = []
        self.has_started=False
        self.has_finished=False
        
    def update(self, card_candidates, chips_candidates):
        self.cards = card_candidates
        self.chips = chips_candidates
        self.assign_to_players()
        
    def add_player(self,player):
        self.players.append(player)
        
    def assign_to_players(self):
        new_cards = [[] for i in range(len(self.players))]
        if(len(self.cards)):
            distance = cdist([[player.x,player.y] for player in self.players], np.array([card.center() for card in self.cards]))
            while np.sum(distance != M) > 0:#while there are still cards
                player_id, card_id = np.unravel_index(np.argmin(distance, axis=None), distance.shape)
                new_cards[player_id].append(self.cards[card_id])
                distance[:, card_id] = M
        
        new_chips = [[] for i in range(len(self.players))]
        if len(self.chips):
            chips_players = self.players[:-1]
            distance = cdist([[player.x,player.y] for player in chips_players], np.array([chip.center() for chip in self.chips]))
            while np.sum(distance != M) > 0:#while there are still chips
                player_id, chip_id = np.unravel_index(np.argmin(distance, axis=None), distance.shape)
                new_chips[player_id].append(self.chips[chip_id])
                distance[:, chip_id] = M
               
        dealer = self.players[-1]
        if self.has_started:
            if len(new_cards[-1]) ==2 and len(dealer.cards) == 1:
                self.message("EVENT: Dealer revealed his card")
        for i,player in enumerate(self.players):
            if len(new_cards[i]) > len(player.cards):
                self.message(f"player:{player.name} got {len(new_cards[i])-len(player.cards)} new cards")
                if len(new_cards[i]) > 2:
                    self.message(f"EVENT: player:{player.name} drew card")
            if len(new_cards[i]) == 0 and len(player.cards) > 0:
                self.message(f"player:{player.name} had his cards taken")
            if len(new_chips[i]) > len(player.chips) and not player.has_bet:
                self.message(f"EVENT: player:{player.name} bet")
                player.has_bet = True
            player.update(new_cards[i],new_chips[i])
            
        if dealer.score >= 17 and not self.has_finished:
            self.message("Dealer draw last card")
            if dealer.score > 21:
                for player in self.players[:-1]:
                    if player.score <= 21:
                        self.message(f"EVENT: player {player.name} won")
            else:
                for player in self.players[:-1]:
                    if player.score <= 21 and len(player.cards) > 2:
                        if player.score > dealer.score:
                            self.message(f"EVENT: player {player.name} won")
                        elif player.score < dealer.score:
                            self.message(f"EVENT: player {player.name} last")
                        else:
                            self.message(f"EVENT: player {player.name} has a draw")
            self.has_finished = True
            
        if not self.has_started and all([len(player.cards) == 2 for player in self.players[:-1]]) and len(dealer.cards) == 1:
            self.message("EVENT game has started")
            self.has_started = True
            
    def message(self, msg):
        if not self.has_finished:
            print(msg)
            
        


In [19]:
paths = [f for f in Path('photos/final').iterdir()]
# for video_path in paths:
video_path = Path('photos/final/proste_final_2_2r.mp4')
video = cv2.VideoCapture(str(video_path))
out = cv2.VideoWriter(f"{video_path.parent}/{video_path.stem}_detected.mp4", cv2.VideoWriter_fourcc(*'MP4V'), 20.0, (1920,1080))
frame_id = 0
game = Game()
game.add_player(Player(200,800, "PL", 0, 1080, relative_text="LB"))
game.add_player(Player(1619,800, "PR", 1919, 1080, relative_text="RB"))
game.add_player(Player(959,0, "D", 959, 0,relative_text="CU"))
while video.isOpened():
    if video.grab():
        flag, whole_frame = video.retrieve()
        frame_id+=1
        if frame_id %2:#every second frame, cause we are recording at 60fps
            continue
        if not flag or frame_id < 0:
            continue
        else:
            new_cards = extract_cards(game,whole_frame,1,1,0.3)
            new_chips = extract_chips(game,whole_frame,1,1,0.3)
            game.update(new_cards,new_chips)
            for i,player in enumerate(game.players):
                put_text(whole_frame,player.info(),(player.text_x,player.text_y), relative=player.relative_text)
            for tracked in game.cards+game.chips:
                for label in tracked.labels:
                    cv2.rectangle(whole_frame, (label.x1, label.y1),(label.x2,label.y2), COLOR, 4)
                    whole_frame = put_text(whole_frame,label.label,(label.x1,label.y1))
            cv2.imshow(f'video', cv2.resize(whole_frame.copy(),(1600,900)))
            out.write(whole_frame)
    if cv2.waitKey(1) == 27:
        break 
video.release()
out.release()


OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


EVENT: player:PR bet
EVENT: player:PL bet
player:PR got 1 new cards
player:PR got 1 new cards
player:PL got 1 new cards
player:PL got 1 new cards
EVENT: player PL has blackjack
player:D got 1 new cards
EVENT game has started
player:PR got 1 new cards
EVENT: player:PR drew card
EVENT: player PR lost
EVENT: Dealer revealed his card
player:D got 1 new cards
player:PR got 1 new cards
EVENT: player:PR drew card
EVENT: player PR lost
player:D got 1 new cards
EVENT: player:D drew card
Dealer draw last card
