SUBMARINE-561. [SDK] Add PyTorch implementation of AFM model
### What is this PR for?
Add a PyTorch implementation of the Attentional Factorization Machine ([AFM](https://arxiv.org/pdf/1708.04617.pdf)) for CTR prediction.
Make minor modifications to the PyTorch training flow.
Add tests for the AFM model.
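
For context, AFM re-weights the pairwise element-wise products of feature embeddings with a small attention network before pooling them into the prediction. A minimal, self-contained sketch of that idea (module and parameter names here are illustrative, not the pysubmarine API; `embedding_dim` and `attention_dim` correspond to the fields in afm.json below):

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class PairwiseAttentionPooling(nn.Module):
    """Attention-weighted pooling over pairwise feature interactions (AFM core)."""

    def __init__(self, embedding_dim: int, attention_dim: int):
        super().__init__()
        self.attention = nn.Linear(embedding_dim, attention_dim)   # W, b in the paper
        self.projection = nn.Linear(attention_dim, 1, bias=False)  # h
        self.fc = nn.Linear(embedding_dim, 1, bias=False)          # p

    def forward(self, emb: torch.Tensor) -> torch.Tensor:
        # emb: (batch, num_fields, embedding_dim)
        num_fields = emb.size(1)
        rows, cols = torch.triu_indices(num_fields, num_fields, offset=1)
        # Element-wise products of all field pairs: (batch, num_pairs, embedding_dim)
        products = emb[:, rows] * emb[:, cols]
        # Attention scores over the pairs, normalized with softmax
        scores = self.projection(F.relu(self.attention(products)))
        weights = torch.softmax(scores, dim=1)
        # Attention-weighted sum of the interactions -> scalar logit term
        return self.fc((weights * products).sum(dim=1))  # (batch, 1)
```

In the full model this interaction term is added to a linear part and trained with `BCEWithLogitsLoss`, matching the loss configured in the example config below.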

### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-561

### How should this be tested?
[python-sdk](https://github.com/andrewhsiehth/submarine/actions/runs/169985131)
[Submarine](https://github.com/andrewhsiehth/submarine/actions/runs/169985125)

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Andrew Hsieh <andrewhsiehth@gmail.com>
Author: andrewhsiehth <andrewhsiehth@gmail.com>

Closes #346 from andrewhsiehth/SUBMARINE-561 and squashes the following commits:

0521639 [andrewhsiehth] rename afm && refactor example/pytorch folder
f98d59f [andrewhsiehth] mkdir for non-existing output directory
3057899 [andrewhsiehth] use pysubmarine-ci to auto-format
f89d070 [Andrew Hsieh] python3.6 yapf
d4d93c4 [Andrew Hsieh] try to make python3.5 happy
2929dfc [Andrew Hsieh] try to make codestyle checker happy v2
42d5091 [Andrew Hsieh] try to make codestyle checker happy
9ff2f8d [Andrew Hsieh] fix core, afm coding style
adae613 [Andrew Hsieh] fix tqdm
4facbce [Andrew Hsieh] fix conftest.py coding style
e4b3e50 [Andrew Hsieh] fix deepfm.py coding style
cb6be07 [Andrew Hsieh] fix ctr.__init__ coding style
2b4eecf [Andrew Hsieh] fix base_pytorch_model coding style
573a4e8 [Andrew Hsieh] fix fileio coding style
5d6dfc0 [Andrew Hsieh] add afm testing
827c785 [Andrew Hsieh] update conftest
b260042 [Andrew Hsieh] add afm example
a7da1c3 [Andrew Hsieh] add afm to ctr
ab7b4b7 [Andrew Hsieh] add afm
fa151e5 [Andrew Hsieh] fix deepfm
380358c [Andrew Hsieh] fix testing
3f80bc6 [Andrew Hsieh] fix fileio
7471408 [Andrew Hsieh] fix data input_fn and fileio
f57d732 [Andrew Hsieh] fix deepfm
fdcda05 [Andrew Hsieh] fix layers/core.py
ce535fc [Andrew Hsieh] fix optimizer zero_grad
ifndef012 authored and xunliu committed Jul 27, 2020
1 parent eb4837d commit e580802
Showing 15 changed files with 444 additions and 150 deletions.
49 changes: 49 additions & 0 deletions submarine-sdk/pysubmarine/example/pytorch/afm/afm.json
@@ -0,0 +1,49 @@
{
  "input": {
    "train_data": "../../data/tr.libsvm",
    "valid_data": "../../data/va.libsvm",
    "test_data": "../../data/te.libsvm",
    "type": "libsvm"
  },
  "output": {
    "save_model_dir": "./output",
    "metric": "roc_auc"
  },
  "training": {
    "batch_size": 512,
    "num_epochs": 3,
    "log_steps": 10,
    "num_threads": 2,
    "num_gpus": 0,
    "seed": 42,
    "mode": "distributed",
    "backend": "gloo"
  },
  "model": {
    "name": "ctr.afm",
    "kwargs": {
      "num_fields": 39,
      "num_features": 117581,
      "attention_dim": 64,
      "out_features": 1,
      "embedding_dim": 256,
      "hidden_units": [400, 400, 400],
      "dropout_rate": 0.3
    }
  },
  "loss": {
    "name": "BCEWithLogitsLoss",
    "kwargs": {}
  },
  "optimizer": {
    "name": "adam",
    "kwargs": {
      "lr": 5e-4
    }
  },
  "resource": {
    "num_cpus": 4,
    "num_gpus": 0,
    "num_threads": 2
  }
}
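
Before submitting a job, the config can be sanity-checked locally. A small illustrative snippet (not part of the SDK; the trainer itself consumes the file via `json_path`):

```python
import json

# Load the job config and eyeball the fields the AFM trainer will consume.
with open('afm.json') as f:
    conf = json.load(f)

assert conf['model']['name'] == 'ctr.afm'
assert conf['input']['type'] == 'libsvm'
print(conf['model']['kwargs'])  # {'num_fields': 39, 'num_features': 117581, ...}
```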
41 changes: 41 additions & 0 deletions submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.py
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from submarine.ml.pytorch.model.ctr import AFM

import argparse

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # Register both single- and double-dash forms: the launch scripts in this
    # commit pass --conf/--task_type, while the original flags were single-dash,
    # which argparse would not match against the double-dash spelling.
    parser.add_argument("-conf", "--conf", type=str,
                        help="a JSON configuration file for AFM")
    parser.add_argument("-task_type", "--task_type", default='train',
                        help="train, evaluate, or predict (default: train)")
    args = parser.parse_args()

    trainer = AFM(json_path=args.conf)

    if args.task_type == 'train':
        trainer.fit()
        print('[Train Done]')
    elif args.task_type == 'evaluate':
        score = trainer.evaluate()
        print(f'Eval score: {score}')
    elif args.task_type == 'predict':
        pred = trainer.predict()
        print('Predict:', pred)
    else:
        assert False, f'unknown task_type: {args.task_type}'
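
For a quick local smoke test (assuming the sample libsvm data referenced by afm.json is present and the configured `distributed`/gloo mode can initialize in the local environment):

```sh
# Train, then evaluate, with the example config (paths as assumed in afm.json)
python run_afm.py --conf ./afm.json --task_type train
python run_afm.py --conf ./afm.json --task_type evaluate
```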
41 changes: 41 additions & 0 deletions submarine-sdk/pysubmarine/example/pytorch/afm/run_afm.sh
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


export JAVA_HOME=${JAVA_HOME:-$HOME/workspace/app/java}
export HADOOP_HOME=${HADOOP_HOME:-$HADOOP_HDFS_HOME}
export CLASSPATH=${CLASSPATH:-`hdfs classpath --glob`}
export ARROW_LIBHDFS_DIR=${ARROW_LIBHDFS_DIR:-$HADOOP_HOME/lib/native}

# path to pysubmarine/submarine
PYTHONPATH=$HOME/workspace/submarine/submarine-sdk/pysubmarine

HADOOP_CONF_PATH=${HADOOP_CONF_PATH:-$HADOOP_CONF_DIR}

SUBMARINE_VERSION=0.5.0-SNAPSHOT
SUBMARINE_HADOOP_VERSION=2.9
SUBMARINE_JAR=/opt/submarine-dist-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}/submarine-dist-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}/submarine-all-${SUBMARINE_VERSION}-hadoop-${SUBMARINE_HADOOP_VERSION}.jar

java -cp $(${HADOOP_COMMON_HOME}/bin/hadoop classpath --glob):${SUBMARINE_JAR}:${HADOOP_CONF_PATH} \
org.apache.submarine.client.cli.Cli job run --name afm-job-001 \
--framework pytorch \
--verbose \
--input_path "" \
--num_workers 2 \
--worker_resources memory=1G,vcores=1 \
--worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_afm.py --conf ./afm.json --task_type train" \
--insecure \
--conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_afm.py,afm.json
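
The `--worker_launch_cmd` assumes an archive `sdk.zip` whose interpreter lives at `sdk/bin/python` with pysubmarine installed. A rough sketch of how such an archive might be produced (an assumption, not part of this commit; truly relocatable environments usually need a tool like conda-pack):

```sh
# Build a local env at ./sdk and archive it as sdk.zip (sketch only;
# plain venvs are not fully relocatable across machines).
python3 -m venv sdk
sdk/bin/pip install ./submarine-sdk/pysubmarine
zip -qr sdk.zip sdk
```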

submarine-sdk/pysubmarine/example/pytorch/deepfm/deepfm.json
@@ -1,19 +1,19 @@
 {
   "input": {
-    "train_data": "../data/tr.libsvm",
-    "valid_data": "../data/va.libsvm",
-    "test_data": "../data/te.libsvm",
+    "train_data": "../../data/tr.libsvm",
+    "valid_data": "../../data/va.libsvm",
+    "test_data": "../../data/te.libsvm",
     "type": "libsvm"
   },
   "output": {
     "save_model_dir": "./output",
     "metric": "roc_auc"
   },
   "training": {
-    "batch_size": 64,
-    "num_epochs": 1,
+    "batch_size": 512,
+    "num_epochs": 3,
     "log_steps": 10,
-    "num_threads": 0,
+    "num_threads": 2,
     "num_gpus": 0,
     "seed": 42,
     "mode": "distributed",
@@ -22,12 +22,12 @@
   "model": {
     "name": "ctr.deepfm",
     "kwargs": {
-      "field_dims": [15, 52, 30, 19, 111, 51, 26, 19, 53, 5, 13, 8, 23, 21, 77, 25, 39, 11,
-                     8, 61, 15, 3, 34, 75, 30, 79, 11, 85, 37, 10, 94, 19, 5, 32, 6, 12, 42, 18, 23],
+      "num_fields": 39,
+      "num_features": 117581,
       "out_features": 1,
-      "embedding_dim": 16,
-      "hidden_units": [400, 400],
-      "dropout_rates": [0.2, 0.2]
+      "embedding_dim": 256,
+      "hidden_units": [400, 400, 400],
+      "dropout_rates": [0.3, 0.3, 0.3]
     }
   },
   "loss": {
@@ -37,12 +37,12 @@
   "optimizer": {
     "name": "adam",
     "kwargs": {
-      "lr": 1e-3
+      "lr": 5e-4
     }
   },
   "resource": {
     "num_cpus": 4,
     "num_gpus": 0,
-    "num_threads": 0
+    "num_threads": 2
   }
 }
submarine-sdk/pysubmarine/example/pytorch/deepfm/run_deepfm.sh
@@ -35,7 +35,7 @@ java -cp $(${HADOOP_COMMON_HOME}/bin/hadoop classpath --glob):${SUBMARINE_JAR}:${HADOOP_CONF_PATH} \
   --input_path "" \
   --num_workers 2 \
   --worker_resources memory=1G,vcores=1 \
-  --worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_ctr.py --conf ./deepfm.json --task_type train" \
+  --worker_launch_cmd "JAVA_HOME=$JAVA_HOME HADOOP_HOME=$HADOOP_HOME CLASSPATH=$CLASSPATH ARROW_LIBHDFS_DIR=$ARROW_LIBHDFS_DIR PYTHONPATH=$PYTHONPATH sdk.zip/sdk/bin/python run_deepfm.py --conf ./deepfm.json --task_type train" \
   --insecure \
-  --conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_ctr.py,deepfm.json
+  --conf tony.containers.resources=sdk.zip#archive,${SUBMARINE_JAR},run_deepfm.py,deepfm.json

101 changes: 70 additions & 31 deletions submarine-sdk/pysubmarine/submarine/ml/pytorch/input/libsvm_dataset.py
@@ -13,59 +13,98 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pandas as pd
+import numpy as np
 import torch
 from torch.utils.data import DataLoader, Dataset
 from torch.utils.data.distributed import DistributedSampler
 
-from submarine.utils.fileio import read_file
+from submarine.utils.fileio import open_buffered_file_reader, file_info
 
+import os
+import itertools
+import functools
+import multiprocessing as mp
+from typing import List, Tuple
+
 
 class LIBSVMDataset(Dataset):
 
-    def __init__(self, path):
-        self.data, self.label = self.preprocess_data(read_file(path))
+    def __init__(self, data_uri: str, sample_offset: np.ndarray):
+        self.data_uri = data_uri
+        self.sample_offset = sample_offset
+
+    def __len__(self) -> int:
+        return len(self.sample_offset)
+
+    def __getitem__(self, idx) -> Tuple[torch.Tensor, torch.Tensor, int]:
+        with open_buffered_file_reader(self.data_uri) as infile:
+            infile.seek(self.sample_offset[idx], os.SEEK_SET)
+            sample = infile.readline()
+        return LIBSVMDataset.parse_sample(sample)
 
-    def __getitem__(self, idx):
-        return self.data.iloc[idx], self.label.iloc[idx]
+    @classmethod
+    def parse_sample(cls,
+                     sample: bytes) -> Tuple[torch.Tensor, torch.Tensor, int]:
+        label, *entries = sample.rstrip(b'\n').split(b' ')
+        feature_idx = torch.zeros(len(entries), dtype=torch.long)
+        feature_value = torch.zeros(len(entries), dtype=torch.float)
+        for i, entry in enumerate(entries):
+            fidx, fvalue = entry.split(b':')
+            feature_idx[i], feature_value[i] = int(fidx), float(fvalue)
+        return feature_idx, feature_value, int(label)
 
-    def __len__(self):
-        return len(self.data)
+    @classmethod
+    def prepare_dataset(cls, data_uri: str, n_jobs: int = os.cpu_count()):
+        sample_offset = LIBSVMDataset._locate_sample_offsets(data_uri=data_uri,
+                                                             n_jobs=n_jobs)
+        return LIBSVMDataset(data_uri=data_uri, sample_offset=sample_offset)
 
-    def preprocess_data(self, stream):
+    @classmethod
+    def _locate_sample_offsets(cls, data_uri: str, n_jobs: int) -> np.ndarray:
+        finfo = file_info(data_uri)
+        chunk_size, _ = divmod(finfo.size, n_jobs)
 
-        def _convert_line(line):
-            feat_ids = []
-            feat_vals = []
-            for x in line:
-                feat_id, feat_val = x.split(':')
-                feat_ids.append(int(feat_id))
-                feat_vals.append(float(feat_val))
-            return (torch.as_tensor(feat_ids, dtype=torch.int64),
-                    torch.as_tensor(feat_vals, dtype=torch.float32))
+        chunk_starts = [0]
+        with open_buffered_file_reader(data_uri) as infile:
+            while infile.tell() < finfo.size:
+                infile.seek(chunk_size, os.SEEK_CUR)
+                infile.readline()
+                chunk_starts.append(min(infile.tell(), finfo.size))
 
-        df = pd.read_table(stream, header=None)
-        df = df.loc[:, 0].str.split(n=1, expand=True)
-        label = df.loc[:, 0].apply(int)
-        data = df.loc[:, 1].str.split().apply(_convert_line)
-        return data, label
+        with mp.Pool(processes=n_jobs,
+                     maxtasksperchild=1) as pool:
+            return np.asarray(
+                list(
+                    itertools.chain.from_iterable(
+                        pool.imap(functools.partial(
+                            LIBSVMDataset._locate_sample_offsets_job, data_uri),
+                                  iterable=enumerate(
+                                      zip(chunk_starts[:-1],
+                                          chunk_starts[1:]))))))
 
-    def collate_fn(self, batch):
-        data, label = tuple(zip(*batch))
-        _, feat_val = tuple(zip(*data))
-        return (torch.stack(feat_val, dim=0).type(torch.long),
-                torch.as_tensor(label, dtype=torch.float32).unsqueeze(dim=-1))
+    @classmethod
+    def _locate_sample_offsets_job(
+            cls, data_uri: str, task: Tuple[int, Tuple[int, int]]) -> List[int]:
+        _, (start, end) = task
+        offsets = [start]
+        with open_buffered_file_reader(data_uri) as infile:
+            infile.seek(start, os.SEEK_SET)
+            while infile.tell() < end:
+                infile.readline()
+                offsets.append(infile.tell())
+            assert offsets.pop() == end
+        return offsets
 
 
 def libsvm_input_fn(filepath, batch_size=256, num_threads=1, **kwargs):
 
     def _input_fn():
-        dataset = LIBSVMDataset(filepath)
+        dataset = LIBSVMDataset.prepare_dataset(data_uri=filepath,
+                                                n_jobs=num_threads)
         sampler = DistributedSampler(dataset)
         return DataLoader(dataset=dataset,
                           batch_size=batch_size,
                           sampler=sampler,
-                          num_workers=num_threads,
-                          collate_fn=dataset.collate_fn)
+                          num_workers=0)  # should be 0 (pytorch bug)
 
     return _input_fn
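
The rewrite replaces the pandas loader, which parsed the entire file into memory, with an offset index: one chunk-parallel pass records the byte offset of every sample, and `__getitem__` then seeks to an offset and parses a single line on demand, keeping memory flat for arbitrarily large files. A condensed, self-contained sketch of the same idea for a local file (plain `open` in place of the SDK's `open_buffered_file_reader`, single-process indexing in place of the multiprocessing pool):

```python
import os
import torch
from torch.utils.data import Dataset

class OffsetIndexedLIBSVM(Dataset):
    """Random-access libsvm reader: index line offsets once, seek per item."""

    def __init__(self, path: str):
        self.path = path
        self.offsets = []
        offset = 0
        with open(path, 'rb') as f:
            for line in f:               # one sequential pass builds the index
                self.offsets.append(offset)
                offset += len(line)

    def __len__(self) -> int:
        return len(self.offsets)

    def __getitem__(self, idx):
        with open(self.path, 'rb') as f:
            f.seek(self.offsets[idx], os.SEEK_SET)
            line = f.readline()
        # libsvm format: "<label> <idx>:<val> <idx>:<val> ..."
        label, *entries = line.rstrip(b'\n').split(b' ')
        pairs = [entry.split(b':') for entry in entries]
        feature_idx = torch.tensor([int(i) for i, _ in pairs], dtype=torch.long)
        feature_val = torch.tensor([float(v) for _, v in pairs], dtype=torch.float)
        return feature_idx, feature_val, int(label)
```

Opening the file per item keeps the dataset cheap to pickle across processes; even so, the committed `libsvm_input_fn` pins `num_workers=0`, per the in-code note about a PyTorch DataLoader issue.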
(Diffs for the remaining changed files are not shown.)