Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public static DirectGraph create(
ListMultimap<PInput, AppliedPTransform<?, ?, ?>> perElementConsumers,
Set<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
return new DirectGraph(
producers, viewWriters, perElementConsumers, rootTransforms, stepNames);
return new DirectGraph(producers, viewWriters, perElementConsumers, rootTransforms, stepNames);
}

private DirectGraph(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct.portable;

import org.apache.beam.runners.direct.portable.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

/** Produces the {@link UncommittedBundle UncommittedBundles} that elements are added to. */
interface BundleFactory {
  /**
   * Creates an {@link UncommittedBundle} that has no input. Elements added to the returned bundle
   * are not part of any {@link PCollection}.
   *
   * <p>Intended for building the inputs of root transforms.
   *
   * @param <T> the type of elements the bundle holds
   */
  <T> UncommittedBundle<T> createRootBundle();

  /**
   * Creates an {@link UncommittedBundle} whose elements are part of the {@code output} {@link
   * PCollection}.
   *
   * @param output the {@link PCollection} that elements added to the bundle belong to
   * @param <T> the type of elements the bundle holds
   */
  <T> UncommittedBundle<T> createBundle(PCollection<T> output);

  /**
   * Creates an {@link UncommittedBundle} associated with the provided {@code key}. Elements added
   * to the returned bundle are part of the {@code output} {@link PCollection}.
   *
   * <p>Intended for use by {@link DirectGroupByKeyOnly} {@link PTransform PTransforms}.
   *
   * @param key the key the bundle is created with
   * @param output the {@link PCollection} that elements added to the bundle belong to
   * @param <K> the type of the key
   * @param <T> the type of elements the bundle holds
   */
  <K, T> UncommittedBundle<T> createKeyedBundle(StructuralKey<K> key, PCollection<T> output);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.direct.portable;

import org.apache.beam.runners.local.Bundle;

/**
 * Processes a bundle of input over some executable stage or step.
 *
 * @param <CollectionT> upper bound of the collection type carried by the accepted bundles
 * @param <BundleT> the type of {@link Bundle} this processor accepts
 * @param <ExecutableT> the type of executable that consumes the bundle
 */
interface BundleProcessor<
    CollectionT,
    BundleT extends Bundle<?, ? extends CollectionT>,
    ExecutableT> {
  /**
   * Executes {@code bundle} using {@code consumer}. The {@link CompletionCallback} is called back
   * once execution completes.
   *
   * @param bundle the input to process
   * @param consumer the executable the bundle is processed with
   * @param onComplete the callback invoked when execution completes
   */
  void process(BundleT bundle, ExecutableT consumer, CompletionCallback onComplete);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct.portable;

import org.joda.time.Instant;

/** A source of the current time. */
interface Clock {
  /**
   * Reports the current time.
   *
   * @return the current time, expressed as an {@link Instant}
   */
  Instant now();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.direct.portable;

import javax.annotation.Nullable;
import org.apache.beam.runners.local.Bundle;
import org.apache.beam.runners.local.StructuralKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;

/**
 * A portion of a {@link PCollection}. Elements are first output to an {@link UncommittedBundle},
 * which is eventually committed. Committed elements are later executed by the {@link PTransform
 * PTransforms} that consume the {@link PCollection} this bundle is a part of.
 *
 * @param <T> the type of elements contained within this bundle
 */
interface CommittedBundle<T> extends Bundle<T, PCollection<T>> {
  /**
   * Returns the {@link PCollection} that the elements of this bundle belong to.
   *
   * <p>The return value is declared {@link Nullable}.
   */
  @Nullable
  PCollection<T> getPCollection();

  /**
   * Returns the key that was output by the most recent {@code GroupByKey} in the execution of this
   * bundle.
   */
  StructuralKey<?> getKey();

  /**
   * Returns an {@link Iterable} over every element that has been added to this {@link
   * CommittedBundle}.
   */
  Iterable<WindowedValue<T>> getElements();

  /**
   * Returns the minimum timestamp among the elements in this bundle.
   *
   * <p>The result should match what iterating over every element of the bundle and picking the
   * smallest timestamp would produce.
   */
  Instant getMinimumTimestamp();

  /**
   * Returns the processing time output watermark as of the moment the producing {@code Executable}
   * committed this bundle. Downstream synchronized processing time watermarks cannot advance past
   * this point until this bundle has been consumed.
   *
   * <p>This value is no greater than the earliest incomplete processing time or synchronized
   * processing time {@code TimerData} timer at the time this bundle was committed, including any
   * timers that fired to produce this bundle.
   */
  Instant getSynchronizedProcessingOutputWatermark();

  /**
   * Returns a new {@link CommittedBundle} that matches this one, except that {@link
   * #getElements()} returns the provided elements. This bundle is left unchanged.
   *
   * <p>The {@link #getSynchronizedProcessingOutputWatermark() synchronized processing output
   * watermark} of the returned {@link CommittedBundle} equals the value returned from this bundle.
   * This ensures that a {@link PTransform} which could not finish processing its input elements
   * properly holds the synchronized processing time at the appropriate value.
   *
   * @param elements the elements the returned bundle reports from {@link #getElements()}
   */
  CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.runners.direct.portable;

import com.google.auto.value.AutoValue;
import com.google.common.base.Optional;
import java.util.Set;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;

/**
 * A {@link TransformResult} that has been committed.
 *
 * @param <ExecutableT> the type of executable that produced the result
 */
@AutoValue
abstract class CommittedResult<ExecutableT> {
  /**
   * Builds a {@link CommittedResult} for the {@link AppliedPTransform} reported by {@code
   * original}.
   *
   * @param original the result being committed; only its transform is read here
   * @param unprocessedElements the input elements that were not processed, if any
   * @param outputs the committed bundles produced by the transform
   * @param producedOutputs the kinds of output the transform produced
   */
  public static CommittedResult<AppliedPTransform<?, ?, ?>> create(
      TransformResult<?> original,
      Optional<? extends CommittedBundle<?>> unprocessedElements,
      Iterable<? extends CommittedBundle<?>> outputs,
      Set<OutputType> producedOutputs) {
    return new AutoValue_CommittedResult<>(
        original.getTransform(), unprocessedElements, outputs, producedOutputs);
  }

  // NOTE: the declaration order of the abstract accessors below fixes the parameter order of the
  // generated AutoValue constructor used by create(); keep them in this order.

  /**
   * Returns the executable that produced this result. For results built via {@link #create} this
   * is an {@link AppliedPTransform}.
   */
  public abstract ExecutableT getExecutable();

  /**
   * Returns the {@link CommittedBundle} holding the input elements that the evaluation could not
   * process. The optional is present when there were unprocessed input elements and absent
   * otherwise.
   */
  public abstract Optional<? extends CommittedBundle<?>> getUnprocessedInputs();

  /** Returns the outputs produced by the transform. */
  public abstract Iterable<? extends CommittedBundle<?>> getOutputs();

  /**
   * Returns the kinds of output produced by the transform that produced this result.
   *
   * <p>Transforms that produce output by modifying the state of the runner (e.g. {@link
   * CreatePCollectionView}) should report that explicitly. NOTE(review): presumably {@link
   * OutputType#PCOLLECTION_VIEW} covers that case and {@link OutputType#BUNDLE} corresponds to a
   * nonempty {@link #getOutputs()} - confirm against the callers of {@link #create}.
   */
  public abstract Set<OutputType> getProducedOutputTypes();

  /** The kinds of output a transform can produce. */
  enum OutputType {
    PCOLLECTION_VIEW,
    BUNDLE
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct.portable;

import org.apache.beam.sdk.runners.AppliedPTransform;

/** A callback invoked to complete the processing of a bundle of input. */
interface CompletionCallback {
  /**
   * Handles a successful result and returns the committed outputs of that result.
   *
   * <p>NOTE(review): the return type is the raw {@link CommittedResult}; it is kept raw here so
   * existing implementers and callers are unaffected - consider parameterizing it.
   *
   * @param inputBundle the bundle that was processed to produce {@code result}
   * @param result the successful result to handle
   */
  CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult<?> result);

  /**
   * Handles an input bundle that did not need to be processed.
   *
   * <p>This happens when a Source has no splits that can currently produce outputs.
   *
   * @param transform the transform whose input required no processing
   */
  void handleEmpty(AppliedPTransform<?, ?, ?> transform);

  /**
   * Handles a result that terminated abnormally because of the provided {@link Exception}.
   *
   * @param inputBundle the bundle that was being processed
   * @param t the exception that terminated processing
   */
  void handleException(CommittedBundle<?> inputBundle, Exception t);

  /**
   * Handles a result that terminated abnormally because of the provided {@link Error}. The
   * pipeline should be shut down and the Error propagated.
   *
   * @param err the error that terminated processing
   */
  void handleError(Error err);
}
Loading