Skip to content
This repository has been archived by the owner on Jul 10, 2024. It is now read-only.

SUBMARINE-561. [SDK] Add PyTorch implementation of AFM model #346

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions submarine-sdk/pysubmarine/example/pytorch/afm/afm.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"input": {
"train_data": "../../data/tr.libsvm",
"valid_data": "../../data/va.libsvm",
"test_data": "../../data/te.libsvm",
"type": "libsvm"
},
"output": {
"save_model_dir": "./output",
"metric": "roc_auc"
},
"training": {
"batch_size": 512,
"num_epochs": 3,
"log_steps": 10,
"num_threads": 2,
"num_gpus": 0,
"seed": 42,
"mode": "distributed",
"backend": "gloo"
},
"model": {
"name": "ctr.afm",
"kwargs": {
"num_fields": 39,
"num_features": 117581,
"attention_dim": 64,
"out_features": 1,
"embedding_dim": 256,
"hidden_units": [400, 400, 400],
"dropout_rate": 0.3
}
},
"loss": {
"name": "BCEWithLogitsLoss",
"kwargs": {}
},
"optimizer": {
"name": "adam",
"kwargs": {
"lr": 5e-4
}
},
"resource": {
"num_cpus": 4,
"num_gpus": 0,
"num_threads": 2
}
}
41 changes: 41 additions & 0 deletions submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from submarine.ml.pytorch.model.ctr import AFM

import argparse

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
"-conf", help="a JSON configuration file for AFM", type=str)
parser.add_argument("-task_type", default='train',
help="train or evaluate, by default is train")
args = parser.parse_args()

trainer = AFM(json_path=args.conf)

if args.task_type == 'train':
trainer.fit()
print('[Train Done]')
elif args.task_type == 'evaluate':
score = trainer.evaluate()
print(f'Eval score: {score}')
elif args.task_type == 'predict':
pred = trainer.predict()
print('Predict:', pred)
else:
assert False, args.task_type
41 changes: 41 additions & 0 deletions submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


export JAVA_HOME=${JAVA_HOME:-$HOME/workspace/app/java}
export HADOOP_HOME=${HADOOP_HOME:-$HADOOP_HDFS_HOME}
export CLASSPATH=${CLASSPATH:-`hdfs classpath --glob`}
export ARROW_LIBHDFS_DIR=${ARROW_LIBHDFS_DIR:-$HADOOP_HOME/lib/native}

# path to pysubmarine/submarine
PYTHONPATH=$HOME/workspace/submarine/submarine-sdk/pysubmarine

HADOOP_CONF_PATH=${HADOOP_CONF_PATH:-$HADOOP_CONF_DIR}

SUBMARINE_VERSION=0.5.0-SNAPSHOT
SUBMARINE_HADOOP_VERSION=2.9
SUBMARINE_JAR=/opt/submarine-dist-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}/submarine-dist-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}/submarine-all-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}.jar

java -cp $(${HADOOP_COMMON_HOME}/bin/hadoop classpath --glob):${SUBMARINE_JAR}:${HADOOP_CONF_PATH} \
org.apache.submarine.client.cli.Cli job run --name afm-job-001 \
--framework pytorch \
--verbose \
--input_path "" \
--num_workers 2 \
--worker_resources memory=1G,vcores=1 \
--worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_afm.py --conf ./afm.json --task_type train" \
--insecure \
--conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_afm.py,afm.json

Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
{
"input": {
"train_data": "../data/tr.libsvm",
"valid_data": "../data/va.libsvm",
"test_data": "../data/te.libsvm",
"train_data": "../../data/tr.libsvm",
"valid_data": "../../data/va.libsvm",
"test_data": "../../data/te.libsvm",
"type": "libsvm"
},
"output": {
"save_model_dir": "./output",
"metric": "roc_auc"
},
"training": {
"batch_size": 64,
"num_epochs": 1,
"batch_size": 512,
"num_epochs": 3,
"log_steps": 10,
"num_threads": 0,
"num_threads": 2,
"num_gpus": 0,
"seed": 42,
"mode": "distributed",
Expand All @@ -22,12 +22,12 @@
"model": {
"name": "ctr.deepfm",
"kwargs": {
"field_dims": [15, 52, 30, 19, 111, 51, 26, 19, 53, 5, 13, 8, 23, 21, 77, 25, 39, 11,
8, 61, 15, 3, 34, 75, 30, 79, 11, 85, 37, 10, 94, 19, 5, 32, 6, 12, 42, 18, 23],
"num_fields": 39,
"num_features": 117581,
"out_features": 1,
"embedding_dim": 16,
"hidden_units": [400, 400],
"dropout_rates": [0.2, 0.2]
"embedding_dim": 256,
"hidden_units": [400, 400, 400],
"dropout_rates": [0.3, 0.3, 0.3]
}
},
"loss": {
Expand All @@ -37,12 +37,12 @@
"optimizer": {
"name": "adam",
"kwargs": {
"lr": 1e-3
"lr": 5e-4
}
},
"resource": {
"num_cpus": 4,
"num_gpus": 0,
"num_threads": 0
"num_threads": 2
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ java -cp $(${HADOOP_COMMON_HOME}/bin/hadoop classpath --glob):${SUBMARINE_JAR}:$
--input_path "" \
--num_workers 2 \
--worker_resources memory=1G,vcores=1 \
--worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_ctr.py --conf ./deepfm.json --task_type train" \
--worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_deepfm.py --conf ./deepfm.json --task_type train" \
--insecure \
--conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_ctr.py,deepfm.json
--conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_deepfm.py,deepfm.json

101 changes: 70 additions & 31 deletions submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,59 +13,98 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler

from submarine.utils.fileio import read_file
from submarine.utils.fileio import open_buffered_file_reader, file_info

import os
import itertools
import functools
import multiprocessing as mp
from typing import List, Tuple


class LIBSVMDataset(Dataset):

def __init__(self, path):
self.data, self.label = self.preprocess_data(read_file(path))
def __init__(self, data_uri: str, sample_offset: np.ndarray):
self.data_uri = data_uri
self.sample_offset = sample_offset

def __len__(self) -> int:
return len(self.sample_offset)

def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor, int]:
with open_buffered_file_reader(self.data_uri) as infile:
infile.seek(self.sample_offset[idx], os.SEEK_SET)
sample = infile.readline()
return LIBSVMDataset.parse_sample(sample)

def __getitem__(self, idx):
return self.data.iloc[idx], self.label.iloc[idx]
@classmethod
def parse_sample(cls,
sample: bytes) -> Tuple[torch.Tensor, torch.Tensor, int]:
label, *entries = sample.rstrip(b'\n').split(b' ')
feature_idx = torch.zeros(len(entries), dtype=torch.long)
feature_value = torch.zeros(len(entries), dtype=torch.float)
for i, entry in enumerate(entries):
fidx, fvalue = entry.split(b':')
feature_idx[i], feature_value[i] = int(fidx), float(fvalue)
return feature_idx, feature_value, int(label)

def __len__(self):
return len(self.data)
@classmethod
def prepare_dataset(cls, data_uri: str, n_jobs: int = os.cpu_count()):
sample_offset = LIBSVMDataset._locate_sample_offsets(data_uri=data_uri,
n_jobs=n_jobs)
return LIBSVMDataset(data_uri=data_uri, sample_offset=sample_offset)

def preprocess_data(self, stream):
@classmethod
def _locate_sample_offsets(cls, data_uri: str, n_jobs: int) -> np.ndarray:
finfo = file_info(data_uri)
chunk_size, _ = divmod(finfo.size, n_jobs)

def _convert_line(line):
feat_ids = []
feat_vals = []
for x in line:
feat_id, feat_val = x.split(':')
feat_ids.append(int(feat_id))
feat_vals.append(float(feat_val))
return (torch.as_tensor(feat_ids, dtype=torch.int64),
torch.as_tensor(feat_vals, dtype=torch.float32))
chunk_starts = [0]
with open_buffered_file_reader(data_uri) as infile:
while infile.tell() < finfo.size:
infile.seek(chunk_size, os.SEEK_CUR)
infile.readline()
chunk_starts.append(min(infile.tell(), finfo.size))

df = pd.read_table(stream, header=None)
df = df.loc[:, 0].str.split(n=1, expand=True)
label = df.loc[:, 0].apply(int)
data = df.loc[:, 1].str.split().apply(_convert_line)
return data, label
with mp.Pool(processes=n_jobs,
maxtasksperchild=1) as pool:
return np.asarray(
list(
itertools.chain.from_iterable(
pool.imap(functools.partial(
LIBSVMDataset._locate_sample_offsets_job, data_uri),
iterable=enumerate(
zip(chunk_starts[:-1],
chunk_starts[1:]))))))

def collate_fn(self, batch):
data, label = tuple(zip(*batch))
_, feat_val = tuple(zip(*data))
return (torch.stack(feat_val, dim=0).type(torch.long),
torch.as_tensor(label, dtype=torch.float32).unsqueeze(dim=-1))
@classmethod
def _locate_sample_offsets_job(
cls, data_uri: str, task: Tuple[int, Tuple[int, int]]) -> List[int]:
_, (start, end) = task
offsets = [start]
with open_buffered_file_reader(data_uri) as infile:
infile.seek(start, os.SEEK_SET)
while infile.tell() < end:
infile.readline()
offsets.append(infile.tell())
assert offsets.pop() == end
return offsets


def libsvm_input_fn(filepath, batch_size=256, num_threads=1, **kwargs):

def _input_fn():
dataset = LIBSVMDataset(filepath)
dataset = LIBSVMDataset.prepare_dataset(data_uri=filepath,
n_jobs=num_threads)
sampler = DistributedSampler(dataset)
return DataLoader(dataset=dataset,
batch_size=batch_size,
sampler=sampler,
num_workers=num_threads,
collate_fn=dataset.collate_fn)
num_workers=0) # should be 0 (pytorch bug)

return _input_fn
Loading