# Paramètres globaux, initialisation

In [None]:
# Names of the sensor bands. They are used as column names when the sensor
# response file is parsed by generate_fic_band_6sV.
# NOTE(review): presumably central wavelengths in nm — confirm.
nom_bandes = ['400', '412', '442', '490', '510', '560', '620', '665', 
              '674', '681', '709', '754', '761', '764', '768', '779', 
              '865', '885', '900', '940', '1020']
# Positions 1..21 as a single space-separated string.
# NOTE(review): not referenced in the visible part of this notebook — confirm it is still needed.
ind_bandes = '1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21'

# The values below are joined, in a fixed order, into the command lines
# built further down. Names set to 'NotUsed' are placeholders that still
# occupy a position on that command line.
runLUTtool  = '/home/jovyan/bin/run6SV.ksh'
REP_in6SV   = 'NotUsed'
satellite   = 'SENTINEL3B'
capteur     = 'OLCIB'
bande       = '400' # It is also possible to iterate over all the bands
AER_NOM     = 'DESERT'
# NOTE(review): presumably the 6SV aerosol model code matching AER_NOM — confirm.
AER_TYPE    = '5'
# Band file name passed on the command line; computed once from the current
# satellite / capteur / bande values.
NOMFICBANDE = 'Bande_%s_%s_%s.dat' % (satellite, capteur, bande)
REP_out6SV  = 'NotUsed'
REP_CODE_TR = 'NotUsed'
REP_WORK    = 'NotUsed'
# NOTE(review): not part of the command line built below and not referenced in the visible code.
ficJobArray = 'NotUsed'

# Sensor file holding the values of all the bands (input of generate_fic_band_6sV).
FIC_CAPTEUR='/home/jovyan/aux/rep6s_SENTINEL3B_OLCIB.dat'
# Alias of FIC_CAPTEUR.
FICALLBANDS=FIC_CAPTEUR

In [None]:
import pandas as pd
def generate_fic_band_6sV(fic_capteur, band_name, fic_out, nom_bandes):
    """Extract one band from a sensor file and write it in 6S format.

    The input file is parsed as whitespace-separated columns: the first
    column is named ``index`` and the following ones take their names from
    ``nom_bandes``. Only the rows whose value for ``band_name`` is strictly
    positive are kept.

    The output file holds the first and the last kept ``index`` values on
    its first line, then one band value per line, all in ``%.7E`` notation.

    Args:
        fic_capteur: Path of the sensor file holding all the bands.
        band_name: Name of the band to extract; must be one of ``nom_bandes``.
        fic_out: Path of the file to write (overwritten if it exists).
        nom_bandes: Band names, in the column order of ``fic_capteur``.

    Raises:
        KeyError: If ``band_name`` is not a column of the parsed table.
        ValueError: If no row has a strictly positive value for the band.
    """
    # sep=r'\s+' is the supported spelling of the deprecated
    # delim_whitespace=True option.
    all_bands_df = pd.read_csv(fic_capteur, sep=r'\s+',
                               names=['index'] + list(nom_bandes))
    # Keep only the band of interest
    band_df = all_bands_df.loc[:, ['index', band_name]]
    # Keep only the values strictly greater than 0
    band_df = band_df[band_df[band_name] > 0]

    # Fail before opening the output file, so that an unusable band does not
    # leave an empty file behind.
    if band_df.empty:
        raise ValueError(
            'Aucune valeur strictement positive pour la bande %s dans %s'
            % (band_name, fic_capteur))

    # 6S format: first line = first and last index values of the kept rows,
    # following lines = values of the selected band
    index_values = band_df['index'].tolist()
    band_values = band_df[band_name].tolist()
    with open(fic_out, 'w') as f:
        f.write("%.7E  %.7E\n" % (index_values[0], index_values[-1]))
        f.writelines("%.7E\n" % value for value in band_values)

In [None]:
# Generate the file of the selected band.
# The file name is built from the same satellite / capteur / bande values as
# NOMFICBANDE, so both names stay in sync when these parameters change.
generate_fic_band_6sV(
    FIC_CAPTEUR,
    bande,
    '/home/jovyan/Bande_%s_%s_%s.dat' % (satellite, capteur, bande),
    nom_bandes,
)

# Les paramètres de simulation

In [None]:
#### Look-up table (LUT) parameter grids
# Every grid value is a string: the values are joined as-is into the
# command lines.
fields = ('u03', 'uw', 'ep_opt', 'psurf', 'alti', 'phi', 'thetas', 'thetav')

# Relative azimuths: 0 to 180 in steps of 20
tab_phi = tuple(str(angle) for angle in range(0, 181, 20))
# Incidence zenith angles: 0 to 60 in steps of 5
tab_thetas = tuple(str(angle) for angle in range(0, 61, 5))
# Viewing zenith angles: 0 to 60 in steps of 5
tab_thetav = tuple(str(angle) for angle in range(0, 61, 5))
# Optical thickness values: 0.1 to 0.7 in steps of 0.1
tab_ep_opt = tuple('0.%d' % tenth for tenth in range(1, 8))
# Water vapour contents: 0.0 to 5.0 in steps of 0.5
tab_uw = tuple('%.1f' % (0.5 * step) for step in range(11))
# Ozone contents
tab_uO3 = ('0.20', '0.40')
# Altitudes
tab_alti = ('0.100', '0.355', '0.700')
# Atmospheric pressures
tab_psurf = ('940.0', '990.0', '1013.0', '1040.0')

# Grid of each field, keyed by field name
field_tabs = dict(
    phi=tab_phi,
    thetas=tab_thetas,
    thetav=tab_thetav,
    ep_opt=tab_ep_opt,
    uw=tab_uw,
    u03=tab_uO3,
    alti=tab_alti,
    psurf=tab_psurf,
)

In [None]:
%%time
# Build the simulation parameters: one command line per point of the LUT grid
import itertools

lines = []
# itertools.product varies its last argument fastest, which reproduces the
# order of the former nested loops (u03 outermost, thetav innermost). A plain
# for loop is kept so the loop variables remain defined afterwards.
for u03, uw, ep_opt, psurf, alti, phi, thetas, thetav in itertools.product(
        tab_uO3, tab_uw, tab_ep_opt, tab_psurf,
        tab_alti, tab_phi, tab_thetas, tab_thetav):
    lines.append(' '.join((
        runLUTtool, REP_in6SV, satellite, capteur, bande, AER_NOM, AER_TYPE,
        thetav, phi, thetas, ep_opt, uw, u03, psurf, alti,
        NOMFICBANDE, REP_out6SV, REP_CODE_TR, REP_WORK,
    )))


In [None]:
# Sanity check: number of generated command lines, then one sample command
print(len(lines), lines[10], sep='\n')

In [None]:
# Run a single 6SV call as a subprocess
import os
from subprocess import Popen, PIPE
def run_6v(cmd, cwd='/home/jovyan/bin'):
    """Run one command line as a subprocess and return its standard output.

    Args:
        cmd: Command line as a single string. It is split on single spaces,
            so an argument cannot itself contain a space.
        cwd: Working directory of the subprocess.

    Returns:
        The bytes written by the command on its standard output.

    Raises:
        ValueError: If the command exits with a non-zero code. The message
            holds the command, its exit code and what it wrote on stderr.
    """
    # stderr is piped as well so that a failure can be diagnosed from the
    # exception alone, including when the call runs on a remote worker.
    process = Popen(cmd.split(' '), stdout=PIPE, stderr=PIPE, cwd=cwd)
    # communicate() waits for the process to end; returncode is set afterwards.
    output, err = process.communicate()
    if process.returncode != 0:
        detail = err.decode(errors='replace').strip()
        raise ValueError('Echec de la commande ' + cmd
                         + ' (code retour %d)' % process.returncode
                         + (' : ' + detail if detail else ''))
    return output

In [None]:
# Run a single command directly from the notebook, without Dask; the returned
# output is displayed as the cell result.
run_6v(lines[15])

# Dask sur Kubernetes

In [None]:
from dask_kubernetes import KubeCluster
# KubeCluster is created without arguments.
# NOTE(review): presumably the worker pod specification comes from the Dask
# configuration of the environment — confirm.
cluster = KubeCluster()
# Ask for 4 workers
cluster.scale(4)
# Bare expression: displayed as the cell output
cluster

In [None]:
from dask.distributed import Client
# Connect a client to the cluster created above
client = Client(cluster)
# Bare expression: displayed as the cell output
client

## Test de soumission

In [None]:
# Submit a single run_6v call through the client, using the same command as
# the direct run above
future = client.submit(run_6v, lines[15])
# Fetch the value of the future
result = client.gather(future)
# Bare expression: displayed as the cell output
result

In [None]:
%%time
# Map run_6v over the first 1000 command lines and collect the outputs
futures = client.map(run_6v, lines[:1000])
results = client.gather(futures)

### Mode adaptatif

In [None]:
# Switch the cluster to adaptive mode with bounds of 2 and 40
# NOTE(review): the bounds are presumably numbers of workers — confirm against the dask-kubernetes version in use.
cluster.adapt(minimum=2, maximum=40)

In [None]:
# Map run_6v over the first 10000 command lines and collect the outputs;
# this rebinds the `futures` and `results` names of the previous batch
futures = client.map(run_6v, lines[:10000])
results = client.gather(futures)

In [None]:
# Display the output of the first command of the batch
results[0]

In [None]:
# Display the output of the last command of the batch (10000 commands were mapped)
results[9999]

In [None]:
# Disconnect the client first, so that it is not left connected to a
# scheduler that has been shut down, then close the cluster.
client.close()
cluster.close()