[BEAM-11] Integrate Spark runner with Beam #42

Closed
wants to merge 10 commits
1 change: 1 addition & 0 deletions runners/pom.xml
@@ -38,6 +38,7 @@

<modules>
<module>flink</module>
<module>spark</module>
</modules>

</project>
272 changes: 133 additions & 139 deletions runners/spark/pom.xml
@@ -11,21 +11,136 @@ the specific language governing permissions and limitations under the
License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<name>Dataflow on Spark</name>
<groupId>com.cloudera.dataflow.spark</groupId>
<artifactId>spark-dataflow</artifactId>

<parent>
<groupId>org.apache.beam</groupId>
<artifactId>runners</artifactId>
<version>1.5.0-SNAPSHOT</version>
</parent>

<artifactId>spark-runner</artifactId>
<version>0.4.3-SNAPSHOT</version>
Member:
I think many elements in this file can be inherited from the parent instead of being specified here, including:

  • version
  • licenses
  • properties (most if not all)
  • repositories
  • developers
  • scm
  • prerequisites
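
Applied to this POM, the suggestion would shrink the child module to roughly the following sketch (hypothetical trimmed-down POM, assuming the parent declares the shared metadata listed above):

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.beam</groupId>
    <artifactId>runners</artifactId>
    <version>1.5.0-SNAPSHOT</version>
  </parent>
  <!-- version, licenses, properties, repositories, developers, scm, and
       prerequisites are inherited from the parent and omitted here -->
  <artifactId>spark-runner</artifactId>
  <name>Spark Beam Runner</name>
  <packaging>jar</packaging>
</project>
```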


<name>Spark Beam Runner</name>
<packaging>jar</packaging>

<inceptionYear>2014</inceptionYear>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.7</java.version>
<spark.version>1.5.2</spark.version>
<google-cloud-dataflow-version>1.3.0</google-cloud-dataflow-version>
<beam.version>1.5.0-SNAPSHOT</beam.version>
</properties>

<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>${beam.version}</version>
Member:
I think this will end up being ${project.version}, or something like that.

Member Author:
It could be ${project.version} if we keep all runners at the same version as the SDK.
For now, I could use ${parent.version}.
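
With the parent's version inherited instead of a separate property, the dependency would look roughly like this (a sketch; whether it ends up as ${project.version} or ${parent.version} depends on the versioning decision discussed above):

```xml
<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <!-- take the version from the parent POM rather than a hand-maintained beam.version property -->
  <version>${project.version}</version>
</dependency>
```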

<exclusions>
<!-- Use Hadoop/Spark's backend logger -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-examples-all</artifactId>
Member:
Is this really needed?

Member Author:
It sounds fair to me to prefer the runner's logger; the Flink runner does the same.

Member:
The dependency is needed because some of the examples are used to test the Spark runner.

<version>${beam.version}</version>
<exclusions>
<!-- Use Hadoop/Spark's backend logger -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
<classifier>hadoop2</classifier>
<exclusions>
<!-- exclude old Jetty version of servlet API -->
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
@@ -231,20 +346,20 @@
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<!-- relocate Guava used by Dataflow (v18) since it conflicts with version used by Hadoop (v11) -->
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>com.cloudera.dataflow.spark.relocated.com.google.common</shadedPattern>
</relocation>
</relocations>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>spark-app</shadedClassifierName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
<!--<configuration>-->
<!--<relocations>-->
<!--&lt;!&ndash; relocate Guava used by Dataflow (v18) since it conflicts with version used by Hadoop (v11) &ndash;&gt;-->
<!--<relocation>-->
<!--<pattern>com.google.common</pattern>-->
<!--<shadedPattern>com.cloudera.dataflow.spark.relocated.com.google.common</shadedPattern>-->
<!--</relocation>-->
<!--</relocations>-->
<!--<shadedArtifactAttached>true</shadedArtifactAttached>-->
<!--<shadedClassifierName>spark-app</shadedClassifierName>-->
<!--<transformers>-->
<!--<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />-->
<!--</transformers>-->
<!--</configuration>-->
Member:
Guava will still need to be relocated to run properly on a cluster, won't it?

Member Author:
The SDK upgraded to Guava 19, but I guess shading is still necessary for the cluster. I'll reinstate the shade configuration.
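
A sketch of what the reinstated shade configuration might look like; the relocated package prefix is an assumption following the module's move to org.apache.beam.runners.spark:

```xml
<configuration>
  <relocations>
    <!-- relocate Guava used by the SDK since it can conflict with the version shipped by Hadoop -->
    <relocation>
      <pattern>com.google.common</pattern>
      <!-- hypothetical prefix matching the new package name -->
      <shadedPattern>org.apache.beam.runners.spark.relocated.com.google.common</shadedPattern>
    </relocation>
  </relocations>
  <shadedArtifactAttached>true</shadedArtifactAttached>
  <shadedClassifierName>spark-app</shadedClassifierName>
  <transformers>
    <!-- merge META-INF/services entries from shaded dependencies -->
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
  </transformers>
</configuration>
```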

</execution>
</executions>
</plugin>
@@ -274,89 +389,6 @@ License.
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>${google-cloud-dataflow-version}</version>
<exclusions>
<!-- Use Hadoop/Spark's backend logger -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-examples-all</artifactId>
<version>${google-cloud-dataflow-version}</version>
<exclusions>
<!-- Use Hadoop/Spark's backend logger -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
<classifier>hadoop2</classifier>
<exclusions>
<!-- exclude old Jetty version of servlet API -->
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<reporting>
<plugins>
<plugin>
@@ -380,25 +412,12 @@ License.
</reporting>


<url>http://github.com/cloudera/spark-dataflow</url>
<inceptionYear>2014</inceptionYear>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<developers>
<developer>
<name>Cloudera, Inc.</name>
</developer>
</developers>

<issueManagement>
<system>GitHub</system>
<url>https://github.com/cloudera/spark-dataflow/issues</url>
</issueManagement>
<scm>
<connection>scm:git:https://github.com/cloudera/spark-dataflow.git</connection>
<developerConnection>scm:git:https://github.com/cloudera/spark-dataflow.git</developerConnection>
@@ -410,31 +429,6 @@ License.
<maven>3.2.1</maven>
</prerequisites>

<repositories>
<repository>
<id>cloudera.repo</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<name>Cloudera Repositories</name>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<distributionManagement>
<repository>
<id>cloudera.repo</id>
<url>https://repository.cloudera.com/artifactory/libs-release-local</url>
</repository>
<snapshotRepository>
<id>cloudera.snapshots.repo</id>
<url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url>
</snapshotRepository>
</distributionManagement>

<profiles>
<profile>
<id>release-sign-artifacts</id>
@@ -13,7 +13,7 @@
* License.
*/

package com.cloudera.dataflow.spark;
package org.apache.beam.runners.spark;

import java.util.Iterator;
import java.util.LinkedList;
@@ -23,6 +23,7 @@
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.spark.api.java.function.FlatMapFunction;

/**
@@ -13,7 +13,7 @@
* License.
*/

package com.cloudera.dataflow.spark;
package org.apache.beam.runners.spark;

import static com.google.common.base.Preconditions.checkArgument;

@@ -38,6 +38,7 @@
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;

@@ -13,7 +13,7 @@
* License.
*/

package com.cloudera.dataflow.spark;
package org.apache.beam.runners.spark;

import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -13,7 +13,7 @@
* License.
*/

package com.cloudera.dataflow.spark;
package org.apache.beam.runners.spark;

import java.util.Iterator;
import java.util.Map;
@@ -25,6 +25,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import org.apache.beam.runners.spark.util.BroadcastHelper;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.joda.time.Instant;
import scala.Tuple2;