Implement Progress Bar Persistence Read/Write (#20787)
Implement the persistence layer changes following #19191.

This PR handles writing and reading stats to the new stream_stats table and to new columns in the existing sync_stats table.

At the same time, we introduce upserts of stats records (i.e. updates are merged into a single record) in preparation for real-time stats updates, replacing the current approach where a new stats record is always written.
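
In practice the upsert is a check-then-write inside a transaction rather than a native ON CONFLICT clause, because the existing tables still contain duplicate rows and, for stream stats, a nullable namespace column. A minimal sketch of the pattern, assuming jOOQ's generated SYNC_STATS table and a reduced field set (the method name and fields are illustrative; the full version is in DefaultJobPersistence below):

import java.time.OffsetDateTime;
import java.util.UUID;
import org.jooq.DSLContext;

// Sketch: merge stats into a single row per attempt instead of appending a new row per update.
static void upsertSyncStats(final DSLContext ctx, final Long attemptId, final long recordsEmitted, final OffsetDateTime now) {
  if (ctx.fetchExists(SYNC_STATS, SYNC_STATS.ATTEMPT_ID.eq(attemptId))) {
    // A row already exists for this attempt: update it in place.
    ctx.update(SYNC_STATS)
        .set(SYNC_STATS.RECORDS_EMITTED, recordsEmitted)
        .set(SYNC_STATS.UPDATED_AT, now)
        .where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
        .execute();
  } else {
    // First write for this attempt: insert the initial row.
    ctx.insertInto(SYNC_STATS)
        .set(SYNC_STATS.ID, UUID.randomUUID())
        .set(SYNC_STATS.ATTEMPT_ID, attemptId)
        .set(SYNC_STATS.RECORDS_EMITTED, recordsEmitted)
        .set(SYNC_STATS.CREATED_AT, now)
        .set(SYNC_STATS.UPDATED_AT, now)
        .execute();
  }
}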

There will be two remaining PRs after this:
- The first PR will fully wire up and test the API.
- The second PR will actually save stats while jobs are running.

davinchia committed Dec 28, 2022
1 parent 54c0ef1 commit d98ddbb
Showing 12 changed files with 428 additions and 39 deletions.
2 changes: 2 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
@@ -3970,6 +3970,8 @@ components:
       properties:
         streamName:
           type: string
+        streamNamespace:
+          type: string
         stats:
           $ref: "#/components/schemas/AttemptStats"
     AttemptFailureSummary:
@@ -137,7 +137,7 @@ void testBootloaderAppBlankDb() throws Exception {
     bootloader.load();
 
     val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
-    assertEquals("0.40.18.002", jobsMigrator.getLatestMigration().getVersion().getVersion());
+    assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
 
     val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
     // this line should change with every new migration
5 changes: 5 additions & 0 deletions airbyte-commons/src/main/resources/log4j2.xml
@@ -209,6 +209,11 @@
       If a database has tons of DDL queries, the logs would be filled with such messages-->
     <Logger name="io.debezium.relational.history" level="OFF" />
 
+    <!--Uncomment the following to debug JOOQ generated SQL queries.-->
+    <!--<Logger name="org.jooq.tools.LoggerListener" level="debug">-->
+    <!--  <AppenderRef ref="Console"/>-->
+    <!--</Logger>-->
+
   </Loggers>
 
 </Configuration>
@@ -9,18 +9,23 @@ required:
   - bytesEmitted
 additionalProperties: true
 properties:
-  recordsEmitted:
-    type: integer
   bytesEmitted:
     type: integer
-  sourceStateMessagesEmitted:
-    description: Number of State messages emitted by the Source Connector
-    type: integer
   destinationStateMessagesEmitted:
     description: Number of State messages emitted by the Destination Connector
     type: integer
-  recordsCommitted:
-    type: integer # if unset, committed records could not be computed
+  destinationWriteEndTime:
+    type: integer
+    description: The exit time of the destination container/pod
+  destinationWriteStartTime:
+    type: integer
+    description: The boot time of the destination container/pod
+  estimatedBytes:
+    type: integer
+    description: The total estimated number of bytes for the sync
+  estimatedRecords:
+    type: integer
+    description: The total estimated number of records for the sync
   meanSecondsBeforeSourceStateMessageEmitted:
     type: integer
   maxSecondsBeforeSourceStateMessageEmitted:
@@ -29,21 +34,22 @@ properties:
     type: integer
   meanSecondsBetweenStateMessageEmittedandCommitted:
     type: integer
-  replicationStartTime:
+  recordsEmitted:
     type: integer
-    description: The start of the replication activity
+  recordsCommitted:
+    type: integer # if unset, committed records could not be computed
   replicationEndTime:
     type: integer
     description: The end of the replication activity
-  sourceReadStartTime:
+  replicationStartTime:
     type: integer
-    description: The boot time of the source container/pod
+    description: The start of the replication activity
   sourceReadEndTime:
     type: integer
     description: The exit time of the source container/pod
-  destinationWriteStartTime:
+  sourceReadStartTime:
     type: integer
-    description: The boot time of the destination container/pod
-  destinationWriteEndTime:
+    description: The boot time of the source container/pod
+  sourceStateMessagesEmitted:
+    description: Number of State messages emitted by the Source Connector
     type: integer
-    description: The exit time of the destination container/pod
@@ -0,0 +1,42 @@
/*
 * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.db.instance.jobs.migrations;

import static org.jooq.impl.DSL.constraint;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_40_26_001__CorrectStreamStatsTable extends BaseJavaMigration {

  private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_26_001__CorrectStreamStatsTable.class);

  @Override
  public void migrate(final Context context) throws Exception {
    LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

    // Warning: please do not use any jOOQ generated code to write a migration.
    // As database schema changes, the generated jOOQ code can be deprecated. So
    // old migration may not compile if there is any generated code.
    try (final DSLContext ctx = DSL.using(context.getConnection())) {
      // This actually needs to be bigint to match the id column on the attempts table.
      String streamStats = "stream_stats";
      ctx.alterTable(streamStats).alter("attempt_id").set(SQLDataType.BIGINT.nullable(false)).execute();
      // Not all streams provide a namespace.
      ctx.alterTable(streamStats).alter("stream_namespace").set(SQLDataType.VARCHAR.nullable(true)).execute();

      // The constraint should also take into account the stream namespace. Drop the constraint and
      // recreate it.
      ctx.alterTable(streamStats).dropUnique("stream_stats_attempt_id_stream_name_key").execute();
      ctx.alterTable(streamStats).add(constraint("uniq_stream_attempt").unique("attempt_id", "stream_name", "stream_namespace")).execute();
    }
  }

}
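
Once the duplicate rows are cleaned up, the new uniq_stream_attempt constraint would also allow a native jOOQ upsert for streams that always set a namespace. A hypothetical sketch, not part of this commit (and not viable when stream_namespace is null, since Postgres unique indexes treat nulls as distinct values, which is why the persistence code below checks for existence instead):

// Hypothetical native upsert against the uniq_stream_attempt constraint.
ctx.insertInto(STREAM_STATS)
    .set(STREAM_STATS.ID, UUID.randomUUID())
    .set(STREAM_STATS.ATTEMPT_ID, attemptId)
    .set(STREAM_STATS.STREAM_NAME, streamName)
    .set(STREAM_STATS.STREAM_NAMESPACE, streamNamespace)
    .set(STREAM_STATS.RECORDS_EMITTED, recordsEmitted)
    .onConflict(STREAM_STATS.ATTEMPT_ID, STREAM_STATS.STREAM_NAME, STREAM_STATS.STREAM_NAMESPACE)
    .doUpdate()
    .set(STREAM_STATS.RECORDS_EMITTED, recordsEmitted)
    .set(STREAM_STATS.UPDATED_AT, now)
    .execute();
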
@@ -63,8 +63,8 @@ create table "public"."normalization_summaries"(
 );
 create table "public"."stream_stats"(
   "id" uuid not null,
-  "attempt_id" int4 not null,
-  "stream_namespace" varchar(2147483647) not null,
+  "attempt_id" int8 not null,
+  "stream_namespace" varchar(2147483647) null,
   "stream_name" varchar(2147483647) not null,
   "records_emitted" int8 null,
   "bytes_emitted" int8 null,
@@ -122,10 +122,11 @@ create index "jobs_status_idx" on "public"."jobs"("status" asc);
 create unique index "normalization_summaries_pkey" on "public"."normalization_summaries"("id" asc);
 create index "normalization_summary_attempt_id_idx" on "public"."normalization_summaries"("attempt_id" asc);
 create index "index" on "public"."stream_stats"("attempt_id" asc);
-create unique index "stream_stats_attempt_id_stream_name_key" on "public"."stream_stats"(
+create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc);
+create unique index "uniq_stream_attempt" on "public"."stream_stats"(
   "attempt_id" asc,
-  "stream_name" asc
+  "stream_name" asc,
+  "stream_namespace" asc
 );
-create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc);
 create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc);
 create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc);
@@ -7,6 +7,7 @@
 import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
 import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
 import static io.airbyte.db.instance.jobs.jooq.generated.Tables.NORMALIZATION_SUMMARIES;
+import static io.airbyte.db.instance.jobs.jooq.generated.Tables.STREAM_STATS;
 import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -33,7 +34,9 @@
 import io.airbyte.config.JobConfig.ConfigType;
 import io.airbyte.config.JobOutput;
 import io.airbyte.config.NormalizationSummary;
+import io.airbyte.config.StreamSyncStats;
 import io.airbyte.config.SyncStats;
+import io.airbyte.config.persistence.PersistenceHelpers;
 import io.airbyte.db.Database;
 import io.airbyte.db.ExceptionWrappingDatabase;
 import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
@@ -337,8 +340,6 @@ public Optional<String> getAttemptTemporalWorkflowId(final long jobId, final int attemptNumber)
   public void writeOutput(final long jobId, final int attemptNumber, final JobOutput output)
       throws IOException {
     final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
-    final SyncStats syncStats = output.getSync().getStandardSyncSummary().getTotalStats();
-    final NormalizationSummary normalizationSummary = output.getSync().getNormalizationSummary();
 
     jobDatabase.transaction(ctx -> {
       ctx.update(ATTEMPTS)
@@ -348,10 +349,12 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutput output)
           .execute();
       final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);
 
+      final SyncStats syncStats = output.getSync().getStandardSyncSummary().getTotalStats();
       if (syncStats != null) {
-        writeSyncStats(now, syncStats, attemptId, ctx);
+        saveToSyncStatsTable(now, syncStats, attemptId, ctx);
       }
 
+      final NormalizationSummary normalizationSummary = output.getSync().getNormalizationSummary();
       if (normalizationSummary != null) {
         ctx.insertInto(NORMALIZATION_SUMMARIES)
             .set(NORMALIZATION_SUMMARIES.ID, UUID.randomUUID())
@@ -369,14 +372,65 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutput output)
 
   }
 
-  private static void writeSyncStats(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx) {
+  @Override
+  public void writeStats(final long jobId,
+                         final int attemptNumber,
+                         final long estimatedRecords,
+                         final long estimatedBytes,
+                         final long recordsEmitted,
+                         final long bytesEmitted,
+                         final List<StreamSyncStats> streamStats)
+      throws IOException {
+    final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
+    jobDatabase.transaction(ctx -> {
+      final var attemptId = getAttemptId(jobId, attemptNumber, ctx);
+
+      final var syncStats = new SyncStats()
+          .withEstimatedRecords(estimatedRecords)
+          .withEstimatedBytes(estimatedBytes)
+          .withRecordsEmitted(recordsEmitted)
+          .withBytesEmitted(bytesEmitted);
+      saveToSyncStatsTable(now, syncStats, attemptId, ctx);
+
+      saveToStreamStatsTable(now, streamStats, attemptId, ctx);
+      return null;
+    });
+
+  }
+
+  private static void saveToSyncStatsTable(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx) {
+    // Although JOOQ supports upsert using the onConflict statement, we cannot use it as the table
+    // currently has duplicate records and also doesn't contain the unique constraint on the attempt_id
+    // column JOOQ requires. We are forced to check for existence.
+    final var isExisting = ctx.fetchExists(SYNC_STATS, SYNC_STATS.ATTEMPT_ID.eq(attemptId));
+    if (isExisting) {
+      ctx.update(SYNC_STATS)
+          .set(SYNC_STATS.UPDATED_AT, now)
+          .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
+          .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted())
+          .set(SYNC_STATS.ESTIMATED_RECORDS, syncStats.getEstimatedRecords())
+          .set(SYNC_STATS.ESTIMATED_BYTES, syncStats.getEstimatedBytes())
+          .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted())
+          .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted())
+          .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted())
+          .set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted())
+          .set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted())
+          .set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted())
+          .set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted())
+          .where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
+          .execute();
+      return;
+    }
+
     ctx.insertInto(SYNC_STATS)
         .set(SYNC_STATS.ID, UUID.randomUUID())
+        .set(SYNC_STATS.UPDATED_AT, now)
         .set(SYNC_STATS.CREATED_AT, now)
         .set(SYNC_STATS.ATTEMPT_ID, attemptId)
-        .set(SYNC_STATS.UPDATED_AT, now)
         .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
         .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted())
+        .set(SYNC_STATS.ESTIMATED_RECORDS, syncStats.getEstimatedRecords())
+        .set(SYNC_STATS.ESTIMATED_BYTES, syncStats.getEstimatedBytes())
        .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted())
         .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted())
         .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted())
@@ -387,6 +441,50 @@ private static void writeSyncStats(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx)
         .execute();
   }
 
+  private static void saveToStreamStatsTable(final OffsetDateTime now,
+                                             final List<StreamSyncStats> perStreamStats,
+                                             final Long attemptId,
+                                             final DSLContext ctx) {
+    Optional.ofNullable(perStreamStats).orElse(Collections.emptyList()).forEach(
+        streamStats -> {
+          // We cannot entirely rely on JOOQ's generated SQL for upserts as it does not support null fields
+          // for conflict detection. We are forced to separately check for existence.
+          final var stats = streamStats.getStats();
+          final var isExisting =
+              ctx.fetchExists(STREAM_STATS, STREAM_STATS.ATTEMPT_ID.eq(attemptId).and(STREAM_STATS.STREAM_NAME.eq(streamStats.getStreamName()))
+                  .and(PersistenceHelpers.isNullOrEquals(STREAM_STATS.STREAM_NAMESPACE, streamStats.getStreamNamespace())));
+          if (isExisting) {
+            ctx.update(STREAM_STATS)
+                .set(STREAM_STATS.UPDATED_AT, now)
+                .set(STREAM_STATS.BYTES_EMITTED, stats.getBytesEmitted())
+                .set(STREAM_STATS.RECORDS_EMITTED, stats.getRecordsEmitted())
+                .set(STREAM_STATS.ESTIMATED_RECORDS, stats.getEstimatedRecords())
+                .set(STREAM_STATS.ESTIMATED_BYTES, stats.getEstimatedBytes())
+                .where(STREAM_STATS.ATTEMPT_ID.eq(attemptId))
+                .execute();
+            return;
+          }
+
+          ctx.insertInto(STREAM_STATS)
+              .set(STREAM_STATS.ID, UUID.randomUUID())
+              .set(STREAM_STATS.ATTEMPT_ID, attemptId)
+              .set(STREAM_STATS.STREAM_NAME, streamStats.getStreamName())
+              .set(STREAM_STATS.STREAM_NAMESPACE, streamStats.getStreamNamespace())
+              .set(STREAM_STATS.CREATED_AT, now)
+              .set(STREAM_STATS.UPDATED_AT, now)
+              .set(STREAM_STATS.BYTES_EMITTED, stats.getBytesEmitted())
+              .set(STREAM_STATS.RECORDS_EMITTED, stats.getRecordsEmitted())
+              .set(STREAM_STATS.ESTIMATED_BYTES, stats.getEstimatedBytes())
+              .set(STREAM_STATS.ESTIMATED_RECORDS, stats.getEstimatedRecords())
+              .set(STREAM_STATS.UPDATED_AT, now)
+              .set(STREAM_STATS.BYTES_EMITTED, stats.getBytesEmitted())
+              .set(STREAM_STATS.RECORDS_EMITTED, stats.getRecordsEmitted())
+              .set(STREAM_STATS.ESTIMATED_BYTES, stats.getEstimatedBytes())
+              .set(STREAM_STATS.ESTIMATED_RECORDS, stats.getEstimatedRecords())
+              .execute();
+        });
+  }
+
   @Override
   public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException {
     final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
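
PersistenceHelpers.isNullOrEquals is referenced above but its implementation is not part of this diff. A plausible minimal version (an assumption based on how it is called) returns an IS NULL predicate when the value is absent and an equality predicate otherwise:

import org.jooq.Condition;
import org.jooq.Field;

public final class PersistenceHelpers {

  // Assumed implementation: null values cannot be matched with eq, so fall back to isNull.
  public static <T> Condition isNullOrEquals(final Field<T> field, final T value) {
    return value == null ? field.isNull() : field.eq(value);
  }

}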
@@ -400,14 +498,16 @@ public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary)
   }
 
   @Override
-  public List<SyncStats> getSyncStats(final long jobId, final int attemptNumber) throws IOException {
+  public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) throws IOException {
     return jobDatabase
         .query(ctx -> {
           final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);
-          return ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
-              .fetch(getSyncStatsRecordMapper())
-              .stream()
-              .toList();
+          final var syncStats = ctx.select(DSL.asterisk()).from(SYNC_STATS).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
+              .orderBy(SYNC_STATS.UPDATED_AT.desc())
+              .fetchOne(getSyncStatsRecordMapper());
+          final var perStreamStats = ctx.select(DSL.asterisk()).from(STREAM_STATS).where(STREAM_STATS.ATTEMPT_ID.eq(attemptId))
+              .fetch(getStreamStatsRecordsMapper());
+          return new AttemptStats(syncStats, perStreamStats);
         });
   }
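
Together, writeStats and getAttemptStats form the new write/read path. A usage sketch (the job id, attempt number, and counter values are illustrative):

// Persist combined and per-stream counters for attempt 0 of job 123...
jobPersistence.writeStats(123L, 0, 1000L, 4096L, 500L, 2048L,
    List.of(new StreamSyncStats()
        .withStreamName("users")
        .withStreamNamespace("public")
        .withStats(new SyncStats().withRecordsEmitted(500L).withBytesEmitted(2048L))));

// ...then read everything back as a single AttemptStats object.
final var attemptStats = jobPersistence.getAttemptStats(123L, 0);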

@@ -423,7 +523,8 @@ public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber)
         });
   }
 
-  private static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx) {
+  @VisibleForTesting
+  static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx) {
     final Optional<Record> record =
         ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
             attemptNumber).stream().findFirst();
@@ -432,6 +533,7 @@ private static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx)
 
   private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
     return record -> new SyncStats().withBytesEmitted(record.get(SYNC_STATS.BYTES_EMITTED)).withRecordsEmitted(record.get(SYNC_STATS.RECORDS_EMITTED))
+        .withEstimatedBytes(record.get(SYNC_STATS.ESTIMATED_BYTES)).withEstimatedRecords(record.get(SYNC_STATS.ESTIMATED_RECORDS))
         .withSourceStateMessagesEmitted(record.get(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED))
         .withDestinationStateMessagesEmitted(record.get(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED))
         .withRecordsCommitted(record.get(SYNC_STATS.RECORDS_COMMITTED))
@@ -441,8 +543,19 @@ private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
         .withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED));
   }
 
+  private static RecordMapper<Record, StreamSyncStats> getStreamStatsRecordsMapper() {
+    return record -> {
+      final var stats = new SyncStats()
+          .withEstimatedRecords(record.get(STREAM_STATS.ESTIMATED_RECORDS)).withEstimatedBytes(record.get(STREAM_STATS.ESTIMATED_BYTES))
+          .withRecordsEmitted(record.get(STREAM_STATS.RECORDS_EMITTED)).withBytesEmitted(record.get(STREAM_STATS.BYTES_EMITTED));
+      return new StreamSyncStats()
+          .withStreamName(record.get(STREAM_STATS.STREAM_NAME)).withStreamNamespace(record.get(STREAM_STATS.STREAM_NAMESPACE))
+          .withStats(stats);
+    };
+  }
+
   private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper() {
-    final RecordMapper<Record, NormalizationSummary> recordMapper = record -> {
+    return record -> {
       try {
         return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli())
             .withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli())
@@ -451,7 +564,6 @@ private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper()
         throw new RuntimeException(e);
       }
     };
-    return recordMapper;
   }
 
   private static List<FailureReason> deserializeFailureReasons(final Record record) throws JsonProcessingException {