Skip to content

Commit

Permalink
[docs] update CDK Tutorial: Python HTTP (#22069)
Browse files Browse the repository at this point in the history
* [docs] update CDK Tutorial: Python HTTP

* Update airbyte-cdk/python/docs/tutorials/cdk-tutorial-python-http/6-read-data.md

Co-authored-by: Sergio Ropero <42538006+sergio-ropero@users.noreply.github.com>

* Code review

---------

Co-authored-by: Sergio Ropero <42538006+sergio-ropero@users.noreply.github.com>
  • Loading branch information
maxi297 and sergio-ropero committed Jan 30, 2023
1 parent c2dcb0e commit 57dffd6
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This is a step-by-step guide for how to create an Airbyte source in Python to re

All the commands below assume that `python` points to a version of python &gt;=3.9.0. On some systems, `python` points to a Python2 installation and `python3` points to Python3. If this is the case on your machine, substitute all `python` commands in this guide with `python3`.

## Exchange Rates API Setup

For this guide we will be making API calls to the Exchange Rates API. In order to generate the API access key that will be used by the new connector, you will have to follow steps on the [Exchange Rates API](https://exchangeratesapi.io/) by signing up for the Free tier plan. Once you have an API access key, you can continue with the guide.

## Checklist

* Step 1: Create the source using the template
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Each connector declares the inputs it needs to read data from the underlying data source. This is the Airbyte Protocol's `spec` operation.

The simplest way to implement this is by creating a `.json` file in `source_<name>/spec.json` which describes your connector's inputs according to the [ConnectorSpecification](https://github.com/airbytehq/airbyte/blob/master/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml#L211) schema. This is a good place to start when developing your source. Using JsonSchema, define what the inputs are \(e.g. username and password\). Here's [an example](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/spec.json) of what the `spec.json` looks like for the Freshdesk API source.
The simplest way to implement this is by creating a `spec.json` file in `source_<name>/spec.json` which describes your connector's inputs according to the [ConnectorSpecification](https://github.com/airbytehq/airbyte/blob/master/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml#L211) schema. This is a good place to start when developing your source. Using JsonSchema, define what the inputs are \(e.g. username and password\). Here's [an example](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/spec.json) of what the `spec.json` looks like for the Freshdesk API source.

For more details on what the spec is, you can read about the Airbyte Protocol [here](https://docs.airbyte.io/understanding-airbyte/airbyte-protocol).

Expand All @@ -17,8 +17,13 @@ Given that we'll pulling currency data for our example source, we'll define the
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Python Http Tutorial Spec",
"type": "object",
"required": ["start_date", "currency_base"],
"required": ["apikey", "start_date", "base"],
"properties": {
"apikey": {
"type": "string",
"description": "API access key used to retrieve data from the Exchange Rates API.",
"airbyte_secret": true
}
"start_date": {
"type": "string",
"description": "Start getting data from that date.",
Expand All @@ -27,7 +32,7 @@ Given that we'll pulling currency data for our example source, we'll define the
},
"base": {
"type": "string",
"examples": ["USD", "EUR"]
"examples": ["USD", "EUR"],
"description": "ISO reference currency. See <a href=\"https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html\">here</a>."
}
}
Expand All @@ -40,8 +45,9 @@ Beside regular parameter there is intenal CDK config that started with '_' chara
* _page_size - for http based streams set number of records for each page. Depends on stream implementation.


In addition to metadata, we define two inputs:
In addition to metadata, we define three inputs:

* `apikey`: The API access key used to authenticate requests to the API
* `start_date`: The beginning date to start tracking currency exchange rates from
* `base`: The currency whose rates we're interested in tracking

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ The second operation in the Airbyte Protocol that we'll implement is the `check`

This operation verifies that the input configuration supplied by the user can be used to connect to the underlying data source. Note that this user-supplied configuration has the values described in the `spec.json` filled in. In other words if the `spec.json` said that the source requires a `username` and `password` the config object might be `{ "username": "airbyte", "password": "password123" }`. You should then implement something that returns a json object reporting, given the credentials in the config, whether we were able to connect to the source.

In order to make requests to the API, we need to specify the access.
In our case, this is a fairly trivial check since the API requires no credentials. Instead, let's verify that the user-input `base` currency is a legitimate currency. In `source.py` we'll find the following autogenerated source:

```python
Expand Down Expand Up @@ -37,11 +38,11 @@ Following the docstring instructions, we'll change the implementation to verify
return True, None
```

Let's test out this implementation by creating two objects: a valid and an invalid config and attempt to give them as input to the connector
Let's test out this implementation by creating two objects: a valid and an invalid config and attempt to give them as input to the connector. For this section, you will need to take the API access key generated earlier and add it to both configs. Because these configs contain secrets, we recommend storing configs which contain secrets in `secrets/config.json` because the `secrets` directory is gitignored by default. For the purpose of this example, a dummy apikey has been setup in `sample_files/config.json`.

```text
echo '{"start_date": "2021-04-01", "base": "USD"}' > sample_files/config.json
echo '{"start_date": "2021-04-01", "base": "BTC"}' > sample_files/invalid_config.json
echo '{"start_date": "2021-04-01", "base": "USD", "apikey": <your_apikey>}' > sample_files/config.json
echo '{"start_date": "2021-04-01", "base": "BTC", "apikey": <your_apikey>}' > sample_files/invalid_config.json
python main.py check --config sample_files/config.json
python main.py check --config sample_files/invalid_config.json
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ We'll begin by creating a stream to represent the data that we're pulling from t

```python
class ExchangeRates(HttpStream):
url_base = "https://api.exchangeratesapi.io/"
url_base = "https://api.apilayer.com/exchangerates_data/"

# Set this as a noop.
primary_key = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ Let's begin by pulling data for the last day's rates by using the `/latest` endp

```python
class ExchangeRates(HttpStream):
url_base = "https://api.exchangeratesapi.io/"
url_base = "https://api.apilayer.com/exchangerates_data/"

primary_key = None

def __init__(self, base: str, **kwargs):
def __init__(self, config: Mapping[str, Any], **kwargs):
super().__init__()
self.base = base
self.base = config['base']
self.apikey = config['apikey']


def path(
Expand All @@ -54,6 +55,12 @@ class ExchangeRates(HttpStream):
# The "/latest" path gives us the latest currency exchange rates
return "latest"

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
# The api requires that we include apikey as a header so we do that in this method
return {'apikey': self.apikey}

def request_params(
self,
stream_state: Mapping[str, Any],
Expand All @@ -80,14 +87,20 @@ class ExchangeRates(HttpStream):
return None
```

This may look big, but that's just because there are lots of \(unused, for now\) parameters in these methods \(those can be hidden with Python's `**kwargs`, but don't worry about it for now\). Really we just added a few lines of "significant" code: 1. Added a constructor `__init__` which stores the `base` currency to query for. 2. `return {'base': self.base}` to add the `?base=<base-value>` query parameter to the request based on the `base` input by the user. 3. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file. 4. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data.
This may look big, but that's just because there are lots of \(unused, for now\) parameters in these methods \(those can be hidden with Python's `**kwargs`, but don't worry about it for now\). Really we just added a few lines of "significant" code:

Let's also pass the `base` parameter input by the user to the stream class:
1. Added a constructor `__init__` which stores the `base` currency to query for and the `apikey` used for authentication.
2. `return {'base': self.base}` to add the `?base=<base-value>` query parameter to the request based on the `base` input by the user.
3. `return {'apikey': self.apikey}` to add the header `apikey=<apikey-string>` to the request based on the `apikey` input by the user.
4. `return [response.json()]` to parse the response from the API to match the schema of our schema `.json` file.
5. `return "latest"` to indicate that we want to hit the `/latest` endpoint of the API to get the latest exchange rate data.

Let's also pass the config specified by the user to the stream class:

```python
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = NoAuth()
return [ExchangeRates(authenticator=auth, base=config['base'])]
return [ExchangeRates(authenticator=auth, config=config)]
```

We're now ready to query the API!
Expand Down Expand Up @@ -127,20 +140,21 @@ Let's get the easy parts out of the way and pass the `start_date`:

```python
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = NoAuth()
# Parse the date from a string into a datetime object
start_date = datetime.strptime(config['start_date'], '%Y-%m-%d')
return [ExchangeRates(authenticator=auth, base=config['base'], start_date=start_date)]
auth = NoAuth()
# Parse the date from a string into a datetime object
start_date = datetime.strptime(config['start_date'], '%Y-%m-%d')
return [ExchangeRates(authenticator=auth, config=config, start_date=start_date)]
```

Let's also add this parameter to the constructor and declare the `cursor_field`:

```python
from datetime import datetime, timedelta
from airbyte_cdk.sources.streams import IncrementalMixin


class ExchangeRates(HttpStream, IncrementalMixin):
url_base = "https://api.exchangeratesapi.io/"
url_base = "https://api.apilayer.com/exchangerates_data/"
cursor_field = "date"
primary_key = "date"

Expand Down Expand Up @@ -176,7 +190,7 @@ Update internal state `cursor_value` inside `read_records` method
def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(*args, **kwargs):
if self._cursor_value:
latest_record_date = datetime.strptime(latest_record[self.cursor_field], '%Y-%m-%d')
latest_record_date = datetime.strptime(record[self.cursor_field], '%Y-%m-%d')
self._cursor_value = max(self._cursor_value, latest_record_date)
yield record

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


class ExchangeRates(HttpStream):
url_base = "http://api.exchangeratesapi.io/"
url_base = "https://api.apilayer.com/exchangerates_data/"
cursor_field = "date"
primary_key = "date"

Expand All @@ -34,14 +34,20 @@ def path(
) -> str:
return stream_slice["date"]

def request_headers(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
# The api requires that we include apikey as a header so we do that in this method
return {'apikey': self.apikey}

def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
# The api requires that we include access_key as a query param so we do that in this method
return {"access_key": self.access_key}
# The api requires that we include the base currency as a query param so we do that in this method
return {'base': self.base}

def parse_response(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/sources/exchangeratesapi",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/exchangeratesapi",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Python Http Tutorial Spec",
"type": "object",
"required": ["start_date", "base"],
"additionalProperties": false,
"required": ["apikey", "start_date", "base"],
"properties": {
"access_key": {
"title": "Access Key",
"apikey": {
"type": "string",
"description": "API access key used to retrieve data from the Exchange Rates API."
},
"description": "API access key used to retrieve data from the Exchange Rates API.",
"airbyte_secret": true
}
"start_date": {
"title": "Start Date",
"type": "string",
"description": "UTC date and time in the format 2017-01-25. Any data before this date will not be replicated.",
"description": "Start getting data from that date.",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"examples": ["YYYY-MM-DD"]
"examples": ["%Y-%m-%d"]
},
"base": {
"title": "Currency",
"type": "string",
"examples": ["USD", "EUR"],
"description": "ISO reference currency. See <a href=\"https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html\">here</a>."
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ The second operation in the Airbyte Protocol that we'll implement is the `check`

This operation verifies that the input configuration supplied by the user can be used to connect to the underlying data source. Note that this user-supplied configuration has the values described in the `spec.yaml` filled in. In other words if the `spec.yaml` said that the source requires a `username` and `password` the config object might be `{ "username": "airbyte", "password": "password123" }`. You should then implement something that returns a json object reporting, given the credentials in the config, whether we were able to connect to the source.

In order to make requests to the API, we need to specify the access.
In order to make requests to the API, we need to specify the access.
In our case, this is a fairly trivial check since the API requires no credentials. Instead, let's verify that the user-input `base` currency is a legitimate currency. In `source.py` we'll find the following autogenerated source:

```python
Expand Down Expand Up @@ -44,8 +44,8 @@ Let's test out this implementation by creating two objects: a valid and an inval

```text
mkdir sample_files
echo '{"start_date": "2022-04-01", "base": "USD", "access_key": <your_access_key>}' > secrets/config.json
echo '{"start_date": "2022-04-01", "base": "BTC", "access_key": <your_access_key>}' > secrets/invalid_config.json
echo '{"start_date": "2022-04-01", "base": "USD", "apikey": <your_apikey>}' > secrets/config.json
echo '{"start_date": "2022-04-01", "base": "BTC", "apikey": <your_apikey>}' > secrets/invalid_config.json
python main.py check --config secrets/config.json
python main.py check --config secrets/invalid_config.json
```
Expand All @@ -59,3 +59,5 @@ You should see output like the following:
> python main.py check --config secrets/invalid_config.json
{"type": "CONNECTION_STATUS", "connectionStatus": {"status": "FAILED", "message": "Input currency BTC is invalid. Please input one of the following currencies: {'DKK', 'USD', 'CZK', 'BGN', 'JPY'}"}}
```

While developing, we recommend storing configs which contain secrets in `secrets/config.json` because the `secrets` directory is gitignored by default.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ We'll begin by creating a stream to represent the data that we're pulling from t

```python
class ExchangeRates(HttpStream):
url_base = "http://api.exchangeratesapi.io/"
url_base = "https://api.apilayer.com/exchangerates_data/"

# Set this as a noop.
primary_key = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ connectionSpecification:
title: Python Http Tutorial Spec
type: object
required:
- access_key
- apikey
- start_date
- base
properties:
access_key:
apikey:
type: string
description: API access key used to retrieve data from the Exchange Rates API.
airbyte_secret: true
Expand All @@ -39,9 +39,9 @@ connectionSpecification:
description: "ISO reference currency. See <a href=\"https://www.ecb.europa.eu/stats/policy_and_exchange_rates/euro_reference_exchange_rates/html/index.en.html\">here</a>."
```

In addition to metadata, we define two inputs:
In addition to metadata, we define three inputs:

* `access_key`: The API access key used to authenticate requests to the API
* `apikey`: The API access key used to authenticate requests to the API
* `start_date`: The beginning date to start tracking currency exchange rates from
* `base`: The currency whose rates we're interested in tracking

Loading

0 comments on commit 57dffd6

Please sign in to comment.