Skip to content

Commit

Permalink
core: create Major Tom
Browse files Browse the repository at this point in the history
  • Loading branch information
voorloopnul authored and Williangalvani committed Aug 24, 2023
1 parent 95750fa commit 7d2ef8d
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 3 deletions.
8 changes: 7 additions & 1 deletion core/frontend/src/utils/pull_tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class PullTracker {
// As the data consists of multiple jsons, the following line is a hack to split them
const dataList = (this.left_over_data + dataChunk.replaceAll('}{', '}\n\n{')).split('\n\n')
this.left_over_data = ''

let errored: string | false = false
for (const line of dataList) {
try {
const data = JSON.parse(line)
Expand All @@ -100,6 +100,9 @@ class PullTracker {
if ('status' in data) {
this.layer_status[id] = data.status
}
if ('error' in data) {
errored = data.error
}
if (data?.progressDetail?.total !== undefined) {
this.layer_progress_detail[id] = data.progressDetail
}
Expand All @@ -118,6 +121,9 @@ class PullTracker {
this.left_over_data = line
}
}
if (errored) {
throw(Error(errored))
}
this.pull_output = ''
this.layers.forEach((image) => {
this.pull_output = `${this.pull_output}[${image}] ${this.layer_status[image]}`
Expand Down
1 change: 1 addition & 0 deletions core/services/install-services.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ SERVICES=(
ping
versionchooser
wifi
major_tom
)

# We need to install loguru and appdirs since they may be used inside setup.py
Expand Down
79 changes: 79 additions & 0 deletions core/services/major_tom/major_tom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#! /usr/bin/env python3
import datetime
import time
import uuid
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional
from zoneinfo import ZoneInfo

import machineid

from src.core import DefaultPayload, TelemetryEngine, get_latency
from src.metrics import Metrics
from src.typedefs import ExtensionInfo

LOG_SESSION_UUID = str(uuid.uuid4())

SERVICE_NAME = "major_tom"
LOG_FOLDER_PATH = f"/var/logs/blueos/{SERVICE_NAME}/"

TELEMETRY_ENDPOINT = "https://telemetry.blueos.online/api/v1/anonymous/"
S3_TELEMETRY_ENDPOINT = "https://telemetry.blueos.online/api/v1/anonymous/s3/"


# pylint: disable=too-many-instance-attributes
@dataclass
class AnonymousTelemetryRecord:
uptime: float
latency: float
memory_size: int
memory_usage: int
disk_size: int
disk_usage: int
extensions: Optional[list[ExtensionInfo]]
blueos_version: str
probe_time: float

def json(self) -> dict[str, Any]:
return asdict(self)


def compose_default_record(order: int) -> Dict[str, Any]:
date_time_utc = datetime.datetime.now(ZoneInfo("UTC")).isoformat()
payload = DefaultPayload(
log_session_uuid=LOG_SESSION_UUID,
order=order,
timestamp=date_time_utc,
machine_id=machineid.hashed_id(""),
data={},
)

start_probing = time.time()
metrics = Metrics()
record = AnonymousTelemetryRecord(
time.clock_gettime(time.CLOCK_BOOTTIME),
get_latency(),
metrics.memory.total,
metrics.memory.used,
metrics.disk.total,
metrics.disk.used,
metrics.installed_extensions,
"v0.0.1",
0,
)
record.probe_time = time.time() - start_probing
payload.data = record.json()
return payload.json()


if __name__ == "__main__":
TelemetryEngine(
label="anonymous", # used to tag telemetry type. we may have non-anonymous telemetry in the future
endpoint=TELEMETRY_ENDPOINT,
s3_endpoint=S3_TELEMETRY_ENDPOINT,
create_record=compose_default_record,
interval=60 * 10, # 10 minutes
max_file_size=1024 * 1024, # 1Mb
max_file_retention=10,
buffer_folder=LOG_FOLDER_PATH,
)()
6 changes: 6 additions & 0 deletions core/services/major_tom/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
psutil==5.9.5
requests==2.31.0
speedtest-cli==2.1.3
Flask==2.3.2
py-machineid==0.4.3
loguru==0.7.0
Empty file.
Empty file.
146 changes: 146 additions & 0 deletions core/services/major_tom/src/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import copy
import gzip
import json
import os
import shutil
import time
from dataclasses import asdict, dataclass
from typing import Any, Callable, Dict, List

import loguru
import requests
import speedtest

loguru.logger.remove()


def formatter(record: "loguru.Record") -> str:
# Note this function returns the string to be formatted, not the actual message to be logged
record["extra"]["serialized"] = json.dumps(record["message"])
return "{extra[serialized]}\n"


def is_online() -> bool:
return get_latency() > 0


def get_latency() -> float:
try:
servers: List[str] = []
st = speedtest.Speedtest()
st.get_servers(servers)
best_server = st.get_best_server()
ping = best_server["latency"]
return float(ping)
except Exception:
return -1.0


@dataclass
class DefaultPayload:
log_session_uuid: str
order: int
timestamp: str
machine_id: str
data: dict[str, Any]

def json(self) -> dict[str, Any]:
return asdict(self)


class TelemetryEngine:
# pylint: disable=too-many-arguments
def __init__(
self,
label: str,
endpoint: str,
s3_endpoint: str,
create_record: Callable[[Any], Any],
interval: float,
max_file_size: int,
max_file_retention: int,
buffer_folder: str,
):
self.buffer_file = f"{buffer_folder}/{label}_usage.log"
self.buffer_folder = buffer_folder

self.telemetry_endpoint = endpoint
self.telemetry_s3_endpoint = s3_endpoint
self.create_record = create_record
self.interval = interval

self.buffer = copy.deepcopy(loguru.logger)
self.buffer.add(
self.buffer_file,
rotation=max_file_size,
retention=max_file_retention,
format=formatter,
compression="gz",
)

def __call__(self) -> None:
order = 0
while True:
order += 1
record = self.create_record(order)
if self.save(record) == "online":
self.process_buffered_records()
time.sleep(self.interval)

def upload_file(self, file: str) -> bool:
"""
This method requests to telemetry API a presigned url and upload the local archived files.
"""
print(f"uploading file... {file}")
try:
response = requests.get(self.telemetry_s3_endpoint, timeout=5).json()
with open(file, "rb") as fh:
files = {"file": (file, fh)}
r = requests.post(response["url"], data=response["fields"], files=files, timeout=300)
if r.status_code == 204:
print("[Success!]")
return True
except Exception:
pass

return False

def process_buffered_records(self) -> None:
"""
Check in the buffered folder if there are archived logs to upload. If the agent connects before an archive
is created it will also archive the current buffer file and upload it.
"""
for file in os.listdir(self.buffer_folder):
file_path = os.path.join(self.buffer_folder, file)

# Upload regular archive
if file_path.endswith(".log.gz"):
if self.upload_file(file_path):
os.remove(file_path)

# Archive current buffer and upload it
if file_path.endswith(".log") and os.path.getsize(file_path):
timestamp = int(time.time())
tmp_name = self.buffer_file.replace(".log", f".{timestamp}.log.gz")
with open(self.buffer_file, "rb") as f_in:
with gzip.open(tmp_name, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
if self.upload_file(tmp_name):
os.remove(tmp_name)
with open(self.buffer_file, "w", encoding="utf-8"):
# create new empty file if not there
pass

def save(self, record: Dict[str, Any]) -> str:
"""
Try to POST the telemetry payload, if it fails for any reason, we buffer it locally.
"""
try:
r = requests.post(self.telemetry_endpoint, json=record, timeout=5)
if r.status_code == 201:
return "online"
except Exception as err:
print(err)

self.buffer.info(record)
return "offline"
27 changes: 27 additions & 0 deletions core/services/major_tom/src/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from functools import cached_property
from typing import List, Optional

import psutil
import requests

from src.typedefs import ExtensionInfo


class Metrics:
@cached_property
def installed_extensions(self) -> Optional[List[ExtensionInfo]]:
try:
req = requests.get("http://localhost:9134/v1.0/installed_extensions", timeout=3)
if req.status_code == 200:
return [ExtensionInfo(identifier=rec["identifier"], tag=rec["tag"]) for rec in req.json()]
except Exception:
return None
return []

@cached_property
def disk(self) -> psutil._common.sdiskusage:
return psutil.disk_usage("/")

@cached_property
def memory(self) -> psutil._pslinux.svmem:
return psutil.virtual_memory()
7 changes: 7 additions & 0 deletions core/services/major_tom/src/typedefs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dataclasses import dataclass


@dataclass
class ExtensionInfo:
identifier: str
tag: str
8 changes: 6 additions & 2 deletions core/services/versionchooser/utils/chooser.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,12 @@ async def pull_version(self, request: web.Request, repository: str, tag: str) ->
await response.prepare(request)

# Stream every line of the output back to the client
async for line in self.client.images.pull(f"{repository}:{tag}", repo=repository, tag=tag, stream=True):
await response.write(json.dumps(line).encode("utf-8"))
try:
async for line in self.client.images.pull(f"{repository}:{tag}", repo=repository, tag=tag, stream=True):
await response.write(json.dumps(line).encode("utf-8"))
except Exception as e:
logger.error(f"pull of {repository}:{tag} failed: {e}")
await response.write(json.dumps({"error": "error while pulling image"}).encode("utf-8"))
await response.write_eof()
# TODO: restore pruning
return response
Expand Down
1 change: 1 addition & 0 deletions core/start-blueos-core
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ SERVICES=(
'nginx',"nice --18 nginx -g \"daemon off;\" -c $TOOLS_PATH/nginx/nginx.conf"
'log_zipper',"$SERVICES_PATH/log_zipper/main.py '/shortcuts/system_logs/**/*.log' --max-age-minutes 60"
'bag_of_holding',"$SERVICES_PATH/bag_of_holding/main.py"
'major_tom',"$SERVICES_PATH/major_tom/major_tom.py"
)

tmux -f /etc/tmux.conf start-server
Expand Down

0 comments on commit 7d2ef8d

Please sign in to comment.