Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 50 additions & 10 deletions cloudquery/sdk/serve/plugin.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import argparse
import logging
import os
from concurrent import futures

import grpc
import structlog
import sys
import os

from cloudquery.discovery_v1 import discovery_pb2_grpc
from cloudquery.plugin_v3 import plugin_pb2_grpc
from structlog import wrap_logger

from cloudquery.sdk.docs.generator import Generator
from cloudquery.sdk.internal.servers.discovery_v1.discovery import DiscoveryServicer
Expand All @@ -16,7 +17,6 @@

DOC_FORMATS = ["json", "markdown"]


_IS_WINDOWS = sys.platform == "win32"

try:
Expand All @@ -33,12 +33,27 @@


def get_logger(args):
log_level_map = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"critical": logging.CRITICAL,
}

logging.basicConfig(
format="%(message)s",
stream=sys.stdout,
level=log_level_map.get(args.log_level.lower(), logging.INFO),
)

processors = [
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
structlog.stdlib.filter_by_level,
structlog.processors.TimeStamper(fmt="%Y-%m-%dT%H:%M:%SZ", utc=True),
]
if args.log_format == "text":
processors.append(
Expand All @@ -58,9 +73,7 @@ def get_logger(args):
else:
processors.append(structlog.processors.JSONRenderer())

# if args.log_format == "json":
# processors.append(structlog.processors.JSONRenderer())
log = structlog.get_logger(processors=processors)
log = wrap_logger(logging.getLogger(), processors=processors)
return log


Expand All @@ -73,16 +86,42 @@ def run(self, args):
subparsers = parser.add_subparsers(dest="command", required=True)

serve_parser = subparsers.add_parser("serve", help="Start plugin server")
serve_parser.add_argument(
"--log-format",
type=str,
default="text",
choices=["text", "json"],
help="logging format",
)
serve_parser.add_argument(
"--log-level",
type=str,
default="info",
choices=["trace", "debug", "info", "warn", "error"],
help="log level",
)

# ignored for now
serve_parser.add_argument(
"--log-format", type=str, default="text", choices=["text", "json"]
"--no-sentry",
action="store_true",
help="disable sentry (placeholder for future use)",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the plugin to still run if this is passed in by the CLI

)
# ignored for now
serve_parser.add_argument(
"--otel-endpoint",
type=str,
default="",
help="Open Telemetry HTTP collector endpoint (placeholder for future use)",
)
# ignored for now
serve_parser.add_argument(
"--otel-endpoint-insecure",
type=str,
default="",
help="Open Telemetry HTTP collector endpoint (for development only) (placeholder for future use)",
)

serve_parser.add_argument(
"--address",
type=str,
Expand Down Expand Up @@ -140,15 +179,16 @@ def _serve(self, args):
PluginServicer(self._plugin, logger), self._server
)
self._server.add_insecure_port(args.address)
print("Starting server. Listening on " + args.address)
logger.info("Starting server", address=args.address)
self._server.start()
self._server.wait_for_termination()

def stop(self):
self._server.stop(5)

def _generate_docs(self, args):
print("Generating docs in format: " + args.format)
logger = get_logger(args)
logger.info("Generating docs", format=args.format)
generator = Generator(
self._plugin.name(),
self._plugin.get_tables(
Expand Down