Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃帀 Source Looker: add run-look endpoint #3911

Merged
merged 12 commits into from
Jun 25, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -4765,6 +4765,28 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "run_looks",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": true,
"type": "object",
"properties": {
"look_id": {
"properties": {
"additionalProperties": true
},
"type": ["null", "object"],
"additionalProperties": true
}
}
},
"supported_sync_modes": ["full_refresh"],
"destination_sync_mode": "overwrite",
"source_defined_cursor": false
}
tuliren marked this conversation as resolved.
Show resolved Hide resolved
}
]
tuliren marked this conversation as resolved.
Show resolved Hide resolved
}
137 changes: 135 additions & 2 deletions airbyte-integrations/connectors/source-looker/source_looker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@

import backoff
import requests
from airbyte_protocol import AirbyteStream
from base_python import BaseClient
from requests.exceptions import ConnectionError
from requests.structures import CaseInsensitiveDict

from typing import Generator

class Client(BaseClient):
API_VERSION = "3.1"

def __init__(self, domain: str, client_id: str, client_secret: str):
def __init__(self, domain: str, client_id: str, client_secret: str, run_look_ids: list=[]):
tuliren marked this conversation as resolved.
Show resolved Hide resolved
"""
Note that we dynamically generate schemas for the stream__run_looks
function because the fields returned depend on the user's look(s)
(entered during configuration). See get_run_look_json_schema().
"""
self.BASE_URL = f"https://{domain}/api/{self.API_VERSION}"
self._client_id = client_id
self._client_secret = client_secret
Expand All @@ -45,6 +51,28 @@ def __init__(self, domain: str, client_id: str, client_secret: str):
"Content-Type": "application/json",
"Accept": "application/json",
}

# Maps Looker types to JSON Schema types for run_look JSON schema
self._field_type_mapping = {
"string": "string", "date_date": "datetime", "date_raw": "datetime",
"date": "datetime", "date_week": "datetime", "date_day_of_week": "string",
"date_day_of_week_index": "integer", "date_month": "string",
"date_month_num": "integer", "date_month_name": "string",
"date_day_of_month": "integer", "date_fiscal_month_num": "integer",
"date_quarter": "string", "date_quarter_of_year": "string",
"date_fiscal_quarter": "string", "date_fiscal_quarter_of_year": "string",
"date_year": "integer", "date_day_of_year": "integer",
"date_week_of_year": "integer", "date_fiscal_year": "integer",
"date_time_of_day": "string", "date_hour": "string",
"date_hour_of_day": "integer", "date_minute": "datetime",
"date_second": "datetime", "date_millisecond": "datetime",
"date_microsecond": "datetime", "number": "number", "int": "integer",
"list": "array", "yesno": "boolean"
}

# Helpers for the self.stream__run_looks function
self._run_look_explore_fields = {}
self._run_looks, self._run_looks_connect_error = self.get_run_look_info(run_look_ids)

self._dashboard_ids = []
self._project_ids = []
Expand All @@ -54,6 +82,19 @@ def __init__(self, domain: str, client_id: str, client_secret: str):
self._context_metadata_mapping = {"dashboards": [], "folders": [], "homepages": [], "looks": [], "spaces": []}
super().__init__()

@property
def streams(self) -> Generator[AirbyteStream, None, None]:
"""
Uses the default streams except for the run_look endpoint, where we have
to generate its JSON Schema on the fly for the given look
"""

streams = super().streams
for stream in streams:
if len(self._run_looks) > 0 and stream.name == "run_looks":
stream.json_schema = self._get_run_look_json_schema()
yield stream

def get_token(self):
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/x-www-form-urlencoded"
Expand All @@ -67,9 +108,26 @@ def get_token(self):
except ConnectionError as error:
return None, str(error)

def get_run_look_info(self, run_look_ids):
"""
Checks that the look IDs entered exist and can be queried
and returns the LookML model for each (needed for JSON Schema creation)
"""
looks = []
for look_id in run_look_ids:
resp = self._request(f"{self.BASE_URL}/looks/{look_id}?fields=model(id),title")
if resp == []:
return [], f"Unable to find look {look_id}. Verify that you have entered a valid look ID and that you have permission to run it."

looks.append((resp[0]["model"]["id"], look_id, resp[0]["title"]))

return looks, None

def health_check(self) -> Tuple[bool, str]:
if self._connect_error:
return False, self._connect_error
elif self._run_looks_connect_error:
return False, self._run_looks_connect_error
return True, ""

@backoff.on_exception(backoff.expo, requests.exceptions.ConnectionError, max_tries=7)
Expand All @@ -84,6 +142,76 @@ def _request(self, url: str, method: str = "GET", data: dict = None) -> List[dic
return [response_data]
return []

def _get_run_look_json_schema(self):
"""
Generates a JSON Schema for the run_look endpoint based on the Look IDs
entered in configuration
"""
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
self._get_run_look_key(look_id, look_name): {
"title": look_name,
"properties": {
field: self._get_look_field_schema(model, field) for field in self._get_look_fields(look_id)
},
"type": ["null", "object"],
"additionalProperties": False
}
for (model, look_id, look_name) in self._run_looks
}
}
return json_schema

def _get_run_look_key(self, look_id, look_name):
return f"{look_id} - {look_name}"

def _get_look_field_schema(self, model, field):
"""
For a given LookML model and field, looks up its type and generates
its properties for the run_look endpoint JSON Schema
"""
explore = field.split(".")[0]

fields = self._get_explore_fields(model, explore)

field_type = "string" # default to string
for dimension in fields['dimensions']:
if field == dimension['name'] and dimension['type'] in self._field_type_mapping:
field_type = self._field_type_mapping[dimension['type']]
for measure in fields['measures']:
if field == measure['name']:
# Default to number except for list, date, and yesno
field_type = "number"
if measure['type'] in self._field_type_mapping:
field_type = self._field_type_mapping[measure['type']]

if field_type == 'datetime':
# no datetime type for JSON Schema
return {
"type": ["null", "string"],
"format": "date-time"
}

return {
"type": ["null", field_type]
}

def _get_explore_fields(self, model, explore):
"""
For a given LookML model and explore, looks up its dimensions/measures
and their types for run_look endpoint JSON Schema generation
"""
if (model, explore) not in self._run_look_explore_fields:
self._run_look_explore_fields[(model, explore)] = self._request(f"{self.BASE_URL}/lookml_models/{model}/explores/{explore}?fields=fields(dimensions(name,type),measures(name,type))")[0]['fields']

return self._run_look_explore_fields[(model, explore)]

def _get_look_fields(self, look_id) -> List[str]:
return self._request(f"{self.BASE_URL}/looks/{look_id}?fields=query(fields)")[0]["query"]["fields"]

def _get_dashboard_ids(self) -> List[int]:
if not self._dashboard_ids:
self._dashboard_ids = [obj["id"] for obj in self._request(f"{self.BASE_URL}/dashboards") if isinstance(obj["id"], int)]
Expand Down Expand Up @@ -198,6 +326,11 @@ def stream__role_groups(self, fields):
for role_id in self._role_ids:
yield from self._request(f"{self.BASE_URL}/roles/{role_id}/groups")

def stream__run_looks(self, fields):
for (model, look_id, look_name) in self._run_looks:
yield from [{self._get_run_look_key(look_id, look_name): row}
for row in self._request(f"{self.BASE_URL}/looks/{look_id}/run/json")]

def stream__scheduled_plans(self, fields):
yield from self._request(f"{self.BASE_URL}/scheduled_plans?all_users=true")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"comment": "This schema gets created in client.py, but we need a placeholder for the super() method to work"
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@
"title": "Client Secret",
"type": "string",
"description": "The Client Secret is second part of an API3 key."
},
"run_look_ids": {
"title": "Look IDs to Run",
"type": "array",
"items": {
"type": "string",
"pattern": ["^[0-9]*$"]
},
"description": "The IDs of any Looks to run (optional)"
}
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/looker.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Several output streams are available from this source:
* [Lookml Dashboards](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/dashboard#get_all_dashboards)
* [Lookml Models](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/lookml-model#get_all_lookml_models)
* [Looks](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/look#get_all_looks)
* [Run Look](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/look#run_look)
* [Projects](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/project#get_all_projects)
* [Project Files](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/project#get_all_project_files)
* [Git Branches](https://docs.looker.com/reference/api-and-integration/api-reference/v3.1/project#get_all_git_branches)
Expand Down