Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-11752] - add support of incremental strategy in Kafka #202

Merged
merged 5 commits into from
Jan 29, 2024

Conversation

maxim-lixakov
Copy link
Contributor

@maxim-lixakov maxim-lixakov commented Jan 24, 2024

Change Summary

  • Add support of Incremental Strategies for Kafka connection
  • Restrict passing * and , to Kafka.source as we handle reading only from single topic (in DBReader concept + HWM support)
  • Add following tests and documentation

Related issue number

Checklist

  • Commit message and PR title is comprehensive
  • Keep the change as small as possible
  • Unit and integration tests for the changes exist
  • Tests pass on CI and coverage does not decrease
  • Documentation reflects the changes where applicable
  • docs/changelog/next_release/<pull request or issue id>.<change type>.rst file added describing change
    (see CONTRIBUTING.rst for details.)
  • My PR is ready to review.

@maxim-lixakov
Copy link
Contributor Author

не нашел в каких случаях "в разные моменты времени в window.start_from.including может быть true, а может быть и false", если мы используем только Incremental strategy, передаваемый window в read_source_as_df всегда формируется одинаково ( в snapshot мы подставляем earliest/latest в starting/endingOffsets)

Copy link

codecov bot commented Jan 24, 2024

Codecov Report

Attention: 4 lines in your changes are missing coverage. Please review.

Comparison is base (0af4836) 93.86% compared to head (1f2ea3e) 94.11%.

Files Patch % Lines
onetl/connection/db_connection/kafka/connection.py 71.42% 1 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           develop     #202      +/-   ##
===========================================
+ Coverage    93.86%   94.11%   +0.24%     
===========================================
  Files          204      204              
  Lines         7780     7799      +19     
  Branches      1379     1387       +8     
===========================================
+ Hits          7303     7340      +37     
+ Misses         358      335      -23     
- Partials       119      124       +5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@maxim-lixakov
Copy link
Contributor Author

maxim-lixakov commented Jan 24, 2024

не нашел в каких случаях "в разные моменты времени в window.start_from.including может быть true, а может быть и false", если мы используем только Incremental strategy, передаваемый window в read_source_as_df всегда формируется одинаково ( в snapshot мы подставляем earliest/latest в starting/endingOffsets)

также не нашел в каких случаях может передаваться limit, когда он может быть нужен?

@dolfinus
Copy link
Member

не нашел в каких случаях "в разные моменты времени в window.start_from.including может быть true, а может быть и false", если мы используем только Incremental strategy, передаваемый window в read_source_as_df всегда формируется одинаково ( в snapshot мы подставляем earliest/latest в starting/endingOffsets)

Да, false может быть только в случае Batch стратегий, которые Kafka не поддерживает. Можно добавить комментарий об этом код формирования startingOffsets.

также не нашел в каких случаях может передаваться limit, когда он может быть нужен?

Для остальных СУБД это используется в случае, когда в источнике нет строк, попадающих под where, и min/max возвращают None, как если бы инкремента вообще не было. В таком случае read_source_as_df вызывается с limit=0 чисто для того, чтобы мы всегда в таком случае возвращали пустой dataframe. Иначе при появлении в источнике данных они бы сразу были доступны в df из-за его ленивости, а HWM вычисляется до возврата df, и неизбежно было бы расхождение.

В случае Kafka даже у пустого топика или топика без новых данных есть какой-то offset для каждой из партиций, и get_min_max_values никак не может вернуть None, и соответственно limit=0 не возможен. Но пусть будет для совместимости, может в будущем что-то такое понадобится.

@dolfinus
Copy link
Member

Еще нужно проверить документацию на наличие предупреждений о работе Kafkaс инкрементами:

.. warning::
Currently, Kafka does not support :ref:`strategy`. You can only read the **whole** topic.

@dolfinus
Copy link
Member

Сделай rebase, я пофиксил сборку документации

@maxim-lixakov
Copy link
Contributor Author

Еще нужно проверить документацию на наличие предупреждений о работе Kafkaс инкрементами:

.. warning::
Currently, Kafka does not support :ref:`strategy`. You can only read the **whole** topic.

поправил

@dolfinus dolfinus merged commit 0bc054d into develop Jan 29, 2024
41 of 42 checks passed
@dolfinus dolfinus deleted the feature/DOP-11752 branch January 29, 2024 10:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants