In [15]:
import pygame
import random
import numpy as np
import cv2
import os.path
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
from keras.models import Model, load_model
from keras.layers import Input, BatchNormalization, Activation, Dense, Dropout, Flatten, ZeroPadding2D, UpSampling2D
from keras.layers.core import Lambda, RepeatVector, Reshape
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.layers.pooling import MaxPooling2D, GlobalMaxPool2D
from keras.layers.merge import concatenate, add
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow import keras
import pandas as pd 
import re
import matplotlib.pyplot as plt
from datetime import datetime
from scipy.signal import savgol_filter
import math

# Project root and the sub-directory / file names used below it.
pathname = r"D:\OneDrive - Hochschule Albstadt-Sigmaringen\Studium\Semester 5\DesignCPS"
datadirname = "data"
testdirname = "test"
validdirname = "valid"
modeldirname = "model"
datacsvname = "data.csv"
modeljsonname = "model-regr.json"
modelweightname = "model-regr.h5"

# Size (in pixels) that every captured frame is scaled to before it is stored
# or fed to the network.
dim = (50, 50)

# Action name -> integer id, and the inverse lookup derived from it.
actionstonum = {"RIGHT": 0, "LEFT": 1, "SPACE": 2}
numtoactions = {number: name for name, number in actionstonum.items()}

# Module-level result accumulators; run_game appends to overallscores.
scores = []
overallscores = []


def create_q_model():
    """Build the convolutional Q-network (layout taken from the Deepmind paper).

    The network takes one RGB frame of shape ``(dim[0], dim[1], 3)`` and
    outputs four linear values.

    NOTE(review): ``actionstonum`` defines only three actions while the output
    layer has four units -- presumably a leftover; confirm before changing,
    since saved weight files depend on this shape.

    Returns:
        An uncompiled ``keras.Model``.
    """
    frame = layers.Input(shape=(dim[0], dim[1], 3,))

    # Convolutions on the frame: (filters, kernel size, stride) per layer.
    features = frame
    for filters, kernel, stride in ((32, 8, 4), (64, 4, 2), (64, 3, 1)):
        features = layers.Conv2D(filters, kernel, strides=stride, activation="relu")(features)

    flat = layers.Flatten()(features)
    hidden = layers.Dense(512, activation="relu")(flat)
    q_values = layers.Dense(4, activation="linear")(hidden)

    return keras.Model(inputs=frame, outputs=q_values)

def run_game(learning_rate = 1.5e-06, epochs = 5, benchmin = 68.0):
    """Play several epochs of episodes and benchmark the model after each one.

    For every epoch the replay memory is loaded from disk, a fixed number of
    episodes is played, the benchmark is printed, and the replay memory and a
    per-epoch checkpoint are saved. Whenever the benchmark value drops below
    the best value seen so far, the default checkpoint is overwritten as well.
    After every three epochs without improvement the learning rate of all
    epochs is halved.

    Args:
        learning_rate: Initial learning rate handed to each ``Game``.
        epochs: Number of epochs to run.
        benchmin: Benchmark value that has to be undercut before the default
            checkpoint is replaced.

    Returns:
        Tuple ``(benches, qms, qps)`` holding, per epoch, the three values
        returned by ``Game.print_benchmark``.
    """
    lr = [learning_rate] * epochs
    episodes_per_epoch = 2  # 40

    benches = []
    qms = []
    qps = []
    stale_epochs = 0

    for i in range(len(lr)):
        print(f"{i}: learning rate: {lr[i]}")
        print(benchmin)
        # Hand in a fresh list explicitly so the replay memory loaded below is
        # not appended to a list left over from an earlier epoch, and pass the
        # scheduled learning rate instead of relying on Game's default.
        game = Game(500, 500, [], lr=lr[i])
        game.load_replay_memory()
        for j in range(episodes_per_epoch):
            # Every episode gets a new Game that shares the same replay list.
            game = Game(500, 500, game.shufflelist, lr=lr[i])
            game.run(j)

        bench, qm, qp = game.print_benchmark()
        benches.append(bench)
        qms.append(qm)
        qps.append(qp)

        game.save_replay_memory()
        game.save_checkpoint(f"model-regr_{i}_{lr[i]:.9f}_{bench:.2f}.h5")
        if bench < benchmin:
            benchmin = bench
            game.save_checkpoint()
        else:
            stale_epochs += 1
        if stale_epochs == 3:
            stale_epochs = 0
            lr = [rate * 0.5 for rate in lr]

        overallscores.append(game.print_overall_score())
    return benches, qms, qps

# Build a freshly initialised network and write its architecture (JSON) and
# weights into the model directory. The weight file name is the same one Game
# loads by default, so executing this cell replaces that file.
model = create_q_model()
model_json = model.to_json()
_model_dir = os.path.join(pathname, modeldirname)
with open(os.path.join(_model_dir, modeljsonname), "w") as json_file:
    json_file.write(model_json)
model.save_weights(os.path.join(_model_dir, modelweightname))


class Game:
    """Space-invaders style pygame game with a Q-network and a replay memory.

    The game draws a hero, aliens and rockets, stores a downscaled frame
    before and after every step as PNG files, and keeps an index of stored
    transitions (``shufflelist``) that is persisted as a CSV file.
    """
    screen = None

    # Class-level defaults; ``lost`` is read by ``run`` and is never set to
    # True anywhere in this class.
    lost = False
    done = False

    def __init__(self, width, height, shufflelist=None, lr=1e-3, checkpointparname="model-regr.h5"):
        """Create the window, the game objects and the networks.

        Args:
            width: Window width in pixels.
            height: Window height in pixels.
            shufflelist: Replay memory to share with this game; each entry is
                ``[currentpicname, action, reward, nextpicname, terminated]``.
                A new empty list is created when omitted.
            lr: Learning rate for the Adam optimizer.
            checkpointparname: Weight file (inside the model directory) that
                is loaded into the target network.
        """
        pygame.init()

        self.aliens = []
        self.rockets = []

        self.width = width
        self.height = height
        self.screen = pygame.display.set_mode((int(width), int(height)))
        self.screen.fill([255,0,0])
        self.clock = pygame.time.Clock()

        # Downscaled frames before / after the current step.
        self.imgresh1 = None
        self.imgresh2 = None

        self.reward = 0
        self.MAXREWARD = 1.0
        self.PENALTY = -1.0
        self.MOVEPENALTY = 0.0

        self.BATCHSIZE = 19
        self.DISCOUNT = 0.99
        self.ALPHA = 0.3

        manual = False
        self.EPSILON = 0.999 if manual else 0.3

        self.REPLAYSIZE = 40_000
        # Initialised here so print_score works before any episode statistics
        # have been recorded.
        self.score = 0
        self.numbatches = 0
        self.overall_score = 0
        self.overall_numbatches = 0
        self.overall_accumulatedstates = np.array([0.0,0.0,0.0,0.0])

        self.path = os.path.join(pathname, datadirname)
        self.modelpath = os.path.join(pathname, modeldirname)

        self.filename = "data.csv"

        self.model = create_q_model()
        self.model_target = create_q_model()

        self.learningrate = lr
        self.optimizer = keras.optimizers.Adam(learning_rate=self.learningrate, clipnorm=1.0)
        self.loss_function = keras.losses.Huber()

        self.checkpointname = os.path.join(pathname, modeldirname,checkpointparname)
        print(f"loading checkpoint: {self.checkpointname}")
        # NOTE(review): only the target network receives the checkpoint
        # weights; self.model keeps its fresh initialisation, and it is
        # self.model that print_benchmark evaluates and save_checkpoint
        # saves. Confirm whether self.model should be loaded here as well.
        self.model_target.load_weights(self.checkpointname)

        self.overall_scores=[]
        self.checkpoint_counter=0

        # A mutable default argument would be shared by every Game created
        # without an explicit list, so the default is created per instance.
        self.shufflelist = [] if shufflelist is None else shufflelist
        self.debugcounter = 0

        self.hero = Hero(self, width / 2, height - 20)
        self.generator = Generator(self)
        self.rocket = None

    def _grab_frame(self):
        """Return the current screen content scaled down to ``dim``."""
        raw = np.frombuffer(pygame.image.tostring(self.screen, "RGB"), dtype=np.uint8)
        # The buffer is stored row by row, so rows (height) come first.
        frame = np.reshape(raw, (int(self.height), int(self.width), 3))
        return cv2.resize(frame, dim, interpolation = cv2.INTER_NEAREST )

    def run(self, i_index):
        """Play one episode and store a frame pair for every step.

        The loop ends when the window is closed, when ``q`` is pressed or when
        an alien has moved below the bottom edge of the window.

        Args:
            i_index: Episode number; it is offset by the largest index found
                in the replay memory to build the image file names.
        """
        i = i_index + self.get_maxi() + 1
        j = 0
        while True:
            self.imgresh1 = self._grab_frame()

            #if len(self.aliens) == 0:
            #    self.displayText("WIN")

            pressed = pygame.key.get_pressed()
            if pressed[pygame.K_LEFT]:  # left arrow
                self.hero.x -= 2 if self.hero.x > 20 else 0  # left border of the playfield
            elif pressed[pygame.K_RIGHT]:  # right arrow
                self.hero.x += 2 if self.hero.x < self.width - 20 else 0  # right border
            elif pressed[pygame.K_q]:
                break

            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    self.reward = self.PENALTY
                    pygame.display.flip()
                    pygame.quit()
                    # Leave the episode entirely: after pygame.quit() no
                    # further display call may be made.
                    return
                if event.type == pygame.KEYDOWN and event.key == pygame.K_SPACE and not self.lost:
                    self.rockets.append(Rocket(self, self.hero.x, self.hero.y))

            pygame.display.flip()
            self.clock.tick(60)
            self.screen.fill((255, 0, 0))

            # Iterate over a copy: checkCollision removes hit aliens from
            # self.aliens, which would otherwise make the loop skip entries.
            for alien in list(self.aliens):
                alien.draw()
                alien.checkCollision(self)
                if (alien.y > self.height):
                    pygame.display.flip()
                    pygame.quit()
                    return

            for rocket in self.rockets:
                rocket.draw()

            if not self.lost: self.hero.draw()

            # Capture the follow-up frame with the same shape as imgresh1.
            self.imgresh2 = self._grab_frame()

            self.write(i,j)

            j+=1

    def write(self, i, j):
        """Save the current and the next frame of step ``j`` in episode ``i`` as PNG files."""
        cv2.imwrite(os.path.join(self.path,"current_{}_{}.png".format(i,j)), self.imgresh1)
        cv2.imwrite(os.path.join(self.path,"next_{}_{}.png".format(i,j)), self.imgresh2)

    def train(self, i, j, term):
        """Add the current transition to a sampled batch and run a training step.

        NOTE(review): this method uses ``pop_batch``, ``backprop``,
        ``push_batch``, ``changeto``, ``snake_pos`` and ``food_pos``, none of
        which are defined in this class, and nothing in this class calls it.
        It is kept unchanged; confirm whether it can be removed or needs to be
        ported.

        Args:
            i: Episode index used in the image file names.
            j: Step index used in the image file names.
            term: Termination flag stored with the transition.
        """
        # https://pythonprogramming.net/training-deep-q-learning-dqn-reinforcement-learning-python-tutorial/

        currentstate = "current_{}_{}.png".format(i,j)
        nextstate = "next_{}_{}.png".format(i,j)

        batch, files = self.pop_batch(self.BATCHSIZE)

        assert(self.imgresh1.shape == (dim[0], dim[1],3))
        assert(self.imgresh2.shape == (dim[0], dim[1],3))

        batch.append([self.imgresh1, actionstonum[self.changeto], self.reward, self.imgresh2, term, self.snake_pos[0], self.snake_pos[1], self.food_pos[0], self.food_pos[1]])
        files.append((currentstate, nextstate))

        self.write(i,j)

        self.backprop(batch)

        self.numbatches += 1

        self.push_batch(batch, files)

        return

    def get_maxi(self):
        """Return the largest leading number found in the stored current-frame names.

        Returns 0 when the replay memory is empty. Names without any digits
        are ignored.
        """
        maxi = 0

        for item in self.shufflelist:
            match = re.search(r'\d+', item[0])
            if match is not None and int(match.group()) > maxi:
                maxi = int(match.group())

        return maxi

    def load_replay_memory(self):
        """Append every row of the replay CSV to ``shufflelist`` and shuffle it.

        Raises:
            AssertionError: If a referenced image file is missing, the action
                id is out of range or the reward is not a number.
        """
        # Passing the path lets pandas open and close the file itself, so the
        # handle is released even when parsing fails.
        df = pd.read_csv(os.path.join(self.path, datacsvname), index_col = 0)

        for index, row in df.iterrows():

            currentpicname = row["currentstate"]
            action = actionstonum[row["action"]]
            reward = row["reward"]
            nextpicname = row["nextstate"]
            terminated = row["terminated"]

            assert os.path.isfile(os.path.join(self.path,currentpicname)) == True
            assert (action < 5 and action >= 0)
            assert isinstance(reward,int) or isinstance(reward, float)
            assert os.path.isfile(os.path.join(self.path,nextpicname)) == True

            self.shufflelist.append([currentpicname,action,reward,nextpicname, terminated])

        random.shuffle(self.shufflelist)

        return

    def save_replay_memory(self):
        """Trim the replay memory to ``REPLAYSIZE`` entries and write it to the CSV file.

        The oldest surplus entries are dropped from the front of the list and
        their image files are deleted. Nothing is written when the replay
        memory is empty.

        Raises:
            AssertionError: If the CSV file or an image file that is about to
                be deleted does not exist.
        """
        assert os.path.isfile(os.path.join(self.path,datacsvname)) == True

        if len(self.shufflelist) == 0:
            return

        surplus = len(self.shufflelist) - self.REPLAYSIZE
        if surplus > 0:

            self.numbatches = surplus
            self.overall_numbatches += self.numbatches

            # Remove the surplus in one slice (in place, so Games sharing the
            # list see the change) instead of repeated pop(0) calls.
            evicted = self.shufflelist[:surplus]
            del self.shufflelist[:surplus]

            for item in evicted:
                assert os.path.isfile(os.path.join(self.path,item[0])) == True
                assert os.path.isfile(os.path.join(self.path,item[3])) == True
                os.remove(os.path.join(self.path,item[0]))
                os.remove(os.path.join(self.path,item[3]))

        data = [
            {'currentstate': cs, 'action': numtoactions[act], 'reward': rew, 'nextstate': fs, 'terminated': term}
            for (cs, act, rew, fs, term) in self.shufflelist
        ]

        df = pd.DataFrame(data)

        df.to_csv(os.path.join(self.path, self.filename))

        return

    def _score_samples(self, samples):
        """Predict Q-values for the stored current frame of each sample.

        Returns:
            Tuple ``(q_sum, hits)``: the element-wise sum of all predictions
            and the number of samples whose highest-valued output index equals
            the stored action.
        """
        q_sum = np.zeros(4)
        hits = 0
        for sample in samples:
            img = cv2.imread(os.path.join(pathname, datadirname, sample[0]),cv2.IMREAD_COLOR )
            states = self.model.predict(np.array([img])/255.0, batch_size=1, verbose=0)[0]
            q_sum += states
            if np.argmax(states) == sample[1]:
                hits += 1
        return q_sum, hits

    def print_benchmark(self):
        """Evaluate ``self.model`` on the rewarded and penalised replay entries.

        Prints how many entries carry the maximum reward or the penalty, how
        often the model's preferred action differs from the stored action on
        rewarded entries, and how often it matches on penalised entries. Empty
        lists yield percentages of 0 and zero Q-values instead of a division
        by zero.

        Returns:
            Tuple ``(pmerror, mean_q_max, mean_q_penalty)``: the percentage of
            rewarded entries with a differing action and the mean predicted
            Q-values (as lists) for the rewarded and the penalised entries.
        """
        maxlist = []
        penaltylist = []

        for (cs, act, rew, fs, term) in self.shufflelist:
            if rew == self.MAXREWARD or rew == 30.0:
                maxlist.append((cs,act,rew,fs,term))
            if rew == self.PENALTY:
                penaltylist.append((cs,act,rew,fs,term))

        total = len(self.shufflelist)
        maxperc = 100*len(maxlist)/total if total else 0.0
        penaltyperc = 100*len(penaltylist)/total if total else 0.0
        print(f"Number of maxrewards in shufflelist: {len(maxlist)}, perc: {maxperc}")
        print(f"Number of terminations in shufflelist: {len(penaltylist)}, perc: {penaltyperc}")

        print("Testing maxlist")
        maxsum, maxhits = self._score_samples(maxlist)
        count = len(maxlist) - maxhits
        pmerror = 100*count/len(maxlist) if maxlist else 0
        averagestates = maxsum/len(maxlist) if maxlist else maxsum
        print(f"Number of predicted errors in maxlist: {count}, perc: {pmerror}")
        print(f"Q Values for max: {averagestates}")

        print("Testing penaltylist")
        penaltysum, count = self._score_samples(penaltylist)
        pterror = 100*count/len(penaltylist) if penaltylist else 0
        averagepenalty = penaltysum/len(penaltylist) if penaltylist else penaltysum
        print(f"Number of predicted terminations in penaltylist: {count}, perc: {pterror}")
        print(f"Q Values for penalty: {list(averagepenalty)}")

        return pmerror, list(averagestates), list(averagepenalty)

    def save_checkpoint(self, checkpointparname=modelweightname):
        """Copy the weights of ``self.model`` into the target network and save them.

        Args:
            checkpointparname: File name inside the model directory.
        """
        self.model_target.set_weights(self.model.get_weights())
        print(f"saving checkpoint: {os.path.join(pathname, modeldirname,checkpointparname)}")
        self.model_target.save_weights(os.path.join(pathname, modeldirname,checkpointparname) )

        return

    def print_score(self):
        """Print and return ``(score, numbatches)`` of this game."""
        print(f" ----> TIME IS {datetime.now():%Y-%m-%d_%H-%M-%S}")
        print(f" ----> SCORE is {self.score}")
        print(f" ----> NUM OF BATCHES is {self.numbatches}")
        return self.score, self.numbatches

    def print_overall_score(self):
        """Print and return ``(overall_score, overall_numbatches)``."""
        print(f"--> TIME IS {datetime.now():%Y-%m-%d_%H-%M-%S}")
        print(f"--> OVERALL SCORE is {self.overall_score}")
        print(f"--> OVERALL NUM OF BATCHES is {self.overall_numbatches}")
        return self.overall_score, self.overall_numbatches
    


class Alien:
    """A square enemy that moves down the screen a little every time it is drawn."""

    def __init__(self, game, x, y):
        """Store the owning game and the top-left position of the alien."""
        self.x = x
        self.game = game
        self.y = y
        self.size = 40

    def draw(self):
        """Draw the alien on the game screen and move it down by 0.4 pixels."""
        pygame.draw.rect(self.game.screen,  # rendering surface
                         (81, 43, 88),  # colour of the object
                         pygame.Rect(self.x, self.y, self.size, self.size))
        self.y += 0.4

    def checkCollision(self, game):
        """Remove this alien and the first rocket that hits it from ``game``.

        A rocket counts as a hit when its position lies within ``size`` pixels
        of the alien's position on both axes. At most one rocket is consumed
        per call.
        """
        for rocket in game.rockets:
            if (rocket.x < self.x + self.size and
                    rocket.x > self.x - self.size and
                    rocket.y < self.y + self.size and
                    rocket.y > self.y - self.size):
                game.rockets.remove(rocket)
                game.aliens.remove(self)
                # Stop here: the alien is gone, and continuing would iterate
                # over a list that was just modified and could try to remove
                # this alien a second time.
                return


class Hero:
    """The player's ship, drawn as a 40x20 rectangle."""

    def __init__(self, game, x, y):
        """Remember the owning game and the top-left position of the ship."""
        self.x = x
        self.game = game
        self.y = y

    def draw(self):
        """Paint the ship at its current position on the game screen."""
        colour = (210, 250, 251)
        body = pygame.Rect(self.x, self.y, 40, 20)
        pygame.draw.rect(self.game.screen, colour, body)


class Generator:
    """Fills the upper half of the playfield with randomly placed aliens."""

    def __init__(self, game):
        """Walk a grid over the upper half of the window and add aliens to ``game.aliens``.

        Each grid cell receives an alien when ``random.randint(0, 1)`` yields 1.
        """
        margin = 30  # gap to the screen edges
        width = 50  # spacing between aliens
        columns = range(margin, game.width - margin, width)
        rows = range(margin, int(game.height / 2), width)
        game.aliens.extend(
            Alien(game, x, y)
            for x in columns
            for y in rows
            if random.randint(0, 1) == 1
        )
                
                

        # game.aliens.append(Alien(game, 280, 50))


class Rocket:
    """A projectile, drawn as a 15x15 square, that travels upwards."""

    def __init__(self, game, x, y):
        """Remember the start position and the owning game."""
        self.x = x
        self.y = y
        self.game = game

    def draw(self):
        """Paint the rocket on the game screen, then move it up by 2 pixels."""
        colour = (254, 52, 110)
        shape = pygame.Rect(self.x, self.y, 15, 15)
        pygame.draw.rect(self.game.screen, colour, shape)
        self.y -= 2  # flies up the playfield at 2 px per frame


#if __name__ == '__main__':
 #   game = Game(500, 500)

In [16]:
# Run 5 epochs starting at a learning rate of 1.5e-06; the default checkpoint
# is only replaced once the benchmark value drops below 60.0.
run_game(1.5e-06, 5, 60.0)

0: learning rate: 1.5e-06
60.0
loading checkpoint: D:\OneDrive - Hochschule Albstadt-Sigmaringen\Studium\Semester 5\DesignCPS\model\model-regr.h5
loading checkpoint: D:\OneDrive - Hochschule Albstadt-Sigmaringen\Studium\Semester 5\DesignCPS\model\model-regr.h5
loading checkpoint: D:\OneDrive - Hochschule Albstadt-Sigmaringen\Studium\Semester 5\DesignCPS\model\model-regr.h5
Number of maxrewards in shufflelist: 96, perc: 14.285714285714286
Number of terminations in shufflelist: 64, perc: 9.523809523809524
Testing maxlist
Number of predicted errors in maxlist: 32, perc: 33.333333333333336
Q Values for max: [-0.00599414 -0.00305563  0.00524076 -0.00410843]
Testing penaltylist
Number of predicted terminations in penaltylist: 32, perc: 50.0
Q Values for penalty: [-0.00879652239382267, 8.417724166065454e-05, 0.01357804099097848, -0.015757187269628048]
saving checkpoint: D:\OneDrive - Hochschule Albstadt-Sigmaringen\Studium\Semester 5\DesignCPS\model\model-regr_0_0.000001500_33.33.h5
saving ch

([33.333333333333336,
  33.333333333333336,
  66.66666666666667,
  33.333333333333336,
  66.66666666666667],
 [[-0.0059941372989366455,
   -0.003055631648749113,
   0.005240759424244364,
   -0.004108425695449114],
  [0.0008020286913961172,
   -0.004015434145306547,
   -0.0003075614416350921,
   0.0016023863572627306],
  [-0.0011186619328024487,
   0.0047500610041121645,
   0.0034453279028336206,
   -0.0003391734887069712],
  [0.004407218812654416,
   -0.001745964555690686,
   0.00048107355056951445,
   -0.0007549321744590998],
  [-0.002917591637621323,
   -0.0001556643983349204,
   -0.005115877216060956,
   0.0012103892804589123]],
 [[-0.00879652239382267,
   8.417724166065454e-05,
   0.01357804099097848,
   -0.015757187269628048],
  [-0.0008539488480892032,
   -0.01077083870768547,
   -0.0034492413979023695,
   4.2445317376405e-05],
  [0.000698453193763271,
   0.008202211000025272,
   0.001141363987699151,
   0.002284822054207325],
  [0.01396444858983159,
   -0.003857759525999427,
   