19 changes: 17 additions & 2 deletions .github/workflows/pytest.yml
@@ -56,6 +56,10 @@ jobs:
python setup.py sdist
last_dist=$(ls -t dist/auto-sklearn-*.tar.gz | head -n 1)
pip install $last_dist
- name: Store repository status
id: status-before
run: |
echo "::set-output name=BEFORE::$(git status --porcelain -b)"
- name: Conda Run tests
if: matrix.use-conda == true
run: |
@@ -66,15 +70,26 @@
# to change the default python
export PATH="$CONDA/envs/testenv/bin:$PATH"
if [ ${{ matrix.code-cov }} ]; then codecov='--cov=autosklearn --cov-report=xml'; fi
$CONDA/envs/testenv/bin/python3 -m pytest --durations=20 -sv $codecov test
$CONDA/envs/testenv/bin/python3 -m pytest --durations=20 --timeout=300 --timeout-method=thread -v $codecov test
- name: Run tests
if: matrix.use-conda == false
run: |
export OPENBLAS_NUM_THREADS=1
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
if [ ${{ matrix.code-cov }} ]; then codecov='--cov=autosklearn --cov-report=xml'; fi
pytest --durations=20 -sv $codecov test
pytest --durations=20 --timeout=300 --timeout-method=thread -v $codecov test
- name: Check for files left behind by test
if: ${{ always() }}
run: |
before="${{ steps.status-before.outputs.BEFORE }}"
after="$(git status --porcelain -b)"
if [[ "$before" != "$after" ]]; then
echo "git status from before: $before"
echo "git status from after: $after"
echo "Not all generated files have been deleted!"
exit 1
fi
- name: Upload coverage
if: matrix.code-cov && always()
uses: codecov/codecov-action@v1
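The new "Store repository status" / "Check for files left behind by test" steps bracket the test run with two `git status --porcelain -b` snapshots; any difference between them means a test wrote files into the working tree without cleaning up, and the job fails. A minimal Python sketch of the same idea (the helper name and the pytest invocation are illustrative, not part of the workflow):

import subprocess

def working_tree_status() -> str:
    # --porcelain -b gives a stable, scriptable snapshot of the working tree plus branch info.
    return subprocess.run(
        ['git', 'status', '--porcelain', '-b'],
        capture_output=True, text=True, check=True,
    ).stdout

before = working_tree_status()
subprocess.run(['python', '-m', 'pytest', 'test'], check=False)  # test failures are reported by their own step
after = working_tree_status()

if before != after:
    print('git status from before:\n' + before)
    print('git status from after:\n' + after)
    raise SystemExit('Not all generated files have been deleted!')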
21 changes: 21 additions & 0 deletions .gitignore
@@ -54,3 +54,24 @@ number_submission
.pypirc
dmypy.json
*.log

# Dask created work space
dask-worker-space

# Python distribution generated files
.eggs

# Unit test / coverage reports
htmlcov/
cover
coverage
htmlcov
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
prof/
12 changes: 5 additions & 7 deletions autosklearn/automl.py
@@ -14,6 +14,7 @@
import tempfile

from ConfigSpace.read_and_write import json as cs_json
import dask
import dask.distributed
import numpy as np
import numpy.ma as ma
@@ -230,10 +231,11 @@ def __init__(self,

def _create_dask_client(self):
self._is_dask_client_internally_created = True
dask.config.set({'distributed.worker.daemon': False})
self._dask_client = dask.distributed.Client(
dask.distributed.LocalCluster(
n_workers=self._n_jobs,
processes=False,
processes=True,
threads_per_worker=1,
# We use the temporal directory to save the
# dask workers, because deleting workers
@@ -269,9 +271,7 @@ def _get_logger(self, name):
# This is gonna be honored by the server
# Which is created below
setup_logger(
output_file=os.path.join(
self._backend.temporary_directory, '%s.log' % str(logger_name)
),
filename='%s.log' % str(logger_name),
logging_config=self.logging_config,
output_dir=self._backend.temporary_directory,
)
@@ -294,9 +294,7 @@ def _get_logger(self, name):
logname=logger_name,
event=self.stop_logging_server,
port=port,
output_file=os.path.join(
self._backend.temporary_directory, '%s.log' % str(logger_name)
),
filename='%s.log' % str(logger_name),
logging_config=self.logging_config,
output_dir=self._backend.temporary_directory,
),
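Two related changes in _create_dask_client: the LocalCluster now uses `processes=True`, so each worker lives in its own subprocess, and `distributed.worker.daemon` is set to `False`, so those subprocesses are non-daemonic and may themselves fork (which pynisher needs). A self-contained sketch of the same cluster setup, with illustrative worker count and scratch directory:

import tempfile

import dask
import dask.distributed

if __name__ == '__main__':
    # Daemonic worker processes cannot fork children of their own, so the daemon
    # flag is switched off before the cluster starts.
    dask.config.set({'distributed.worker.daemon': False})

    with tempfile.TemporaryDirectory() as tmp_dir:
        cluster = dask.distributed.LocalCluster(
            n_workers=2,              # illustrative; auto-sklearn passes self._n_jobs
            processes=True,           # one subprocess per worker instead of in-process threads
            threads_per_worker=1,
            local_directory=tmp_dir,  # keep the dask-worker-space scratch dir out of the CWD
        )
        client = dask.distributed.Client(cluster)
        print(client.submit(lambda x: x + 1, 41).result())  # 42
        client.close()
        cluster.close()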
18 changes: 15 additions & 3 deletions autosklearn/ensemble_builder.py
@@ -454,7 +454,11 @@ def __init__(

# Setup the logger
self.logger_port = logger_port
self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)
self.logger = get_named_client_logger(
name='EnsembleBuilder',
port=self.logger_port,
output_dir=self.backend.temporary_directory,
)

if ensemble_nbest == 1:
self.logger.debug("Behaviour depends on int/float: %s, %s (ensemble_nbest, type)" %
@@ -556,7 +560,11 @@ def run(
elif time_left is not None and end_at is not None:
raise ValueError('Cannot provide both time_left and end_at.')

self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)
self.logger = get_named_client_logger(
name='EnsembleBuilder',
port=self.logger_port,
output_dir=self.backend.temporary_directory,
)

process_start_time = time.time()
while True:
@@ -627,7 +635,11 @@ def main(self, time_left, iteration, return_predictions):
# Pynisher jobs inside dask 'forget'
# the logger configuration. So we have to set it up
# accordingly
self.logger = get_named_client_logger('EnsembleBuilder', port=self.logger_port)
self.logger = get_named_client_logger(
name='EnsembleBuilder',
port=self.logger_port,
output_dir=self.backend.temporary_directory,
)

self.start_time = time.time()
train_pred, valid_pred, test_pred = None, None, None
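get_named_client_logger is now called with an explicit `output_dir` at every entry point (__init__, run, main) because pynisher jobs launched inside dask workers do not inherit the parent's logging configuration and must rebuild it, including the directory the log files live in. A hedged sketch of that call pattern, assuming a log server is already listening on the default TCP logging port and using an illustrative directory:

import logging.handlers

from autosklearn.util.logging_ import get_named_client_logger

# Re-created inside the subprocess: a forked pynisher job starts with no logging config.
logger = get_named_client_logger(
    name='EnsembleBuilder',
    port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,  # normally self.logger_port
    output_dir='/tmp/autosklearn_tmp',               # illustrative; normally self.backend.temporary_directory
)
logger.debug('ensemble building started')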
6 changes: 5 additions & 1 deletion autosklearn/util/backend.py
@@ -81,8 +81,12 @@ def __init__(self,
)
)
self._output_directory = output_directory
self._logger = logging.get_logger(__name__)
self.create_directories()
# This is the first place the logger gets created.
# We want to make sure any logging forward sets the correct directory
# where all files should be created
logging.setup_logger(output_dir=self._temporary_directory)
self._logger = logging.get_logger(__name__)

@property
def output_directory(self) -> Optional[str]:
3 changes: 3 additions & 0 deletions autosklearn/util/hash.py
@@ -8,6 +8,9 @@
def hash_array_or_matrix(X: np.ndarray) -> str:
m = hashlib.md5()

if hasattr(X, "iloc"):
X = X.to_numpy()

if scipy.sparse.issparse(X):
m.update(X.indices)
m.update(X.indptr)
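The added `iloc` check converts pandas inputs to a plain ndarray before hashing, so a DataFrame and the equivalent array produce the same fingerprint. A simplified, self-contained sketch of the idea (it hashes raw bytes and skips the sparse-matrix branch of the real hash_array_or_matrix):

import hashlib

import numpy as np
import pandas as pd

def digest(X) -> str:
    # Mirror the new check: anything with .iloc (a pandas DataFrame/Series)
    # is converted to a plain ndarray before hashing.
    if hasattr(X, "iloc"):
        X = X.to_numpy()
    m = hashlib.md5()
    m.update(np.ascontiguousarray(X).tobytes())
    return m.hexdigest()

arr = np.array([[1.0, 2.0], [3.0, 4.0]])
df = pd.DataFrame(arr)
assert digest(arr) == digest(df)  # same content, same hash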
79 changes: 53 additions & 26 deletions autosklearn/util/logging_.py
@@ -16,30 +16,29 @@


def setup_logger(
output_file: Optional[str] = None,
output_dir: str,
filename: Optional[str] = None,
distributedlog_filename: Optional[str] = None,
logging_config: Optional[Dict] = None,
output_dir: Optional[str] = None,
) -> None:
# logging_config must be a dictionary object specifying the configuration
# for the loggers to be used in auto-sklearn.
if logging_config is not None:
if output_file is not None:
logging_config['handlers']['file_handler']['filename'] = output_file
if output_dir is not None:
logging_config['handlers']['distributed_logfile']['filename'] = os.path.join(
output_dir, 'distributed.log'
)
logging.config.dictConfig(logging_config)
else:
if logging_config is None:
with open(os.path.join(os.path.dirname(__file__), 'logging.yaml'), 'r') as fh:
logging_config = yaml.safe_load(fh)
if output_file is not None:
logging_config['handlers']['file_handler']['filename'] = output_file
if output_dir is not None:
logging_config['handlers']['distributed_logfile']['filename'] = os.path.join(
output_dir, 'distributed.log'
)
logging.config.dictConfig(logging_config)

if filename is None:
filename = logging_config['handlers']['file_handler']['filename']
logging_config['handlers']['file_handler']['filename'] = os.path.join(
output_dir, filename
)

if distributedlog_filename is None:
distributedlog_filename = logging_config['handlers']['distributed_logfile']['filename']
logging_config['handlers']['distributed_logfile']['filename'] = os.path.join(
output_dir, distributedlog_filename
)
logging.config.dictConfig(logging_config)


def _create_logger(name: str) -> logging.Logger:
@@ -107,15 +106,22 @@ def isEnabledFor(self, level: int) -> bool:


def get_named_client_logger(
output_dir: str,
name: str,
host: str = 'localhost',
port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
) -> 'PicklableClientLogger':
logger = PicklableClientLogger(name, host, port)
logger = PicklableClientLogger(
output_dir=output_dir,
name=name,
host=host,
port=port
)
return logger


def _get_named_client_logger(
output_dir: str,
name: str,
host: str = 'localhost',
port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
@@ -133,6 +139,8 @@ def _get_named_client_logger(

Parameters
----------
output_dir: (str)
The path where the log files are going to be dumped
name: (str)
the name of the logger, used to tag the messages in the main log
host: (str)
Expand All @@ -143,7 +151,7 @@ def _get_named_client_logger(
local_logger: a logger object that has a socket handler
"""
# Setup the logger configuration
setup_logger()
setup_logger(output_dir=output_dir)

local_logger = _create_logger(name)

@@ -159,11 +167,17 @@

class PicklableClientLogger(PickableLoggerAdapter):

def __init__(self, name: str, host: str, port: int):
def __init__(self, output_dir: str, name: str, host: str, port: int):
self.output_dir = output_dir
self.name = name
self.host = host
self.port = port
self.logger = _get_named_client_logger(name, host, port)
self.logger = _get_named_client_logger(
output_dir=output_dir,
name=name,
host=host,
port=port
)

def __getstate__(self) -> Dict[str, Any]:
"""
@@ -174,7 +188,12 @@ def __getstate__(self) -> Dict[str, Any]:
Dictionary, representing the object state to be pickled. Ignores
the self.logger field and only returns the logger name.
"""
return {'name': self.name, 'host': self.host, 'port': self.port}
return {
'name': self.name,
'host': self.host,
'port': self.port,
'output_dir': self.output_dir,
}

def __setstate__(self, state: Dict[str, Any]) -> None:
"""
@@ -189,7 +208,13 @@ def __setstate__(self, state: Dict[str, Any]) -> None:
self.name = state['name']
self.host = state['host']
self.port = state['port']
self.logger = _get_named_client_logger(self.name, self.host, self.port)
self.output_dir = state['output_dir']
self.logger = _get_named_client_logger(
name=self.name,
host=self.host,
port=self.port,
output_dir=self.output_dir,
)


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
@@ -242,11 +267,13 @@ def start_log_server(
logname: str,
event: threading.Event,
port: multiprocessing.Value,
output_file: str,
filename: str,
logging_config: Dict,
output_dir: str,
) -> None:
setup_logger(output_file, logging_config, output_dir)
setup_logger(filename=filename,
logging_config=logging_config,
output_dir=output_dir)

while True:
# Loop until we find a valid port
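After the refactor, `output_dir` is the one required argument of setup_logger; `filename` and `distributedlog_filename` default to the paths from logging.yaml and are joined onto `output_dir`, so every handler writes into the same directory. The picklable client logger carries `output_dir` through __getstate__/__setstate__ so it can rebuild its socket handler after unpickling. A short usage sketch with illustrative paths (and assuming a log server for the socket handler):

from autosklearn.util.logging_ import setup_logger, get_named_client_logger

tmp_dir = '/tmp/autosklearn_tmp'   # illustrative; normally backend.temporary_directory

# Configure logging once; file_handler and distributed_logfile paths are
# resolved relative to output_dir.
setup_logger(output_dir=tmp_dir, filename='automl.log')

# Client loggers forward records to the TCP log server and survive pickling,
# because output_dir, name, host and port are part of their pickled state.
logger = get_named_client_logger(name='EnsembleBuilder', output_dir=tmp_dir)
logger.info('log records end up in %s' % tmp_dir)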
5 changes: 4 additions & 1 deletion test/conftest.py
@@ -3,6 +3,7 @@
import time
import unittest.mock

import dask
from dask.distributed import Client, get_client
import psutil
import pytest
@@ -124,7 +125,8 @@ def dask_client(request):
Workers are in subprocesses to not create deadlocks with the pynisher and logging.
"""

client = Client(n_workers=2, threads_per_worker=1, processes=False)
dask.config.set({'distributed.worker.daemon': False})
client = Client(n_workers=2, threads_per_worker=1, processes=True)
print("Started Dask client={}\n".format(client))

def get_finalizer(address):
@@ -149,6 +151,7 @@ def dask_client_single_worker(request):
it is used very rarely to avoid this issue as much as possible.
"""

dask.config.set({'distributed.worker.daemon': False})
client = Client(n_workers=1, threads_per_worker=1, processes=False)
print("Started Dask client={}\n".format(client))

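The test fixtures now mirror the production setup: non-daemon worker processes so that pynisher can fork inside them. A hypothetical test showing how the fixture is consumed (the test itself is not part of this PR):

def test_submit_on_process_workers(dask_client):
    # dask_client is the fixture defined above: 2 workers, 1 thread each, processes=True.
    future = dask_client.submit(sum, [1, 2, 3])
    assert future.result() == 6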