Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Cleanup usages of stopwatch #16478

Merged
merged 5 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@
import java.util.concurrent.TimeUnit;

/**
* Thread-safe wrapper over {@link com.google.common.base.Stopwatch}.
* <p>
* Thread safety has been limited to the start/stop methods for now as they are
* the only ones that can throw an exception in an illegal state and are thus
* vulnerable to race conditions.
* Wrapper over {@link com.google.common.base.Stopwatch} to provide some utility
* methods such as {@link #millisElapsed()}, {@link #restart()}, {@link #hasElapsed(Duration)}.
*/
public class Stopwatch
{
Expand All @@ -55,30 +52,30 @@ private Stopwatch(com.google.common.base.Stopwatch delegate)
this.delegate = delegate;
}

public synchronized void start()
public void start()
{
delegate.start();
}

public synchronized void stop()
public void stop()
{
delegate.stop();
}

public synchronized void reset()
public void reset()
{
delegate.reset();
}

/**
* Invokes {@code reset().start()} on the underlying {@link com.google.common.base.Stopwatch}.
*/
public synchronized void restart()
public void restart()
{
delegate.reset().start();
}

public synchronized boolean isRunning()
public boolean isRunning()
{
return delegate.isRunning();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public class DruidLeaderClient

private final String leaderRequestPath;

private LifecycleLock lifecycleLock = new LifecycleLock();
private final LifecycleLock lifecycleLock = new LifecycleLock();
private DruidNodeDiscovery druidNodeDiscovery;
private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
private final AtomicReference<String> currentKnownLeader = new AtomicReference<>();

public DruidLeaderClient(
HttpClient httpClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,9 @@
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -1036,42 +1033,26 @@ private void doPollSegments()
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Starting polling of segment table.");

// some databases such as PostgreSQL require auto-commit turned off
// Some databases such as PostgreSQL require auto-commit turned off
// to stream results back, enabling transactions disables auto-commit
//
// setting connection to read-only will allow some database such as MySQL
// to automatically use read-only transaction mode, further optimizing the query
final List<DataSegment> segments = connector.inReadOnlyTransaction(
new TransactionCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
{
return handle
.createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.map(
new ResultSetMapper<DataSegment>()
{
@Override
public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
// If one entry in database is corrupted doPoll() should continue to work overall. See
// filter by `Objects::nonNull` below in this method.
return null;
}
}
}
)
.list();
}
}
(handle, status) -> handle
.createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.map((index, r, ctx) -> {
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
// If one entry in database is corrupted doPoll() should continue to work overall. See
// filter by `Objects::nonNull` below in this method.
return null;
}
}).list()
);

Preconditions.checkNotNull(
Expand All @@ -1082,11 +1063,13 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
if (segments.isEmpty()) {
log.info("No segments found in the database!");
} else {
log.info("Polled and found [%,d] segments in the database in [%,d] ms.", segments.size(), stopwatch.millisElapsed());
log.info(
"Polled and found [%,d] segments in the database in [%,d] ms.",
segments.size(), stopwatch.millisElapsed()
);
}
stopwatch.restart();

createDatasourcesSnapshot(stopwatch, segments);
createDatasourcesSnapshot(segments);
}

private void doPollSegmentAndSchema()
Expand Down Expand Up @@ -1157,25 +1140,18 @@ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
(handle, status) -> {
handle.createQuery(schemaPollQuery)
.setFetchSize(connector.getStreamingFetchSize())
.map(
new ResultSetMapper<Void>()
{
@Override
public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
schemaMapBuilder.put(
r.getString("fingerprint"),
jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read schema from db.").emit();
}
return null;
}
})
.list();
.map((index, r, ctx) -> {
try {
schemaMapBuilder.put(
r.getString("fingerprint"),
jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read schema from db.").emit();
}
return null;
}).list();

segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
return null;
Expand All @@ -1196,18 +1172,16 @@ public Void map(int index, ResultSet r, StatementContext ctx) throws SQLExceptio
} else {
log.info(
"Polled and found total [%,d] segments and [%,d] schema in the database in [%,d] ms.",
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
segments.size(),
schemaMap.size(),
stopwatch.millisElapsed()
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
}
stopwatch.restart();

createDatasourcesSnapshot(stopwatch, segments);
createDatasourcesSnapshot(segments);
}

private void createDatasourcesSnapshot(Stopwatch stopwatch, List<DataSegment> segments)
private void createDatasourcesSnapshot(List<DataSegment> segments)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
// dataSourcesSnapshot is updated only here and the DataSourcesSnapshot object is immutable. If data sources or
// segments are marked as used or unused directly (via markAs...() methods in SegmentsMetadataManager), the
// dataSourcesSnapshot can become invalid until the next database poll.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
Expand Down Expand Up @@ -159,13 +160,11 @@ public void processBatchesDue()
return;
}

Stopwatch stopwatch = Stopwatch.createStarted();
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Backfilling segment schema. Queue size is [%s].", queue.size());

log.info("Backfilling segment schema. Queue size is [%s]", queue.size());

int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());

Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
final Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
for (int i = 0; i < itemsToProcess; i++) {
SegmentSchemaMetadataPlus item = queue.poll();
if (item != null) {
Expand All @@ -175,21 +174,29 @@ public void processBatchesDue()

for (Map.Entry<String, List<SegmentSchemaMetadataPlus>> entry : polled.entrySet()) {
try {
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(
entry.getKey(),
entry.getValue(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);

// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
}
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", entry.getKey())
.setMetric("metadatacache/backfill/count", entry.getValue().size())
.setDimension(DruidMetrics.DATASOURCE, entry.getKey())
.setMetric("metadatacache/backfill/count", entry.getValue().size())
);
}
catch (Exception e) {
log.error(e, "Exception persisting schema and updating segments table for datasource [%s].", entry.getKey());
log.error(e, "Exception persisting schema and updating segments table for datasource[%s].", entry.getKey());
}
}
emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/backfill/time", stopwatch.millisElapsed()));
emitter.emit(
ServiceMetricEvent.builder()
.setMetric("metadatacache/backfill/time", stopwatch.millisElapsed())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
Expand Down Expand Up @@ -82,15 +83,20 @@ public class ChangeRequestHttpSyncer<T>
private final CountDownLatch initializationLatch = new CountDownLatch(1);

/**
* This lock is used to ensure proper start-then-stop semantics and making sure after stopping no state update happens
* and {@link #sync} is not again scheduled in {@link #executor} and if there was a previously scheduled sync before
* stopping, it is skipped and also, it is used to ensure that duplicate syncs are never scheduled in the executor.
* Lock to implement proper start-then-stop semantics. Used to ensure that:
* <ul>
* <li>No state update happens after {@link #stop()}.</li>
* <li>No sync is scheduled after {@link #stop()}.</li>
* <li>Any pending sync is skipped when {@link #stop()} has been called.</li>
* <li>Duplicate syncs are not scheduled on the executor.</li>
* </ul>
*/
private final LifecycleLock startStopLock = new LifecycleLock();

private final String logIdentity;
private int consecutiveFailedAttemptCount = 0;

// All stopwatches are guarded by the startStopLock
kfaraz marked this conversation as resolved.
Show resolved Hide resolved
private final Stopwatch sinceSyncerStart = Stopwatch.createUnstarted();
private final Stopwatch sinceLastSyncRequest = Stopwatch.createUnstarted();
private final Stopwatch sinceLastSyncSuccess = Stopwatch.createUnstarted();
Expand Down Expand Up @@ -220,21 +226,22 @@ public long getUnstableTimeMillis()
*/
public boolean isSyncedSuccessfully()
{
if (consecutiveFailedAttemptCount > 0) {
return false;
} else {
return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}
return consecutiveFailedAttemptCount <= 0
&& sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}

private void sync()
private void sendSyncRequest()
{
if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty much all the current usages of awaitStarted is preceded by synchronized (startStopLock). Curious why this one doesn't have. I see this runs in an executor thread, but I'm wondering if the new synchronized block added below in line 242 should be moved up here.

Copy link
Contributor Author

@kfaraz kfaraz May 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not entirely sure if the startStopLock is really being used in the correct way in this entire class. Calling various methods on this lock (LifecycleLock) doesn't require synchronizing on it. So left this as is until we plan to revisit this code.

log.info("Skipping sync for server[%s] as syncer has not started yet.", logIdentity);
return;
}

sinceLastSyncRequest.restart();
// Synchronized to tackle the remote possibility of two syncs trying to reset
// the stopwatch together
synchronized (startStopLock) {
sinceLastSyncRequest.restart();
}

try {
final String req = getRequestString();
Expand Down Expand Up @@ -335,6 +342,7 @@ public void onFailure(Throwable t)
}
}

@GuardedBy("startStopLock")
private void handleFailure(Throwable t)
{
String logMsg = StringUtils.format(
Expand All @@ -350,7 +358,9 @@ private void handleFailure(Throwable t)
}
catch (Throwable th) {
try {
markServerUnstableAndAlert(th, "Sending Request");
synchronized (startStopLock) {
markServerUnstableAndAlert(th, "Sending Request");
}
}
finally {
addNextSyncToWorkQueue();
Expand Down Expand Up @@ -390,9 +400,9 @@ private void addNextSyncToWorkQueue()
RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)
);
log.info("Scheduling next sync for server[%s] in [%d] millis.", logIdentity, delayMillis);
executor.schedule(this::sync, delayMillis, TimeUnit.MILLISECONDS);
executor.schedule(this::sendSyncRequest, delayMillis, TimeUnit.MILLISECONDS);
} else {
executor.execute(this::sync);
executor.execute(this::sendSyncRequest);
}
}
catch (Throwable th) {
Expand All @@ -410,6 +420,7 @@ private void addNextSyncToWorkQueue()
}
}

@GuardedBy("startStopLock")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to just have the lock synchronization in this method directly instead of relying on the different callers to do the right thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, fair enough. I guess the only part which really needs the synchronization is the update of the failed attempt count and the stopwatch reset.

Btw, @GuardedBy forces the callers to do the right thing as it throws a compile-time error if not called from within the proper synchronization.

private void markServerUnstableAndAlert(Throwable throwable, String action)
{
if (consecutiveFailedAttemptCount++ == 0) {
Expand Down
Loading