Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encapsulate and test triggers for assigning flush workers #26706

Merged
merged 1 commit into from
May 31, 2023

Conversation

cgardens
Copy link
Contributor

@cgardens cgardens commented May 28, 2023

What

  • The logic for which stream we assign a flush worker is one of the places where we will do the most tuning. That means as we change it, we need to be well encapsulate and well tested.
  • This PR is designed to be readable to get someone up to speed on when we trigger. It's good context for the destinations team as the logic for when we decide to flush is a little different than the existing one.

How

  • Moves logic for triggers into its own class.
  • Adds rigorous javadoc comments.
  • Adds rigorous testing.

Recommended reading order

  1. DetectStreamToFlush.java
  2. RunningFlushWorkers.java

@cgardens cgardens requested a review from a team as a code owner May 28, 2023 05:09
@github-actions
Copy link
Contributor

github-actions bot commented May 28, 2023

Affected Connector Report

NOTE ⚠️ Changes in this PR affect the following connectors. Make sure to do the following as needed:

  • Run integration tests
  • Bump connector or module version
  • Add changelog
  • Publish the new version

✅ Sources (33)

Connector Version Changelog Publish
source-alloydb 2.0.28
source-alloydb-strict-encrypt 2.0.28 🔵
(ignored)
🔵
(ignored)
source-azure-blob-storage 0.1.0
source-bigquery 0.2.3
source-clickhouse 0.1.17
source-clickhouse-strict-encrypt 0.1.17 🔵
(ignored)
🔵
(ignored)
source-cockroachdb 0.1.22
source-cockroachdb-strict-encrypt 0.1.22 🔵
(ignored)
🔵
(ignored)
source-db2 0.1.19
source-db2-strict-encrypt 0.1.19 🔵
(ignored)
🔵
(ignored)
source-dynamodb 0.1.2
source-e2e-test 2.1.4
source-e2e-test-cloud 2.1.4 🔵
(ignored)
🔵
(ignored)
source-elasticsearch 0.1.1
source-jdbc 0.3.5 🔵
(ignored)
🔵
(ignored)
source-kafka 0.2.3
source-mongodb-strict-encrypt 0.1.19 🔵
(ignored)
🔵
(ignored)
source-mongodb-v2 0.1.19
source-mssql 1.0.17
source-mssql-strict-encrypt 1.0.17 🔵
(ignored)
🔵
(ignored)
source-mysql 2.0.24
source-mysql-strict-encrypt 2.0.24 🔵
(ignored)
🔵
(ignored)
source-oracle 0.3.24
source-oracle-strict-encrypt 0.3.24 🔵
(ignored)
🔵
(ignored)
source-postgres 2.0.31
source-postgres-strict-encrypt 2.0.31 🔵
(ignored)
🔵
(ignored)
source-redshift 0.3.16
source-relational-db 0.3.1 🔵
(ignored)
🔵
(ignored)
source-scaffold-java-jdbc 0.1.0 🔵
(ignored)
🔵
(ignored)
source-sftp 0.1.2
source-snowflake 0.1.34
source-teradata 0.1.0
source-tidb 0.2.4
  • See "Actionable Items" below for how to resolve warnings and errors.

❌ Destinations (50)

Connector Version Changelog Publish
destination-azure-blob-storage 0.2.0
destination-bigquery 1.4.4
destination-bigquery-denormalized 1.4.1
destination-cassandra 0.1.4
destination-clickhouse 0.2.3
destination-clickhouse-strict-encrypt 0.2.3 🔵
(ignored)
🔵
(ignored)
destination-csv 1.0.0
destination-databricks 1.0.2
destination-dev-null 0.3.0 🔵
(ignored)
🔵
(ignored)
destination-doris 0.1.0
destination-dynamodb 0.1.7
destination-e2e-test 0.3.0
destination-elasticsearch 0.1.6
destination-elasticsearch-strict-encrypt 0.1.6 🔵
(ignored)
🔵
(ignored)
destination-exasol 0.1.1
destination-gcs 0.3.0
destination-iceberg 0.1.0
destination-kafka 0.1.10
destination-keen 0.2.4
destination-kinesis 0.1.5
destination-local-json 0.2.11
destination-mariadb-columnstore 0.1.7
destination-mongodb 0.1.9
destination-mongodb-strict-encrypt 0.1.9 🔵
(ignored)
🔵
(ignored)
destination-mqtt 0.1.3
destination-mssql 0.1.23
destination-mssql-strict-encrypt 0.1.23 🔵
(ignored)
🔵
(ignored)
destination-mysql 0.1.20
destination-mysql-strict-encrypt 0.1.21
(mismatch: 0.1.20)
🔵
(ignored)
🔵
(ignored)
destination-oracle 0.1.19
destination-oracle-strict-encrypt 0.1.19 🔵
(ignored)
🔵
(ignored)
destination-postgres 0.3.27
destination-postgres-strict-encrypt 0.3.27 🔵
(ignored)
🔵
(ignored)
destination-pubsub 0.2.0
destination-pulsar 0.1.3
destination-r2 0.1.0
destination-redis 0.1.4
destination-redpanda 0.1.0
destination-redshift 0.4.8
destination-rockset 0.1.4
destination-s3 0.4.1
destination-s3-glue 0.1.7
destination-scylla 0.1.3
destination-selectdb 0.1.0
destination-snowflake 1.0.4
destination-starburst-galaxy 0.0.1
destination-teradata 0.1.1
destination-tidb 0.1.1
(diff seed version)
destination-vertica 0.1.0
destination-yugabytedb 0.1.1
  • See "Actionable Items" below for how to resolve warnings and errors.

✅ Other Modules (0)

Actionable Items

(click to expand)

Category Status Actionable Item
Version
mismatch
The version of the connector is different from its normal variant. Please bump the version of the connector.

doc not found
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug.
Changelog
doc not found
The connector does not seem to have a documentation file. This can be normal (e.g. basic connector like source-jdbc is not published or documented). Please double-check to make sure that it is not a bug.

changelog missing
There is no chnagelog for the current version of the connector. If you are the author of the current version, please add a changelog.
Publish
not in seed
The connector is not in the cloud or oss registry, so its publication status cannot be checked. This can be normal (e.g. some connectors are cloud-specific, and only listed in the cloud seed file). Please double-check to make sure that you have added a metadata.yaml file and the expected registries are enabled.

*
* @return best, next stream to flush. If no stream is ready to be flushed, return empty.
*/
public Optional<StreamDescriptor> getNextStreamToFlush() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this isn't wired up in FlushWorkers - is the plan to do that in a follow up PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup. there are so many diffs in flush workers i couldn't wire it in in this PR without causing a big mess.

Copy link
Contributor

@davinchia davinchia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff! Super love the comments + tests.

I only skimmed the tests as they felt very similar to our previous branch.

.map(time -> time.isBefore(Instant.now().minus(MAX_TIME_BETWEEN_REC_MIN, ChronoUnit.MINUTES)))
.orElse(false);

final String debugString = String.format("time trigger: %s", isTimeTriggered);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How useful is this for debugging? I can only see this being useful since getNextStreamToFlush doesn't differentiate between isSizedTriggered or isTimeTriggered to log the universal debugString and you want to know if it was time or size triggered

There would be greater value with the Instant.now() and bufferDequeue.getTimeOfLastRecord(stream).get() values imo

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point. in practice, this is a simple trigger and we haven't seen a bug here yet. i'm sure we will one day and then want to log more, but i'm inclined to wait until then. the tradeoff is how huge the log lines gets so if we go with what you're suggesting you really need now: 2023/05/30 12:00.000+06 record added at: 2023/05/30 11:00.000+06.

* @param stream the stream that is being flushed
* @param flushWorkerId flush worker id
*/
public void trackFlushWorker(final StreamDescriptor stream, final UUID flushWorkerId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this class intended to be wired in later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor

@evantahler evantahler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great comments and tests!
There are a few things not-wired up (as others have pointed out), but I'm going to assume a later PR uses them

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comments in this file are
swedish_chef

}

/**
* For a stream, determines how many bytes will be processed by CURRENTLY running workers. For the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for a stream to have more than one active worker? (I think so)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes!

@cgardens cgardens force-pushed the cgardens/async-snowflake-boring branch from 8016643 to 6dc7de7 Compare May 31, 2023 15:25
@cgardens cgardens requested a review from a team as a code owner May 31, 2023 15:25
Base automatically changed from cgardens/async-snowflake-boring to master May 31, 2023 15:53
@cgardens cgardens force-pushed the cgardens/flush-worker-trigger branch from 92991c7 to 6ceca72 Compare May 31, 2023 15:54
@cgardens cgardens merged commit 223f698 into master May 31, 2023
22 checks passed
@cgardens cgardens deleted the cgardens/flush-worker-trigger branch May 31, 2023 16:59
marcosmarxm pushed a commit to natalia-miinto/airbyte that referenced this pull request Jun 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants