From ce535fcbb92b60eb5b5f7ef2f89daa6c72dd3e8c Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Wed, 8 Jul 2020 21:31:41 +0800 Subject: [PATCH 01/26] fix optimizer zero_grad --- .../submarine/ml/pytorch/model/base_pytorch_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py index b862a0ed9..1dfe7f6a1 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py @@ -72,8 +72,8 @@ def train(self, train_loader): sample, target = batch output = self.model(sample) loss = self.loss(output, target) - loss.backward() self.optimizer.zero_grad() + loss.backward() self.optimizer.step() def evaluate(self): From fdcda050cb9530fc061abb9c34f6c9ed386807aa Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Wed, 8 Jul 2020 21:53:16 +0800 Subject: [PATCH 02/26] fix layers/core.py --- .../submarine/ml/pytorch/layers/core.py | 39 ++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py index 6fff591d4..aeb24fc3f 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py @@ -19,46 +19,41 @@ from torch import nn -class FieldLinear(nn.Module): +class FeatureLinear(nn.Module): - def __init__(self, field_dims, out_features): + def __init__(self, num_features, out_features): """ - :param field_dims: List of dimensions of each field. + :param num_features: number of total features. :param out_features: The number of output features. """ super().__init__() - self.weight = nn.Embedding(num_embeddings=sum(field_dims), + self.weight = nn.Embedding(num_embeddings=num_features, embedding_dim=out_features) self.bias = nn.Parameter(torch.zeros((out_features,))) - self.register_buffer( - 'offset', - torch.as_tensor([0, *accumulate(field_dims)][:-1], - dtype=torch.long)) - def forward(self, x): + def forward(self, feature_idx, feature_value): """ - :param x: torch.LongTensor (batch_size, num_fields) + :param feature_idx: torch.LongTensor (batch_size, num_fields) + :param feature_value: torch.LongTensor (batch_size, num_fields) """ - return torch.sum(self.weight(x + self.offset), dim=1) + self.bias + return torch.sum( + self.weight(feature_idx) * feature_value.unsqueeze(dim=-1), + dim=1) + self.bias -class FieldEmbedding(nn.Module): +class FeatureEmbedding(nn.Module): - def __init__(self, field_dims, embedding_dim): + def __init__(self, num_features, embedding_dim): super().__init__() - self.weight = nn.Embedding(num_embeddings=sum(field_dims), + self.weight = nn.Embedding(num_embeddings=num_features, embedding_dim=embedding_dim) - self.register_buffer( - 'offset', - torch.as_tensor([0, *accumulate(field_dims)][:-1], - dtype=torch.long)) - def forward(self, x): + def forward(self, feature_idx, feature_value): """ - :param x: torch.LongTensor (batch_size, num_fields) + :param feature_idx: torch.LongTensor (batch_size, num_fields) + :param feature_value: torch.LongTensor (batch_size, num_fields) """ - return self.weight( - x + self.offset) # (batch_size, num_fields, embedding_dim) + return self.weight(feature_idx) * feature_value.unsqueeze(dim=-1) class PairwiseInteraction(nn.Module): From f57d7327fbbde48b89d8a2a5aa621ddf641b87d1 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Wed, 8 Jul 2020 21:59:22 +0800 Subject: [PATCH 03/26] fix deepfm --- .../submarine/ml/pytorch/model/ctr/deepfm.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py index 6c955d797..53e7a7572 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py @@ -16,7 +16,7 @@ import torch from torch import nn -from submarine.ml.pytorch.layers.core import (DNN, FieldEmbedding, FieldLinear, +from submarine.ml.pytorch.layers.core import (DNN, FeatureEmbedding, FeatureLinear, PairwiseInteraction) from submarine.ml.pytorch.model.base_pytorch_model import BasePyTorchModel @@ -30,25 +30,26 @@ def model_fn(self, params): class _DeepFM(nn.Module): - def __init__(self, field_dims, embedding_dim, out_features, hidden_units, + def __init__(self, num_fields, num_features, embedding_dim, out_features, hidden_units, dropout_rates, **kwargs): super().__init__() - self.field_linear = FieldLinear(field_dims=field_dims, + self.field_linear = FeatureLinear(num_features=num_features, out_features=out_features) - self.field_embedding = FieldEmbedding(field_dims=field_dims, + self.field_embedding = FeatureEmbedding(num_features=num_features, embedding_dim=embedding_dim) self.pairwise_interaction = PairwiseInteraction() - self.dnn = DNN(in_features=len(field_dims) * embedding_dim, + self.dnn = DNN(in_features=num_fields * embedding_dim, out_features=out_features, hidden_units=hidden_units, dropout_rates=dropout_rates) - def forward(self, x): + def forward(self, feature_idx, feature_value): """ - :param x: torch.LongTensor (batch_size, num_fields) + :param feature_idx: torch.LongTensor (batch_size, num_fields) + :param feature_value: torch.LongTensor (batch_size, num_fields) """ - emb = self.field_embedding(x) # (batch_size, num_fields, embedding_dim) - linear_logit = self.field_linear(x) + emb = self.field_embedding(feature_idx, feature_value) # (batch_size, num_fields, embedding_dim) + linear_logit = self.field_linear(feature_idx, feature_value) fm_logit = self.pairwise_interaction(emb) deep_logit = self.dnn(torch.flatten(emb, start_dim=1)) From 74714087d3d4985816462de624f1c8c010f304be Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Wed, 8 Jul 2020 22:18:03 +0800 Subject: [PATCH 04/26] fix data input_fn and fileio --- .../ml/pytorch/input/libsvm_dataset.py | 110 +++++++++++++----- .../pysubmarine/submarine/utils/fileio.py | 66 +++-------- 2 files changed, 96 insertions(+), 80 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py index a0c8a4ed7..a3cd42dc0 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py @@ -13,59 +13,107 @@ # See the License for the specific language governing permissions and # limitations under the License. -import pandas as pd +import numpy as np import torch from torch.utils.data import DataLoader, Dataset from torch.utils.data.distributed import DistributedSampler -from submarine.utils.fileio import read_file +from submarine.utils.fileio import open_buffered_file_reader, file_info + +from tqdm.auto import tqdm + +import os +import itertools +import functools +import multiprocessing as mp +from typing import List, Tuple class LIBSVMDataset(Dataset): - def __init__(self, path): - self.data, self.label = self.preprocess_data(read_file(path)) + def __init__(self, data_uri: str, sample_offset: np.ndarray): + self.data_uri = data_uri + self.sample_offset = sample_offset + + def __len__(self) -> int: + return len(self.sample_offset) + + def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor, int]: + with open_buffered_file_reader(self.data_uri) as infile: + infile.seek(self.sample_offset[idx], io.SEEK_SET) + sample = infile.readline() + return LIBSVMDataset.parse_sample(sample) - def __getitem__(self, idx): - return self.data.iloc[idx], self.label.iloc[idx] + @classmethod + def parse_sample(cls, + sample: bytes) -> Tuple[torch.Tensor, torch.Tensor, int]: + label, *entries = sample.rstrip(b'\n').split(b' ') + feature_idx = torch.zeros(len(entries), dtype=torch.long) + feature_value = torch.zeros(len(entries), dtype=torch.float) + for i, entry in enumerate(entries): + fidx, fvalue = entry.split(b':') + feature_idx[i], feature_value[i] = int(fidx), float(fvalue) + return feature_idx, feature_value, int(label) - def __len__(self): - return len(self.data) + @classmethod + def prepare_dataset(cls, data_uri: str, n_jobs: int = os.cpu_count()): + sample_offset = LIBSVMDataset._locate_sample_offsets(data_uri=data_uri, + n_jobs=n_jobs) + return LIBSVMDataset(data_uri=data_uri, sample_offset=sample_offset) - def preprocess_data(self, stream): + @classmethod + def _locate_sample_offsets(cls, data_uri: str, n_jobs: int) -> np.ndarray: + finfo = file_info(data_uri) + chunk_size, _ = divmod(finfo.size, n_jobs) - def _convert_line(line): - feat_ids = [] - feat_vals = [] - for x in line: - feat_id, feat_val = x.split(':') - feat_ids.append(int(feat_id)) - feat_vals.append(float(feat_val)) - return (torch.as_tensor(feat_ids, dtype=torch.int64), - torch.as_tensor(feat_vals, dtype=torch.float32)) + chunk_starts = [0] + with open_buffered_file_reader(data_uri) as infile: + while infile.tell() < finfo.size: + infile.seek(chunk_size, os.SEEK_CUR) + infile.readline() + chunk_starts.append(min(infile.tell(), finfo.size)) - df = pd.read_table(stream, header=None) - df = df.loc[:, 0].str.split(n=1, expand=True) - label = df.loc[:, 0].apply(int) - data = df.loc[:, 1].str.split().apply(_convert_line) - return data, label + with mp.Pool(processes=n_jobs, + initializer=tqdm.set_lock, + initargs=(tqdm.get_lock(),), + maxtasksperchild=1) as pool: + return np.asarray( + list( + itertools.chain.from_iterable( + pool.imap(functools.partial( + LIBSVMDataset._locate_sample_offsets_job, data_uri), + iterable=enumerate( + zip(chunk_starts[:-1], + chunk_starts[1:])))))) - def collate_fn(self, batch): - data, label = tuple(zip(*batch)) - _, feat_val = tuple(zip(*data)) - return (torch.stack(feat_val, dim=0).type(torch.long), - torch.as_tensor(label, dtype=torch.float32).unsqueeze(dim=-1)) + @classmethod + def _locate_sample_offsets_job( + cls, data_uri: str, task: Tuple[int, Tuple[int, int]]) -> List[int]: + job_id, (start, end) = task + offsets = [start] + with open_buffered_file_reader(data_uri) as infile: + infile.seek(start, os.SEEK_SET) + with tqdm(total=None, + desc=f'[Loacate Sample Offsets] job: {job_id}', + position=job_id, + disable=('DISABLE_TQDM' in os.environ)) as pbar: + while infile.tell() < end: + infile.readline() + offsets.append(infile.tell()) + pbar.update() + assert offsets.pop() == end + return offsets def libsvm_input_fn(filepath, batch_size=256, num_threads=1, **kwargs): def _input_fn(): - dataset = LIBSVMDataset(filepath) + dataset = LIBSVMDataset.prepare_dataset(data_uri=filepath, + n_jobs=num_threads) sampler = DistributedSampler(dataset) return DataLoader(dataset=dataset, batch_size=batch_size, sampler=sampler, - num_workers=num_threads, - collate_fn=dataset.collate_fn) + num_workers=0) # should be 0 (pytorch bug) return _input_fn diff --git a/submarine-sdk/pysubmarine/submarine/utils/fileio.py b/submarine-sdk/pysubmarine/submarine/utils/fileio.py index 699e1a5cb..2ad5f44b5 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/fileio.py +++ b/submarine-sdk/pysubmarine/submarine/utils/fileio.py @@ -13,60 +13,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -import io -import os -from enum import Enum -from urllib.parse import urlparse - from pyarrow import fs +from pyarrow.lib import NativeFile - -class _Scheme(Enum): - HDFS = 'hdfs' - FILE = 'file' - DEFAULT = '' - - -def read_file(path): - scheme, host, port, path = _parse_path(path) - if _Scheme(scheme) is _Scheme.HDFS: - return _read_hdfs(host=host, port=port, path=path) - else: - return _read_local(path=path) - - -def write_file(buffer, path): - scheme, host, port, path = _parse_path(path) - if _Scheme(scheme) is _Scheme.HDFS: - _write_hdfs(buffer=buffer, host=host, port=port, path=path) - else: - _write_local(buffer=buffer, path=path) - - -def _parse_path(path): - p = urlparse(path) - return p.scheme, p.hostname, p.port, os.path.abspath(p.path) - - -def _read_hdfs(host, port, path): - hdfs = fs.HadoopFileSystem(host=host, port=port) - with hdfs.open_input_stream(path) as stream: - data = stream.read() - return io.BytesIO(data) +import io -def _read_local(path): - with open(path, mode='rb') as f: - data = f.read() - return io.BytesIO(data) +def open_buffered_file_reader( + uri: str, + buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> io.BufferedReader: + try: + input_file = open_input_file(uri) + return io.BufferedReader(input_file, buffer_size=buffer_size) + except: + input_file.close() -def _write_hdfs(buffer, host, port, path): - hdfs = fs.HadoopFileSystem(host=host, port=port) - with hdfs.open_output_stream(path) as stream: - stream.write(buffer.getbuffer()) +def open_input_file(uri: str) -> NativeFile: + filesystem, path = fs.FileSystem.from_uri(uri) + return filesystem.open_input_file(path) -def _write_local(buffer, path): - with open(path, mode='wb') as f: - f.write(buffer.getbuffer()) +def file_info(uri: str) -> fs.FileInfo: + filesystem, path = fs.FileSystem.from_uri(uri) + info, = filesystem.get_file_info([path]) + return info From 3f80bc61cf32ae8b13ad8cacd95d830885341337 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 17:41:09 +0800 Subject: [PATCH 05/26] fix fileio --- .../pysubmarine/example/pytorch/deepfm.json | 20 ++++----- .../ml/pytorch/input/libsvm_dataset.py | 2 +- .../ml/pytorch/model/base_pytorch_model.py | 38 +++++++++-------- .../pysubmarine/submarine/utils/fileio.py | 41 +++++++++++++++++-- 4 files changed, 68 insertions(+), 33 deletions(-) diff --git a/submarine-sdk/pysubmarine/example/pytorch/deepfm.json b/submarine-sdk/pysubmarine/example/pytorch/deepfm.json index a1c70690b..a7264e1a8 100644 --- a/submarine-sdk/pysubmarine/example/pytorch/deepfm.json +++ b/submarine-sdk/pysubmarine/example/pytorch/deepfm.json @@ -10,10 +10,10 @@ "metric": "roc_auc" }, "training": { - "batch_size": 64, - "num_epochs": 1, + "batch_size": 512, + "num_epochs": 3, "log_steps": 10, - "num_threads": 0, + "num_threads": 2, "num_gpus": 0, "seed": 42, "mode": "distributed", @@ -22,12 +22,12 @@ "model": { "name": "ctr.deepfm", "kwargs": { - "field_dims": [15, 52, 30, 19, 111, 51, 26, 19, 53, 5, 13, 8, 23, 21, 77, 25, 39, 11, - 8, 61, 15, 3, 34, 75, 30, 79, 11, 85, 37, 10, 94, 19, 5, 32, 6, 12, 42, 18, 23], + "num_fields": 39, + "num_features": 117581, "out_features": 1, - "embedding_dim": 16, - "hidden_units": [400, 400], - "dropout_rates": [0.2, 0.2] + "embedding_dim": 256, + "hidden_units": [400, 400, 400], + "dropout_rates": [0.3, 0.3, 0.3] } }, "loss": { @@ -37,12 +37,12 @@ "optimizer": { "name": "adam", "kwargs": { - "lr": 1e-3 + "lr": 5e-4 } }, "resource": { "num_cpus": 4, "num_gpus": 0, - "num_threads": 0 + "num_threads": 2 } } diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py index a3cd42dc0..a535ccc7b 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py @@ -40,7 +40,7 @@ def __len__(self) -> int: def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor, int]: with open_buffered_file_reader(self.data_uri) as infile: - infile.seek(self.sample_offset[idx], io.SEEK_SET) + infile.seek(self.sample_offset[idx], os.SEEK_SET) sample = infile.readline() return LIBSVMDataset.parse_sample(sample) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py index 1dfe7f6a1..a9cfe0050 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py @@ -68,32 +68,34 @@ def __del__(self): distributed.destroy_process_group() def train(self, train_loader): - for _, batch in enumerate(train_loader): - sample, target = batch - output = self.model(sample) - loss = self.loss(output, target) - self.optimizer.zero_grad() - loss.backward() - self.optimizer.step() + self.model.train() + with torch.enable_grad(): + for _, batch in enumerate(train_loader): + feature_idx, feature_value, label = batch + output = self.model(feature_idx, feature_value).squeeze() + loss = self.loss(output, label.float()) + self.optimizer.zero_grad() + loss.backward() + self.optimizer.step() def evaluate(self): outputs = [] - targets = [] + labels = [] valid_loader = get_from_registry(self.input_type, input_fn_registry)( filepath=self.params['input']['valid_data'], **self.params['training'])() - + self.model.eval() with torch.no_grad(): for _, batch in enumerate(valid_loader): - sample, target = batch - output = self.model(sample) + feature_idx, feature_value, label = batch + output = self.model(feature_idx, feature_value).squeeze() outputs.append(output) - targets.append(target) + labels.append(label) return self.metric( - torch.cat(targets, dim=0).cpu().numpy(), + torch.cat(labels, dim=0).cpu().numpy(), torch.cat(outputs, dim=0).cpu().numpy()) def predict(self): @@ -102,12 +104,12 @@ def predict(self): test_loader = get_from_registry(self.input_type, input_fn_registry)( filepath=self.params['input']['test_data'], **self.params['training'])() - + self.model.eval() with torch.no_grad(): for _, batch in enumerate(test_loader): - sample, _ = batch - output = self.model(sample) - outputs.append(output) + feature_idx, feature_value, _ = batch + output = self.model(feature_idx, feature_value).squeeze() + outputs.append(torch.sigmoid(output)) return torch.cat(outputs, dim=0).cpu().numpy() @@ -141,7 +143,7 @@ def save_checkpoint(self): 'optimizer': self.optimizer.state_dict() }, buffer) write_file(buffer, - path=os.path.join( + uri=os.path.join( self.params['output']['save_model_dir'], 'ckpt.pkl')) def model_fn(self, params): diff --git a/submarine-sdk/pysubmarine/submarine/utils/fileio.py b/submarine-sdk/pysubmarine/submarine/utils/fileio.py index 2ad5f44b5..cae64df4e 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/fileio.py +++ b/submarine-sdk/pysubmarine/submarine/utils/fileio.py @@ -17,7 +17,9 @@ from pyarrow.lib import NativeFile import io - +from urllib.parse import urlparse +from pathlib import Path +from typing import Tuple def open_buffered_file_reader( uri: str, @@ -25,16 +27,47 @@ def open_buffered_file_reader( try: input_file = open_input_file(uri) return io.BufferedReader(input_file, buffer_size=buffer_size) - except: + except Exception as e: input_file.close() + raise e +def open_buffered_stream_writer( + uri: str, + buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> io.BufferedWriter: + try: + output_stream = open_output_stream(uri) + return io.BufferedWriter(output_stream, buffer_size=buffer_size) + except Exception as e: + output_stream.close() + raise e + +def write_file(buffer: io.BytesIO, + uri: str, + buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> None: + with open_buffered_stream_writer(uri, buffer_size=buffer_size) as output_stream: + output_stream.write(buffer.getbuffer()) def open_input_file(uri: str) -> NativeFile: - filesystem, path = fs.FileSystem.from_uri(uri) + filesystem, path = _parse_uri(uri) return filesystem.open_input_file(path) +def open_output_stream(uri: str) -> NativeFile: + filesystem, path = _parse_uri(uri) + return filesystem.open_output_stream(path) def file_info(uri: str) -> fs.FileInfo: - filesystem, path = fs.FileSystem.from_uri(uri) + filesystem, path = _parse_uri(uri) info, = filesystem.get_file_info([path]) return info + +def _parse_uri(uri: str) -> Tuple[fs.FileSystem, str]: + parsed = urlparse(uri) + uri = uri if parsed.scheme else str(Path(parsed.path).expanduser().resolve()) + filesystem, path = fs.FileSystem.from_uri(uri) + return filesystem, path + + + + + + From 380358c0ada4ed51f9c5afe1e7d9c3692a0d8c69 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 18:08:26 +0800 Subject: [PATCH 06/26] fix testing --- .../pysubmarine/tests/ml/pytorch/model/conftest.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py index 997a709b7..0605e7c85 100644 --- a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py +++ b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py @@ -18,10 +18,9 @@ import pytest # noqa -LIBSVM_DATA = """ -0 0:0 1:0 2:0 3:0 4:0 5:0 6:0 7:0 8:0 9:0 10:0 11:0 12:0 13:0 14:0 15:24 16:38 17:0 18:0 19:60 20:0 21:0 22:33 23:74 24:29 25:78 26:0 27:84 28:36 29:0 30:0 31:0 32:0 33:31 34:0 35:0 36:41 37:0 38:22 +LIBSVM_DATA = """0 0:0 1:0 2:0 3:0 4:0 5:0 6:0 7:0 8:0 9:0 10:0 11:0 12:0 13:0 14:0 15:24 16:38 17:0 18:0 19:60 20:0 21:0 22:33 23:74 24:29 25:78 26:0 27:84 28:36 29:0 30:0 31:0 32:0 33:31 34:0 35:0 36:41 37:0 38:22 0 0:1 1:1 2:1 3:1 4:1 5:1 6:1 7:0 8:1 9:0 10:1 11:0 12:1 13:0 14:1 15:0 16:0 17:0 18:1 19:60 20:1 21:0 22:33 23:74 24:0 25:78 26:1 27:0 28:0 29:1 30:1 31:0 32:1 33:0 34:0 35:0 36:0 37:0 38:0 -0 0:1 1:1 2:2 3:2 4:2 5:2 6:2 7:0 8:2 9:0 10:2 11:1 12:2 13:1 14:2 15:1 16:1 17:0 18:0 19:60 20:1 21:0 22:0 23:74 24:1 25:78 26:2 27:84 28:1 29:2 30:2 31:1 32:2 33:1 34:1 35:0 36:1 37:1 38:1 +0 0:1 1:1 2:2 3:2 4:2 5:2 6:2 7:0 8:2 9:0 10:2 11:1 12:2 13:1 14:2 15:1 16:1 17:0 18:0 19:60 20:1 21:0 22:0 23:74 24:1 25:78 26:2 27:84 28:1 29:2 30:2 31:1 32:2 33:1 34:1 35:0 36:1 37:1 38:1 0 0:2 1:2 2:3 3:3 4:3 5:3 6:3 7:1 8:3 9:1 10:3 11:0 12:3 13:0 14:3 15:24 16:38 17:0 18:1 19:60 20:1 21:0 22:1 23:0 24:29 25:0 26:2 27:1 28:36 29:3 30:3 31:1 32:2 33:31 34:0 35:0 36:2 37:1 38:1 0 0:3 1:3 2:3 3:0 4:4 5:4 6:2 7:1 8:3 9:0 10:1 11:0 12:4 13:2 14:4 15:24 16:2 17:0 18:2 19:60 20:1 21:0 22:33 23:74 24:2 25:78 26:0 27:84 28:2 29:3 30:93 31:1 32:2 33:2 34:0 35:1 36:3 37:1 38:1 0 0:2 1:3 2:3 3:3 4:5 5:3 6:3 7:1 8:4 9:1 10:3 11:0 12:3 13:3 14:76 15:24 16:38 17:1 18:3 19:0 20:1 21:0 22:0 23:1 24:29 25:1 26:2 27:84 28:36 29:4 30:93 31:1 32:2 33:31 34:2 35:2 36:41 37:1 38:1 @@ -56,7 +55,7 @@ def get_model_param(tmpdir): "batch_size": 4, "num_epochs": 1, "log_steps": 10, - "num_threads": 0, + "num_threads": 1, "num_gpus": 0, "seed": 42, "mode": "distributed", @@ -65,11 +64,8 @@ def get_model_param(tmpdir): "model": { "name": "ctr.deepfm", "kwargs": { - "field_dims": [ - 15, 52, 30, 19, 111, 51, 26, 19, 53, 5, 13, 8, 23, 21, 77, - 25, 39, 11, 8, 61, 15, 3, 34, 75, 30, 79, 11, 85, 37, 10, - 94, 19, 5, 32, 6, 12, 42, 18, 23 - ], + "num_fields": 39, + "num_features": 117581, "out_features": 1, "embedding_dim": 16, "hidden_units": [400, 400], From fa151e57181bda58d218721776f2e0cf5dc02a2d Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 20:02:37 +0800 Subject: [PATCH 07/26] fix deepfm --- .../pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py index 53e7a7572..c560f6829 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py @@ -33,9 +33,9 @@ class _DeepFM(nn.Module): def __init__(self, num_fields, num_features, embedding_dim, out_features, hidden_units, dropout_rates, **kwargs): super().__init__() - self.field_linear = FeatureLinear(num_features=num_features, + self.feature_linear = FeatureLinear(num_features=num_features, out_features=out_features) - self.field_embedding = FeatureEmbedding(num_features=num_features, + self.feature_embedding = FeatureEmbedding(num_features=num_features, embedding_dim=embedding_dim) self.pairwise_interaction = PairwiseInteraction() self.dnn = DNN(in_features=num_fields * embedding_dim, @@ -48,8 +48,8 @@ def forward(self, feature_idx, feature_value): :param feature_idx: torch.LongTensor (batch_size, num_fields) :param feature_value: torch.LongTensor (batch_size, num_fields) """ - emb = self.field_embedding(feature_idx, feature_value) # (batch_size, num_fields, embedding_dim) - linear_logit = self.field_linear(feature_idx, feature_value) + emb = self.feature_embedding(feature_idx, feature_value) # (batch_size, num_fields, embedding_dim) + linear_logit = self.feature_linear(feature_idx, feature_value) fm_logit = self.pairwise_interaction(emb) deep_logit = self.dnn(torch.flatten(emb, start_dim=1)) From ab7b4b7460f6fa84a36960c04c17a3e834fcb720 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 20:03:00 +0800 Subject: [PATCH 08/26] add afm --- .../submarine/ml/pytorch/model/ctr/afm.py | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py new file mode 100644 index 000000000..5c5ca4f67 --- /dev/null +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch +from torch import nn + +from submarine.ml.pytorch.layers.core import (FeatureEmbedding, FeatureLinear) +from submarine.ml.pytorch.model.base_pytorch_model import BasePyTorchModel + + +class AttentionalFM(BasePyTorchModel): + + def model_fn(self, params): + super().model_fn(params) + return _AttentionalFM(**self.params['model']['kwargs']) + + +class _AttentionalFM(nn.Module): + + def __init__(self, num_features: int, embedding_dim: int, + attention_dim: int, out_features: int, dropout_rate: float, + **kwargs): + super().__init__() + self.feature_linear = FeatureLinear(num_features=num_features, + out_features=out_features) + self.feature_embedding = FeatureEmbedding(num_features=num_features, + embedding_dim=embedding_dim) + self.attentional_interaction = AttentionalInteratction( + embedding_dim=embedding_dim, + attention_dim=attention_dim, + out_features=out_features, + dropout_rate=dropout_rate) + + def forward(self, feature_idx: torch.LongTensor, + feature_value: torch.LongTensor): + """ + :param feature_idx: torch.LongTensor (batch_size, num_fields) + :param feature_value: torch.LongTensor (batch_size, num_fields) + """ + return self.feature_linear( + feature_idx, feature_value) + self.attentional_interaction( + self.feature_embedding(feature_idx, feature_value)) + + +class AttentionalInteratction(nn.Module): + + def __init__(self, embedding_dim: int, attention_dim: int, + out_features: int, dropout_rate: float): + super().__init__() + self.attention_score = nn.Sequential( + nn.Linear(in_features=embedding_dim, out_features=attention_dim), + nn.ReLU(), nn.Linear(in_features=attention_dim, out_features=1), + nn.Softmax(dim=1)) + self.pairwise_product = PairwiseProduct() + self.dropout = nn.Dropout(p=dropout_rate) + self.fc = nn.Linear(in_features=embedding_dim, + out_features=out_features) + + def forward(self, x: torch.FloatTensor): + """ + :param x: torch.FloatTensor (batch_size, num_fields, embedding_dim) + """ + x = self.pairwise_product(x) + score = self.attention_score(x) + attentioned = torch.sum(score * x, dim=1) + return self.fc(self.dropout(attentioned)) + + +class PairwiseProduct(nn.Module): + + def __init__(self): + super().__init__() + + def forward(self, x: torch.FloatTensor): + """ + :param x: torch.FloatTensor (batch_sie, num_fields, embedding_dim) + """ + batch_size, num_fields, embedding_dim = x.size() + + all_pairs_product = x.unsqueeze(dim=1) * x.unsqueeze(dim=2) + idx_row, idx_col = torch.unbind(torch.triu_indices(num_fields, + num_fields, + offset=1), + dim=0) + return all_pairs_product[:, idx_row, idx_col] From a7da1c3487783cb7a9933192bc862d78015b718d Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 20:22:13 +0800 Subject: [PATCH 09/26] add afm to ctr --- .../pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py index 3fb493537..ac6fd3f57 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py @@ -14,5 +14,6 @@ # limitations under the License. from .deepfm import DeepFM +from .afm import AttentionalFM -__all__ = ["DeepFM"] +__all__ = ['DeepFM', 'AttentionalFM'] From b2600422786c62072e9b4006569655ad4888b801 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 20:42:45 +0800 Subject: [PATCH 10/26] add afm example --- .../pysubmarine/example/pytorch/afm.json | 49 +++++++++++++++++++ .../pysubmarine/example/pytorch/run_afm.py | 41 ++++++++++++++++ .../pytorch/{run_ctr.py => run_deepfm.py} | 0 .../pysubmarine/example/pytorch/run_deepfm.sh | 4 +- 4 files changed, 92 insertions(+), 2 deletions(-) create mode 100644 submarine-sdk/pysubmarine/example/pytorch/afm.json create mode 100644 submarine-sdk/pysubmarine/example/pytorch/run_afm.py rename submarine-sdk/pysubmarine/example/pytorch/{run_ctr.py => run_deepfm.py} (100%) diff --git a/submarine-sdk/pysubmarine/example/pytorch/afm.json b/submarine-sdk/pysubmarine/example/pytorch/afm.json new file mode 100644 index 000000000..68522396e --- /dev/null +++ b/submarine-sdk/pysubmarine/example/pytorch/afm.json @@ -0,0 +1,49 @@ +{ + "input": { + "train_data": "../data/tr.libsvm", + "valid_data": "../data/va.libsvm", + "test_data": "../data/te.libsvm", + "type": "libsvm" + }, + "output": { + "save_model_dir": "./output", + "metric": "roc_auc" + }, + "training": { + "batch_size": 512, + "num_epochs": 3, + "log_steps": 10, + "num_threads": 2, + "num_gpus": 0, + "seed": 42, + "mode": "distributed", + "backend": "gloo" + }, + "model": { + "name": "ctr.afm", + "kwargs": { + "num_fields": 39, + "num_features": 117581, + "attention_dim": 64, + "out_features": 1, + "embedding_dim": 256, + "hidden_units": [400, 400, 400], + "dropout_rate": 0.3 + } + }, + "loss": { + "name": "BCEWithLogitsLoss", + "kwargs": {} + }, + "optimizer": { + "name": "adam", + "kwargs": { + "lr": 5e-4 + } + }, + "resource": { + "num_cpus": 4, + "num_gpus": 0, + "num_threads": 2 + } +} diff --git a/submarine-sdk/pysubmarine/example/pytorch/run_afm.py b/submarine-sdk/pysubmarine/example/pytorch/run_afm.py new file mode 100644 index 000000000..06cb37373 --- /dev/null +++ b/submarine-sdk/pysubmarine/example/pytorch/run_afm.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from submarine.ml.pytorch.model.ctr import AttentionalFM + +import argparse + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument( + "-conf", help="a JSON configuration file for AttentionalFM", type=str) + parser.add_argument("-task_type", default='train', + help="train or evaluate, by default is train") + args = parser.parse_args() + + trainer = AttentionalFM(json_path=args.conf) + + if args.task_type == 'train': + trainer.fit() + print('[Train Done]') + elif args.task_type == 'evaluate': + score = trainer.evaluate() + print(f'Eval score: {score}') + elif args.task_type == 'predict': + pred = trainer.predict() + print('Predict:', pred) + else: + assert False, args.task_type diff --git a/submarine-sdk/pysubmarine/example/pytorch/run_ctr.py b/submarine-sdk/pysubmarine/example/pytorch/run_deepfm.py similarity index 100% rename from submarine-sdk/pysubmarine/example/pytorch/run_ctr.py rename to submarine-sdk/pysubmarine/example/pytorch/run_deepfm.py diff --git a/submarine-sdk/pysubmarine/example/pytorch/run_deepfm.sh b/submarine-sdk/pysubmarine/example/pytorch/run_deepfm.sh index 96f8add4f..8cfe09602 100644 --- a/submarine-sdk/pysubmarine/example/pytorch/run_deepfm.sh +++ b/submarine-sdk/pysubmarine/example/pytorch/run_deepfm.sh @@ -35,7 +35,7 @@ java -cp $(${HADOOP_COMMON_HOME}/bin/hadoop classpath --glob):${SUBMARINE_JAR}:$ --input_path "" \ --num_workers 2 \ --worker_resources memory=1G,vcores=1 \ - --worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_ctr.py --conf ./deepfm.json --task_type train" \ + --worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_deepfm.py --conf ./deepfm.json --task_type train" \ --insecure \ - --conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_ctr.py,deepfm.json + --conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_deepfm.py,deepfm.json From 827c78593d2d6ba0fc28520f6e993cf5dbdb04be Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 20:51:33 +0800 Subject: [PATCH 11/26] update conftest --- submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py index 0605e7c85..ecf29e606 100644 --- a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py +++ b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py @@ -68,7 +68,9 @@ def get_model_param(tmpdir): "num_features": 117581, "out_features": 1, "embedding_dim": 16, + "attention_dim": 64, "hidden_units": [400, 400], + "dropout_rate": 0.3, "dropout_rates": [0.2, 0.2] } }, From 5d6dfc0a8046d89a96d1f78d0e2303f6a9915faf Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 20:59:47 +0800 Subject: [PATCH 12/26] add afm testing --- .../ml/pytorch/model/test_afm_pytorch.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 submarine-sdk/pysubmarine/tests/ml/pytorch/model/test_afm_pytorch.py diff --git a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/test_afm_pytorch.py b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/test_afm_pytorch.py new file mode 100644 index 000000000..96d61c983 --- /dev/null +++ b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/test_afm_pytorch.py @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from submarine.ml.pytorch.model.ctr import AttentionalFM + + +def test_run_afm(get_model_param): + param = get_model_param + + trainer = AttentionalFM(param) + trainer.fit() + trainer.evaluate() + trainer.predict() From 573a4e892ba0dfe55fa24a89fe1114cab9cdea78 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 21:45:23 +0800 Subject: [PATCH 13/26] fix fileio coding style --- .../pysubmarine/submarine/utils/fileio.py | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/utils/fileio.py b/submarine-sdk/pysubmarine/submarine/utils/fileio.py index cae64df4e..14365dc41 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/fileio.py +++ b/submarine-sdk/pysubmarine/submarine/utils/fileio.py @@ -18,9 +18,10 @@ import io from urllib.parse import urlparse -from pathlib import Path +from pathlib import Path from typing import Tuple + def open_buffered_file_reader( uri: str, buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> io.BufferedReader: @@ -31,6 +32,7 @@ def open_buffered_file_reader( input_file.close() raise e + def open_buffered_stream_writer( uri: str, buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> io.BufferedWriter: @@ -41,33 +43,34 @@ def open_buffered_stream_writer( output_stream.close() raise e + def write_file(buffer: io.BytesIO, uri: str, buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> None: - with open_buffered_stream_writer(uri, buffer_size=buffer_size) as output_stream: + with open_buffered_stream_writer(uri, + buffer_size=buffer_size) as output_stream: output_stream.write(buffer.getbuffer()) + def open_input_file(uri: str) -> NativeFile: filesystem, path = _parse_uri(uri) return filesystem.open_input_file(path) + def open_output_stream(uri: str) -> NativeFile: filesystem, path = _parse_uri(uri) return filesystem.open_output_stream(path) + def file_info(uri: str) -> fs.FileInfo: filesystem, path = _parse_uri(uri) info, = filesystem.get_file_info([path]) return info -def _parse_uri(uri: str) -> Tuple[fs.FileSystem, str]: - parsed = urlparse(uri) - uri = uri if parsed.scheme else str(Path(parsed.path).expanduser().resolve()) - filesystem, path = fs.FileSystem.from_uri(uri) - return filesystem, path - - - - - +def _parse_uri(uri: str) -> Tuple[fs.FileSystem, str]: + parsed = urlparse(uri) + uri = uri if parsed.scheme else str( + Path(parsed.path).expanduser().resolve()) + filesystem, path = fs.FileSystem.from_uri(uri) + return filesystem, path From 2b4eecf696ee34b6521a0634c5f1d6ae90667fc8 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 21:48:19 +0800 Subject: [PATCH 14/26] fix base_pytorch_model coding style --- .../submarine/ml/pytorch/model/base_pytorch_model.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py index a9cfe0050..f54c30b6d 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py @@ -68,8 +68,8 @@ def __del__(self): distributed.destroy_process_group() def train(self, train_loader): - self.model.train() - with torch.enable_grad(): + self.model.train() + with torch.enable_grad(): for _, batch in enumerate(train_loader): feature_idx, feature_value, label = batch output = self.model(feature_idx, feature_value).squeeze() @@ -85,7 +85,7 @@ def evaluate(self): valid_loader = get_from_registry(self.input_type, input_fn_registry)( filepath=self.params['input']['valid_data'], **self.params['training'])() - self.model.eval() + self.model.eval() with torch.no_grad(): for _, batch in enumerate(valid_loader): feature_idx, feature_value, label = batch @@ -104,7 +104,7 @@ def predict(self): test_loader = get_from_registry(self.input_type, input_fn_registry)( filepath=self.params['input']['test_data'], **self.params['training'])() - self.model.eval() + self.model.eval() with torch.no_grad(): for _, batch in enumerate(test_loader): feature_idx, feature_value, _ = batch @@ -143,8 +143,8 @@ def save_checkpoint(self): 'optimizer': self.optimizer.state_dict() }, buffer) write_file(buffer, - uri=os.path.join( - self.params['output']['save_model_dir'], 'ckpt.pkl')) + uri=os.path.join(self.params['output']['save_model_dir'], + 'ckpt.pkl')) def model_fn(self, params): seed = params["training"]["seed"] From cb6be070e4587149e318fddfb2e2a2b744711f50 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 21:49:41 +0800 Subject: [PATCH 15/26] fix ctr.__init__ coding style --- .../pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py index ac6fd3f57..7ebc1f477 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py @@ -14,6 +14,6 @@ # limitations under the License. from .deepfm import DeepFM -from .afm import AttentionalFM +from .afm import AttentionalFM __all__ = ['DeepFM', 'AttentionalFM'] From e4b3e50404a8fff7225663abf1ea6aa1cf34101f Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 21:51:22 +0800 Subject: [PATCH 16/26] fix deepfm.py coding style --- .../submarine/ml/pytorch/model/ctr/deepfm.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py index c560f6829..d6c86ae0e 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/deepfm.py @@ -16,7 +16,8 @@ import torch from torch import nn -from submarine.ml.pytorch.layers.core import (DNN, FeatureEmbedding, FeatureLinear, +from submarine.ml.pytorch.layers.core import (DNN, FeatureEmbedding, + FeatureLinear, PairwiseInteraction) from submarine.ml.pytorch.model.base_pytorch_model import BasePyTorchModel @@ -30,13 +31,13 @@ def model_fn(self, params): class _DeepFM(nn.Module): - def __init__(self, num_fields, num_features, embedding_dim, out_features, hidden_units, - dropout_rates, **kwargs): + def __init__(self, num_fields, num_features, embedding_dim, out_features, + hidden_units, dropout_rates, **kwargs): super().__init__() self.feature_linear = FeatureLinear(num_features=num_features, - out_features=out_features) + out_features=out_features) self.feature_embedding = FeatureEmbedding(num_features=num_features, - embedding_dim=embedding_dim) + embedding_dim=embedding_dim) self.pairwise_interaction = PairwiseInteraction() self.dnn = DNN(in_features=num_fields * embedding_dim, out_features=out_features, @@ -48,7 +49,9 @@ def forward(self, feature_idx, feature_value): :param feature_idx: torch.LongTensor (batch_size, num_fields) :param feature_value: torch.LongTensor (batch_size, num_fields) """ - emb = self.feature_embedding(feature_idx, feature_value) # (batch_size, num_fields, embedding_dim) + emb = self.feature_embedding( + feature_idx, + feature_value) # (batch_size, num_fields, embedding_dim) linear_logit = self.feature_linear(feature_idx, feature_value) fm_logit = self.pairwise_interaction(emb) deep_logit = self.dnn(torch.flatten(emb, start_dim=1)) From 4facbcef14f22c19f8549578856eea9495133459 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 21:53:46 +0800 Subject: [PATCH 17/26] fix conftest.py coding style --- submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py index ecf29e606..2da47efa4 100644 --- a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py +++ b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/conftest.py @@ -70,7 +70,7 @@ def get_model_param(tmpdir): "embedding_dim": 16, "attention_dim": 64, "hidden_units": [400, 400], - "dropout_rate": 0.3, + "dropout_rate": 0.3, "dropout_rates": [0.2, 0.2] } }, From adae61310e68b11ce90efc9377104ae3440b8c23 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 22:04:13 +0800 Subject: [PATCH 18/26] fix tqdm --- .../submarine/ml/pytorch/input/libsvm_dataset.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py index a535ccc7b..7623e966b 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py @@ -20,8 +20,6 @@ from submarine.utils.fileio import open_buffered_file_reader, file_info -from tqdm.auto import tqdm - import os import itertools import functools @@ -74,8 +72,6 @@ def _locate_sample_offsets(cls, data_uri: str, n_jobs: int) -> np.ndarray: chunk_starts.append(min(infile.tell(), finfo.size)) with mp.Pool(processes=n_jobs, - initializer=tqdm.set_lock, - initargs=(tqdm.get_lock(),), maxtasksperchild=1) as pool: return np.asarray( list( @@ -93,14 +89,9 @@ def _locate_sample_offsets_job( offsets = [start] with open_buffered_file_reader(data_uri) as infile: infile.seek(start, os.SEEK_SET) - with tqdm(total=None, - desc=f'[Loacate Sample Offsets] job: {job_id}', - position=job_id, - disable=('DISABLE_TQDM' in os.environ)) as pbar: - while infile.tell() < end: - infile.readline() - offsets.append(infile.tell()) - pbar.update() + while infile.tell() < end: + infile.readline() + offsets.append(infile.tell()) assert offsets.pop() == end return offsets From 9ff2f8d6e52834b0675bd047a77de67044ad3747 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 22:07:27 +0800 Subject: [PATCH 19/26] fix core, afm coding style --- submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py | 4 ++-- .../pysubmarine/submarine/ml/pytorch/model/ctr/afm.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py index aeb24fc3f..4e7b1e34f 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py @@ -34,7 +34,7 @@ def __init__(self, num_features, out_features): def forward(self, feature_idx, feature_value): """ :param feature_idx: torch.LongTensor (batch_size, num_fields) - :param feature_value: torch.LongTensor (batch_size, num_fields) + :param feature_value: torch.LongTensor (batch_size, num_fields) """ return torch.sum( self.weight(feature_idx) * feature_value.unsqueeze(dim=-1), @@ -51,7 +51,7 @@ def __init__(self, num_features, embedding_dim): def forward(self, feature_idx, feature_value): """ :param feature_idx: torch.LongTensor (batch_size, num_fields) - :param feature_value: torch.LongTensor (batch_size, num_fields) + :param feature_value: torch.LongTensor (batch_size, num_fields) """ return self.weight(feature_idx) * feature_value.unsqueeze(dim=-1) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py index 5c5ca4f67..aa1fa5841 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py @@ -70,7 +70,7 @@ def __init__(self, embedding_dim: int, attention_dim: int, def forward(self, x: torch.FloatTensor): """ - :param x: torch.FloatTensor (batch_size, num_fields, embedding_dim) + :param x: torch.FloatTensor (batch_size, num_fields, embedding_dim) """ x = self.pairwise_product(x) score = self.attention_score(x) @@ -85,7 +85,7 @@ def __init__(self): def forward(self, x: torch.FloatTensor): """ - :param x: torch.FloatTensor (batch_sie, num_fields, embedding_dim) + :param x: torch.FloatTensor (batch_sie, num_fields, embedding_dim) """ batch_size, num_fields, embedding_dim = x.size() From 42d5091e6bd17d93bd778bb1a7f643f6da7cbd04 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 22:36:05 +0800 Subject: [PATCH 20/26] try to make codestyle checker happy --- .../pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py | 2 +- .../pysubmarine/submarine/ml/pytorch/layers/core.py | 2 -- submarine-sdk/pysubmarine/submarine/utils/fileio.py | 5 ++--- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py index 7623e966b..bb33697a7 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py @@ -85,7 +85,7 @@ def _locate_sample_offsets(cls, data_uri: str, n_jobs: int) -> np.ndarray: @classmethod def _locate_sample_offsets_job( cls, data_uri: str, task: Tuple[int, Tuple[int, int]]) -> List[int]: - job_id, (start, end) = task + _, (start, end) = task offsets = [start] with open_buffered_file_reader(data_uri) as infile: infile.seek(start, os.SEEK_SET) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py index 4e7b1e34f..98c847206 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/layers/core.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from itertools import accumulate - import torch from torch import nn diff --git a/submarine-sdk/pysubmarine/submarine/utils/fileio.py b/submarine-sdk/pysubmarine/submarine/utils/fileio.py index 14365dc41..6e02243f7 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/fileio.py +++ b/submarine-sdk/pysubmarine/submarine/utils/fileio.py @@ -14,7 +14,6 @@ # limitations under the License. from pyarrow import fs -from pyarrow.lib import NativeFile import io from urllib.parse import urlparse @@ -52,12 +51,12 @@ def write_file(buffer: io.BytesIO, output_stream.write(buffer.getbuffer()) -def open_input_file(uri: str) -> NativeFile: +def open_input_file(uri: str): filesystem, path = _parse_uri(uri) return filesystem.open_input_file(path) -def open_output_stream(uri: str) -> NativeFile: +def open_output_stream(uri: str): filesystem, path = _parse_uri(uri) return filesystem.open_output_stream(path) From 2929dfcf4b98f7b7ed611cf95cb71be088d95b51 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 22:55:09 +0800 Subject: [PATCH 21/26] try to make codestyle checker happy v2 --- .../pysubmarine/submarine/ml/pytorch/model/ctr/afm.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py index aa1fa5841..dec231481 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py @@ -80,14 +80,11 @@ def forward(self, x: torch.FloatTensor): class PairwiseProduct(nn.Module): - def __init__(self): - super().__init__() - def forward(self, x: torch.FloatTensor): """ :param x: torch.FloatTensor (batch_sie, num_fields, embedding_dim) """ - batch_size, num_fields, embedding_dim = x.size() + _, num_fields, _ = x.size() all_pairs_product = x.unsqueeze(dim=1) * x.unsqueeze(dim=2) idx_row, idx_col = torch.unbind(torch.triu_indices(num_fields, From d4d93c4a668045b9b2add6f6d83b0f5105791769 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 22:57:33 +0800 Subject: [PATCH 22/26] try to make python3.5 happy --- submarine-sdk/pysubmarine/submarine/utils/fileio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/submarine-sdk/pysubmarine/submarine/utils/fileio.py b/submarine-sdk/pysubmarine/submarine/utils/fileio.py index 6e02243f7..d4a8196eb 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/fileio.py +++ b/submarine-sdk/pysubmarine/submarine/utils/fileio.py @@ -70,6 +70,6 @@ def file_info(uri: str) -> fs.FileInfo: def _parse_uri(uri: str) -> Tuple[fs.FileSystem, str]: parsed = urlparse(uri) uri = uri if parsed.scheme else str( - Path(parsed.path).expanduser().resolve()) + Path(parsed.path).expanduser().absolute()) filesystem, path = fs.FileSystem.from_uri(uri) return filesystem, path From f89d070760b8cfe01be9326e0899204cf34841bb Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Mon, 13 Jul 2020 00:36:02 +0800 Subject: [PATCH 23/26] python3.6 yapf --- .../pysubmarine/submarine/experiment/api_client.py | 2 +- .../pysubmarine/submarine/store/sqlalchemy_store.py | 1 - submarine-sdk/pysubmarine/submarine/utils/fileio.py | 6 +++--- submarine-sdk/pysubmarine/submarine/utils/tf_utils.py | 1 - submarine-sdk/pysubmarine/tests/tracking/test_utils.py | 1 - submarine-sdk/pysubmarine/tests/utils/test_env.py | 1 - submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py | 1 - submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py | 1 - submarine-sdk/pysubmarine/tests/utils/test_validation.py | 1 - 9 files changed, 4 insertions(+), 11 deletions(-) diff --git a/submarine-sdk/pysubmarine/submarine/experiment/api_client.py b/submarine-sdk/pysubmarine/submarine/experiment/api_client.py index 5bbc5c0e8..7720f65a4 100644 --- a/submarine-sdk/pysubmarine/submarine/experiment/api_client.py +++ b/submarine-sdk/pysubmarine/submarine/experiment/api_client.py @@ -37,10 +37,10 @@ # python 2 and python 3 compatibility library import six -from dateutil.parser import parse from six.moves.urllib.parse import quote import submarine.experiment.models +from dateutil.parser import parse from submarine.experiment import rest from submarine.experiment.configuration import Configuration from submarine.experiment.exceptions import ApiException, ApiValueError diff --git a/submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py b/submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py index 38c5a175a..f08d038d3 100644 --- a/submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py +++ b/submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py @@ -18,7 +18,6 @@ from contextlib import contextmanager import sqlalchemy - from submarine.exceptions import SubmarineException from submarine.store.abstract_store import AbstractStore from submarine.store.database.models import Base, SqlMetric, SqlParam diff --git a/submarine-sdk/pysubmarine/submarine/utils/fileio.py b/submarine-sdk/pysubmarine/submarine/utils/fileio.py index d4a8196eb..d756757d0 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/fileio.py +++ b/submarine-sdk/pysubmarine/submarine/utils/fileio.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pyarrow import fs - import io -from urllib.parse import urlparse from pathlib import Path from typing import Tuple +from urllib.parse import urlparse + +from pyarrow import fs def open_buffered_file_reader( diff --git a/submarine-sdk/pysubmarine/submarine/utils/tf_utils.py b/submarine-sdk/pysubmarine/submarine/utils/tf_utils.py index 96da0ce1f..545395891 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/tf_utils.py +++ b/submarine-sdk/pysubmarine/submarine/utils/tf_utils.py @@ -17,7 +17,6 @@ import os import tensorflow as tf - from submarine.ml.tensorflow.optimizer import get_optimizer diff --git a/submarine-sdk/pysubmarine/tests/tracking/test_utils.py b/submarine-sdk/pysubmarine/tests/tracking/test_utils.py index 33422b904..3348fcb7a 100644 --- a/submarine-sdk/pysubmarine/tests/tracking/test_utils.py +++ b/submarine-sdk/pysubmarine/tests/tracking/test_utils.py @@ -16,7 +16,6 @@ import os import mock - from submarine.store import DEFAULT_SUBMARINE_JDBC_URL from submarine.store.sqlalchemy_store import SqlAlchemyStore from submarine.tracking.utils import (_JOB_NAME_ENV_VAR, _TRACKING_URI_ENV_VAR, diff --git a/submarine-sdk/pysubmarine/tests/utils/test_env.py b/submarine-sdk/pysubmarine/tests/utils/test_env.py index eefd176e9..7aa2b8f16 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_env.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_env.py @@ -17,7 +17,6 @@ from os import environ import pytest - from submarine.utils.env import (get_env, get_from_dicts, get_from_json, get_from_registry, unset_variable) diff --git a/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py index 3a866e93a..c5e6f063a 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py @@ -17,7 +17,6 @@ import pytest from mock import Mock, patch - from submarine.exceptions import RestException, SubmarineException from submarine.utils.rest_utils import http_request, verify_rest_response diff --git a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py index 52ed4ad09..86675079b 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py @@ -14,7 +14,6 @@ # limitations under the License. import pytest - from submarine.utils.tf_utils import get_tf_config diff --git a/submarine-sdk/pysubmarine/tests/utils/test_validation.py b/submarine-sdk/pysubmarine/tests/utils/test_validation.py index 0030913d2..285215c2a 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_validation.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_validation.py @@ -14,7 +14,6 @@ # limitations under the License. import pytest - from submarine.exceptions import SubmarineException from submarine.utils.validation import (_validate_db_type_string, _validate_length_limit, From 30578995ff024f5638e1e52aa69116935aa67737 Mon Sep 17 00:00:00 2001 From: andrewhsiehth Date: Mon, 13 Jul 2020 11:57:36 +0000 Subject: [PATCH 24/26] use pysubmarine-ci to auto-format --- submarine-sdk/pysubmarine/submarine/experiment/api_client.py | 2 +- submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py | 1 + submarine-sdk/pysubmarine/submarine/utils/tf_utils.py | 1 + submarine-sdk/pysubmarine/tests/tracking/test_utils.py | 1 + submarine-sdk/pysubmarine/tests/utils/test_env.py | 1 + submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py | 1 + submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py | 1 + submarine-sdk/pysubmarine/tests/utils/test_validation.py | 1 + 8 files changed, 8 insertions(+), 1 deletion(-) diff --git a/submarine-sdk/pysubmarine/submarine/experiment/api_client.py b/submarine-sdk/pysubmarine/submarine/experiment/api_client.py index 7720f65a4..5bbc5c0e8 100644 --- a/submarine-sdk/pysubmarine/submarine/experiment/api_client.py +++ b/submarine-sdk/pysubmarine/submarine/experiment/api_client.py @@ -37,10 +37,10 @@ # python 2 and python 3 compatibility library import six +from dateutil.parser import parse from six.moves.urllib.parse import quote import submarine.experiment.models -from dateutil.parser import parse from submarine.experiment import rest from submarine.experiment.configuration import Configuration from submarine.experiment.exceptions import ApiException, ApiValueError diff --git a/submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py b/submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py index f08d038d3..38c5a175a 100644 --- a/submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py +++ b/submarine-sdk/pysubmarine/submarine/store/sqlalchemy_store.py @@ -18,6 +18,7 @@ from contextlib import contextmanager import sqlalchemy + from submarine.exceptions import SubmarineException from submarine.store.abstract_store import AbstractStore from submarine.store.database.models import Base, SqlMetric, SqlParam diff --git a/submarine-sdk/pysubmarine/submarine/utils/tf_utils.py b/submarine-sdk/pysubmarine/submarine/utils/tf_utils.py index 545395891..96da0ce1f 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/tf_utils.py +++ b/submarine-sdk/pysubmarine/submarine/utils/tf_utils.py @@ -17,6 +17,7 @@ import os import tensorflow as tf + from submarine.ml.tensorflow.optimizer import get_optimizer diff --git a/submarine-sdk/pysubmarine/tests/tracking/test_utils.py b/submarine-sdk/pysubmarine/tests/tracking/test_utils.py index 3348fcb7a..33422b904 100644 --- a/submarine-sdk/pysubmarine/tests/tracking/test_utils.py +++ b/submarine-sdk/pysubmarine/tests/tracking/test_utils.py @@ -16,6 +16,7 @@ import os import mock + from submarine.store import DEFAULT_SUBMARINE_JDBC_URL from submarine.store.sqlalchemy_store import SqlAlchemyStore from submarine.tracking.utils import (_JOB_NAME_ENV_VAR, _TRACKING_URI_ENV_VAR, diff --git a/submarine-sdk/pysubmarine/tests/utils/test_env.py b/submarine-sdk/pysubmarine/tests/utils/test_env.py index 7aa2b8f16..eefd176e9 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_env.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_env.py @@ -17,6 +17,7 @@ from os import environ import pytest + from submarine.utils.env import (get_env, get_from_dicts, get_from_json, get_from_registry, unset_variable) diff --git a/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py index c5e6f063a..3a866e93a 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py @@ -17,6 +17,7 @@ import pytest from mock import Mock, patch + from submarine.exceptions import RestException, SubmarineException from submarine.utils.rest_utils import http_request, verify_rest_response diff --git a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py index 86675079b..52ed4ad09 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_tf_utils.py @@ -14,6 +14,7 @@ # limitations under the License. import pytest + from submarine.utils.tf_utils import get_tf_config diff --git a/submarine-sdk/pysubmarine/tests/utils/test_validation.py b/submarine-sdk/pysubmarine/tests/utils/test_validation.py index 285215c2a..0030913d2 100644 --- a/submarine-sdk/pysubmarine/tests/utils/test_validation.py +++ b/submarine-sdk/pysubmarine/tests/utils/test_validation.py @@ -14,6 +14,7 @@ # limitations under the License. import pytest + from submarine.exceptions import SubmarineException from submarine.utils.validation import (_validate_db_type_string, _validate_length_limit, From f98d59fe424672a685a7cda399c6a72f00bb7229 Mon Sep 17 00:00:00 2001 From: andrewhsiehth Date: Sat, 18 Jul 2020 23:30:58 +0800 Subject: [PATCH 25/26] mkdir for non-existing output directory --- .../submarine/ml/pytorch/model/base_pytorch_model.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py index f54c30b6d..168a78ad8 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py @@ -17,6 +17,7 @@ import logging import os from abc import ABC +from pathlib import Path import torch from torch import distributed @@ -44,6 +45,9 @@ def __init__(self, params=None, json_path=None): self.params = get_from_dicts(params, default_parameters) self.params = get_from_json(json_path, self.params) self._sanity_check() + Path(self.params['output'] + ['save_model_dir']).expanduser().resolve().mkdir(parents=True, + exist_ok=True) logging.info("Model parameters : %s", self.params) self.input_type = self.params['input']['type'] From 0521639cc83a171b9beef22f4ff39c8d821d0ac6 Mon Sep 17 00:00:00 2001 From: andrewhsiehth Date: Sun, 19 Jul 2020 15:36:55 +0800 Subject: [PATCH 26/26] rename afm && refactor example/pytorch folder --- .../example/pytorch/{ => afm}/afm.json | 6 +-- .../example/pytorch/{ => afm}/run_afm.py | 6 +-- .../example/pytorch/afm/run_afm.sh | 41 +++++++++++++++++++ .../example/pytorch/{ => deepfm}/deepfm.json | 6 +-- .../pytorch/{ => deepfm}/run_deepfm.py | 0 .../pytorch/{ => deepfm}/run_deepfm.sh | 0 .../ml/pytorch/model/ctr/__init__.py | 4 +- .../submarine/ml/pytorch/model/ctr/afm.py | 6 +-- .../ml/pytorch/model/test_afm_pytorch.py | 4 +- 9 files changed, 57 insertions(+), 16 deletions(-) rename submarine-sdk/pysubmarine/example/pytorch/{ => afm}/afm.json (87%) rename submarine-sdk/pysubmarine/example/pytorch/{ => afm}/run_afm.py (88%) create mode 100644 submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.sh rename submarine-sdk/pysubmarine/example/pytorch/{ => deepfm}/deepfm.json (87%) rename submarine-sdk/pysubmarine/example/pytorch/{ => deepfm}/run_deepfm.py (100%) rename submarine-sdk/pysubmarine/example/pytorch/{ => deepfm}/run_deepfm.sh (100%) diff --git a/submarine-sdk/pysubmarine/example/pytorch/afm.json b/submarine-sdk/pysubmarine/example/pytorch/afm/afm.json similarity index 87% rename from submarine-sdk/pysubmarine/example/pytorch/afm.json rename to submarine-sdk/pysubmarine/example/pytorch/afm/afm.json index 68522396e..cc68e9533 100644 --- a/submarine-sdk/pysubmarine/example/pytorch/afm.json +++ b/submarine-sdk/pysubmarine/example/pytorch/afm/afm.json @@ -1,8 +1,8 @@ { "input": { - "train_data": "../data/tr.libsvm", - "valid_data": "../data/va.libsvm", - "test_data": "../data/te.libsvm", + "train_data": "../../data/tr.libsvm", + "valid_data": "../../data/va.libsvm", + "test_data": "../../data/te.libsvm", "type": "libsvm" }, "output": { diff --git a/submarine-sdk/pysubmarine/example/pytorch/run_afm.py b/submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.py similarity index 88% rename from submarine-sdk/pysubmarine/example/pytorch/run_afm.py rename to submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.py index 06cb37373..e6c28d6c3 100644 --- a/submarine-sdk/pysubmarine/example/pytorch/run_afm.py +++ b/submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.py @@ -14,19 +14,19 @@ # limitations under the License. -from submarine.ml.pytorch.model.ctr import AttentionalFM +from submarine.ml.pytorch.model.ctr import AFM import argparse if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( - "-conf", help="a JSON configuration file for AttentionalFM", type=str) + "-conf", help="a JSON configuration file for AFM", type=str) parser.add_argument("-task_type", default='train', help="train or evaluate, by default is train") args = parser.parse_args() - trainer = AttentionalFM(json_path=args.conf) + trainer = AFM(json_path=args.conf) if args.task_type == 'train': trainer.fit() diff --git a/submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.sh b/submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.sh new file mode 100644 index 000000000..494931cb5 --- /dev/null +++ b/submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.sh @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +export JAVA_HOME=${JAVA_HOME:-$HOME/workspace/app/java} +export HADOOP_HOME=${HADOOP_HOME:-$HADOOP_HDFS_HOME} +export CLASSPATH=${CLASSPATH:-`hdfs classpath --glob`} +export ARROW_LIBHDFS_DIR=${ARROW_LIBHDFS_DIR:-$HADOOP_HOME/lib/native} + +# path to pysubmarine/submarine +PYTHONPATH=$HOME/workspace/submarine/submarine-sdk/pysubmarine + +HADOOP_CONF_PATH=${HADOOP_CONF_PATH:-$HADOOP_CONF_DIR} + +SUBMARINE_VERSION=0.5.0-SNAPSHOT +SUBMARINE_HADOOP_VERSION=2.9 +SUBMARINE_JAR=/opt/submarine-dist-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}/submarine-dist-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}/submarine-all-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}.jar + +java -cp $(${HADOOP_COMMON_HOME}/bin/hadoop classpath --glob):${SUBMARINE_JAR}:${HADOOP_CONF_PATH} \ + org.apache.submarine.client.cli.Cli job run --name afm-job-001 \ + --framework pytorch \ + --verbose \ + --input_path "" \ + --num_workers 2 \ + --worker_resources memory=1G,vcores=1 \ + --worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_afm.py --conf ./afm.json --task_type train" \ + --insecure \ + --conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_afm.py,afm.json + diff --git a/submarine-sdk/pysubmarine/example/pytorch/deepfm.json b/submarine-sdk/pysubmarine/example/pytorch/deepfm/deepfm.json similarity index 87% rename from submarine-sdk/pysubmarine/example/pytorch/deepfm.json rename to submarine-sdk/pysubmarine/example/pytorch/deepfm/deepfm.json index a7264e1a8..41a694b1a 100644 --- a/submarine-sdk/pysubmarine/example/pytorch/deepfm.json +++ b/submarine-sdk/pysubmarine/example/pytorch/deepfm/deepfm.json @@ -1,8 +1,8 @@ { "input": { - "train_data": "../data/tr.libsvm", - "valid_data": "../data/va.libsvm", - "test_data": "../data/te.libsvm", + "train_data": "../../data/tr.libsvm", + "valid_data": "../../data/va.libsvm", + "test_data": "../../data/te.libsvm", "type": "libsvm" }, "output": { diff --git a/submarine-sdk/pysubmarine/example/pytorch/run_deepfm.py b/submarine-sdk/pysubmarine/example/pytorch/deepfm/run_deepfm.py similarity index 100% rename from submarine-sdk/pysubmarine/example/pytorch/run_deepfm.py rename to submarine-sdk/pysubmarine/example/pytorch/deepfm/run_deepfm.py diff --git a/submarine-sdk/pysubmarine/example/pytorch/run_deepfm.sh b/submarine-sdk/pysubmarine/example/pytorch/deepfm/run_deepfm.sh similarity index 100% rename from submarine-sdk/pysubmarine/example/pytorch/run_deepfm.sh rename to submarine-sdk/pysubmarine/example/pytorch/deepfm/run_deepfm.sh diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py index 7ebc1f477..34bc8d677 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/__init__.py @@ -14,6 +14,6 @@ # limitations under the License. from .deepfm import DeepFM -from .afm import AttentionalFM +from .afm import AFM -__all__ = ['DeepFM', 'AttentionalFM'] +__all__ = ['DeepFM', 'AFM'] diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py index dec231481..e5314f753 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/ctr/afm.py @@ -20,14 +20,14 @@ from submarine.ml.pytorch.model.base_pytorch_model import BasePyTorchModel -class AttentionalFM(BasePyTorchModel): +class AFM(BasePyTorchModel): def model_fn(self, params): super().model_fn(params) - return _AttentionalFM(**self.params['model']['kwargs']) + return _AFM(**self.params['model']['kwargs']) -class _AttentionalFM(nn.Module): +class _AFM(nn.Module): def __init__(self, num_features: int, embedding_dim: int, attention_dim: int, out_features: int, dropout_rate: float, diff --git a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/test_afm_pytorch.py b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/test_afm_pytorch.py index 96d61c983..72befacbd 100644 --- a/submarine-sdk/pysubmarine/tests/ml/pytorch/model/test_afm_pytorch.py +++ b/submarine-sdk/pysubmarine/tests/ml/pytorch/model/test_afm_pytorch.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from submarine.ml.pytorch.model.ctr import AttentionalFM +from submarine.ml.pytorch.model.ctr import AFM def test_run_afm(get_model_param): param = get_model_param - trainer = AttentionalFM(param) + trainer = AFM(param) trainer.fit() trainer.evaluate() trainer.predict()