Skip to content
This repository has been archived by the owner on Jul 10, 2024. It is now read-only.

Commit

Permalink
SUBMARINE-1002. Remove worker index in params and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
KUAN-HSUN-LI committed Aug 31, 2021
1 parent 605d674 commit 2490be6
Show file tree
Hide file tree
Showing 19 changed files with 85 additions and 173 deletions.
26 changes: 13 additions & 13 deletions dev-support/database/submarine-data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,23 @@ INSERT INTO `team` VALUES ('e9ca23d68d884d4ebb19d07889721234', 'admin', 'Submari
-- ----------------------------
-- Records of metrics
-- ----------------------------
INSERT INTO `metrics` (`id`, `key`, `value`, `worker_index`, `timestamp`, `step`, `is_nan`) VALUES
('application_123651651', 'score', 0.666667, 'worker-1', 1569139525097, 0, 0),
('application_123651651', 'score', 0.666670, 'worker-1', 1569149139731, 1, 0),
('experiment_1595332719154_0001', 'score', 0.666667, 'worker-1', 1569169376482, 0, 0),
('experiment_1595332719154_0001', 'score', 0.666671, 'worker-1', 1569236290721, 0, 0),
('experiment_1595332719154_0001', 'score', 0.666680, 'worker-1', 1569236466722, 0, 0);
INSERT INTO `metrics` (`id`, `key`, `value`, `timestamp`, `step`, `is_nan`) VALUES
('application_123651651', 'score', 0.666667, 1569139525097, 0, 0),
('application_123651651', 'score', 0.666670, 1569149139731, 1, 0),
('experiment_1595332719154_0001', 'score', 0.666667, 1569169376482, 0, 0),
('experiment_1595332719154_0001', 'score', 0.666671, 1569236290721, 1, 0),
('experiment_1595332719154_0001', 'score', 0.666680, 1569236466722, 2, 0);

-- ----------------------------
-- Records of params
-- ----------------------------
INSERT INTO `params` (`id`, `key`, `value`, `worker_index`) VALUES
('application_123651651', 'max_iter', '100', 'worker-1'),
('application_123456898', 'n_jobs', '5', 'worker-1'),
('application_123456789', 'alpha', '20', 'worker-1'),
('experiment_1595332719154_0001', 'max_iter', '100', 'worker-1'),
('experiment_1595332719154_0002', 'n_jobs', '5', 'worker-1'),
('experiment_1595332719154_0003', 'alpha', '20', 'worker-1');
INSERT INTO `params` (`id`, `key`, `value`) VALUES
('application_123651651', 'max_iter', '100'),
('application_123456898', 'n_jobs', '5'),
('application_123456789', 'alpha', '20'),
('experiment_1595332719154_0001', 'max_iter', '100'),
('experiment_1595332719154_0002', 'n_jobs', '5'),
('experiment_1595332719154_0003', 'alpha', '20');

-- ----------------------------
-- Records of environment
Expand Down
6 changes: 2 additions & 4 deletions dev-support/database/submarine-model.sql
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,10 @@ CREATE TABLE `metrics` (
`id` varchar(64) NOT NULL COMMENT 'Id of the Experiment',
`key` varchar(190) NOT NULL COMMENT 'Metric key: `String` (limit 190 characters). Part of *Primary Key* for ``metrics`` table.',
`value` float NOT NULL COMMENT 'Metric value: `Float`. Defined as *Non-null* in schema.',
`worker_index` varchar(32) NOT NULL COMMENT 'Metric worker_index: `String` (limit 32 characters). Part of *Primary Key* for\r\n ``metrics`` table.',
`timestamp` bigint(20) NOT NULL COMMENT 'Timestamp recorded for this metric entry: `BigInteger`. Part of *Primary Key* for ``metrics`` table.',
`step` bigint(11) NOT NULL COMMENT 'Step recorded for this metric entry: `BigInteger`.',
`is_nan` BOOLEAN NOT NULL COMMENT 'True if the value is in fact NaN.',
CONSTRAINT `metrics_pk` PRIMARY KEY (`id`, `key`, `timestamp`, `worker_index`)
CONSTRAINT `metrics_pk` PRIMARY KEY (`id`, `key`, `timestamp`),
FOREIGN KEY(`id`) REFERENCES `experiment` (`id`) ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Expand All @@ -72,8 +71,7 @@ CREATE TABLE `params` (
`id` varchar(64) NOT NULL COMMENT 'Id of the Experiment',
`key` varchar(190) NOT NULL COMMENT '`String` (limit 190 characters). Part of *Primary Key* for ``params`` table.',
`value` varchar(32) NOT NULL COMMENT '`String` (limit 190 characters). Defined as *Non-null* in schema.',
`worker_index` varchar(32) NOT NULL COMMENT '`String` (limit 32 characters). Part of *Primary Key* for\r\n ``metrics`` table.',
CONSTRAINT `params_pk` PRIMARY KEY (`id`, `key`, `worker_index`)
CONSTRAINT `params_pk` PRIMARY KEY (`id`, `key`),
FOREIGN KEY(`id`) REFERENCES `experiment` (`id`) ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Expand Down
8 changes: 1 addition & 7 deletions submarine-sdk/pysubmarine/submarine/entities/Metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ class Metric(_SubmarineObject):
Metric object.
"""

def __init__(self, key, value, worker_index, timestamp, step):
def __init__(self, key, value, timestamp, step):
self._key = key
self._value = value
self._worker_index = worker_index
self._timestamp = timestamp
self._step = step

Expand All @@ -38,11 +37,6 @@ def value(self):
"""Float value of the metric."""
return self._value

@property
def worker_index(self):
"""string value of the metric."""
return self._worker_index

@property
def timestamp(self):
"""Metric timestamp as an integer (milliseconds since the Unix epoch)."""
Expand Down
8 changes: 1 addition & 7 deletions submarine-sdk/pysubmarine/submarine/entities/Param.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ class Param(_SubmarineObject):
Parameter object.
"""

def __init__(self, key, value, worker_index):
def __init__(self, key, value):
self._key = key
self._value = value
self._worker_index = worker_index

@property
def key(self):
Expand All @@ -35,8 +34,3 @@ def key(self):
def value(self):
"""String value of the parameter."""
return self._value

@property
def worker_index(self):
"""String value of the parameter."""
return self._worker_index
56 changes: 20 additions & 36 deletions submarine-sdk/pysubmarine/submarine/store/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,14 @@ def to_submarine_entity(self):
"""
return ModelVersionTag(self.tag)

# +--------------------+-------+-------------------+--------------+---------------+------+--------+
# | id | key | value | worker_index | timestamp | step | is_nan |
# +--------------------+-------+-------------------+--------------+---------------+------+--------+
# | application_123456 | score | 0.666666666666667 | worker-1 | 1595414873838 | 0 | 0 |
# | application_123456 | score | 0.666666666666667 | worker-1 | 1595472286360 | 0 | 0 |
# | application_123456 | score | 0.666666666666667 | worker-1 | 1595414632967 | 0 | 0 |
# | application_123456 | score | 0.666666666666667 | worker-1 | 1595415075067 | 0 | 0 |
# +--------------------+-------+-------------------+--------------+---------------+------+--------+
# +--------------------+-------+-------------------+---------------+------+--------+
# | id | key | value | timestamp | step | is_nan |
# +--------------------+-------+-------------------+---------------+------+--------+
# | application_123456 | score | 0.666666666666667 | 1595414873838 | 0 | 0 |
# | application_123456 | score | 0.666666666666667 | 1595472286360 | 0 | 0 |
# | application_123456 | score | 0.666666666666667 | 1595414632967 | 0 | 0 |
# | application_123456 | score | 0.666666666666667 | 1595415075067 | 0 | 0 |
# +--------------------+-------+-------------------+---------------+------+--------+


class SqlMetric(Base):
Expand All @@ -296,11 +296,6 @@ class SqlMetric(Base):
"""
Metric value: `Float`. Defined as *Non-null* in schema.
"""
worker_index = Column(String(64))
"""
Metric worker_index: `String` (limit 32 characters). Part of *Primary Key* for
``metrics`` table.
"""
timestamp = Column(BigInteger, default=lambda: int(time.time()))
"""
Timestamp recorded for this metric entry: `BigInteger`. Part of *Primary Key* for
Expand All @@ -318,14 +313,12 @@ class SqlMetric(Base):
__table_args__ = (PrimaryKeyConstraint('id',
'key',
'timestamp',
'worker_index',
name='metric_pk'),)

def __repr__(self):
return '<SqlMetric({}, {}, {}, {}, {})>'.format(self.key, self.value,
self.worker_index,
self.timestamp,
self.step)
return '<SqlMetric({}, {}, {}, {})>'.format(self.key, self.value,
self.timestamp,
self.step)

def to_submarine_entity(self):
"""
Expand All @@ -334,18 +327,17 @@ def to_submarine_entity(self):
"""
return Metric(key=self.key,
value=self.value if not self.is_nan else float("nan"),
worker_index=self.worker_index,
timestamp=self.timestamp,
step=self.step)


# +-----------------------+----------+-------+--------------+
# | id | key | value | worker_index |
# +-----------------------+----------+-------+--------------+
# | application_123456898 | max_iter | 100 | worker-1 |
# | application_123456898 | alpha | 10 | worker-1 |
# | application_123456898 | n_jobs | 5 | worker-1 |
# +-----------------------+----------+-------+--------------+
# +-----------------------+----------+-------+
# | id | key | value |
# +-----------------------+----------+-------+
# | application_123456898 | max_iter | 100 |
# | application_123456898 | alpha | 10 |
# | application_123456898 | n_jobs | 5 |
# +-----------------------+----------+-------+


class SqlParam(Base):
Expand All @@ -364,26 +356,18 @@ class SqlParam(Base):
"""
Param value: `String` (limit 190 characters). Defined as *Non-null* in schema.
"""
worker_index = Column(String(32), nullable=False)
"""
Param worker_index: `String` (limit 32 characters). Part of *Primary Key* for
``metrics`` table.
"""

__table_args__ = (PrimaryKeyConstraint('id',
'key',
'worker_index',
name='param_pk'),)

def __repr__(self):
return '<SqlParam({}, {}, {})>'.format(self.key, self.value,
self.worker_index)
return '<SqlParam({}, {})>'.format(self.key, self.value)

def to_submarine_entity(self):
"""
Convert DB model to corresponding submarine entity.
:return: :py:class:`submarine.entities.Param`.
"""
return Param(key=self.key,
value=self.value,
worker_index=self.worker_index)
value=self.value)
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ def log_metric(self, job_id, metric):
id=job_id,
key=metric.key,
value=value,
worker_index=metric.worker_index,
timestamp=metric.timestamp,
step=metric.step,
session=session,
Expand All @@ -156,8 +155,7 @@ def log_param(self, job_id, param):
id=job_id,
session=session,
key=param.key,
value=param.value,
worker_index=param.worker_index)
value=param.value)
session.commit()
except sqlalchemy.exc.IntegrityError:
session.rollback()
16 changes: 4 additions & 12 deletions submarine-sdk/pysubmarine/submarine/tracking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,30 @@ def __init__(self, tracking_uri=None):
self.tracking_uri = tracking_uri or utils.get_tracking_uri()
self.store = utils.get_sqlalchemy_store(self.tracking_uri)

def log_metric(self,
job_id,
key,
value,
worker_index,
timestamp=None,
step=None):
def log_metric(self, job_id, key, value, timestamp=None, step=None):
"""
Log a metric against the run ID.
:param job_id: The job name to which the metric should be logged.
:param key: Metric name.
:param value: Metric value (float). Note that some special values such
as +/- Infinity may be replaced by other values depending on the store. For
example, the SQLAlchemy store replaces +/- Inf with max / min float values.
:param worker_index: Metric worker_index (string).
:param timestamp: Time when this metric was calculated. Defaults to the current system time.
:param step: Training step (iteration) at which was the metric calculated. Defaults to 0.
"""
timestamp = timestamp if timestamp is not None else int(time.time())
step = step if step is not None else 0
validate_metric(key, value, timestamp, step)
metric = Metric(key, value, worker_index, timestamp, step)
metric = Metric(key, value, timestamp, step)
self.store.log_metric(job_id, metric)

def log_param(self, job_id, key, value, worker_index):
def log_param(self, job_id, key, value):
"""
Log a parameter against the job name. Value is converted to a string.
:param job_id: The job name to which the parameter should be logged.
:param key: Parameter name.
:param value: Parameter value (string).
:param worker_index: Parameter worker_index (string).
"""
validate_param(key, value)
param = Param(key, str(value), worker_index)
param = Param(key, str(value))
self.store.log_param(job_id, param)
10 changes: 4 additions & 6 deletions submarine-sdk/pysubmarine/submarine/tracking/fluent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import time

from submarine.tracking.client import SubmarineClient
from submarine.tracking.utils import get_job_id, get_worker_index
from submarine.tracking.utils import get_job_id

_RUN_ID_ENV_VAR = "SUBMARINE_RUN_ID"
_active_run_stack = []
Expand All @@ -37,8 +37,7 @@ def log_param(key, value):
:param value: Parameter value (string, but will be string-field if not)
"""
job_id = get_job_id()
worker_index = get_worker_index()
SubmarineClient().log_param(job_id, key, value, worker_index)
SubmarineClient().log_param(job_id, key, value)


def log_metric(key, value, step=None):
Expand All @@ -51,6 +50,5 @@ def log_metric(key, value, step=None):
:param step: Metric step (int). Defaults to zero if unspecified.
"""
job_id = get_job_id()
worker_index = get_worker_index()
SubmarineClient().log_metric(job_id, key, value, worker_index,
int(time.time() * 1000), step or 0)
SubmarineClient().log_metric(job_id, key, value, int(time.time() * 1000),
step or 0)
8 changes: 3 additions & 5 deletions submarine-sdk/pysubmarine/tests/entities/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,19 @@
from submarine.entities import Metric


def _check(metric, key, value, worker_index, timestamp, step):
def _check(metric, key, value, timestamp, step):
assert type(metric) == Metric
assert metric.key == key
assert metric.value == value
assert metric.worker_index == worker_index
assert metric.timestamp == timestamp
assert metric.step == step


def test_creation_and_hydration():
key = "alpha"
value = 10000
worker_index = 1
ts = int(time.time())
step = 0

metric = Metric(key, value, worker_index, ts, step)
_check(metric, key, value, worker_index, ts, step)
metric = Metric(key, value, ts, step)
_check(metric, key, value, ts, step)
8 changes: 3 additions & 5 deletions submarine-sdk/pysubmarine/tests/entities/test_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
from submarine.entities import Param


def _check(param, key, value, worker_index):
def _check(param, key, value):
assert type(param) == Param
assert param.key == key
assert param.value == value
assert param.worker_index == worker_index


def test_creation_and_hydration():
key = "alpha"
value = 10000
worker_index = 1

param = Param(key, value, worker_index)
_check(param, key, value, worker_index)
param = Param(key, value)
_check(param, key, value)
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def tearDown(self):
models.Base.metadata.drop_all(self.store.engine)

def test_log_param(self):
param1 = Param("name_1", "a", "worker-1")
param1 = Param("name_1", "a")
self.store.log_param(JOB_ID, param1)

# Validate params
Expand All @@ -53,12 +53,11 @@ def test_log_param(self):
.filter(SqlParam.id == JOB_ID).all()
assert params[0].key == "name_1"
assert params[0].value == "a"
assert params[0].worker_index == "worker-1"
assert params[0].id == JOB_ID

def test_log_metric(self):
metric1 = Metric("name_1", 5, "worker-1", int(time.time()), 0)
metric2 = Metric("name_1", 6, "worker-2", int(time.time()), 0)
metric1 = Metric("name_1", 5, int(time.time()), 0)
metric2 = Metric("name_1", 6, int(time.time()), 0)
self.store.log_metric(JOB_ID, metric1)
self.store.log_metric(JOB_ID, metric2)

Expand All @@ -71,7 +70,5 @@ def test_log_metric(self):
assert len(metrics) == 2
assert metrics[0].key == "name_1"
assert metrics[0].value == 5
assert metrics[0].worker_index == "worker-1"
assert metrics[0].id == JOB_ID
assert metrics[1].value == 6
assert metrics[1].worker_index == "worker-2"
Loading

0 comments on commit 2490be6

Please sign in to comment.