Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions contrib/sorter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Sorter
This module provides the `SortValues` transform, which takes a `PCollection<KV<K, Iterable<KV<K2, V>>>>` and produces a `PCollection<KV<K, Iterable<KV<K2, V>>>>` where, for each primary key `K`, the paired `Iterable<KV<K2, V>>` has been sorted by the byte encoding of the secondary key (`K2`). It will efficiently and scalably sort the iterables, even if they are large (do not fit in memory).

## Caveats
* This transform performs value-only sorting; the iterable accompanying each key is sorted, but *there is no relationship between different keys*, as Dataflow does not support any defined relationship between different elements in a PCollection.
* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck in some pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`.

## Options
* The user can customize both the temporary location used when sorting spills to disk and the maximum amount of memory to use, by creating a custom instance of `BufferedExternalSorter.Options` and passing it to `SortValues.create`.

## Using `SortValues`
~~~~
PCollection<KV<String, KV<String, Integer>>> input = ...

// Group by primary key, bringing <SecondaryKey, Value> pairs for the same key together.
PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
input.apply(GroupByKey.<String, KV<String, Integer>>create());

// For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
grouped.apply(
SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
~~~~
237 changes: 237 additions & 0 deletions contrib/sorter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~ Copyright (C) 2016 Google Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-contrib-sorter</artifactId>
<name>Google Cloud Dataflow Sorter Library</name>
<description>Library to sort data from within Dataflow pipelines.</description>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<google-cloud-dataflow-version>[1.2.0,2.0.0)</google-cloud-dataflow-version>
<hadoop.version>2.7.1</hadoop.version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>6.19</version>
</dependency>
</dependencies>
<configuration>
<configLocation>../../checkstyle.xml</configLocation>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Source plugin for generating source and test-source JARs. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
<execution>
<id>attach-test-sources</id>
<phase>test-compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.3</version>
<configuration>
<windowtitle>Google Cloud Dataflow Sorter Contrib</windowtitle>
<doctitle>Google Cloud Dataflow Sorter Contrib</doctitle>

<subpackages>com.google.cloud.dataflow.contrib.sorter</subpackages>
<use>false</use>
<bottom><![CDATA[<br>]]></bottom>

<offlineLinks>
<offlineLink>
<url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
<location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
</offlineLink>
<offlineLink>
<url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
<location>${basedir}/../../javadoc/guava-docs</location>
</offlineLink>
</offlineLinks>
</configuration>
<executions>
<execution>
<goals>
<goal>jar</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>

<!-- Shading the Hadoop and Guava dependencies so that users may use their own
versions of these libraries without interference from this module. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>bundle-and-repackage</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>true</shadeTestJar>
<artifactSet>
<includes>
<include>org.apache.hadoop:hadoop-mapreduce-client-core</include>
<include>org.apache.hadoop:hadoop-common</include>
<include>com.google.guava:guava</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.hadoop</pattern>
<shadedPattern>com.google.cloud.dataflow.repackaged.org.apache.hadoop</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>com.google.cloud.dataflow.repackaged.com.google.common</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.thirdparty</pattern>
<shadedPattern>com.google.cloud.dataflow.repackaged.com.google.thirdparty</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>

<dependencies>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>${google-cloud-dataflow-version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.dataflow.contrib.sorter;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.io.Serializable;

/**
 * A {@link Sorter} that buffers records in an {@link InMemorySorter} for as long as that sorter
 * accepts them, and switches to an {@link ExternalSorter} for the rest of its lifetime as soon as
 * the in memory sorter reports that it has no room for a record.
 */
public class BufferedExternalSorter implements Sorter {
  /** Holds the settings used to build a {@link BufferedExternalSorter}. */
  public static class Options implements Serializable {
    private String tempLocation = "/tmp";
    private int memoryMB = 100;

    /**
     * Sets the temporary location handed to the external sorter. Locations starting with
     * {@code gs://} are rejected.
     */
    public void setTempLocation(String tempLocation) {
      boolean isGcsLocation = tempLocation.startsWith("gs://");
      checkArgument(
          !isGcsLocation, "BufferedExternalSorter does not support GCS temporary location");

      this.tempLocation = tempLocation;
    }

    /** Returns the temporary location currently configured. */
    public String getTempLocation() {
      return tempLocation;
    }

    /**
     * Sets the memory buffer size, in megabytes. The same value is applied to both the in memory
     * sorter and the external sorter. Must be greater than zero.
     */
    public void setMemoryMB(int memoryMB) {
      boolean isPositive = memoryMB > 0;
      checkArgument(isPositive, "memoryMB must be greater than zero");
      this.memoryMB = memoryMB;
    }

    /** Returns the memory buffer size currently configured, in megabytes. */
    public int getMemoryMB() {
      return memoryMB;
    }
  }

  private ExternalSorter externalSorter;
  private InMemorySorter inMemorySorter;

  // Set once the in memory sorter has refused a record; from then on only the external sorter
  // is used.
  boolean inMemorySorterFull;

  BufferedExternalSorter(ExternalSorter externalSorter, InMemorySorter inMemorySorter) {
    this.externalSorter = externalSorter;
    this.inMemorySorter = inMemorySorter;
  }

  /**
   * Builds a {@link BufferedExternalSorter} whose in memory and external sorters are both
   * configured from the given options.
   */
  public static BufferedExternalSorter create(Options options) {
    ExternalSorter.Options spillOptions = new ExternalSorter.Options();
    spillOptions.setMemoryMB(options.getMemoryMB());
    spillOptions.setTempLocation(options.getTempLocation());

    InMemorySorter.Options bufferOptions = new InMemorySorter.Options();
    bufferOptions.setMemoryMB(options.getMemoryMB());

    ExternalSorter spillSorter = ExternalSorter.create(spillOptions);
    InMemorySorter bufferSorter = InMemorySorter.create(bufferOptions);
    return new BufferedExternalSorter(spillSorter, bufferSorter);
  }

  @Override
  public void add(KV<byte[], byte[]> record) throws IOException {
    // Once the switch to external sorting has happened, every record goes straight there.
    if (inMemorySorterFull) {
      externalSorter.add(record);
      return;
    }

    // Still buffering in memory: done if the record fits.
    if (inMemorySorter.addIfRoom(record)) {
      return;
    }

    // The record did not fit. Move everything buffered so far into the external sorter, then
    // add the record that triggered the switch.
    inMemorySorterFull = true;
    transferToExternalSorter();
    externalSorter.add(record);
  }

  /**
   * Moves every record held by the in memory sorter into the external sorter and then drops the
   * reference to the in memory sorter.
   */
  private void transferToExternalSorter() throws IOException {
    Iterable<KV<byte[], byte[]>> buffered = inMemorySorter.sort();
    for (KV<byte[], byte[]> entry : buffered) {
      externalSorter.add(entry);
    }
    // Drop the reference so the in memory sorter and its contents can be garbage collected
    inMemorySorter = null;
  }

  @Override
  public Iterable<KV<byte[], byte[]>> sort() throws IOException {
    if (inMemorySorterFull) {
      return externalSorter.sort();
    }
    return inMemorySorter.sort();
  }
}
Loading