Normalization not working on "HTTP Request" connector #2459

Closed
meenagangadharan opened this issue Mar 15, 2021 · 27 comments
Labels
type/bug Something isn't working

Comments

@meenagangadharan

Expected Behavior

I am loading data from a private REST API to BigQuery. Normalization is enabled. Using the latest version, 0.17.1-alpha.
The job completes successfully. The JSON data extracted from the API is expected to be split and loaded as multiple records in the BigQuery table.

Current Behavior

The output JSON is loaded as a single record in the data field of the BigQuery table.

Logs

catalog.json shows the below content -

docker run -it --rm --volume airbyte_workspace:/data busybox cat /data/7/0/destination_catalog.json
{"streams":[{"stream":{"name":"/usersdata","json_schema":{"type":"object","$schema":"http://json-schema.org/draft-07/schema#","properties":{"data":{"type":"object"}},"additionalProperties":true},"supported_sync_modes":["full_refresh"],"default_cursor_field":[]},"sync_mode":"full_refresh","cursor_field":[]}]}

Steps to Reproduce

  1. Using a private API with a Bearer token. Unfortunately, I cannot share the API. The JSON format looks like this:
    {
      "data": [
        {
          "id": 4971,
          "email": "meena@gmail.com",
          "first_name": "Meena"
        }
      ],
      "links": {
        "first": "#####################",
        "last": "#####################",
        "prev": null,
        "next": "#####################"
      },
      "meta": {
        "current_page": 1,
        "from": 1,
        "last_page": 126,
        "links": [
          {
            "url": null,
            "label": "« Previous",
            "active": false
          },
          {
            "url": "#####################",
            "label": "1",
            "active": true
          },
          {
            "url": "https:#####################",
            "label": "2",
            "active": false
          }
        ],
        "path": "https:#####################",
        "per_page": 50,
        "to": 50,
        "total": 6270
      }
    }

Severity of the bug for you

High

Airbyte Version

Found in the .env file in the root of the project- 0.17.1-alpha

Connector Version (if applicable)

HTTP Request - 0.2.1

Additional context

Airbyte is installed on GCP

@meenagangadharan meenagangadharan added the type/bug Something isn't working label Mar 15, 2021
@cgardens
Contributor

cgardens commented Mar 15, 2021

Can you share what error you're getting? You should be able to see it in the logs in the UI.

@ChristopheDuong for visibility.

@ChristopheDuong
Contributor

ChristopheDuong commented Mar 15, 2021

The "problem" being reported here is that the catalog being sent by the source is too simple (just object)

Screenshot 2021-03-15 at 21 58 27

So normalization is not able to unnest anything even though the data from the HTTP source contains arrays of rows/nested structures:

The JSON data extracted from the API is expected to be split and loaded into multiple records of the Bigquery table
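
To make the point about the catalog concrete, here is a minimal sketch that lists the field paths a stream's JSON schema declares. The helper name `declared_paths` is hypothetical and not part of Airbyte. The first schema is copied from the destination_catalog.json in the report; the second is an assumed, more detailed schema for the same payload, with types guessed from the sample JSON.

```python
from typing import Any, Dict, List


def declared_paths(schema: Dict[str, Any], prefix: str = "") -> List[str]:
    """Return dotted paths of every property a JSON schema declares.

    Hypothetical helper for illustration; array items are marked with "[]".
    """
    paths: List[str] = []
    for name, sub in schema.get("properties", {}).items():
        path = f"{prefix}{name}"
        paths.append(path)
        if sub.get("type") == "array" and isinstance(sub.get("items"), dict):
            paths.extend(declared_paths(sub["items"], prefix=f"{path}[]."))
        else:
            paths.extend(declared_paths(sub, prefix=f"{path}."))
    return paths


# Schema taken from the destination_catalog.json in the report.
reported_schema = {
    "type": "object",
    "properties": {"data": {"type": "object"}},
    "additionalProperties": True,
}

# Assumed richer schema: "data" declared as an array of user objects.
detailed_schema = {
    "type": "object",
    "properties": {
        "data": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "id": {"type": "integer"},
                    "email": {"type": "string"},
                    "first_name": {"type": "string"},
                },
            },
        }
    },
}

print(declared_paths(reported_schema))
print(declared_paths(detailed_schema))
```

With the reported schema only the top-level `data` field is declared, so there is nothing nested for a schema-driven step to expand.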

@meenagangadharan
Author

The job completes successfully. Only the normalization is not happening as expected. Multiple JSON records get loaded as one record.
logs-29-0.txt

@sherifnada
Contributor

sherifnada commented Mar 16, 2021

@meenagangadharan this is an implementation decision we made in the connector, since it is not possible to reliably know from the URL alone what the output schema should be. It seems like some reasonable paths forward to address your request are:

  1. The connector should "guess" what the schema is based on the first few results returned. This is unreliable as there may be missing fields in the probing request.
  2. The user can optionally input a JSON schema declaring what the data types should be.

Would having a solution like 2 fix your problem?
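
As a rough illustration of option 1 and why it is unreliable, the sketch below guesses a flat schema from a handful of sample records. `infer_schema` is a hypothetical helper, not connector code, and the second sample record is made up for the example.

```python
from typing import Any, Dict, Iterable

# Map Python types produced by json.loads to JSON schema type names.
_JSON_TYPES = {
    bool: "boolean",
    int: "integer",
    float: "number",
    str: "string",
    list: "array",
    dict: "object",
    type(None): "null",
}


def infer_schema(records: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Guess a flat JSON schema from sample records (illustrative only)."""
    properties: Dict[str, Dict[str, Any]] = {}
    for record in records:
        for key, value in record.items():
            json_type = _JSON_TYPES.get(type(value), "string")
            seen = properties.setdefault(key, {"type": []})["type"]
            if json_type not in seen:
                seen.append(json_type)
    return {"type": "object", "properties": properties}


# First record is from the issue; the second is invented for this example.
sample = [
    {"id": 4971, "email": "meena@gmail.com", "first_name": "Meena"},
    {"id": 4972, "email": None},
]
schema = infer_schema(sample)
print(schema["properties"])

# A field that only shows up in later pages never makes it into the guess.
print("last_name" in schema["properties"])
```

Any field absent from the probed records is simply missing from the result, which is the weakness described in option 1.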

@bashyroger

Hi, I am the data architect / PO managing @meenagangadharan.
What I want to make extra clear is that the important issue for us isn't the schema, as in the columns and their data types. We can solve that problem by creating a view on top of the JSON objects.

The main problem is the fact that individual rows are not being created when importing what is returned.
@meenagangadharan, from what I have seen, APIs whose data creates individual rows on import return this structure:

[{
	"key1": "value_a",
	"key2": "value_b",
	"key3": "value_d"
}, {
	"key1": "value_x",
	"key2": "value_y",
	"key3": "value_z"
}]

From what I can see, our API does not return data this way?

@meenagangadharan
Author

@bashyroger The data from the API has this structure:
{
  "data": [
    {
      "key1": value1,
      "key2": "value2",
      ......
    },
    {
      "key1": value1,
      "key2": "value2",
      ......
    }
  ],
  "links": {
    "first": "#####################",
    "last": "#####################",
    "prev": null,
    "next": "#####################"
  },
  "meta": {
    "current_page": 1,
    "from": 1,
    "last_page": 126,
    "links": [
      {
        "url": null,
        "label": "« Previous",
        "active": false
      },
      {
        "url": "#####################",
        "label": "1",
        "active": true
      },
      {
        "url": "https:#####################",
        "label": "2",
        "active": false
      }
    ],
    "path": "https:#####################",
    "per_page": 50,
    "to": 50,
    "total": 6270
  }
}

@sherifnada, if we are to proceed with providing a schema file, are you expecting any particular format? Can you give an example of what it looks like? It would be helpful.

@ChristopheDuong
Contributor

@sherifnada, If we are to proceed with providing a Schema file. Are you expecting in any particular format ? Can you give an example of what it looks like ? It could be helpful.

You can see examples of what JSON schema files look like here:
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-greenhouse/source_greenhouse/schemas/users.json
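
For the payload in this issue, a schema for one row might look roughly like the sketch below. It is not taken from the linked file: the field names come from the sample JSON above, while the types and nullability are assumptions. It is written as a Python dict and dumped to JSON so the output can be saved as a schema file.

```python
import json

# Hypothetical schema for one row of the "data" array; field names come from
# the sample payload in this issue, the types are assumptions.
users_schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "email": {"type": ["null", "string"]},
        "first_name": {"type": ["null", "string"]},
    },
}

# Serialize it in the form a JSON schema file would hold.
schema_text = json.dumps(users_schema, indent=2)
print(schema_text)
```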

@meenagangadharan
Author

meenagangadharan commented Mar 18, 2021

@sherifnada @ChristopheDuong, the API that I am working on currently has an OpenAPI schema specification. I need to convert it to the format specified here. Will there be a new version where we can optionally supply the schema? It would be very helpful to have the option to use the OpenAPI metadata.

@sherifnada
Contributor

@meenagangadharan We could potentially support OpenAPI and JSON Schema as the format. I'm realizing there are a couple of complications with this approach though:

  1. OpenAPI, while a more natural fit than JSON Schema, would be more complicated as it contains many endpoints and we'd need to understand which endpoints we are working with in the response
  2. In the case that the input schema does not match the data being returned, normalized data could produce unexpected results (depending on what the mismatch is).
  3. This would not provide pagination on your API or remember state etc.

I think we can definitely do this given the caveats, but I want to make sure that these are OK caveats.
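
To illustrate the first caveat, here is a minimal sketch of pulling the response schema for a single endpoint out of an OpenAPI 3 document. The two-endpoint spec is made up, `response_schema` and `resolve_ref` are hypothetical helpers, and only local `#/...` references are followed (JSON Pointer escaping and nested references are ignored).

```python
from typing import Any, Dict


def resolve_ref(spec: Dict[str, Any], node: Dict[str, Any]) -> Dict[str, Any]:
    """Follow a local "#/..." $ref once; other nodes are returned unchanged."""
    ref = node.get("$ref")
    if not ref or not ref.startswith("#/"):
        return node
    target: Any = spec
    for part in ref[2:].split("/"):
        target = target[part]
    return target


def response_schema(
    spec: Dict[str, Any], path: str, method: str = "get", status: str = "200"
) -> Dict[str, Any]:
    """Return the JSON response schema of one operation in an OpenAPI 3 spec."""
    operation = spec["paths"][path][method]
    media = operation["responses"][status]["content"]["application/json"]
    return resolve_ref(spec, media["schema"])


# Made-up spec with two endpoints, only to show why the endpoint must be named.
spec = {
    "openapi": "3.0.0",
    "paths": {
        "/usersdata": {"get": {"responses": {"200": {"content": {
            "application/json": {"schema": {"$ref": "#/components/schemas/UserPage"}}}}}}},
        "/orders": {"get": {"responses": {"200": {"content": {
            "application/json": {"schema": {"type": "object"}}}}}}},
    },
    "components": {"schemas": {"UserPage": {
        "type": "object",
        "properties": {"data": {"type": "array", "items": {"type": "object"}}},
    }}},
}

print(response_schema(spec, "/usersdata"))
```

The caller has to say which path and method it wants; the spec alone does not tell the connector which endpoint a given response came from.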

A simpler alternative than all this is to split the JSON in the destination using DBT or a custom SQL query. This is more in line with ELT as described here which is what Airbyte is going to be best at. Is this something that would work for you?

LMK which of the two approaches would make the most sense for your use case. Also, would you be open to contributing this feature to the connector? (It's written in Python.)

@meenagangadharan
Author

@sherifnada,
Thanks for understanding! I have OpenAPI metadata for now. Also, the OpenAPI spec has multiple endpoints, like you mentioned, so it's OK to have these caveats. I can specify the endpoint that needs to be extracted for each pipeline. Alternatively, I have also converted it to a JSON Schema manually (for one of the endpoints), to see which works better.

I am looking for the data to be split and loaded as multiple rows in BigQuery as the initial step through Airbyte. Further unnesting the JSON record and loading it into multiple columns can be done using DBT/SQL.

@bashyroger

bashyroger commented Mar 22, 2021

Hi @sherifnada, as @meenagangadharan mentioned, it is OK for us to extract the keys / flatten / apply the schema after the initial raw data import.
What is important, though, is that we are able to indicate for any API source, whether it has schema metadata or not, what the prime 'key' / array is that emits individual rows when looped over.
This should be covered by any EL importer for a custom API at least.
So as long as we are able to configure what that key is for any custom connector that lacks the schema metadata to determine it automatically, we are OK.
In our example, the 'row emitting' key for our API is data.

Not doing this will create problems when importing a large data set, as some databases (like BigQuery / Redshift) have a limit on how large a cell containing JSON can be. So being able to at least import multiple rows when schema metadata is not there is paramount...
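
A minimal sketch of what such a configurable 'row emitting' key could look like, assuming the response has already been decoded from JSON. `iter_rows` and its `record_key` parameter are hypothetical names, not an existing connector option.

```python
from typing import Any, Dict, Iterator, Optional


def iter_rows(payload: Any, record_key: Optional[str] = None) -> Iterator[Dict[str, Any]]:
    """Yield one dict per row from a decoded JSON response.

    record_key names the array that holds the rows; None means the
    payload itself is expected to be the array.
    """
    rows = payload if record_key is None else payload[record_key]
    if not isinstance(rows, list):
        raise ValueError("expected a JSON array of rows")
    for row in rows:
        yield row


# Shape returned by our API: rows wrapped under "data".
wrapped = {"data": [{"key1": "value_a"}, {"key1": "value_x"}], "meta": {"total": 2}}
# Shape where the response body is the array itself.
bare = [{"key1": "value_a"}, {"key1": "value_x"}]

print(list(iter_rows(wrapped, record_key="data")))
print(list(iter_rows(bare)))
```

Both shapes end up as two separate rows, so no single cell has to hold the whole response.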

@sherifnada
Contributor

@bashyroger great point about the max row size. We'll prioritize working on this soon.

More generally FWIW, it might be much more straightforward to implement a custom connector for your use case rather than try to fit the HTTP connector to all use cases. It's tricky to implement a generic HTTP source connector that will work for most APIs as they all contain slightly different permutations of features (auth, iteration, pagination, rate limiting, data extraction like in this case, different schemas, etc.) -- it's absolutely the gold standard we want to reach (we want to expose a library which allows you to effectively build a connector via YAML) but it's hard to make an accurate estimate about timeline right now.

@meenagangadharan
Author

@sherifnada, @ChristopheDuong, I am working on building a custom connector. Hope this works out. I may ask for support during the process if required! Thanks.

@sherifnada
Contributor

@meenagangadharan please reach out here or to me on Slack -- happy to support however I can!

@meenagangadharan
Author

Hi @sherifnada @ChristopheDuong, I created a connector, source-reducept, and tested it using Docker and Python commands. As mentioned in Step 8, I am trying to execute the Gradle build command using
./gradlew :airbyte-integrations:connectors:source-reducept:integrationTest.

I am getting the error below. Can you please help me with this?
Error:
FAILURE: Build failed with an exception.

  • What went wrong:
    Execution failed for task ':airbyte-integrations:connectors:source-reducept:flakeCheck'.

Python call failed: .venv/bin/python -m flake8 . --config /home/meena/code/airbyte/airbyte/tools/python/.flake8

  • Try:
    Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

  • Get more help at https://help.gradle.org

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.7.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD FAILED in 7m 32s
38 actionable tasks: 38 executed

@sherifnada
Contributor

@meenagangadharan can you run the command with the --scan option enabled and then paste the build scan link that shows up at the end of the logs?

@meenagangadharan
Author

Hi @sherifnada, @ChristopheDuong, cc @bashyroger,
I skipped the above Gradle build and tried adding the Docker image in the UI. The connector shows up in the UI, and I pass the bearer token info under Authentication. It throws the error below.

image

When tested through the Python and Docker commands, it executed and displayed output records. The config.json contains hardcoded values. Would they be overwritten by the values passed in from the UI?


@meenagangadharan
Author

@meenagangadharan can you run the command with the --scan option enabled and then paste the build scan link that shows up at the end of the logs?

Done. Link is generated as - https://gradle.com/s/pg3j7u5ogwlas

@meenagangadharan
Author

Hi @sherifnada,
image

@sherifnada
Contributor

sherifnada commented Apr 19, 2021

@meenagangadharan are you happy with the connector's behavior locally? If so, you can just maintain it on your own -- no need to submit it to Airbyte. In this case there is no need to pass the standard tests. Just publish the connector to a Docker repository like Docker Hub (or just build it locally on the node that you are running, using docker build) and add it in the UI.

@meenagangadharan
Author

Also, since I have added it multiple times, the source connector list shows many options. How can we remove them?
image

@sherifnada
Contributor

@meenagangadharan Airbyte doesn't currently offer the ability to delete connector definitions. The best way would be to reset to a clean Airbyte instance, or wait until a future version offers that ability. For now you can probably just rename all your connectors to indicate they are the old ones.

@michel-tricot
Contributor

If you really need to delete it, you can always remove it manually by deleting the record from the Airbyte data volume:
2021-04-20_10-16-01

@meenagangadharan
Author

I edited the code to remove pagination and kept it simple, to extract the data with columns/rows separated.
I added the schema in discover, catalog and configured_catalog.json
and extracted the data to a JSON file. It extracted the 50 records but treated them as 1 record.
Is there any other modification needed to separate them?

Earlier, with pagination, it also extracted all records, but in the end they were still placed in 1 record.

@meenagangadharan
Author

I am extracting the r['data'] part from the response and keeping it as a "dict" since it is the required format of AirbyteMessage. It throws the below error:
2021-04-22 00:39:09 INFO (/tmp/workspace/55/0) DefaultAirbyteStreamFactory(lambda$create$0):73 - http_method: get
2021-04-22 00:39:09 ERROR (/tmp/workspace/55/0) LineGobbler(voidCall):69 - pydantic.error_wrappers.ValidationError: 1 validation error for AirbyteRecordMessage
2021-04-22 00:39:09 ERROR (/tmp/workspace/55/0) LineGobbler(voidCall):69 - data
2021-04-22 00:39:09 ERROR (/tmp/workspace/55/0) LineGobbler(voidCall):69 - none is not an allowed value (type=type_error.none.not_allowed)
2021-04-22 00:39:10 DEBUG (/tmp/workspace/55/0) DefaultAirbyteSource(close):109 - Closing tap process
2021-04-22 00:39:10 DEBUG (/tmp/workspace/55/0) DefaultAirbyteDestination(close):105 - Closing target process

Code:
for page in range(1, num_pages + 1):
    r = self._make_request(config, page).json()
    data2 = {k:v for k, v in r.items() if k in ('data')}
    data1 = data1.update(data2)
    yield AirbyteMessage(
        type=Type.RECORD,
        record=AirbyteRecordMessage(
            stream=SourceReducept.stream_name, data=data1, emitted_at=int(datetime.now().timestamp()) * 1000
        ),
    )
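
One possible reading of the error, offered as a guess from the snippet rather than a confirmed diagnosis: `dict.update` changes the dict in place and returns None, so `data1 = data1.update(data2)` leaves `data1` as None, which would match the "none is not an allowed value" message for the data field. Separately, `('data')` is a plain string rather than a tuple, so `k in ('data')` is a substring test. The sketch below emits one record per element of the `data` array instead of one record per page. It uses plain dicts shaped like record messages in place of the AirbyteMessage classes so that it runs on its own; `records_for_pages` and `fake_fetch` are hypothetical names.

```python
from datetime import datetime
from typing import Any, Callable, Dict, Iterator

# dict.update returns None, which is what the original assignment stored.
assert {}.update({"data": []}) is None


def records_for_pages(
    fetch_page: Callable[[int], Dict[str, Any]],
    num_pages: int,
    stream_name: str,
) -> Iterator[Dict[str, Any]]:
    """Yield one record message per row of each page's "data" array."""
    for page in range(1, num_pages + 1):
        response = fetch_page(page)
        for row in response.get("data", []):
            yield {
                "type": "RECORD",
                "record": {
                    "stream": stream_name,
                    "data": row,  # a single row dict, never None
                    "emitted_at": int(datetime.now().timestamp()) * 1000,
                },
            }


def fake_fetch(page: int) -> Dict[str, Any]:
    """Stand-in for self._make_request(config, page).json()."""
    return {"data": [{"id": page * 10 + i} for i in range(2)], "meta": {}}


messages = list(records_for_pages(fake_fetch, num_pages=3, stream_name="usersdata"))
print(len(messages))
print(messages[0]["record"]["data"])
```

In the real connector the inner dict would be passed as `data=row` to AirbyteRecordMessage, so each row becomes its own record in the destination.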

@davinchia
Contributor

@meenagangadharan let's move this conversation to Slack. Any questions will be answered quicker since all team members are there and messages are better surfaced to us.

I'm going to close this ticket for now.
