# Broadcast

## But : Faire un broadcast, départ décalé 


### Informations générales

#### Le nom de l'expérience : Nom du folder contenant les fichiers sources et résultats

In [None]:
# Name of the experiment; it is appended to the base path of the experiment
# directory and passed to the final git-commit cell.
experiment_name = "Time_Bcast"
# When True, the SimGrid and PajeNG installation cells are skipped.
crashed = True

#### Versions

In [None]:
%%bash -s
# Record the current commit and the Python / R versions used by this notebook.
git log -n 1
python3 --version
R --version

### Préparation de l'environnement (FRONTEND)

#### Installation des modules

In [None]:
%%bash -s
# Packages are installed for the current user only (pip --user).
pip3 install --user execo
pip3 install --user requests

# Gives a more readable, coloured display
pip3 install --user termcolor

# Needed to use R from the notebook:
pip3 install --user rpy2
pip3 install --user tzlocal

#### Chargement des modules dans l'environnement

In [None]:
import execo
import math
import collections
import os
import sys
import json
import time
import random
import datetime
import re
import tzlocal

from execo import *
from execo_g5k import *
from execo_engine import *

from shutil import copy
from subprocess import check_output

from threading import Thread

from collections import deque, OrderedDict
from termcolor import *
from subprocess import *

#### Chargement de l'outil permettant l'utilisation de R

In [None]:
# Load the rpy2 IPython extension; the %R line magics used in simulation() need it.
%load_ext rpy2.ipython

### Tools

#### Affichage coloré

In [None]:
# termcolor demo: print the same red message once per text attribute.
# Available attributes: bold, dark, underline, blink, reverse, concealed
for demo_attr in ('blink', 'dark', 'underline', 'bold', 'concealed'):
    text = colored('Hello, World!', 'red', attrs=[demo_attr])
    print(text)

# cprint colours and prints in a single call (here with a background colour).
cprint('Hello, World!', 'green', 'on_red', attrs=['reverse'])

#### Affichage lisible d'un dictionnaire complexe

In [None]:
# Readable display of a nested dictionary
# SOURCE : https://stackoverflow.com/questions/3229419/how-to-pretty-print-nested-dictionaries
def pretty(d, indent=0):
    """Print the dict ``d`` as an indented tree.

    Each key is printed in magenta on its own line, prefixed by ``indent``
    tab characters. A value that is itself a dict is printed recursively one
    level deeper; any other value is printed with ``str()`` one tab further
    in than its key.
    """
    key_prefix = '\t' * indent
    value_prefix = '\t' * (indent + 1)
    for key, value in d.items():
        print(key_prefix + colored(str(key), "magenta"))
        if not isinstance(value, dict):
            print(value_prefix + str(value))
            continue
        pretty(value, indent + 1)

### Préparation de l'environnement pour l'expérience

#### Paramètres réservation

In [None]:
# Job name (also used to find an already-submitted job in the reservation cell)
jobname = ' '
# Number of nodes
nodecount = 5
# Reservation duration (walltime)
walltime = "3:0:0" 

# Extra OAR option restricting the reservation to the listed clusters
# (original note: filters out Nantes's econome cluster)
resources_selection = "-p \"cluster in ('ecotype','parasilo','grisou','uvb','paravance','genepi')\"" 

# Site on which a new job is submitted (Nancy)
site = "nancy"

# Deployment type: True deploys from the environment description file,
# False deploys a named environment
FILE_DEPLOY = True

#### Réservation

In [None]:
# Reuse a running OAR job named `jobname` when there is one; otherwise submit a
# new job, unless a job with that name already exists but is not running yet.
jobs = get_current_oar_jobs()
jobid = None
waiting_jobs = []
while jobs:
    # Unpack into job_site, not site: unpacking into `site` would overwrite the
    # configured submission site with the site of whichever job was inspected
    # last, even an unrelated one.
    j, job_site = jobs.pop()
    info = get_oar_job_info(j, job_site)
    # .get(): do not fail on a job whose info carries no name or state
    if info.get('name') == jobname:
        if info.get('state') == 'Running':
            jobid = j
            # The reused job lives on job_site; later cells read `site`.
            site = job_site
            print("A {} job is already running, using it. jobid is {}".format(jobname, jobid))
            break
        else:
            waiting_jobs.append(j)
if not jobid and not waiting_jobs:
    # One cluster, `nodecount` nodes, of type "deploy" so that an environment
    # can be deployed on the nodes afterwards.
    jobspec = OarSubmission(resources="/cluster=1/nodes={}".format(nodecount), walltime = walltime,
                            additional_options = resources_selection, job_type = "deploy", name = jobname)
    jobid, _ = oarsub([(jobspec, site)]).pop()
    print("New job submitted, jobid is {}".format(jobid))
elif not jobid:
    print("One or more {} jobs exist ({}) but are not running.\n"
          " Connect to the frontend to see what is happening, and/or run the cell again.".format(
          jobname, ", ".join([str(j) for j in waiting_jobs])))

#### Récupération des infos sur les noeuds obtenus

###### Note : Si cette cellule met du temps à répondre, vérifier l'état de la réservation sur la frontale avec **oarstat -u username** ou via https://www.grid5000.fr/mediawiki/index.php/Status

In [None]:
# Hosts allocated to the job, ordered by address; the last line displays them.
nodes = sorted(get_oar_job_nodes(jobid), key=lambda host: host.address)
nodes

##### Déploiement de l'environnement : 2 options

###### Via fichier 

In [None]:
# Set to True to force redeploying the OS on the nodes in the deployment section.
force_redeploy = False
# Filename of the kadeploy environment description file (YAML).
environment_dsc_file = 'src/debian9-x64-bigdata-tutorial.yaml'

In [None]:
# Describe what to deploy: the environment description file, or a named
# environment when FILE_DEPLOY is False.
if FILE_DEPLOY:
    deployment = Deployment(hosts=nodes, env_file=os.path.abspath(environment_dsc_file),
                            other_options="-r ext4 --no-debug-mode")
else:
    deployment = Deployment(nodes, env_name="debian9-x64-base",
                            other_options="-r ext4 --no-debug-mode")

# One deploy call for both variants, so that force_redeploy is honoured in
# both cases (the named-environment variant used to ignore it). With
# force_redeploy = False this passes check_deployed_command = True.
deploy_ok, deploy_failed = deploy(deployment, check_deployed_command=not force_redeploy,
                                  stdout_handlers=[sys.stdout],
                                  stderr_handlers=[sys.stderr])

In [None]:
# Report the two results returned by deploy().
print("Deployement status:\n* ok: {}\n* failed: {}".format(deploy_ok, deploy_failed))

#### Test : commande simple 'ls'

In [None]:
# Smoke test: run `ls` on every node; the last expression displays the action's `ok` status.
Remote_test = execo.action.Remote(cmd = 'ls',hosts = nodes, connection_params = None, process_args = None)
Remote_test.run().ok

#### Installation environnement/tools sur noeuds

In [None]:
if (not crashed):
    # Install the build dependencies, then download, build, check and install
    # SimGrid 3.18 on every node (as root).
    #  * apt-get -y: the command runs without a terminal, so apt-get would
    #    abort at its confirmation prompt.
    #  * "cmake ... .": name the source directory explicitly instead of
    #    relying on cmake's implicit default.
    simgrid_install_cmd = ' && '.join([
        'apt-get install -y g++ libboost-all-dev',
        'wget http://gforge.inria.fr/frs/download.php/latestfile/8/SimGrid-3.18.tar.gz',
        'tar -xvf SimGrid-3.18.tar.gz',
        'cd SimGrid-3.18',
        'cmake -DCMAKE_INSTALL_PREFIX=/usr/local -Denable_smpi=on -Denable_documentation=off .',
        'make -j',
        'make check',
        'make install -j',
    ])
    Remote_install = execo.action.Remote(cmd = simgrid_install_cmd,
                                         hosts = nodes, connection_params = {'user':'root'})
    # Inside an `if` the bare `.ok` expression was never displayed: print it.
    print("SimGrid installation ok: {}".format(Remote_install.run().ok))

In [None]:
if (not crashed):
    # Clone, build and install PajeNG on every node (as root).
    # https:// replaces git://: GitHub no longer serves the unauthenticated
    # git protocol, so the original clone URL cannot be fetched any more.
    pajeng_install_cmd = ' ; '.join([
        'git clone https://github.com/schnorr/pajeng.git',
        'mkdir -p pajeng/build',
        'cd pajeng/build',
        'cmake ..',
        'make',
        'make install',
    ])
    Pajeng_install = execo.action.Remote(cmd = pajeng_install_cmd, hosts = nodes,
                                         connection_params = {'user':'root'}).run().ok
    # Inside an `if` a bare expression is not displayed: print the status.
    print("PajeNG installation ok: {}".format(Pajeng_install))

#### Version Simgrid

In [None]:
def remote_version(software):
    """Print the output of ``<software> --version`` collected from every node.

    The command is run on all nodes as root with its output redirected to
    ``version.txt`` on the node. That file is then fetched node by node into
    a temporary ``./version-<host>.txt``, read back through ``cat``, and
    removed. The concatenated outputs are printed under a blue header.

    software -- name of the executable to query (converted with ``str()``).
    Returns None.
    """
    execo.action.Remote(cmd = 'touch version.txt &&' + str(software) + ' --version > version.txt'
                    , hosts = nodes, connection_params = {'user': 'root'}).run()

    version = ''

    # Iterate over the nodes actually obtained, not over range(nodecount):
    # indexing by nodecount raises IndexError when fewer nodes were granted.
    for node in nodes:
        local_copy = './version-' + get_host_shortname(node) + '.txt'
        execo.action.Get(hosts = node, remote_files = ['~/version.txt'],
                         local_location = local_copy,
                         connection_params = {'user': 'root'}).run()

        version = version + execo.Process(cmd = 'cat ' + local_copy).run().stdout
        # The local copy is only a transit file
        execo.Process(cmd = 'rm ' + local_copy).run()

    print(colored('VERSION : ' + str(software),"blue"), '\n' + version + '\n')
    return

# Report the smpicc and g++ versions found on the nodes.
remote_version('smpicc')
remote_version('g++')

#### Chemins (à configurer)

In [None]:
# Root directory of this experiment (user-specific absolute path: adapt it).
path_base = "/home/nezzine/Stage-POLARIS/SimGrid/examples/SMPI/" + experiment_name
# Directory holding the source files that setup() sends to the nodes.
path_src = path_base + '/src/'
# Directory under which the per-day result folders are created.
path_rslt = path_base + '/results/'

#### Traitement des données pour les calculs de D et S

In [None]:
# Array initialisation
def init_tab(num, elt):
    """Return a new list holding ``num`` references to ``elt``.

    The same object is repeated, not copied.
    """
    return [elt for _ in range(num)]

# Parsing the results
def parser(num_node,date_precise):
    """Return the lines of the result file fetched from node ``num_node``.

    The file read is ``<path_rslt><day>/<host>_<date_precise>.txt`` where
    ``<day>`` is the part of ``date_precise`` before its first underscore and
    ``<host>`` is the short name of ``nodes[num_node]``.

    num_node     -- index of the node in the global ``nodes`` list.
    date_precise -- timestamp string used in the file name, e.g.
                    '2018-06-04_12:38'.
    Returns the list produced by ``readlines()``.
    """
    day = date_precise.split('_')[0]
    # Built from path_rslt, the directory simulation() stores the files in,
    # rather than from a second hard-coded copy of that path, which would
    # silently diverge as soon as the experiment name or base path changes.
    result_file = (path_rslt + day + '/'
                   + get_host_shortname(nodes[num_node]) + '_'
                   + date_precise + '.txt')
    with open(result_file) as f:
        return f.readlines()
    
# Converting and organizing data
def tab_conv(l, NPROCS, precision) :
    """Turn the first ``NPROCS`` result lines into a list of rounded values.

    Each line is expected to look like ``From 0 -> S[0] : 0.0819526``: once
    split on whitespace, field 1 is the index to fill and field 5 the value.

    l         -- sequence of text lines (at least NPROCS of them).
    NPROCS    -- number of lines to read and length of the returned list.
    precision -- number of decimals kept by ``round()``.
    Returns a list of length NPROCS; a slot stays None when no line names it.
    Raises IndexError when ``l`` has fewer than NPROCS lines or a line has
    fewer than six fields.
    """
    S = [None] * NPROCS

    # The per-line debug print was removed: it wrote one line per process on
    # every call.
    for i in range(NPROCS):
        fields = l[i].split()
        S[int(fields[1])] = round(float(fields[5]), precision)
    return S

def delayer(num_node, NPROCS, date, precision):
    """Compute per-process delays relative to the smallest recorded value.

    The result file of node ``num_node`` for timestamp ``date`` is read with
    ``parser`` and converted with ``tab_conv`` into a list S of NPROCS values.

    Returns ``[D, min(S)]`` where ``D[p] = round(S[p] - min(S), precision)``.
    """
    S = tab_conv(parser(num_node, date), NPROCS, precision)

    # Computed once: evaluating min(S) inside the loop made this quadratic.
    smallest = min(S)
    D = [round(value - smallest, precision) for value in S]
    return [D, smallest]

In [None]:
# Example
# delayer(0,10,'2018-06-04_12:38',2)

## SIMULATION

#### Mise en place de l'environnement (fichiers src simulation...) sur noeuds

In [None]:
def setup() :
    """Prepare the nodes and the local results directory for a run.

    Three actions are built and then run in this order:
      1. create ``experiments_src`` on every node (as root);
      2. copy griffon.xml, griffon_hostfile.txt, broadcast.c and Makefile
         from ``path_src`` into ``~/experiments_src`` on every node;
      3. create the local directory ``<path_rslt><today>``, where today is
         formatted as YYYY-MM-DD.
    """
    source_names = ('griffon.xml', 'griffon_hostfile.txt', 'broadcast.c', 'Makefile')

    make_remote_dir = execo.action.Remote(cmd='mkdir -p experiments_src',
                                          hosts=nodes,
                                          connection_params={'user': 'root'})
    push_sources = execo.action.Put(hosts=nodes,
                                    local_files=[path_src + fname for fname in source_names],
                                    remote_location='~/experiments_src',
                                    connection_params={'user': 'root'})
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    make_results_dir = execo.Process(cmd='mkdir -p ' + path_rslt + today)

    # Run the steps in order; as before, each `ok` status is read but not
    # acted upon.
    for step in (make_remote_dir, push_sources, make_results_dir):
        step.run().ok


#### Définition du thread pour la parallélisation de la simulation

In [None]:
# Worker thread used to run the simulation in parallel
class ParSimer(Thread):
    """Thread that runs ``make run`` for one set of parameters on one node.

    Constructor arguments, stored under the same names:
      i     -- index of the target host in the global ``nodes`` list
      MIN   -- value passed as ``MIN=`` to ``make run``
      MAX   -- value passed as ``MAX=`` to ``make run``
      N     -- value passed as ``N=`` to ``make run``
      DELAY -- string passed, single-quoted, as ``DELAY=`` to ``make run``

    NOTE: ``run`` reads the module-level ``NPROCS``, ``DEBUG`` and ``nodes``.
    """

    def __init__(self, i, MIN, MAX, N, DELAY):
        super().__init__()
        self.i = i
        self.MIN, self.MAX, self.N = MIN, MAX, N
        self.DELAY = DELAY

    def set_delay(self, delay):
        """Replace the delay string."""
        self.DELAY = delay

    def get_delay(self):
        """Return the delay string."""
        return self.DELAY

    def get_MIN(self):
        """Return the MIN value given at construction."""
        return self.MIN

    def run(self):
        """Build and run the remote ``make && make run`` command (thread body)."""
        if DEBUG:
            # Trace the parameters this thread is about to use
            print("".join((
                get_host_shortname(nodes[self.i]), " => ", str(self.i),
                " NPROCS = ", str(NPROCS),
                " MIN = ", str(self.MIN),
                " MAX = ", str(self.MAX),
                " N = ", str(self.N),
                " DELAY = '", self.DELAY, "'\n",
            )))

        # Compile, then run on the node this thread is in charge of
        command = "".join((
            'cd ~/experiments_src && make && make run NPROCS=', str(NPROCS),
            ' MIN=', str(self.MIN),
            ' MAX=', str(self.MAX),
            ' N=', str(self.N),
            " DELAY='", self.DELAY, "'",
        ))
        remote_run = execo.action.Remote(cmd=command,
                                         hosts=nodes[self.i],
                                         connection_params={'user': 'root'})
        remote_run.run().ok


#### Conversion chaîne <=> liste de flottants pour la manipulation de D & S

In [None]:
# Format the array as a string so it can be passed on the command line
def format_cmd(data):
    """Return the ``str()`` of each item of ``data`` joined by single spaces."""
    return ' '.join(map(str, data))

# Turn the string back into a list of floats so it can be manipulated
def unformat_cmd(data):
    """Split ``data`` on whitespace and return its fields as a list of floats."""
    return [float(field) for field in data.split()]

#### Génération du nouvel état initial

In [None]:
def generate_Dprime(NPROCS,border_inf,border_sup,precision):
    """Draw a random delay vector and return it as a space-separated string.

    For each position p in ``range(NPROCS)`` one value is drawn with
    ``random.uniform`` and rounded to ``precision`` decimals. The upper bound
    is ``border_sup[p]``. The lower bound is ``border_inf`` itself when it
    compares equal to 0, and ``border_inf[p]`` otherwise.
    """
    # A lower bound equal to 0 is a scalar shared by every position
    shared_floor = bool(border_inf == 0)

    drawn = []
    for p in range(NPROCS):
        floor = border_inf if shared_floor else border_inf[p]
        drawn.append(round(random.uniform(floor, border_sup[p]), precision))
    return format_cmd(drawn)

#### Code de la simulation

In [None]:
# SIMULATION
DEBUG = True
def simulation(N, NPROCS, fragment_size, precision, mode, catch_me):
    
    """Contrôle de la validité des paramètres"""
    # Taille du fragment ne peut être supérieure à la zone d'itération
    if (fragment_size > N): 
        raise ValueError("Fragment size cannot be superior to N")
        
    """Mise en place de l'environnement et des variables"""
    # Environnement
    setup() 
    
    # Dates: pour la gestion de fichiers (IMPORTANT pour lecture/écriture)
    date_precise = datetime.datetime.now().strftime('%Y-%m-%d_%H:%M')
    date = date_precise.split('_')[0]
    
    # Temps d'exécution total
    total_execution_time = 0
    
    # Tableau de threads
    machines = init_tab(nodecount, None)
      
    # position trajectoire cohérente courante
    t_curr = 0
    
    # D de la trajectorie cohérente courante
    D_curr = format_cmd(init_tab(NPROCS, 0.0))
    
    # Taille du dernier slot
    last_fragment_size = N % fragment_size
    
    # Nombre de fragments
    fragment_number = int((math.ceil(N / fragment_size)))
    
    # zone d'itérations à explorer
    To_explore = collections.deque()
    
    # Initialisation des zones à explorer D = '0...0' pour chaque zone T -> (Min d'itération , D initial)
    for i in range(fragment_number):
        To_explore.append((i * fragment_size, format_cmd(init_tab(NPROCS, 0.0))))

    # Round actuel
    curr_round = 0
    
    # Liste des trajectoires pour un temps t et un D donnée
    sim = OrderedDict()
    
    # Vrai si un état final a fusionné avec un état initial au moins une fois
    success = False 
    
    catch_me_ind = 0
    
    while(True):
        curr_round = curr_round + 1
        
        # Trajectoire cohérente complète trouvé : FIN DE SIMULATION
        if (t_curr >= N) :
            cprint ("Total time : " + str(total_execution_time) , "grey" , "on_yellow")
            cprint ("Success : " + str(success) , "grey" , "on_green")
            break;        
        if (len(To_explore) == 0):
            raise 
        # Il n'existe pas encore de trajectoire cohérente -> Nouveau round
        cprint("--------------------------------------------  ROUND :  " + str(curr_round) + "   --------------------------------------------\n","red", attrs=['reverse', 'blink'])
        
        # Zones restantes à explorer
        number_paths_To_explore = len(To_explore)
        
        # Nombre de tâches parallèles lançables
        number_launchable_tasks = min(nodecount, number_paths_To_explore)
        
        # Affichage des paramètres
        cprint ("  || LAUNCHING || \n","green", attrs=[ 'reverse', 'blink'])
        if (DEBUG): 
            cprint ( "TO EXPLORE : " , "blue")
            for key, value in (To_explore):
                print( colored( str(key), "green") + ' -> ' + str(value))
        
        """ Paramètrage des machines """
        
        # Soit j'ai plus de zone à explorer que de machines disponibles soit l'inverse
        for q in range(number_launchable_tasks):
            
            # On récupère la première zone à explorer
            parameters = To_explore.popleft()
            
            # MIN est le champ de gauche
            MIN = parameters[0]
            
            # MAX de la zone à explorer vaut le min + la taille du slot
            MAX = min (MIN + fragment_size, N)
            
            # D est le champ de droite
            delay = parameters[1]
            
            # Paramètrage des machines
            machines[q] = ParSimer(q, MIN, MAX, N, delay)
            #print("Machine " + str(q) + " -> MIN :" + str(MIN) + " MAX : " + str(MAX) + " D : "+ delay)   
                
        """ Simulation """  
        # Lancement de la simulation
        for q in range(number_launchable_tasks):
            machines[q].start()
            
        # Fin de simulation
        for q in range(number_launchable_tasks) :
            machines[q].join()
            # Récupération des résultats
            execo.action.Get(hosts=nodes[q],
                     remote_files=['~/experiments_src/bcast_results.txt'
                     ],
                     local_location= path_rslt
                     + date
                     + '/' + get_host_shortname(nodes[q]) + '_'
                     + date_precise + '.txt', connection_params={'user': 'root'
                     }).run().ok

            execo.action.Remote(cmd='pj_dump --ignore-incomplete-links ./experiments_src/Time_Bcast.trace > ./experiments_src/is_64_small.csv',hosts=nodes,connection_params={'user':'root'}).run()
            name = path_rslt + date + '/' + get_host_shortname(nodes[q]) + '_'+ date_precise
            execo.action.Get(hosts=nodes[q],
                     remote_files=['~/experiments_src/is_64_small.csv'
                     ],
                     local_location= path_rslt
                     + date
                     + '/' + get_host_shortname(nodes[q]) + '_'
                     + date_precise + '.csv', connection_params={'user': 'root'
                     }).run().ok
            
            execo.Process(cmd='touch ./is_64_small.state.csv').run()
            execo.Process(cmd='grep State'+ str(name)+'.csv > is_64_small.state.csv').run()
            execo.Process(cmd='touch ./is_64_small.link.csv').run()
            execo.Process(cmd='grep Link'+ str(name)+'.csv > is_64_small.link.csv').run()
            execo.Process(cmd='touch ./is_64_small.container.csv').run()
            execo.Process(cmd='grep Container'+ str(name)+'.csv > is_64_small.container.csv').run()
            execo.Process(cmd='touch ./is_64_small.variable.csv').run()
            execo.Process(cmd='grep Variable'+ str(name)+'.csv > is_64_small.variable.csv').run()
            
            
            
            """           
            execo.Process(cmd='grep State'+ str(name)+'.csv >' + str(name)+ '.state.csv')
            execo.Process(cmd='grep Link'+ str(name)+'.csv >' + str(name)+ '.link.csv')
            execo.Process(cmd='grep Container'+ str(name)+'.csv >' + str(name)+ '.container.csv')
            execo.Process(cmd='grep Variable'+ str(name)+'.csv >' + str(name)+ '.variable.csv')
            
           
           #subprocess.Popen(['bash', '-c', '. greper.sh; grepping',name])
            subprocess.call(["grep", "State", name, ".csv > is_64_small.state.csv"])
            """ 
            %R library(ggplot2)
            %R df_state = read.csv("./is_64_small.csv", header=F, strip.white=T)
            %R names(df_state) = c("Type", "Rank", "Container", "Start", "End", "Duration", "Level", "State"); 
            %R df_state = df_state[!(names(df_state) %in% c("Type","Container","Level"))]
            %R df_state$Rank = as.numeric(gsub("rank-","",df_state$Rank))
            %R head(df_state)
            %R str(df_state)
            %R gc = ggplot(data=df_state) + geom_rect(aes(xmin=Start, xmax=End, ymin=Rank, ymax=Rank+1,fill=State)) + scale_fill_brewer(palette="Set1")
            %R gc
        
          
        """ Analyse des résultats"""    
        # Traitement des données
        for q in range(number_launchable_tasks):
            # On récupère les paramètres des machines
            t = machines[q].get_MIN()
            D = machines[q].get_delay()
            
            # Calcul du délai
            # On récupère D et S
            raw_data = delayer(q, NPROCS, date_precise,precision)
            
            # Filtre D : Calcul du délai
            Dprime = format_cmd(raw_data[0])
            
            #Filte S : Calcul du temps d'éxécution total 
            T = raw_data[1]
            
            # Ajout dans ma table (Dprime, round, machine qui l'a calculé, temps)
            sim.setdefault(t,{}).update({D:(Dprime,curr_round,q,T)})
            MAX = min (t + fragment_size, N)
        
        # Affichage des paramètres
        print ( colored ("T_curr : " , "yellow") , str(t_curr))
        if (DEBUG) :
            print ( colored ("D_curr : " , "yellow") , str(D_curr))
            print ( colored ("Dprime : ", "yellow") , str(Dprime))
        print ( colored ("total execution time : ", "red") , str(total_execution_time) + "\n") 
        
        merged_paths = 0
        # Calcul de la trajectoire cohérente
        while (t_curr < N and t_curr in sim and D_curr in sim[t_curr]):
            merged_paths += 1
            # Update le temps d'exécution
            total_execution_time += sim[t_curr][D_curr][3]
            
            # Update du temps d'exécution dans le dernier fragment
            if (t_curr + fragment_size >= N) : total_execution_time += max(unformat_cmd(sim[t_curr][D_curr][0]))
            
            # Update des t_curr et D_curr
            D_curr = sim[t_curr][D_curr][0]
            t_curr = min(t_curr + fragment_size, N)
        if (merged_paths > 1) : 
            cprint("****** SUCCESS : " + str(merged_paths), attrs=['reverse', 'bold'])
            success = True 
        
        # Affichage des résultats
        cprint ("\n  || RESULTS || \n","red", attrs=[ 'reverse', 'blink'])
        if (DEBUG):
            cprint ( "TO EXPLORE : " , "blue")
            for key, value in (To_explore):
                print( colored( str(key), "green") + ' -> ' + str(value)) 
            pretty (sim) 
        print ( colored ("T_curr : " , "yellow", attrs=[ 'bold']), str(t_curr))
        
        if (DEBUG):
            print ( colored ("D_curr : " , "yellow", attrs=[ 'bold']), str(D_curr))
            print ( colored ("Dprime : ", "yellow", attrs=[ 'bold']) , str(Dprime))
            
        print ( colored ("total execution time : ", "red", attrs=[ 'bold']) , str(total_execution_time) + "\n") 
        
        
        """ Nouveau paramètrages """
        cprint("  || ANALYSIS || \n","magenta", attrs=[ 'reverse', 'blink'])
        while (To_explore and To_explore[0][0] <= t_curr):
            To_explore.popleft()
        
        for q in reversed(range(number_launchable_tasks)):
            t = machines[q].get_MIN()
            D = machines[q].get_delay()
            
            MAX = min(t + fragment_size, N)
            Dprime = sim[t][D][0]
           
            # Zone valide
            if(MAX < N):
                # Zone pas encore relié à la trajectoire initiale
                if (MAX > t_curr):
                    caught = False
                # Ajout dans les zones à explorer du nouveau point de départ à exploiter en fonction du mode
                    # Test catch me sur 2^catch_me_ind round
                    if (catch_me and 1 << catch_me_ind == curr_round):
                        catch_me_ind += 1
                        try :
                            caught = Dprime in sim[MAX]
                        except KeyError:
                            pass 
                       
                    if (not catch_me or not caught):
                        borders = unformat_cmd(Dprime)
                        double_Dprime = [(2*(x)) for x in borders]
                      
                        # Bornes pour le random 
                        border_inf = 0
                        border_sup = double_Dprime
                        if (mode == "RAND_SUP") : 
                            border_sup = borders
                            Dprime =  generate_Dprime(NPROCS, border_inf, border_sup,precision)
                        elif (mode == "RAND_INF") : 
                            border_inf = borders
                            Dprime = generate_Dprime(NPROCS, border_inf, border_sup,precision)
                        elif (mode == "RAND") : 
                            Dprime =  generate_Dprime(NPROCS, border_inf, border_sup,precision)
                        if (DEBUG) : print("["+str(MAX) +"] Generated " + str(mode) + " Dprime : " + str(Dprime))
                    # Mode fix implicite : append sans modification des valeurs MAX et Dprime    
                    To_explore.appendleft((MAX, Dprime))
        
        if (t_curr != N):
            To_explore.appendleft((t_curr, D_curr))
        if (DEBUG):
            cprint ( "TO EXPLORE : " , "blue")
            for key, value in (To_explore):
                print( colored( str(key), "green") + ' -> ' + str(value))
        

## SIMULATION

In [None]:
# Number of processes
NPROCS = 10

# Number of iterations
N = 100

# Size of a fragment of iterations
fragment_size = 10

# Number of decimals kept when rounding delays
precision = 4

# How the initial state is generated at each round
mode_opt = ["FIX","RAND","RAND_INF","RAND_SUP"]
mode = mode_opt[1]

# Passed as the catch_me argument of simulation()
catch_me = False

# Launch the simulation
#simulation(N,NPROCS,fragment_size,precision,mode,catch_me)

In [None]:
# Number of processes
NPROCS = 1000

# Number of iterations
N = 100

# Size of a fragment of iterations
fragment_size = 10

# Number of decimals kept when rounding delays
precision = 2

# How the initial state is generated at each round
mode_opt = ["FIX","RAND","RAND_INF","RAND_SUP"]
mode = mode_opt[1]

# Passed as the catch_me argument of simulation()
catch_me = False

# Launch the simulation
#simulation(N,NPROCS,fragment_size,precision,mode,catch_me)

#### Publication des résultats sur GitHub : https://github.com/HooBaeBoo/Stage-POLARIS

In [None]:
%%bash -s $experiment_name $site
# $1 is the experiment name and $2 the site, both passed on the magic line above.
# NOTE: this stages and commits locally; no `git push` is issued here.
git add .
git commit -m 'Automatic results update : '"$1"' --- '"$2 ""R tutorial"

#### Fin d'expérience : suppression du job

In [None]:
# oardel([(jobid,site)])

In [None]:
# Scratch check: create ./test2 through a local execo process.
execo.Process(cmd='touch ./test2').run()

In [None]:
# Scratch check of output redirection through a local execo process.
# NOTE(review): no shell is requested here, so '>' is presumably not
# interpreted as a redirection -- confirm against the execo documentation.
execo.process.Process(cmd = ' echo "hey"  > version.txt').run()