Skip to content

tornado.iostream.StreamClosedError: Stream is closed #4635

@ashwin-raghavan

Description

@ashwin-raghavan

I am trying to build a model on my Dask workers; the code is below. My Dask workers are running in a Kubernetes environment, and I have opened a NodePort (30006) to connect to the scheduler.

# Reproduction script: run a scikit-learn randomized hyper-parameter search
# with joblib's 'dask' backend against a remote Dask scheduler.
import joblib
import numpy as np
import pandas as pd  # NOTE(review): not used anywhere in this snippet
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

# Connect to the scheduler through the locally reachable NodePort (30006).
client = Client('127.0.0.1:30006', timeout=10000)
# With check=True this is expected to raise if client, scheduler and workers
# report mismatching package versions.
client.get_versions(check=True)

digits = load_digits()

# Candidate hyper-parameter values sampled by the randomized search.
param_space = {
    'C': np.logspace(-6, 6, 13),
    'gamma': np.logspace(-8, 8, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}

model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=50, verbose=10)

# Dispatch the joblib-parallelised fits to the Dask cluster instead of
# running them in local processes.
with joblib.parallel_backend('dask'):
    search.fit(digits.data, digits.target)

Below is the error:

Error
Traceback (most recent call last):
  File "C:\Users\user\AppData\Local\Programs\Python\Python38\lib\unittest\case.py", line 60, in testPartExecutor
    yield
  File "C:\Users\user\AppData\Local\Programs\Python\Python38\lib\unittest\case.py", line 676, in run
    self._callTestMethod(testMethod)
  File "C:\Users\user\AppData\Local\Programs\Python\Python38\lib\unittest\case.py", line 633, in _callTestMethod
    method()
  File "C:\Users\user\Documents\code\test_index.py", line 68, in test_train_and_save
    search.fit(digits.data, digits.target)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\sklearn\utils\validation.py", line 63, in inner_f
    return f(*args, **kwargs)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\sklearn\model_selection\_search.py", line 841, in fit
    self._run_search(evaluate_candidates)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\sklearn\model_selection\_search.py", line 1619, in _run_search
    evaluate_candidates(ParameterSampler(
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\sklearn\model_selection\_search.py", line 795, in evaluate_candidates
    out = parallel(delayed(_fit_and_score)(clone(base_estimator),
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\parallel.py", line 1054, in __call__
    self.retrieve()
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\parallel.py", line 933, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
TypeError: 'CancelledError' object is not iterable




Ran 1 test in 2.190s

FAILED (errors=1)
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x0000027E6579B070>>, <Task finished name='Task-159' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py:323> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 195, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 275, in maybe_to_futures
    f = await call_data_futures[arg]
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 324, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 311, in _to_func_args
    args = list(await maybe_to_futures(args))
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 301, in maybe_to_futures
    f = await t
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\distributed\client.py", line 2057, in _scatter
    await self.scheduler.scatter(
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\distributed\core.py", line 861, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\distributed\core.py", line 644, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 205, in read
    convert_stream_closed_error(self, e)
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 128, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x0000027E6579B070>>, <Task finished name='Task-160' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py:323> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\user\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 195, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

Below is my package information:

{
	"scheduler": {
		"host": {
			"python": "3.8.0.final.0",
			"python-bits": 64,
			"OS": "Linux",
			"OS-release": "5.4.72-microsoft-standard-WSL2",
			"machine": "x86_64",
			"processor": "",
			"byteorder": "little",
			"LC_ALL": "C.UTF-8",
			"LANG": "C.UTF-8"
		},
		"packages": {
			"python": "3.8.0.final.0",
			"dask": "2021.03.0",
			"distributed": "2021.03.0",
			"msgpack": "1.0.0",
			"cloudpickle": "1.6.0",
			"tornado": "6.1",
			"toolz": "0.11.1",
			"numpy": "1.18.1",
			"lz4": "3.1.1",
			"blosc": "1.9.2"
		}
	},
	"workers": {
		"tcp://10.1.1.108:45085": {
			"host": {
				"python": "3.8.0.final.0",
				"python-bits": 64,
				"OS": "Linux",
				"OS-release": "5.4.72-microsoft-standard-WSL2",
				"machine": "x86_64",
				"processor": "",
				"byteorder": "little",
				"LC_ALL": "C.UTF-8",
				"LANG": "C.UTF-8"
			},
			"packages": {
				"python": "3.8.0.final.0",
				"dask": "2021.03.0",
				"distributed": "2021.03.0",
				"msgpack": "1.0.0",
				"cloudpickle": "1.6.0",
				"tornado": "6.1",
				"toolz": "0.11.1",
				"numpy": "1.18.1",
				"lz4": "3.1.1",
				"blosc": "1.9.2"
			}
		},
		"tcp://10.1.1.109:34133": {
			"host": {
				"python": "3.8.0.final.0",
				"python-bits": 64,
				"OS": "Linux",
				"OS-release": "5.4.72-microsoft-standard-WSL2",
				"machine": "x86_64",
				"processor": "",
				"byteorder": "little",
				"LC_ALL": "C.UTF-8",
				"LANG": "C.UTF-8"
			},
			"packages": {
				"python": "3.8.0.final.0",
				"dask": "2021.03.0",
				"distributed": "2021.03.0",
				"msgpack": "1.0.0",
				"cloudpickle": "1.6.0",
				"tornado": "6.1",
				"toolz": "0.11.1",
				"numpy": "1.18.1",
				"lz4": "3.1.1",
				"blosc": "1.9.2"
			}
		},
		"tcp://10.1.1.110:35383": {
			"host": {
				"python": "3.8.0.final.0",
				"python-bits": 64,
				"OS": "Linux",
				"OS-release": "5.4.72-microsoft-standard-WSL2",
				"machine": "x86_64",
				"processor": "",
				"byteorder": "little",
				"LC_ALL": "C.UTF-8",
				"LANG": "C.UTF-8"
			},
			"packages": {
				"python": "3.8.0.final.0",
				"dask": "2021.03.0",
				"distributed": "2021.03.0",
				"msgpack": "1.0.0",
				"cloudpickle": "1.6.0",
				"tornado": "6.1",
				"toolz": "0.11.1",
				"numpy": "1.18.1",
				"lz4": "3.1.1",
				"blosc": "1.9.2"
			}
		}
	},
	"client": {
		"host": {
			"python": "3.8.6.final.0",
			"python-bits": 64,
			"OS": "Windows",
			"OS-release": "10",
			"machine": "AMD64",
			"processor": "Intel64 Family 6 Model 142 Stepping 10, GenuineIntel",
			"byteorder": "little",
			"LC_ALL": "None",
			"LANG": "None"
		},
		"packages": {
			"python": "3.8.6.final.0",
			"dask": "2021.03.0",
			"distributed": "2021.03.0",
			"msgpack": "1.0.0",
			"cloudpickle": "1.6.0",
			"tornado": "6.1",
			"toolz": "0.11.1",
			"numpy": "1.19.5",
			"lz4": "None",
			"blosc": "None"
		}
	}
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions