From 165c09bce3d838cc1064ed02eaee8b7a10fbd03a Mon Sep 17 00:00:00 2001 From: Omar Al-Jadda Date: Fri, 16 Aug 2019 13:14:16 -0700 Subject: [PATCH] [AIRFLOW-5158] adds google sheets hook --- airflow/contrib/hooks/gcp_sheets_hook.py | 255 ++++++++++++++++++++ tests/contrib/hooks/test_gcp_sheets_hook.py | 165 +++++++++++++ 2 files changed, 420 insertions(+) create mode 100644 airflow/contrib/hooks/gcp_sheets_hook.py create mode 100644 tests/contrib/hooks/test_gcp_sheets_hook.py diff --git a/airflow/contrib/hooks/gcp_sheets_hook.py b/airflow/contrib/hooks/gcp_sheets_hook.py new file mode 100644 index 0000000000000..ffffae08ced0f --- /dev/null +++ b/airflow/contrib/hooks/gcp_sheets_hook.py @@ -0,0 +1,255 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import print_function +from googleapiclient.discovery import build +from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook +from airflow.exceptions import AirflowException + + +class GCPSheetsHook(GoogleCloudBaseHook): + """ + Interact with Google Sheets via GCP connection + + """ + + def __init__(self, + gcp_conn_id: str = 'google_cloud_default', + spreadsheet_id: str = None, + api_version: str = 'v4', + delegate_to: str = None) -> None: + """ + :param grpc_conn_id: The connection ID to use when fetching connection info. + :type grpc_conn_id: str + :param gsheet_id: The Google Sheet ID to interact with + :type gsheet_id: str + :param range_: The Range or Named Range to interact with + :type range_: str + """ + super().__init__(gcp_conn_id, delegate_to) + self.gcp_conn_id = gcp_conn_id + + if not spreadsheet_id: + raise AirflowException("The spreadsheet_id must be passed!") + + self.spreadsheet_id = spreadsheet_id + self.api_version = api_version + self.delegate_to = delegate_to + self.num_retries = self._get_field('num_retries', 1), # type: int + self._conn = None + + def get_conn(self): + """ + Retrieves connection to Google Sheets. + :return: Google Sheets services object. + :rtype: dict + """ + if not self._conn: + http_authorized = self._authorize() + self._conn = build('sheets', self.api_version, http=http_authorized, cache_discovery=False) + + return self._conn + + """ + Reading and writing cells in Google Sheet: + https://developers.google.com/sheets/api/guides/values + """ + + @GoogleCloudBaseHook.fallback_to_default_project_id + def get_values(self, + range_: str, + major_dimension: str = 'DIMENSION_UNSPECIFIED', + value_render_option: str = 'FORMATTED_VALUE', + date_time_render_option: str = 'SERIAL_NUMBER', + project_id: str = None) -> dict: + """ + Gets values from Google Sheet from a single range + https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get + """ + assert project_id is not None + service = self.get_conn() + response = service.spreadsheets().values().batchGet( + spreadsheetId=self.spreadsheet_id, + ranges=range_, + majorDimension=major_dimension, + valueRenderOption=value_render_option, + dateTimeRenderOption=date_time_render_option + ).execute() + + return response + + @GoogleCloudBaseHook.fallback_to_default_project_id + def batch_get_values(self, + ranges: list, + major_dimension: str = 'DIMENSION_UNSPECIFIED', + value_render_option: str = 'FORMATTED_VALUE', + date_time_render_option: str = 'SERIAL_NUMBER', + project_id: str = None) -> dict: + """ + Gets values from Google Sheet from a list of ranges + https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/batchGet + """ + assert project_id is not None + service = self.get_conn() + response = service.spreadsheets().values().batchGet( + spreadsheetId=self.spreadsheet_id, + ranges=ranges, + majorDimension=major_dimension, + valueRenderOption=value_render_option, + dateTimeRenderOption=date_time_render_option + ).execute() + + return response + + @GoogleCloudBaseHook.fallback_to_default_project_id + def update_values(self, + range_: str, + major_dimension: str = 'ROWS', + value_input_option: str = 'RAW', + include_values_in_response: bool = False, + response_value_render_option: str = 'FORMATTED_VALUE', + response_date_time_render_option: str = 'SERIAL_NUMBER', + values: list = [], + project_id: str = None) -> dict: + """ + Updates values from Google Sheet from a single range + https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/update """ + assert project_id is not None + service = self.get_conn() + body = { + "range": range_, + "majorDimension": major_dimension, + "values": values + } + response = service.spreadsheets().values().update( + spreadsheetId=self.spreadsheet_id, + range=range_, + valueInputOption=value_input_option, + includeValuesInResponse=include_values_in_response, + responseValueRenderOption=response_value_render_option, + responseDateTimeRenderOption=response_date_time_render_option, + body=body + ).execute() + + return response + + @GoogleCloudBaseHook.fallback_to_default_project_id + def batch_update_values(self, + ranges: list = [], + values: list = [], + major_dimension: str = 'ROWS', + value_input_option: str = 'RAW', + include_values_in_response: bool = False, + response_value_render_option: str = 'FORMATTED_VALUE', + response_date_time_render_option: str = 'SERIAL_NUMBER', + project_id: str = None) -> dict: + """ + Updates values from Google Sheet for multiple ranges + https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/batchUpdate + """ + assert project_id is not None + service = self.get_conn() + data = [] + for idx, range_ in enumerate(ranges): + value_range = { + "range": range_, + "majorDimension": major_dimension, + "values": values[idx] + } + data.append(value_range) + body = { + "valueInputOption": value_input_option, + "data": data, + "includeValuesInResponse": include_values_in_response, + "responseValueRenderOption": response_value_render_option, + "responseDateTimeRenderOption": response_date_time_render_option + } + response = service.spreadsheets().values().batchUpdate( + spreadsheetId=self.spreadsheet_id, + body=body + ).execute() + + return response + + @GoogleCloudBaseHook.fallback_to_default_project_id + def append_values(self, + range_: str, + major_dimension: str = 'ROWS', + value_input_option: str = 'RAW', + insert_data_option: str = 'OVERWRITE', # or INSERT_ROWS + include_values_in_response: bool = False, + response_value_render_option: str = 'FORMATTED_VALUE', + response_date_time_render_option: str = 'SERIAL_NUMBER', + values: list = [], + project_id: str = None) -> dict: + """ + Append values from Google Sheet from a single range + https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/append + """ + assert project_id is not None + service = self.get_conn() + body = { + "range": range_, + "majorDimension": major_dimension, + "values": values + } + response = service.spreadsheets().values().append( + spreadsheetId=self.spreadsheet_id, + range=range_, + valueInputOption=value_input_option, + insertDataOption=insert_data_option, + includeValuesInResponse=include_values_in_response, + responseValueRenderOption=response_value_render_option, + responseDateTimeRenderOption=response_date_time_render_option, + body=body + ).execute() + + return response + + @GoogleCloudBaseHook.fallback_to_default_project_id + def clear(self, range_: str, project_id: str = None) -> dict: + """ + Clear values from Google Sheet from a single range + https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/clear """ + assert project_id is not None + service = self.get_conn() + response = service.spreadsheets().values().clear( + spreadsheetId=self.spreadsheet_id, + range=range_ + ).execute() + + return response + + @GoogleCloudBaseHook.fallback_to_default_project_id + def batch_clear(self, ranges: list = [], project_id: str = None) -> dict: + """ + Clear values from Google Sheet from a list of ranges + https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/batchClear """ + assert project_id is not None + service = self.get_conn() + body = { + "ranges": ranges + } + response = service.spreadsheets().values().batchClear( + spreadsheetId=self.spreadsheet_id, + body=body + ).execute() + + return response diff --git a/tests/contrib/hooks/test_gcp_sheets_hook.py b/tests/contrib/hooks/test_gcp_sheets_hook.py new file mode 100644 index 0000000000000..54e9e18c70880 --- /dev/null +++ b/tests/contrib/hooks/test_gcp_sheets_hook.py @@ -0,0 +1,165 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +import time +# from pprint import pprint + +google_import = True +try: + from airflow.contrib.hooks.gcp_sheets_hook import GCPSheetsHook +except ImportError: + google_import = False + +# ----------------------------------------------------------------------- +# To test this hook on a live Google sheet: +# Update below with the details to your GCP service_account connection. +# Also, be sure add a SPREADSHEET_ID and SHARE the Google Sheet +# with the email of your service account (full edit access) + +GCP_CONN_ID = 'google_cloud_default' +SPREADHSEET_ID = None +SHEET_NAME = 'hook' +SLEEP_TIME = 1 +# ----------------------------------------------------------------------- + + +class TestGCPSheetsHook(unittest.TestCase): + def setUp(self): + ''' Load the spreadsheet with the values used for testing. ''' + self.sheet_name = SHEET_NAME + self.sheet_range = self.sheet_name + '!A:E' + self.sheet_range_a = self.sheet_name + '!A:A' + self.sheet_range_b = self.sheet_name + '!B:B' + self.sheet_range_c = self.sheet_name + '!C:C' + + self.sheet_clear_range_a = self.sheet_name + '!A1:A3' + self.sheet_clear_range_b = self.sheet_name + '!B1:B3' + + self.sheet_row_dim = [['$A$1', '$B$1', '$C$1', '$D$1', '$E$1'], + ['$A$2', '$B$2', '$C$2', '$D$2', '$E$2'], + ['$A$3', '$B$3', '$C$3', '$D$3', '$E$3']] + + self.sheet_col_dim = [['$A$1', '$A$2', '$A$3'], + ['$B$1', '$B$2', '$B$3'], + ['$C$1', '$C$2', '$C$3'], + ['$D$1', '$D$2', '$D$3'], + ['$E$1', '$E$2', '$E$3']] + + self.update_values_a = [['A1', 'A2', 'A3']] + self.update_values_b = [['B1', 'B2', 'B3']] + + self.gsheet_hook = GCPSheetsHook( + gcp_conn_id=GCP_CONN_ID, spreadsheet_id=SPREADHSEET_ID) + self.gsheet_hook.clear(range_=self.sheet_range) + self.gsheet_hook.update_values(range_=self.sheet_range, + values=self.sheet_row_dim, + include_values_in_response=True, + major_dimension='ROWS') + # pprint(response) + time.sleep(SLEEP_TIME) + + @unittest.skipUnless(google_import, 'import error. Try `pip install google-api-python-client`') + @unittest.skipUnless(SPREADHSEET_ID, 'SPREADHSEET_ID not provided.') + def test_get_values_by_rows(self): + response = self.gsheet_hook.get_values( + range_=self.sheet_range, major_dimension='ROWS') + self.assertEqual(response['valueRanges'][0] + ['values'], self.sheet_row_dim) + time.sleep(SLEEP_TIME) + + @unittest.skipUnless(google_import, 'import error. Try `pip install google-api-python-client`') + @unittest.skipUnless(SPREADHSEET_ID, 'SPREADHSEET_ID not provided.') + def test_get_values_by_columns(self): + response = self.gsheet_hook.get_values( + range_=self.sheet_range, major_dimension='COLUMNS') + # pprint(response) + self.assertEqual(response['valueRanges'][0] + ['values'], self.sheet_col_dim) + time.sleep(SLEEP_TIME) + + @unittest.skipUnless(google_import, 'import error. Try `pip install google-api-python-client`') + @unittest.skipUnless(SPREADHSEET_ID, 'SPREADHSEET_ID not provided.') + def test_batch_get_values_by_rows(self): + response = self.gsheet_hook.batch_get_values(ranges=[self.sheet_range_a, self.sheet_range_b], + major_dimension='ROWS') + # pprint(response) + self.assertEqual(response['valueRanges'][0]['values'], [ + [row[0]] for row in self.sheet_row_dim]) + self.assertEqual(response['valueRanges'][1]['values'], [ + [row[1]] for row in self.sheet_row_dim]) + time.sleep(SLEEP_TIME) + + @unittest.skipUnless(google_import, 'import error. Try `pip install google-api-python-client`') + @unittest.skipUnless(SPREADHSEET_ID, 'SPREADHSEET_ID not provided.') + def test_batch_get_values_by_columns(self): + response = self.gsheet_hook.batch_get_values(ranges=[self.sheet_range_a, self.sheet_range_b], + major_dimension='COLUMNS') + # pprint(response) + self.assertEqual(response['valueRanges'][0] + ['values'], [self.sheet_col_dim[0]]) + self.assertEqual(response['valueRanges'][1] + ['values'], [self.sheet_col_dim[1]]) + time.sleep(SLEEP_TIME) + + @unittest.skipUnless(google_import, 'import error. Try `pip install google-api-python-client`') + @unittest.skipUnless(SPREADHSEET_ID, 'SPREADHSEET_ID not provided.') + def test_update_values(self): + response = self.gsheet_hook.update_values(range_=self.sheet_range_a, + values=self.update_values_a, + include_values_in_response=True, + major_dimension='COLUMNS') + # pprint(response) + self.assertEqual(response['updatedData'] + ['values'], self.update_values_a) + time.sleep(SLEEP_TIME) + + @unittest.skipUnless(google_import, 'import error. Try `pip install google-api-python-client`') + @unittest.skipUnless(SPREADHSEET_ID, 'SPREADHSEET_ID not provided.') + def test_batch_update_values(self): + response = self.gsheet_hook.batch_update_values(ranges=[self.sheet_range_a, self.sheet_range_b], + values=[self.update_values_a, self.update_values_b], + include_values_in_response=True, + major_dimension='COLUMNS') + # pprint(response) + self.assertEqual(response['responses'][0] + ['updatedData']['values'], self.update_values_a) + self.assertEqual(response['responses'][1] + ['updatedData']['values'], self.update_values_b) + time.sleep(SLEEP_TIME) + + @unittest.skipUnless(google_import, 'import error. Try `pip install google-api-python-client`') + @unittest.skipUnless(SPREADHSEET_ID, 'SPREADHSEET_ID not provided.') + def test_clear_range(self): + response = self.gsheet_hook.clear(range_=self.sheet_clear_range_a) + # pprint(response) + self.assertEqual(response['clearedRange'], self.sheet_clear_range_a) + time.sleep(SLEEP_TIME) + + @unittest.skipUnless(google_import, 'import error. Try `pip install google-api-python-client`') + @unittest.skipUnless(SPREADHSEET_ID, 'SPREADHSEET_ID not provided.') + def test_batch_clear_range(self): + response = self.gsheet_hook.batch_clear(ranges=[self.sheet_clear_range_a, self.sheet_clear_range_b]) + # pprint(response) + self.assertEqual(response['clearedRanges'], [self.sheet_clear_range_a, self.sheet_clear_range_b]) + time.sleep(SLEEP_TIME) + + def tearDown(self): + self.gsheet_hook.clear(range_=self.sheet_range)