fix fileio
ifndef012 committed Jul 18, 2020
1 parent 7471408 commit 3f80bc6
Showing 4 changed files with 68 additions and 33 deletions.
20 changes: 10 additions & 10 deletions submarine-sdk/pysubmarine/example/pytorch/deepfm.json
@@ -10,10 +10,10 @@
     "metric": "roc_auc"
   },
   "training": {
-    "batch_size": 64,
-    "num_epochs": 1,
+    "batch_size": 512,
+    "num_epochs": 3,
     "log_steps": 10,
-    "num_threads": 0,
+    "num_threads": 2,
     "num_gpus": 0,
     "seed": 42,
     "mode": "distributed",
@@ -22,12 +22,12 @@
   "model": {
     "name": "ctr.deepfm",
     "kwargs": {
-      "field_dims": [15, 52, 30, 19, 111, 51, 26, 19, 53, 5, 13, 8, 23, 21, 77, 25, 39, 11,
-                     8, 61, 15, 3, 34, 75, 30, 79, 11, 85, 37, 10, 94, 19, 5, 32, 6, 12, 42, 18, 23],
+      "num_fields": 39,
+      "num_features": 117581,
       "out_features": 1,
-      "embedding_dim": 16,
-      "hidden_units": [400, 400],
-      "dropout_rates": [0.2, 0.2]
+      "embedding_dim": 256,
+      "hidden_units": [400, 400, 400],
+      "dropout_rates": [0.3, 0.3, 0.3]
     }
   },
   "loss": {
@@ -37,12 +37,12 @@
   "optimizer": {
     "name": "adam",
     "kwargs": {
-      "lr": 1e-3
+      "lr": 5e-4
     }
   },
   "resource": {
     "num_cpus": 4,
     "num_gpus": 0,
-    "num_threads": 0
+    "num_threads": 2
   }
 }
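For context, a minimal sketch (not part of this commit) of how a config like this is typically consumed: the JSON is loaded and the "model" section's kwargs are handed to the registered model class. The load path and print statements below are illustrative only.

import json

# Hypothetical illustration: read the example config and inspect the model kwargs.
with open('submarine-sdk/pysubmarine/example/pytorch/deepfm.json') as f:
    params = json.load(f)

model_conf = params['model']
# After this change the DeepFM kwargs describe the feature space by size
# (num_fields / num_features) instead of listing per-field cardinalities.
print(model_conf['name'])                    # ctr.deepfm
print(model_conf['kwargs']['num_fields'])    # 39
print(model_conf['kwargs']['num_features'])  # 117581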
@@ -40,7 +40,7 @@ def __len__(self) -> int:

     def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor, int]:
         with open_buffered_file_reader(self.data_uri) as infile:
-            infile.seek(self.sample_offset[idx], io.SEEK_SET)
+            infile.seek(self.sample_offset[idx], os.SEEK_SET)
             sample = infile.readline()
         return LIBSVMDataset.parse_sample(sample)

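The seek call above is the core of the dataset's random-access scheme: the byte offset of each sample is indexed up front, and __getitem__ jumps straight to the requested line. A simplified standalone sketch of that pattern (plain open() instead of open_buffered_file_reader; helper names are assumptions, not from this commit):

import os

def build_offsets(path: str) -> list:
    # Record the byte offset at which each line (sample) starts.
    offsets = []
    with open(path, 'rb') as f:
        pos = 0
        for line in f:
            offsets.append(pos)
            pos += len(line)
    return offsets

def read_sample(path: str, offsets: list, idx: int) -> bytes:
    with open(path, 'rb') as f:
        # os.SEEK_SET (== 0) seeks relative to the start of the file,
        # so the stored absolute offset lands exactly on sample idx.
        f.seek(offsets[idx], os.SEEK_SET)
        return f.readline()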
@@ -68,32 +68,34 @@ def __del__(self):
         distributed.destroy_process_group()

     def train(self, train_loader):
-        for _, batch in enumerate(train_loader):
-            sample, target = batch
-            output = self.model(sample)
-            loss = self.loss(output, target)
-            self.optimizer.zero_grad()
-            loss.backward()
-            self.optimizer.step()
+        self.model.train()
+        with torch.enable_grad():
+            for _, batch in enumerate(train_loader):
+                feature_idx, feature_value, label = batch
+                output = self.model(feature_idx, feature_value).squeeze()
+                loss = self.loss(output, label.float())
+                self.optimizer.zero_grad()
+                loss.backward()
+                self.optimizer.step()

     def evaluate(self):
         outputs = []
-        targets = []
+        labels = []

         valid_loader = get_from_registry(self.input_type, input_fn_registry)(
             filepath=self.params['input']['valid_data'],
             **self.params['training'])()

         self.model.eval()
         with torch.no_grad():
             for _, batch in enumerate(valid_loader):
-                sample, target = batch
-                output = self.model(sample)
+                feature_idx, feature_value, label = batch
+                output = self.model(feature_idx, feature_value).squeeze()

                 outputs.append(output)
-                targets.append(target)
+                labels.append(label)

         return self.metric(
-            torch.cat(targets, dim=0).cpu().numpy(),
+            torch.cat(labels, dim=0).cpu().numpy(),
             torch.cat(outputs, dim=0).cpu().numpy())

     def predict(self):
@@ -102,12 +104,12 @@ def predict(self):
         test_loader = get_from_registry(self.input_type, input_fn_registry)(
             filepath=self.params['input']['test_data'],
             **self.params['training'])()

         self.model.eval()
         with torch.no_grad():
             for _, batch in enumerate(test_loader):
-                sample, _ = batch
-                output = self.model(sample)
-                outputs.append(output)
+                feature_idx, feature_value, _ = batch
+                output = self.model(feature_idx, feature_value).squeeze()
+                outputs.append(torch.sigmoid(output))

         return torch.cat(outputs, dim=0).cpu().numpy()

@@ -141,7 +143,7 @@ def save_checkpoint(self):
             'optimizer': self.optimizer.state_dict()
         }, buffer)
         write_file(buffer,
-                   path=os.path.join(
+                   uri=os.path.join(
                        self.params['output']['save_model_dir'], 'ckpt.pkl'))

     def model_fn(self, params):
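The rewritten train/evaluate/predict methods all unpack batches as (feature_idx, feature_value, label) and call the model with two tensors. Below is a toy stand-in to illustrate that calling convention; it is not the ctr.deepfm model, and the shapes and logits-plus-sigmoid convention are assumptions consistent with the code above.

import torch

class ToyCTRModel(torch.nn.Module):
    def __init__(self, num_features: int, embedding_dim: int):
        super().__init__()
        self.embedding = torch.nn.Embedding(num_features, embedding_dim)
        self.fc = torch.nn.Linear(embedding_dim, 1)

    def forward(self, feature_idx, feature_value):
        # Weight each field's embedding by its value, pool over fields, emit a logit.
        emb = self.embedding(feature_idx) * feature_value.unsqueeze(-1)
        return self.fc(emb.sum(dim=1))                       # (batch_size, 1)

model = ToyCTRModel(num_features=117581, embedding_dim=16)
feature_idx = torch.randint(0, 117581, (4, 39))              # per-field feature indices
feature_value = torch.ones(4, 39)                            # per-field feature values
label = torch.randint(0, 2, (4,))

output = model(feature_idx, feature_value).squeeze()         # logits, shape (4,)
loss = torch.nn.functional.binary_cross_entropy_with_logits(output, label.float())
prob = torch.sigmoid(output)                                 # what predict() collects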
41 changes: 37 additions & 4 deletions submarine-sdk/pysubmarine/submarine/utils/fileio.py
@@ -17,24 +17,57 @@
 from pyarrow.lib import NativeFile

 import io

+from urllib.parse import urlparse
+from pathlib import Path
+from typing import Tuple

 def open_buffered_file_reader(
         uri: str,
         buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> io.BufferedReader:
     try:
         input_file = open_input_file(uri)
         return io.BufferedReader(input_file, buffer_size=buffer_size)
-    except:
+    except Exception as e:
         input_file.close()
         raise e

+def open_buffered_stream_writer(
+        uri: str,
+        buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> io.BufferedWriter:
+    try:
+        output_stream = open_output_stream(uri)
+        return io.BufferedWriter(output_stream, buffer_size=buffer_size)
+    except Exception as e:
+        output_stream.close()
+        raise e
+
+def write_file(buffer: io.BytesIO,
+               uri: str,
+               buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> None:
+    with open_buffered_stream_writer(uri, buffer_size=buffer_size) as output_stream:
+        output_stream.write(buffer.getbuffer())
+
 def open_input_file(uri: str) -> NativeFile:
-    filesystem, path = fs.FileSystem.from_uri(uri)
+    filesystem, path = _parse_uri(uri)
     return filesystem.open_input_file(path)

+def open_output_stream(uri: str) -> NativeFile:
+    filesystem, path = _parse_uri(uri)
+    return filesystem.open_output_stream(path)
+
 def file_info(uri: str) -> fs.FileInfo:
-    filesystem, path = fs.FileSystem.from_uri(uri)
+    filesystem, path = _parse_uri(uri)
     info, = filesystem.get_file_info([path])
     return info
+
+def _parse_uri(uri: str) -> Tuple[fs.FileSystem, str]:
+    parsed = urlparse(uri)
+    uri = uri if parsed.scheme else str(Path(parsed.path).expanduser().resolve())
+    filesystem, path = fs.FileSystem.from_uri(uri)
+    return filesystem, path
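A hedged usage sketch of the reworked helpers (the paths below are examples, not from the commit): _parse_uri passes scheme-qualified URIs such as hdfs://... or s3://... straight to pyarrow, while bare local paths are expanded and resolved first so pyarrow can infer the local filesystem.

import io

from submarine.utils.fileio import open_buffered_file_reader, write_file

buffer = io.BytesIO(b'hello submarine\n')
write_file(buffer, uri='/tmp/demo.txt')         # plain path, no scheme needed;
                                                # a '~/...' path would be expanded too

with open_buffered_file_reader('/tmp/demo.txt') as infile:
    print(infile.readline())                    # b'hello submarine\n'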





