Cleanup usages of stopwatch (#16478)
Changes:
- Remove `synchronized` from `Stopwatch` methods
- Access stopwatch methods in `ChangeRequestHttpSyncer` inside a lock
kfaraz committed May 27, 2024
1 parent 4e1de50 commit 9d77ef0
Showing 5 changed files with 89 additions and 103 deletions.
@@ -25,11 +25,8 @@
import java.util.concurrent.TimeUnit;

/**
* Thread-safe wrapper over {@link com.google.common.base.Stopwatch}.
* <p>
* Thread safety has been limited to the start/stop methods for now as they are
* the only ones that can throw an exception in an illegal state and are thus
* vulnerable to race conditions.
* Wrapper over {@link com.google.common.base.Stopwatch} to provide some utility
* methods such as {@link #millisElapsed()}, {@link #restart()}, {@link #hasElapsed(Duration)}.
*/
public class Stopwatch
{
@@ -55,30 +52,30 @@ private Stopwatch(com.google.common.base.Stopwatch delegate)
this.delegate = delegate;
}

public synchronized void start()
public void start()
{
delegate.start();
}

public synchronized void stop()
public void stop()
{
delegate.stop();
}

public synchronized void reset()
public void reset()
{
delegate.reset();
}

/**
* Invokes {@code reset().start()} on the underlying {@link com.google.common.base.Stopwatch}.
*/
public synchronized void restart()
public void restart()
{
delegate.reset().start();
}

public synchronized boolean isRunning()
public boolean isRunning()
{
return delegate.isRunning();
}
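With the `synchronized` modifiers removed, the wrapper is a thin convenience layer over Guava's stopwatch, and callers are responsible for any cross-thread coordination (see the `ChangeRequestHttpSyncer` changes below). A minimal usage sketch of the utility methods named in the new javadoc follows; the package import, the Joda-Time `Duration` type, and the timeout value are assumptions for illustration, not part of this commit:

```java
// Illustrative sketch only. The import below assumes the wrapper's usual
// location in the Druid source tree; adjust if it differs.
import org.apache.druid.java.util.common.Stopwatch;
import org.joda.time.Duration;

public class StopwatchUsageExample
{
  public static void main(String[] args) throws InterruptedException
  {
    final Stopwatch sincePoll = Stopwatch.createStarted();
    final Duration pollTimeout = Duration.millis(50);   // hypothetical timeout

    Thread.sleep(60);

    // millisElapsed(), hasElapsed(Duration) and restart() are the utility
    // methods the new javadoc calls out.
    if (sincePoll.hasElapsed(pollTimeout)) {
      System.out.println("Poll overdue, elapsed " + sincePoll.millisElapsed() + " ms");
      sincePoll.restart();
    }
  }
}
```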
@@ -67,9 +67,9 @@ public class DruidLeaderClient

private final String leaderRequestPath;

private LifecycleLock lifecycleLock = new LifecycleLock();
private final LifecycleLock lifecycleLock = new LifecycleLock();
private DruidNodeDiscovery druidNodeDiscovery;
private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
private final AtomicReference<String> currentKnownLeader = new AtomicReference<>();

public DruidLeaderClient(
HttpClient httpClient,
@@ -71,12 +71,9 @@
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -1036,42 +1033,26 @@ private void doPollSegments()
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Starting polling of segment table.");

// some databases such as PostgreSQL require auto-commit turned off
// Some databases such as PostgreSQL require auto-commit turned off
// to stream results back, enabling transactions disables auto-commit
//
// setting connection to read-only will allow some database such as MySQL
// to automatically use read-only transaction mode, further optimizing the query
final List<DataSegment> segments = connector.inReadOnlyTransaction(
new TransactionCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
{
return handle
.createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.map(
new ResultSetMapper<DataSegment>()
{
@Override
public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
// If one entry in database is corrupted doPoll() should continue to work overall. See
// filter by `Objects::nonNull` below in this method.
return null;
}
}
}
)
.list();
}
}
(handle, status) -> handle
.createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.map((index, r, ctx) -> {
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
// If one entry in database is corrupted doPoll() should continue to work overall. See
// filter by `Objects::nonNull` below in this method.
return null;
}
}).list()
);

Preconditions.checkNotNull(
@@ -1082,11 +1063,13 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
if (segments.isEmpty()) {
log.info("No segments found in the database!");
} else {
log.info("Polled and found [%,d] segments in the database in [%,d] ms.", segments.size(), stopwatch.millisElapsed());
log.info(
"Polled and found [%,d] segments in the database in [%,d] ms.",
segments.size(), stopwatch.millisElapsed()
);
}
stopwatch.restart();

createDatasourcesSnapshot(stopwatch, segments);
createDatasourcesSnapshot(segments);
}

private void doPollSegmentAndSchema()
@@ -1157,25 +1140,18 @@ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
(handle, status) -> {
handle.createQuery(schemaPollQuery)
.setFetchSize(connector.getStreamingFetchSize())
.map(
new ResultSetMapper<Void>()
{
@Override
public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
schemaMapBuilder.put(
r.getString("fingerprint"),
jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read schema from db.").emit();
}
return null;
}
})
.list();
.map((index, r, ctx) -> {
try {
schemaMapBuilder.put(
r.getString("fingerprint"),
jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read schema from db.").emit();
}
return null;
}).list();

segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
return null;
@@ -1195,19 +1171,17 @@ public Void map(int index, ResultSet r, StatementContext ctx) throws SQLExceptio
log.info("No segments found in the database!");
} else {
log.info(
"Polled and found total [%,d] segments and [%,d] schema in the database in [%,d] ms.",
segments.size(),
schemaMap.size(),
stopwatch.millisElapsed()
"Polled and found [%,d] segments and [%,d] schemas in the database in [%,d] ms.",
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
}
stopwatch.restart();

createDatasourcesSnapshot(stopwatch, segments);
createDatasourcesSnapshot(segments);
}

private void createDatasourcesSnapshot(Stopwatch stopwatch, List<DataSegment> segments)
private void createDatasourcesSnapshot(List<DataSegment> segments)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
// dataSourcesSnapshot is updated only here and the DataSourcesSnapshot object is immutable. If data sources or
// segments are marked as used or unused directly (via markAs...() methods in SegmentsMetadataManager), the
// dataSourcesSnapshot can become invalid until the next database poll.
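The hunks above replace anonymous `ResultSetMapper` classes with lambdas and keep the streaming, read-only poll behavior unchanged. For readers unfamiliar with JDBI v2, here is a standalone sketch of the same lambda-mapper-plus-fetch-size pattern; the in-memory H2 database, table name, and column are illustrative stand-ins, not part of this commit:

```java
import org.skife.jdbi.v2.DBI;

import java.util.List;

public class LambdaMapperExample
{
  public static void main(String[] args)
  {
    // Requires the H2 driver on the classpath; the URL is illustrative only.
    final DBI dbi = new DBI("jdbc:h2:mem:segments;DB_CLOSE_DELAY=-1");

    final List<String> payloads = dbi.withHandle(handle -> {
      handle.execute("CREATE TABLE segments (payload VARCHAR)");
      handle.execute("INSERT INTO segments (payload) VALUES ('segment-1')");

      // Same shape as the poll above: a lambda ResultSetMapper instead of an
      // anonymous class, with an explicit fetch size for streaming results.
      return handle.createQuery("SELECT payload FROM segments")
                   .setFetchSize(100)
                   .map((index, rs, ctx) -> rs.getString("payload"))
                   .list();
    });

    System.out.println(payloads);
  }
}
```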
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
@@ -159,13 +160,11 @@ public void processBatchesDue()
return;
}

Stopwatch stopwatch = Stopwatch.createStarted();
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Backfilling segment schema. Queue size is [%s].", queue.size());

log.info("Backfilling segment schema. Queue size is [%s]", queue.size());

int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());

Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
final Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
for (int i = 0; i < itemsToProcess; i++) {
SegmentSchemaMetadataPlus item = queue.poll();
if (item != null) {
@@ -175,21 +174,29 @@ public void processBatchesDue()

for (Map.Entry<String, List<SegmentSchemaMetadataPlus>> entry : polled.entrySet()) {
try {
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(
entry.getKey(),
entry.getValue(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);

// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
}
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", entry.getKey())
.setMetric("metadatacache/backfill/count", entry.getValue().size())
.setDimension(DruidMetrics.DATASOURCE, entry.getKey())
.setMetric("metadatacache/backfill/count", entry.getValue().size())
);
}
catch (Exception e) {
log.error(e, "Exception persisting schema and updating segments table for datasource [%s].", entry.getKey());
log.error(e, "Exception persisting schema and updating segments table for datasource[%s].", entry.getKey());
}
}
emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/backfill/time", stopwatch.millisElapsed()));
emitter.emit(
ServiceMetricEvent.builder()
.setMetric("metadatacache/backfill/time", stopwatch.millisElapsed())
);
}
}
@@ -82,9 +82,13 @@ public class ChangeRequestHttpSyncer<T>
private final CountDownLatch initializationLatch = new CountDownLatch(1);

/**
* This lock is used to ensure proper start-then-stop semantics and making sure after stopping no state update happens
* and {@link #sync} is not again scheduled in {@link #executor} and if there was a previously scheduled sync before
* stopping, it is skipped and also, it is used to ensure that duplicate syncs are never scheduled in the executor.
* Lock to implement proper start-then-stop semantics. Used to ensure that:
* <ul>
* <li>No state update happens after {@link #stop()}.</li>
* <li>No sync is scheduled after {@link #stop()}.</li>
* <li>Any pending sync is skipped when {@link #stop()} has been called.</li>
* <li>Duplicate syncs are not scheduled on the executor.</li>
* </ul>
*/
private final LifecycleLock startStopLock = new LifecycleLock();

@@ -141,7 +145,7 @@ public void start()
startStopLock.exitStart();
}

sinceSyncerStart.restart();
safeRestart(sinceSyncerStart);
addNextSyncToWorkQueue();
}
}
@@ -220,21 +224,18 @@ public long getUnstableTimeMillis()
*/
public boolean isSyncedSuccessfully()
{
if (consecutiveFailedAttemptCount > 0) {
return false;
} else {
return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}
return consecutiveFailedAttemptCount <= 0
&& sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}

private void sync()
private void sendSyncRequest()
{
if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
log.info("Skipping sync for server[%s] as syncer has not started yet.", logIdentity);
return;
}

sinceLastSyncRequest.restart();
safeRestart(sinceLastSyncRequest);

try {
final String req = getRequestString();
@@ -270,7 +271,7 @@ public void onSuccess(InputStream stream)
final int responseCode = responseHandler.getStatus();
if (responseCode == HttpServletResponse.SC_NO_CONTENT) {
log.debug("Received NO CONTENT from server[%s]", logIdentity);
sinceLastSyncSuccess.restart();
safeRestart(sinceLastSyncSuccess);
return;
} else if (responseCode != HttpServletResponse.SC_OK) {
handleFailure(new ISE("Received sync response [%d]", responseCode));
@@ -306,7 +307,7 @@ public void onSuccess(InputStream stream)
log.info("Server[%s] synced successfully.", logIdentity);
}

sinceLastSyncSuccess.restart();
safeRestart(sinceLastSyncSuccess);
}
catch (Exception ex) {
markServerUnstableAndAlert(ex, "Processing Response");
@@ -390,9 +391,9 @@ private void addNextSyncToWorkQueue()
RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)
);
log.info("Scheduling next sync for server[%s] in [%d] millis.", logIdentity, delayMillis);
executor.schedule(this::sync, delayMillis, TimeUnit.MILLISECONDS);
executor.schedule(this::sendSyncRequest, delayMillis, TimeUnit.MILLISECONDS);
} else {
executor.execute(this::sync);
executor.execute(this::sendSyncRequest);
}
}
catch (Throwable th) {
Expand All @@ -410,10 +411,17 @@ private void addNextSyncToWorkQueue()
}
}

private void safeRestart(Stopwatch stopwatch)
{
synchronized (startStopLock) {
stopwatch.restart();
}
}

private void markServerUnstableAndAlert(Throwable throwable, String action)
{
if (consecutiveFailedAttemptCount++ == 0) {
sinceUnstable.restart();
safeRestart(sinceUnstable);
}

final long unstableSeconds = getUnstableTimeMillis() / 1000;
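Because the wrapper no longer synchronizes internally, callers that share a stopwatch across threads must supply their own guard; that is what the new `safeRestart()` above does, reusing `startStopLock` as the monitor. Below is a standalone sketch of the same idea applied directly to Guava's stopwatch; the class, field names, and timing values are illustrative, not taken from this commit:

```java
import com.google.common.base.Stopwatch;

import java.util.concurrent.TimeUnit;

public class GuardedStopwatch
{
  private final Object lock = new Object();
  private final Stopwatch sinceLastSync = Stopwatch.createStarted();

  // Guava's Stopwatch.start()/stop() throw IllegalStateException if the watch
  // is already running/stopped, so restarts and reads share a single monitor.
  public void restart()
  {
    synchronized (lock) {
      sinceLastSync.reset().start();
    }
  }

  public long millisElapsed()
  {
    synchronized (lock) {
      return sinceLastSync.elapsed(TimeUnit.MILLISECONDS);
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    final GuardedStopwatch watch = new GuardedStopwatch();
    Thread.sleep(10);
    System.out.println("Elapsed: " + watch.millisElapsed() + " ms");
    watch.restart();
  }
}
```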