From 506fda1dcfa33f4ed71f72c86668ff6cc876c2ab Mon Sep 17 00:00:00 2001 From: Ajay Bhat Date: Wed, 6 Jan 2016 16:12:03 +0530 Subject: [PATCH] [FLINK-2445] Add tests for HadoopOutputFormats --- .../mapreduce/HadoopOutputFormatTest.java | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java new file mode 100644 index 0000000000000..215ca95cb4118 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.hadoop.mapreduce; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.*; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class HadoopOutputFormatTest { + + private static final String PATH = "an/ignored/file/"; + private Map map; + + @Test + public void testWriteRecord() { + OutputFormat dummyOutputFormat = new DummyOutputFormat(); + String key = "Test"; + Long value = 1L; + map = new HashMap<>(); + map.put(key, 0L); + try { + Job job = Job.getInstance(); + Tuple2 tuple = new Tuple2<>(); + tuple.setFields(key, value); + HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job); + + hadoopOutputFormat.recordWriter = new DummyRecordWriter(); + hadoopOutputFormat.writeRecord(tuple); + + Long expected = map.get(key); + assertEquals(expected, value); + } catch (IOException e) { + fail(); + } + } + + @Test + public void testOpen() { + OutputFormat dummyOutputFormat = new DummyOutputFormat(); + try { + Job job = Job.getInstance(); + HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job); + + hadoopOutputFormat.recordWriter = new DummyRecordWriter(); + hadoopOutputFormat.open(1, 4); + } catch (IOException e) { + fail(); + } + } + + @Test + public void testClose() { + OutputFormat dummyOutputFormat = new DummyOutputFormat(); + try { + Job job = Job.getInstance(); + HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job); + + hadoopOutputFormat.recordWriter = new DummyRecordWriter(); + + final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class); + Mockito.when(outputCommitter.needsTaskCommit(Mockito.any(TaskAttemptContext.class))).thenReturn(true); + Mockito.doNothing().when(outputCommitter).commitTask(Mockito.any(TaskAttemptContext.class)); + hadoopOutputFormat.outputCommitter = outputCommitter; + hadoopOutputFormat.configuration = new Configuration(); + hadoopOutputFormat.configuration.set("mapred.output.dir", PATH); + + hadoopOutputFormat.close(); + } catch (IOException e) { + fail(); + } + } + + + class DummyRecordWriter extends RecordWriter { + @Override + public void write(String key, Long value) throws IOException, InterruptedException { + map.put(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + + } + } + + class DummyOutputFormat extends OutputFormat { + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class); + Mockito.doNothing().when(outputCommitter).setupJob(Mockito.any(JobContext.class)); + + return outputCommitter; + } + } +}