Upload Strategy
dmzoneill committed May 21, 2024
1 parent d21110a commit 04e8128
Showing 17 changed files with 294 additions and 83 deletions.
18 changes: 13 additions & 5 deletions insights_analytics_collector/collection.py
@@ -36,7 +36,8 @@ def __init__(self, collector, fnc_collecting):
         self.gathering_started_at = None
         self.gathering_finished_at = None
         self.gathering_successful = None
-        self.last_gathered_entry = self.collector.last_gathered_entry_for(self.key)
+        self.last_gathered_entry = self.collector.last_gathered_entry_for(
+            self.key)
 
     @abstractmethod
     def add_to_tar(self, tar):
@@ -67,7 +68,10 @@ def gather(self, max_data_size):
 
             self.gathering_successful = True
         except Exception as e:
-            self.logger.exception(f"Could not generate metric {self.filename}: {e}")
+            self.logger.exception(
+                f"Could not generate metric "  # nopep8
+                f"{self.filename}: {e}"
+            )  # nopep8
             self.gathering_successful = False
         finally:
             self._set_gathering_finished()
@@ -87,7 +91,8 @@ def slices(self):
         # Or it can force full table sync if interval is given
         if self.fnc_slicing:
             if self.full_sync_enabled:
-                slices = self.fnc_slicing(self.key, last_gather, full_sync_enabled=True)
+                slices = self.fnc_slicing(
+                    self.key, last_gather, full_sync_enabled=True)
             else:
                 slices = self.fnc_slicing(
                     self.key, last_gather, since=since, until=until
@@ -119,7 +124,9 @@ def update_last_gathered_entries(self, updates_dict):
 
         if self.full_sync_enabled:
             self._update_last_gathered_key(
-                updates_dict, f"{self.key}_full", self.gathering_finished_at
+                updates_dict,  # nopep8
+                f"{self.key}_full",  # nopep8
+                self.gathering_finished_at,  # nopep8
             )
         else:
             # collections are ordered by time slices.
@@ -141,7 +148,8 @@ def _is_full_sync_enabled(self, interval_days):
         if not interval_days:
             return False
 
-        last_full_sync = self.collector.last_gathered_entry_for(f"{self.key}_full")
+        last_full_sync = self.collector.last_gathered_entry_for(
+            f"{self.key}_full")
        return not last_full_sync or last_full_sync < now() - timedelta(
             days=interval_days
         )
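The two call shapes in slices() are the whole contract for a slicing function: on a full sync it is called with full_sync_enabled=True, otherwise with a since/until window anchored at the last gathered entry. A minimal sketch of such a function (the name and the one-day slice width are illustrative, not from this repository):

    from datetime import timedelta

    def one_day_slicing(key, last_gather, since=None, until=None,
                        full_sync_enabled=False):
        """Return a list of (since, until) tuples, one per gathering slice."""
        if full_sync_enabled:
            # Full table sync: a single unbounded slice.
            return [(None, None)]

        # Incremental sync: split the window into day-sized slices.
        slices, cursor = [], since or last_gather
        while cursor < until:
            upper = min(cursor + timedelta(days=1), until)
            slices.append((cursor, upper))
            cursor = upper
        return slices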
3 changes: 2 additions & 1 deletion insights_analytics_collector/collection_csv.py
@@ -24,7 +24,8 @@ def __init__(self, collector, fnc_collecting):
     def add_to_tar(self, tar):
         """Adds CSV file to the tar(tgz) archive"""
         self.logger.debug(
-            f"CollectionCSV._add_to_tar: | {self.key}.csv | Size: {self.data_size()}"
+            f"CollectionCSV._add_to_tar: | "
+            f"{self.key}.csv | Size: {self.data_size()}"
         )
         tar.add(self.target(), arcname=f"./{self.filename}")
3 changes: 2 additions & 1 deletion insights_analytics_collector/collection_data_status.py
@@ -28,7 +28,8 @@ def data_collection_status(self, full_path, **kwargs):
                 "status",
                 "elapsed",
             ]
-            writer = csv.DictWriter(csvfile, delimiter=",", fieldnames=fieldnames)
+            writer = csv.DictWriter(
+                csvfile, delimiter=",", fieldnames=fieldnames)
             writer.writeheader()
 
             for collection in self.package.collections:
3 changes: 2 additions & 1 deletion insights_analytics_collector/collection_json.py
@@ -32,7 +32,8 @@ def add_to_tar(self, tar):
         """Adds JSON data to TAR(tgz) archive"""
         buf = self.target().encode("utf-8")
         self.logger.debug(
-            f"CollectionJSON._add_to_tar: | {self.key}.json | Size: {self.data_size()}"
+            f"CollectionJSON._add_to_tar: | "
+            f"{self.key}.json | Size: {self.data_size()}"
         )
         info = tarfile.TarInfo(f"./{self.filename}")
         info.size = len(buf)
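CollectionJSON has no file on disk to hand to tar.add(), so it builds a TarInfo header for an in-memory buffer and streams it in. The same standard-library pattern in isolation (file names here are made up):

    import io
    import tarfile

    buf = b'{"key": "value"}'
    with tarfile.open("example.tgz", "w:gz") as tar:
        info = tarfile.TarInfo("./config.json")  # member name inside the archive
        info.size = len(buf)                     # size must be set before addfile
        tar.addfile(info, io.BytesIO(buf))       # streams bytes, no temp file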
41 changes: 28 additions & 13 deletions insights_analytics_collector/collector.py
@@ -191,12 +191,14 @@ def _calculate_collection_interval(self, since, until):
         if until is not None and until > _now:
             until = _now
             self.logger.warning(
-                f"End of the collection interval is in the future, setting to {_now}."
+                f"End of the collection interval is in the future,"
+                f" setting to {_now}."
             )
         if since is not None and since > _now:
             since = _now
             self.logger.warning(
-                f"Start of the collection interval is in the future, setting to {_now}."
+                f"Start of the collection interval is in the "
+                f"future, setting to {_now}."
             )
 
         # The value of `until` needs to be concrete, so resolve it. If it wasn't passed in,
@@ -207,7 +209,8 @@ def _calculate_collection_interval(self, since, until):
             if until > since + timedelta(weeks=4):
                 until = since + timedelta(weeks=4)
                 self.logger.warning(
-                    f"End of the collection interval is greater than 4 weeks from start, setting end to {until}."
+                    f"End of the collection interval is greater "  # nopep8
+                    f"than 4 weeks from start, setting end to {until}."  # nopep8
                 )
         else:  # until is None
             until = min(since + timedelta(weeks=4), _now)
Expand All @@ -228,14 +231,16 @@ def _calculate_collection_interval(self, since, until):
if since is not None and since < horizon:
since = horizon
self.logger.warning(
f"Start of the collection interval is more than 4 weeks prior to {until}, setting to {horizon}."
f"Start of the collection interval is " # nopep8
f"more than 4 weeks prior to {until}, setting to {horizon}." # nopep8
)

last_gather = self._last_gathering() or horizon
if last_gather < horizon:
last_gather = horizon
self.logger.warning(
f"Last analytics run was more than 4 weeks prior to {until}, using {horizon} instead."
f"Last analytics run was more than 4 weeks prior to " # nopep8
f"{until}, using {horizon} instead." # nopep8
)

self.gather_since = since
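The net effect of these guards is that every collection window ends no later than now and reaches back at most four weeks. A worked example of the horizon clamp, with illustrative dates:

    from datetime import datetime, timedelta

    _now = datetime(2024, 5, 21)

    # No explicit interval given: collect from the last run up to now,
    # but never reach back more than 4 weeks.
    until = _now
    horizon = until - timedelta(weeks=4)   # 2024-04-23, earliest allowed start

    last_gather = datetime(2024, 2, 1)     # last run was ~16 weeks ago
    if last_gather < horizon:
        last_gather = horizon              # clamped to 2024-04-23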
@@ -287,10 +292,12 @@ def _gather_config(self):
         TODO: add "always" flag to @register decorator
         """
         if not self.config_present():
-            self.logger.log(self.log_level, "'config' collector data is missing")
+            self.logger.log(
+                self.log_level, "'config' collector data is missing")
             return False
         else:
-            self.collections["config"].gather(self._package_class().max_data_size())
+            self.collections["config"].gather(
+                self._package_class().max_data_size())
             return True
 
     def _gather_json_collections(self):
@@ -338,19 +345,23 @@ def _pg_advisory_lock(self, key, wait=False):
             yield True
         else:
             # Build 64-bit integer out of the resource id
-            resource_key = int(hashlib.sha512(key.encode()).hexdigest(), 16) % 2**63
+            resource_key = int(hashlib.sha512(
+                key.encode()).hexdigest(), 16) % 2**63
 
             cursor = connection.cursor()
 
             try:
                 if wait:
-                    cursor.execute("SELECT pg_advisory_lock(%s);", (resource_key,))
+                    cursor.execute(
+                        "SELECT pg_advisory_lock(%s);", (resource_key,))
                 else:
-                    cursor.execute("SELECT pg_try_advisory_lock(%s);", (resource_key,))
+                    cursor.execute(
+                        "SELECT pg_try_advisory_lock(%s);", (resource_key,))
                 acquired = cursor.fetchall()[0][0]
                 yield acquired
             finally:
-                cursor.execute("SELECT pg_advisory_unlock(%s);", (resource_key,))
+                cursor.execute("SELECT pg_advisory_unlock(%s);",
+                               (resource_key,))
                 cursor.close()
 
     def _process_packages(self):
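The advisory lock serializes gatherers across processes and hosts: every node hashes the same lock name to the same 64-bit key, and PostgreSQL arbitrates who runs. The key derivation is plain Python and can be checked standalone (the lock name below is illustrative):

    import hashlib

    def advisory_lock_key(key: str) -> int:
        """Map a string to a stable id inside PostgreSQL's signed
        64-bit advisory-lock keyspace."""
        return int(hashlib.sha512(key.encode()).hexdigest(), 16) % 2**63

    print(advisory_lock_key("gather_analytics_lock"))  # identical on every node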
@@ -487,7 +498,8 @@ def _create_collections(self, subset=None):
             for since, until in collection.slices():
                 collection.since = since
                 collection.until = until
-                self.collections[collection.data_type].append(collection)
+                self.collections[collection.data_type].append(
+                    collection)
                 collection = self._create_collection(fnc)
 
     def _create_collection(self, fnc_collecting):
@@ -499,7 +511,10 @@ def _create_collection(self, fnc_collecting):
             collection = self._collection_csv_class()(self, fnc_collecting)
 
         if collection is None:
-            raise RuntimeError(f"Collection of type {data_type} not implemented")
+            raise RuntimeError(
+                f"Collection of type "  # nopep8
+                f"{data_type} not implemented"
+            )  # nopep8
 
         return collection
 
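_create_collection picks the JSON or CSV collection class from the data_type recorded when the collecting function was registered. A sketch of one function per format, following the @register decorator described in the library's README (treat the exact signature as an assumption):

    from insights_analytics_collector import register

    @register("config", "1.0", description="General platform info", config=True)
    def config(since, **kwargs):
        # JSON collections return a serializable dict.
        return {"platform_version": "1.0"}

    @register("hosts", "1.0", format="csv", description="Hosts table")
    def hosts(full_path, since, until, **kwargs):
        # CSV collections write a file under full_path and return its path.
        path = f"{full_path}/hosts.csv"
        with open(path, "w") as f:
            f.write("id,name\n1,host-a\n")
        return path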
92 changes: 36 additions & 56 deletions insights_analytics_collector/package.py
@@ -3,10 +3,9 @@
 import os
 import pathlib
 import tarfile
+from uploader.factory import get_uploader
 from abc import abstractmethod
 
-import requests
-
 
 class Package:
     """
@@ -24,10 +23,12 @@ class Package:
     # i.e. "application/vnd.redhat.tower.tower_payload+tgz"
     PAYLOAD_CONTENT_TYPE = "application/vnd.redhat.TODO+tgz"
 
-    SHIPPING_AUTH_USERPASS = "user-pass"
+    SHIPPING_AUTH_USERPASS = "user-pass"  # Development mode only
+    SHIPPING_AUTH_S3_USERPASS = "user-pass-s3"
     SHIPPING_AUTH_IDENTITY = "x-rh-identity"  # Development mode only
     SHIPPING_AUTH_CERTIFICATES = "mutual-tls"  # Mutual TLS
+    SHIPPING_AUTH_OAUTH_PAT = "service-account"
+    SHIPPING_AUTH_OAUTH_PKCE = "service-account"
 
     DEFAULT_RHSM_CERT_FILE = "/etc/pki/consumer/cert.pem"
     DEFAULT_RHSM_KEY_FILE = "/etc/pki/consumer/key.pem"
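Each constant names one upload strategy; a concrete Package subclass opts in by overriding shipping_auth_mode(), whose result ship() hands to the uploader factory further down. A minimal sketch (the subclass is hypothetical):

    class MutualTLSPackage(Package):
        """Ships analytics over mutual TLS instead of user/password auth."""

        def shipping_auth_mode(self):
            # Selects which uploader get_uploader() will build.
            return self.SHIPPING_AUTH_CERTIFICATES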
@@ -84,12 +85,14 @@ def has_free_space(self, requested_size):
 
     def is_shipping_configured(self):
         if not self.tar_path:
-            self.logger.error("Insights for Ansible Automation Platform TAR not found")
+            self.logger.error(
+                "Insights for Ansible Automation Platform TAR not found")
             return False
 
         if not os.path.exists(self.tar_path):
             self.logger.error(
-                f"Insights for Ansible Automation Platform TAR {self.tar_path} not found"
+                f"Insights for Ansible Automation Platform TAR "
+                f"{self.tar_path} not found"
             )
             return False
 
@@ -153,7 +156,8 @@ def make_tgz(self):
                 self.tar_path = f.name
                 return True
         except Exception as e:
-            self.logger.exception(f"Failed to write analytics archive file: {e}")
+            self.logger.exception(
+                f"Failed to write analytics archive file: {e}")
             return False
 
     def ship(self):
@@ -167,29 +171,27 @@ def ship(self):
         self.logger.debug(f"shipping analytics file: {self.tar_path}")
 
         with open(self.tar_path, "rb") as f:
-            files = {
-                "file": (
-                    os.path.basename(self.tar_path),
-                    f,
-                    self._payload_content_type(),
-                )
-            }
-            s = requests.Session()
-            if self.shipping_auth_mode() == self.SHIPPING_AUTH_CERTIFICATES:
-                # as a single file (containing the private key and the certificate) or
-                # as a tuple of both files paths (cert_file, keyfile)
-                s.cert = self._get_client_certificates()
-
-            s.headers = self._get_http_request_headers()
-            s.headers.pop("Content-Type")
-
-            if self.shipping_auth_mode() == self.SHIPPING_AUTH_IDENTITY:
-                s.headers["x-rh-identity"] = self._get_x_rh_identity()
-
-            url = self.get_ingress_url()
-            self.shipping_successful = self._send_data(url, files, s)
+            uploader = get_uploader(
+                self.shipping_auth_mode(),
+                url=self.get_ingress_url(),
+                files={
+                    "file": (
+                        os.path.basename(self.tar_path),
+                        f,
+                        self._payload_content_type(),
+                    )
+                },
+                username=self._get_rh_user(),
+                password=self._get_rh_password(),
+                cert_path=self.CERT_PATH,
+                cert=self._get_client_certificates(),
+                xrhidentity=self._get_x_rh_identity(),
+                headers=self._get_http_request_headers(),
+                proxy=self.get_proxy_url(),
+            )
 
-        return self.shipping_successful
+            uploader.headers.pop("Content-Type")
+            return uploader.send_data()
 
     def shipping_auth_mode(self):
         return self.SHIPPING_AUTH_USERPASS
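The new uploader package itself is not among the files loaded in this view, so the factory's real shape is not visible here. A plausible sketch of a strategy-selecting get_uploader keyed on the same auth-mode constants (every class and parameter name below is an assumption, not the commit's actual code):

    # uploader/factory.py (hypothetical sketch)
    import requests

    class Uploader:
        """Base strategy: POST the payload with a shared requests session."""

        def __init__(self, url, files, headers=None, **kwargs):
            self.url = url
            self.files = files
            self.headers = dict(headers or {})
            self.session = requests.Session()

        def send_data(self):
            response = self.session.post(
                self.url, files=self.files, headers=self.headers,
                timeout=(31, 31)
            )
            return response.status_code < 300  # accept any 2XX

    class UserPassUploader(Uploader):
        """HTTP basic auth, mirroring SHIPPING_AUTH_USERPASS."""

        def __init__(self, *args, username=None, password=None, **kwargs):
            super().__init__(*args, **kwargs)
            self.session.auth = (username, password)

    class CertificateUploader(Uploader):
        """Mutual TLS, mirroring SHIPPING_AUTH_CERTIFICATES."""

        def __init__(self, *args, cert=None, **kwargs):
            super().__init__(*args, **kwargs)
            self.session.cert = cert

    def get_uploader(auth_mode, **kwargs):
        """Map an auth-mode constant to its uploader strategy."""
        strategies = {
            "user-pass": UserPassUploader,
            "mutual-tls": CertificateUploader,
        }
        return strategies.get(auth_mode, Uploader)(**kwargs)

Whatever the real implementation looks like, ship() now depends only on the returned object exposing headers and send_data(), which is what lets it shed the inline requests handling above.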
@@ -214,32 +216,6 @@ def _collection_to_tar(self, tar, collection):
             )
             return None
 
-    def _send_data(self, url, files, session):
-        if self.shipping_auth_mode() == self.SHIPPING_AUTH_USERPASS:
-            response = session.post(
-                url,
-                files=files,
-                verify=self.CERT_PATH,
-                auth=(self._get_rh_user(), self._get_rh_password()),
-                headers=session.headers,
-                timeout=(31, 31),
-            )
-        else:
-            response = session.post(
-                url, files=files, headers=session.headers, timeout=(31, 31)
-            )
-
-        # Accept 2XX status_codes
-        if response.status_code >= 300:
-            self.logger.error(
-                "Upload failed with status {}, {}".format(
-                    response.status_code, response.text
-                )
-            )
-            return False
-
-        return True
-
     def _config_to_tar(self, tar):
         if self.collector.collections["config"] is None:
             self.logger.error(
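The deleted _send_data combined auth selection with response handling; under the strategy layout the 2XX test presumably moves behind each uploader's send_data() (the uploader files are not shown in this diff). Its logic, restated standalone for reference:

    import logging

    logger = logging.getLogger(__name__)

    def upload_succeeded(response) -> bool:
        """Accept any 2XX status, mirroring the removed check."""
        if response.status_code >= 300:
            logger.error("Upload failed with status %s, %s",
                         response.status_code, response.text)
            return False
        return True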
@@ -310,7 +286,8 @@ def _data_collection_status_to_tar(self, tar):
             self.manifest.add_collection(self.data_collection_status)
         except Exception as e:
             self.logger.exception(
-                f"Could not generate {self.data_collection_status.filename}: {e}"
+                f"Could not generate "  # nopep8
+                f"{self.data_collection_status.filename}: {e}"  # nopep8
             )
 
     def _manifest_to_tar(self, tar):
@@ -319,7 +296,10 @@ def _manifest_to_tar(self, tar):
             self.manifest.add_to_tar(tar)
             self.add_collection(self.manifest)
         except Exception as e:
-            self.logger.exception(f"Could not generate {self.manifest.filename}: {e}")
+            self.logger.exception(
+                f"Could not generate "  # nopep8
+                f"{self.manifest.filename}: {e}"
+            )  # nopep8
 
     def _payload_content_type(self):
         return self.PAYLOAD_CONTENT_TYPE
[The remaining changed files, among them one empty file, did not load in this view.]