diff --git a/airbyte-integrations/connectors/source-looker/sample_files/full_configured_catalog.json b/airbyte-integrations/connectors/source-looker/sample_files/full_configured_catalog.json index 01ddec8982b19d..2757edd52e3919 100644 --- a/airbyte-integrations/connectors/source-looker/sample_files/full_configured_catalog.json +++ b/airbyte-integrations/connectors/source-looker/sample_files/full_configured_catalog.json @@ -4765,6 +4765,28 @@ }, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "run_looks", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "type": "object", + "properties": { + "look_id": { + "properties": { + "additionalProperties": true + }, + "type": ["null", "object"], + "additionalProperties": true + } + } + }, + "supported_sync_modes": ["full_refresh"], + "destination_sync_mode": "overwrite", + "source_defined_cursor": false + } } ] } diff --git a/airbyte-integrations/connectors/source-looker/source_looker/client.py b/airbyte-integrations/connectors/source-looker/source_looker/client.py index baca636b5e6ffb..609dad2a301c18 100644 --- a/airbyte-integrations/connectors/source-looker/source_looker/client.py +++ b/airbyte-integrations/connectors/source-looker/source_looker/client.py @@ -27,15 +27,21 @@ import backoff import requests +from airbyte_protocol import AirbyteStream from base_python import BaseClient from requests.exceptions import ConnectionError from requests.structures import CaseInsensitiveDict - +from typing import Generator class Client(BaseClient): API_VERSION = "3.1" - def __init__(self, domain: str, client_id: str, client_secret: str): + def __init__(self, domain: str, client_id: str, client_secret: str, run_look_ids: list = None): + """ + Note that we dynamically generate schemas for the stream__run_looks + function because the fields returned depend on the user's look(s) + (entered during configuration). 
See _get_run_look_json_schema(). + """ self.BASE_URL = f"https://{domain}/api/{self.API_VERSION}" self._client_id = client_id self._client_secret = client_secret @@ -45,6 +51,28 @@ def __init__(self, domain: str, client_id: str, client_secret: str): "Content-Type": "application/json", "Accept": "application/json", } + + # Maps Looker types to JSON Schema types for run_look JSON schema + self._field_type_mapping = { + "string": "string", "date_date": "datetime", "date_raw": "datetime", + "date": "datetime", "date_week": "datetime", "date_day_of_week": "string", + "date_day_of_week_index": "integer", "date_month": "string", + "date_month_num": "integer", "date_month_name": "string", + "date_day_of_month": "integer", "date_fiscal_month_num": "integer", + "date_quarter": "string", "date_quarter_of_year": "string", + "date_fiscal_quarter": "string", "date_fiscal_quarter_of_year": "string", + "date_year": "integer", "date_day_of_year": "integer", + "date_week_of_year": "integer", "date_fiscal_year": "integer", + "date_time_of_day": "string", "date_hour": "string", + "date_hour_of_day": "integer", "date_minute": "datetime", + "date_second": "datetime", "date_millisecond": "datetime", + "date_microsecond": "datetime", "number": "number", "int": "integer", + "list": "array", "yesno": "boolean" + } + + # Helpers for the self.stream__run_looks function + self._run_look_explore_fields = {} + self._run_looks, self._run_looks_connect_error = self.get_run_look_info(run_look_ids or []) self._dashboard_ids = [] self._project_ids = [] @@ -54,6 +82,19 @@ def __init__(self, domain: str, client_id: str, client_secret: str): self._context_metadata_mapping = {"dashboards": [], "folders": [], "homepages": [], "looks": [], "spaces": []} super().__init__() + @property + def streams(self) -> Generator[AirbyteStream, None, None]: + """ + Uses the default streams except for the run_look endpoint, where we have + to generate its JSON Schema on the fly for the given look + """ + + streams = 
super().streams + for stream in streams: + if len(self._run_looks) > 0 and stream.name == "run_looks": + stream.json_schema = self._get_run_look_json_schema() + yield stream + def get_token(self): headers = CaseInsensitiveDict() headers["Content-Type"] = "application/x-www-form-urlencoded" @@ -67,9 +108,26 @@ def get_token(self): except ConnectionError as error: return None, str(error) + def get_run_look_info(self, run_look_ids): + """ + Checks that the look IDs entered exist and can be queried + and returns the LookML model for each (needed for JSON Schema creation) + """ + looks = [] + for look_id in run_look_ids: + resp = self._request(f"{self.BASE_URL}/looks/{look_id}?fields=model(id),title") + if resp == []: + return [], f"Unable to find look {look_id}. Verify that you have entered a valid look ID and that you have permission to run it." + + looks.append((resp[0]["model"]["id"], look_id, resp[0]["title"])) + + return looks, None + def health_check(self) -> Tuple[bool, str]: if self._connect_error: return False, self._connect_error + elif self._run_looks_connect_error: + return False, self._run_looks_connect_error return True, "" @backoff.on_exception(backoff.expo, requests.exceptions.ConnectionError, max_tries=7) @@ -84,6 +142,76 @@ def _request(self, url: str, method: str = "GET", data: dict = None) -> List[dic return [response_data] return [] + def _get_run_look_json_schema(self): + """ + Generates a JSON Schema for the run_look endpoint based on the Look IDs + entered in configuration + """ + json_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "type": "object", + "properties": { + self._get_run_look_key(look_id, look_name): { + "title": look_name, + "properties": { + field: self._get_look_field_schema(model, field) for field in self._get_look_fields(look_id) + }, + "type": ["null", "object"], + "additionalProperties": False + } + for (model, look_id, look_name) in self._run_looks + } + } + return 
json_schema + + def _get_run_look_key(self, look_id, look_name): + return f"{look_id} - {look_name}" + + def _get_look_field_schema(self, model, field): + """ + For a given LookML model and field, looks up its type and generates + its properties for the run_look endpoint JSON Schema + """ + explore = field.split(".")[0] + + fields = self._get_explore_fields(model, explore) + + field_type = "string" # default to string + for dimension in fields['dimensions']: + if field == dimension['name'] and dimension['type'] in self._field_type_mapping: + field_type = self._field_type_mapping[dimension['type']] + for measure in fields['measures']: + if field == measure['name']: + # Default to number except for list, date, and yesno + field_type = "number" + if measure['type'] in self._field_type_mapping: + field_type = self._field_type_mapping[measure['type']] + + if field_type == 'datetime': + # no datetime type for JSON Schema + return { + "type": ["null", "string"], + "format": "date-time" + } + + return { + "type": ["null", field_type] + } + + def _get_explore_fields(self, model, explore): + """ + For a given LookML model and explore, looks up its dimensions/measures + and their types for run_look endpoint JSON Schema generation + """ + if (model, explore) not in self._run_look_explore_fields: + self._run_look_explore_fields[(model, explore)] = self._request(f"{self.BASE_URL}/lookml_models/{model}/explores/{explore}?fields=fields(dimensions(name,type),measures(name,type))")[0]['fields'] + + return self._run_look_explore_fields[(model, explore)] + + def _get_look_fields(self, look_id) -> List[str]: + return self._request(f"{self.BASE_URL}/looks/{look_id}?fields=query(fields)")[0]["query"]["fields"] + def _get_dashboard_ids(self) -> List[int]: if not self._dashboard_ids: self._dashboard_ids = [obj["id"] for obj in self._request(f"{self.BASE_URL}/dashboards") if isinstance(obj["id"], int)] @@ -198,6 +326,11 @@ def stream__role_groups(self, fields): for role_id in 
self._role_ids: yield from self._request(f"{self.BASE_URL}/roles/{role_id}/groups") + def stream__run_looks(self, fields): + for (model, look_id, look_name) in self._run_looks: + yield from [{self._get_run_look_key(look_id, look_name): row} + for row in self._request(f"{self.BASE_URL}/looks/{look_id}/run/json")] + def stream__scheduled_plans(self, fields): yield from self._request(f"{self.BASE_URL}/scheduled_plans?all_users=true") diff --git a/airbyte-integrations/connectors/source-looker/source_looker/schemas/run_looks.json b/airbyte-integrations/connectors/source-looker/source_looker/schemas/run_looks.json new file mode 100644 index 00000000000000..fe2cb1c3f6faf9 --- /dev/null +++ b/airbyte-integrations/connectors/source-looker/source_looker/schemas/run_looks.json @@ -0,0 +1,3 @@ +{ + "comment": "This schema gets created in client.py, but we need a placeholder for the super() method to work" +} diff --git a/airbyte-integrations/connectors/source-looker/source_looker/spec.json b/airbyte-integrations/connectors/source-looker/source_looker/spec.json index 0b303c9f38d2cc..6f9fd621248694 100644 --- a/airbyte-integrations/connectors/source-looker/source_looker/spec.json +++ b/airbyte-integrations/connectors/source-looker/source_looker/spec.json @@ -22,6 +22,15 @@ "title": "Client Secret", "type": "string", "description": "The Client Secret is second part of an API3 key." 
+ }, + "run_look_ids": { + "title": "Look IDs to Run", + "type": "array", + "items": { + "type": "string", + "pattern": "^[0-9]*$" + }, + "description": "The IDs of any Looks to run (optional)" } } } diff --git a/docs/integrations/sources/looker.md b/docs/integrations/sources/looker.md index 900c6303099500..11795b7329ebbc 100644 --- a/docs/integrations/sources/looker.md +++ b/docs/integrations/sources/looker.md @@ -25,6 +25,7 @@ Several output streams are available from this source: * [Lookml Dashboards](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/dashboard#get_all_dashboards) * [Lookml Models](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/lookml-model#get_all_lookml_models) * [Looks](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/look#get_all_looks) + * [Run Look](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/look#run_look) * [Projects](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/project#get_all_projects) * [Project Files](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/project#get_all_project_files) * [Git Branches](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/project#get_all_git_branches)