Cleanup usages of stopwatch #16478

Merged 5 commits on May 27, 2024

Stopwatch.java

@@ -25,11 +25,8 @@
 import java.util.concurrent.TimeUnit;
 
 /**
- * Thread-safe wrapper over {@link com.google.common.base.Stopwatch}.
- * <p>
- * Thread safety has been limited to the start/stop methods for now as they are
- * the only ones that can throw an exception in an illegal state and are thus
- * vulnerable to race conditions.
+ * Wrapper over {@link com.google.common.base.Stopwatch} to provide some utility
+ * methods such as {@link #millisElapsed()}, {@link #restart()}, {@link #hasElapsed(Duration)}.
  */
 public class Stopwatch
 {
@@ -55,30 +52,30 @@ private Stopwatch(com.google.common.base.Stopwatch delegate)
     this.delegate = delegate;
   }
 
-  public synchronized void start()
+  public void start()
   {
     delegate.start();
   }
 
-  public synchronized void stop()
+  public void stop()
   {
     delegate.stop();
   }
 
-  public synchronized void reset()
+  public void reset()
   {
     delegate.reset();
   }
 
   /**
    * Invokes {@code reset().start()} on the underlying {@link com.google.common.base.Stopwatch}.
    */
-  public synchronized void restart()
+  public void restart()
   {
     delegate.reset().start();
   }
 
-  public synchronized boolean isRunning()
+  public boolean isRunning()
   {
     return delegate.isRunning();
   }
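
For context, a minimal sketch of how the wrapper's convenience methods compose at call sites like the ones in the files below. It assumes the class shown here is org.apache.druid.java.util.common.Stopwatch and that hasElapsed()/hasNotElapsed() take a Joda-Time Duration; neither the package nor the Duration import is visible in this hunk.

import org.apache.druid.java.util.common.Stopwatch;
import org.joda.time.Duration;

public class StopwatchUsageSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    // createStarted() is the factory used by doPollSegments() below.
    final Stopwatch stopwatch = Stopwatch.createStarted();

    Thread.sleep(100); // stand-in for real work

    // millisElapsed() replaces elapsed(TimeUnit.MILLISECONDS) at call sites.
    System.out.println("Work took [" + stopwatch.millisElapsed() + "] ms.");

    // restart() is reset().start() on the underlying Guava stopwatch.
    stopwatch.restart();

    // hasNotElapsed() expresses timeout checks, as in isSyncedSuccessfully()
    // in ChangeRequestHttpSyncer further down.
    if (stopwatch.hasNotElapsed(Duration.standardSeconds(30))) {
      System.out.println("Still within the timeout window.");
    }
  }
}

With synchronized removed from these methods, a caller that touches a stopwatch from multiple threads must supply its own locking, which is exactly what the safeRestart() helper added to ChangeRequestHttpSyncer does.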

DruidLeaderClient.java

@@ -67,9 +67,9 @@ public class DruidLeaderClient

   private final String leaderRequestPath;
 
-  private LifecycleLock lifecycleLock = new LifecycleLock();
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
   private DruidNodeDiscovery druidNodeDiscovery;
-  private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
+  private final AtomicReference<String> currentKnownLeader = new AtomicReference<>();
 
   public DruidLeaderClient(
       HttpClient httpClient,

@@ -71,12 +71,9 @@
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -1036,42 +1033,26 @@ private void doPollSegments()
     final Stopwatch stopwatch = Stopwatch.createStarted();
     log.info("Starting polling of segment table.");
 
-    // some databases such as PostgreSQL require auto-commit turned off
+    // Some databases such as PostgreSQL require auto-commit turned off
     // to stream results back, enabling transactions disables auto-commit
     //
     // setting connection to read-only will allow some database such as MySQL
     // to automatically use read-only transaction mode, further optimizing the query
     final List<DataSegment> segments = connector.inReadOnlyTransaction(
-        new TransactionCallback<List<DataSegment>>()
-        {
-          @Override
-          public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
-          {
-            return handle
-                .createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
-                .setFetchSize(connector.getStreamingFetchSize())
-                .map(
-                    new ResultSetMapper<DataSegment>()
-                    {
-                      @Override
-                      public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
-                      {
-                        try {
-                          DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
-                          return replaceWithExistingSegmentIfPresent(segment);
-                        }
-                        catch (IOException e) {
-                          log.makeAlert(e, "Failed to read segment from db.").emit();
-                          // If one entry in database is corrupted doPoll() should continue to work overall. See
-                          // filter by `Objects::nonNull` below in this method.
-                          return null;
-                        }
-                      }
-                    }
-                )
-                .list();
-          }
-        }
+        (handle, status) -> handle
+            .createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
+            .setFetchSize(connector.getStreamingFetchSize())
+            .map((index, r, ctx) -> {
+              try {
+                DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
+                return replaceWithExistingSegmentIfPresent(segment);
+              }
+              catch (IOException e) {
+                log.makeAlert(e, "Failed to read segment from db.").emit();
+                // If one entry in database is corrupted doPoll() should continue to work overall. See
+                // filter by `Objects::nonNull` below in this method.
+                return null;
+              }
+            }).list()
     );

     Preconditions.checkNotNull(
@@ -1082,11 +1063,13 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
     if (segments.isEmpty()) {
       log.info("No segments found in the database!");
     } else {
-      log.info("Polled and found [%,d] segments in the database in [%,d] ms.", segments.size(), stopwatch.millisElapsed());
+      log.info(
+          "Polled and found [%,d] segments in the database in [%,d] ms.",
+          segments.size(), stopwatch.millisElapsed()
+      );
     }
-    stopwatch.restart();
 
-    createDatasourcesSnapshot(stopwatch, segments);
+    createDatasourcesSnapshot(segments);
   }

   private void doPollSegmentAndSchema()
@@ -1157,25 +1140,18 @@ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
         (handle, status) -> {
           handle.createQuery(schemaPollQuery)
                 .setFetchSize(connector.getStreamingFetchSize())
-                .map(
-                    new ResultSetMapper<Void>()
-                    {
-                      @Override
-                      public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
-                      {
-                        try {
-                          schemaMapBuilder.put(
-                              r.getString("fingerprint"),
-                              jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
-                          );
-                        }
-                        catch (IOException e) {
-                          log.makeAlert(e, "Failed to read schema from db.").emit();
-                        }
-                        return null;
-                      }
-                    })
-                .list();
+                .map((index, r, ctx) -> {
+                  try {
+                    schemaMapBuilder.put(
+                        r.getString("fingerprint"),
+                        jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
+                    );
+                  }
+                  catch (IOException e) {
+                    log.makeAlert(e, "Failed to read schema from db.").emit();
+                  }
+                  return null;
+                }).list();
 
           segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
           return null;
@@ -1195,19 +1171,17 @@ public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
log.info("No segments found in the database!");
} else {
log.info(
"Polled and found total [%,d] segments and [%,d] schema in the database in [%,d] ms.",
segments.size(),
schemaMap.size(),
stopwatch.millisElapsed()
"Polled and found [%,d] segments and [%,d] schemas in the database in [%,d] ms.",
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
}
stopwatch.restart();

createDatasourcesSnapshot(stopwatch, segments);
createDatasourcesSnapshot(segments);
}

private void createDatasourcesSnapshot(Stopwatch stopwatch, List<DataSegment> segments)
private void createDatasourcesSnapshot(List<DataSegment> segments)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
// dataSourcesSnapshot is updated only here and the DataSourcesSnapshot object is immutable. If data sources or
// segments are marked as used or unused directly (via markAs...() methods in SegmentsMetadataManager), the
// dataSourcesSnapshot can become invalid until the next database poll.
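
The comment at the top of doPollSegments() describes a standard JDBC streaming recipe: disable auto-commit so the PostgreSQL driver streams rows through a cursor, set a fetch size to trigger streaming, and mark the connection read-only so databases such as MySQL can pick a cheaper read-only transaction mode. A standalone sketch of that pattern in plain JDBC, independent of Druid's connector API; the URL, table name, and fetch size are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StreamingReadSketch
{
  public static void main(String[] args) throws SQLException
  {
    try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/druid")) {
      // Streaming in the PostgreSQL driver only works inside an explicit transaction.
      conn.setAutoCommit(false);
      // Lets some databases (e.g. MySQL) use an optimized read-only transaction mode.
      conn.setReadOnly(true);

      try (PreparedStatement stmt =
               conn.prepareStatement("SELECT payload FROM druid_segments WHERE used = true")) {
        // A non-zero fetch size makes the driver stream rows in batches
        // instead of buffering the whole result set in memory.
        stmt.setFetchSize(100);
        int count = 0;
        try (ResultSet rs = stmt.executeQuery()) {
          while (rs.next()) {
            byte[] payload = rs.getBytes("payload"); // deserialize per row, as the mapper above does
            count++;
          }
        }
        System.out.println("Streamed [" + count + "] segment payloads.");
      }
      conn.commit();
    }
  }
}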

@@ -28,6 +28,7 @@
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.SchemaPayload;
 import org.apache.druid.segment.SchemaPayloadPlus;
@@ -159,13 +160,11 @@ public void processBatchesDue()
       return;
     }
 
-    Stopwatch stopwatch = Stopwatch.createStarted();
-
-    log.info("Backfilling segment schema. Queue size is [%s]", queue.size());
-
-    int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
-
-    Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
+    final Stopwatch stopwatch = Stopwatch.createStarted();
+    log.info("Backfilling segment schema. Queue size is [%s].", queue.size());
+
+    final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
+    final Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
     for (int i = 0; i < itemsToProcess; i++) {
       SegmentSchemaMetadataPlus item = queue.poll();
       if (item != null) {
@@ -175,21 +174,29 @@ public void processBatchesDue()

     for (Map.Entry<String, List<SegmentSchemaMetadataPlus>> entry : polled.entrySet()) {
       try {
-        segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
+        segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(
+            entry.getKey(),
+            entry.getValue(),
+            CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
+        );
+
+        // Mark the segments as published in the cache.
         for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
           segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
         }
         emitter.emit(
             ServiceMetricEvent.builder()
-                              .setDimension("dataSource", entry.getKey())
-                              .setMetric("metadatacache/backfill/count", entry.getValue().size())
+                .setDimension(DruidMetrics.DATASOURCE, entry.getKey())
+                .setMetric("metadatacache/backfill/count", entry.getValue().size())
         );
       }
       catch (Exception e) {
-        log.error(e, "Exception persisting schema and updating segments table for datasource [%s].", entry.getKey());
+        log.error(e, "Exception persisting schema and updating segments table for datasource[%s].", entry.getKey());
       }
     }
-    emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/backfill/time", stopwatch.millisElapsed()));
+    emitter.emit(
+        ServiceMetricEvent.builder()
+            .setMetric("metadatacache/backfill/time", stopwatch.millisElapsed())
+    );
   }
 }
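
processBatchesDue() drains at most MAX_BATCH_SIZE items from the queue and groups them by datasource, so each datasource gets a single persistSchemaAndUpdateSegmentsTable() call. A minimal sketch of that drain-and-group step, with a hypothetical Item record standing in for SegmentSchemaMetadataPlus and an assumed batch size:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BatchDrainSketch
{
  private static final int MAX_BATCH_SIZE = 500; // assumed value, the real constant is not shown in the diff

  // Hypothetical stand-in for SegmentSchemaMetadataPlus.
  record Item(String dataSource, String fingerprint) {}

  static Map<String, List<Item>> drainBatch(Queue<Item> queue)
  {
    // Snapshot the size first so concurrent producers cannot extend this batch.
    final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
    final Map<String, List<Item>> polled = new HashMap<>();
    for (int i = 0; i < itemsToProcess; i++) {
      Item item = queue.poll();
      if (item != null) { // poll() can return null if the queue drained concurrently
        polled.computeIfAbsent(item.dataSource(), ds -> new ArrayList<>()).add(item);
      }
    }
    return polled;
  }

  public static void main(String[] args)
  {
    Queue<Item> queue = new ConcurrentLinkedQueue<>();
    queue.add(new Item("wiki", "schema-1"));
    queue.add(new Item("wiki", "schema-2"));
    queue.add(new Item("logs", "schema-3"));
    // One entry per datasource, i.e. one persist call each, as in processBatchesDue().
    drainBatch(queue).forEach((ds, items) -> System.out.println(ds + " -> " + items.size()));
  }
}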

ChangeRequestHttpSyncer.java

@@ -82,9 +82,13 @@ public class ChangeRequestHttpSyncer<T>
   private final CountDownLatch initializationLatch = new CountDownLatch(1);
 
   /**
-   * This lock is used to ensure proper start-then-stop semantics and making sure after stopping no state update happens
-   * and {@link #sync} is not again scheduled in {@link #executor} and if there was a previously scheduled sync before
-   * stopping, it is skipped and also, it is used to ensure that duplicate syncs are never scheduled in the executor.
+   * Lock to implement proper start-then-stop semantics. Used to ensure that:
+   * <ul>
+   * <li>No state update happens after {@link #stop()}.</li>
+   * <li>No sync is scheduled after {@link #stop()}.</li>
+   * <li>Any pending sync is skipped when {@link #stop()} has been called.</li>
+   * <li>Duplicate syncs are not scheduled on the executor.</li>
+   * </ul>
    */
   private final LifecycleLock startStopLock = new LifecycleLock();
 
@@ -141,7 +145,7 @@ public void start()
         startStopLock.exitStart();
       }
 
-      sinceSyncerStart.restart();
+      safeRestart(sinceSyncerStart);
       addNextSyncToWorkQueue();
     }
   }
@@ -220,21 +224,18 @@ public long getUnstableTimeMillis()
    */
   public boolean isSyncedSuccessfully()
   {
-    if (consecutiveFailedAttemptCount > 0) {
-      return false;
-    } else {
-      return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
-    }
+    return consecutiveFailedAttemptCount <= 0
+           && sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
   }
 
-  private void sync()
+  private void sendSyncRequest()
   {
     if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {

[Review thread on the line above]

Contributor:
Pretty much all the current usages of awaitStarted are preceded by synchronized (startStopLock). Curious why this one doesn't have one. I see this runs in an executor thread, but I'm wondering if the new synchronized block added below in line 242 should be moved up here.

kfaraz (Contributor, Author) replied on May 23, 2024:
I am not entirely sure the startStopLock is really being used in the correct way in this entire class. Calling various methods on this lock (LifecycleLock) doesn't require synchronizing on it, so I left this as is until we plan to revisit this code.

log.info("Skipping sync for server[%s] as syncer has not started yet.", logIdentity);
return;
}

sinceLastSyncRequest.restart();
safeRestart(sinceLastSyncRequest);

try {
final String req = getRequestString();
@@ -270,7 +271,7 @@ public void onSuccess(InputStream stream)
           final int responseCode = responseHandler.getStatus();
           if (responseCode == HttpServletResponse.SC_NO_CONTENT) {
             log.debug("Received NO CONTENT from server[%s]", logIdentity);
-            sinceLastSyncSuccess.restart();
+            safeRestart(sinceLastSyncSuccess);
             return;
           } else if (responseCode != HttpServletResponse.SC_OK) {
             handleFailure(new ISE("Received sync response [%d]", responseCode));
@@ -306,7 +307,7 @@ public void onSuccess(InputStream stream)
log.info("Server[%s] synced successfully.", logIdentity);
}

sinceLastSyncSuccess.restart();
safeRestart(sinceLastSyncSuccess);
}
catch (Exception ex) {
markServerUnstableAndAlert(ex, "Processing Response");
@@ -390,9 +391,9 @@ private void addNextSyncToWorkQueue()
             RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)
         );
         log.info("Scheduling next sync for server[%s] in [%d] millis.", logIdentity, delayMillis);
-        executor.schedule(this::sync, delayMillis, TimeUnit.MILLISECONDS);
+        executor.schedule(this::sendSyncRequest, delayMillis, TimeUnit.MILLISECONDS);
       } else {
-        executor.execute(this::sync);
+        executor.execute(this::sendSyncRequest);
       }
     }
     catch (Throwable th) {
@@ -410,10 +411,17 @@ private void addNextSyncToWorkQueue()
     }
   }
 
+  private void safeRestart(Stopwatch stopwatch)
+  {
+    synchronized (startStopLock) {
+      stopwatch.restart();
+    }
+  }
+
   private void markServerUnstableAndAlert(Throwable throwable, String action)
   {
     if (consecutiveFailedAttemptCount++ == 0) {
-      sinceUnstable.restart();
+      safeRestart(sinceUnstable);
     }
 
     final long unstableSeconds = getUnstableTimeMillis() / 1000;
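
The new safeRestart() helper is the flip side of removing synchronized from the Stopwatch wrapper itself: the syncer now provides its own mutual exclusion, which is also the subject of the review thread above. A minimal standalone sketch of that division of responsibility, using Guava's Stopwatch directly and a plain monitor in place of Druid's LifecycleLock; the stopped flag and all names here are illustrative, not the class's actual structure.

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

public class SyncerLockingSketch
{
  private final Object startStopLock = new Object();
  private final Stopwatch sinceLastSyncSuccess = Stopwatch.createUnstarted();
  private boolean stopped = false;

  // Like safeRestart() above: restarting only under the lock means a concurrent
  // stop() can never interleave between reset() and start().
  private void safeRestart(Stopwatch stopwatch)
  {
    synchronized (startStopLock) {
      if (!stopped) {
        stopwatch.reset().start();
      }
    }
  }

  public void stop()
  {
    synchronized (startStopLock) {
      stopped = true; // after this point no state update can happen
    }
  }

  public long millisSinceLastSuccess()
  {
    return sinceLastSyncSuccess.elapsed(TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args)
  {
    SyncerLockingSketch syncer = new SyncerLockingSketch();
    syncer.safeRestart(syncer.sinceLastSyncSuccess);
    System.out.println("Elapsed [" + syncer.millisSinceLastSuccess() + "] ms since last success.");
    syncer.stop();
    syncer.safeRestart(syncer.sinceLastSyncSuccess); // no-op once stopped
  }
}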