Lowcode CDK: Stream Slicer "step" field can't resolve config dict #18767

Closed
andnig opened this issue Nov 1, 2022 · 1 comment

Comments

andnig commented Nov 1, 2022

Environment

  • Airbyte version: 0.40.17
  • OS Version / Instance: Ubuntu 22.04
  • Deployment: local development
  • Step where error happened: Develop lowcode connector

Current Behavior

The "step" field of a DatetimeStreamSlicer currently seems to accept only hardcoded string values or references. If we try to use a config value for the step, the assertion in _parse_timedelta fails.

Both the check and the read commands fail, but the read command produces more verbose output when it fails.

Example:
source.yaml

  stream_slicer:
    type: "DatetimeStreamSlicer"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%dT%H:%M:%SZ"
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    step: "{{ config['slice_range'] }}"
    datetime_format: "%s"
    cursor_field: "{{ options['stream_cursor_field'] }}"

secrets/config.json

{"access_token": "secret", "backend_url": "secret", "dataset_id": "8f418098-ca28-4df5-9498-0df9fe78eda7", "start_date": "2019-10-21T00:00:00Z", "slice_range": "1d"}

Running python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --debug raises the following exception:

{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1667298757214.9302, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "", "stack_trace": "Traceback (most recent call last):
  File \"/home/andreas/github/airbyte/airbyte-integrations/connectors/source-senseforce/main.py\", line 13, in <module>
    launch(source, sys.argv[1:])
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run
    for message in generator:
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 100, in read
    stream_instances = {s.name: s for s in self.streams(config)}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in streams
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in <listcomp>
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 209, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 200, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/create_partial.py\", line 56, in newfunc
    ret = func(*args, *fargs, **dynamic_args)
  File \"<string>\", line 16, in __init__
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 82, in __post_init__
    self._step = self._parse_timedelta(self.step)
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 195, in _parse_timedelta
    assert parts is not None
AssertionError
", "failure_type": "system_error"}}}
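
The last frames of the trace suggest that the raw template string reaches _parse_timedelta before it has been interpolated. The following is a minimal sketch of that failure mode, not the CDK source: the regex and the standalone parse_timedelta function are assumptions that only mimic the duration format ("1d") and the assert seen in the trace.

```python
import datetime
import re

# Assumed duration pattern (not copied from the CDK): optional weeks, days,
# hours, minutes and seconds, anchored at the end of the string.
TIMEDELTA_REGEX = re.compile(
    r"((?P<weeks>[\.\d]+?)w)?"
    r"((?P<days>[\.\d]+?)d)?"
    r"((?P<hours>[\.\d]+?)h)?"
    r"((?P<minutes>[\.\d]+?)m)?"
    r"((?P<seconds>[\.\d]+?)s)?$"
)


def parse_timedelta(time_str: str) -> datetime.timedelta:
    """Parse a duration such as "1d"; asserts when the string does not match."""
    parts = TIMEDELTA_REGEX.match(time_str)
    assert parts is not None  # same kind of check as the one failing in the trace
    time_params = {name: float(value) for name, value in parts.groupdict().items() if value}
    return datetime.timedelta(**time_params)


print(parse_timedelta("1d"))  # a literal step parses fine

try:
    # The uninterpolated template is not a duration, so the match is None.
    parse_timedelta("{{ config['slice_range'] }}")
except AssertionError:
    print("AssertionError: template string was never resolved")
```

Under these assumptions, the literal "1d" parses while the template string trips the assert, which matches the behavior described above.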

Expected Behavior

We can use a config value to set the "step" field of the DatetimeStreamSlicer without getting an AssertionError.
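
One way to picture the expected behavior is to resolve the config reference first and only then parse the duration. The sketch below is illustrative only: resolve_config_template and parse_step are hypothetical helpers, the regexes are assumptions, and the tiny substitution stands in for real template interpolation. It is not necessarily how the CDK handles this.

```python
import datetime
import re

# Assumed duration pattern: optional weeks, days, hours, minutes, seconds.
STEP_REGEX = re.compile(
    r"((?P<weeks>[\.\d]+?)w)?"
    r"((?P<days>[\.\d]+?)d)?"
    r"((?P<hours>[\.\d]+?)h)?"
    r"((?P<minutes>[\.\d]+?)m)?"
    r"((?P<seconds>[\.\d]+?)s)?"
)

# Hypothetical stand-in for template interpolation: only understands
# references of the form {{ config['key'] }}.
CONFIG_REF = re.compile(r"\{\{\s*config\[['\"](?P<key>[^'\"]+)['\"]\]\s*\}\}")


def resolve_config_template(value: str, config: dict) -> str:
    """Replace every {{ config['key'] }} reference with the config value."""
    return CONFIG_REF.sub(lambda m: str(config[m.group("key")]), value)


def parse_step(step: str, config: dict) -> datetime.timedelta:
    """Resolve the step against the config, then parse it as a duration."""
    resolved = resolve_config_template(step, config)
    parts = STEP_REGEX.fullmatch(resolved)
    if not resolved or parts is None:
        # Raise a descriptive error instead of a bare assert.
        raise ValueError(f"Unsupported step value: {resolved!r}")
    time_params = {name: float(value) for name, value in parts.groupdict().items() if value}
    return datetime.timedelta(**time_params)


config = {"slice_range": "1d"}
print(parse_step("{{ config['slice_range'] }}", config))  # templated step
print(parse_step("1d", config))                           # literal step
```

With this ordering, the templated and the literal step produce the same one-day timedelta, and an unsupported value yields a ValueError that names the offending string.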

Logs

{"type": "DEBUG", "message": "Debug logs enabled", "data": {}}
{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceSenseforce"}}
{"type": "DEBUG", "message": "parsed YAML into declarative source", "data": {"parsed_config": "{\"version\": \"0.1.0\", \"definitions\": {\"step\": \"{{ config['slice_range'] }}\", \"selector\": {\"extractor\": {\"field_pointer\": []}}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"retriever\": {\"record_selector\": {\"extractor\": {\"field_pointer\": []}}, \"paginator\": {\"type\": \"DefaultPaginator\", \"url_base\": \"{{ config['backend_url'] }}\", \"page_size_option\": {\"inject_into\": \"request_parameter\", \"field_name\": \"limit\"}, \"pagination_strategy\": {\"type\": \"OffsetIncrement\", \"page_size\": 10}, \"page_token_option\": {\"field_name\": \"offset\", \"inject_into\": \"request_parameter\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}}, \"base_stream\": {\"retriever\": {\"record_selector\": {\"extractor\": {\"field_pointer\": []}}, \"paginator\": {\"type\": \"DefaultPaginator\", \"url_base\": \"{{ config['backend_url'] }}\", \"page_size_option\": {\"inject_into\": \"request_parameter\", \"field_name\": \"limit\"}, \"pagination_strategy\": {\"type\": \"OffsetIncrement\", \"page_size\": 10}, \"page_token_option\": {\"field_name\": \"offset\", \"inject_into\": \"request_parameter\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}}}, \"dataset_stream\": {\"retriever\": {\"record_selector\": {\"extractor\": {\"field_pointer\": []}}, \"paginator\": {\"type\": \"DefaultPaginator\", \"url_base\": \"{{ config['backend_url'] }}\", \"page_size_option\": {\"inject_into\": \"request_parameter\", \"field_name\": \"limit\"}, \"pagination_strategy\": {\"type\": \"OffsetIncrement\", \"page_size\": 10}, \"page_token_option\": {\"field_name\": \"offset\", \"inject_into\": \"request_parameter\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}}, \"$options\": {\"name\": \"dataset\", \"primary_key\": [\"Timestamp\", \"Thing\", \"Id\"], \"path\": \"/api/dataset/execute/8f418098-ca28-4df5-9498-0df9fe78eda7\", \"stream_cursor_field\": \"timestamp\"}, \"class_name\": \"airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream\"}}, \"streams\": [{\"retriever\": {\"record_selector\": {\"extractor\": {\"field_pointer\": []}}, \"paginator\": {\"type\": \"DefaultPaginator\", \"url_base\": \"{{ config['backend_url'] }}\", \"page_size_option\": {\"inject_into\": \"request_parameter\", \"field_name\": \"limit\"}, \"pagination_strategy\": {\"type\": \"OffsetIncrement\", \"page_size\": 10}, \"page_token_option\": {\"field_name\": \"offset\", \"inject_into\": \"request_parameter\"}}, \"stream_slicer\": {\"type\": \"DatetimeStreamSlicer\", \"start_datetime\": {\"datetime\": \"{{ config['start_date'] }}\", \"datetime_format\": \"%Y-%m-%dT%H:%M:%SZ\"}, \"end_datetime\": {\"datetime\": \"{{ now_utc() }}\", \"datetime_format\": \"%Y-%m-%d %H:%M:%S.%f+00:00\"}, \"step\": \"{{ config['slice_range'] }}\", \"datetime_format\": \"%s\", \"cursor_field\": \"{{ options['stream_cursor_field'] }}\"}, \"requester\": {\"url_base\": \"{{ config['backend_url'] }}\", \"http_method\": \"POST\", \"request_options_provider\": {\"request_body_data\": \"[{\\\"clause\\\": {\\\"type\\\": \\\"timestamp\\\", \\\"operator\\\": 10, \\\"parameters\\\": \
    [{\\\"value\\\": {{ stream_slice['start_time'] | int * 1000 }} or 0 },\
     {\\\"value\\\": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }\
    ]\
    \
  }, \\\"columnName\\\": \\\"Timestamp\\\"}]/\
\", \"request_headers\": {\"Content-Type\": \"application/json\"}}, \"authenticator\": {\"type\": \"BearerAuthenticator\", \"api_token\": \"{{ config['access_token'] }}\"}}}, \"$options\": {\"name\": \"dataset\", \"primary_key\": [\"Timestamp\", \"Thing\", \"Id\"], \"path\": \"/api/dataset/execute/8f418098-ca28-4df5-9498-0df9fe78eda7\", \"stream_cursor_field\": \"timestamp\"}, \"class_name\": \"airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream\"}], \"check\": {\"stream_names\": [\"dataset\"]}}", "path_to_yaml_file": "senseforce.yaml", "source_name": "SourceSenseforce"}}
{"type": "LOG", "log": {"level": "FATAL", "message": "
Traceback (most recent call last):
  File \"/home/andreas/github/airbyte/airbyte-integrations/connectors/source-senseforce/main.py\", line 13, in <module>
    launch(source, sys.argv[1:])
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run
    for message in generator:
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 100, in read
    stream_instances = {s.name: s for s in self.streams(config)}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in streams
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in <listcomp>
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 209, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 200, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/create_partial.py\", line 56, in newfunc
    ret = func(*args, *fargs, **dynamic_args)
  File \"<string>\", line 16, in __init__
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 82, in __post_init__
    self._step = self._parse_timedelta(self.step)
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 195, in _parse_timedelta
    assert parts is not None
AssertionError"}}
{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1667298757214.9302, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "", "stack_trace": "Traceback (most recent call last):
  File \"/home/andreas/github/airbyte/airbyte-integrations/connectors/source-senseforce/main.py\", line 13, in <module>
    launch(source, sys.argv[1:])
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run
    for message in generator:
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/abstract_source.py\", line 100, in read
    stream_instances = {s.name: s for s in self.streams(config)}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in streams
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/yaml_declarative_source.py\", line 66, in <listcomp>
    source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 209, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 129, in create_component
    return self.build(
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in build
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 146, in <dictcomp>
    updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/parsers/factory.py\", line 200, in _create_subcomponent
    return self.create_component(definition, config, instantiate)()
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/create_partial.py\", line 56, in newfunc
    ret = func(*args, *fargs, **dynamic_args)
  File \"<string>\", line 16, in __init__
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 82, in __post_init__
    self._step = self._parse_timedelta(self.step)
  File \"/home/andreas/miniconda3/envs/airbyte_general/lib/python3.10/site-packages/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py\", line 195, in _parse_timedelta
    assert parts is not None
AssertionError
", "failure_type": "system_error"}}}

Steps to Reproduce

  1. Create a source.yaml and configure it with a DatetimeStreamSlicer (see the example above)
  2. Set the "step" field of the stream slicer to a config reference such as {{ config['your-config-for-slice-window'] }}
  3. Run python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --debug

Note: If I use the exact same example but set step: "1d" instead of referencing the config, everything works as expected.

Example of source.yaml to reproduce

(For an example config.json, see above.)

version: "0.1.0"

definitions:
  step: "{{ config['slice_range'] }}"
  selector:
    extractor:
      field_pointer: []
  requester:
    # url_base: "http://localhost:8080"
    url_base: "{{ config['backend_url'] }}"
    http_method: "POST"
    request_options_provider:
      request_body_data: |
          [{"clause": {"type": "timestamp", "operator": 10, "parameters": 
              [{"value": {{ stream_slice['start_time'] | int * 1000 }} or 0 },
               {"value": {{ stream_slice['end_time'] | int * 1000 + (86400000 - 1) or utc_now() }} }
              ]
              
            }, "columnName": "Timestamp"}]/
      request_headers:
        Content-Type: application/json
    authenticator:
      type: BearerAuthenticator
      api_token: "{{ config['access_token'] }}"
  stream_slicer:
    type: "DatetimeStreamSlicer"
    start_datetime:
      datetime: "{{ config['start_date'] }}"
      datetime_format: "%Y-%m-%dT%H:%M:%SZ"
    end_datetime:
      datetime: "{{ now_utc() }}"
      datetime_format: "%Y-%m-%d %H:%M:%S.%f+00:00"
    step: "{{ config['slice_range'] }}"
    datetime_format: "%s"
    cursor_field: "{{ options['stream_cursor_field'] }}"
  retriever:
    record_selector:
      $ref: "*ref(definitions.selector)"
    paginator:
      type: DefaultPaginator
      url_base: "*ref(definitions.requester.url_base)"
      page_size_option:
        inject_into: "request_parameter"
        field_name: "limit"
      pagination_strategy:
        type: "OffsetIncrement"
        page_size: 10
      page_token_option:
        field_name: "offset"
        inject_into: "request_parameter"
    stream_slicer:
      $ref: "*ref(definitions.stream_slicer)"
    requester:
      $ref: "*ref(definitions.requester)"
  base_stream:
    retriever:
      $ref: "*ref(definitions.retriever)"
  dataset_stream:
    $ref: "*ref(definitions.base_stream)"
    $options:
      name: "dataset"
      primary_key:
        - "Timestamp"
        - "Thing"
        - "Id"
      path: "/api/dataset/execute/{{ config['dataset_id']}}"
      stream_cursor_field: "timestamp"

streams:
  - "*ref(definitions.dataset_stream)"

check:
  stream_names:
    - "dataset"

maxi297 commented Jan 31, 2023

Implemented in #21930

@maxi297 maxi297 closed this as completed Jan 31, 2023