Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions examples/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,18 @@
</dependencies>
</profile>

<!-- Include the Apache Samza runner -P samza-runner -->
<profile>
<id>samza-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-samza_2.11</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>

<!-- Custom set up for our Jenkins precommit tests that comment on GitHub -->
<profile>
<id>jenkins-precommit</id>
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-samza_2.11</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-java</artifactId>
Expand Down
1 change: 0 additions & 1 deletion runners/samza/.gitignore

This file was deleted.

60 changes: 49 additions & 11 deletions runners/samza/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
<version>2.3.0-SNAPSHOT</version>
<version>2.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>beam-runners-samza_2.10</artifactId>
<artifactId>beam-runners-samza_2.11</artifactId>
<name>Apache Beam :: Runners :: Samza</name>
<packaging>jar</packaging>

<properties>
<samza.version>0.14.0</samza.version>
<samza.version>0.15.0-SNAPSHOT</samza.version>
</properties>

<profiles>
Expand All @@ -57,12 +57,12 @@
<groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
<excludedGroups>
org.apache.beam.sdk.testing.LargeKeys$Above100MB,
org.apache.beam.sdk.testing.UsesStatefulParDo,
org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo,
org.apache.beam.sdk.testing.UsesAttemptedMetrics,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
org.apache.beam.sdk.testing.UsesTestStream
org.apache.beam.sdk.testing.UsesTestStream,
org.apache.beam.sdk.testing.UsesImpulse
</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
Expand Down Expand Up @@ -170,24 +170,38 @@

<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.10</artifactId>
<artifactId>samza-core_2.11</artifactId>
<version>${samza.version}</version>
</dependency>

<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.10</artifactId>
<artifactId>samza-kafka_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.10</artifactId>
<artifactId>samza-kv_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-inmemory_2.11</artifactId>
<version>${samza.version}</version>
<scope>test</scope>
</dependency>

<!-- Beam -->
<dependency>
<groupId>org.apache.beam</groupId>
Expand Down Expand Up @@ -222,6 +236,13 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.2</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -258,6 +279,12 @@
<artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>

<!-- Auto register samza runner -->
<dependency>
<groupId>com.google.auto.service</groupId>
Expand All @@ -274,13 +301,19 @@

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

Expand All @@ -301,7 +334,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-model-fn-execution</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

Expand All @@ -312,6 +344,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<scope>test</scope>
</dependency>

<!-- Other transitive test dependencies -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.beam.runners.samza;

import java.util.Map;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -29,22 +28,33 @@
*/
public interface SamzaPipelineOptions extends PipelineOptions {

@Description("The config for Samza runner.")
Map<String, String> getSamzaConfig();
void setSamzaConfig(Map<String, String> configs);
@Description("The config for Samza using a properties file. It is *optional*. "
+ "Without a config file, Samza uses a default config for local execution.")
String getConfigFilePath();
void setConfigFilePath(String filePath);

@Description("The config override to set programmatically. It will be applied on "
+ "top of config file if it exits, otherwise used directly as the config.")
Map<String, String> getConfigOverride();
void setConfigOverride(Map<String, String> configs);

@Description("The interval to check for watermarks in milliseconds")
@Description("The interval to check for watermarks in milliseconds.")
@Default.Long(1000)
long getWatermarkInterval();
void setWatermarkInterval(long interval);

@Description("The maximum number of messages to buffer for a given system")
@Description("The maximum number of messages to buffer for a given system.")
@Default.Integer(5000)
int getSystemBufferSize();
void setSystemBufferSize(int consumerBufferSize);

@Description("The maximum parallelism allowed for a given data source")
@Description("The maximum parallelism allowed for any data source.")
@Default.Integer(1)
int getMaxSourceParallelism();
void setMaxSourceParallelism(int maxSourceParallelism);

@Description("The batch get size limit for the state store.")
@Default.Integer(10000)
int getStoreBatchGetSize();
void setStoreBatchGetSize(int storeBatchGetSize);
}
Loading