From 04e8128abb82bd85be95180ce5989410deac9ad7 Mon Sep 17 00:00:00 2001
From: David O Neill
Date: Mon, 20 May 2024 16:56:14 +0100
Subject: [PATCH] Upload Strategy

---
 insights_analytics_collector/collection.py    | 18 +++-
 .../collection_csv.py                         |  3 +-
 .../collection_data_status.py                 |  3 +-
 .../collection_json.py                        |  3 +-
 insights_analytics_collector/collector.py     | 41 ++++++---
 insights_analytics_collector/package.py       | 92 ++++++++-----------
 .../uploader/__init__.py                      |  0
 insights_analytics_collector/uploader/base.py | 54 ++++++++++++
 .../uploader/basic.py                         | 28 ++++++
 insights_analytics_collector/uploader/cert.py |  9 ++
 .../uploader/factory.py                       | 22 +++++
 .../uploader/identity.py                      | 22 +++++
 insights_analytics_collector/uploader/pat.py  | 49 ++++++++++
 insights_analytics_collector/uploader/pkce.py |  9 ++
 insights_analytics_collector/uploader/s3.py   |  9 ++
 tests/functional/test_gathering.py            | 15 ++-
 tests/functional/test_slicing.py              |  2 +-
 17 files changed, 296 insertions(+), 83 deletions(-)
 create mode 100644 insights_analytics_collector/uploader/__init__.py
 create mode 100644 insights_analytics_collector/uploader/base.py
 create mode 100644 insights_analytics_collector/uploader/basic.py
 create mode 100644 insights_analytics_collector/uploader/cert.py
 create mode 100644 insights_analytics_collector/uploader/factory.py
 create mode 100644 insights_analytics_collector/uploader/identity.py
 create mode 100644 insights_analytics_collector/uploader/pat.py
 create mode 100644 insights_analytics_collector/uploader/pkce.py
 create mode 100644 insights_analytics_collector/uploader/s3.py

diff --git a/insights_analytics_collector/collection.py b/insights_analytics_collector/collection.py
index a173a11..274167b 100644
--- a/insights_analytics_collector/collection.py
+++ b/insights_analytics_collector/collection.py
@@ -36,7 +36,8 @@ def __init__(self, collector, fnc_collecting):
         self.gathering_started_at = None
         self.gathering_finished_at = None
         self.gathering_successful = None
-        self.last_gathered_entry = self.collector.last_gathered_entry_for(self.key)
+        self.last_gathered_entry = self.collector.last_gathered_entry_for(
+            self.key)
 
     @abstractmethod
     def add_to_tar(self, tar):
@@ -67,7 +68,10 @@ def gather(self, max_data_size):
 
             self.gathering_successful = True
         except Exception as e:
-            self.logger.exception(f"Could not generate metric {self.filename}: {e}")
+            self.logger.exception(
+                f"Could not generate metric "  # nopep8
+                f"{self.filename}: {e}"
+            )  # nopep8
             self.gathering_successful = False
         finally:
             self._set_gathering_finished()
@@ -87,7 +91,8 @@ def slices(self):
         # Or it can force full table sync if interval is given
         if self.fnc_slicing:
             if self.full_sync_enabled:
-                slices = self.fnc_slicing(self.key, last_gather, full_sync_enabled=True)
+                slices = self.fnc_slicing(
+                    self.key, last_gather, full_sync_enabled=True)
             else:
                 slices = self.fnc_slicing(
                     self.key, last_gather, since=since, until=until
                 )
@@ -119,7 +124,9 @@ def update_last_gathered_entries(self, updates_dict):
 
         if self.full_sync_enabled:
             self._update_last_gathered_key(
-                updates_dict, f"{self.key}_full", self.gathering_finished_at
+                updates_dict,  # nopep8
+                f"{self.key}_full",  # nopep8
+                self.gathering_finished_at,  # nopep8
             )
         else:
             # collections are ordered by time slices.
@@ -141,7 +148,8 @@ def _is_full_sync_enabled(self, interval_days):
         if not interval_days:
             return False
 
-        last_full_sync = self.collector.last_gathered_entry_for(f"{self.key}_full")
+        last_full_sync = self.collector.last_gathered_entry_for(
+            f"{self.key}_full")
         return not last_full_sync or last_full_sync < now() - timedelta(
             days=interval_days
         )
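Note: the slicing hunks above assume a caller-registered `fnc_slicing`
callback; its exact shape is defined by the consuming application, not by
this patch. A minimal sketch of what such a callback could look like, with
the name `one_day_slicing` and the one-day step purely illustrative:

    from datetime import timedelta

    def one_day_slicing(key, last_gather, since=None, until=None,
                        full_sync_enabled=False):
        # Yield (since, until) pairs; Collection.slices() iterates them and
        # the collector creates one collection per slice.
        if full_sync_enabled:
            # A single unbounded slice forces a full table sync.
            yield (None, None)
            return
        start = since or last_gather
        while start < until:
            end = min(start + timedelta(days=1), until)
            yield (start, end)
            start = end
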
diff --git a/insights_analytics_collector/collection_csv.py b/insights_analytics_collector/collection_csv.py
index 7e4d2e1..84194ad 100644
--- a/insights_analytics_collector/collection_csv.py
+++ b/insights_analytics_collector/collection_csv.py
@@ -24,7 +24,8 @@ def __init__(self, collector, fnc_collecting):
     def add_to_tar(self, tar):
         """Adds CSV file to the tar(tgz) archive"""
         self.logger.debug(
-            f"CollectionCSV._add_to_tar: | {self.key}.csv | Size: {self.data_size()}"
+            f"CollectionCSV._add_to_tar: | "
+            f"{self.key}.csv | Size: {self.data_size()}"
         )
         tar.add(self.target(), arcname=f"./{self.filename}")
 
diff --git a/insights_analytics_collector/collection_data_status.py b/insights_analytics_collector/collection_data_status.py
index 872e27b..c374fe7 100644
--- a/insights_analytics_collector/collection_data_status.py
+++ b/insights_analytics_collector/collection_data_status.py
@@ -28,7 +28,8 @@ def data_collection_status(self, full_path, **kwargs):
             "status",
             "elapsed",
         ]
-        writer = csv.DictWriter(csvfile, delimiter=",", fieldnames=fieldnames)
+        writer = csv.DictWriter(
+            csvfile, delimiter=",", fieldnames=fieldnames)
         writer.writeheader()
 
         for collection in self.package.collections:
diff --git a/insights_analytics_collector/collection_json.py b/insights_analytics_collector/collection_json.py
index 9ffc083..424c750 100644
--- a/insights_analytics_collector/collection_json.py
+++ b/insights_analytics_collector/collection_json.py
@@ -32,7 +32,8 @@ def add_to_tar(self, tar):
         """Adds JSON data to TAR(tgz) archive"""
         buf = self.target().encode("utf-8")
         self.logger.debug(
-            f"CollectionJSON._add_to_tar: | {self.key}.json | Size: {self.data_size()}"
+            f"CollectionJSON._add_to_tar: | "
+            f"{self.key}.json | Size: {self.data_size()}"
         )
         info = tarfile.TarInfo(f"./{self.filename}")
         info.size = len(buf)
diff --git a/insights_analytics_collector/collector.py b/insights_analytics_collector/collector.py
index 69bceda..b977a9a 100644
--- a/insights_analytics_collector/collector.py
+++ b/insights_analytics_collector/collector.py
@@ -191,12 +191,14 @@ def _calculate_collection_interval(self, since, until):
         if until is not None and until > _now:
             until = _now
             self.logger.warning(
-                f"End of the collection interval is in the future, setting to {_now}."
+                f"End of the collection interval is in the future,"
+                f" setting to {_now}."
             )
         if since is not None and since > _now:
             since = _now
             self.logger.warning(
-                f"Start of the collection interval is in the future, setting to {_now}."
+                f"Start of the collection interval is in the "
+                f"future, setting to {_now}."
             )
 
         # The value of `until` needs to be concrete, so resolve it. If it wasn't passed in,
@@ -207,7 +209,8 @@ def _calculate_collection_interval(self, since, until):
             if until > since + timedelta(weeks=4):
                 until = since + timedelta(weeks=4)
                 self.logger.warning(
-                    f"End of the collection interval is greater than 4 weeks from start, setting end to {until}."
+                    f"End of the collection interval is greater "  # nopep8
+                    f"than 4 weeks from start, setting end to {until}."  # nopep8
                 )
         else:  # until is None
             until = min(since + timedelta(weeks=4), _now)
@@ -228,14 +231,16 @@ def _calculate_collection_interval(self, since, until):
         if since is not None and since < horizon:
             since = horizon
             self.logger.warning(
-                f"Start of the collection interval is more than 4 weeks prior to {until}, setting to {horizon}."
+                f"Start of the collection interval is "  # nopep8
+                f"more than 4 weeks prior to {until}, setting to {horizon}."  # nopep8
             )
 
         last_gather = self._last_gathering() or horizon
         if last_gather < horizon:
             last_gather = horizon
             self.logger.warning(
-                f"Last analytics run was more than 4 weeks prior to {until}, using {horizon} instead."
+                f"Last analytics run was more than 4 weeks prior to "  # nopep8
+                f"{until}, using {horizon} instead."  # nopep8
             )
 
         self.gather_since = since
@@ -287,10 +292,12 @@ def _gather_config(self):
         TODO: add "always" flag to @register decorator
         """
         if not self.config_present():
-            self.logger.log(self.log_level, "'config' collector data is missing")
+            self.logger.log(
+                self.log_level, "'config' collector data is missing")
             return False
         else:
-            self.collections["config"].gather(self._package_class().max_data_size())
+            self.collections["config"].gather(
+                self._package_class().max_data_size())
             return True
 
     def _gather_json_collections(self):
@@ -338,19 +345,23 @@ def _pg_advisory_lock(self, key, wait=False):
             yield True
         else:
             # Build 64-bit integer out of the resource id
-            resource_key = int(hashlib.sha512(key.encode()).hexdigest(), 16) % 2**63
+            resource_key = int(hashlib.sha512(
+                key.encode()).hexdigest(), 16) % 2**63
 
             cursor = connection.cursor()
             try:
                 if wait:
-                    cursor.execute("SELECT pg_advisory_lock(%s);", (resource_key,))
+                    cursor.execute(
+                        "SELECT pg_advisory_lock(%s);", (resource_key,))
                 else:
-                    cursor.execute("SELECT pg_try_advisory_lock(%s);", (resource_key,))
+                    cursor.execute(
+                        "SELECT pg_try_advisory_lock(%s);", (resource_key,))
 
                 acquired = cursor.fetchall()[0][0]
                 yield acquired
             finally:
-                cursor.execute("SELECT pg_advisory_unlock(%s);", (resource_key,))
+                cursor.execute("SELECT pg_advisory_unlock(%s);",
+                               (resource_key,))
                 cursor.close()
 
     def _process_packages(self):
@@ -487,7 +498,8 @@ def _create_collections(self, subset=None):
                 for since, until in collection.slices():
                     collection.since = since
                     collection.until = until
-                    self.collections[collection.data_type].append(collection)
+                    self.collections[collection.data_type].append(
+                        collection)
                 collection = self._create_collection(fnc)
 
     def _create_collection(self, fnc_collecting):
@@ -499,7 +511,10 @@ def _create_collection(self, fnc_collecting):
             collection = self._collection_csv_class()(self, fnc_collecting)
 
         if collection is None:
-            raise RuntimeError(f"Collection of type {data_type} not implemented")
+            raise RuntimeError(
+                f"Collection of type "  # nopep8
+                f"{data_type} not implemented"
+            )  # nopep8
 
         return collection
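Note: the `_pg_advisory_lock` hunk above only rewraps the existing key
derivation. The derivation itself can be sanity-checked in isolation; the
lock name below is a placeholder:

    import hashlib

    def advisory_lock_key(name: str) -> int:
        # pg_advisory_lock() takes a signed 64-bit integer, so the SHA-512
        # digest is reduced modulo 2**63 to stay in the positive range.
        return int(hashlib.sha512(name.encode()).hexdigest(), 16) % 2**63

    assert 0 <= advisory_lock_key("gather_analytics_lock") < 2**63
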
diff --git a/insights_analytics_collector/package.py b/insights_analytics_collector/package.py
index 217aa27..d00cca7 100644
--- a/insights_analytics_collector/package.py
+++ b/insights_analytics_collector/package.py
@@ -3,10 +3,9 @@
 import os
 import pathlib
 import tarfile
+from .uploader.factory import get_uploader
 from abc import abstractmethod
 
-import requests
-
 
 class Package:
     """
@@ -24,10 +23,12 @@ class Package:
     # i.e. "application/vnd.redhat.tower.tower_payload+tgz"
     PAYLOAD_CONTENT_TYPE = "application/vnd.redhat.TODO+tgz"
 
-    SHIPPING_AUTH_USERPASS = "user-pass"
+    SHIPPING_AUTH_USERPASS = "user-pass"  # Development mode only
     SHIPPING_AUTH_S3_USERPASS = "user-pass-s3"
     SHIPPING_AUTH_IDENTITY = "x-rh-identity"  # Development mode only
     SHIPPING_AUTH_CERTIFICATES = "mutual-tls"  # Mutual TLS
+    SHIPPING_AUTH_OAUTH_PAT = "service-account"
+    SHIPPING_AUTH_OAUTH_PKCE = "service-account-pkce"
 
     DEFAULT_RHSM_CERT_FILE = "/etc/pki/consumer/cert.pem"
     DEFAULT_RHSM_KEY_FILE = "/etc/pki/consumer/key.pem"
@@ -84,12 +85,14 @@ def has_free_space(self, requested_size):
 
     def is_shipping_configured(self):
         if not self.tar_path:
-            self.logger.error("Insights for Ansible Automation Platform TAR not found")
+            self.logger.error(
+                "Insights for Ansible Automation Platform TAR not found")
             return False
 
         if not os.path.exists(self.tar_path):
             self.logger.error(
-                f"Insights for Ansible Automation Platform TAR {self.tar_path} not found"
+                f"Insights for Ansible Automation Platform TAR "
+                f"{self.tar_path} not found"
             )
             return False
 
@@ -153,7 +156,8 @@ def make_tgz(self):
                 self.tar_path = f.name
                 return True
         except Exception as e:
-            self.logger.exception(f"Failed to write analytics archive file: {e}")
+            self.logger.exception(
+                f"Failed to write analytics archive file: {e}")
             return False
 
     def ship(self):
@@ -167,29 +171,27 @@ def ship(self):
         self.logger.debug(f"shipping analytics file: {self.tar_path}")
 
         with open(self.tar_path, "rb") as f:
-            files = {
-                "file": (
-                    os.path.basename(self.tar_path),
-                    f,
-                    self._payload_content_type(),
-                )
-            }
-            s = requests.Session()
-            if self.shipping_auth_mode() == self.SHIPPING_AUTH_CERTIFICATES:
-                # as a single file (containing the private key and the certificate) or
-                # as a tuple of both files paths (cert_file, keyfile)
-                s.cert = self._get_client_certificates()
-
-            s.headers = self._get_http_request_headers()
-            s.headers.pop("Content-Type")
-
-            if self.shipping_auth_mode() == self.SHIPPING_AUTH_IDENTITY:
-                s.headers["x-rh-identity"] = self._get_x_rh_identity()
-
-            url = self.get_ingress_url()
-            self.shipping_successful = self._send_data(url, files, s)
+            uploader = get_uploader(
+                self.shipping_auth_mode(),
+                url=self.get_ingress_url(),
+                files={
+                    "file": (
+                        os.path.basename(self.tar_path),
+                        f,
+                        self._payload_content_type(),
+                    )
+                },
+                username=self._get_rh_user(),
+                password=self._get_rh_password(),
+                cert_path=self.CERT_PATH,
+                cert=self._get_client_certificates(),
+                xrhidentity=self._get_x_rh_identity(),
+                headers=self._get_http_request_headers(),
+                proxy=self.get_proxy_url(),
+            )
 
-        return self.shipping_successful
+            uploader.headers.pop("Content-Type", None)
+            return uploader.send_data()
 
     def shipping_auth_mode(self):
         return self.SHIPPING_AUTH_USERPASS
@@ -214,32 +216,6 @@ def _collection_to_tar(self, tar, collection):
             )
             return None
 
-    def _send_data(self, url, files, session):
-        if self.shipping_auth_mode() == self.SHIPPING_AUTH_USERPASS:
-            response = session.post(
-                url,
-                files=files,
-                verify=self.CERT_PATH,
-                auth=(self._get_rh_user(), self._get_rh_password()),
-                headers=session.headers,
-                timeout=(31, 31),
-            )
-        else:
-            response = session.post(
-                url, files=files, headers=session.headers, timeout=(31, 31)
-            )
-
-        # Accept 2XX status_codes
-        if response.status_code >= 300:
-            self.logger.error(
-                "Upload failed with status {}, {}".format(
-                    response.status_code, response.text
-                )
-            )
-            return False
-
-        return True
-
     def _config_to_tar(self, tar):
         if self.collector.collections["config"] is None:
             self.logger.error(
@@ -310,7 +286,8 @@ def _data_collection_status_to_tar(self, tar):
             self.manifest.add_collection(self.data_collection_status)
         except Exception as e:
             self.logger.exception(
-                f"Could not generate {self.data_collection_status.filename}: {e}"
+                f"Could not generate "  # nopep8
+                f"{self.data_collection_status.filename}: {e}"  # nopep8
             )
 
     def _manifest_to_tar(self, tar):
@@ -319,7 +296,10 @@ def _manifest_to_tar(self, tar):
             self.manifest.add_to_tar(tar)
             self.add_collection(self.manifest)
         except Exception as e:
-            self.logger.exception(f"Could not generate {self.manifest.filename}: {e}")
+            self.logger.exception(
+                f"Could not generate "  # nopep8
+                f"{self.manifest.filename}: {e}"
+            )  # nopep8
 
     def _payload_content_type(self):
         return self.PAYLOAD_CONTENT_TYPE
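Note: `ship()` now builds the request through the strategy factory instead
of driving `requests` directly. A caller-side sketch of the equivalent flow;
the URL, credentials and paths are placeholders, not values from this patch:

    from insights_analytics_collector.uploader.factory import get_uploader

    with open("/tmp/analytics.tar.gz", "rb") as f:
        uploader = get_uploader(
            "user-pass",  # any strategy name known to the factory
            url="https://example.invalid/api/ingress/v1/upload",  # placeholder
            files={"file": ("analytics.tar.gz", f,
                            "application/vnd.redhat.TODO+tgz")},
            username="ingress-user",               # placeholder
            password="ingress-password",           # placeholder
            cert_path="/etc/pki/ca-trust/ca.pem",  # placeholder CA bundle
            headers={},
            proxy=None,
        )
        shipped = uploader.send_data()  # True on a 2xx response
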
"application/vnd.redhat.tower.tower_payload+tgz" PAYLOAD_CONTENT_TYPE = "application/vnd.redhat.TODO+tgz" - SHIPPING_AUTH_USERPASS = "user-pass" + SHIPPING_AUTH_USERPASS = "user-pass" # Development mode only SHIPPING_AUTH_S3_USERPASS = "user-pass-s3" SHIPPING_AUTH_IDENTITY = "x-rh-identity" # Development mode only SHIPPING_AUTH_CERTIFICATES = "mutual-tls" # Mutual TLS + SHIPPING_AUTH_OAUTH_PAT = "service-account" + SHIPPING_AUTH_OAUTH_PKCE = "service-account" DEFAULT_RHSM_CERT_FILE = "/etc/pki/consumer/cert.pem" DEFAULT_RHSM_KEY_FILE = "/etc/pki/consumer/key.pem" @@ -84,12 +85,14 @@ def has_free_space(self, requested_size): def is_shipping_configured(self): if not self.tar_path: - self.logger.error("Insights for Ansible Automation Platform TAR not found") + self.logger.error( + "Insights for Ansible Automation Platform TAR not found") return False if not os.path.exists(self.tar_path): self.logger.error( - f"Insights for Ansible Automation Platform TAR {self.tar_path} not found" + f"Insights for Ansible Automation Platform TAR " + f"{self.tar_path} not found" ) return False @@ -153,7 +156,8 @@ def make_tgz(self): self.tar_path = f.name return True except Exception as e: - self.logger.exception(f"Failed to write analytics archive file: {e}") + self.logger.exception( + f"Failed to write analytics archive file: {e}") return False def ship(self): @@ -167,29 +171,27 @@ def ship(self): self.logger.debug(f"shipping analytics file: {self.tar_path}") with open(self.tar_path, "rb") as f: - files = { - "file": ( - os.path.basename(self.tar_path), - f, - self._payload_content_type(), - ) - } - s = requests.Session() - if self.shipping_auth_mode() == self.SHIPPING_AUTH_CERTIFICATES: - # as a single file (containing the private key and the certificate) or - # as a tuple of both files paths (cert_file, keyfile) - s.cert = self._get_client_certificates() - - s.headers = self._get_http_request_headers() - s.headers.pop("Content-Type") - - if self.shipping_auth_mode() == self.SHIPPING_AUTH_IDENTITY: - s.headers["x-rh-identity"] = self._get_x_rh_identity() - - url = self.get_ingress_url() - self.shipping_successful = self._send_data(url, files, s) + uploader = get_uploader( + self.shipping_auth_mode(), + url=self.get_ingress_url(), + files={ + "file": ( + os.path.basename(self.tar_path), + f, + self._payload_content_type(), + ) + }, + username=self._get_rh_user(), + password=self._get_rh_password(), + cert_path=self.CERT_PATH, + cert=self._get_client_certificates(), + xrhidentity=self._get_x_rh_identity(), + headers=self._get_http_request_headers(), + proxy=self.get_proxy_url(), + ) - return self.shipping_successful + uploader.headers.pop("Content-Type") + return uploader.send_data() def shipping_auth_mode(self): return self.SHIPPING_AUTH_USERPASS @@ -214,32 +216,6 @@ def _collection_to_tar(self, tar, collection): ) return None - def _send_data(self, url, files, session): - if self.shipping_auth_mode() == self.SHIPPING_AUTH_USERPASS: - response = session.post( - url, - files=files, - verify=self.CERT_PATH, - auth=(self._get_rh_user(), self._get_rh_password()), - headers=session.headers, - timeout=(31, 31), - ) - else: - response = session.post( - url, files=files, headers=session.headers, timeout=(31, 31) - ) - - # Accept 2XX status_codes - if response.status_code >= 300: - self.logger.error( - "Upload failed with status {}, {}".format( - response.status_code, response.text - ) - ) - return False - - return True - def _config_to_tar(self, tar): if self.collector.collections["config"] is None: 
diff --git a/insights_analytics_collector/uploader/basic.py b/insights_analytics_collector/uploader/basic.py
new file mode 100644
index 0000000..16e79f0
--- /dev/null
+++ b/insights_analytics_collector/uploader/basic.py
@@ -0,0 +1,28 @@
+from .base import BaseUploader
+
+
+class Basic(BaseUploader):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def send_data(self):
+        if not hasattr(self, "cert_path"):
+            raise ValueError("cert_path is not set")
+
+        if not hasattr(self, "username"):
+            raise ValueError("username is not set")
+
+        if not hasattr(self, "password"):
+            raise ValueError("password is not set")
+
+        response = self.session.post(
+            self.url,
+            files=self.files,
+            verify=self.cert_path,
+            auth=(self.username, self.password),
+            headers=self.headers,
+            timeout=(31, 31),
+            proxies={"https": self.proxy, "http": self.proxy},
+        )
+
+        return self.response(response)
diff --git a/insights_analytics_collector/uploader/cert.py b/insights_analytics_collector/uploader/cert.py
new file mode 100644
index 0000000..94dd00f
--- /dev/null
+++ b/insights_analytics_collector/uploader/cert.py
@@ -0,0 +1,9 @@
+from .base import BaseUploader
+
+
+class Cert(BaseUploader):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def send_data(self):
+        raise NotImplementedError("Auth strategy not implemented")
diff --git a/insights_analytics_collector/uploader/factory.py b/insights_analytics_collector/uploader/factory.py
new file mode 100644
index 0000000..2b727cd
--- /dev/null
+++ b/insights_analytics_collector/uploader/factory.py
@@ -0,0 +1,22 @@
+from .cert import Cert
+from .pat import Pat
+from .pkce import Pkce
+from .identity import Identity
+from .s3 import S3
+from .basic import Basic
+
+
+def get_uploader(strategy, **kwargs):
+    strategies = {
+        "user-pass": Basic,
+        "user-pass-s3": S3,
+        "x-rh-identity": Identity,
+        "mutual-tls": Cert,
+        "service-account": Pat,
+        "service-account-pkce": Pkce,
+    }
+
+    if strategy not in strategies:
+        raise ValueError("Unknown auth strategy: {}".format(strategy))
+
+    return strategies[strategy](**kwargs)
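Note: the factory maps strategy names to classes, so registering a new
strategy is a one-line addition to the dict. `Cert`, `Pkce` and `S3` are
stubs in this patch and raise until implemented; a sketch with placeholder
certificate paths:

    from insights_analytics_collector.uploader.factory import get_uploader

    uploader = get_uploader("mutual-tls",
                            cert=("client.pem", "client.key"))  # placeholders
    try:
        uploader.send_data()
    except NotImplementedError:
        pass  # expected until the mutual-TLS strategy is filled in
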
diff --git a/insights_analytics_collector/uploader/identity.py b/insights_analytics_collector/uploader/identity.py
new file mode 100644
index 0000000..4c0943b
--- /dev/null
+++ b/insights_analytics_collector/uploader/identity.py
@@ -0,0 +1,22 @@
+from .base import BaseUploader
+
+
+class Identity(BaseUploader):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def send_data(self):
+        if getattr(self, "xrhidentity", None) is None:
+            raise ValueError("xrhidentity is not set")
+
+        self.headers["x-rh-identity"] = self.xrhidentity
+
+        response = self.session.post(
+            self.url,
+            files=self.files,
+            headers=self.headers,
+            timeout=(31, 31),
+            proxies={"https": self.proxy, "http": self.proxy},
+        )
+
+        return self.response(response)
diff --git a/insights_analytics_collector/uploader/pat.py b/insights_analytics_collector/uploader/pat.py
new file mode 100644
index 0000000..f6e6fcb
--- /dev/null
+++ b/insights_analytics_collector/uploader/pat.py
@@ -0,0 +1,49 @@
+from .base import BaseUploader
+
+
+class Pat(BaseUploader):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.sso_url = "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token"
+
+    def send_data(self):
+        if not hasattr(self, "cert_path"):
+            raise ValueError("cert_path is not set")
+
+        if not hasattr(self, "username"):
+            raise ValueError("username is not set")
+
+        if not hasattr(self, "password"):
+            raise ValueError("password is not set")
+
+        # requests form-encodes dict payloads and sets Content-Type itself
+        data = {
+            "client_id": self.username,
+            "client_secret": self.password,
+            "grant_type": "client_credentials",
+        }
+
+        response = self.session.post(
+            self.sso_url,
+            headers=self.headers,
+            data=data,
+            verify=self.cert_path,
+            timeout=(31, 31),
+            proxies={"https": self.proxy, "http": self.proxy},
+        )
+        if not self.response(response):
+            return False
+
+        access_token = response.json()["access_token"]
+        self.headers["Authorization"] = "Bearer {}".format(access_token)
+
+        response = self.session.post(
+            self.url,
+            files=self.files,
+            verify=self.cert_path,
+            proxies={"https": self.proxy, "http": self.proxy},
+            headers=self.headers,
+            timeout=(31, 31),
+        )
+
+        return self.response(response)
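Note: `Pat` performs an OAuth 2.0 client-credentials grant against Red Hat
SSO before uploading. The token request alone, reduced to plain `requests`;
the client id and secret are placeholders:

    import requests

    SSO_URL = ("https://sso.redhat.com/auth/realms/redhat-external"
               "/protocol/openid-connect/token")

    resp = requests.post(
        SSO_URL,
        data={
            "client_id": "my-service-account",  # placeholder
            "client_secret": "my-secret",       # placeholder
            "grant_type": "client_credentials",
        },
        timeout=(31, 31),
    )
    resp.raise_for_status()
    headers = {"Authorization": "Bearer " + resp.json()["access_token"]}
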
diff --git a/insights_analytics_collector/uploader/pkce.py b/insights_analytics_collector/uploader/pkce.py
new file mode 100644
index 0000000..be8dc27
--- /dev/null
+++ b/insights_analytics_collector/uploader/pkce.py
@@ -0,0 +1,9 @@
+from .base import BaseUploader
+
+
+class Pkce(BaseUploader):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def send_data(self):
+        raise NotImplementedError("Auth strategy not implemented")
diff --git a/insights_analytics_collector/uploader/s3.py b/insights_analytics_collector/uploader/s3.py
new file mode 100644
index 0000000..9256e75
--- /dev/null
+++ b/insights_analytics_collector/uploader/s3.py
@@ -0,0 +1,9 @@
+from .base import BaseUploader
+
+
+class S3(BaseUploader):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def send_data(self):
+        raise NotImplementedError("Auth strategy not implemented")
diff --git a/tests/functional/test_gathering.py b/tests/functional/test_gathering.py
index 4013246..b3c20ee 100644
--- a/tests/functional/test_gathering.py
+++ b/tests/functional/test_gathering.py
@@ -24,7 +24,8 @@ def collector(mocker):
 def test_missing_config(mocker, collector):
     mock_logger = mocker.patch.object(collector, "logger")
 
-    tgz_files = collector.gather(subset=["json_collection_1", "json_collection_2"])
+    tgz_files = collector.gather(
+        subset=["json_collection_1", "json_collection_2"])
     assert tgz_files is None
 
     mock_logger.log.assert_called_with(
@@ -49,15 +50,18 @@ def test_json_collections(collector):
     assert "./json_collection_2.json" in files.keys()
 
     assert json.loads(files["./config.json"].read()) == {"version": "1.0"}
-    assert json.loads(files["./json_collection_1.json"].read()) == {"json1": "True"}
-    assert json.loads(files["./json_collection_2.json"].read()) == {"json2": "True"}
+    assert json.loads(
+        files["./json_collection_1.json"].read()) == {"json1": "True"}
+    assert json.loads(
+        files["./json_collection_2.json"].read()) == {"json2": "True"}
 
     collector._gather_cleanup()
 
 
 def test_small_csvs(collector):
     tgz_files = collector.gather(
-        subset=["config", "csv_collection_1", "csv_collection_2", "csv_collection_3"]
+        subset=["config", "csv_collection_1",
+                "csv_collection_2", "csv_collection_3"]
     )
 
     assert len(tgz_files) == 1
@@ -133,7 +137,8 @@ def test_multiple_collections_multiple_tarballs(mocker, collector):
     mocker.patch("tests.classes.package.Package.MAX_DATA_SIZE", 1000)
 
     tgz_files = collector.gather(
-        subset=["config", "big_table_2", "csv_collection_1", "csv_collection_2"]
+        subset=["config", "big_table_2",
+                "csv_collection_1", "csv_collection_2"]
     )
 
     assert len(tgz_files) == 3
diff --git a/tests/functional/test_slicing.py b/tests/functional/test_slicing.py
index 36d38a6..efb5fd7 100644
--- a/tests/functional/test_slicing.py
+++ b/tests/functional/test_slicing.py
@@ -51,7 +51,7 @@ def test_slices_by_date(collector):
     assert "./csv_one_day_slicing_1.csv" in files.keys()
 
     lines = files["./csv_one_day_slicing_1.csv"].readlines()
-    _header = lines.pop(0)
+    lines.pop(0)
 
     row = decode_csv_line(lines[0])
     csv_since = datetime.datetime(