Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-6431] Move state sampler files to runners-core so they can be used in the Java SDK as well #7634

Merged
merged 1 commit into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util.common.worker;
package org.apache.beam.runners.core.metrics;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -28,6 +28,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.DateTimeUtils.MillisProvider;
Expand Down Expand Up @@ -66,7 +67,7 @@ public static ExecutionStateSampler newForTest(MillisProvider clock) {

private static final long PERIOD_MS = 200;

private Future<Void> executionSamplerFuture = null;
@Nullable private Future<Void> executionSamplerFuture = null;

/**
* Called to start the ExecutionStateSampler. Until the returned {@link Closeable} is closed, the
Expand Down Expand Up @@ -141,7 +142,7 @@ void addTracker(ExecutionStateTracker tracker) {
}

/**
* Deregister tracker after MapTask completes
* Deregister tracker after MapTask completes.
*
* <p>This method needs to be synchronized to prevent race condition with sampling thread
*/
Expand All @@ -161,7 +162,7 @@ synchronized void removeTracker(ExecutionStateTracker tracker) {
}

/**
* Attributing sampling time to trackers
* Attributing sampling time to trackers.
*
* <p>This method needs to be synchronized to prevent race condition with removing tracker
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util.common.worker;
package org.apache.beam.runners.core.metrics;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.util.Map;
import java.util.Objects;
Expand All @@ -29,6 +30,7 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;

/** Tracks the current state of a single execution thread. */
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Intentional for performance.")
public class ExecutionStateTracker implements Comparable<ExecutionStateTracker> {

/**
Expand Down Expand Up @@ -102,15 +104,15 @@ public String getDescription() {
private final ExecutionStateSampler sampler;

/** The thread being managed by this {@link ExecutionStateTracker}. */
private Thread trackedThread = null;
@Nullable private Thread trackedThread = null;

/**
* The current state of the thread managed by this {@link ExecutionStateTracker}.
*
* <p>This variable is written by the Execution thread, and read by the sampling and progress
* reporting threads, thus it being marked volatile.
*/
private volatile ExecutionState currentState;
@Nullable private volatile ExecutionState currentState;

/**
* The current number of times that this {@link ExecutionStateTracker} has transitioned state.
Expand Down Expand Up @@ -140,9 +142,19 @@ public static ExecutionStateTracker newForTest() {
return new ExecutionStateTracker(ExecutionStateSampler.newForTest());
}

@Override
public boolean equals(Object o) {
return this == o;
}

@Override
public int hashCode() {
return System.identityHashCode(this);
}

@Override
public int compareTo(ExecutionStateTracker o) {
if (this == o) {
if (this.equals(o)) {
return 0;
} else {
return System.identityHashCode(this) - System.identityHashCode(o);
Expand Down Expand Up @@ -215,6 +227,9 @@ public ExecutionState getCurrentState() {
* from the execution thread.
*/
@SuppressWarnings("NonAtomicVolatileUpdate")
@SuppressFBWarnings(
value = "VO_VOLATILE_INCREMENT",
justification = "Intentional for performance.")
public Closeable enterState(ExecutionState newState) {
// WARNING: This method is called in the hottest path, and must be kept as efficient as
// possible. Avoid blocking, synchronizing, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util.common.worker;
package org.apache.beam.runners.core.metrics;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.joda.time.DateTimeUtils.MillisProvider;
import org.junit.Before;
import org.junit.Test;

/** Tests for {@link ExecutionStateSampler}. */
/** Tests for {@link org.apache.beam.runners.core.metrics.ExecutionStateSampler}. */
public class ExecutionStateSamplerTest {

private MillisProvider clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness;
Expand All @@ -47,7 +48,6 @@
import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.util.Weighted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.io.IOException;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleBatchReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShufflePosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.auto.service.AutoService;
import java.io.Closeable;
import java.util.ServiceLoader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.sdk.annotations.Experimental;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.runners.dataflow.worker;

import java.util.List;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker;

import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import java.util.logging.LogRecord;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.ExperimentContext.Experiment;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ByteArrayShufflePosition;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.util.common.worker.GroupingShuffleEntryIterator;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ShuffleEntry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.google.api.services.dataflow.model.CounterUpdate;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.google.auto.service.AutoService;
import java.io.Closeable;
import java.io.IOException;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.internal.CustomSources;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
Expand Down Expand Up @@ -103,8 +105,6 @@
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StepContext;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.util.TimeUtil;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.google.api.services.dataflow.model.SourceSplitRequest;
import java.io.Closeable;
import java.util.Collections;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker.ExecutionState;
import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Loading