diff --git a/core/frontend/src/utils/pull_tracker.ts b/core/frontend/src/utils/pull_tracker.ts index 96fa9b4e2d..628e48f4e5 100755 --- a/core/frontend/src/utils/pull_tracker.ts +++ b/core/frontend/src/utils/pull_tracker.ts @@ -85,7 +85,7 @@ class PullTracker { // As the data consists of multiple jsons, the following like is a hack to split them const dataList = (this.left_over_data + dataChunk.replaceAll('}{', '}\n\n{')).split('\n\n') this.left_over_data = '' - + let errored: string | false = false for (const line of dataList) { try { const data = JSON.parse(line) @@ -100,6 +100,9 @@ class PullTracker { if ('status' in data) { this.layer_status[id] = data.status } + if ('error' in data) { + errored = data.error + } if (data?.progressDetail?.total !== undefined) { this.layer_progress_detail[id] = data.progressDetail } @@ -118,6 +121,9 @@ class PullTracker { this.left_over_data = line } } + if (errored) { + throw(Error(errored)) + } this.pull_output = '' this.layers.forEach((image) => { this.pull_output = `${this.pull_output}[${image}] ${this.layer_status[image]}` diff --git a/core/services/install-services.sh b/core/services/install-services.sh index b177138fea..a052d503fb 100755 --- a/core/services/install-services.sh +++ b/core/services/install-services.sh @@ -35,6 +35,7 @@ SERVICES=( ping versionchooser wifi + major_tom ) # We need to install loguru and appdirs since they may be used inside setup.py diff --git a/core/services/major_tom/major_tom.py b/core/services/major_tom/major_tom.py new file mode 100755 index 0000000000..be5c9e373b --- /dev/null +++ b/core/services/major_tom/major_tom.py @@ -0,0 +1,79 @@ +#! /usr/bin/env python3 +import datetime +import time +import uuid +from dataclasses import asdict, dataclass +from typing import Any, Dict, Optional +from zoneinfo import ZoneInfo + +import machineid + +from src.core import DefaultPayload, TelemetryEngine, get_latency +from src.metrics import Metrics +from src.typedefs import ExtensionInfo + +LOG_SESSION_UUID = str(uuid.uuid4()) + +SERVICE_NAME = "major_tom" +LOG_FOLDER_PATH = f"/var/logs/blueos/{SERVICE_NAME}/" + +TELEMETRY_ENDPOINT = "https://telemetry.blueos.online/api/v1/anonymous/" +S3_TELEMETRY_ENDPOINT = "https://telemetry.blueos.online/api/v1/anonymous/s3/" + + +# pylint: disable=too-many-instance-attributes +@dataclass +class AnonymousTelemetryRecord: + uptime: float + latency: float + memory_size: int + memory_usage: int + disk_size: int + disk_usage: int + extensions: Optional[list[ExtensionInfo]] + blueos_version: str + probe_time: float + + def json(self) -> dict[str, Any]: + return asdict(self) + + +def compose_default_record(order: int) -> Dict[str, Any]: + date_time_utc = datetime.datetime.now(ZoneInfo("UTC")).isoformat() + payload = DefaultPayload( + log_session_uuid=LOG_SESSION_UUID, + order=order, + timestamp=date_time_utc, + machine_id=machineid.hashed_id(""), + data={}, + ) + + start_probing = time.time() + metrics = Metrics() + record = AnonymousTelemetryRecord( + time.clock_gettime(time.CLOCK_BOOTTIME), + get_latency(), + metrics.memory.total, + metrics.memory.used, + metrics.disk.total, + metrics.disk.used, + metrics.installed_extensions, + "v0.0.1", + 0, + ) + record.probe_time = time.time() - start_probing + payload.data = record.json() + return payload.json() + + +if __name__ == "__main__": + TelemetryEngine( + label="anonymous", # used to tag telemetry type. we may have non-anonymous telemetry in the future + endpoint=TELEMETRY_ENDPOINT, + s3_endpoint=S3_TELEMETRY_ENDPOINT, + create_record=compose_default_record, + interval=60 * 10, # 10 minutes + max_file_size=1024 * 1024, # 1Mb + max_file_retention=10, + buffer_folder=LOG_FOLDER_PATH, + )() diff --git a/core/services/major_tom/requirements.txt b/core/services/major_tom/requirements.txt new file mode 100644 index 0000000000..23fd65b0bf --- /dev/null +++ b/core/services/major_tom/requirements.txt @@ -0,0 +1,6 @@ +psutil==5.9.5 +requests==2.31.0 +speedtest-cli==2.1.3 +Flask==2.3.2 +py-machineid==0.4.3 +loguru==0.7.0 \ No newline at end of file diff --git a/core/services/major_tom/setup.py b/core/services/major_tom/setup.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/core/services/major_tom/src/__init__.py b/core/services/major_tom/src/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/core/services/major_tom/src/core.py b/core/services/major_tom/src/core.py new file mode 100644 index 0000000000..a2a8bab9e3 --- /dev/null +++ b/core/services/major_tom/src/core.py @@ -0,0 +1,146 @@ +import copy +import gzip +import json +import os +import shutil +import time +from dataclasses import asdict, dataclass +from typing import Any, Callable, Dict, List + +import loguru +import requests +import speedtest + +loguru.logger.remove() + + +def formatter(record: "loguru.Record") -> str: + # Note this function returns the string to be formatted, not the actual message to be logged + record["extra"]["serialized"] = json.dumps(record["message"]) + return "{extra[serialized]}\n" + + +def is_online() -> bool: + return get_latency() > 0 + + +def get_latency() -> float: + try: + servers: List[str] = [] + st = speedtest.Speedtest() + st.get_servers(servers) + best_server = st.get_best_server() + ping = best_server["latency"] + return float(ping) + except Exception: + return -1.0 + + +@dataclass +class DefaultPayload: + log_session_uuid: str + order: int + timestamp: str + machine_id: str + data: dict[str, Any] + + def json(self) -> dict[str, Any]: + return asdict(self) + + +class TelemetryEngine: + # pylint: disable=too-many-arguments + def __init__( + self, + label: str, + endpoint: str, + s3_endpoint: str, + create_record: Callable[[Any], Any], + interval: float, + max_file_size: int, + max_file_retention: int, + buffer_folder: str, + ): + self.buffer_file = f"{buffer_folder}/{label}_usage.log" + self.buffer_folder = buffer_folder + + self.telemetry_endpoint = endpoint + self.telemetry_s3_endpoint = s3_endpoint + self.create_record = create_record + self.interval = interval + + self.buffer = copy.deepcopy(loguru.logger) + self.buffer.add( + self.buffer_file, + rotation=max_file_size, + retention=max_file_retention, + format=formatter, + compression="gz", + ) + + def __call__(self) -> None: + order = 0 + while True: + order += 1 + record = self.create_record(order) + if self.save(record) == "online": + self.process_buffered_records() + time.sleep(self.interval) + + def upload_file(self, file: str) -> bool: + """ + This method requests to telemetry API a presigned url and upload the local archived files. + """ + print(f"uploading file... {file}") + try: + response = requests.get(self.telemetry_s3_endpoint, timeout=5).json() + with open(file, "rb") as fh: + files = {"file": (file, fh)} + r = requests.post(response["url"], data=response["fields"], files=files, timeout=300) + if r.status_code == 204: + print("[Success!]") + return True + except Exception: + pass + + return False + + def process_buffered_records(self) -> None: + """ + Check in the buffered folder if there are archived logs to upload. If the agent connects before an archive + is created it will also archive the current buffer file and upload it. + """ + for file in os.listdir(self.buffer_folder): + file_path = os.path.join(self.buffer_folder, file) + + # Upload regular archive + if file_path.endswith(".log.gz"): + if self.upload_file(file_path): + os.remove(file_path) + + # Archive current buffer and upload it + if file_path.endswith(".log") and os.path.getsize(file_path): + timestamp = int(time.time()) + tmp_name = self.buffer_file.replace(".log", f".{timestamp}.log.gz") + with open(self.buffer_file, "rb") as f_in: + with gzip.open(tmp_name, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + if self.upload_file(tmp_name): + os.remove(tmp_name) + with open(self.buffer_file, "w", encoding="utf-8"): + # create new empty file if not there + pass + + def save(self, record: Dict[str, Any]) -> str: + """ + Try to POST the telemetry payload, if it fails for any reason, we buffer it locally. + """ + try: + r = requests.post(self.telemetry_endpoint, json=record, timeout=5) + if r.status_code == 201: + return "online" + except Exception as err: + print(err) + + self.buffer.info(record) + return "offline" diff --git a/core/services/major_tom/src/metrics.py b/core/services/major_tom/src/metrics.py new file mode 100644 index 0000000000..826a60a554 --- /dev/null +++ b/core/services/major_tom/src/metrics.py @@ -0,0 +1,27 @@ +from functools import cached_property +from typing import List, Optional + +import psutil +import requests + +from src.typedefs import ExtensionInfo + + +class Metrics: + @cached_property + def installed_extensions(self) -> Optional[List[ExtensionInfo]]: + try: + req = requests.get("http://localhost:9134/v1.0/installed_extensions", timeout=3) + if req.status_code == 200: + return [ExtensionInfo(identifier=rec["identifier"], tag=rec["tag"]) for rec in req.json()] + except Exception: + return None + return [] + + @cached_property + def disk(self) -> psutil._common.sdiskusage: + return psutil.disk_usage("/") + + @cached_property + def memory(self) -> psutil._pslinux.svmem: + return psutil.virtual_memory() diff --git a/core/services/major_tom/src/typedefs.py b/core/services/major_tom/src/typedefs.py new file mode 100644 index 0000000000..b04d49154a --- /dev/null +++ b/core/services/major_tom/src/typedefs.py @@ -0,0 +1,7 @@ +from dataclasses import dataclass + + +@dataclass +class ExtensionInfo: + identifier: str + tag: str diff --git a/core/services/versionchooser/utils/chooser.py b/core/services/versionchooser/utils/chooser.py index 59a47b723c..2f885f3bdd 100644 --- a/core/services/versionchooser/utils/chooser.py +++ b/core/services/versionchooser/utils/chooser.py @@ -129,8 +129,12 @@ async def pull_version(self, request: web.Request, repository: str, tag: str) -> await response.prepare(request) # Stream every line of the output back to the client - async for line in self.client.images.pull(f"{repository}:{tag}", repo=repository, tag=tag, stream=True): - await response.write(json.dumps(line).encode("utf-8")) + try: + async for line in self.client.images.pull(f"{repository}:{tag}", repo=repository, tag=tag, stream=True): + await response.write(json.dumps(line).encode("utf-8")) + except Exception as e: + logger.error(f"pull of {repository}:{tag} failed: {e}") + await response.write(json.dumps({"error": "error while pulling image"}).encode("utf-8")) await response.write_eof() # TODO: restore pruning return response diff --git a/core/start-blueos-core b/core/start-blueos-core index 645c02d92e..ce2759c017 100755 --- a/core/start-blueos-core +++ b/core/start-blueos-core @@ -77,6 +77,7 @@ SERVICES=( 'nginx',"nice --18 nginx -g \"daemon off;\" -c $TOOLS_PATH/nginx/nginx.conf" 'log_zipper',"$SERVICES_PATH/log_zipper/main.py '/shortcuts/system_logs/**/*.log' --max-age-minutes 60" 'bag_of_holding',"$SERVICES_PATH/bag_of_holding/main.py" + 'major_tom',"$SERVICES_PATH/major_tom/major_tom.py" ) tmux -f /etc/tmux.conf start-server