Skip to content

Latest commit

 

History

History
197 lines (172 loc) · 5.79 KB

7-reading-from-a-subresource.md

File metadata and controls

197 lines (172 loc) · 5.79 KB

Reading from a subresource

In this section, we'll implement the survey responses stream. Its structure is a little different because it depends on the surveys stream.

Start by creating a new base class for substreams:

class SurveyMonkeySubstream(HttpStream, ABC):
    """Base class for streams that depend on a parent stream.

    Each record of ``parent_stream`` becomes one stream slice (e.g. one
    survey -> its responses), and the slice is interpolated into the path
    template. Incremental reads are not supported by this base class.
    """

    url_base = "https://api.surveymonkey.com"

    def __init__(self, name: str, path: str, primary_key: Union[str, List[str]], parent_stream: Stream, **kwargs: Any) -> None:
        self._name = name
        self._path = path
        self._primary_key = primary_key
        self._parent_stream = parent_stream
        super().__init__(**kwargs)

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return a token holding the next page URL, or None when done.

        Returning None (rather than an empty dict) is the documented way to
        tell the CDK that pagination is complete.
        """
        links = response.json().get("links", {})
        if "next" in links:
            return {"next_url": links["next"]}
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Build query params; when paginating, reuse the params encoded in the next-page URL."""
        if next_page_token:
            # Local import: parse_qsl is not in the file's top-level imports.
            from urllib.parse import parse_qsl

            # Parse the query string into a dict so the return value matches
            # the declared MutableMapping type instead of being a raw string.
            return dict(parse_qsl(urlparse(next_page_token["next_url"]).query))
        return {"per_page": _PAGE_SIZE}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the records found under the top-level "data" key."""
        yield from response.json().get("data", [])

    @property
    def name(self) -> str:
        """Stream name as supplied at construction time."""
        return self._name

    def path(
        self,
        *,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> str:
        """Interpolate the current slice into the path template.

        Templates look like ``/v3/surveys/{stream_slice[id]}/responses/``; a
        missing key raises KeyError, surfacing template/slice mismatches
        immediately instead of being caught and blindly re-raised.
        """
        return self._path.format(stream_slice=stream_slice)

    @property
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """Primary key as supplied at construction time."""
        return self._primary_key

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one slice per parent record (always read full-refresh)."""
        for _slice in self._parent_stream.stream_slices():
            for parent_record in self._parent_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=_slice):
                yield parent_record

This class is similar to the base class, but it does not support incremental reads, and its stream slices are generated by reading records from a parent stream. This is how we'll ensure we always read all survey responses.

Note that using this approach, the connector will checkpoint after reading responses for each survey.

Don't forget to update the streams method to also instantiate the survey responses stream:

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build and return every stream exposed by this source."""
        authenticator = TokenAuthenticator(token=config["access_token"])
        surveys_stream = SurveyMonkeyBaseStream(
            name="surveys",
            path="/v3/surveys",
            primary_key="id",
            data_field="data",
            cursor_field="date_modified",
            authenticator=authenticator,
        )
        # The responses stream slices over the surveys stream's records.
        responses_stream = SurveyMonkeySubstream(
            name="survey_responses",
            path="/v3/surveys/{stream_slice[id]}/responses/",
            primary_key="id",
            authenticator=authenticator,
            parent_stream=surveys_stream,
        )
        return [surveys_stream, responses_stream]

Before moving on, we'll enable request caching on the surveys stream to avoid fetching the records both for the surveys stream and for the survey responses stream. You can do this by setting the use_cache property to True on the SurveyMonkeyBaseStream class.

    @property
    def use_cache(self) -> bool:
        """Cache HTTP responses on disk so parent records are fetched only once per sync."""
        return True

Now add the stream to the configured catalog:

{
  "streams": [
    {
      "stream": {
        "name": "surveys",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh", "incremental"]
      },
      "sync_mode": "incremental",
      "destination_sync_mode": "overwrite"
    },
    {
      "stream": {
        "name": "survey_responses",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh"]
      },
      "sync_mode": "full_refresh",
      "destination_sync_mode": "overwrite"
    }
  ]
}

and create a new schema file in source_survey_monkey_demo/schemas/survey_responses.json. You can use the connector builder to generate the schema, or paste the one provided below:

{
  "$schema": "http://json-schema.org/schema#",
  "properties": {
    "analyze_url": {
      "type": ["string", "null"]
    },
    "collect_stats": {
      "properties": {
        "status": {
          "properties": {
            "open": {
              "type": ["number", "null"]
            }
          },
          "type": ["object", "null"]
        },
        "total_count": {
          "type": ["number", "null"]
        },
        "type": {
          "properties": {
            "weblink": {
              "type": ["number", "null"]
            }
          },
          "type": ["object", "null"]
        }
      },
      "type": ["object", "null"]
    },
    "date_created": {
      "type": ["string", "null"]
    },
    "date_modified": {
      "type": ["string", "null"]
    },
    "href": {
      "type": ["string", "null"]
    },
    "id": {
      "type": ["string", "null"]
    },
    "language": {
      "type": ["string", "null"]
    },
    "nickname": {
      "type": ["string", "null"]
    },
    "preview": {
      "type": ["string", "null"]
    },
    "question_count": {
      "type": ["number", "null"]
    },
    "response_count": {
      "type": ["number", "null"]
    },
    "title": {
      "type": ["string", "null"]
    }
  },
  "type": "object"
}

You should now be able to read your survey responses:

poetry run source-survey-monkey-demo read --config secrets/config.json --catalog integration_tests/configured_catalog.json

In the next section we'll update the connector so it reads stream slices concurrently.