Async Plugins Server and bump pydantic+fastapi (#844)
* implemented async plugin API (illustrative sketch below, after the changed-files summary)

* implemented async plugin API

* update pydantic to v2; it still provides the latest v1 pydantic API

* changed the get-list-of-plugins path to /plugins

* added support for Custom Params

* fixed prettier on mac: prettier/prettier-vscode#3114

* add custom params to async plugins

* fix "Uncontrolled data used in path expression" warning

* async plugins frontend

* introduced new output format

* integrated new output format into client

* fm_receiver plugin now works

* fixed initial plugin call and the loading bar's default value

* removed artificial time delay

* moved plugin logic into custom hook

* moved plugins to a separate directory

* added lightweight container with GNU Radio for pure Python plugins

* added docker-compose example to add custom plugins

* install extra pip packages in docker container

* use same version of pydantic for api

* add version of email-validator package that pydantic wants

* cleanups

* misc cleanup

* cleanups

* fixed a pre-existing bug in the IQ Output from Plugin screen

* fixed bug related to SecretStr

* bump fastapi and starlette to latest

* fix unit test

* added encrypt for imageURL

* switch back to imageURL being just a string, not SecretStr

* fixed unit tests and removed unused code

* fixed last broken unit test

* ran linter

* disable the "use cloud storage" option on the client side

---------

Co-authored-by: Marc Lichtman <marcll@vt.edu>
romanziske and 777arc committed May 26, 2024
1 parent 6c5799d commit baa24f9
Showing 58 changed files with 1,606 additions and 1,350 deletions.
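
The plugin-server work itself is spread across files not shown below, so as context for the commit messages above, here is a minimal, hypothetical sketch of an async FastAPI plugin endpoint with pydantic-v2 custom params. The route paths, model names, and fields are assumptions for illustration only, not IQEngine's actual plugin API:

    # Hypothetical sketch -- routes, models, and fields are illustrative only.
    from fastapi import FastAPI
    from pydantic import BaseModel, SecretStr

    app = FastAPI()

    class CustomParams(BaseModel):
        # example of per-plugin "custom params" sent by the client
        center_freq_hz: float = 0.0
        api_key: SecretStr | None = None  # SecretStr keeps the raw value masked in repr/logs

    class PluginResult(BaseModel):
        status: str
        annotations: list[dict] = []

    @app.get("/plugins")
    async def list_plugins() -> list[str]:
        # the commit moved the "get list of plugins" route to /plugins
        return ["fm_receiver"]

    @app.post("/plugins/{plugin_name}")
    async def run_plugin(plugin_name: str, params: CustomParams) -> PluginResult:
        # async handler: long-running DSP work can be awaited instead of blocking the server
        return PluginResult(status="SUCCESS", annotations=[])

The bump to pydantic v2 is relevant here because v2 still ships the old API under pydantic.v1, so models that have not yet been migrated can keep working during the transition.
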
4 changes: 4 additions & 0 deletions .editorconfig
@@ -12,3 +12,7 @@ insert_final_newline = true
[Makefile]
indent_style = tab
indent_size = 1

[*py]
indent_style = space
indent_size = 4
2 changes: 1 addition & 1 deletion .github/linters/.flake8
@@ -1,3 +1,3 @@
[flake8]
max-line-length = 120
max-line-length = 150
ignore = E121,E123,E126,E226,E24,E704,W503,W504,E261
2 changes: 2 additions & 0 deletions .github/linters/pyproject.toml
@@ -0,0 +1,2 @@
[tool.black]
line-length = 150
1 change: 1 addition & 0 deletions .gitignore
@@ -55,3 +55,4 @@ tiling/spectrogram.pmtiles
**/sdriq2sigmf/**/*.sigmf*

e2e/test.zip
python-env
8 changes: 5 additions & 3 deletions .vscode/settings.json
@@ -20,7 +20,6 @@
"editor.defaultFormatter": "esbenp.prettier-vscode"
},
"prettier.printWidth": 120,
"prettier.prettierPath": "./client/node_modules/prettier",
"azureFunctions.deploySubpath": "azure-functions",
"azureFunctions.scmDoBuildDuringDeployment": true,
"azureFunctions.projectLanguage": "Python",
@@ -31,8 +30,11 @@
"python.testing.pytestEnabled": true,
"vitest.enable": true,
"[python]": {
"editor.formatOnSave": false,
"editor.defaultFormatter": "ms-python.autopep8"
"editor.formatOnSave": true,
"editor.defaultFormatter": "ms-python.autopep8",
"editor.tabSize": 4,
"editor.insertSpaces": true,
"editor.detectIndentation": false
},
"autopep8.args": ["--max-line-length=150", "--ignore=E302"],
"python.formatting.provider": "black",
84 changes: 43 additions & 41 deletions api/app/datasources.py
@@ -1,22 +1,24 @@
from typing import Optional
import asyncio
import json
import os
import time
import asyncio
from typing import Optional

import numpy as np
from fastapi import Depends
from pymongo.operations import ReplaceOne
from bson import encode
from bson.raw_bson import RawBSONDocument

from .azure_client import AzureBlobClient
from .models import DataSource
from fastapi import Depends
from helpers.cipher import decrypt, encrypt
from helpers.samples import get_bytes_per_iq_sample
from .models import DataSource
from helpers.datasource_access import check_access
from helpers.samples import get_bytes_per_iq_sample
from pydantic import SecretStr
from pymongo.operations import ReplaceOne

from .azure_client import AzureBlobClient
from .database import db
from .metadata import validate_metadata, create
from .metadata import create, validate_metadata
from .models import DataSource


async def get(account, container) -> DataSource | None:
# Get a datasource by account and container
@@ -26,9 +28,11 @@ async def get(account, container) -> DataSource | None:
return None
return DataSource(**datasource)


async def datasource_exists(account, container) -> bool:
return await get(account, container) is not None


async def sync(account: str, container: str):
print(f"[SYNC] Starting sync for {account}/{container} on PID {os.getpid()} at {time.time()}")
azure_blob_client = AzureBlobClient(account, container)
@@ -48,7 +52,7 @@ async def sync(account: str, container: str):
for name in files:
if name.endswith(".sigmf-meta"):
filepath = os.path.join(path, name)
if '..' in filepath:
if ".." in filepath:
raise Exception("Invalid filepath")
with open(filepath, "r") as f:
content = f.read()
@@ -60,7 +64,7 @@
print(f"[SYNC] Error parsing metadata file {filepath}: {e}")
continue
if metadata:
metadatas.append((os.path.join(path, name).replace(azure_blob_client.base_filepath, '')[1:], metadata))
metadatas.append((os.path.join(path, name).replace(azure_blob_client.base_filepath, "")[1:], metadata))
# Even though the below code is similar to Azure version, the Azure version had to be tweaked to parallelize it
for metadata in metadatas:
filepath = metadata[0].replace(".sigmf-meta", "") # TODO: clean up the tuple messiness
@@ -77,9 +81,7 @@
}
metadata["global"]["traceability:revision"] = 0
file_length = await azure_blob_client.get_file_length(filepath + ".sigmf-data")
metadata["global"]["traceability:sample_length"] = (
file_length / get_bytes_per_iq_sample(metadata["global"]["core:datatype"])
)
metadata["global"]["traceability:sample_length"] = file_length / get_bytes_per_iq_sample(metadata["global"]["core:datatype"])
await create(metadata, user=None) # creates or updates the metadata object
# print(f"[SYNC] Created metadata for {filepath}") # commented out, caused too much spam
except Exception as e:
@@ -103,7 +105,7 @@ async def sync(account: str, container: str):
print("[SYNC] found", len(meta_blob_names), "meta files") # the process above took about 15s for 36318 metas

async def get_metadata(meta_blob_name):
if not meta_blob_name.replace(".sigmf-meta", ".sigmf-data") in data_blob_sizes: # bail early if data file doesnt exist
if meta_blob_name.replace(".sigmf-meta", ".sigmf-data") not in data_blob_sizes: # bail early if data file doesnt exist
print(f"[SYNC] Data file for {meta_blob_name} wasn't found")
return None
blob_client = container_client.get_blob_client(meta_blob_name)
@@ -129,7 +131,7 @@ async def get_metadata(meta_blob_name):
metadata["global"]["traceability:revision"] = 0
file_length = data_blob_sizes[filepath + ".sigmf-data"]
bytes_per_iq_sample = get_bytes_per_iq_sample(metadata["global"]["core:datatype"])
metadata["global"]["traceability:sample_length"] = (file_length / bytes_per_iq_sample)
metadata["global"]["traceability:sample_length"] = file_length / bytes_per_iq_sample
return metadata

# Running all coroutines at once failed for datasets with 10k's metas, so we need to break it up into batches
@@ -139,7 +141,7 @@ async def get_metadata(meta_blob_name):
metadatas = []
for i in range(num_batches):
coroutines = []
for meta_blob_name in meta_blob_names[i * batch_size:(i + 1) * batch_size]:
for meta_blob_name in meta_blob_names[i * batch_size : (i + 1) * batch_size]:
coroutines.append(get_metadata(meta_blob_name))
ret = await asyncio.gather(*coroutines) # Wait for all the coroutines to finish
metadatas.extend([x for x in ret if x is not None]) # remove the Nones
@@ -150,7 +152,7 @@ async def get_metadata(meta_blob_name):

bulk_writes = []
for metadata in metadatas:
meta_name = metadata['global']['traceability:origin']['file_path']
meta_name = metadata["global"]["traceability:origin"]["file_path"]
filter = {
"global.traceability:origin.account": account,
"global.traceability:origin.container": container,
@@ -164,7 +166,7 @@ async def get_metadata(meta_blob_name):
metadata_collection = db().metadata
metadata_collection.bulk_write(bulk_writes)

''' At some point we may remove the versions thing
""" At some point we may remove the versions thing
# audit document
audit_document = {
"metadata": metadata,
@@ -173,12 +175,13 @@
}
versions: AgnosticCollection = versions_collection()
await versions.insert_one(audit_document)
'''
"""

print(f"[SYNC] Finished syncing {account}/{container}")
await azure_blob_client.close_blob_clients() # Close all the blob clients to avoid unclosed connection errors

async def create_datasource(datasource: DataSource, user: Optional[dict]) -> DataSource:

async def create_datasource(datasource: DataSource, user: Optional[dict]) -> bool:
"""
Create a new datasource. The datasource will be henceforth identified by account/container which
must be unique or this function will return a 400.
@@ -187,16 +190,14 @@ async def create_datasource(datasource: DataSource, user: Optional[dict]) -> Dat
datasource_collection = db().datasources
if await datasource_exists(datasource.account, datasource.container):
print("Datasource Already Exists!")
return None
return False
datasource_dict = datasource.dict(by_alias=True, exclude_unset=True)

# encrypt takes in the SecretStr and returns a string with the encrypted value, which then gets stored in mongo
if datasource.sasToken:
datasource.sasToken = encrypt(datasource.sasToken)
else:
datasource.sasToken = ""
datasource_dict["sasToken"] = encrypt(datasource.sasToken)
if datasource.accountKey:
datasource.accountKey = encrypt(datasource.accountKey)
else:
datasource.accountKey = ""
datasource_dict = datasource.dict(by_alias=True, exclude_unset=True)
datasource_dict["accountKey"] = encrypt(datasource.accountKey)

if "owners" not in datasource_dict:
datasource_dict["owners"] = []
@@ -206,13 +207,15 @@
datasource_dict["readers"] = []
if "public" not in datasource_dict:
datasource_dict["public"] = True

await datasource_collection.insert_one(datasource_dict)
return datasource
return True


async def import_datasources_from_env():
connection_info = os.getenv("IQENGINE_CONNECTION_INFO", None)
base_filepath = os.getenv("IQENGINE_BACKEND_LOCAL_FILEPATH", None)
base_filepath = base_filepath.replace('"', '') if base_filepath else None
base_filepath = base_filepath.replace('"', "") if base_filepath else None

# For those using MSAL to enter in datasource connection info, leave IQENGINE_CONNECTION_INFO and IQENGINE_BACKEND_LOCAL_FILEPATH empty
if not connection_info and not base_filepath:
@@ -229,8 +232,8 @@ async def import_datasources_from_env():
if base_filepath and os.path.exists(base_filepath):
try:
datasource = DataSource(
account='local',
container='local',
account="local",
container="local",
sasToken=None,
accountKey=None,
name="Local to Backend",
@@ -245,7 +248,7 @@ async def import_datasources_from_env():
if create_ret:
await sync("local", "local")
except Exception as e:
print(f"Failed to import datasource local to backend", e)
print("Failed to import datasource local to backend", e)

# Add all cloud datasources
if connection_info:
@@ -254,21 +257,20 @@
datasource = DataSource(
account=connection["accountName"],
container=connection["containerName"],
sasToken=connection["sasToken"],
accountKey=connection["accountKey"] if "accountKey" in connection else None,
sasToken=SecretStr(connection["sasToken"]),
accountKey=SecretStr(connection["accountKey"]) if "accountKey" in connection else None,
name=connection["name"],
description=connection["description"]
if "description" in connection
else None,
description=connection["description"] if "description" in connection else None,
imageURL=connection["imageURL"] if "imageURL" in connection else None,
type="api",
public=connection["public"] if "public" in connection else True,
owners=connection["owners"] if "owners" in connection else ["IQEngine-Admin"],
readers=connection["readers"] if "readers" in connection else [],
)
create_ret = await create_datasource(datasource=datasource, user=None)

if create_ret:
await sync(connection["accountName"], connection["containerName"])
except Exception as e:
print(f"Failed to import datasource {connection['name']}", e)
print(f"Failed to import datasource {connection['name']}.", e)
continue
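
For readers skimming the sync() changes above: the in-code comment about all-at-once coroutines failing for datasets with tens of thousands of metas refers to the batched asyncio.gather pattern. Below is a minimal standalone sketch of that pattern under assumed names (fetch_one, gather_in_batches, and the batch_size value are illustrative, not taken from this diff):

    import asyncio

    async def fetch_one(name: str) -> str | None:
        # stand-in for downloading and parsing one .sigmf-meta blob
        await asyncio.sleep(0)
        return name

    async def gather_in_batches(names: list[str], batch_size: int = 5000) -> list[str]:
        results: list[str] = []
        for i in range(0, len(names), batch_size):
            batch = names[i:i + batch_size]
            ret = await asyncio.gather(*(fetch_one(n) for n in batch))
            results.extend(r for r in ret if r is not None)  # drop the Nones, as sync() does
        return results

    if __name__ == "__main__":
        names = [f"file_{i}.sigmf-meta" for i in range(12)]
        print(asyncio.run(gather_in_batches(names, batch_size=5)))
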