In [1]:
# si utilisé dans jupyter utiliser la ligne suivante, si utilisé dans VisualStudioCode utiliser la seconde
#%matplotlib widget
#%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import json
import os

Définition des fonctions intervenant dans le calcul du score afin de pouvoir comparer les différentes analyses entre elles

In [2]:
def computeLength(compressionStats:dict[str, str]) -> int:
    content:str = compressionStats["Compression"]
    # len(content) : donne la longeur de la chaine entière
    # content.count('*')*2+int(compressionStats["OptCount"]) : compte le nombre d'options en évitant de compter en double le cas des [A*BCDA*] où A* ne compte que pour une seule option qui peut être soit au début soit à la fin. Donc on compte le nombre d'étoile que l'on multiplie par deux pour compter aussi sont Event associé puis on rajoute le décompte d'options des statistiques de la compression
    # content.count(']') compte le nombre de fin de séquence pour ne prendre en compte que les début de séquence
    return len(content)-content.count('*')*2+int(compressionStats["OptCount"])-content.count(']')

# compressionStats: statistiques d'une compression
# lmax: longueur maximale de toutes les compressions
# optWeight: contrôle le positionnement du curseur pour privilégier la longueur vs le nombre d'options
def computeScore(compressionStats:dict[str, str], lmax:int, lWeight:float, lWay:bool, oWeight:float, oWay:bool, aWeight:float, aWay:bool, mWeight: float, mWay:bool) -> float:
    length:int = computeLength(compressionStats)
    # 1-length/lmax favorise les séquences courtes
    # 1-int(compressionStats["OptCount"])/length favorise les séquences avec peu d'éléments optionnels
    scoreLength:float = lWeight*((1-length/lmax if lWay else length/lmax))
    scoreOption:float = oWeight*((1-int(compressionStats["OptCount"])/length if oWay else int(compressionStats["OptCount"])/length))
    scoreAlign:float = aWeight*((1-int(compressionStats["AlignCount"])/length if aWay else int(compressionStats["AlignCount"])/length))
    scoreMerge:float = mWeight*((1-int(compressionStats["MergeCount"])/length if mWay else int(compressionStats["MergeCount"])/length))
    return scoreLength+scoreOption+scoreAlign+scoreMerge

Définition de la fonction qui permet de calculer pour un jeu d'hyper paramètres donné le nombre de résultats attendu obtenu

In [3]:
# ScenarioCount: Nombre de scénario, un scénario étant un ensemble de séquence et de compression d'une même famille de problème
# dataCache: un dictionnaire ayant comme clé le nom d'un test et comme valeur:
#               "data" -> un objet contenant pour chaque paramètre (gr,ws,pb) sa compression et des stats en terme d'option, d'alignement et de merge
#               "solution" -> un str contenant la solution attendu pour ce test
# hyperParameters: Le jeu d'hyperparamètre à utiliser dans le calcul du score
# mostRecurrent: un booleen indiquant si on ne comptabilise que les résultats les plus reccurent ou si on s'autorise plus de souplesse. Si True, ne compter un succès que si parmis les compressions avec le meilleur score la plus récurrente est la solution attendu. Si False, compter comme un succès si la solution fait partie des compression explorées.
def computePerf(ScenarioCount:int, dataCache, hyperParameters, mostRecurrent) -> list[int]:
    correct_counts:list[int]=[0]*ScenarioCount
    # Parcourir tout le jeu de test
    for test, cache in dataCache.items():
        data:dict[str, list[dict[str, str]]] = cache["data"]
        solution:str = cache["solution"]

        # Recherche des maxima
        maxLen:int = 0
        for k,stats in data.items():
            for stat in stats:
                # Vérifier si on n'est pas dans le cas de l'OverTime (Tout égal à 0)
                if stat["AlignCount"] == "0" and stat["OptCount"] == "0" and stat["MergeCount"] == "0" :
                    continue
                length:int = computeLength(stat)
                if length > maxLen:
                    maxLen = length

        maxScore:float = -1
        bestSol:dict[str, int] = {}
        popularSol:dict[str, int] = {}
        for k,stats in data.items():
            for stat in stats:
                # Vérifier qu'on a au moins une stat (pas le cas de l'OverTime)
                if stat["AlignCount"] != "0" or stat["OptCount"] != "0" or stat["MergeCount"] != "0" :
                    currentScore:float = computeScore(stat, maxLen, hyperParameters[0], hyperParameters[1], hyperParameters[2], hyperParameters[3], hyperParameters[4], hyperParameters[5], hyperParameters[6], hyperParameters[7])
                    if currentScore > maxScore :
                        maxScore = currentScore
                        bestSol = {}
                        bestSol[stat["Compression"]] = 1
                    elif currentScore == maxScore :
                        if stat["Compression"] in bestSol:
                            bestSol[stat["Compression"]] += 1
                        else:
                            bestSol[stat["Compression"]] = 1
                    if stat["Compression"] in popularSol:
                        popularSol[stat["Compression"]] += 1
                    else:
                        popularSol[stat["Compression"]] = 1

        scenarKey:int = int(test.split("_")[0])-1
        if scenarKey < ScenarioCount:
            if mostRecurrent:
                maxOccurence:str = -1
                for k, v in bestSol.items():
                    if v > maxOccurence:
                        maxOccurence = v
                # Comptabiliser comme un succès si la clé de la plus récurrente est la solution
                for k, v in bestSol.items():
                    if k == solution and v >= maxOccurence:
                        correct_counts[scenarKey] += 1
            else:
                if solution in popularSol:
                    correct_counts[scenarKey] += 1

    return correct_counts

Mise en cache des données du jeu de test

In [4]:
# Fonction d'aide pour construire la liste des fichiers à explorer
def defineTests(dir:str):
    if not os.path.isdir(dir):
        print("Error: "+dir+" is not a directory")
        raise
    if not os.path.isdir(dir+"/example") or not os.path.isdir(dir+"/example/solutions"):
        print("Error: "+dir+" is not correctly structured. It must contain these sub directories: "+dir+"/example/solutions/")
        raise
    return [f.replace('.log', '') for f in os.listdir(dir+"/example") if f.endswith(".log")]

In [8]:
tests:list[str] = []
mainDir:str = "./datasetXP_alter"
#mainDir:str = "./datasetXP"

#version:str = "dichotomous"
version:str = "exhaustive"

if tests == []:
    tests = defineTests(mainDir)

ScenarioCount = 0
#identifier le nombre de scénario
for example_filename in os.listdir(mainDir+"/example"):
    if example_filename.endswith(".log"):
        scenarNum = int(example_filename.split('_')[0])
        if scenarNum > ScenarioCount:
            ScenarioCount = scenarNum

# Mise en cache des jeux de test
dataCache = {}
for test in tests:
    dataCache[test] = {}
    # Ouvrir le fichier JSON et le charger dans un dictionnaire
    with open(mainDir+"/solutionsExplored/"+version+"_"+test+".txt", 'r', encoding='utf-8') as fichier:
        dataCache[test]["data"] = json.load(fichier)
    
    # Ouvrir la solution optimale
    with open(mainDir+"/example/solutions/"+test+".log", 'r', encoding='utf-8') as fichier:
        dataCache[test]["solution"] = fichier.read()

# Recherche des hypers paramètres
Les hypers paramètres pour le calcul du score sont relatifs au poids associé à la longueur, aux options, aux alignements et aux merges

Le but est de maximiser le nombre de cas qui sortent la solution attendu en premier

Définition des hypers paramètres à explorer et calcul des performances pour chacun d'eux

In [7]:
# Construction des hyperparamètres
hyperParameters = []
searchSpace:list[float] = [0, 0.25, 0.5, 0.75, 1]
for lWeight in searchSpace:
    for lWay in [True]:#[True, False]:
        for oWeight in searchSpace:
            for oWay in [True]:#[True, False]:
                for aWeight in searchSpace:
                    """if lWeight == 0 and oWeight == 0 and aWeight == 0:
                        continue
                    for aWay in [False]:#[True, False]:
                        hyperParameters.append((lWeight, lWay, oWeight, oWay, aWeight, aWay, 0, False))"""
                    for aWay in [False]:#[True, False]:
                        for mWeight in searchSpace:
                            # ne pas simuler le cas où les 3 hyper paramètres sont à 0 => tout les scénario ont un maxscore
                            if lWeight == 0 and oWeight == 0 and aWeight == 0 and mWeight == 0:
                                continue
                            for mWay in [False]:#[True, False]:
                                hyperParameters.append((lWeight, lWay, oWeight, oWay, aWeight, aWay, mWeight, mWay))

# Exploration des hyper-paramètres pour le calcul du score
betterParams = []
betterResult = 0
for paramSet in hyperParameters:
    # Calcul de la performance pour ce jeu d'hyper paramètres
    correct_counts:list[int]=computePerf(ScenarioCount, dataCache, paramSet, True)

    # Si ce jeu d'hyperparamètres a permis d'identifier plus de cas que le précédent, le retenir
    if sum(correct_counts) > betterResult:
        betterResult = sum(correct_counts)
        betterParams = [paramSet]
    elif sum(correct_counts) == betterResult:
        betterParams.append(paramSet)
    print("Parameters="+str(paramSet)+": "+str(correct_counts)+ " => "+ str(sum(correct_counts)))

print("betterResult: "+str(betterResult)+"; nbSet: "+str(len(betterParams))+"; "+str(betterParams))

Parameters=(0, True, 0, True, 0, False, 0.25, False): [10, 10, 0, 0, 3, 0, 0, 0, 0, 4, 0, 1, 2, 3, 0, 0] => 33
Parameters=(0, True, 0, True, 0, False, 0.5, False): [10, 10, 0, 0, 3, 0, 0, 0, 0, 4, 0, 1, 2, 3, 0, 0] => 33
Parameters=(0, True, 0, True, 0, False, 0.75, False): [10, 10, 0, 0, 3, 0, 0, 0, 0, 4, 0, 1, 2, 3, 0, 0] => 33
Parameters=(0, True, 0, True, 0, False, 1, False): [10, 10, 0, 0, 3, 0, 0, 0, 0, 4, 0, 1, 2, 3, 0, 0] => 33
Parameters=(0, True, 0, True, 0.25, False, 0, False): [10, 10, 10, 10, 10, 6, 4, 10, 8, 9, 3, 10, 10, 0, 1, 0] => 111
Parameters=(0, True, 0, True, 0.25, False, 0.25, False): [10, 10, 10, 10, 6, 0, 0, 3, 1, 7, 4, 10, 10, 3, 1, 0] => 85
Parameters=(0, True, 0, True, 0.25, False, 0.5, False): [10, 10, 1, 0, 5, 0, 0, 1, 1, 6, 0, 1, 6, 3, 0, 0] => 44
Parameters=(0, True, 0, True, 0.25, False, 0.75, False): [10, 10, 0, 0, 3, 0, 0, 0, 0, 6, 0, 1, 4, 3, 0, 0] => 37
Parameters=(0, True, 0, True, 0.25, False, 1, False): [10, 10, 0, 0, 3, 0, 0, 0, 0, 5, 0, 1, 2, 3

# Exploitation du meilleur jeu d'hyper paramètres

In [9]:
#outputResult = "exportResults_MAP"
outputResult = "exportResults_MAP_alter"

# Calcul des résultats pour le meilleur jeu de paramètre trouvé
paramSet = (1, True, 0.5, True, 0.25, False, 0, False)
correct_counts:list[int]=computePerf(ScenarioCount, dataCache, paramSet, True)
print("Parameters="+str(paramSet)+": "+str(correct_counts)+ " => "+ str(sum(correct_counts)))
file = open(outputResult+'_hard.txt', 'w')
file.write(str(correct_counts))
file.close()

correct_counts=computePerf(ScenarioCount, dataCache, paramSet, False)
print("Parameters="+str(paramSet)+": "+str(correct_counts)+ " => "+ str(sum(correct_counts)))
file = open(outputResult+'_soft.txt', 'w')
file.write(str(correct_counts))
file.close()


Parameters=(1, True, 0.5, True, 0.25, False, 0, False): [10, 10, 8, 5, 9, 10] => 52
Parameters=(1, True, 0.5, True, 0.25, False, 0, False): [10, 10, 10, 9, 9, 10] => 58
