Merge branch 'master' into query_temp_dataset
iht committed Oct 28, 2019
2 parents fe1dfa0 + 98eb81d commit 330626d
Showing 125 changed files with 4,302 additions and 1,216 deletions.
11 changes: 7 additions & 4 deletions .test-infra/dataproc/flink_cluster.sh
@@ -24,6 +24,7 @@
# JOB_SERVER_IMAGE: Url to job server docker image to pull on dataproc master (optional)
# ARTIFACTS_DIR: Url to bucket where artifacts will be stored for staging (optional)
# FLINK_DOWNLOAD_URL: Url to Flink .tar archive to be installed on the cluster
# HADOOP_DOWNLOAD_URL: Url to a pre-packaged Hadoop jar
# FLINK_NUM_WORKERS: Number of Flink workers
# FLINK_TASKMANAGER_SLOTS: Number of slots per Flink task manager
# DETACHED_MODE: Detached mode: should the SSH tunnel run in detached mode?
@@ -34,7 +35,8 @@
# HARNESS_IMAGES_TO_PULL='gcr.io/<IMAGE_REPOSITORY>/python:latest gcr.io/<IMAGE_REPOSITORY>/java:latest' \
# JOB_SERVER_IMAGE=gcr.io/<IMAGE_REPOSITORY>/job-server-flink:latest \
# ARTIFACTS_DIR=gs://<bucket-for-artifacts> \
# FLINK_DOWNLOAD_URL=http://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz \
# FLINK_DOWNLOAD_URL=https://archive.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz \
# HADOOP_DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar \
# FLINK_NUM_WORKERS=2 \
# FLINK_TASKMANAGER_SLOTS=1 \
# DETACHED_MODE=false \
@@ -118,7 +120,8 @@ function start_tunnel() {
function create_cluster() {
local metadata="flink-snapshot-url=${FLINK_DOWNLOAD_URL},"
metadata+="flink-start-yarn-session=true,"
metadata+="flink-taskmanager-slots=${FLINK_TASKMANAGER_SLOTS}"
metadata+="flink-taskmanager-slots=${FLINK_TASKMANAGER_SLOTS},"
metadata+="hadoop-jar-url=${HADOOP_DOWNLOAD_URL}"

[[ -n "${HARNESS_IMAGES_TO_PULL:=}" ]] && metadata+=",beam-sdk-harness-images-to-pull=${HARNESS_IMAGES_TO_PULL}"
[[ -n "${JOB_SERVER_IMAGE:=}" ]] && metadata+=",beam-job-server-image=${JOB_SERVER_IMAGE}"
@@ -131,7 +134,7 @@ function create_cluster() {

# Docker init action restarts yarn so we need to start yarn session after this restart happens.
# This is why flink init action is invoked last.
gcloud dataproc clusters create $CLUSTER_NAME --num-workers=$num_dataproc_workers --initialization-actions $DOCKER_INIT,$BEAM_INIT,$FLINK_INIT --metadata "${metadata}", --image-version=$image_version --zone=$GCLOUD_ZONE --quiet
gcloud dataproc clusters create $CLUSTER_NAME --region=global --num-workers=$num_dataproc_workers --initialization-actions $DOCKER_INIT,$BEAM_INIT,$FLINK_INIT --metadata "${metadata}", --image-version=$image_version --zone=$GCLOUD_ZONE --quiet
}

# Runs init actions for Docker, Portability framework (Beam) and Flink cluster
@@ -152,7 +155,7 @@ function restart() {

# Deletes a Flink cluster.
function delete() {
gcloud dataproc clusters delete $CLUSTER_NAME --quiet
gcloud dataproc clusters delete $CLUSTER_NAME --region=global --quiet
}

"$@"
9 changes: 8 additions & 1 deletion .test-infra/dataproc/init-actions/flink.sh
@@ -56,6 +56,9 @@ readonly START_FLINK_YARN_SESSION_DEFAULT=true
# Set this to install flink from a snapshot URL instead of apt
readonly FLINK_SNAPSHOT_URL_METADATA_KEY='flink-snapshot-url'

# Set this to install pre-packaged Hadoop jar
readonly HADOOP_JAR_URL_METADATA_KEY='hadoop-jar-url'

# Set this to define how many task slots are there per flink task manager
readonly FLINK_TASKMANAGER_SLOTS_METADATA_KEY='flink-taskmanager-slots'

@@ -88,6 +91,7 @@ function install_apt_get() {
function install_flink_snapshot() {
local work_dir="$(mktemp -d)"
local flink_url="$(/usr/share/google/get_metadata_value "attributes/${FLINK_SNAPSHOT_URL_METADATA_KEY}")"
local hadoop_url="$(/usr/share/google/get_metadata_value "attributes/${HADOOP_JAR_URL_METADATA_KEY}")"
local flink_local="${work_dir}/flink.tgz"
local flink_toplevel_pattern="${work_dir}/flink-*"

@@ -103,6 +107,9 @@ function install_flink_snapshot() {

popd # work_dir

if [[ ! -z "${hadoop_url}" ]]; then
cd "${FLINK_INSTALL_DIR}/lib"; curl -O "${hadoop_url}"
fi
}

function configure_flink() {
@@ -205,4 +212,4 @@ function main() {
fi
}

main
main
4 changes: 3 additions & 1 deletion .test-infra/jenkins/Flink.groovy
@@ -17,7 +17,8 @@
*/

class Flink {
private static final String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz'
private static final String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz'
private static final String hadoopDownloadUrl = 'https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar'
private static final String FLINK_DIR = '"$WORKSPACE/src/.test-infra/dataproc"'
private static final String FLINK_SCRIPT = 'flink_cluster.sh'
private def job
@@ -53,6 +54,7 @@ class Flink {
env("CLUSTER_NAME", clusterName)
env("GCS_BUCKET", gcsBucket)
env("FLINK_DOWNLOAD_URL", flinkDownloadUrl)
env("HADOOP_DOWNLOAD_URL", hadoopDownloadUrl)
env("FLINK_NUM_WORKERS", workerCount)
env("FLINK_TASKMANAGER_SLOTS", slotsPerTaskmanager)
env("DETACHED_MODE", 'true')
@@ -105,7 +105,7 @@ def batchLoadTestJob = { scope, triggeringContext ->
List<Map> testScenarios = scenarios(datasetName, pythonHarnessImageTag)

publisher.publish(':sdks:python:container:py2:docker', 'python2.7_sdk')
publisher.publish(':runners:flink:1.7:job-server-container:docker', 'flink-job-server')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'flink-job-server')
def flink = new Flink(scope, 'beam_LoadTests_Python_Combine_Flink_Batch')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('flink-job-server'))

2 changes: 1 addition & 1 deletion .test-infra/jenkins/job_LoadTests_GBK_Flink_Python.groovy
@@ -172,7 +172,7 @@ def loadTest = { scope, triggeringContext ->
List<Map> testScenarios = scenarios(datasetName, pythonHarnessImageTag)

publisher.publish(':sdks:python:container:py2:docker', 'python2.7_sdk')
publisher.publish(':runners:flink:1.7:job-server-container:docker', 'flink-job-server')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'flink-job-server')
def flink = new Flink(scope, 'beam_LoadTests_Python_GBK_Flink_Batch')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('flink-job-server'))

@@ -129,7 +129,7 @@ def loadTest = { scope, triggeringContext ->
List<Map> testScenarios = scenarios(datasetName, pythonHarnessImageTag)

publisher.publish(':sdks:python:container:py2:docker', 'python2.7_sdk')
publisher.publish(':runners:flink:1.7:job-server-container:docker', 'flink-job-server')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'flink-job-server')
Flink flink = new Flink(scope, 'beam_LoadTests_Python_ParDo_Flink_Batch')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('flink-job-server'))

@@ -157,7 +157,7 @@ def loadTest = { scope, triggeringContext ->
List<Map> testScenarios = scenarios(datasetName, pythonHarnessImageTag)

publisher.publish(':sdks:python:container:py2:docker', 'python2.7_sdk')
publisher.publish('runners:flink:1.7:job-server-container:docker', 'flink-job-server')
publisher.publish(':runners:flink:1.9:job-server-container:docker', 'flink-job-server')
def flink = new Flink(scope, 'beam_LoadTests_Python_CoGBK_Flink_Batch')
flink.setUp([pythonHarnessImageTag], numberOfWorkers, publisher.getFullImageName('flink-job-server'))

2 changes: 1 addition & 1 deletion .test-infra/jenkins/job_PostCommit_Python37.groovy
@@ -27,7 +27,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python37', 'Run Python 3.7 P
previousNames('/beam_PostCommit_Python3_Verify/')

// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)
commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 120)

publishers {
archiveJunit('**/nosetests*.xml')
@@ -300,7 +300,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.17.0'
project.version = '2.18.0'
if (!isRelease(project)) {
project.version += '-SNAPSHOT'
}
4 changes: 2 additions & 2 deletions gradle.properties
@@ -23,5 +23,5 @@ offlineRepositoryRoot=offline-repository
signing.gnupg.executable=gpg
signing.gnupg.useLegacyGpg=true

version=2.17.0-SNAPSHOT
python_sdk_version=2.17.0.dev
version=2.18.0-SNAPSHOT
python_sdk_version=2.18.0.dev
20 changes: 10 additions & 10 deletions learning/katas/java/build.gradle
@@ -18,14 +18,14 @@

buildscript {
ext {
beamVersion = '2.13.0'
guavaVersion = '27.1-jre'
jodaTimeVersion = '2.10.3'
slf4jVersion = '1.7.26'
log4jSlf4jImpl = '2.11.2'

assertjVersion = '3.12.2'
hamcrestVersion = '1.3'
beamVersion = '2.16.0'
guavaVersion = '28.1-jre'
jodaTimeVersion = '2.10.4'
slf4jVersion = '1.7.28'
log4jSlf4jImpl = '2.12.1'

assertjVersion = '3.13.2'
hamcrestVersion = '2.1'
junitVersion = '4.12'
}

@@ -113,6 +113,6 @@ configure(subprojects.findAll { it.name != 'util' }) {
}
}

task wrapper(type: Wrapper) {
gradleVersion = '4.8'
wrapper {
gradleVersion = '5.0'
}
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;

import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.TimerInternals;
@@ -80,6 +81,12 @@ public TimerUpdate getTimerUpdate() {
return timerUpdateBuilder.build();
}

public boolean containsUpdateForTimeBefore(Instant time) {
TimerUpdate update = timerUpdateBuilder.build();
return hasTimeBefore(update.getSetTimers(), time)
|| hasTimeBefore(update.getDeletedTimers(), time);
}

@Override
public Instant currentProcessingTime() {
return processingTimeClock.now();
@@ -101,4 +108,9 @@ public Instant currentInputWatermarkTime() {
public Instant currentOutputWatermarkTime() {
return watermarks.getOutputWatermark();
}

private boolean hasTimeBefore(Iterable<? extends TimerData> timers, Instant time) {
return StreamSupport.stream(timers.spliterator(), false)
.anyMatch(td -> td.getTimestamp().isBefore(time));
}
}
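The two methods added above give DirectTimerInternals a way to ask whether the bundle has already registered a timer update earlier than some instant: containsUpdateForTimeBefore builds the pending TimerUpdate and checks both its set and deleted timers, while hasTimeBefore is the shared anyMatch over an Iterable of TimerData. A small self-contained sketch of the same predicate, using java.time.Instant in place of Joda time and a hypothetical TimerStamp class standing in for Beam's TimerData:

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.StreamSupport;

public class PendingTimerCheck {

  // Hypothetical stand-in for TimerInternals.TimerData: only the timestamp matters here.
  static final class TimerStamp {
    final Instant timestamp;
    TimerStamp(Instant timestamp) { this.timestamp = timestamp; }
    Instant getTimestamp() { return timestamp; }
  }

  // True if any set or deleted timer is strictly before `time`,
  // mirroring the containsUpdateForTimeBefore check in the patch above.
  static boolean containsUpdateForTimeBefore(
      Iterable<TimerStamp> setTimers, Iterable<TimerStamp> deletedTimers, Instant time) {
    return hasTimeBefore(setTimers, time) || hasTimeBefore(deletedTimers, time);
  }

  static boolean hasTimeBefore(Iterable<TimerStamp> timers, Instant time) {
    return StreamSupport.stream(timers.spliterator(), false)
        .anyMatch(t -> t.getTimestamp().isBefore(time));
  }

  public static void main(String[] args) {
    Instant watermark = Instant.parse("2019-10-28T12:00:00Z");
    List<TimerStamp> setTimers = Arrays.asList(new TimerStamp(watermark.minusSeconds(5)));
    // Prints true: a timer was set before the given watermark.
    System.out.println(containsUpdateForTimeBefore(setTimers, Collections.emptyList(), watermark));
  }
}

Firing code can consult this predicate after each timer to decide whether to keep draining its queue, which is exactly how the StatefulParDoEvaluator change further down uses it.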
@@ -140,6 +140,7 @@ public void initialize(
* null} if the transform that produced the result is a root transform
* @param completedTimers the timers that were delivered to produce the {@code completedBundle},
* or an empty iterable if no timers were delivered
* @param pushedBackTimers timers that have been pushed back during processing
* @param result the result of evaluating the input bundle
* @return the committed bundles contained within the handled {@code result}
*/
@@ -226,7 +227,11 @@ private void fireAllAvailableCallbacks() {
private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
Instant outputWatermark = watermarks.getOutputWatermark();
callbackExecutor.fireForWatermark(producingTransform, outputWatermark);
try {
callbackExecutor.fireForWatermark(producingTransform, outputWatermark);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
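The catch block added above follows the usual idiom for code that cannot rethrow InterruptedException: handle the checked exception locally but call Thread.currentThread().interrupt() so the interrupt status is not silently swallowed and later blocking calls (or the surrounding executor) can still observe it. A minimal sketch of that idiom, with hypothetical names rather than Beam's classes:

import java.util.concurrent.CountDownLatch;

public class RestoreInterruptExample {

  // A blocking call wrapped so the interrupt status is preserved instead of lost.
  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await(); // may throw InterruptedException
    } catch (InterruptedException ex) {
      // Re-assert the flag so the caller, or a later blocking call, still sees the interrupt.
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws Exception {
    CountDownLatch latch = new CountDownLatch(1);
    Thread worker = new Thread(() -> {
      awaitQuietly(latch);
      // Prints true: the flag was restored after the exception was caught.
      System.out.println("interrupted flag preserved: " + Thread.currentThread().isInterrupted());
    });
    worker.start();
    worker.interrupt(); // latch.await() throws InterruptedException inside the worker
    worker.join();
  }
}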

/** Create a {@link UncommittedBundle} for use by a source. */
Expand Down Expand Up @@ -369,7 +374,7 @@ void forceRefresh() {
* <p>This is a destructive operation. Timers will only appear in the result of this method once
* for each time they are set.
*/
public Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> extractFiredTimers() {
Collection<FiredTimers<AppliedPTransform<?, ?, ?>>> extractFiredTimers() {
forceRefresh();
return watermarkManager.extractFiredTimers();
}
@@ -249,6 +249,7 @@ private enum ExecutorState {
* Exception)}.
*/
private class TimerIterableCompletionCallback implements CompletionCallback {

private final Iterable<TimerData> timers;

TimerIterableCompletionCallback(Iterable<TimerData> timers) {
@@ -258,8 +259,9 @@ private class TimerIterableCompletionCallback implements CompletionCallback {
@Override
public final CommittedResult handleResult(
CommittedBundle<?> inputBundle, TransformResult<?> result) {
CommittedResult<AppliedPTransform<?, ?, ?>> committedResult =
evaluationContext.handleResult(inputBundle, timers, result);

final CommittedResult<AppliedPTransform<?, ?, ?>> committedResult;
committedResult = evaluationContext.handleResult(inputBundle, timers, result);
for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
pendingWork.offer(
WorkUpdate.fromBundle(
Expand Down
@@ -20,10 +20,14 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.StateNamespace;
@@ -34,6 +38,7 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -56,6 +61,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Instant;

/** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */
final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements TransformEvaluatorFactory {
Expand Down Expand Up @@ -232,10 +238,13 @@ private static class StatefulParDoEvaluator<K, InputT>
implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {

private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;
private final List<TimerData> pushedBackTimers = new ArrayList<>();
private final DirectTimerInternals timerInternals;

public StatefulParDoEvaluator(
DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator) {
this.delegateEvaluator = delegateEvaluator;
this.timerInternals = delegateEvaluator.getParDoEvaluator().getStepContext().timerInternals();
}

@Override
@@ -245,7 +254,12 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
delegateEvaluator.processElement(windowedValue);
}

for (TimerData timer : gbkResult.getValue().timersIterable()) {
Instant currentInputWatermark = timerInternals.currentInputWatermarkTime();
PriorityQueue<TimerData> toBeFiredTimers =
new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
gbkResult.getValue().timersIterable().forEach(toBeFiredTimers::add);
while (!toBeFiredTimers.isEmpty()) {
TimerData timer = toBeFiredTimers.poll();
checkState(
timer.getNamespace() instanceof WindowNamespace,
"Expected Timer %s to be in a %s, but got %s",
@@ -255,17 +269,23 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
BoundedWindow timerWindow = windowNamespace.getWindow();
delegateEvaluator.onTimer(timer, timerWindow);
if (timerInternals.containsUpdateForTimeBefore(currentInputWatermark)) {
break;
}
}
pushedBackTimers.addAll(toBeFiredTimers);
}

@Override
public TransformResult<KeyedWorkItem<K, KV<K, InputT>>> finishBundle() throws Exception {
TransformResult<KV<K, InputT>> delegateResult = delegateEvaluator.finishBundle();

TimerUpdate timerUpdate =
delegateResult.getTimerUpdate().withPushedBackTimers(pushedBackTimers);
pushedBackTimers.clear();
StepTransformResult.Builder<KeyedWorkItem<K, KV<K, InputT>>> regroupedResult =
StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(
delegateResult.getTransform(), delegateResult.getWatermarkHold())
.withTimerUpdate(delegateResult.getTimerUpdate())
.withTimerUpdate(timerUpdate)
.withState(delegateResult.getState())
.withMetricUpdates(delegateResult.getLogicalMetricUpdates())
.addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
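Taken together, the StatefulParDoEvaluator changes above implement a simple timer push-back protocol: delivered timers are fired in timestamp order, the input watermark is captured once at the start of the bundle, and as soon as a fired timer registers an update earlier than that watermark the remaining timers are held back and handed to withPushedBackTimers in finishBundle so they can be redelivered in a later bundle. A self-contained sketch of that control flow, with a hypothetical Timer class and a callback standing in for Beam's TimerData and onTimer (not the actual Beam API):

import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Predicate;

public class TimerPushbackSketch {

  // Hypothetical stand-in for TimerInternals.TimerData: just a timestamp.
  static final class Timer {
    final Instant timestamp;
    Timer(Instant timestamp) { this.timestamp = timestamp; }
  }

  // Fires timers in timestamp order. The callback reports whether firing that timer
  // produced a new update earlier than the captured input watermark; if so, stop and
  // return the unfired timers as "pushed back".
  static List<Timer> fireWithPushback(
      Iterable<Timer> deliveredTimers, Predicate<Instant> firesAndReportsEarlierUpdate) {
    PriorityQueue<Timer> toFire = new PriorityQueue<>(Comparator.comparing((Timer t) -> t.timestamp));
    deliveredTimers.forEach(toFire::add);

    while (!toFire.isEmpty()) {
      Timer timer = toFire.poll();
      if (firesAndReportsEarlierUpdate.test(timer.timestamp)) {
        break; // remaining timers are pushed back rather than fired in this bundle
      }
    }
    // Remaining (unfired) timers; PriorityQueue iteration order is unspecified, which is fine here.
    return new ArrayList<>(toFire);
  }

  public static void main(String[] args) {
    Instant watermark = Instant.parse("2019-10-28T12:00:00Z");
    List<Timer> delivered = new ArrayList<>();
    delivered.add(new Timer(watermark.minusSeconds(30)));
    delivered.add(new Timer(watermark.minusSeconds(20)));
    delivered.add(new Timer(watermark.minusSeconds(10)));

    // Pretend the second timer, when fired, sets a new timer before the watermark.
    List<Timer> pushedBack =
        fireWithPushback(delivered, ts -> ts.equals(watermark.minusSeconds(20)));
    System.out.println("pushed back: " + pushedBack.size()); // 1
  }
}

Here the callback returning true plays the role of the patch's timerInternals.containsUpdateForTimeBefore(currentInputWatermark) check, which is what triggers the break and the push-back.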
