diff --git a/contrib/sorter/README.md b/contrib/sorter/README.md new file mode 100644 index 0000000000..29f45b8900 --- /dev/null +++ b/contrib/sorter/README.md @@ -0,0 +1,23 @@ +#Sorter +This module provides the SortValues transform, which takes a `PCollection>>>` and produces a `PCollection>>>` where, for each primary key `K` the paired `Iterable>` has been sorted by the byte encoding of secondary key (`K2`). It will efficiently and scalably sort the iterables, even if they are large (do not fit in memory). + +##Caveats +* This transform performs value-only sorting; the iterable accompanying each key is sorted, but *there is no relationship between different keys*, as Dataflow does not support any defined relationship between different elements in a PCollection. +* Each `Iterable>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`. + +##Options +* The user can customize the temporary location used if sorting requires spilling to disk and the maximum amount of memory to use by creating a custom instance of `BufferedExternalSorter.Options` to pass into `SortValues.create`. + +##Using `SortValues` +~~~~ +PCollection>> input = ... + +// Group by primary key, bringing pairs for the same key together. +PCollection>>> grouped = + input.apply(GroupByKey.>create()); + +// For every primary key, sort the iterable of pairs by secondary key. 
+PCollection>>> groupedAndSorted = + grouped.apply( + SortValues.create(new BufferedExternalSorter.Options())); +~~~~ diff --git a/contrib/sorter/pom.xml b/contrib/sorter/pom.xml new file mode 100644 index 0000000000..08ac2d9aa0 --- /dev/null +++ b/contrib/sorter/pom.xml @@ -0,0 +1,237 @@ + + + + 4.0.0 + + com.google.cloud.dataflow + google-cloud-dataflow-java-contrib-sorter + Google Cloud Dataflow Sorter Library + Library to sort data from within Dataflow pipelines. + 0.0.1-SNAPSHOT + jar + + + + Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + UTF-8 + [1.2.0,2.0.0) + 2.7.1 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 1.7 + 1.7 + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 2.17 + + + com.puppycrawl.tools + checkstyle + 6.19 + + + + ../../checkstyle.xml + true + true + true + + + + + check + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.4 + + + attach-sources + compile + + jar + + + + attach-test-sources + test-compile + + test-jar + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.3 + + Google Cloud Dataflow Sorter Contrib + Google Cloud Dataflow Sorter Contrib + + com.google.cloud.dataflow.contrib.sorter + false + ]]> + + + + https://cloud.google.com/dataflow/java-sdk/JavaDoc/ + ${basedir}/../../javadoc/dataflow-sdk-docs + + + http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/ + ${basedir}/../../javadoc/guava-docs + + + + + + + jar + + package + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + bundle-and-repackage + package + + shade + + + true + + + org.apache.hadoop:hadoop-mapreduce-client-core + org.apache.hadoop:hadoop-common + com.google.guava:guava + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.apache.hadoop + com.google.cloud.dataflow.repackaged.org.apache.hadoop + + + com.google.common + com.google.cloud.dataflow.repackaged.com.google.common + + + 
com.google.thirdparty + com.google.cloud.dataflow.repackaged.com.google.thirdparty + + + + + + + + + + + + + com.google.cloud.dataflow + google-cloud-dataflow-java-sdk-all + ${google-cloud-dataflow-version} + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + + com.google.guava + guava + 19.0 + + + + + org.hamcrest + hamcrest-all + 1.3 + test + + + + org.mockito + mockito-all + 1.10.19 + test + + + + junit + junit + 4.11 + test + + + diff --git a/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/BufferedExternalSorter.java b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/BufferedExternalSorter.java new file mode 100644 index 0000000000..1108eef2ad --- /dev/null +++ b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/BufferedExternalSorter.java @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.sorter; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.dataflow.sdk.values.KV; +import java.io.IOException; +import java.io.Serializable; + +/** + * {@link Sorter} that will use in memory sorting until the values can't fit into memory and will + * then fall back to external sorting. 
+ */ +public class BufferedExternalSorter implements Sorter { + /** Contains configuration for the sorter. */ + public static class Options implements Serializable { + private String tempLocation = "/tmp"; + private int memoryMB = 100; + + /** Sets the path to a temporary location where the sorter writes intermediate files. */ + public void setTempLocation(String tempLocation) { + checkArgument( + !tempLocation.startsWith("gs://"), + "BufferedExternalSorter does not support GCS temporary location"); + + this.tempLocation = tempLocation; + } + + /** Returns the configured temporary location. */ + public String getTempLocation() { + return tempLocation; + } + + /** + * Sets the size of the memory buffer in megabytes. This controls both the buffer for initial in + * memory sorting and the buffer used when external sorting. Must be greater than zero. + */ + public void setMemoryMB(int memoryMB) { + checkArgument(memoryMB > 0, "memoryMB must be greater than zero"); + this.memoryMB = memoryMB; + } + + /** Returns the configured size of the memory buffer. 
*/ + public int getMemoryMB() { + return memoryMB; + } + } + + private ExternalSorter externalSorter; + private InMemorySorter inMemorySorter; + + boolean inMemorySorterFull; + + BufferedExternalSorter(ExternalSorter externalSorter, InMemorySorter inMemorySorter) { + this.externalSorter = externalSorter; + this.inMemorySorter = inMemorySorter; + } + + public static BufferedExternalSorter create(Options options) { + ExternalSorter.Options externalSorterOptions = new ExternalSorter.Options(); + externalSorterOptions.setMemoryMB(options.getMemoryMB()); + externalSorterOptions.setTempLocation(options.getTempLocation()); + + InMemorySorter.Options inMemorySorterOptions = new InMemorySorter.Options(); + inMemorySorterOptions.setMemoryMB(options.getMemoryMB()); + + return new BufferedExternalSorter( + ExternalSorter.create(externalSorterOptions), InMemorySorter.create(inMemorySorterOptions)); + } + + @Override + public void add(KV record) throws IOException { + if (!inMemorySorterFull) { + if (inMemorySorter.addIfRoom(record)) { + return; + } else { + // Flushing contents of in memory sorter to external sorter so we can rely on external + // from here on out + inMemorySorterFull = true; + transferToExternalSorter(); + } + } + + // In memory sorter is full, so put in external sorter instead + externalSorter.add(record); + } + + /** + * Transfers all of the records loaded so far into the in memory sorter over to the external + * sorter. 
+ */ + private void transferToExternalSorter() throws IOException { + for (KV record : inMemorySorter.sort()) { + externalSorter.add(record); + } + // Allow in memory sorter and its contents to be garbage collected + inMemorySorter = null; + } + + @Override + public Iterable> sort() throws IOException { + if (!inMemorySorterFull) { + return inMemorySorter.sort(); + } else { + return externalSorter.sort(); + } + } +} diff --git a/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/ExternalSorter.java b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/ExternalSorter.java new file mode 100644 index 0000000000..8965b41926 --- /dev/null +++ b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/ExternalSorter.java @@ -0,0 +1,224 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.contrib.sorter; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.cloud.dataflow.sdk.values.KV; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.mapred.JobConf; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.UUID; + +/** Does an external sort of the provided values using Hadoop's {@link SequenceFile}. */ +class ExternalSorter implements Sorter { + private Options options; + + /** Whether {@link #sort()} was already called. */ + private boolean sortCalled = false; + + /** SequenceFile Writer for writing all input data to a file. */ + private Writer writer; + + /** Sorter used to sort the input file. */ + private SequenceFile.Sorter sorter; + + /** Temporary directory for input and intermediate files. */ + private Path tempDir; + + /** The list of input files to be sorted. */ + private Path[] paths; + + private boolean initialized = false; + + /** {@link Options} contains configuration of the sorter. */ + public static class Options implements Serializable { + private String tempLocation = "/tmp"; + private int memoryMB = 100; + + /** Sets the path to a temporary location where the sorter writes intermediate files. 
*/ + public void setTempLocation(String tempLocation) { + if (tempLocation.startsWith("gs://")) { + throw new IllegalArgumentException("Sorter doesn't support GCS temporary location."); + } + + this.tempLocation = tempLocation; + } + + /** Returns the configured temporary location. */ + public String getTempLocation() { + return tempLocation; + } + + /** Sets the size of the memory buffer in megabytes. */ + public void setMemoryMB(int memoryMB) { + checkArgument(memoryMB > 0, "memoryMB must be greater than zero"); + this.memoryMB = memoryMB; + } + + /** Returns the configured size of the memory buffer. */ + public int getMemoryMB() { + return memoryMB; + } + } + + /** Returns a {@link Sorter} configured with the given {@link Options}. */ + public static ExternalSorter create(Options options) { + return new ExternalSorter(options); + } + + @Override + public void add(KV record) throws IOException { + checkState(!sortCalled, "Records can only be added before sort()"); + + initHadoopSorter(); + + BytesWritable key = new BytesWritable(record.getKey()); + BytesWritable value = new BytesWritable(record.getValue()); + + writer.append(key, value); + } + + @Override + public Iterable> sort() throws IOException { + checkState(!sortCalled, "sort() can only be called once."); + sortCalled = true; + + initHadoopSorter(); + + writer.close(); + + return new SortedRecordsIterable(); + } + + private ExternalSorter(Options options) { + this.options = options; + } + + /** + * Initializes the hadoop sorter. Does some local file system setup, and is somewhat expensive + * (~20 ms on local machine). Only executed when necessary. 
+ */ + private void initHadoopSorter() throws IOException { + if (!initialized) { + tempDir = new Path(options.getTempLocation(), "tmp" + UUID.randomUUID().toString()); + paths = new Path[] {new Path(tempDir, "test.seq")}; + + JobConf conf = new JobConf(); + writer = + SequenceFile.createWriter( + conf, + Writer.valueClass(BytesWritable.class), + Writer.keyClass(BytesWritable.class), + Writer.file(paths[0]), + Writer.compression(CompressionType.NONE)); + + FileSystem fs = FileSystem.getLocal(conf); + sorter = + new SequenceFile.Sorter( + fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf); + sorter.setMemory(options.getMemoryMB() * 1024 * 1024); + + initialized = true; + } + } + + /** An {@link Iterable} producing the iterators over sorted data. */ + private class SortedRecordsIterable implements Iterable> { + @Override + public Iterator> iterator() { + return new SortedRecordsIterator(); + } + } + + /** An {@link Iterator} producing the sorted data. */ + private class SortedRecordsIterator implements Iterator> { + private RawKeyValueIterator iterator; + + /** Next {@link KV} to return from {@link #next()}. */ + private KV nextKV = null; + + SortedRecordsIterator() { + try { + this.iterator = sorter.sortAndIterate(paths, tempDir, false); + } catch (IOException e) { + throw new RuntimeException(e); + } + + nextKV = KV.of(null, null); // A dummy value that will be overwritten by next(). + next(); + } + + @Override + public boolean hasNext() { + return nextKV != null; + } + + @Override + public KV next() { + if (nextKV == null) { + throw new NoSuchElementException(); + } + + KV current = nextKV; + + try { + if (iterator.next()) { + // Parse key from DataOutputBuffer. + ByteArrayInputStream keyStream = new ByteArrayInputStream(iterator.getKey().getData()); + BytesWritable key = new BytesWritable(); + key.readFields(new DataInputStream(keyStream)); + + // Parse value from ValueBytes. 
+ ByteArrayOutputStream valOutStream = new ByteArrayOutputStream(); + iterator.getValue().writeUncompressedBytes(new DataOutputStream(valOutStream)); + ByteArrayInputStream valInStream = new ByteArrayInputStream(valOutStream.toByteArray()); + BytesWritable value = new BytesWritable(); + value.readFields(new DataInputStream(valInStream)); + + nextKV = KV.of(key.copyBytes(), value.copyBytes()); + } else { + nextKV = null; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + return current; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Iterator does not support remove"); + } + } +} diff --git a/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/InMemorySorter.java b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/InMemorySorter.java new file mode 100644 index 0000000000..e62fca4057 --- /dev/null +++ b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/InMemorySorter.java @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.contrib.sorter; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.common.primitives.UnsignedBytes; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; + +/** + * Sorts {@code } pairs in memory. Based on the configured size of the memory buffer, + * will reject additional pairs. + */ +class InMemorySorter implements Sorter { + /** {@code Options} contains configuration of the sorter. */ + public static class Options implements Serializable { + private int memoryMB = 100; + + /** Sets the size of the memory buffer in megabytes. */ + public void setMemoryMB(int memoryMB) { + checkArgument(memoryMB > 0, "memoryMB must be greater than zero"); + this.memoryMB = memoryMB; + } + + /** Returns the configured size of the memory buffer. */ + public int getMemoryMB() { + return memoryMB; + } + } + + /** The comparator to use to sort the records by key. */ + private static final Comparator COMPARATOR = UnsignedBytes.lexicographicalComparator(); + + /** How many bytes per word in the running JVM. Assumes 64 bit/8 bytes if unknown. */ + private static final int NUM_BYTES_PER_WORD = getNumBytesPerWord(); + + /** + * Estimate of memory overhead per KV record in bytes not including memory associated with keys + * and values. + * + *
<ul> + *
<li>Object reference within {@link ArrayList} (1 word), + *
<li>A {@link KV} (2 words), + *
<li>Two byte arrays (2 words for array lengths), + *
<li>Per-object overhead (JVM-specific, guessing 2 words * 3 objects) + * </ul>
+ */ + private static final int RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD; + + /** Maximum size of the buffer in bytes. */ + private int maxBufferSize; + + /** Current number of stored bytes. Including estimated overhead bytes. */ + private int numBytes; + + /** Whether sort has been called. */ + private boolean sortCalled; + + /** The stored records to be sorted. */ + private ArrayList> records = new ArrayList>(); + + /** Private constructor. */ + private InMemorySorter(Options options) { + maxBufferSize = options.getMemoryMB() * 1024 * 1024; + } + + /** Create a new sorter from provided options. */ + public static InMemorySorter create(Options options) { + return new InMemorySorter(options); + } + + @Override + public void add(KV record) { + checkState(addIfRoom(record), "No space remaining for in memory sorting"); + } + + /** Adds the record is there is room and returns true. Otherwise returns false. */ + public boolean addIfRoom(KV record) { + checkState(!sortCalled, "Records can only be added before sort()"); + + int recordBytes = estimateRecordBytes(record); + if (roomInBuffer(numBytes + recordBytes, records.size() + 1)) { + records.add(record); + numBytes += recordBytes; + return true; + } else { + return false; + } + } + + @Override + public Iterable> sort() { + checkState(!sortCalled, "sort() can only be called once."); + + sortCalled = true; + + Comparator> kvComparator = + new Comparator>() { + + @Override + public int compare(KV o1, KV o2) { + return COMPARATOR.compare(o1.getKey(), o2.getKey()); + } + }; + Collections.sort(records, kvComparator); + return Collections.unmodifiableList(records); + } + + /** + * Estimate the number of additional bytes required to store this record. Including the key, the + * value and any overhead for objects and references. 
+ */ + private int estimateRecordBytes(KV record) { + return RECORD_MEMORY_OVERHEAD_ESTIMATE + record.getKey().length + record.getValue().length; + } + + /** + * Check whether we have room to store the provided total number of bytes and total number of + * records. + */ + private boolean roomInBuffer(int numBytes, int numRecords) { + // Collections.sort may allocate up to n/2 extra object references. + // Also, ArrayList grows by a factor of 1.5x, so there might be up to n/2 null object + // references in the backing array. + // And finally, in Java 7, Collections.sort performs a defensive copy to an array in case the + // input list is a LinkedList. + // So assume we need an additional overhead of two words per record in the worst case + return (numBytes + (numRecords * NUM_BYTES_PER_WORD * 2)) < maxBufferSize; + } + + /** + * Returns the number of bytes in a word according to the JVM. Defaults to 8 for 64 bit if answer + * unknown. + */ + private static int getNumBytesPerWord() { + String bitsPerWord = System.getProperty("sun.arch.data.model"); + + try { + return Integer.parseInt(bitsPerWord) / 8; + } catch (Exception e) { + // Can't determine whether 32 or 64 bit, so assume 64 + return 8; + } + } +} diff --git a/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/SortValues.java b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/SortValues.java new file mode 100644 index 0000000000..32c393277c --- /dev/null +++ b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/SortValues.java @@ -0,0 +1,211 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.sorter; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.IterableCoder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import java.io.IOException; +import java.util.Iterator; + +/** + * {@code SortValues} takes a {@code PCollection>>>} with elements consisting of a primary key and iterables + * over {@code } pairs, and returns a {@code PCollection>>} of the same elements but with values sorted by a secondary + * key. + * + *

<p>This transform ignores the primary key; there is no guarantee of any relationship between + * elements with different primary keys. The primary key is explicit here only because this + * transform is typically used on a result of a {@link GroupByKey} transform. + * + *

This transform sorts by lexicographic comparison of the byte representations of the secondary + * keys and may write secondary key-value pairs to disk. In order to retrieve the byte + * representations it requires the input PCollection to use a {@link KvCoder} for its input, an + * {@link IterableCoder} for its input values and a {@link KvCoder} for its secondary key-value + * pairs. + */ +public class SortValues + extends PTransform< + PCollection>>>, + PCollection>>>> { + + private BufferedExternalSorter.Options sorterOptions; + + private SortValues(BufferedExternalSorter.Options sorterOptions) { + this.sorterOptions = sorterOptions; + } + + /** + * Returns a {@code SortValues} {@link PTransform}. + * + * @param the type of the primary keys of the input and output {@code PCollection}s + * @param the type of the secondary (sort) keys of the input and output {@code + * PCollection}s + * @param the type of the values of the input and output {@code PCollection}s + */ + public static + SortValues create( + BufferedExternalSorter.Options sorterOptions) { + return new SortValues<>(sorterOptions); + } + + @Override + public PCollection>>> apply( + PCollection>>> input) { + return input.apply( + ParDo.of( + new SortValuesDoFn( + sorterOptions, + getSecondaryKeyCoder(input.getCoder()), + getValueCoder(input.getCoder())))); + } + + @Override + protected Coder>>> getDefaultOutputCoder( + PCollection>>> input) { + return input.getCoder(); + } + + /** Retrieves the {@link Coder} for the secondary key-value pairs. 
*/ + @SuppressWarnings("unchecked") + private static + KvCoder getSecondaryKeyValueCoder( + Coder>>> inputCoder) { + if (!(inputCoder instanceof KvCoder)) { + throw new IllegalStateException("SortValues requires its input to use KvCoder"); + } + @SuppressWarnings("unchecked") + KvCoder>> kvCoder = + (KvCoder>>) (inputCoder); + + if (!(kvCoder.getValueCoder() instanceof IterableCoder)) { + throw new IllegalStateException( + "SortValues requires the values be encoded with IterableCoder"); + } + IterableCoder> iterableCoder = + (IterableCoder>) (kvCoder.getValueCoder()); + + if (!(iterableCoder.getElemCoder() instanceof KvCoder)) { + throw new IllegalStateException( + "SortValues requires the secondary key-value pairs to use KvCoder"); + } + return (KvCoder) (iterableCoder.getElemCoder()); + } + + /** Retrieves the {@link Coder} for the secondary keys. */ + private static Coder getSecondaryKeyCoder( + Coder>>> inputCoder) { + return getSecondaryKeyValueCoder(inputCoder).getKeyCoder(); + } + + /** Returns the {@code Coder} of the values associated with the secondary keys. 
*/ + private static Coder getValueCoder( + Coder>>> inputCoder) { + return getSecondaryKeyValueCoder(inputCoder).getValueCoder(); + } + + private static class SortValuesDoFn + extends DoFn< + KV>>, + KV>>> { + private final BufferedExternalSorter.Options sorterOptions; + private final Coder keyCoder; + private final Coder valueCoder; + + SortValuesDoFn( + BufferedExternalSorter.Options sorterOptions, + Coder keyCoder, + Coder valueCoder) { + this.sorterOptions = sorterOptions; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + + @Override + public void processElement(ProcessContext c) { + Iterable> records = c.element().getValue(); + + try { + Sorter sorter = BufferedExternalSorter.create(sorterOptions); + for (KV record : records) { + sorter.add( + KV.of( + CoderUtils.encodeToByteArray(keyCoder, record.getKey()), + CoderUtils.encodeToByteArray(valueCoder, record.getValue()))); + } + + c.output( + KV.of( + c.element().getKey(), + (Iterable>) (new DecodingIterable(sorter.sort())))); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private class DecodingIterable implements Iterable> { + Iterable> iterable; + + DecodingIterable(Iterable> iterable) { + this.iterable = iterable; + } + + @Override + public Iterator> iterator() { + return new DecodingIterator(iterable.iterator()); + } + } + + private class DecodingIterator implements Iterator> { + Iterator> iterator; + + DecodingIterator(Iterator> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public KV next() { + KV next = iterator.next(); + try { + return KV.of( + CoderUtils.decodeFromByteArray(keyCoder, next.getKey()), + CoderUtils.decodeFromByteArray(valueCoder, next.getValue())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Iterator does not support remove"); + } + } + } +} diff --git 
a/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/Sorter.java b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/Sorter.java new file mode 100644 index 0000000000..b28b28314c --- /dev/null +++ b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/Sorter.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.sorter; + +import com.google.cloud.dataflow.sdk.values.KV; +import java.io.IOException; + +/** + * Interface for classes which can sort {@code } pairs by the key. + * + *

<p>Records must first be added by calling {@link #add(KV)}. Then {@link #sort()} can be called at + * most once. + * + *

<p>TODO: Support custom comparison functions. + */ +interface Sorter { + + /** + * Adds a given record to the sorter. + * + *

<p>Records can only be added before calling {@link #sort()}. + */ + void add(KV record) throws IOException; + + /** + * Sorts the added elements and returns an {@link Iterable} over the sorted elements. + * + *

Can be called at most once. + */ + Iterable> sort() throws IOException; +} diff --git a/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/package-info.java b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/package-info.java new file mode 100644 index 0000000000..9a06e84b5b --- /dev/null +++ b/contrib/sorter/src/main/java/com/google/cloud/dataflow/contrib/sorter/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Utility for performing local sort of potentially large sets of values. Will sort in memory and + * spill to disk for external sorting if necessary. + */ +package com.google.cloud.dataflow.contrib.sorter; diff --git a/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/BufferedExternalSorterTest.java b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/BufferedExternalSorterTest.java new file mode 100644 index 0000000000..954642a833 --- /dev/null +++ b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/BufferedExternalSorterTest.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.sorter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.cloud.dataflow.contrib.sorter.SorterTestUtils.SorterGenerator; +import com.google.cloud.dataflow.sdk.values.KV; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; + +/** Tests for {@link BufferedExternalSorter}. 
*/ +@RunWith(JUnit4.class) +public class BufferedExternalSorterTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + /** When every record fits in the in-memory sorter, the external sorter must never be used. */ + @SuppressWarnings("unchecked") + @Test + public void testNoFallback() throws Exception { + ExternalSorter mockExternalSorter = mock(ExternalSorter.class); + InMemorySorter mockInMemorySorter = mock(InMemorySorter.class); + BufferedExternalSorter testSorter = + new BufferedExternalSorter(mockExternalSorter, mockInMemorySorter); + + KV[] kvs = + new KV[] { + KV.of(new byte[] {0}, new byte[] {}), + KV.of(new byte[] {0, 1}, new byte[] {}), + KV.of(new byte[] {1}, new byte[] {}) + }; + + when(mockInMemorySorter.addIfRoom(kvs[0])).thenReturn(true); + when(mockInMemorySorter.addIfRoom(kvs[1])).thenReturn(true); + when(mockInMemorySorter.addIfRoom(kvs[2])).thenReturn(true); + when(mockInMemorySorter.sort()).thenReturn(Arrays.asList(kvs[0], kvs[1], kvs[2])); + + testSorter.add(kvs[0]); + testSorter.add(kvs[1]); + testSorter.add(kvs[2]); + + assertEquals(Arrays.asList(kvs[0], kvs[1], kvs[2]), testSorter.sort()); + + // Verify external sorter was never called + verify(mockExternalSorter, never()).add(any(KV.class)); + verify(mockExternalSorter, never()).sort(); + } + + /** Once the in-memory sorter reports no room, every record, including those it accepted earlier, must be added to the external sorter. */ + @Test + public void testFallback() throws Exception { + ExternalSorter mockExternalSorter = mock(ExternalSorter.class); + InMemorySorter mockInMemorySorter = mock(InMemorySorter.class); + BufferedExternalSorter testSorter = + new BufferedExternalSorter(mockExternalSorter, mockInMemorySorter); + + @SuppressWarnings("unchecked") + KV[] kvs = + new KV[] { + KV.of(new byte[] {0}, new byte[] {}), + KV.of(new byte[] {0, 1}, new byte[] {}), + KV.of(new byte[] {1}, new byte[] {}) + }; + + when(mockInMemorySorter.addIfRoom(kvs[0])).thenReturn(true); + when(mockInMemorySorter.addIfRoom(kvs[1])).thenReturn(true); + when(mockInMemorySorter.addIfRoom(kvs[2])).thenReturn(false); + when(mockInMemorySorter.sort()).thenReturn(Arrays.asList(kvs[0], kvs[1])); +
when(mockExternalSorter.sort()).thenReturn(Arrays.asList(kvs[0], kvs[1], kvs[2])); + + testSorter.add(kvs[0]); + testSorter.add(kvs[1]); + testSorter.add(kvs[2]); + + assertEquals(Arrays.asList(kvs[0], kvs[1], kvs[2]), testSorter.sort()); + + /* The two records accepted in memory and the overflowing record must each reach the external sorter exactly once. */ + verify(mockExternalSorter, times(1)).add(kvs[0]); + verify(mockExternalSorter, times(1)).add(kvs[1]); + verify(mockExternalSorter, times(1)).add(kvs[2]); + } + + /* The tests below that call SorterTestUtils run its shared checks against a sorter created from default Options. */ + + @Test + public void testEmpty() throws Exception { + SorterTestUtils.testEmpty(BufferedExternalSorter.create(new BufferedExternalSorter.Options())); + } + + @Test + public void testSingleElement() throws Exception { + SorterTestUtils.testSingleElement( + BufferedExternalSorter.create(new BufferedExternalSorter.Options())); + } + + @Test + public void testEmptyKeyValueElement() throws Exception { + SorterTestUtils.testEmptyKeyValueElement( + BufferedExternalSorter.create(new BufferedExternalSorter.Options())); + } + + @Test + public void testMultipleIterations() throws Exception { + SorterTestUtils.testMultipleIterations( + BufferedExternalSorter.create(new BufferedExternalSorter.Options())); + } + + /** Runs the random-record check with 1,000,000 sorters of 10 records each. */ + @Test + public void testManySortersFewRecords() throws Exception { + SorterTestUtils.testRandom( + new SorterGenerator() { + @Override + public Sorter generateSorter() throws Exception { + return BufferedExternalSorter.create(new BufferedExternalSorter.Options()); + } + }, + 1000000, + 10); + } + + /** Runs the random-record check with a single sorter holding 1,000,000 records. */ + @Test + public void testOneSorterManyRecords() throws Exception { + SorterTestUtils.testRandom( + new SorterGenerator() { + @Override + public Sorter generateSorter() throws Exception { + return BufferedExternalSorter.create(new BufferedExternalSorter.Options()); + } + }, + 1, + 1000000); + } + + /** Adding after sort() must throw; the helper registers the expected exception on {@code thrown}. */ + @Test + public void testAddAfterSort() throws Exception { + SorterTestUtils.testAddAfterSort( + BufferedExternalSorter.create(new BufferedExternalSorter.Options()), thrown); + /* Reached only if the helper returned without throwing. */ + fail(); + } + + /** Calling sort() twice must throw; the helper registers the expected exception on {@code thrown}. */ + @Test + public void testSortTwice() throws Exception { + SorterTestUtils.testSortTwice( +
BufferedExternalSorter.create(new BufferedExternalSorter.Options()), thrown); + /* Reached only if the helper returned without throwing. */ + fail(); + } + + /** Expects an IllegalArgumentException with the message below when memoryMB is set to -1. */ + @Test + public void testNegativeMemory() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("memoryMB must be greater than zero"); + BufferedExternalSorter.Options options = new BufferedExternalSorter.Options(); + options.setMemoryMB(-1); + } +} diff --git a/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/ExternalSorterTest.java b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/ExternalSorterTest.java new file mode 100644 index 0000000000..150ffbd633 --- /dev/null +++ b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/ExternalSorterTest.java @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.sorter; + +import static org.junit.Assert.fail; + +import com.google.cloud.dataflow.contrib.sorter.SorterTestUtils.SorterGenerator; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ExternalSorter}.
*/ +@RunWith(JUnit4.class) +public class ExternalSorterTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + /* The tests below that call SorterTestUtils run its shared checks against a sorter created from default Options. */ + + @Test + public void testEmpty() throws Exception { + SorterTestUtils.testEmpty(ExternalSorter.create(new ExternalSorter.Options())); + } + + @Test + public void testSingleElement() throws Exception { + SorterTestUtils.testSingleElement(ExternalSorter.create(new ExternalSorter.Options())); + } + + @Test + public void testEmptyKeyValueElement() throws Exception { + SorterTestUtils.testEmptyKeyValueElement(ExternalSorter.create(new ExternalSorter.Options())); + } + + @Test + public void testMultipleIterations() throws Exception { + SorterTestUtils.testMultipleIterations(ExternalSorter.create(new ExternalSorter.Options())); + } + + /** Runs the random-record check with a single sorter holding 1,000,000 records. */ + @Test + public void testRandom() throws Exception { + SorterTestUtils.testRandom( + new SorterGenerator() { + @Override + public Sorter generateSorter() throws Exception { + return ExternalSorter.create(new ExternalSorter.Options()); + } + }, + 1, + 1000000); + } + + /** Adding after sort() must throw; the helper registers the expected exception on {@code thrown}. */ + @Test + public void testAddAfterSort() throws Exception { + SorterTestUtils.testAddAfterSort(ExternalSorter.create(new ExternalSorter.Options()), thrown); + /* Reached only if the helper returned without throwing. */ + fail(); + } + + /** Calling sort() twice must throw; the helper registers the expected exception on {@code thrown}. */ + @Test + public void testSortTwice() throws Exception { + SorterTestUtils.testSortTwice(ExternalSorter.create(new ExternalSorter.Options()), thrown); + /* Reached only if the helper returned without throwing. */ + fail(); + } + + /** Expects an IllegalArgumentException with the message below when memoryMB is set to -1. */ + @Test + public void testNegativeMemory() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("memoryMB must be greater than zero"); + ExternalSorter.Options options = new ExternalSorter.Options(); + options.setMemoryMB(-1); + } +} diff --git a/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/InMemorySorterTest.java b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/InMemorySorterTest.java new file mode 100644 index 0000000000..1c4e43fc1c --- /dev/null +++
b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/InMemorySorterTest.java @@ -0,0 +1,142 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.sorter; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.fail; + +import com.google.cloud.dataflow.contrib.sorter.SorterTestUtils.SorterGenerator; +import com.google.cloud.dataflow.sdk.values.KV; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link InMemorySorter}. 
*/ +@RunWith(JUnit4.class) +public class InMemorySorterTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + /* The tests below that call SorterTestUtils with default Options run its shared checks against a freshly created in-memory sorter. */ + + @Test + public void testEmpty() throws Exception { + SorterTestUtils.testEmpty(InMemorySorter.create(new InMemorySorter.Options())); + } + + @Test + public void testSingleElement() throws Exception { + SorterTestUtils.testSingleElement(InMemorySorter.create(new InMemorySorter.Options())); + } + + @Test + public void testEmptyKeyValueElement() throws Exception { + SorterTestUtils.testEmptyKeyValueElement(InMemorySorter.create(new InMemorySorter.Options())); + } + + @Test + public void testMultipleIterations() throws Exception { + SorterTestUtils.testMultipleIterations(InMemorySorter.create(new InMemorySorter.Options())); + } + + /** Runs the random-record check with 1,000,000 sorters of 10 records each. */ + @Test + public void testManySorters() throws Exception { + SorterTestUtils.testRandom( + new SorterGenerator() { + @Override + public Sorter generateSorter() throws Exception { + return InMemorySorter.create(new InMemorySorter.Options()); + } + }, + 1000000, + 10); + } + + /** Adding after sort() must throw; the helper registers the expected exception on {@code thrown}. */ + @Test + public void testAddAfterSort() throws Exception { + SorterTestUtils.testAddAfterSort(InMemorySorter.create(new InMemorySorter.Options()), thrown); + /* Reached only if the helper returned without throwing. */ + fail(); + } + + /** Calling sort() twice must throw; the helper registers the expected exception on {@code thrown}. */ + @Test + public void testSortTwice() throws Exception { + SorterTestUtils.testSortTwice(InMemorySorter.create(new InMemorySorter.Options()), thrown); + /* Reached only if the helper returned without throwing. */ + fail(); + } + + /** + * Verify an exception is thrown when the in memory sorter runs out of space. 
+ * + * <p>Feeds 10,000,000 random records to a single sorter created with memoryMB set to 1 and + * expects an IllegalStateException whose message is exactly the string registered below. + * + * @throws Exception propagated from {@link SorterTestUtils#testRandom} + */ + @Test + public void testOutOfSpace() throws Exception { + thrown.expect(IllegalStateException.class); + thrown.expectMessage(is("No space remaining for in memory sorting")); + SorterTestUtils.testRandom( + new SorterGenerator() { + @Override + public Sorter generateSorter() throws Exception { + InMemorySorter.Options options = new InMemorySorter.Options(); + options.setMemoryMB(1); + return InMemorySorter.create(options); + } + }, + 1, + 10000000); + } + + /** With memoryMB set to 1, a record with a 500 KiB key and a 500 KiB value must be accepted once and rejected the second time. */ + @Test + public void testAddIfRoom() throws Exception { + InMemorySorter.Options options = new InMemorySorter.Options(); + options.setMemoryMB(1); + InMemorySorter sorter = InMemorySorter.create(options); + + // Should be a few kb less than what the total buffer supports + KV bigRecord = KV.of(new byte[1024 * 500], new byte[1024 * 500]); + + // First add should succeed, second add should fail due to insufficient room + Assert.assertTrue(sorter.addIfRoom(bigRecord)); + Assert.assertFalse(sorter.addIfRoom(bigRecord)); + } + + /** With memoryMB set to 1, addIfRoom must report no room before 1,000,000 empty records have been accepted. */ + @Test + public void testAddIfRoomOverhead() throws Exception { + InMemorySorter.Options options = new InMemorySorter.Options(); + options.setMemoryMB(1); + InMemorySorter sorter = InMemorySorter.create(options); + + // No bytes within record, should still run out of room due to memory overhead of record + KV tinyRecord = KV.of(new byte[0], new byte[0]); + + // Verify we can't insert one million records into this one megabyte buffer + boolean stillRoom = true; + for (int i = 0; (i < 1000000) && stillRoom; i++) { + stillRoom = sorter.addIfRoom(tinyRecord); + } + + Assert.assertFalse(stillRoom); + } + + /** Expects an IllegalArgumentException with the message below when memoryMB is set to -1. */ + @Test + public void testNegativeMemory() throws Exception { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("memoryMB must be greater than zero"); + InMemorySorter.Options options = new InMemorySorter.Options(); + options.setMemoryMB(-1); + } +} diff --git
a/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/SortValuesTest.java b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/SortValuesTest.java new file mode 100644 index 0000000000..bb7273bad6 --- /dev/null +++ b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/SortValuesTest.java @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.contrib.sorter; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; + +/** Tests for {@link SortValues} transform. 
*/ +@RunWith(JUnit4.class) +public class SortValuesTest { + + /** Groups nested KVs by primary key, applies SortValues, and checks that each key's values come back ordered by secondary key. */ + @Test + public void testSecondaryKeySorting() throws Exception { + Pipeline p = TestPipeline.create(); + + // Create a PCollection of KV(key, KV(secondaryKey, value)) pairs. + PCollection>> input = + p.apply( + Create.of( + Arrays.asList( + KV.of("key1", KV.of("secondaryKey2", 20)), + KV.of("key2", KV.of("secondaryKey2", 200)), + KV.of("key1", KV.of("secondaryKey3", 30)), + KV.of("key1", KV.of("secondaryKey1", 10)), + KV.of("key2", KV.of("secondaryKey1", 100))))); + + // Group by Key, bringing pairs for the same Key together. + PCollection>>> grouped = + input.apply(GroupByKey.>create()); + + // For every Key, sort the iterable of pairs by SecondaryKey. + PCollection>>> groupedAndSorted = + grouped.apply( + SortValues.create(new BufferedExternalSorter.Options())); + + DataflowAssert.that(groupedAndSorted) + .satisfies(new AssertThatHasExpectedContentsForTestSecondaryKeySorting()); + + p.run(); + } + + /** Assertion run by DataflowAssert: "key1" and "key2" may appear in any order, but each key's values must appear in exactly the listed secondary-key order. */ + static class AssertThatHasExpectedContentsForTestSecondaryKeySorting + implements SerializableFunction>>>, Void> { + @SuppressWarnings("unchecked") + @Override + public Void apply(Iterable>>> actual) { + assertThat( + actual, + containsInAnyOrder( + KvMatcher.isKv( + is("key1"), + contains( + KvMatcher.isKv(is("secondaryKey1"), is(10)), + KvMatcher.isKv(is("secondaryKey2"), is(20)), + KvMatcher.isKv(is("secondaryKey3"), is(30)))), + KvMatcher.isKv( + is("key2"), + contains( + KvMatcher.isKv(is("secondaryKey1"), is(100)), + KvMatcher.isKv(is("secondaryKey2"), is(200)))))); + return null; + } + } + + /** Matcher for KVs. 
Forked from com.google.cloud.dataflow/sdk/TestUtils.java */ + public static class KvMatcher extends TypeSafeMatcher> { + final Matcher keyMatcher; + final Matcher valueMatcher; + + /** Creates a matcher that accepts a KV whose key and value satisfy the given matchers. */ + public static KvMatcher isKv(Matcher keyMatcher, Matcher valueMatcher) { + return new KvMatcher<>(keyMatcher, valueMatcher); + } + + public KvMatcher(Matcher keyMatcher, Matcher valueMatcher) { + this.keyMatcher = keyMatcher; + this.valueMatcher = valueMatcher; + } + + /** Matches only when the key matcher accepts the KV's key and the value matcher accepts its value. */ + @Override + public boolean matchesSafely(KV kv) { + return keyMatcher.matches(kv.getKey()) && valueMatcher.matches(kv.getValue()); + } + + /* NOTE(review): the nested matchers are rendered with appendValue; appendDescriptionOf may give a cleaner description - confirm before changing. */ + @Override + public void describeTo(Description description) { + description + .appendText("a KV(") + .appendValue(keyMatcher) + .appendText(", ") + .appendValue(valueMatcher) + .appendText(")"); + } + } +} diff --git a/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/SorterTestUtils.java b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/SorterTestUtils.java new file mode 100644 index 0000000000..9aff1f0c11 --- /dev/null +++ b/contrib/sorter/src/test/java/com/google/cloud/dataflow/contrib/sorter/SorterTestUtils.java @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ + +package com.google.cloud.dataflow.contrib.sorter; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.common.primitives.UnsignedBytes; +import org.junit.rules.ExpectedException; + +import java.util.Random; + +/** A set of basic tests for {@link Sorter}s. */ +public class SorterTestUtils { + + /** Checks that calling sort() on a sorter with no records yields an empty iterable. */ + public static void testEmpty(Sorter sorter) throws Exception { + assertThat(sorter.sort(), is(emptyIterable())); + } + + /** Checks that a single added record is the only record returned by sort(). */ + @SuppressWarnings("unchecked") + public static void testSingleElement(Sorter sorter) throws Exception { + KV kv = KV.of(new byte[] {4, 7}, new byte[] {1, 2}); + sorter.add(kv); + assertThat(sorter.sort(), contains(kv)); + } + + /** Checks that a record with an empty key and an empty value is returned by sort(). */ + @SuppressWarnings("unchecked") + public static void testEmptyKeyValueElement(Sorter sorter) throws Exception { + KV kv = KV.of(new byte[] {}, new byte[] {}); + sorter.add(kv); + assertThat(sorter.sort(), contains(kv)); + } + + /** Adds three records out of order and checks that the iterable returned by sort() yields them in the order {0}, {0, 1}, {1} on two separate iterations. */ + @SuppressWarnings("unchecked") + public static void testMultipleIterations(Sorter sorter) throws Exception { + KV[] kvs = + new KV[] { + KV.of(new byte[] {0}, new byte[] {}), + KV.of(new byte[] {0, 1}, new byte[] {}), + KV.of(new byte[] {1}, new byte[] {}) + }; + sorter.add(kvs[1]); + sorter.add(kvs[2]); + sorter.add(kvs[0]); + Iterable> sorted = sorter.sort(); + assertThat(sorted, contains(kvs[0], kvs[1], kvs[2])); + // Iterate second time. + assertThat(sorted, contains(kvs[0], kvs[1], kvs[2])); + } + + /** Factory that generates a new sorter. Used when performance testing multiple sorter creation. */ + interface SorterGenerator { + Sorter generateSorter() throws Exception; + } + + /** + * Generates random records and executes a test with the provided number of sorters and number of + * records per sorter. 
+ * + * <p>Every sorter receives records with 8-byte keys and 8-byte values drawn from a {@link Random} + * seeded with 0, and its sorted output must be strictly increasing under unsigned lexicographic + * comparison of the keys. Elapsed time and throughput are printed to standard output. + */ + public static void testRandom( + SorterGenerator sorterGenerator, int numSorters, int numRecordsPerSorter) throws Exception { + long start = System.currentTimeMillis(); + for (int i = 0; i < numSorters; ++i) { + Sorter sorter = sorterGenerator.generateSorter(); + Random rnd = new Random(0L); + for (int j = 0; j < numRecordsPerSorter; ++j) { + byte[] key = new byte[8]; + byte[] value = new byte[8]; + rnd.nextBytes(key); + rnd.nextBytes(value); + sorter.add(KV.of(key, value)); + } + + byte[] prevKey = null; + for (KV record : sorter.sort()) { + assertTrue( + prevKey == null + || UnsignedBytes.lexicographicalComparator().compare(prevKey, record.getKey()) < 0); + prevKey = record.getKey(); + } + } + long end = System.currentTimeMillis(); + /* Promote to double before multiplying so the total record count cannot overflow int. */ + System.out.println( + "Took " + + (end - start) + + "ms for " + + (double) numRecordsPerSorter * numSorters * 1000.0 / (end - start) + + " records/s"); + } + + /** Tests trying to call add after calling sort. Should throw an exception. The expectation is registered on the supplied {@code thrown} rule. */ + public static void testAddAfterSort(Sorter sorter, ExpectedException thrown) throws Exception { + thrown.expect(IllegalStateException.class); + thrown.expectMessage(is("Records can only be added before sort()")); + KV kv = KV.of(new byte[] {4, 7}, new byte[] {1, 2}); + sorter.add(kv); + sorter.sort(); + sorter.add(kv); + } + + /** Tests trying to call sort twice. Should throw an exception. The expectation is registered on the supplied {@code thrown} rule. */ + public static void testSortTwice(Sorter sorter, ExpectedException thrown) throws Exception { + thrown.expect(IllegalStateException.class); + thrown.expectMessage(is("sort() can only be called once.")); + KV kv = KV.of(new byte[] {4, 7}, new byte[] {1, 2}); + sorter.add(kv); + sorter.sort(); + sorter.sort(); + } +}