【PIR API adaptor No.22】Migrate paddle.nn.functional.bilinear into pir (
GreatV authored and SecretXV committed Nov 28, 2023
1 parent b818b6c commit 88877ba
Showing 3 changed files with 139 additions and 105 deletions.
2 changes: 1 addition & 1 deletion python/paddle/nn/functional/common.py
@@ -941,7 +941,7 @@ def bilinear(x1, x2, weight, bias=None, name=None):
             [5, 1000]
     """

-    if in_dynamic_mode():
+    if in_dynamic_or_pir_mode():
         return _C_ops.bilinear(x1, x2, weight, bias)
     else:
         check_variable_and_dtype(x1, 'x1', ['float32', 'float64'], 'bilinear')
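The functional change is one line: `in_dynamic_or_pir_mode()` is true under both dygraph and PIR, so both now dispatch directly to `_C_ops.bilinear`, and only legacy static graphs fall through to the Python-level dtype checks in the `else` branch. A minimal illustrative sketch of the migrated call (not part of the commit; shapes chosen to reproduce the docstring's [5, 1000] example):

    import paddle
    import paddle.nn.functional as F

    x1 = paddle.randn([5, 5])  # [batch, in1_features]
    x2 = paddle.randn([5, 4])  # [batch, in2_features]
    # weight shape: [out_features, in1_features, in2_features]
    w = paddle.randn([1000, 5, 4])
    out = F.bilinear(x1, x2, w)  # dygraph (and PIR) dispatch to _C_ops.bilinear
    print(out.shape)  # [5, 1000]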
12 changes: 7 additions & 5 deletions test/legacy_test/test_bilinear_api.py
@@ -19,13 +19,15 @@
 import paddle
 from paddle import base
 from paddle.base import core
+from paddle.pir_utils import test_with_pir_api


 class TestBilinearAPI(unittest.TestCase):
+    @test_with_pir_api
     def test_api(self):
-        with base.program_guard(
-            base.default_startup_program(), base.default_main_program()
-        ):
+        main = paddle.static.Program()
+        startup = paddle.static.Program()
+        with paddle.static.program_guard(startup, main):
             if core.is_compiled_with_cuda():
                 place = core.CUDAPlace(0)
             else:
@@ -43,9 +45,9 @@ def test_api(self):
             )
             ret = bilinear(data1, data2)

-            exe.run(base.default_startup_program())
+            exe.run(main)
             ret_fetch = exe.run(
-                feed={'X1': layer1, 'X2': layer2}, fetch_list=[ret.name]
+                feed={'X1': layer1, 'X2': layer2}, fetch_list=[ret]
             )
             self.assertEqual(ret_fetch[0].shape, (5, 1000))

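Two changes here: the `@test_with_pir_api` decorator runs `test_api` under both the legacy IR and PIR, and the test now builds explicit `Program` objects and fetches by the value object rather than `ret.name`, since PIR values are not addressed by name string. A hedged, standalone sketch of that pattern (assumed shapes; not the verbatim test):

    import numpy as np
    import paddle

    paddle.enable_static()
    main = paddle.static.Program()
    startup = paddle.static.Program()
    with paddle.static.program_guard(startup, main):
        data1 = paddle.static.data(name='X1', shape=[5, 5], dtype='float32')
        data2 = paddle.static.data(name='X2', shape=[5, 4], dtype='float32')
        bilinear = paddle.nn.Bilinear(
            in1_features=5, in2_features=4, out_features=1000
        )
        ret = bilinear(data1, data2)

    exe = paddle.static.Executor()
    exe.run(startup)  # initialize parameters
    (out,) = exe.run(
        main,
        feed={
            'X1': np.random.rand(5, 5).astype('float32'),
            'X2': np.random.rand(5, 4).astype('float32'),
        },
        fetch_list=[ret],  # the value itself, not a name string
    )
    print(out.shape)  # (5, 1000)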
230 changes: 131 additions & 99 deletions test/legacy_test/test_layers.py
@@ -498,116 +498,148 @@ def test_type():
         self.assertRaises(TypeError, test_type)

     def test_bilinear_tensor_product(self):
-        inp_np_x = np.array([[1, 2, 3]]).astype('float32')
-        inp_np_y = np.array([[4, 5, 6]]).astype('float32')
+        def _test_static_specific(inp_np_x, inp_np_y):
+            with self.static_graph():
+                data_x = paddle.static.data(
+                    name='x', shape=[1, 3], dtype="float32"
+                )
+                data_y = paddle.static.data(
+                    name='y', shape=[1, 3], dtype="float32"
+                )
+                out = paddle.static.nn.common.bilinear_tensor_product(
+                    data_x,
+                    data_y,
+                    6,
+                    bias_attr=paddle.nn.initializer.Constant(value=1),
+                    act='sigmoid',
+                )

-        with self.static_graph():
-            data_x = paddle.static.data(name='x', shape=[1, 3], dtype="float32")
-            data_y = paddle.static.data(name='y', shape=[1, 3], dtype="float32")
-            out = paddle.static.nn.common.bilinear_tensor_product(
-                data_x,
-                data_y,
-                6,
-                bias_attr=paddle.nn.initializer.Constant(value=1),
-                act='sigmoid',
-            )
+                static_rlt = self.get_static_graph_result(
+                    feed={'x': inp_np_x, 'y': inp_np_y}, fetch_list=[out]
+                )[0]

-        static_rlt = self.get_static_graph_result(
-            feed={'x': inp_np_x, 'y': inp_np_y}, fetch_list=[out]
-        )[0]
+            return static_rlt

-        with self.static_graph():
-            data_x = paddle.static.data(name='x', shape=[1, 3], dtype="float32")
-            data_y = paddle.static.data(name='y', shape=[1, 3], dtype="float32")
-            btp = paddle.nn.Bilinear(
-                3,
-                3,
-                6,
-                bias_attr=paddle.nn.initializer.Constant(value=1),
-            )
-            out = btp(data_x, data_y)
-            out = paddle.nn.functional.sigmoid(out)
-            static_rlt2 = self.get_static_graph_result(
-                feed={'x': inp_np_x, 'y': inp_np_y}, fetch_list=[out]
-            )[0]
-        with self.dynamic_graph():
-            btp = paddle.nn.Bilinear(
-                3,
-                3,
-                6,
-                bias_attr=paddle.nn.initializer.Constant(value=1),
-            )
-            dy_rlt = btp(
-                to_variable(inp_np_x),
-                to_variable(inp_np_y),
-            )
-            dy_rlt = paddle.nn.functional.sigmoid(dy_rlt)
-            dy_rlt_value = dy_rlt.numpy()
+        def _test_static(inp_np_x, inp_np_y):
+            with self.static_graph():
+                data_x = paddle.static.data(
+                    name='x', shape=[1, 3], dtype="float32"
+                )
+                data_y = paddle.static.data(
+                    name='y', shape=[1, 3], dtype="float32"
+                )
+                btp = paddle.nn.Bilinear(
+                    3,
+                    3,
+                    6,
+                    bias_attr=paddle.nn.initializer.Constant(value=1),
+                )
+                out = btp(data_x, data_y)
+                out = paddle.nn.functional.sigmoid(out)
+                static_rlt2 = self.get_static_graph_result(
+                    feed={'x': inp_np_x, 'y': inp_np_y}, fetch_list=[out]
+                )[0]

-        with self.dynamic_graph():
-            btp2 = paddle.nn.Bilinear(3, 3, 6)
-            dy_rlt2 = btp2(
-                to_variable(inp_np_x),
-                to_variable(inp_np_y),
-            )
-            dy_rlt2 = paddle.nn.functional.sigmoid(dy_rlt2)
-            dy_rlt2_value = dy_rlt2.numpy()
+            return static_rlt2

-        with self.static_graph():
-            data_x2 = paddle.static.data(
-                name='x', shape=[1, 3], dtype="float32"
-            )
-            data_y2 = paddle.static.data(
-                name='y', shape=[1, 3], dtype="float32"
-            )
-            out2 = paddle.static.nn.common.bilinear_tensor_product(
-                data_x2, data_y2, 6, act='sigmoid'
-            )
+        def _test_dygraph_1(inp_np_x, inp_np_y):
+            with self.dynamic_graph():
+                btp = paddle.nn.Bilinear(
+                    3,
+                    3,
+                    6,
+                    bias_attr=paddle.nn.initializer.Constant(value=1),
+                )
+                dy_rlt = btp(
+                    to_variable(inp_np_x),
+                    to_variable(inp_np_y),
+                )
+                dy_rlt = paddle.nn.functional.sigmoid(dy_rlt)
+                dy_rlt_value = dy_rlt.numpy()

-        static_rlt3 = self.get_static_graph_result(
-            feed={'x': inp_np_x, 'y': inp_np_y}, fetch_list=[out2]
-        )[0]
+            with self.dynamic_graph():
+                btp2 = paddle.nn.Bilinear(3, 3, 6)
+                dy_rlt2 = btp2(
+                    to_variable(inp_np_x),
+                    to_variable(inp_np_y),
+                )
+                dy_rlt2 = paddle.nn.functional.sigmoid(dy_rlt2)
+                dy_rlt2_value = dy_rlt2.numpy()

+            with self.static_graph():
+                data_x2 = paddle.static.data(
+                    name='x', shape=[1, 3], dtype="float32"
+                )
+                data_y2 = paddle.static.data(
+                    name='y', shape=[1, 3], dtype="float32"
+                )
+                out2 = paddle.static.nn.common.bilinear_tensor_product(
+                    data_x2, data_y2, 6, act='sigmoid'
+                )
+
+                static_rlt3 = self.get_static_graph_result(
+                    feed={'x': inp_np_x, 'y': inp_np_y}, fetch_list=[out2]
+                )[0]

+            return dy_rlt_value, dy_rlt2_value, static_rlt3

+        def _test_dygraph_2(inp_np_x, inp_np_y):
+            with self.dynamic_graph():
+                custom_weight = np.random.randn(6, 3, 3).astype("float32")
+                weight_attr = base.ParamAttr(
+                    initializer=paddle.nn.initializer.Assign(custom_weight)
+                )
+                btp1 = paddle.nn.Bilinear(3, 3, 6)
+                btp2 = paddle.nn.Bilinear(3, 3, 6, weight_attr=weight_attr)
+                dy_rlt1 = btp1(
+                    to_variable(inp_np_x),
+                    to_variable(inp_np_y),
+                )
+                dy_rlt1 = paddle.nn.functional.sigmoid(dy_rlt1)
+                dy_rlt2 = btp2(
+                    to_variable(inp_np_x),
+                    to_variable(inp_np_y),
+                )
+                dy_rlt2 = paddle.nn.functional.sigmoid(dy_rlt2)
+                self.assertFalse(
+                    np.array_equal(dy_rlt1.numpy(), dy_rlt2.numpy())
+                )
+                btp2.weight.set_value(btp1.weight.numpy())
+                btp2.bias.set_value(btp1.bias)
+                dy_rlt1 = btp1(
+                    to_variable(inp_np_x),
+                    to_variable(inp_np_y),
+                )
+                dy_rlt2 = btp2(
+                    to_variable(inp_np_x),
+                    to_variable(inp_np_y),
+                )
+                np.testing.assert_array_equal(dy_rlt1.numpy(), dy_rlt2.numpy())
+
+                btp2.weight = btp1.weight
+                btp2.bias = btp1.bias
+                np.testing.assert_array_equal(
+                    btp1.weight.numpy(), btp2.weight.numpy()
+                )
+                np.testing.assert_array_equal(
+                    btp1.bias.numpy(), btp2.bias.numpy()
+                )

+        inp_np_x = np.array([[1, 2, 3]]).astype('float32')
+        inp_np_y = np.array([[4, 5, 6]]).astype('float32')
+
+        static_rlt = _test_static_specific(inp_np_x, inp_np_y)
+        static_rlt2 = _test_static(inp_np_x, inp_np_y)
+        dy_rlt_value, dy_rlt2_value, static_rlt3 = _test_dygraph_1(
+            inp_np_x, inp_np_y
+        )
+        np.testing.assert_array_equal(dy_rlt2_value, static_rlt3)
+        np.testing.assert_array_equal(static_rlt2, static_rlt)
+        np.testing.assert_array_equal(dy_rlt_value, static_rlt)

-        with self.dynamic_graph():
-            custom_weight = np.random.randn(6, 3, 3).astype("float32")
-            weight_attr = base.ParamAttr(
-                initializer=paddle.nn.initializer.Assign(custom_weight)
-            )
-            btp1 = paddle.nn.Bilinear(3, 3, 6)
-            btp2 = paddle.nn.Bilinear(3, 3, 6, weight_attr=weight_attr)
-            dy_rlt1 = btp1(
-                to_variable(inp_np_x),
-                to_variable(inp_np_y),
-            )
-            dy_rlt1 = paddle.nn.functional.sigmoid(dy_rlt1)
-            dy_rlt2 = btp2(
-                to_variable(inp_np_x),
-                to_variable(inp_np_y),
-            )
-            dy_rlt2 = paddle.nn.functional.sigmoid(dy_rlt2)
-            self.assertFalse(np.array_equal(dy_rlt1.numpy(), dy_rlt2.numpy()))
-            btp2.weight.set_value(btp1.weight.numpy())
-            btp2.bias.set_value(btp1.bias)
-            dy_rlt1 = btp1(
-                to_variable(inp_np_x),
-                to_variable(inp_np_y),
-            )
-            dy_rlt2 = btp2(
-                to_variable(inp_np_x),
-                to_variable(inp_np_y),
-            )
-            np.testing.assert_array_equal(dy_rlt1.numpy(), dy_rlt2.numpy())
-
-            btp2.weight = btp1.weight
-            btp2.bias = btp1.bias
-            np.testing.assert_array_equal(
-                btp1.weight.numpy(), btp2.weight.numpy()
-            )
-            np.testing.assert_array_equal(btp1.bias.numpy(), btp2.bias.numpy())
+        with paddle.pir_utils.IrGuard():
+            static_pir_result = _test_static(inp_np_x, inp_np_y)
+            np.testing.assert_array_equal(static_pir_result, static_rlt)

     def test_embeding(self):
         inp_word = np.array([[[1]]]).astype('int64')
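The refactor extracts `_test_static_specific`, `_test_static`, `_test_dygraph_1`, and `_test_dygraph_2` from one monolithic test so the same graph-building code can be replayed under PIR via `paddle.pir_utils.IrGuard` and its output compared exactly against the legacy IR result. A hedged sketch of that compare-under-both-IRs pattern (fixed weights are an assumption added for determinism; not the verbatim test):

    import numpy as np
    import paddle

    def build_and_run(x_np, y_np):
        # Build and execute the same static graph under whichever IR is active.
        paddle.enable_static()
        main = paddle.static.Program()
        startup = paddle.static.Program()
        with paddle.static.program_guard(startup, main):
            x = paddle.static.data(name='x', shape=[1, 3], dtype='float32')
            y = paddle.static.data(name='y', shape=[1, 3], dtype='float32')
            btp = paddle.nn.Bilinear(
                3,
                3,
                6,
                weight_attr=paddle.nn.initializer.Assign(
                    np.ones([6, 3, 3], dtype='float32')  # deterministic weights
                ),
                bias_attr=paddle.nn.initializer.Constant(value=1),
            )
            out = paddle.nn.functional.sigmoid(btp(x, y))
        exe = paddle.static.Executor()
        exe.run(startup)
        return exe.run(main, feed={'x': x_np, 'y': y_np}, fetch_list=[out])[0]

    x_np = np.array([[1, 2, 3]], dtype='float32')
    y_np = np.array([[4, 5, 6]], dtype='float32')

    legacy_out = build_and_run(x_np, y_np)
    with paddle.pir_utils.IrGuard():  # build the program as PIR instead
        pir_out = build_and_run(x_np, y_np)
    np.testing.assert_array_equal(pir_out, legacy_out)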
