Add improvements to "Building a connector the hard way" (#19093)
* Add improvements to "Building a connector the hard way"

* add log_error() formatting to pass SAT tests

* Update to new version of acceptance-test-config.yml

* Edit tutorial directory to match tutorial

* Change permissions on acceptance-test-docker.sh

This reverts commit 40b2d98.
erohmensing authored and akashkulk committed Nov 17, 2022
1 parent 2928869 commit 4eee33a
Showing 3 changed files with 82 additions and 36 deletions.
Empty file.
@@ -33,7 +33,7 @@
 def read(config, catalog):
     # Assert required configuration was provided
     if "api_key" not in config or "stock_ticker" not in config:
-        log("Input config must contain the properties 'api_key' and 'stock_ticker'")
+        log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
         sys.exit(1)

     # Find the stock_prices stream if it is present in the input catalog
@@ -43,22 +43,22 @@ def read(config, catalog):
             stock_prices_stream = configured_stream

     if stock_prices_stream is None:
-        log("No streams selected")
+        log_error("No streams selected")
         return

     # We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
     if stock_prices_stream["sync_mode"] != "full_refresh":
-        log("This connector only supports full refresh syncs! (for now)")
+        log_error("This connector only supports full refresh syncs! (for now)")
         sys.exit(1)

     # If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
     response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
     if response.status_code != 200:
         # In a real scenario we'd handle this error better :)
-        log("Failure occurred when calling Polygon.io API")
+        log_error("Failure occurred when calling Polygon.io API")
         sys.exit(1)
     else:
-        # Stock prices are returned sorted by by date in ascending order
+        # Stock prices are returned sorted by date in ascending order
         # We want to output them one by one as AirbyteMessages
         results = response.json()["results"]
         for result in results:
@@ -83,7 +83,7 @@ def _call_api(ticker, token):
 def check(config):
     # Assert required configuration was provided
     if "api_key" not in config or "stock_ticker" not in config:
-        log("Input config must contain the properties 'api_key' and 'stock_ticker'")
+        log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
         sys.exit(1)
     else:
         # Validate input configuration by attempting to get the daily closing prices of the input stock ticker
@@ -107,6 +107,12 @@ def log(message):
     print(json.dumps(log_json))


+def log_error(error_message):
+    current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
+    log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
+    print(json.dumps(log_json))
+
+
 def discover():
     catalog = {
         "streams": [{
100 changes: 70 additions & 30 deletions docs/connector-development/tutorials/build-a-connector-the-hard-way.md
@@ -156,6 +156,7 @@ README.md
 acceptance-test-config.yml
 acceptance-test-docker.sh
 build.gradle
+source.py
 spec.json
 ```

@@ -261,6 +262,9 @@ Then we'll add the `check_method`:

 ```python
 import requests
+import datetime
+from datetime import date
+from datetime import timedelta

 def _call_api(ticker, token):
     today = date.today()
@@ -314,6 +318,12 @@ elif command == "check":
     check(config)
 ```
+
+Then we need to update our list of available commands:
+
+```python
+log("Invalid command. Allowable commands: [spec, check]")
+```
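
One way to avoid editing this message by hand every time a command is added is to build it from a dispatch table. The sketch below is illustrative only: `COMMANDS`, `invalid_command_message`, and the placeholder handlers are hypothetical names and are not part of the tutorial's `source.py`.

```python
# Hypothetical sketch: derive the "allowable commands" text from a dispatch
# table so the message cannot drift out of sync with the supported commands.


def spec():
    """Placeholder standing in for the tutorial's spec handler."""
    return "spec"


def check():
    """Placeholder standing in for the tutorial's check handler."""
    return "check"


# dicts preserve insertion order, so commands are listed in the order added here.
COMMANDS = {"spec": spec, "check": check}


def invalid_command_message(commands=COMMANDS):
    # Joining a dict iterates over its keys, i.e. the command names.
    return "Invalid command. Allowable commands: [{}]".format(", ".join(commands))


print(invalid_command_message())
```

With this shape, supporting `discover` or `read` later would only require adding an entry to the table.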

 This results in the following `run` method.

 ```python
@@ -349,7 +359,9 @@ def run(args):
     sys.exit(0)
 ```

-and that should be it. Let's test our new method:
+and that should be it.
+
+Let's test our new method:

 ```bash
 $ python source.py check --config secrets/valid_config.json
@@ -416,6 +428,12 @@ elif command == "discover":
     discover()
 ```

+We need to update our list of available commands:
+
+```python
+log("Invalid command. Allowable commands: [spec, check, discover]")
+```
+
 You may be wondering why `config` is a required input to `discover` if it's not used. This is done for consistency: the Airbyte Specification requires `--config` as an input to `discover` because many sources require it \(e.g: to discover the tables available in a Postgres database, you must supply a password\). So instead of guessing whether the flag is required depending on the connector, we always assume it is required, and the connector can choose whether to use it.
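
As a rough illustration of "always required, optionally used", the sketch below declares `--config` as mandatory for both `check` and `discover` with `argparse`. It is a minimal standalone example under assumptions: `build_parser` is a hypothetical helper, and the tutorial's own argument parser may be organized differently.

```python
import argparse


def build_parser():
    # Hypothetical helper: a cut-down CLI with three subcommands.
    parser = argparse.ArgumentParser(description="Connector CLI sketch")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # spec takes no inputs
    subparsers.add_parser("spec")

    # check and discover both require --config, even if discover ignores its contents
    for name in ("check", "discover"):
        sub = subparsers.add_parser(name)
        sub.add_argument("--config", type=str, required=True,
                         help="path to the json configuration file")
    return parser


args = build_parser().parse_args(["discover", "--config", "secrets/valid_config.json"])
print(args.command, args.config)
```

Calling the same parser with `["discover"]` alone makes `argparse` exit with a usage error, which is the consistent behavior described above.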

 The full run method is now below:
@@ -526,14 +544,16 @@ First, let's create a configured catalog `fullrefresh_configured_catalog.json` t
 Then we'll define the `read` method in `source.py`:

 ```python
-import datetime
-from datetime import date
-from datetime import timedelta
+def log_error(error_message):
+    current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
+    log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
+    print(json.dumps(log_json))
+
+

 def read(config, catalog):
     # Assert required configuration was provided
     if "api_key" not in config or "stock_ticker" not in config:
-        log("Input config must contain the properties 'api_key' and 'stock_ticker'")
+        log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
         sys.exit(1)

     # Find the stock_prices stream if it is present in the input catalog
@@ -543,19 +563,19 @@ def read(config, catalog):
             stock_prices_stream = configured_stream

     if stock_prices_stream is None:
-        log("No streams selected")
+        log_error("No stream selected.")
         return

     # We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
     if stock_prices_stream["sync_mode"] != "full_refresh":
-        log("This connector only supports full refresh syncs! (for now)")
+        log_error("This connector only supports full refresh syncs! (for now)")
         sys.exit(1)

     # If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
     response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
     if response.status_code != 200:
         # In a real scenario we'd handle this error better :)
-        log("Failure occurred when calling Polygon.io API")
+        log_error("Failure occurred when calling Polygon.io API")
         sys.exit(1)
     else:
         # Stock prices are returned sorted by date in ascending order
@@ -568,6 +588,8 @@ def read(config, catalog):
             print(json.dumps(output_message))
 ```
+
+Note we've added a `log_error()` function to simplify formatting error messages from within connector functions as [`AirbyteTraceMessage`](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytetracemessage)s, specifically `AirbyteErrorTraceMessage`s.
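
To see the shape of the message this helper emits, the sketch below copies the `log_error()` body from above, captures its standard output, and parses it back into a dictionary. The capture with `redirect_stdout` is only for illustration and is not something the connector itself does. Because `int()` is applied before multiplying by 1000, `emitted_at` is always a whole number of seconds expressed in milliseconds.

```python
import datetime
import io
import json
from contextlib import redirect_stdout


def log_error(error_message):
    # Same body as the helper added in this commit
    current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
    log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
    print(json.dumps(log_json))


# Capture what would normally be written to stdout (illustration only)
buffer = io.StringIO()
with redirect_stdout(buffer):
    log_error("No streams selected")

message = json.loads(buffer.getvalue())
print(message["type"], message["trace"]["type"], message["trace"]["error"]["message"])
```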

 After doing some input validation, the code above calls the API to obtain daily prices for the input stock ticker, then outputs the prices. As always, our output is formatted according to the Airbyte Specification. Let's update our args parser with the following blocks:

 ```python
@@ -590,6 +612,12 @@ elif command == "read":
     read(config, configured_catalog)
 ```

+and:
+
+```python
+log("Invalid command. Allowable commands: [spec, check, discover, read]")
+```
+
 this yields the following `run` method:

 ```python
@@ -696,7 +724,7 @@ from datetime import timedelta
 def read(config, catalog):
     # Assert required configuration was provided
     if "api_key" not in config or "stock_ticker" not in config:
-        log("Input config must contain the properties 'api_key' and 'stock_ticker'")
+        log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
         sys.exit(1)

     # Find the stock_prices stream if it is present in the input catalog
@@ -706,19 +734,19 @@ def read(config, catalog):
             stock_prices_stream = configured_stream

     if stock_prices_stream is None:
-        log("No streams selected")
+        log_error("No streams selected")
         return

     # We only support full_refresh at the moment, so verify the user didn't ask for another sync mode
     if stock_prices_stream["sync_mode"] != "full_refresh":
-        log("This connector only supports full refresh syncs! (for now)")
+        log_error("This connector only supports full refresh syncs! (for now)")
         sys.exit(1)

     # If we've made it this far, all the configuration is good and we can pull the last 7 days of market data
     response = _call_api(ticker=config["stock_ticker"], token = config["api_key"])
     if response.status_code != 200:
         # In a real scenario we'd handle this error better :)
-        log("Failure occurred when calling Polygon.io API")
+        log_error("Failure occurred when calling Polygon.io API")
         sys.exit(1)
     else:
         # Stock prices are returned sorted by date in ascending order
@@ -746,7 +774,7 @@ def _call_api(ticker, token):
 def check(config):
     # Assert required configuration was provided
     if "api_key" not in config or "stock_ticker" not in config:
-        log("Input config must contain the properties 'api_key' and 'stock_ticker'")
+        log_error("Input config must contain the properties 'api_key' and 'stock_ticker'")
         sys.exit(1)
     else:
         # Validate input configuration by attempting to get the daily closing prices of the input stock ticker
@@ -770,6 +798,12 @@ def log(message):
     print(json.dumps(log_json))


+def log_error(error_message):
+    current_time_in_ms = int(datetime.datetime.now().timestamp()) * 1000
+    log_json = {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": current_time_in_ms, "error": {"message": error_message}}}
+    print(json.dumps(log_json))
+
+
 def discover():
     catalog = {
         "streams": [{
@@ -915,7 +949,7 @@ Then we can run the image using:
 docker run airbyte/source-stock-ticker-api:dev
 ```

-to run any of our commands, we'll need to mount all the inputs into the Docker container first, then refer to their _mounted_ paths when invoking the connector. For example, we'd run `check` or `read` as follows:
+To run any of our commands, we'll need to mount all the inputs into the Docker container first, then refer to their _mounted_ paths when invoking the connector. This allows the connector to access your secrets without having to build them into the container. For example, we'd run `check` or `read` as follows:

 ```bash
 $ docker run airbyte/source-stock-ticker-api:dev spec
@@ -948,25 +982,31 @@ The code generator should have already generated a YAML file which configures th
 # See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference)
 # for more information about how to configure these tests
 connector_image: airbyte/source-stock-ticker-api:dev
-tests:
-  spec:
-    - spec_path: "spec.json"
-      config_path: "secrets/valid_config.json"
-  connection:
-    - config_path: "secrets/valid_config.json"
-      status: "succeed"
-    - config_path: "secrets/invalid_config.json"
-      status: "failed"
-  discovery:
-    - config_path: "secrets/valid_config.json"
+acceptance_tests:
   basic_read:
-    - config_path: "secrets/valid_config.json"
-      configured_catalog_path: "fullrefresh_configured_catalog.json"
+    tests:
+    - config_path: secrets/valid_config.json
+      configured_catalog_path: fullrefresh_configured_catalog.json
       empty_streams: []
+  connection:
+    tests:
+    - config_path: secrets/valid_config.json
+      status: succeed
+    - config_path: secrets/invalid_config.json
+      status: failed
+  discovery:
+    tests:
+    - config_path: secrets/valid_config.json
   full_refresh:
-    - config_path: "secrets/valid_config.json"
-      configured_catalog_path: "fullrefresh_configured_catalog.json"
+    tests:
+    - config_path: secrets/valid_config.json
+      configured_catalog_path: fullrefresh_configured_catalog.json
+  spec:
+    tests:
+    - config_path: secrets/valid_config.json
+      spec_path: spec.json
 # incremental: # TODO uncomment this once you implement incremental sync in part 2 of the tutorial
+# tests:
 # - config_path: "secrets/config.json"
 # configured_catalog_path: "integration_tests/configured_catalog.json"
 # future_state_path: "integration_tests/abnormal_state.json"
@@ -1058,7 +1098,7 @@ airbyte-server | Version: dev
 airbyte-server |
 ```

-After you see the above banner printed out in the terminal window where you are running `docker-compose up`, visit [http://localhost:8000](http://localhost:8000) in your browser.
+After you see the above banner printed out in the terminal window where you are running `docker-compose up`, visit [http://localhost:8000](http://localhost:8000) in your browser and log in with the default credentials: username `airbyte` and password `password`.

 If this is the first time using the Airbyte UI, then you will be prompted to go through a first-time wizard. To skip it, click the "Skip Onboarding" button.
