adjust data gen and feed logic (PaddlePaddle#126)
* test

* adjust data gen and feed logic

* reset run_on_ipu to True

* replace 1 with micro_bs
XBWGC committed Sep 3, 2021
1 parent 5041b50 commit da525e3
Showing 1 changed file with 55 additions and 49 deletions: python/paddle/fluid/tests/unittests/ipu/ernie_training.py
@@ -15,6 +15,7 @@
# reference: https://github.com/PaddlePaddle/PaddleNLP/tree/develop/examples/language_model/ernie

import os
+import copy
import argparse
from contextlib import contextmanager
from functools import partial
@@ -738,15 +739,15 @@ def get_task_output(self, task, task_labels):
parser.add_argument(
    "--num_ipus", type=int, default=2, help="Number of ipus")
parser.add_argument(
"--enable_pipelining", type=bool, default=True, help="Pipelining")
"--enable_pipelining", type=bool, default=False, help="Pipelining")
parser.add_argument(
    "--save_model", type=bool, default=False, help="Save model or not")
parser.add_argument(
    "--model_path", type=str, default="ernie", help="Save model to where")
parser.add_argument(
    "--model_name", type=str, default="ernie", help="Save model name")
parser.add_argument(
"--run_steps", type=int, default=10, help="Number steps exe.run()")
"--ipu_run_steps", type=int, default=10, help="Number steps exe.run()")
parser.add_argument(
    "--export_ops", type=bool, default=False, help="Export ops to ops.txt")
parser.add_argument(
@@ -758,21 +759,30 @@ def get_task_output(self, task, task_labels):
paddle.static.default_startup_program().random_seed = SEED
paddle.static.default_main_program().random_seed = SEED

# IPU doesn't support int64, so we use int32 on IPU
INT_DTYPE = "int32" if args.run_on_ipu else "int64"

# input data
-seq = ernie_config["seq_len"]
-# paddle input placeholder, batch_size = 1
+micro_bs = 1
+seq_len = ernie_config["seq_len"]
+input_shape = [micro_bs, seq_len, 1]
input_fields = {
    'names': [
-        'src_ids', 'sent_ids', 'pos_ids', 'input_mask', 'mask_label',
-        'mask_pos'
+        'src_ids', 'sent_ids', 'pos_ids',
+        'input_mask', 'mask_label', 'mask_pos'
    ],
-    'shapes':
-    [[1, seq, 1], [1, seq, 1], [1, seq, 1], [1, seq, 1], [1, 1], [1, 1]],
-    'dtypes':
-    [INT_DTYPE, INT_DTYPE, INT_DTYPE, 'float32', INT_DTYPE, INT_DTYPE],
+    'shapes': [
+        input_shape, input_shape, input_shape,
+        input_shape, [micro_bs, 1], [micro_bs, 1]],
+    'dtypes': [
+        INT_DTYPE, INT_DTYPE, INT_DTYPE,
+        'float32', INT_DTYPE, INT_DTYPE],
+    'range': [
+        [0, seq_len], [0, 4], [0, seq_len],
+        None, [0, seq_len], [0, seq_len]],
    'lod_levels': [0, 0, 0, 0, 0, 0],
}

inputs = [
    fluid.data(
        name=input_fields['names'][i],
@@ -781,29 +791,38 @@ def get_task_output(self, task, task_labels):
        lod_level=input_fields['lod_levels'][i])
    for i in range(len(input_fields['names']))
]
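The placeholder metadata is kept as parallel lists, and the comprehension above pairs the entries by index. An equivalent zip-based sketch of the same construction (a readability variant, not the committed code):

inputs = [
    fluid.data(name=n, shape=s, dtype=d, lod_level=l)
    for n, s, d, l in zip(input_fields['names'], input_fields['shapes'],
                          input_fields['dtypes'], input_fields['lod_levels'])
]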

+# total_samples: default assumes pipelining is disabled
+batches_per_step = 1
+if args.enable_pipelining:
+    batches_per_step = \
+        ((args.num_ipus + 1) if args.is_training else args.num_ipus)
+total_samples = args.ipu_run_steps * batches_per_step
+
+total_steps = args.ipu_run_steps
+if not args.run_on_ipu:  # run on cpu
+    total_steps = total_samples // micro_bs
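For concreteness, the step arithmetic above under assumed settings (num_ipus=2, ipu_run_steps=10, micro_bs=1, training with pipelining enabled):

batches_per_step = 2 + 1  # training pipeline runs num_ipus + 1 = 3 stages
total_samples = 10 * 3    # 30 samples are generated up front
# on IPU:  total_steps = 10, and each executor.run() consumes 3 samples
# on CPU:  total_steps = 30 // 1 = 30, one micro-batch per executor.run()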

# synthetic data
np_inputs = []
for i in range(len(input_fields['names'])):
-    if input_fields['names'][i] == 'input_mask':
+    field_name = input_fields['names'][i]
+    if field_name == 'input_mask':
        src_ids = np_inputs[0]
        dtype = input_fields['dtypes'][i]
        data = np.where(src_ids > 0,
                        np.ones_like(src_ids),
                        np.zeros_like(src_ids)).astype(dtype)
    else:
+        shape = copy.copy(input_fields['shapes'][i])
+        shape[0] = total_samples
+        min_val, max_val = input_fields['range'][i]
        data = np.random.randint(
-            0,
-            4,
-            input_fields['shapes'][i],
+            min_val, max_val, shape,
            dtype=input_fields['dtypes'][i])

-    if args.run_on_ipu and args.enable_pipelining \
-            and input_fields['names'][i] != 'input_mask':
-        if args.is_training:
-            data = np.tile(data, [args.num_ipus + 1, 1, 1])
-        else:
-            data = np.tile(data, [args.num_ipus, 1, 1])
    np_inputs.append(data)
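The input_mask branch derives the mask from the already-generated src_ids, so mask and tokens stay consistent: 1.0 wherever src_ids holds a nonzero id, 0.0 at padding. A standalone sketch with toy sizes (illustrative values only):

import numpy as np

src_ids = np.random.randint(0, 16, [4, 8, 1], dtype="int32")  # fake token ids
input_mask = np.where(src_ids > 0,
                      np.ones_like(src_ids),
                      np.zeros_like(src_ids)).astype("float32")  # 0.0 marks padding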

# paddle input placeholder
(src_ids, sent_ids, pos_ids, input_mask, mask_label, mask_pos) = inputs

# ernie model
@@ -823,23 +842,24 @@ def get_task_output(self, task, task_labels):
place = paddle.CPUPlace()
executor = paddle.static.Executor(place)

-# graph
+# feed & fetch list
if args.is_training:
    feed_list = input_fields['names']
else:
    feed_list = input_fields['names'][:4]
fetch_list = [fetch_node.name]
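Because the names list puts the four model inputs first and the two training-only labels last, the feed split above is effectively a slice. Spelled out (an illustrative restatement, not the committed code):

model_inputs = input_fields['names'][:4]  # src_ids, sent_ids, pos_ids, input_mask
label_inputs = input_fields['names'][4:]  # mask_label, mask_pos (training only)
feed_list = model_inputs + label_inputs if args.is_training else model_inputs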

# program
startup_prog = paddle.static.default_startup_program()
executor.run(startup_prog)

main_prog = paddle.static.default_main_program()

if args.run_on_ipu:
    ipu_strategy = compiler.get_ipu_strategy()
    ipu_strategy.num_ipus = args.num_ipus
    ipu_strategy.enable_manual_shard = args.num_ipus > 1
    # RuntimeError: Copying to tensor that is not writable
    # https://graphcore.zendesk.com/agent/tickets/3208
    ipu_strategy.enable_pipelining = args.enable_pipelining
    if args.enable_pipelining:
        if args.is_training:
@@ -853,35 +873,21 @@ def get_task_output(self, task, task_labels):
else:
    program = main_prog

-# train
-if args.is_training:
-    feed_dict = {
-        src_ids.name: np_inputs[0],
-        sent_ids.name: np_inputs[1],
-        pos_ids.name: np_inputs[2],
-        input_mask.name: np_inputs[3],
-        mask_label.name: np_inputs[4],
-        mask_pos.name: np_inputs[5]
-    }
-else:
+# executor run
+results = []
+for i in range(total_steps):
+    start = i * (batches_per_step if args.run_on_ipu else 1)
+    end = start + (batches_per_step if args.run_on_ipu else 1)
    feed_dict = {
-        src_ids.name: np_inputs[0],
-        sent_ids.name: np_inputs[1],
-        pos_ids.name: np_inputs[2],
-        input_mask.name: np_inputs[3]
+        src_ids.name: np_inputs[0][start:end],
+        sent_ids.name: np_inputs[1][start:end],
+        pos_ids.name: np_inputs[2][start:end],
+        input_mask.name: np_inputs[3][start:end]
    }
+    if args.is_training:
+        feed_dict[mask_label.name] = np_inputs[4][start:end]
+        feed_dict[mask_pos.name] = np_inputs[5][start:end]

-# enable_pipelining vs. disable_pipelining
-total_steps = args.run_steps
-if args.run_on_ipu and not args.enable_pipelining or not args.run_on_ipu:
-    if args.is_training:  # training: num_stages = num_ipus + 1
-        total_steps *= (args.num_ipus + 1)
-    else:  # inference: num_stages = num_ipus
-        total_steps *= args.num_ipus
-
-results = []
-for i in range(total_steps):
    res = executor.run(program, feed=feed_dict, fetch_list=[fetch_node])
    results.append(res)
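Each executor.run() call therefore consumes exactly one step's slice of the synthetic data: batches_per_step samples on IPU, a single micro-batch on CPU. The per-step feed construction boils down to this pattern (a condensed sketch, not the committed code):

step = batches_per_step if args.run_on_ipu else 1
for i in range(total_steps):
    s, e = i * step, (i + 1) * step
    feed = {var.name: arr[s:e] for var, arr in zip(inputs, np_inputs)}
    # inference omits the two label entries (mask_label, mask_pos)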

