#### Imports

In [1]:
import os
from time import time, sleep
from datetime import datetime

import pyautogui

from PIL import Image, ImageOps

import cv2 as cv
import numpy as np

import pandas as pd
from sklearn.neighbors import KNeighborsClassifier

#### General References

In [2]:
# template ("needle") image file names; Numbers.record_livestream loads them from Numbers.icons_dir
dark_needle_img = 'dark_kills_counter_skull_icon.jpg'  # loaded, but currently left out of the matching loop
dark_needle_img_2 = 'dark_kills_counter_skull_icon_2.jpg'  # the skull template actually matched
pr_needle_img = 'players_remaining_icon.png'

# PIL crop boxes (left, upper, right, lower), each 38 wide x 28 tall;
# applied in record_livestream to the already-cropped numbers bar, not to the full screenshot
n_kills_crop = (136, 0, 174, 28)  # 28x28 crop: (139, 0, 167, 28)
n_players_remaining_crop = (84, 0, 122, 28)

#### Helper Functions

In [3]:
def capture_screenshot(out_route=False):
    """
    Capture the screen and return it as a 1280x720 grayscale image stored in 3 BGR channels.

    inputs
    ------
    out_route
    >> file path to save the processed screenshot to
        > default == False (nothing is saved)

    returns
    -------
    np.ndarray of shape (720, 1280, 3) whose three channels hold the same gray values
    """
    # capture screenshot & resize to 720p
    screenshot = pyautogui.screenshot()
    screenshot = screenshot.resize((1280, 720))

    grayscale = ImageOps.grayscale(screenshot)  # want to switch this to subtracting the mean

    # ImageOps.grayscale yields a single-channel image, which COLOR_RGB2BGR cannot take
    # (it needs 3 or 4 input channels). GRAY2BGR replicates the gray channel into B, G and R,
    # giving the 3-channel array that the template matching and the BGR2GRAY call in
    # Numbers.pull_numbers work with.
    screenshot = cv.cvtColor(np.array(grayscale), cv.COLOR_GRAY2BGR)

    if out_route:
        Image.fromarray(screenshot).save(out_route)

    return screenshot


def record_numbers(numbers, file_path):
    """
    Append rows to the CSV at {file_path} and return the combined DataFrame.

    inputs
    ------
    numbers
    >> list of rows (each row a list of values) to append
        > each row must have as many values as the CSV has columns
        > an empty list is allowed; the CSV is then left untouched
    file_path
    >> path of an existing CSV with a header row

    returns
    -------
    DataFrame holding the previous CSV contents followed by the new rows

    raises
    ------
    ValueError when the row width does not match the number of CSV columns
    """
    existing_df = pd.read_csv(file_path)

    # nothing to append: an empty list builds a 0-column frame, and assigning the
    # CSV's column names to it would raise a length-mismatch ValueError
    if len(numbers) == 0:
        return existing_df

    temp_df = pd.DataFrame(numbers)
    temp_df.columns = existing_df.columns

    new_df = pd.concat([existing_df, temp_df], axis=0)
    new_df.to_csv(file_path, index=False)

    return new_df

## Numbers Class

In [4]:
class Numbers():
    """
    Read the kills / players-remaining counters from screenshots of a displayed livestream.

    Holds the labeled digit dataset (self.df), trains a KNN digit classifier on it and
    runs the capture > locate icons > crop > predict > record loop.
    """

    def __init__(self):
        # labeled image CSVs, keyed by the image size they contain
        self.datasets = {'28x28' : 'numbers.csv', '38x28' : 'digits_only_numbers.csv'}
        self.update_df()

        # side length (pixels) of the square images fed to the model
        self.IMG_SIZE = 50

        self.icons_dir = 'media/icons/'
        self.output_dir = 'media/test_record_kills_and_players_remaining/'

        # last accepted skull icon rectangle (within the top right crop)
        self.top_left = None
        self.bottom_right = None
        # last skull icon position that produced a non-empty numbers bar crop
        self.trusted_top_left = None
        # last matched players remaining icon rectangle (within the numbers bar)
        self.top_left_pr_icon = None
        self.bottom_right_pr_icon = None

        self.n_save_errors = 0
        self.n_pred_errors = 0

    def load_image_arrays(self):
        """
        load & resize image arrays

        returns (image_arrays, image_labels)
            > image_arrays: list of 2D np.arrays sized (self.IMG_SIZE, self.IMG_SIZE)
                > default: (50, 50)
            > image_labels: values of the 'numbers' column of self.df

        currently supports (38, 28) and (28, 28) sized inputs
        raises Exception for any other image size
        """
        # crop boxes reaching past the image bounds, used to bring each supported size to 38x38
        pad_boxes = {
            (38, 28): (0, 0-5, 38, 28+5),      # 38x28 > 38x38
            (28, 28): (0-3, 0-5, 28+7, 28+5),  # 28x28 > 38x38
        }

        file_paths = self.df['file_path'].values
        image_arrays = []
        for path in file_paths:
            # only the size is needed here; close the file handle right away
            with Image.open(path) as probe:
                base_size = probe.size
            if base_size not in pad_boxes:
                raise Exception(f'\nerror: unknown size | {base_size}')

            img = cv.imread(path, cv.IMREAD_GRAYSCALE)  # single channel
            img = Image.fromarray(img).crop(pad_boxes[base_size])
            img = cv.resize(np.array(img), (self.IMG_SIZE, self.IMG_SIZE))  # 50x50
            image_arrays.append(img)
        image_labels = self.df['numbers'].values
        return image_arrays, image_labels

    def train_knn(self, train_data, labels, n_neighbors=1):
        """
        Train simple KNN model to predict digits

        inputs
        -----
        >> train_data
            > list of unflattened np arrays
        >> labels
            > list of target (y) values
        >> n_neighbors
            > number of neighbors (K) for KNN

        returns the fitted KNeighborsClassifier
        """
        X = [img.flatten() for img in train_data]
        y = labels

        knn = KNeighborsClassifier(n_neighbors=n_neighbors)

        knn.fit(X, y)

        return knn

    def record_livestream(self, model, n_loops=420):
        """
        record {n_loops} from displayed Twitch livestream

        Every loop saves the screenshot, the cropped numbers bar and the two number crops
        under self.output_dir, and stashes one row of predictions that is periodically
        appended to {self.output_dir}sample_records.csv.

        inputs
        ------

        model
        >> ml/dl model to make predictions on livestream captures
            > currently KNN only

        n_loops
        >> number of loops to run
            > default == 420
            > recent run of 10**4 loops took ~40 minutes
            > (recommended) max == 10**7

        raises
        ------
        FileNotFoundError when a template icon that is matched against cannot be read
        """
        # loaded, but currently not part of the matching loop below
        dark_needle_icon = cv.imread(f'{self.icons_dir}{dark_needle_img}', cv.IMREAD_UNCHANGED)

        dark_needle_icon_2 = cv.imread(f'{self.icons_dir}{dark_needle_img_2}', cv.IMREAD_UNCHANGED)

        players_remaining_icon = cv.imread(f'{self.icons_dir}{pr_needle_img}', cv.IMREAD_GRAYSCALE)

        # cv.imread returns None instead of raising when a file cannot be read
        for icon_name, icon in ((dark_needle_img_2, dark_needle_icon_2),
                                (pr_needle_img, players_remaining_icon)):
            if icon is None:
                raise FileNotFoundError(f'could not read icon | {self.icons_dir}{icon_name}')

        # the icon was read as a single channel, so expand it with GRAY2BGR
        # (COLOR_RGB2BGR needs a 3 or 4 channel input)
        players_remaining_icon = cv.cvtColor(players_remaining_icon, cv.COLOR_GRAY2BGR)

        records_path = f'{self.output_dir}sample_records.csv'
        temp_top_right_numbers_stash = []

        for loop_idx in range(n_loops):
            # zero pad to 7 digits so the saved files sort in capture order
            loop = f'loop_{loop_idx:07d}'
            og_out = f'{self.output_dir}og_screenshot/{loop}.jpg'

            # capture & save greyscaled bgr screenshot (1280, 720)
            screenshot = capture_screenshot(out_route=og_out)
            record_time = str(datetime.now())

            # crop top (25%) right corner of the screenshot
            top_right_numbers_screenshot = Image.fromarray(screenshot.copy())
            top_right_numbers_screenshot = top_right_numbers_screenshot.crop((int(1280*0.75), 0, 1280, int(720*.25)))
            top_right_numbers_screenshot = np.array(top_right_numbers_screenshot)

            # look for n_kills skull icons
            for needle_img in [dark_needle_icon_2]:  # dark_needle_icon
                result = cv.matchTemplate(top_right_numbers_screenshot, needle_img, cv.TM_CCOEFF_NORMED)

                min_val, max_val, min_loc, max_loc = cv.minMaxLoc(result)

                threshold = 0.7
                # do we have a satisfactory match?
                if max_val >= threshold:

                    needle_w = needle_img.shape[1]
                    if needle_w < 18:
                        print(f'needle_w=={needle_w}, adding 5')
                        needle_w += 5
                    needle_h = needle_img.shape[0]

                    # tag top left corner, add width & height to find bottom right corner of icon
                    top_left = max_loc
                    bottom_right = (top_left[0] + (needle_w) - 5, top_left[1] + needle_h)

                    # are we within logical area?
                    if (top_left[0] > 120) and (90 > top_left[1] > 10):
                        self.top_left = top_left
                        self.bottom_right = bottom_right

                        # black out kill skull icon
                        cv.rectangle(top_right_numbers_screenshot, top_left, bottom_right, color=(0, 0, 0), thickness=-1)

                # no, fell short of threshold, so go back to the last logical area we had
                else:
                    if self.trusted_top_left is not None:
                        top_left = self.trusted_top_left
                    else:
                        top_left = self.top_left
                    # have we gotten a top left yet?
                    if top_left is not None:
                        # are we within logical area?
                        if (top_left[0] > 120) and (90 > top_left[1] > 10):
                            bottom_right = self.bottom_right
                            # black out kill skull icon
                            cv.rectangle(top_right_numbers_screenshot, top_left, bottom_right, color=(0, 0, 0), thickness=-1)

            # do we have a top left?
            # (needle_w / needle_h carry over from the most recent loop that had a match)
            if top_left is not None:

                # correct bottom right and expand left to just grab all linearly aligned numbers at once
                full_bottom_right = (top_left[0] + (needle_w * 2), top_left[1] + needle_h)
                full_top_left = tuple([top_left[0]-125, top_left[1]])

                # crop full top right numbers bar
                top_right_numbers_screenshot_2 = top_right_numbers_screenshot[full_top_left[1]:full_bottom_right[1],
                                                                              full_top_left[0]:full_bottom_right[0]]
                # make sure we still have a screenshot
                if top_right_numbers_screenshot_2.size != 0:
                    top_right_numbers_screenshot = top_right_numbers_screenshot_2
                    self.trusted_top_left = self.top_left

                    # look for players remaining icon
                    for needle_img_2 in [players_remaining_icon]:
                        try:
                            result_2 = cv.matchTemplate(top_right_numbers_screenshot, needle_img_2, cv.TM_CCOEFF_NORMED)
                            min_val_2, max_val_2, min_loc_2, max_loc_2 = cv.minMaxLoc(result_2)

                            threshold_2 = 0.8
                            needle_2_w = needle_img_2.shape[1]
                            needle_2_h = needle_img_2.shape[0] + 10

                            # do we have a satisfactory match?
                            if max_val_2 >= threshold_2:

                                # tag top left corner, add width & height to find bottom right corner
                                top_left_2 = max_loc_2  # want rectangle
                                bottom_right_2 = (top_left_2[0] + (needle_2_w) - 5, top_left_2[1] + needle_2_h)

                                self.top_left_pr_icon = top_left_2
                                self.bottom_right_pr_icon = bottom_right_2

                                # black out players remaining icon
                                cv.rectangle(top_right_numbers_screenshot, top_left_2, bottom_right_2, color=(0, 0, 0), thickness=-1)
                            # no, so go back to the last one we had
                            else:
                                top_left_2 = self.top_left_pr_icon
                                # do we have this?
                                if top_left_2 is not None:
                                    bottom_right_2 = self.bottom_right_pr_icon
                                    # black out players remaining icon
                                    cv.rectangle(top_right_numbers_screenshot, top_left_2, bottom_right_2, color=(0, 0, 0), thickness=-1)
                        except Exception as e:
                            print(e)
                # bad top left value, crop not as expected
                else:
                    if self.trusted_top_left is None:
                        # no prior success with top_left, forget it
                        self.top_left = None
                    else:
                        # revert to last successful top_left value
                        self.top_left = self.trusted_top_left
                        top_left = self.top_left
                        # correct bottom right and expand left to just grab all linearly aligned numbers at once
                        full_bottom_right = (top_left[0] + (needle_w * 2), top_left[1] + needle_h)
                        full_top_left = tuple([top_left[0]-125, top_left[1]])

                        # crop full top right numbers bar
                        top_right_numbers_screenshot = top_right_numbers_screenshot[full_top_left[1]:full_bottom_right[1],
                                                                                    full_top_left[0]:full_bottom_right[0]]

                try:
                    # convert opencv back to PIL
                    i = Image.fromarray(top_right_numbers_screenshot)

                    k_out = f'{self.output_dir}n_kills/{loop}.jpg'
                    pr_out = f'{self.output_dir}n_players_remaining/{loop}.jpg'
                    crop_out = f'{self.output_dir}crop_screenshot/{loop}.jpg'

                    # crop kills & players remaining numbers
                    k_crop = i.crop(n_kills_crop)
                    pr_crop = i.crop(n_players_remaining_crop)

                    # save cropped images (numbers)
                    i.save(crop_out)
                    k_crop.save(k_out)
                    pr_crop.save(pr_out)

                    # crop images to (38, 38) from (38, 28)
                    k_crop = k_crop.crop((0, 0-5, 38, 28+5))
                    pr_crop = pr_crop.crop((0, 0-5, 38, 28+5))

                    # make predictions
                    try:
                        k_pred = self.pull_numbers(np.array(k_crop), model)
                        k_pred = ' '.join(k_pred)
                        # print(f'n_kills: {k_pred}')
                    except Exception as e:
                        self.n_pred_errors += 1
                        k_pred = None
                        print(e)

                    try:
                        pr_pred = self.pull_numbers(np.array(pr_crop), model)
                        pr_pred = ' '.join(pr_pred)
                        # print(f'n_plyrs: {pr_pred}')
                    except Exception as e:
                        self.n_pred_errors += 1
                        pr_pred = None
                        print(e)

                    temp_top_right_numbers_stash.append([pr_pred, k_pred, pr_out, k_out, record_time,
                                                         og_out, self.top_left])

                except Exception as e:
                    self.n_save_errors += 1
                    print(e)

            # flush every 25th loop, or as soon as more than 10 rows are waiting
            if temp_top_right_numbers_stash and ((loop_idx % 25 == 0) or (len(temp_top_right_numbers_stash) > 10)):
                record_numbers(temp_top_right_numbers_stash, records_path)
                temp_top_right_numbers_stash = []

        # write whatever is left; skip when empty, since there is nothing to append
        if temp_top_right_numbers_stash:
            record_numbers(temp_top_right_numbers_stash, records_path)

    def pull_numbers(self, image, model, knn=True):
        """
        predict the label of a cropped number image

        inputs
        ------
        image
        >> 3 channel image array (it is converted with cv.COLOR_BGR2GRAY)
        model
        >> fitted model exposing .predict()
        knn
        >> must be truthy; only the KNN path is implemented

        returns list of the model's predictions for the single flattened image
        raises Exception when knn is falsy
        """
        if not knn:
            raise Exception(f'pull_numbers() error | knn != True | knn == {knn}')

        image = cv.resize(image, (self.IMG_SIZE, self.IMG_SIZE))
        image = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
        image = image.reshape(1, -1)  # flatten

        numbers = model.predict(image)

        return list(numbers)

    def update_df(self):
        """
        update DataFrame of targets {numbers} and relative file paths {file_path} of labeled images

        reads every CSV listed in self.datasets and stacks them into self.df
        """
        frames = [pd.read_csv(csv_path) for csv_path in self.datasets.values()]
        self.df = pd.concat(frames, axis=0)

    def trim_df(self, n, output=False):
        """
        limit number of any label's instances in self.df

        inputs
        ------
        n
        >> max number of rows to keep per label; labels with more rows are randomly sampled down to n
        output
        >> return the trimmed DataFrame when True (self.df is updated either way)
        """
        df = self.df
        for value in df.numbers.unique():
            is_value = df.numbers == value
            if is_value.sum() > n:
                kept = df.loc[is_value].sample(n)
                df = pd.concat([df.loc[~is_value], kept])
            # print(f'{value} | {len(df.loc[df.numbers==value])}')
        self.df = df
        if output:
            return df

    def clear_output_dir(self):
        """
        delete image files and reset CSV in {self.output_dir} & sub directories

        the CSV keeps its header row; all data rows are dropped
        """
        for sub in ['n_kills/', 'n_players_remaining/', 'og_screenshot/', 'crop_screenshot/']:
            for f in os.listdir(f'{self.output_dir}{sub}'):
                if '.jpg' in f:
                    os.remove(f'{self.output_dir}{sub}{f}')
        records_path = f'{self.output_dir}sample_records.csv'
        pd.read_csv(records_path).head(0).to_csv(records_path, index=False)


#### Run

In [5]:
# build the helper; __init__ reads the labeled-image CSVs through update_df()
n = Numbers()

In [6]:
# load the labeled images (resized to IMG_SIZE x IMG_SIZE) together with their labels
data, labels = n.load_image_arrays()

In [7]:
# fit the KNN classifier with train_knn's default n_neighbors=1
knn = n.train_knn(data, labels)

In [8]:
# destructive: deletes the previously saved .jpg captures and empties sample_records.csv
n.clear_output_dir()

In [9]:
%%time
# run the capture/predict loop with the default n_loops=420
n.record_livestream(model=knn)

Wall time: 1min 24s


In [10]:
n.n_save_errors, n.n_pred_errors

(0, 0)