Skip to content

Commit

Permalink
[FLINK-14018][python] Add Python building blocks to make sure the bas…
Browse files Browse the repository at this point in the history
…ic functionality of Python ScalarFunction could work
  • Loading branch information
dianfu committed Sep 25, 2019
1 parent abaf8eb commit 8fffbbc
Show file tree
Hide file tree
Showing 20 changed files with 1,103 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -35,6 +35,8 @@ flink-python/dev/download
flink-python/dev/.conda/
flink-python/dev/log/
flink-python/dev/.stage.txt
flink-python/pyflink/.eggs/
flink-python/pyflink/fn_execution/*_pb2.py
atlassian-ide-plugin.xml
out/
/docs/api
Expand Down
19 changes: 16 additions & 3 deletions flink-dist/src/main/flink-bin/bin/pyflink-udf-runner.sh
Expand Up @@ -32,11 +32,24 @@ if [[ "$python" = "" ]]; then
python="python"
fi

# Add pyflink & py4j to PYTHONPATH
PYFLINK_ZIP="$FLINK_OPT_DIR/python/pyflink.zip"
if [[ ! ${PYTHONPATH} =~ ${PYFLINK_ZIP} ]]; then
CURRENT_DIR=`pwd -P`
FLINK_SOURCE_ROOT_DIR=`cd $bin/../../; pwd -P`
cd $CURRENT_DIR

# Add pyflink to PYTHONPATH
FLINK_PYTHON="${FLINK_SOURCE_ROOT_DIR}/flink-python"
if [[ ! -f "${FLINK_PYTHON}/pyflink/fn_execution/boot.py" ]]; then
# Add pyflink.zip to PYTHONPATH if directory pyflink does not exist
PYFLINK_ZIP="$FLINK_OPT_DIR/python/pyflink.zip"
if [[ ! ${PYTHONPATH} =~ ${PYFLINK_ZIP} ]]; then
export PYTHONPATH="$PYFLINK_ZIP:$PYTHONPATH"
fi
else
# Add flink-python/pyflink to PYTHONPATH if directory pyflink exists
export PYTHONPATH="$FLINK_PYTHON:$PYTHONPATH"
fi

# Add py4j to PYTHONPATH
PY4J_ZIP=`echo "$FLINK_OPT_DIR"/python/py4j-*-src.zip`
if [[ ! ${PYTHONPATH} =~ ${PY4J_ZIP} ]]; then
export PYTHONPATH="$PY4J_ZIP:$PYTHONPATH"
Expand Down
1 change: 1 addition & 0 deletions flink-python/MANIFEST.in
Expand Up @@ -29,3 +29,4 @@ include README.md
include pyflink/LICENSE
include pyflink/NOTICE
include pyflink/README.txt
graft pyflink/proto
8 changes: 8 additions & 0 deletions flink-python/pom.xml
Expand Up @@ -281,6 +281,14 @@ under the License.
<exclude>org/apache/beam/repackaged/core/org/apache/commons/lang3/**</exclude>
</excludes>
</filter>
<!-- org.apache.beam:beam-vendor-grpc-1_21_0 brings its own LICENSE.txt and LICENSE-junit.txt which we don't need -->
<filter>
<artifact>org.apache.beam:beam-vendor-grpc-1_21_0</artifact>
<excludes>
<exclude>LICENSE.txt</exclude>
<exclude>LICENSE-junit.txt</exclude>
</excludes>
</filter>
</filters>
<relocations combine.children="append">
<relocation>
Expand Down
2 changes: 1 addition & 1 deletion flink-python/pyflink/fn_execution/boot.py
Expand Up @@ -145,5 +145,5 @@ def check_not_empty(check_str, error_message):
if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] == "1":
exit(0)

call([sys.executable, "-m", "apache_beam.runners.worker.sdk_worker_main"],
call([sys.executable, "-m", "pyflink.fn_execution.sdk_worker_main"],
stdout=sys.stdout, stderr=sys.stderr, env=env)
88 changes: 88 additions & 0 deletions flink-python/pyflink/fn_execution/coder_impl.py
@@ -0,0 +1,88 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from apache_beam.coders.coder_impl import StreamCoderImpl


class RowCoderImpl(StreamCoderImpl):

def __init__(self, field_coders):
self._field_coders = field_coders

def encode_to_stream(self, value, out_stream, nested):
self.write_null_mask(value, out_stream)
for i in range(len(self._field_coders)):
self._field_coders[i].encode_to_stream(value[i], out_stream, nested)

def decode_from_stream(self, in_stream, nested):
from pyflink.table import Row
null_mask = self.read_null_mask(len(self._field_coders), in_stream)
assert len(null_mask) == len(self._field_coders)
return Row(*[None if null_mask[idx] else self._field_coders[idx].decode_from_stream(
in_stream, nested) for idx in range(0, len(null_mask))])

@staticmethod
def write_null_mask(value, out_stream):
field_pos = 0
field_count = len(value)
while field_pos < field_count:
b = 0x00
# set bits in byte
num_pos = min(8, field_count - field_pos)
byte_pos = 0
while byte_pos < num_pos:
b = b << 1
# set bit if field is null
if value[field_pos + byte_pos] is None:
b |= 0x01
byte_pos += 1
field_pos += num_pos
# shift bits if last byte is not completely filled
b <<= (8 - byte_pos)
# write byte
out_stream.write_byte(b)

@staticmethod
def read_null_mask(field_count, in_stream):
null_mask = []
field_pos = 0
while field_pos < field_count:
b = in_stream.read_byte()
num_pos = min(8, field_count - field_pos)
byte_pos = 0
while byte_pos < num_pos:
null_mask.append((b & 0x80) > 0)
b = b << 1
byte_pos += 1
field_pos += num_pos
return null_mask

def __repr__(self):
return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in self._field_coders)


class BigIntCoderImpl(StreamCoderImpl):

def encode_to_stream(self, value, out_stream, nested):
out_stream.write_var_int64(value)

def decode_from_stream(self, in_stream, nested):
v = in_stream.read_var_int64()
return v

def __repr__(self):
return 'BigIntCoderImpl'
85 changes: 85 additions & 0 deletions flink-python/pyflink/fn_execution/coders.py
@@ -0,0 +1,85 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from apache_beam.coders import Coder
from apache_beam.coders.coders import FastCoder

from pyflink.fn_execution import coder_impl
from pyflink.fn_execution import flink_fn_execution_pb2

FLINK_SCHEMA_CODER_URN = "flink:coder:schema:v1"


class RowCoder(FastCoder):
"""
Coder for Row.
"""

def __init__(self, field_coders):
self._field_coders = field_coders

def _create_impl(self):
return coder_impl.RowCoderImpl([c.get_impl() for c in self._field_coders])

def is_deterministic(self):
return all(c.is_deterministic() for c in self._field_coders)

def to_type_hint(self):
from pyflink.table import Row
return Row

def __repr__(self):
return 'RowCoder[%s]' % ', '.join(str(c) for c in self._field_coders)


class BigIntCoder(FastCoder):
"""
Coder for BigInt.
"""

def _create_impl(self):
return coder_impl.BigIntCoderImpl()

def is_deterministic(self):
return True

def to_type_hint(self):
return int

def __repr__(self):
return 'BigIntCoder'


@Coder.register_urn(FLINK_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
def _pickle_from_runner_api_parameter(schema_proto, unused_components, unused_context):
return RowCoder([from_proto(f.type) for f in schema_proto.fields])


def from_proto(field_type):
"""
Creates the corresponding :class:`Coder` given the protocol representation of the field type.
:param field_type: the protocol representation of the field type
:return: :class:`Coder`
"""
if field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.BIGINT:
return BigIntCoder()
elif field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.ROW:
return RowCoder([from_proto(f.type) for f in field_type.row_schema.fields])
else:
raise ValueError("field_type %s is not supported." % field_type)

0 comments on commit 8fffbbc

Please sign in to comment.