[FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work
dianfu committed Sep 26, 2019
1 parent 12b2e99 commit 8d3ff9a
Showing 24 changed files with 1,080 additions and 38 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -35,6 +35,8 @@ flink-python/dev/download
flink-python/dev/.conda/
flink-python/dev/log/
flink-python/dev/.stage.txt
flink-python/.eggs/
flink-python/pyflink/fn_execution/*_pb2.py
atlassian-ide-plugin.xml
out/
/docs/api
1 change: 1 addition & 0 deletions flink-python/MANIFEST.in
@@ -29,3 +29,4 @@ include README.md
include pyflink/LICENSE
include pyflink/NOTICE
include pyflink/README.txt
graft pyflink/proto
20 changes: 20 additions & 0 deletions flink-python/pom.xml
@@ -327,6 +327,26 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<artifactId>exec-maven-plugin</artifactId>
<groupId>org.codehaus.mojo</groupId>
<version>1.5.0</version>
<executions>
<execution>
<id>Protos Generation</id>
<phase>generate-sources</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>python</executable>
<arguments>
<argument>${basedir}/pyflink/gen_protos.py</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
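
The exec-maven-plugin execution above wires pyflink/gen_protos.py into the generate-sources phase, producing the flink_fn_execution_pb2 module that the new coders below import (and that .gitignore now excludes). The generator script itself is not part of this diff; as a minimal sketch of what such a script might do — assuming grpcio-tools is available and that the .proto files live under pyflink/proto, as the MANIFEST.in change suggests:

# Hypothetical generator sketch; paths and naming are assumptions,
# not taken from this commit.
import glob
import os

from grpc_tools import protoc  # provided by the grpcio-tools package

PYFLINK_ROOT = os.path.dirname(os.path.abspath(__file__))
PROTO_DIR = os.path.join(PYFLINK_ROOT, 'proto')
OUT_DIR = os.path.join(PYFLINK_ROOT, 'fn_execution')

for proto_file in glob.glob(os.path.join(PROTO_DIR, '*.proto')):
    # compiles <name>.proto into <name>_pb2.py next to the modules that import it
    ret = protoc.main([
        'grpc_tools.protoc',
        '--proto_path=' + PROTO_DIR,
        '--python_out=' + OUT_DIR,
        proto_file,
    ])
    if ret != 0:
        raise RuntimeError('protoc failed for %s' % proto_file)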
2 changes: 1 addition & 1 deletion flink-python/pyflink/fn_execution/boot.py
@@ -145,5 +145,5 @@ def check_not_empty(check_str, error_message):
if "FLINK_BOOT_TESTING" in os.environ and os.environ["FLINK_BOOT_TESTING"] == "1":
exit(0)

call([sys.executable, "-m", "apache_beam.runners.worker.sdk_worker_main"],
call([sys.executable, "-m", "pyflink.fn_execution.sdk_worker_main"],
stdout=sys.stdout, stderr=sys.stderr, env=env)
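
boot.py now hands off to a PyFlink-owned entry point instead of launching Beam's sdk_worker_main directly. The pyflink.fn_execution.sdk_worker_main module is not shown in this diff; a minimal sketch of what such a wrapper could look like, under the assumption that its only job is to import PyFlink's modules for their URN-registration side effects before starting Beam's SDK harness:

# Hypothetical wrapper, not taken from this commit.
import sys

import apache_beam.runners.worker.sdk_worker_main

# imported for its side effect: registers the coder for FLINK_SCHEMA_CODER_URN
import pyflink.fn_execution.coders  # noqa: F401

if __name__ == '__main__':
    # Beam's worker entry point (takes argv in Beam releases of this era)
    apache_beam.runners.worker.sdk_worker_main.main(sys.argv)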
75 changes: 75 additions & 0 deletions flink-python/pyflink/fn_execution/coder_impl.py
@@ -0,0 +1,75 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from apache_beam.coders.coder_impl import StreamCoderImpl


class RowCoderImpl(StreamCoderImpl):

def __init__(self, field_coders):
self._field_coders = field_coders

    def encode_to_stream(self, value, out_stream, nested):
        self.write_null_mask(value, out_stream)
        for i in range(len(self._field_coders)):
            # null fields are fully described by the null mask, so skip them
            # here; decode_from_stream skips them symmetrically below
            if value[i] is not None:
                self._field_coders[i].encode_to_stream(value[i], out_stream, nested)

def decode_from_stream(self, in_stream, nested):
from pyflink.table import Row
null_mask = self.read_null_mask(len(self._field_coders), in_stream)
assert len(null_mask) == len(self._field_coders)
        return Row(*[None if null_mask[idx] else
                     self._field_coders[idx].decode_from_stream(in_stream, nested)
                     for idx in range(len(null_mask))])

@staticmethod
def write_null_mask(value, out_stream):
field_pos = 0
field_count = len(value)
while field_pos < field_count:
b = 0x00
# set bits in byte
num_pos = min(8, field_count - field_pos)
byte_pos = 0
while byte_pos < num_pos:
b = b << 1
# set bit if field is null
if value[field_pos + byte_pos] is None:
b |= 0x01
byte_pos += 1
field_pos += num_pos
# shift bits if last byte is not completely filled
b <<= (8 - byte_pos)
# write byte
out_stream.write_byte(b)

@staticmethod
def read_null_mask(field_count, in_stream):
null_mask = []
field_pos = 0
while field_pos < field_count:
b = in_stream.read_byte()
num_pos = min(8, field_count - field_pos)
byte_pos = 0
while byte_pos < num_pos:
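                # the highest bit of the current byte marks whether the field is null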
null_mask.append((b & 0x80) > 0)
b = b << 1
byte_pos += 1
field_pos += num_pos
return null_mask

def __repr__(self):
return 'RowCoderImpl[%s]' % ', '.join(str(c) for c in self._field_coders)
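
To make the bit layout concrete: for a three-field row whose middle field is null, write_null_mask packs the bits 0, 1, 0 left-aligned into a single byte, i.e. 0x40. A quick round-trip check of the two static helpers (illustrative only; it uses Beam's create_OutputStream/create_InputStream factories, which are not part of this commit):

from apache_beam.coders.coder_impl import create_InputStream, create_OutputStream

from pyflink.fn_execution.coder_impl import RowCoderImpl

out = create_OutputStream()
RowCoderImpl.write_null_mask([1, None, 3], out)
encoded = out.get()
assert encoded == b'\x40'  # bits 010, left-aligned: 0b01000000

mask = RowCoderImpl.read_null_mask(3, create_InputStream(encoded))
assert mask == [False, True, False]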
67 changes: 67 additions & 0 deletions flink-python/pyflink/fn_execution/coders.py
@@ -0,0 +1,67 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from apache_beam.coders import Coder, VarIntCoder
from apache_beam.coders.coders import FastCoder

from pyflink.fn_execution import coder_impl
from pyflink.fn_execution import flink_fn_execution_pb2

FLINK_SCHEMA_CODER_URN = "flink:coder:schema:v1"


class RowCoder(FastCoder):
"""
Coder for Row.
"""

def __init__(self, field_coders):
self._field_coders = field_coders

def _create_impl(self):
return coder_impl.RowCoderImpl([c.get_impl() for c in self._field_coders])

def is_deterministic(self):
return all(c.is_deterministic() for c in self._field_coders)

def to_type_hint(self):
from pyflink.table import Row
return Row

def __repr__(self):
return 'RowCoder[%s]' % ', '.join(str(c) for c in self._field_coders)


@Coder.register_urn(FLINK_SCHEMA_CODER_URN, flink_fn_execution_pb2.Schema)
def _row_coder_from_schema_proto(schema_proto, unused_components, unused_context):
return RowCoder([from_proto(f.type) for f in schema_proto.fields])


def from_proto(field_type):
"""
    Creates the corresponding :class:`Coder` given the protocol representation of the field type.

    :param field_type: the protocol representation of the field type
    :return: :class:`Coder`
"""
if field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.BIGINT:
return VarIntCoder()
elif field_type.type_name == flink_fn_execution_pb2.Schema.TypeName.ROW:
return RowCoder([from_proto(f.type) for f in field_type.row_schema.fields])
else:
raise ValueError("field_type %s is not supported." % field_type)
