In [44]:
import random
import h5py
import numpy as np
import pandas as pd
import xarray as xr
import seaborn as sns
import matplotlib.pyplot as plt
import csv
import math
import concurrent.futures

from tqdm import tqdm
from pathlib import Path
from sklearn.preprocessing import normalize
from sklearn.preprocessing import MinMaxScaler

In [45]:
BASEDIR = Path("")

fn = BASEDIR / "train_eval.hdf5"

In [46]:
def to_xarray(dataset):

    BANDS = ["coastal_aerosol", "blue", "green", "red",
                "veg_red_1", "veg_red_2", "veg_red_3", "nir", 
                "veg_red_4", "water_vapour", "swir_1", "swir_2"]
        
    post = dataset["post_fire"][...].astype("float32") / 10000.0
    
    pre = dataset["pre_fire"][...].astype("float32") / 10000.0
        
    # Da die Maske nur ein "Band" hat können wir die dritte Dimension einfach
    # weglassen. Das erreichen wir in dem wir mit `0` am Ende indizieren.
    mask = dataset["mask"][..., 0]
    
    return {"pre": xr.DataArray(pre, dims=["x", "y", "band"], coords={"x": range(512), "y": range(512), "band": BANDS}),
            "post": xr.DataArray(post, dims=["x", "y", "band"], coords={"x": range(512), "y": range(512), "band": BANDS}),
            "mask": xr.DataArray(mask, dims=["x", "y"], coords={"x": range(512), "y": range(512)}),
            "fold": dataset.attrs["fold"]}

In [47]:
objectWithMask = 0
objectWithPost_fire = 0
objectWithPre_fire = 0

objectsSummary = []
currentObject = {}

def listObject(obj, indent=0):
    global objectWithMask
    global objectWithPost_fire
    global objectWithPre_fire
    global objectsSummary
    global currentObject
    
    for name, values in obj.items():
        if indent == 0:
            currentObject = { "name" : "", "mask": False, "pre_fire" : False, "post_fire": False, "fold": None}

        if isinstance(values, h5py.Group):
            if indent == 0:
                currentObject["fold"] = values.attrs["fold"]
                currentObject["name"] = name
            listObject(values, indent+1)
            
        else:
            if name == "mask":
                currentObject["mask"] = True
                objectWithMask += 1
            elif name == "post_fire":
                currentObject["post_fire"] = True
                objectWithPost_fire += 1
            elif name == "pre_fire":
                currentObject["pre_fire"] = True
                objectWithPre_fire += 1
            else:
                print(name)
    
        if indent == 0:
            objectsSummary.append(currentObject)

In [48]:
objectWithMask = 0
objectWithPost_fire = 0
objectWithPre_fire = 0

objectsSummary = []
currentObject = {}

with h5py.File(fn, "r") as fd:
    listObject(fd)

folds = {}

for entry in objectsSummary:
    fold = entry["fold"]
    if not fold in folds:
        folds[fold] = 1
    else:
        folds[fold] += 1


completeObjects = 0

for entry in objectsSummary:
    if entry["mask"] == True and entry["pre_fire"] == True and entry["post_fire"] == True:
        completeObjects += 1

print("Folds: " + str(folds))
print("Objects with Mask: " + str(objectWithMask))
print("Objects with Post_Fire: " + str(objectWithPost_fire))
print("Objects with Pre_Fire: " + str(objectWithPre_fire))
print("Complete Objects: " + str(completeObjects))


Folds: {1: 104, 4: 92, 3: 109, 0: 130, 2: 99}
Objects with Mask: 534
Objects with Post_Fire: 534
Objects with Pre_Fire: 356
Complete Objects: 356


In [49]:
def addToMatrice(left, right):
    return left + right

def subtractFromMatrice(left, right):
    return left - right

In [50]:
class Gene:

    def __init__(self, band_key : str, band_op : callable):
        self.bandKey = band_key
        self.bandOp = band_op

    def __str__(self):
        string = ""
        if self.bandOp == addToMatrice:
            string += "+ "
        elif self.bandOp == subtractFromMatrice:
            string += "- "
        else:
            string += "? "

        string += str(self.bandKey)
        return string

class Child:
    def __init__(self, genes : list[Gene], threshold : float = 0.5):
        self.genes = genes
        self.threshold = threshold
        self.score = 0


    def __str__(self):
        string = "[" + str(self.score)+" = 0"
        for gene in self.genes:
            string += " " + str(gene)
        
        string += ", treshold = " + str(self.threshold) + "]"
        return string

class Population:
    def __init__(self, children : list[Child]):
        self.children = children

    def __str__(self):
        string = "["
        first = False
        for child in self.children:
            if not first:
                string += ", " + str(child)
            else:
                first = True
                string += str(child)
        
        string += "]"
        return string

class AlgorithmSettings:
    def __init__(self, band_keys : list[str], band_ops : list[callable], mutationChance : float, maxGeneCount : int, generationSize : int, generations: int, survivorMod : float, childMod : float):
        self.bandKeys = band_keys
        self.bandOps = band_ops
        self.mutationChance = mutationChance
        self.maxGeneCount = maxGeneCount
        self.generationSize = generationSize
        self.generations = generations
        self.survivorMod = survivorMod
        self.childMod = childMod
        self.testsPerGeneration = 10

    def __str__(self):
        string = "{bandKeys :" + str(self.bandKeys) + ", "
        string += "bandOps :" +str(self.bandOps)+ ", "
        string += "mutationChance :" +str(self.mutationChance)+ ", "
        string += "maxGeneCount :" +str(self.maxGeneCount)+ ", "
        string += "generationSize :" +str(self.generationSize)+ ", "
        string += "generations :" +str(self.generations)+ ", "
        string += "survivorMod :" +str(self.survivorMod)+ ", "
        string += "childMod :" +str(self.childMod) + "}"
        return string

class Generation:
    def __init__(self, population: Population, age: int, settings : AlgorithmSettings) -> None:
        self.population = population
        self.settings = settings
        self.age = age

    def __str__(self):
        string = "{age : " + str(self.age) + ", "
        string += "settings : " + str(self.settings) + ", "
        string += "population : " + str(self.population) + "}"
        return string

In [51]:
def generateNewChildFromRandom(settings : AlgorithmSettings) -> Child:

    geneCount = random.randint(1, settings.maxGeneCount)

    newGenes = []
    
    for _ in range(geneCount):
        band_op = random.choice(settings.bandOps)
        band_key = random.choice(settings.bandKeys)
        newGenes.append(Gene(band_key, band_op))

    threshold = random.randint(1, 100) / 100
    return Child(genes=newGenes, threshold = threshold)

def generateNewPopulationFromRandom(nGeneSets : int, settings : AlgorithmSettings) -> Population:
    newChildren = []

    for _ in range(nGeneSets):
        newChildren.append(generateNewChildFromRandom(settings))

    return Population(children=newChildren)

In [52]:
def generateNewPopulationFromParents(parents : list[Child], nGeneSets : int, settings : AlgorithmSettings) -> Population:
    nParents = len(parents)
    weights = [i / (nParents * (nParents + 1) / 2) for i in range(nParents, 0, -1)]
    

    newChildren = []
    for _ in range(nGeneSets):
        newGenes = []
        thresholds = []

        #Grab the length of a random parent and use that for child gene count
        parent_index = random.choices(range(nParents), weights=weights, k=1)[0]
        parent = parents[parent_index]
        
        geneCount = len(parent.genes)

        # Mutation is applied in a seperate step
        # Generate new child using parents and weights
        for _ in range(geneCount):
            parent_index = random.choices(range(nParents), weights=weights, k=1)[0]
            parent = parents[parent_index]
            thresholds.append(parent.threshold)
            # NOTE: This doesnt respect gene order, which to some degree is important
            newGenes.append(random.choice(parent.genes))

        newThreshold = sum(thresholds) / len(thresholds)
        newChildren.append(Child(genes=newGenes, threshold=newThreshold))

    return Population(children=newChildren)

In [53]:
# Flips the operation on a Gene
def flip(child : Child, settings : AlgorithmSettings) -> Child:
    newGenes = child.genes
    opIndex = random.randint(0, len(newGenes) -1)
    op = newGenes[opIndex].bandOp

    if op == addToMatrice:
        op = subtractFromMatrice
    else:
        op = addToMatrice

    newGenes[opIndex].bandOp = op

    return Child(newGenes, child.threshold)

# Mutates a BandKey of a Gene to a random new one
def mutateKey(child : Child, settings : AlgorithmSettings) -> Child:
    newGenes = child.genes

    keyIndex = random.randint(0, len(newGenes) - 1)
    key = random.choice(settings.bandKeys)

    newGenes[keyIndex].bandKey = key

    return Child(newGenes, child.threshold)

# Swaps the BandOp between two Genes
def swapOp(child : Child, settings : AlgorithmSettings) -> Child:
    newGenes = child.genes

    targetIndex1 = random.randint(0, len(newGenes) - 1)
    targetIndex2 = random.randint(0, len(newGenes) - 1)

    bandOp = newGenes[targetIndex1].bandOp
    newGenes[targetIndex1].bandOp = newGenes[targetIndex2].bandOp
    newGenes[targetIndex2].bandOp = bandOp

    return Child(newGenes, child.threshold)

# Swaps the BandKey between two Keys
def swapKey(child : Child, settings : AlgorithmSettings) -> Child:
    newGenes = child.genes

    targetIndex1 = random.randint(0, len(newGenes) - 1)
    targetIndex2 = random.randint(0, len(newGenes) - 1)

    bandKey = newGenes[targetIndex1].bandKey
    newGenes[targetIndex1].bandKey = newGenes[targetIndex2].bandKey
    newGenes[targetIndex2].bandKey = bandKey

    return Child(newGenes, child.threshold)

# Expands the genome by one entry. Will remove the first gene if the max length is reached
def expandSet(child : Child, settings : AlgorithmSettings) -> Child:

    newGenes = child.genes

    newOp = random.choice(settings.bandOps)
    newKey = random.choice(settings.bandKeys)
    newGene = Gene(newKey,newOp)

    while len(newGenes) >= settings.maxGeneCount:
        newGenes.pop(0)

    newGenes.append(newGene)

    return Child(newGenes, child.threshold)

# Removes a random gene
def reduceSet(child : Child, settings : AlgorithmSettings) -> Child:
    
    newGenes = child.genes
    # Cant remove genes from minimum length
    if (len(child.genes) == 1):
        return Child(newGenes, child.threshold)
    
    keyIndex = random.randint(len(newGenes))
    newGenes.pop(keyIndex)

    return Child(newGenes, child.threshold)


def mutateThreshold(child : Child, settings : AlgorithmSettings) -> Child:
    newThreshold = child.threshold
    newThreshold += (random.randint(-100,100) / 100)
    if newThreshold > 1:
        newThreshold = 1
    elif newThreshold < 0.01:
        newThreshold = 0.01

    return Child(child.genes, newThreshold)


In [54]:
# Mutates a childs genes
def mutateChild(child : Child, settings : AlgorithmSettings) -> Child:
    mutations = [
        flip,
        swapKey,
        swapOp,
        mutateKey,
        expandSet,
        mutateThreshold
    ]
    
    newChild = child
    while random.randint(0, 100) <= 100 * settings.mutationChance :
        mutator = random.choice(mutations)
        newChild = mutator(newChild, settings)

    return newChild

In [55]:
def scoreChild(child : Child, settings : AlgorithmSettings, deltaDataArray : xr.DataArray, flatMask : list[float]) -> int:
    # NOTE: Currently only using delta data
    predictionDataArray = deltaDataArray.sel(band="red") * 0
    
    for gene in child.genes:
        predictionDataArray = gene.bandOp(predictionDataArray, deltaDataArray.sel(band=gene.bandKey))

    scaler = MinMaxScaler()

    
    normalizedFlatPrediction = scaler.fit_transform(predictionDataArray.values).flatten()
    threshold = child.threshold
    
    score = 0
    for prediction, actual in zip(normalizedFlatPrediction, flatMask):
        if prediction >= threshold:
            if actual == 1:
                score += 1
        else:
            if actual == 0:
                score += 1

    return score

In [56]:
def scoreAllSync(children : list[Child], settings : AlgorithmSettings, testDataKeys : list[str]):
    
    possibleScore = 0

    for testDataKey in testDataKeys:
        with h5py.File(fn, "r") as fd:
            data = to_xarray(fd[testDataKey])

        pre = data["pre"]
        post = data["post"]
        mask = data["mask"]
        diff = post - pre

        flatMask = mask.values.flatten()
        possibleScore += (512 * 512)

        for child in children:
            child.score += scoreChild(child,settings,diff,flatMask)   

    for child in children:
            child.score = child.score / possibleScore
            
    sortedChildren = sorted(children, key=lambda x: x.score , reverse=True)

    return sortedChildren
    

In [57]:
def runGeneration(generation : Generation, settings : AlgorithmSettings, testDataKeys : list[str]) -> Generation:

    children = generation.population.children
    iGeneration = generation.age

    subkeys = []
    for i in range(settings.testsPerGeneration):
        testDataKeyIndex = iGeneration + i % (len(testDataKeys) - 1)
        testDataKey = testDataKeys[testDataKeyIndex]
        subkeys.append(testDataKey)
    
    print("Scoring Gen " + str(iGeneration) )
    
    sorted_array = scoreAllSync(children,settings,subkeys)

    print("Best Scorer in Generation " + str(generation.age) + ": " + str(sorted_array[0]))
    generationSize = settings.generationSize
    survivorMod = settings.survivorMod
    
    survivorCount = math.floor(generationSize * survivorMod)

    survivors = [sorted_array[i] for i in range(survivorCount)]

    childrenCount = math.floor((generationSize - survivorCount) * settings.childMod)
    newChildren = generateNewPopulationFromParents(survivors, childrenCount, settings)
    for i, child in enumerate(newChildren.children):
        newChildren.children[i] = mutateChild(child, settings)

    randomCount = generationSize - survivorCount - childrenCount
    newRandoms = generateNewPopulationFromRandom(randomCount, settings)

    newGeneration = []
    newGeneration.extend(survivors)
    newGeneration.extend(newChildren.children)
    newGeneration.extend(newRandoms.children)

    random.shuffle(newGeneration)
    
    iGeneration += 1
    return Generation(Population(newGeneration), iGeneration, settings)


In [58]:
def runGeneticAlgorithm(settings : AlgorithmSettings, testDataKeys : list[str]) -> Generation:
    population = {}
    population = generateNewPopulationFromRandom(settings.generationSize, settings)
    
    nGenerations = settings.generations
    generation = Generation(population, 0, settings)

    for i in range(nGenerations):
        
        generation = runGeneration(generation, settings, testDataKeys)

    return generation

In [59]:
testDataKeys = []
validationDataKeys = []


for entry in objectsSummary:
    if entry["mask"] == True and entry["pre_fire"] == True and entry["post_fire"] == True:
        if entry["fold"] == 0:
            validationDataKeys.append(entry["name"])
        else:
            testDataKeys.append(entry["name"])

In [60]:
print("TestData Entries: " + str(len(testDataKeys)))
print("ValidationData Entries: " + str(len(validationDataKeys)))

TestData Entries: 278
ValidationData Entries: 78


In [61]:
random.seed(1234565769)



settings = AlgorithmSettings(
                                band_keys = [
                                    "coastal_aerosol", 
                                    "blue", 
                                    "green", 
                                    "red",
                                    "veg_red_1", 
                                    "veg_red_2", 
                                    "veg_red_3", 
                                    "nir", 
                                    "veg_red_4", 
                                    "water_vapour", 
                                    "swir_1", 
                                    "swir_2"
                                ],
                                band_ops = [
                                    addToMatrice,
                                    subtractFromMatrice
                                ],
                                mutationChance = 0.3,
                                maxGeneCount = 26,
                                generationSize = 200,
                                survivorMod = 0.3,
                                childMod = 0.5,
                                generations = 100
                            )

In [62]:
resultingPop = runGeneticAlgorithm(settings = settings, testDataKeys= testDataKeys)

Scoring Gen 0
Best Scorer in Generation 0: [0.9530467987060547 = 0 - blue + green - nir + green - water_vapour - veg_red_2 + swir_1 - nir + green - swir_2 - swir_2, treshold = 1.0]
Scoring Gen 1
Best Scorer in Generation 1: [0.9558696746826172 = 0 - water_vapour - water_vapour - green - swir_1 - veg_red_3 + red - nir + red + green, treshold = 1.0]
Scoring Gen 2
Best Scorer in Generation 2: [0.9558971236685465 = 0 - water_vapour - water_vapour - veg_red_1 - swir_1 - veg_red_3 + red - nir + red + green, treshold = 1.0]
Scoring Gen 3
Best Scorer in Generation 3: [0.9565425872802734 = 0 - nir + nir, treshold = 0.9807142857142858]
Scoring Gen 4
Best Scorer in Generation 4: [0.9555732561273909 = 0 + nir - nir, treshold = 0.9807142857142858]
Scoring Gen 5
Best Scorer in Generation 5: [0.9558982679646515 = 0 + nir - nir, treshold = 0.9807142857142858]
Scoring Gen 6
Best Scorer in Generation 6: [0.9779987166970319 = 0 + veg_red_4 - veg_red_4, treshold = 0.9964516315640675]
Scoring Gen 7
Best Sc

Traceback (most recent call last):
  File "C:\Users\Jack5\AppData\Roaming\Python\Python310\site-packages\IPython\core\interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\Jack5\AppData\Local\Temp\ipykernel_39320\2921041177.py", line 1, in <module>
    resultingPop = runGeneticAlgorithm(settings = settings, testDataKeys= testDataKeys)
  File "C:\Users\Jack5\AppData\Local\Temp\ipykernel_39320\97968044.py", line 10, in runGeneticAlgorithm
    generation = runGeneration(generation, settings, testDataKeys)
  File "C:\Users\Jack5\AppData\Local\Temp\ipykernel_39320\526447089.py", line 14, in runGeneration
    sorted_array = scoreAllSync(children,settings,subkeys)
  File "C:\Users\Jack5\AppData\Local\Temp\ipykernel_39320\1696837086.py", line 18, in scoreAllSync
    child.score += scoreChild(child,settings,diff,flatMask)
  File "C:\Users\Jack5\AppData\Local\Temp\ipykernel_39320\3904574652.py", line -1, in scoreChild
KeyboardInterrup

In [None]:
print(str(resultingPop))

{age : 100, settings : {bandKeys :['coastal_aerosol', 'blue', 'green', 'red', 'veg_red_1', 'veg_red_2', 'veg_red_3', 'nir', 'veg_red_4', 'water_vapour', 'swir_1', 'swir_2'], bandOps :[<function addToMatrice at 0x0000019A62FE2F80>, <function subtractFromMatrice at 0x0000019A34BFE710>], mutationChance :0.3, maxGeneCount :26, generationSize :20, generations :100, survivorMod :0.3, childMod :0.5}, population : [, [0 = 0 + veg_red_4 - veg_red_4 + coastal_aerosol + nir + red + swir_2 - water_vapour - water_vapour + swir_1 - red - red - veg_red_2 + veg_red_1 - blue + nir + red + water_vapour + blue + swir_1 - green - veg_red_2, treshold = 0.8], [0 = 0 - swir_1 + coastal_aerosol + nir + veg_red_1 - swir_1 + veg_red_2 + veg_red_2 + nir + veg_red_1 + veg_red_2 - swir_1 + veg_red_1 + nir + nir + veg_red_2 + veg_red_2 + nir + nir + veg_red_2 - coastal_aerosol + veg_red_2 + water_vapour + veg_red_3 + water_vapour + veg_red_1 + nir, treshold = 0.9999999928492336], [0 = 0 - swir_1 + veg_red_3 - veg_r