diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java
new file mode 100644
index 000000000000..6cc195dc2ffd
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderRegistrar.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+
+/**
+ * Coder registrar for {@link AvroCoder}.
+ *
+ * <p>Maps {@link AvroCoder} to the URN {@link #AVRO_CODER_URN} and to an {@link
+ * AvroCoderTranslator}. The {@link AutoService} annotation registers this class as a {@link
+ * CoderTranslatorRegistrar} service provider.
+ */
+@AutoService(CoderTranslatorRegistrar.class)
+public class AvroCoderRegistrar implements CoderTranslatorRegistrar {
+  /** URN under which {@link AvroCoder} is registered. */
+  public static final String AVRO_CODER_URN = "beam:coder:avro:v1";
+
+  /** Returns a single-entry map from {@link AvroCoder} to {@link #AVRO_CODER_URN}. */
+  @Override
+  public Map<Class<? extends Coder>, String> getCoderURNs() {
+    return ImmutableMap.of(AvroCoder.class, AVRO_CODER_URN);
+  }
+
+  /** Returns a single-entry map from {@link AvroCoder} to a new {@link AvroCoderTranslator}. */
+  @Override
+  public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() {
+    return ImmutableMap.of(AvroCoder.class, new AvroCoderTranslator());
+  }
+}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
new file mode 100644
index 000000000000..0a59c35e45f0
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/AvroCoderTranslator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
+
+/**
+ * Coder translator for {@link AvroCoder}.
+ *
+ * <p>An {@link AvroCoder} is translated as a coder with no component coders whose payload is the
+ * UTF-8 encoded string form of its Avro {@link Schema}.
+ */
+public class AvroCoderTranslator implements CoderTranslator<AvroCoder<?>> {
+  /** Returns an empty list: the translated form of an {@link AvroCoder} has no components. */
+  @Override
+  public List<? extends Coder<?>> getComponents(AvroCoder<?> from) {
+    return Collections.emptyList();
+  }
+
+  /** Returns the string form of {@code from}'s schema, encoded as UTF-8 bytes. */
+  @Override
+  public byte[] getPayload(AvroCoder<?> from) {
+    return from.getSchema().toString().getBytes(Charsets.UTF_8);
+  }
+
+  /**
+   * Rebuilds an {@link AvroCoder} by parsing {@code payload} as a UTF-8 encoded Avro schema.
+   *
+   * <p>{@code components} is not used.
+   */
+  @Override
+  public AvroCoder<?> fromComponents(List<Coder<?>> components, byte[] payload) {
+    Schema schema = new Schema.Parser().parse(new String(payload, Charsets.UTF_8));
+    return AvroCoder.of(schema);
+  }
+}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index a19555bdbaaa..b1bc3af9c41b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -28,6 +28,7 @@
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.avro.SchemaBuilder;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.sdk.coders.AtomicCoder;
@@ -126,7 +127,9 @@ public static Iterable<Coder<?>> data() {
               StringUtf8Coder.of(),
               SerializableCoder.of(Record.class),
               new RecordCoder(),
-              KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)))
+              KvCoder.of(
+                  new RecordCoder(),
+                  AvroCoder.of(SchemaBuilder.record("record").fields().endRecord())))
           .build();
     }
 
diff --git 
a/sdks/python/apache_beam/coders/avro_coder.py b/sdks/python/apache_beam/coders/avro_coder.py
new file mode 100644
index 000000000000..f0bb24f4ceb2
--- /dev/null
+++ b/sdks/python/apache_beam/coders/avro_coder.py
@@ -0,0 +1,135 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Coder for AvroRecord serialization/deserialization."""
+
+from __future__ import absolute_import
+
+import json
+from io import BytesIO
+
+from fastavro import parse_schema
+from fastavro import schemaless_reader
+from fastavro import schemaless_writer
+
+from apache_beam.coders.coder_impl import SimpleCoderImpl
+from apache_beam.coders.coders import Coder
+from apache_beam.coders.coders import FastCoder
+
+AVRO_CODER_URN = "beam:coder:avro:v1"
+
+__all__ = ['AvroCoder', 'AvroRecord']
+
+
+def _freeze(value):
+  """Returns a hashable stand-in for a possibly nested record value.
+
+  Dicts become frozensets of (key, frozen value) pairs and lists/tuples become
+  tuples, so values that compare equal freeze to objects with equal hashes.
+  Anything else is returned unchanged.
+  """
+  if isinstance(value, dict):
+    return frozenset((key, _freeze(item)) for key, item in value.items())
+  if isinstance(value, (list, tuple)):
+    return tuple(_freeze(item) for item in value)
+  return value
+
+
+class AvroCoder(FastCoder):
+  """A coder used for AvroRecord values.
+
+  Args:
+    schema: the Avro schema of the records, as a JSON string.
+  """
+
+  def __init__(self, schema):
+    self.schema = schema
+
+  def _create_impl(self):
+    return AvroCoderImpl(self.schema)
+
+  def is_deterministic(self):
+    # TODO: need to confirm if it's deterministic
+    return False
+
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self.schema == other.schema)
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from __eq__.
+    return not self == other
+
+  def __hash__(self):
+    return hash(self.schema)
+
+  def to_type_hint(self):
+    return AvroRecord
+
+  def to_runner_api_parameter(self, context):
+    # The URN is registered with a bytes payload, so text schemas are encoded.
+    schema = self.schema
+    payload = schema if isinstance(schema, bytes) else schema.encode('utf-8')
+    return AVRO_CODER_URN, payload, ()
+
+  @staticmethod
+  @Coder.register_urn(AVRO_CODER_URN, bytes)
+  def from_runner_api_parameter(payload, unused_components, unused_context):
+    # Decode so the rebuilt coder holds the schema as text again.
+    return AvroCoder(payload.decode('utf-8'))
+
+
+class AvroCoderImpl(SimpleCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
+
+  def __init__(self, schema):
+    # schema is the JSON text of an Avro schema.
+    self.parsed_schema = parse_schema(json.loads(schema))
+
+  def encode(self, value):
+    """Returns the schemaless Avro encoding of the AvroRecord's record."""
+    assert issubclass(type(value), AvroRecord)
+    with BytesIO() as buf:
+      schemaless_writer(buf, self.parsed_schema, value.record)
+      return buf.getvalue()
+
+  def decode(self, encoded):
+    """Returns an AvroRecord wrapping the datum read from the encoded bytes."""
+    with BytesIO(encoded) as buf:
+      return AvroRecord(schemaless_reader(buf, self.parsed_schema))
+
+
+class AvroRecord(object):
+  """Simple wrapper class for dictionary records."""
+
+  def __init__(self, value):
+    self.record = value
+
+  def __eq__(self, other):
+    return (
+        issubclass(type(other), AvroRecord) and
+        self.record == other.record
+    )
+
+  def __ne__(self, other):
+    # Python 2 does not derive != from __eq__.
+    return not self == other
+
+  def __hash__(self):
+    # dict records are unhashable, so hash a frozen copy instead. The record
+    # must not be mutated while the wrapper is a dict key or set member.
+    return hash(_freeze(self.record))
diff --git a/sdks/python/apache_beam/coders/avro_coder_test.py b/sdks/python/apache_beam/coders/avro_coder_test.py
new file mode 100644
index 000000000000..3f060a66b585
--- /dev/null
+++ b/sdks/python/apache_beam/coders/avro_coder_test.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for avro_coder."""
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+
+from apache_beam.coders.avro_coder import AvroCoder
+from apache_beam.coders.avro_coder import AvroRecord
+from apache_beam.coders.typecoders import registry as coders_registry
+
+
+class AvroTestCoder(AvroCoder):
+  """AvroCoder bound to a fixed two-field (name, age) record schema."""
+
+  SCHEMA = """
+  {
+    "type": "record", "name": "testrecord",
+    "fields": [
+      {"name": "name", "type": "string"},
+      {"name": "age", "type": "int"}
+    ]
+  }
+  """
+
+  def __init__(self):
+    super(AvroTestCoder, self).__init__(self.SCHEMA)
+
+
+class AvroTestRecord(AvroRecord):
+  """Record type registered with AvroTestCoder below."""
+
+
+coders_registry.register_coder(AvroTestRecord, AvroTestCoder)
+
+
+class CodersTest(unittest.TestCase):
+
+  def test_avro_record_coder(self):
+    real_coder = coders_registry.get_coder(AvroTestRecord)
+    expected_coder = AvroTestCoder()
+    self.assertEqual(
+        real_coder.encode(
+            AvroTestRecord({"name": "Daenerys targaryen", "age": 23})),
+        expected_coder.encode(
+            AvroTestRecord({"name": "Daenerys targaryen", "age": 23}))
+    )
+    self.assertEqual(
+        AvroTestRecord({"name": "Jon Snow", "age": 23}),
+        real_coder.decode(
+            real_coder.encode(
+                AvroTestRecord({"name": "Jon Snow", "age": 23}))
+        )
+    )
+
+  def test_avro_record_hash(self):
+    # Equal records must hash equally; the wrapped dict itself is unhashable.
+    self.assertEqual(
+        hash(AvroTestRecord({"name": "Jon Snow", "age": 23})),
+        hash(AvroTestRecord({"age": 23, "name": "Jon Snow"})))
+
+  def test_runner_api_payload_round_trip(self):
+    _, payload, components = AvroTestCoder().to_runner_api_parameter(None)
+    self.assertIsInstance(payload, bytes)
+    self.assertEqual(
+        AvroCoder(AvroTestCoder.SCHEMA),
+        AvroCoder.from_runner_api_parameter(payload, components, None))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()