Skip to content

Commit

Permalink
integrate some new code components, add new standalone class for dire…
Browse files Browse the repository at this point in the history
…ct path work processing (instead of adding more new logic to streamingDataflowWorker
  • Loading branch information
m-trieu committed Apr 2, 2024
1 parent 10c73a0 commit 3c88dfa
Show file tree
Hide file tree
Showing 55 changed files with 3,016 additions and 691 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class DataflowWorkUnitClient implements WorkUnitClient {
public class DataflowWorkUnitClient implements WorkUnitClient {
private final Logger logger;

/**
Expand All @@ -86,7 +86,7 @@ class DataflowWorkUnitClient implements WorkUnitClient {
*
* @param options The pipeline options.
*/
DataflowWorkUnitClient(DataflowWorkerHarnessOptions options, Logger logger) {
public DataflowWorkUnitClient(DataflowWorkerHarnessOptions options, Logger logger) {
this.dataflow = options.getDataflowClient();
this.options = options;
this.logger = logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class HotKeyLogger {
/** Throttles logging the detection to every loggingPeriod */
private final Duration loggingPeriod = Duration.standardMinutes(5);

HotKeyLogger() {}
public HotKeyLogger() {}

HotKeyLogger(Clock clock) {
this.clock = clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Wrapper around a {@link WindmillServerStub} that tracks metrics for the number of in-flight
Expand All @@ -51,7 +49,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class MetricTrackingWindmillServerStub {
private static final Logger LOG = LoggerFactory.getLogger(MetricTrackingWindmillServerStub.class);

private static final int MAX_READS_PER_BATCH = 60;
private static final int MAX_ACTIVE_READS = 10;
Expand Down Expand Up @@ -80,15 +77,15 @@ public abstract static class Builder {

abstract Builder setServer(WindmillServerStub server);

abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);
public abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);

abstract Builder setUseStreamingRequests(boolean useStreamingRequests);
public abstract Builder setUseStreamingRequests(boolean useStreamingRequests);

abstract Builder setUseSeparateHeartbeatStreams(boolean useSeparateHeartbeatStreams);
public abstract Builder setUseSeparateHeartbeatStreams(boolean useSeparateHeartbeatStreams);

abstract Builder setNumGetDataStreams(int numGetDataStreams);
public abstract Builder setNumGetDataStreams(int numGetDataStreams);

abstract MetricTrackingWindmillServerStub build();
public abstract MetricTrackingWindmillServerStub build();
}

public static Builder builder(WindmillServerStub server, MemoryMonitor gcThrashingMonitor) {
Expand Down Expand Up @@ -336,33 +333,6 @@ public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
}
}

public void refreshActiveWorkDirectPath(
Map<GetDataStream, Map<String, List<HeartbeatRequest>>> heartbeats) {
if (heartbeats.isEmpty()) {
return;
}
try {
for (Map.Entry<GetDataStream, Map<String, List<HeartbeatRequest>>> heartbeat :
heartbeats.entrySet()) {
GetDataStream stream = heartbeat.getKey();
Map<String, List<HeartbeatRequest>> heartbeatRequests = heartbeat.getValue();
if (stream.isClosed()) {
LOG.warn(
"Trying to refresh work on stream={} after work has moved off of worker."
+ " computations={}",
stream,
heartbeatRequests);
} else {
activeHeartbeats.set(heartbeat.getValue().size());
stream.refreshActiveWork(heartbeatRequests);
activeHeartbeats.set(0);
}
}
} finally {
activeHeartbeats.set(0);
}
}

public void printHtml(PrintWriter writer) {
writer.println("Active Fetches:");
writer.println(" Side Inputs: " + activeSideInputs.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class ReaderCache {
public class ReaderCache {

private static final Logger LOG = LoggerFactory.getLogger(ReaderCache.class);
private final Executor invalidationExecutor;
Expand Down Expand Up @@ -67,7 +67,7 @@ private static class CacheEntry {
private final Cache<WindmillComputationKey, CacheEntry> cache;

/** Cache reader for {@code cacheDuration}. Readers will be closed on {@code executor}. */
ReaderCache(Duration cacheDuration, Executor invalidationExecutor) {
public ReaderCache(Duration cacheDuration, Executor invalidationExecutor) {
this.invalidationExecutor = invalidationExecutor;
this.cache =
CacheBuilder.newBuilder()
Expand Down Expand Up @@ -137,7 +137,7 @@ void cacheReader(
}

/** If a reader is cached for this key, remove and close it. */
void invalidateReader(WindmillComputationKey computationKey) {
public void invalidateReader(WindmillComputationKey computationKey) {
// use an invalid cache token that will trigger close.
acquireReader(computationKey, -1L, -1);
}
Expand Down

0 comments on commit 3c88dfa

Please sign in to comment.