From 3f80bc61cf32ae8b13ad8cacd95d830885341337 Mon Sep 17 00:00:00 2001 From: Andrew Hsieh Date: Sun, 12 Jul 2020 17:41:09 +0800 Subject: [PATCH] fix fileio --- .../pysubmarine/example/pytorch/deepfm.json | 20 ++++----- .../ml/pytorch/input/libsvm_dataset.py | 2 +- .../ml/pytorch/model/base_pytorch_model.py | 38 +++++++++-------- .../pysubmarine/submarine/utils/fileio.py | 41 +++++++++++++++++-- 4 files changed, 68 insertions(+), 33 deletions(-) diff --git a/submarine-sdk/pysubmarine/example/pytorch/deepfm.json b/submarine-sdk/pysubmarine/example/pytorch/deepfm.json index a1c70690b..a7264e1a8 100644 --- a/submarine-sdk/pysubmarine/example/pytorch/deepfm.json +++ b/submarine-sdk/pysubmarine/example/pytorch/deepfm.json @@ -10,10 +10,10 @@ "metric": "roc_auc" }, "training": { - "batch_size": 64, - "num_epochs": 1, + "batch_size": 512, + "num_epochs": 3, "log_steps": 10, - "num_threads": 0, + "num_threads": 2, "num_gpus": 0, "seed": 42, "mode": "distributed", @@ -22,12 +22,12 @@ "model": { "name": "ctr.deepfm", "kwargs": { - "field_dims": [15, 52, 30, 19, 111, 51, 26, 19, 53, 5, 13, 8, 23, 21, 77, 25, 39, 11, - 8, 61, 15, 3, 34, 75, 30, 79, 11, 85, 37, 10, 94, 19, 5, 32, 6, 12, 42, 18, 23], + "num_fields": 39, + "num_features": 117581, "out_features": 1, - "embedding_dim": 16, - "hidden_units": [400, 400], - "dropout_rates": [0.2, 0.2] + "embedding_dim": 256, + "hidden_units": [400, 400, 400], + "dropout_rates": [0.3, 0.3, 0.3] } }, "loss": { @@ -37,12 +37,12 @@ "optimizer": { "name": "adam", "kwargs": { - "lr": 1e-3 + "lr": 5e-4 } }, "resource": { "num_cpus": 4, "num_gpus": 0, - "num_threads": 0 + "num_threads": 2 } } diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py index a3cd42dc0..a535ccc7b 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py @@ -40,7 +40,7 @@ def __len__(self) -> int: def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor, int]: with open_buffered_file_reader(self.data_uri) as infile: - infile.seek(self.sample_offset[idx], io.SEEK_SET) + infile.seek(self.sample_offset[idx], os.SEEK_SET) sample = infile.readline() return LIBSVMDataset.parse_sample(sample) diff --git a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py index 1dfe7f6a1..a9cfe0050 100644 --- a/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py +++ b/submarine-sdk/pysubmarine/submarine/ml/pytorch/model/base_pytorch_model.py @@ -68,32 +68,34 @@ def __del__(self): distributed.destroy_process_group() def train(self, train_loader): - for _, batch in enumerate(train_loader): - sample, target = batch - output = self.model(sample) - loss = self.loss(output, target) - self.optimizer.zero_grad() - loss.backward() - self.optimizer.step() + self.model.train() + with torch.enable_grad(): + for _, batch in enumerate(train_loader): + feature_idx, feature_value, label = batch + output = self.model(feature_idx, feature_value).squeeze() + loss = self.loss(output, label.float()) + self.optimizer.zero_grad() + loss.backward() + self.optimizer.step() def evaluate(self): outputs = [] - targets = [] + labels = [] valid_loader = get_from_registry(self.input_type, input_fn_registry)( filepath=self.params['input']['valid_data'], **self.params['training'])() - + self.model.eval() with torch.no_grad(): for _, batch in enumerate(valid_loader): - sample, target = batch - output = self.model(sample) + feature_idx, feature_value, label = batch + output = self.model(feature_idx, feature_value).squeeze() outputs.append(output) - targets.append(target) + labels.append(label) return self.metric( - torch.cat(targets, dim=0).cpu().numpy(), + torch.cat(labels, dim=0).cpu().numpy(), torch.cat(outputs, dim=0).cpu().numpy()) def predict(self): @@ -102,12 +104,12 @@ def predict(self): test_loader = get_from_registry(self.input_type, input_fn_registry)( filepath=self.params['input']['test_data'], **self.params['training'])() - + self.model.eval() with torch.no_grad(): for _, batch in enumerate(test_loader): - sample, _ = batch - output = self.model(sample) - outputs.append(output) + feature_idx, feature_value, _ = batch + output = self.model(feature_idx, feature_value).squeeze() + outputs.append(torch.sigmoid(output)) return torch.cat(outputs, dim=0).cpu().numpy() @@ -141,7 +143,7 @@ def save_checkpoint(self): 'optimizer': self.optimizer.state_dict() }, buffer) write_file(buffer, - path=os.path.join( + uri=os.path.join( self.params['output']['save_model_dir'], 'ckpt.pkl')) def model_fn(self, params): diff --git a/submarine-sdk/pysubmarine/submarine/utils/fileio.py b/submarine-sdk/pysubmarine/submarine/utils/fileio.py index 2ad5f44b5..cae64df4e 100644 --- a/submarine-sdk/pysubmarine/submarine/utils/fileio.py +++ b/submarine-sdk/pysubmarine/submarine/utils/fileio.py @@ -17,7 +17,9 @@ from pyarrow.lib import NativeFile import io - +from urllib.parse import urlparse +from pathlib import Path +from typing import Tuple def open_buffered_file_reader( uri: str, @@ -25,16 +27,47 @@ def open_buffered_file_reader( try: input_file = open_input_file(uri) return io.BufferedReader(input_file, buffer_size=buffer_size) - except: + except Exception as e: input_file.close() + raise e +def open_buffered_stream_writer( + uri: str, + buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> io.BufferedWriter: + try: + output_stream = open_output_stream(uri) + return io.BufferedWriter(output_stream, buffer_size=buffer_size) + except Exception as e: + output_stream.close() + raise e + +def write_file(buffer: io.BytesIO, + uri: str, + buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> None: + with open_buffered_stream_writer(uri, buffer_size=buffer_size) as output_stream: + output_stream.write(buffer.getbuffer()) def open_input_file(uri: str) -> NativeFile: - filesystem, path = fs.FileSystem.from_uri(uri) + filesystem, path = _parse_uri(uri) return filesystem.open_input_file(path) +def open_output_stream(uri: str) -> NativeFile: + filesystem, path = _parse_uri(uri) + return filesystem.open_output_stream(path) def file_info(uri: str) -> fs.FileInfo: - filesystem, path = fs.FileSystem.from_uri(uri) + filesystem, path = _parse_uri(uri) info, = filesystem.get_file_info([path]) return info + +def _parse_uri(uri: str) -> Tuple[fs.FileSystem, str]: + parsed = urlparse(uri) + uri = uri if parsed.scheme else str(Path(parsed.path).expanduser().resolve()) + filesystem, path = fs.FileSystem.from_uri(uri) + return filesystem, path + + + + + +