From 5eb24223afadd1ebb169426a188796068631793d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sat, 25 Feb 2017 05:20:58 +0100 Subject: [PATCH] [BEAM-111] Move WritableCoder to hadoop-common --- runners/spark/pom.xml | 6 + .../spark/coders/NullWritableCoder.java | 76 ----------- .../runners/spark/coders/WritableCoder.java | 122 ------------------ .../spark/coders/WritableCoderTest.java | 45 ------- .../hadoop/HadoopFileFormatPipelineTest.java | 2 +- sdks/java/io/hadoop-common/pom.xml | 10 ++ .../beam/sdk/io/hadoop}/WritableCoder.java | 2 +- .../sdk/io/hadoop}/WritableCoderTest.java | 2 +- sdks/java/io/hdfs/pom.xml | 5 - .../beam/sdk/io/hdfs/HDFSFileSource.java | 1 + 10 files changed, 20 insertions(+), 251 deletions(-) delete mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java delete mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java delete mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java rename sdks/java/io/{hdfs/src/main/java/org/apache/beam/sdk/io/hdfs => hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop}/WritableCoder.java (98%) rename sdks/java/io/{hdfs/src/test/java/org/apache/beam/sdk/io/hdfs => hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop}/WritableCoderTest.java (97%) diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 409fc277ffd6..8c35178c164a 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -305,6 +305,12 @@ + + org.apache.beam + beam-sdks-java-io-hadoop-common + test + + org.mockito mockito-all diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java deleted file mode 100644 index ebbab1a235b9..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/NullWritableCoder.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.Coder; -import org.apache.hadoop.io.NullWritable; - -/** - * Simple writable coder for Null. - */ -public final class NullWritableCoder extends WritableCoder { - private static final long serialVersionUID = 1L; - - @JsonCreator - public static NullWritableCoder of() { - return INSTANCE; - } - - private static final NullWritableCoder INSTANCE = new NullWritableCoder(); - - private NullWritableCoder() { - super(NullWritable.class); - } - - @Override - public void encode(NullWritable value, OutputStream outStream, Context context) { - // nothing to write - } - - @Override - public NullWritable decode(InputStream inStream, Context context) { - return NullWritable.get(); - } - - @Override - public boolean consistentWithEquals() { - return true; - } - - /** - * Returns true since registerByteSizeObserver() runs in constant time. - */ - @Override - public boolean isRegisterByteSizeObserverCheap(NullWritable value, Context context) { - return true; - } - - @Override - protected long getEncodedElementByteSize(NullWritable value, Context context) { - return 0; - } - - @Override - public void verifyDeterministic() throws Coder.NonDeterministicException { - // NullWritableCoder is deterministic - } -} diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java deleted file mode 100644 index 40c26275934a..000000000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/WritableCoder.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.coders; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.InvocationTargetException; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.util.CloudObject; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; - -/** - * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}. - * - *

To use, specify the coder type on a PCollection: - *

- * {@code
- *   PCollection records =
- *       foo.apply(...).setCoder(WritableCoder.of(MyRecord.class));
- * }
- * 
- * - * @param the type of elements handled by this coder - */ -public class WritableCoder extends StandardCoder { - private static final long serialVersionUID = 0L; - - /** - * Returns a {@code WritableCoder} instance for the provided element class. - * @param the element type - * @param clazz the element class - * @return a {@code WritableCoder} instance for the provided element class - */ - public static WritableCoder of(Class clazz) { - if (clazz.equals(NullWritable.class)) { - @SuppressWarnings("unchecked") - WritableCoder result = (WritableCoder) NullWritableCoder.of(); - return result; - } - return new WritableCoder<>(clazz); - } - - @JsonCreator - @SuppressWarnings("unchecked") - public static WritableCoder of(@JsonProperty("type") String classType) - throws ClassNotFoundException { - Class clazz = Class.forName(classType); - if (!Writable.class.isAssignableFrom(clazz)) { - throw new ClassNotFoundException( - "Class " + classType + " does not implement Writable"); - } - return of((Class) clazz); - } - - private final Class type; - - public WritableCoder(Class type) { - this.type = type; - } - - @Override - public void encode(T value, OutputStream outStream, Context context) throws IOException { - value.write(new DataOutputStream(outStream)); - } - - @Override - public T decode(InputStream inStream, Context context) throws IOException { - try { - T t = type.getConstructor().newInstance(); - t.readFields(new DataInputStream(inStream)); - return t; - } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) { - throw new CoderException("unable to deserialize record", e); - } catch (InvocationTargetException ite) { - throw new CoderException("unable to deserialize record", ite.getCause()); - } - } - - @Override - public List> getCoderArguments() { - return null; - } - - @Override - protected CloudObject initializeCloudObject() { - CloudObject result = CloudObject.forClass(getClass()); - result.put("type", type.getName()); - return result; - } - - @Override - public void verifyDeterministic() throws Coder.NonDeterministicException { - throw new NonDeterministicException(this, - "Hadoop Writable may be non-deterministic."); - } - -} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java deleted file mode 100644 index 538fd97a8bb0..000000000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/WritableCoderTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.coders; - -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.NullWritable; -import org.junit.Test; - -/** - * Tests for WritableCoder. - */ -public class WritableCoderTest { - - @Test - public void testIntWritableEncoding() throws Exception { - IntWritable value = new IntWritable(42); - WritableCoder coder = WritableCoder.of(IntWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, value); - } - - @Test - public void testNullWritableEncoding() throws Exception { - WritableCoder coder = WritableCoder.of(NullWritable.class); - - CoderProperties.coderDecodeEncodeEqual(coder, NullWritable.get()); - } -} diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index a5072d63d2b6..48b54339f8b1 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -23,9 +23,9 @@ import java.io.File; import java.io.IOException; import org.apache.beam.runners.spark.PipelineRule; -import org.apache.beam.runners.spark.coders.WritableCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.hadoop.WritableCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.hadoop.conf.Configuration; diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml index 13e159c6f9d0..fcd984fb4d57 100644 --- a/sdks/java/io/hadoop-common/pom.xml +++ b/sdks/java/io/hadoop-common/pom.xml @@ -31,6 +31,16 @@ Library to add shared Hadoop classes among Beam IOs. + + org.apache.beam + beam-sdks-java-core + + + + com.fasterxml.jackson.core + jackson-annotations + + org.apache.hadoop hadoop-client diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java similarity index 98% rename from sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java rename to sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java index d958cda88923..0ba367dd77da 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/WritableCoder.java +++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.hdfs; +package org.apache.beam.sdk.io.hadoop; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java similarity index 97% rename from sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java rename to sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java index e78f850c6635..8127773e03e6 100644 --- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/WritableCoderTest.java +++ b/sdks/java/io/hadoop-common/src/test/java/org/apache/beam/sdk/io/hadoop/WritableCoderTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.io.hdfs; +package org.apache.beam.sdk.io.hadoop; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.hadoop.io.IntWritable; diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml index 1212b0ef2449..f3a1a27bb9b9 100644 --- a/sdks/java/io/hdfs/pom.xml +++ b/sdks/java/io/hdfs/pom.xml @@ -93,11 +93,6 @@ beam-sdks-java-io-hadoop-common - - com.fasterxml.jackson.core - jackson-annotations - - com.google.auto.service auto-service diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java index 2a731fb12004..0e3146fc2961 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; +import org.apache.beam.sdk.io.hadoop.WritableCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.CoderUtils;