In [1]:
import pickle
from distributed import Client, LocalCluster
from dask_jobqueue import SGECluster
import numpy as np
from plasp.config import SAVE_DIR
from plasp.dataloader import MULANLoader, LIBSVMLoader, Synthesizer, FoldsLoader
from plasp.scripts.grid_search import iterate_on_datasets

In [2]:
def experiment(dataset, Loader, methods):
    """Run the grid search on one dataset and pickle the result in SAVE_DIR.

    Parameters
    ----------
    dataset : str
        Name of the dataset; also used as the stem of the output file.
    Loader : class
        Loader class wrapped in a `FoldsLoader` (built with
        `cross_on_weak_loss=False`).
    methods : list of str
        Method names forwarded to `iterate_on_datasets`.

    Returns
    -------
    res_dict
        Whatever `iterate_on_datasets` returns; the same object is pickled
        to `SAVE_DIR / '<dataset>.pk'`, or `'<dataset>_weak.pk'` when the
        folds loader's `wk` attribute is truthy after the run.
    """
    folds = FoldsLoader(Loader, cross_on_weak_loss=False)

    # Hyper-parameter grids: 5 sigma constants in [1e-2, 1e2] and
    # 7 lambda constants in [1e-3, 1e3], both log-spaced.
    sigma_grid = np.logspace(-2, 2, 5)
    lambda_grid = np.logspace(-3, 3, 7)
    # Corruption levels 0.0, 0.1, ..., 0.9.
    corruption_levels = [step / 10 for step in range(10)]

    # Fix numpy's global seed right before the search starts.
    np.random.seed(0)
    res_dict = iterate_on_datasets([dataset], corruption_levels, 10, folds,
                                   methods, sigma_grid, lambda_grid,
                                   low_rank=200)

    # `wk` is read after the run, as in the original flow.
    stem = dataset + '_weak' if folds.wk else dataset
    with open(SAVE_DIR / (stem + '.pk'), 'wb') as out_file:
        pickle.dump(res_dict, out_file)
    return res_dict


def MULAN_experiment(dataset):
    """Run `experiment` on `dataset` with `MULANLoader` and the
    'plasp', 'low rank' and 'diffusion' methods; return its result."""
    return experiment(dataset, Loader=MULANLoader,
                      methods=['plasp', 'low rank', 'diffusion'])


def LIBSVM_experiment(dataset):
    """Run `experiment` on `dataset` with `LIBSVMLoader` and the
    'plasp', 'averaging' and 'diffusion' methods; return its result."""
    return experiment(dataset, Loader=LIBSVMLoader,
                      methods=['plasp', 'averaging', 'diffusion'])


def synthetic_experiment(dataset):
    """Run `experiment` on `dataset` with `Synthesizer` and the
    'plasp', 'diffrac', 'diffusion' and 'low rank' methods; return its result."""
    return experiment(dataset, Loader=Synthesizer,
                      methods=['plasp', 'diffrac', 'diffusion', 'low rank'])

In [11]:
def get_dask_cluster(on_server, n_workers):
    """Build the Dask cluster the experiments are submitted to.

    Parameters
    ----------
    on_server : bool
        If true, create an `SGECluster` (workers are submitted as SGE jobs)
        and scale it to `n_workers`. Otherwise create a `LocalCluster` with
        `n_workers` single-threaded workers and `processes=False`.
    n_workers : int
        Number of workers to request.

    Returns
    -------
    cluster : SGECluster or LocalCluster
        The cluster object, ready to be passed to `Client`.
    """
    if not on_server:
        return LocalCluster(n_workers=n_workers, threads_per_worker=1,
                            processes=False)

    log_directory = '/sequoia/data2/vcabanne/logs'
    local_directory = '/sequoia/data2/vcabanne/dask-spill'

    # Scheduler options (stderr / stdout locations) go through `job_extra`,
    # which dask-jobqueue writes into the job-script header as `#$ ...`
    # directives. They used to be listed in `env_extra` as '$ -e ...', where
    # they end up as plain shell commands and have no effect on SGE.
    job_extra = [
        '-e ' + log_directory,
        '-o ' + log_directory,
    ]

    # Shell commands run in the job script before the worker starts.
    # .bashrc is sourced first: `conda activate` presumably relies on the
    # shell setup done there, and sourcing it afterwards could switch the
    # environment again (NOTE(review): confirm against the server's .bashrc).
    env_extra = [
        'source /home/vcabanne/.bashrc',
        'conda activate main',
        'export LANG=en_US.UTF-8',
        'export LC_ALL=en_US.UTF-8',
    ]

    cluster = SGECluster(queue='all.q,bigmem.q',
                         resource_spec='h_vmem=12000M,mem_req=8000M',
                         walltime='720:00:00',
                         name='plasp',
                         cores=1,
                         memory='8000m',
                         processes=1,
                         interface='en0',
                         job_extra=job_extra,
                         env_extra=env_extra,
                         local_directory=local_directory)
    cluster.scale(n_workers)
    return cluster


In [12]:
# architecture
# Set on_server to False to run on a LocalCluster instead: the SGE path
# submits worker jobs with `qsub`, which must be available on this machine.
on_server = True
n_workers = 1
cluster = get_dask_cluster(on_server, n_workers)
client = Client(cluster)

# Each batch of futures keeps its own name. Dask releases (and may never run)
# tasks whose futures are no longer referenced, so rebinding a single
# `futures` variable would drop the earlier batches.

# MULAN data (hand-picked subset of the MULAN datasets)
MULAN_datasets = [
    'birds',
    'CAL500',
    'emotions',
    'enron',
    'flags',
    'genbase',
    'medical',
    'scene',
    'yeast'
]
MULAN_futures = client.map(MULAN_experiment, MULAN_datasets)

# LIBSVM data (hand-picked subset of the LIBSVM datasets)
LIBSVM_datasets = [
    'iris',
    'wine',
    'svmguide2',
    'glass',
    'svmguide4',
    'vehicle',
    'vowel',
    'dna'
]
LIBSVM_futures = client.map(LIBSVM_experiment, LIBSVM_datasets)

# Synthetic data
datasets = ['CL-1000', 'CL-400', 'CL-100',
            'ML-1000', 'ML-400', 'ML-100']
futures = client.map(synthetic_experiment, datasets)

# Block until every batch has finished. Each experiment also pickles its own
# result, so the gathered values are mainly a completion / error check.
# `futures` and `results` still refer to the synthetic batch.
MULAN_results = client.gather(MULAN_futures)
LIBSVM_results = client.gather(LIBSVM_futures)
results = client.gather(futures)

Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /Users/viviencabannes/miniconda3/envs/main/lib/python3.7/asyncio/tasks.py:623> exception=FileNotFoundError(2, "No such file or directory: 'qsub'")>
Traceback (most recent call last):
  File "/Users/viviencabannes/miniconda3/envs/main/lib/python3.7/asyncio/tasks.py", line 630, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/Users/viviencabannes/miniconda3/envs/main/lib/python3.7/site-packages/distributed/deploy/spec.py", line 50, in _
    await self.start()
  File "/Users/viviencabannes/miniconda3/envs/main/lib/python3.7/site-packages/dask_jobqueue/core.py", line 310, in start
    out = await self._submit_job(fn)
  File "/Users/viviencabannes/miniconda3/envs/main/lib/python3.7/site-packages/dask_jobqueue/core.py", line 293, in _submit_job
    return self._call(shlex.split(self.submit_command) + [script_filename])
  File "/Users/viviencabannes/miniconda3/envs/main/

CancelledError: 

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x1a24863bd0>>, <Task finished coro=<SpecCluster._close() done, defined at /Users/viviencabannes/miniconda3/envs/main/lib/python3.7/site-packages/distributed/deploy/spec.py:373> exception=AttributeError("'NoneType' object has no attribute 'close'")>)
Traceback (most recent call last):
  File "/Users/viviencabannes/miniconda3/envs/main/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/Users/viviencabannes/miniconda3/envs/main/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/viviencabannes/miniconda3/envs/main/lib/python3.7/site-packages/distributed/deploy/spec.py", line 386, in _close
    await self.scheduler_comm.close(close_workers=True)
AttributeError: 'NoneType' object has no attribute 'close'
tornado.ap