Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate sendgrid to config-based #15257

Merged
merged 73 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 65 commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
897ecc5
fix spec
girarda Aug 3, 2022
6099e22
read records from lists stream
girarda Aug 3, 2022
aa31326
campaigns
girarda Aug 3, 2022
a05e5b0
contacts
girarda Aug 3, 2022
b193739
stats_automations
girarda Aug 3, 2022
0cb7db5
segments
girarda Aug 3, 2022
778e3c3
single_sends
girarda Aug 3, 2022
2675e7d
templates
girarda Aug 3, 2022
1d23578
suppressions_global
girarda Aug 3, 2022
d9ec5b3
suppression groups
girarda Aug 3, 2022
a1b2e08
suppression group memebers
girarda Aug 3, 2022
9c8d83f
blocks
girarda Aug 3, 2022
018d165
bounces
girarda Aug 3, 2022
6f1fbbd
invalid emails and spam reports
girarda Aug 3, 2022
7342626
bump cdk version
girarda Aug 3, 2022
b5e5ba6
fix paths
girarda Aug 3, 2022
842f515
bump cdk version
girarda Aug 4, 2022
c6adde7
Merge branch 'master' into alex/configbasedsendgrid
girarda Aug 5, 2022
2885ab2
only define cursor field in one place
girarda Aug 5, 2022
53ebb79
move to definitions
girarda Aug 5, 2022
8e32764
move bounces inside the streams array
girarda Aug 5, 2022
01a300e
move all streams within the streams array
girarda Aug 5, 2022
5f0a51e
Merge branch 'master' into alex/configbasedsendgrid
girarda Aug 9, 2022
992f3a8
Merge branch 'master' into alex/configbasedsendgrid
girarda Aug 9, 2022
936ccc6
update sendgrid config
girarda Aug 9, 2022
9ad8824
fix
girarda Aug 9, 2022
4b73912
derp
girarda Aug 9, 2022
028bdfb
rename field
girarda Aug 9, 2022
3c76c5a
fix parse
girarda Aug 9, 2022
b9277d0
Revert "fix parse"
girarda Aug 9, 2022
a584b84
fix parse timestamp
girarda Aug 9, 2022
9957303
extract datetime parser
girarda Aug 9, 2022
5f8e7d5
remove print
girarda Aug 9, 2022
72880ce
use parser
girarda Aug 9, 2022
4cd75ea
top level docstring
girarda Aug 9, 2022
28f0588
rename variable
girarda Aug 9, 2022
99caa58
Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsend…
girarda Aug 9, 2022
8d55afa
Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configb…
girarda Aug 9, 2022
9b70a3b
Revert "Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex…
girarda Aug 9, 2022
4a9876d
Revert "Revert "Revert "Merge branch 'alex/datetimeFormatTimestamp' i…
girarda Aug 9, 2022
016cb69
do not use timestamp()
girarda Aug 10, 2022
b984d20
Revert "do not use timestamp()"
girarda Aug 10, 2022
42af817
Handle extracting no records from root
girarda Aug 10, 2022
5f0bcf2
Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsend…
girarda Aug 10, 2022
f9c12a0
Merge branch 'master' into alex/configbasedsendgrid
girarda Aug 10, 2022
b36596c
bump cdk version
girarda Aug 10, 2022
03ed92e
handle empty record
girarda Aug 10, 2022
1fcd68f
Merge branch 'master' into alex/configbasedsendgrid
girarda Aug 11, 2022
965b2cb
Merge branch 'master' into alex/configbasedsendgrid
girarda Aug 11, 2022
53f3e68
update unit test
girarda Aug 11, 2022
30920d2
messages stream needs a different slicer
girarda Aug 11, 2022
52d28fe
handle missing keys
girarda Aug 11, 2022
e10d6b9
Update unit test
girarda Aug 11, 2022
7975a89
Merge branch 'master' into alex/selectNoRecords
girarda Aug 11, 2022
b582e87
record extractor interface
girarda Aug 11, 2022
1b34c19
dpath extractor
girarda Aug 11, 2022
3dadc32
docstring
girarda Aug 11, 2022
ac92374
Merge branch 'alex/selectNoRecords' into alex/configbasedsendgrid
girarda Aug 11, 2022
f94a4a4
use dpath
girarda Aug 11, 2022
6708c46
Revert "Merge branch 'alex/selectNoRecords' into alex/configbasedsend…
girarda Aug 11, 2022
b0a2d2b
Merge branch 'master' into alex/configbasedsendgrid
girarda Aug 11, 2022
ec94936
bump cdk version
girarda Aug 11, 2022
ffefa97
use dpath
girarda Aug 11, 2022
8d10fa9
missing cursor field
girarda Aug 11, 2022
3256a2f
Merge branch 'master' into alex/configbasedsendgrid
girarda Aug 11, 2022
421f72e
start DRYing the config
girarda Aug 12, 2022
b70e518
delete more cruff
girarda Aug 12, 2022
a5c2734
DRY
girarda Aug 12, 2022
971ab82
get start time from config
girarda Aug 12, 2022
28fec5e
delete custom streams
girarda Aug 12, 2022
2e103f3
step=30days
girarda Aug 12, 2022
5ef1817
bump version
girarda Aug 12, 2022
04be9c4
auto-bump connector version [ci skip]
octavia-squidington-iii Aug 12, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-sendgrid/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=["airbyte-cdk~=0.1", "backoff", "requests", "pytest==6.1.2", "pytest-mock"],
install_requires=["airbyte-cdk>=0.1.74", "backoff", "requests", "pytest==6.1.2", "pytest-mock"],
package_data={"": ["*.json", "schemas/*.json"]},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,342 @@
definitions:
page_size: 50

schema_loader:
type: JsonSchema
file_path: "./source_sendgrid/schemas/{{ options.name }}.json"
result_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_pointer:
- "result"
results_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_pointer:
- "results"
selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_pointer: []
requester:
type: HttpRequester
name: "{{ options['name'] }}"
url_base: "https://api.sendgrid.com"
http_method: "GET"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
cursor_paginator:
type: LimitPaginator
url_base: "*ref(definitions.requester.url_base)"
page_size: "*ref(definitions.page_size)"
limit_option:
inject_into: "request_parameter"
field_name: "page_size"
page_token_option:
inject_into: "path"
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no stop condition?

offset_paginator:
type: LimitPaginator
$options:
url_base: "*ref(definitions.requester.url_base)"
page_size: "*ref(definitions.page_size)"
limit_option:
inject_into: "request_parameter"
field_name: "limit"
page_token_option:
inject_into: "request_parameter"
field_name: "offset"
pagination_strategy:
type: "OffsetIncrement"
retriever:
type: SimpleRetriever
name: "{{ options['name'] }}"
primary_key: "{{ options['primary_key'] }}"
stream_slicer:
type: "DatetimeStreamSlicer"
start_datetime:
datetime: "2017-01-01T00:00:00.0Z"
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
end_datetime:
datetime: "2022-08-01T00:00:00.0Z"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't hardcode this right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
step: "10000d"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like a pretty huge step size no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to 30d

cursor_field: "{{ options.stream_cursor_field }}"
start_time_option:
field_name: "start_time"
inject_into: "request_parameter"
end_time_option:
field_name: "end_time"
inject_into: "request_parameter"
datetime_format: "%s"
messages_stream_slicer:
type: "DatetimeStreamSlicer"
start_datetime:
datetime: "2017-01-01T00:00:00.0Z"
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
end_datetime:
datetime: "2022-08-01T00:00:00.0Z"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
step: "10000d"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

cursor_field: "{{ options.stream_cursor_field }}"
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"

streams:
- type: DeclarativeStream
$options:
name: "lists"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.result_selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/marketing/lists"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- type: DeclarativeStream
$options:
name: "campaigns"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.result_selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/marketing/campaigns"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- type: DeclarativeStream
$options:
name: "contacts"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.result_selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/marketing/contacts"
paginator:
type: "NoPagination"
- type: DeclarativeStream
$options:
name: "stats_automations"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.results_selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/marketing/stats/automations"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- type: DeclarativeStream
$options:
name: "segments"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.results_selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/marketing/segments"
paginator:
type: "NoPagination"
- type: DeclarativeStream
$options:
name: "single_sends"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.results_selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/marketing/stats/singlesends"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- type: DeclarativeStream
$options:
name: "templates"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.result_selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/templates"
request_options_provider:
request_parameters:
generations: "legacy,dynamic"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- type: DeclarativeStream
$options:
name: "bounces"
primary_key: "email"
stream_cursor_field: "created"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/suppression/bounces"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- type: DeclarativeStream
$options:
name: "global_suppressions"
primary_key: "email"
stream_cursor_field: "created"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/suppression/unsubscribes"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- type: DeclarativeStream
$options:
name: "blocks"
primary_key: "email"
stream_cursor_field: "created"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/suppression/blocks"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- type: DeclarativeStream
$options:
name: "suppression_groups"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/asm/groups"
paginator:
type: "NoPagination"
- type: DeclarativeStream
$options:
name: "suppression_group_members"
primary_key: "group_id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/asm/suppressions"
paginator:
$ref: "*ref(definitions.offset_paginator)"
- type: DeclarativeStream
$options:
name: "invalid_emails"
primary_key: "email"
stream_cursor_field: "created"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/suppression/invalid_emails"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- type: DeclarativeStream
$options:
name: "spam_reports"
primary_key: "email"
stream_cursor_field: "created"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/suppression/spam_reports"

paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- type: DeclarativeStream
$options:
name: "messages"
primary_key: "msg_id"
stream_cursor_field: "last_event_time"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
$ref: "*ref(definitions.selector)"
requester:
$ref: "*ref(definitions.requester)"
path: "/v3/messages"
request_options_provider:
request_parameters:
limit: 1000
query: 'last_event_time BETWEEN TIMESTAMP "{{stream_slice.start_time}}" AND TIMESTAMP "{{stream_slice.end_time}}"'
stream_slicer:
$ref: "*ref(definitions.messages_stream_slicer)"
check:
type: CheckStream
stream_names: ["lists"]
Loading