Cleanup usages of stopwatch
kfaraz committed May 21, 2024
1 parent 15d27f3 commit c91f5da
Showing 5 changed files with 70 additions and 88 deletions.
@@ -25,11 +25,8 @@
import java.util.concurrent.TimeUnit;

/**
* Thread-safe wrapper over {@link com.google.common.base.Stopwatch}.
* <p>
* Thread safety has been limited to the start/stop methods for now as they are
* the only ones that can throw an exception in an illegal state and are thus
* vulnerable to race conditions.
* Wrapper over {@link com.google.common.base.Stopwatch} to provide some utility
* methods such as {@link #millisElapsed()}, {@link #restart()}, {@link #hasElapsed(Duration)}.
*/
public class Stopwatch
{
@@ -55,30 +52,30 @@ private Stopwatch(com.google.common.base.Stopwatch delegate)
this.delegate = delegate;
}

public synchronized void start()
public void start()
{
delegate.start();
}

public synchronized void stop()
public void stop()
{
delegate.stop();
}

public synchronized void reset()
public void reset()
{
delegate.reset();
}

/**
* Invokes {@code reset().start()} on the underlying {@link com.google.common.base.Stopwatch}.
*/
public synchronized void restart()
public void restart()
{
delegate.reset().start();
}

public synchronized boolean isRunning()
public boolean isRunning()
{
return delegate.isRunning();
}
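
For context, here is a minimal usage sketch of the simplified wrapper (not part of the commit). It only exercises methods referenced in this change — createStarted(), millisElapsed(), hasElapsed(Duration), restart() — and assumes the Duration parameter is the Joda-Time org.joda.time.Duration; the wrapper class is assumed to be on the classpath, and the class name and sleep stand-in below are illustrative only.

import org.joda.time.Duration;

public class StopwatchUsageSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    final Stopwatch stopwatch = Stopwatch.createStarted();

    Thread.sleep(100); // stand-in for the work being timed

    // Read the elapsed time without stopping the watch.
    System.out.println("Elapsed ms: " + stopwatch.millisElapsed());

    // Compare the elapsed time against a duration threshold.
    if (stopwatch.hasElapsed(Duration.millis(50))) {
      // restart() is shorthand for reset().start() on the underlying stopwatch.
      stopwatch.restart();
    }
  }
}
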
@@ -67,9 +67,9 @@ public class DruidLeaderClient

private final String leaderRequestPath;

private LifecycleLock lifecycleLock = new LifecycleLock();
private final LifecycleLock lifecycleLock = new LifecycleLock();
private DruidNodeDiscovery druidNodeDiscovery;
private AtomicReference<String> currentKnownLeader = new AtomicReference<>();
private final AtomicReference<String> currentKnownLeader = new AtomicReference<>();

public DruidLeaderClient(
HttpClient httpClient,
@@ -1042,36 +1042,21 @@ private void doPollSegments()
// setting connection to read-only will allow some databases such as MySQL
// to automatically use read-only transaction mode, further optimizing the query
final List<DataSegment> segments = connector.inReadOnlyTransaction(
new TransactionCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
{
return handle
.createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.map(
new ResultSetMapper<DataSegment>()
{
@Override
public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
// If one entry in database is corrupted doPoll() should continue to work overall. See
// filter by `Objects::nonNull` below in this method.
return null;
}
}
}
)
.list();
}
}
(handle, status) -> handle
.createQuery(StringUtils.format("SELECT payload FROM %s WHERE used=true", getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.map((index, r, ctx) -> {
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
// If one entry in database is corrupted doPoll() should continue to work overall. See
// filter by `Objects::nonNull` below in this method.
return null;
}
}).list()
);

Preconditions.checkNotNull(
@@ -1157,25 +1142,18 @@ public List<DataSegment> inTransaction(Handle handle, TransactionStatus status)
(handle, status) -> {
handle.createQuery(schemaPollQuery)
.setFetchSize(connector.getStreamingFetchSize())
.map(
new ResultSetMapper<Void>()
{
@Override
public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
schemaMapBuilder.put(
r.getString("fingerprint"),
jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read schema from db.").emit();
}
return null;
}
})
.list();
.map((index, r, ctx) -> {
try {
schemaMapBuilder.put(
r.getString("fingerprint"),
jsonMapper.readValue(r.getBytes("payload"), SchemaPayload.class)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read schema from db.").emit();
}
return null;
}).list();

segmentSchemaCache.resetTemporaryPublishedMetadataQueryResultOnDBPoll();
return null;
@@ -1196,18 +1174,16 @@ public Void map(int index, ResultSet r, StatementContext ctx) throws SQLException
} else {
log.info(
"Polled and found total [%,d] segments and [%,d] schema in the database in [%,d] ms.",
segments.size(),
schemaMap.size(),
stopwatch.millisElapsed()
segments.size(), schemaMap.size(), stopwatch.millisElapsed()
);
}
stopwatch.restart();

createDatasourcesSnapshot(stopwatch, segments);
createDatasourcesSnapshot(segments);
}

private void createDatasourcesSnapshot(Stopwatch stopwatch, List<DataSegment> segments)
private void createDatasourcesSnapshot(List<DataSegment> segments)
{
final Stopwatch stopwatch = Stopwatch.createStarted();
// dataSourcesSnapshot is updated only here and the DataSourcesSnapshot object is immutable. If data sources or
// segments are marked as used or unused directly (via markAs...() methods in SegmentsMetadataManager), the
// dataSourcesSnapshot can become invalid until the next database poll.
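
As a side note, the lambda mappers above return null for rows whose payload fails to deserialize, and the inline comments refer to a later "filter by Objects::nonNull" step that is not visible in this diff. Below is a hedged sketch of what that filtering amounts to; the class and method names are illustrative, not the committed code.

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

class PolledSegmentFilter
{
  // Drop the null placeholders produced for corrupted rows so that one bad
  // database entry does not abort the whole poll.
  static <T> List<T> dropCorruptedRows(List<T> polledRows)
  {
    return polledRows.stream()
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }
}
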
@@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.SchemaPayloadPlus;
@@ -159,13 +160,11 @@ public void processBatchesDue()
return;
}

Stopwatch stopwatch = Stopwatch.createStarted();
final Stopwatch stopwatch = Stopwatch.createStarted();
log.info("Backfilling segment schema. Queue size is [%s].", queue.size());

log.info("Backfilling segment schema. Queue size is [%s]", queue.size());

int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());

Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
final Map<String, List<SegmentSchemaMetadataPlus>> polled = new HashMap<>();
for (int i = 0; i < itemsToProcess; i++) {
SegmentSchemaMetadataPlus item = queue.poll();
if (item != null) {
@@ -175,21 +174,29 @@

for (Map.Entry<String, List<SegmentSchemaMetadataPlus>> entry : polled.entrySet()) {
try {
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(entry.getKey(), entry.getValue(), CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
segmentSchemaManager.persistSchemaAndUpdateSegmentsTable(
entry.getKey(),
entry.getValue(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
);

// Mark the segments as published in the cache.
for (SegmentSchemaMetadataPlus plus : entry.getValue()) {
segmentSchemaCache.markMetadataQueryResultPublished(plus.getSegmentId());
}
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", entry.getKey())
.setMetric("metadatacache/backfill/count", entry.getValue().size())
.setDimension(DruidMetrics.DATASOURCE, entry.getKey())
.setMetric("metadatacache/backfill/count", entry.getValue().size())
);
}
catch (Exception e) {
log.error(e, "Exception persisting schema and updating segments table for datasource [%s].", entry.getKey());
log.error(e, "Exception persisting schema and updating segments table for datasource[%s].", entry.getKey());
}
}
emitter.emit(ServiceMetricEvent.builder().setMetric("metadatacache/backfill/time", stopwatch.millisElapsed()));
emitter.emit(
ServiceMetricEvent.builder()
.setMetric("metadatacache/backfill/time", stopwatch.millisElapsed())
);
}
}
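
To make the batching step above easier to follow: processBatchesDue() drains at most MAX_BATCH_SIZE entries from the queue and groups them by datasource before persisting each group. Below is a reduced sketch of that drain-and-group pattern, with the element type simplified to a (dataSource, payload) pair, an illustrative batch size, and the grouping call assumed, since the exact code is elided in this diff.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class BatchDrainSketch
{
  private static final int MAX_BATCH_SIZE = 500; // illustrative value only

  static Map<String, List<String>> drainByDataSource(Queue<Map.Entry<String, String>> queue)
  {
    final int itemsToProcess = Math.min(MAX_BATCH_SIZE, queue.size());
    final Map<String, List<String>> polled = new HashMap<>();
    for (int i = 0; i < itemsToProcess; i++) {
      final Map.Entry<String, String> item = queue.poll();
      if (item != null) {
        // Group the polled entries by datasource so each group can be persisted in one call.
        polled.computeIfAbsent(item.getKey(), ds -> new ArrayList<>()).add(item.getValue());
      }
    }
    return polled;
  }
}
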
@@ -82,18 +82,22 @@ public class ChangeRequestHttpSyncer<T>
private final CountDownLatch initializationLatch = new CountDownLatch(1);

/**
* This lock is used to ensure proper start-then-stop semantics and making sure after stopping no state update happens
* and {@link #sync} is not again scheduled in {@link #executor} and if there was a previously scheduled sync before
* stopping, it is skipped and also, it is used to ensure that duplicate syncs are never scheduled in the executor.
* Lock to implement proper start-then-stop semantics. Used to ensure the following:
* <ul>
* <li>No state update happens after {@link #stop()}</li>
* <li>sync is not scheduled after {@link #stop()}</li>
* <li>Any pending sync is skipped when {@link #stop()} has been called</li>
* <li>Duplicate syncs are not scheduled</li>
* </ul>
*/
private final LifecycleLock startStopLock = new LifecycleLock();

private final String logIdentity;
private int consecutiveFailedAttemptCount = 0;

private final Stopwatch sinceSyncerStart = Stopwatch.createUnstarted();
private final Stopwatch sinceSyncerStart = Stopwatch.createUnstarted(); // does not need synchronization
private final Stopwatch sinceLastSyncRequest = Stopwatch.createUnstarted();
private final Stopwatch sinceLastSyncSuccess = Stopwatch.createUnstarted();
private final Stopwatch sinceLastSyncSuccess = Stopwatch.createUnstarted(); // does not need synchronization
private final Stopwatch sinceUnstable = Stopwatch.createUnstarted();

@Nullable
@@ -220,11 +224,8 @@ public long getUnstableTimeMillis()
*/
public boolean isSyncedSuccessfully()
{
if (consecutiveFailedAttemptCount > 0) {
return false;
} else {
return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}
return consecutiveFailedAttemptCount <= 0
&& sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
}

private void sync()
@@ -234,6 +235,7 @@ private void sync()
return;
}

// Can two syncs happen together?
sinceLastSyncRequest.restart();

try {
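
For orientation, here is a hedged sketch of the start-then-stop guard that the rewritten javadoc above describes (not the committed code). The LifecycleLock method names — canStart(), started(), exitStart(), awaitStarted() — are assumed from Druid's org.apache.druid.concurrent.LifecycleLock, and each method body is reduced to comments.

import java.util.concurrent.TimeUnit;
import org.apache.druid.concurrent.LifecycleLock;

class StartStopGuardSketch
{
  private final LifecycleLock startStopLock = new LifecycleLock();

  public void start()
  {
    if (!startStopLock.canStart()) {
      throw new IllegalStateException("Syncer has already been started (or stopped).");
    }
    try {
      // schedule the first sync() on the executor here
      startStopLock.started();
    }
    finally {
      startStopLock.exitStart();
    }
  }

  private void sync()
  {
    // Skip the work if stop() has been called or start() never completed, so no
    // state update happens after stop and any pending sync is dropped.
    if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
      return;
    }
    // issue the HTTP sync request and schedule the next sync() here
  }
}
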
