This repository has been archived by the owner on Jul 10, 2024. It is now read-only.

Commit

fix data input_fn and fileio
ifndef012 committed Jul 18, 2020
1 parent f57d732 commit 7471408
Showing 2 changed files with 96 additions and 80 deletions.
110 changes: 79 additions & 31 deletions submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py
@@ -13,59 +13,107 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler

from submarine.utils.fileio import read_file
from submarine.utils.fileio import open_buffered_file_reader, file_info

from tqdm.auto import tqdm

import io
import os
import itertools
import functools
import multiprocessing as mp
from typing import List, Tuple


class LIBSVMDataset(Dataset):

def __init__(self, path):
self.data, self.label = self.preprocess_data(read_file(path))
def __init__(self, data_uri: str, sample_offset: np.ndarray):
self.data_uri = data_uri
self.sample_offset = sample_offset

def __len__(self) -> int:
return len(self.sample_offset)

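    # Samples are read lazily: each __getitem__ seeks to a precomputed byte offset and parses one line on demand.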
def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor, int]:
with open_buffered_file_reader(self.data_uri) as infile:
infile.seek(self.sample_offset[idx], io.SEEK_SET)
sample = infile.readline()
return LIBSVMDataset.parse_sample(sample)

def __getitem__(self, idx):
return self.data.iloc[idx], self.label.iloc[idx]
@classmethod
def parse_sample(cls,
sample: bytes) -> Tuple[torch.Tensor, torch.Tensor, int]:
label, *entries = sample.rstrip(b'\n').split(b' ')
feature_idx = torch.zeros(len(entries), dtype=torch.long)
feature_value = torch.zeros(len(entries), dtype=torch.float)
for i, entry in enumerate(entries):
fidx, fvalue = entry.split(b':')
feature_idx[i], feature_value[i] = int(fidx), float(fvalue)
return feature_idx, feature_value, int(label)

def __len__(self):
return len(self.data)
@classmethod
def prepare_dataset(cls, data_uri: str, n_jobs: int = os.cpu_count()):
sample_offset = LIBSVMDataset._locate_sample_offsets(data_uri=data_uri,
n_jobs=n_jobs)
return LIBSVMDataset(data_uri=data_uri, sample_offset=sample_offset)

def preprocess_data(self, stream):
@classmethod
def _locate_sample_offsets(cls, data_uri: str, n_jobs: int) -> np.ndarray:
finfo = file_info(data_uri)
chunk_size, _ = divmod(finfo.size, n_jobs)

def _convert_line(line):
feat_ids = []
feat_vals = []
for x in line:
feat_id, feat_val = x.split(':')
feat_ids.append(int(feat_id))
feat_vals.append(float(feat_val))
return (torch.as_tensor(feat_ids, dtype=torch.int64),
torch.as_tensor(feat_vals, dtype=torch.float32))
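        # Split the file into n_jobs byte ranges, then advance each boundary to the next newline so every chunk starts on a sample boundary.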
chunk_starts = [0]
with open_buffered_file_reader(data_uri) as infile:
while infile.tell() < finfo.size:
infile.seek(chunk_size, os.SEEK_CUR)
infile.readline()
chunk_starts.append(min(infile.tell(), finfo.size))

df = pd.read_table(stream, header=None)
df = df.loc[:, 0].str.split(n=1, expand=True)
label = df.loc[:, 0].apply(int)
data = df.loc[:, 1].str.split().apply(_convert_line)
return data, label
with mp.Pool(processes=n_jobs,
initializer=tqdm.set_lock,
initargs=(tqdm.get_lock(),),
maxtasksperchild=1) as pool:
return np.asarray(
list(
itertools.chain.from_iterable(
pool.imap(functools.partial(
LIBSVMDataset._locate_sample_offsets_job, data_uri),
iterable=enumerate(
zip(chunk_starts[:-1],
chunk_starts[1:]))))))

def collate_fn(self, batch):
data, label = tuple(zip(*batch))
_, feat_val = tuple(zip(*data))
return (torch.stack(feat_val, dim=0).type(torch.long),
torch.as_tensor(label, dtype=torch.float32).unsqueeze(dim=-1))
@classmethod
def _locate_sample_offsets_job(
cls, data_uri: str, task: Tuple[int, Tuple[int, int]]) -> List[int]:
job_id, (start, end) = task
offsets = [start]
with open_buffered_file_reader(data_uri) as infile:
infile.seek(start, os.SEEK_SET)
with tqdm(total=None,
                      desc=f'[Locate Sample Offsets] job: {job_id}',
position=job_id,
disable=('DISABLE_TQDM' in os.environ)) as pbar:
while infile.tell() < end:
infile.readline()
offsets.append(infile.tell())
pbar.update()
assert offsets.pop() == end
return offsets


def libsvm_input_fn(filepath, batch_size=256, num_threads=1, **kwargs):

def _input_fn():
dataset = LIBSVMDataset(filepath)
dataset = LIBSVMDataset.prepare_dataset(data_uri=filepath,
n_jobs=num_threads)
sampler = DistributedSampler(dataset)
return DataLoader(dataset=dataset,
batch_size=batch_size,
sampler=sampler,
num_workers=num_threads,
collate_fn=dataset.collate_fn)
num_workers=0) # should be 0 (pytorch bug)

return _input_fn
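
For reference, a minimal sketch of how the reworked dataset could be exercised outside the input_fn wrapper (the local file URI, batch size, and n_jobs value below are placeholder assumptions, not values from this commit):

from torch.utils.data import DataLoader
from submarine.ml.pytorch.input.libsvm_dataset import LIBSVMDataset

# Hypothetical LIBSVM file; any URI pyarrow understands (file://, hdfs://) should resolve.
dataset = LIBSVMDataset.prepare_dataset(data_uri='file:///tmp/train.libsvm', n_jobs=2)
# Default collate stacks the per-sample tensors, which assumes every sample has the
# same number of features (as in the commit's own DataLoader, which sets no collate_fn).
loader = DataLoader(dataset, batch_size=32, num_workers=0)
feature_idx, feature_value, label = next(iter(loader))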
66 changes: 17 additions & 49 deletions submarine-sdk/pysubmarine/submarine/utils/fileio.py
@@ -13,60 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import os
from enum import Enum
from urllib.parse import urlparse

from pyarrow import fs
from pyarrow.lib import NativeFile


class _Scheme(Enum):
HDFS = 'hdfs'
FILE = 'file'
DEFAULT = ''


def read_file(path):
scheme, host, port, path = _parse_path(path)
if _Scheme(scheme) is _Scheme.HDFS:
return _read_hdfs(host=host, port=port, path=path)
else:
return _read_local(path=path)


def write_file(buffer, path):
scheme, host, port, path = _parse_path(path)
if _Scheme(scheme) is _Scheme.HDFS:
_write_hdfs(buffer=buffer, host=host, port=port, path=path)
else:
_write_local(buffer=buffer, path=path)


def _parse_path(path):
p = urlparse(path)
return p.scheme, p.hostname, p.port, os.path.abspath(p.path)


def _read_hdfs(host, port, path):
hdfs = fs.HadoopFileSystem(host=host, port=port)
with hdfs.open_input_stream(path) as stream:
data = stream.read()
return io.BytesIO(data)
import io


def _read_local(path):
with open(path, mode='rb') as f:
data = f.read()
return io.BytesIO(data)
def open_buffered_file_reader(
        uri: str,
        buffer_size: int = io.DEFAULT_BUFFER_SIZE) -> io.BufferedReader:
    input_file = open_input_file(uri)
    try:
        return io.BufferedReader(input_file, buffer_size=buffer_size)
    except Exception:
        input_file.close()
        raise


def _write_hdfs(buffer, host, port, path):
hdfs = fs.HadoopFileSystem(host=host, port=port)
with hdfs.open_output_stream(path) as stream:
stream.write(buffer.getbuffer())
def open_input_file(uri: str) -> NativeFile:
filesystem, path = fs.FileSystem.from_uri(uri)
return filesystem.open_input_file(path)


def _write_local(buffer, path):
with open(path, mode='wb') as f:
f.write(buffer.getbuffer())
def file_info(uri: str) -> fs.FileInfo:
filesystem, path = fs.FileSystem.from_uri(uri)
info, = filesystem.get_file_info([path])
return info
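
As a rough illustration of the new helpers (the local URI below is a placeholder assumption; an hdfs:// URI resolves through the same pyarrow FileSystem.from_uri path):

from submarine.utils.fileio import file_info, open_buffered_file_reader

uri = 'file:///tmp/train.libsvm'  # hypothetical file
print(file_info(uri).size)        # total size in bytes
with open_buffered_file_reader(uri) as reader:
    first_line = reader.readline()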
