Skip to content

Commit

Permalink
Merge pull request #24 from techunits/master
Browse files Browse the repository at this point in the history
Updated gRPC server to support ASYNC mode
  • Loading branch information
gluk-w committed Jul 31, 2022
2 parents adfb9d0 + adf5576 commit fcea029
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 24 deletions.
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -35,7 +35,8 @@ GRPCSERVER = {
'credentials': [{
'private_key': 'private_key.pem',
'certificate_chain': 'certificate_chain.pem'
}] # required only if SSL/TLS support is required to be enabled
}], # required only if SSL/TLS support is required to be enabled
'async': False # Default: False, if True then gRPC server will start in ASYNC mode
}
```

Expand Down
2 changes: 1 addition & 1 deletion django_grpc/__version__.py
@@ -1 +1 @@
__version__ = '1.0.15'
__version__ = '1.1.0'
48 changes: 36 additions & 12 deletions django_grpc/management/commands/grpcserver.py
@@ -1,13 +1,15 @@
import datetime
import asyncio

from django.core.management.base import BaseCommand
from django.utils import autoreload

from django.conf import settings
from django_grpc.utils import create_server, extract_handlers


class Command(BaseCommand):
help = 'Run gRPC server'
config = getattr(settings, 'GRPCSERVER', dict())

def add_arguments(self, parser):
parser.add_argument('--max_workers', type=int, help="Number of workers")
Expand All @@ -16,29 +18,51 @@ def add_arguments(self, parser):
parser.add_argument('--list-handlers', action='store_true', default=False, help="Print all registered endpoints")

def handle(self, *args, **options):
if options['autoreload'] is True:
self.stdout.write("ATTENTION! Autoreload is enabled!")
if hasattr(autoreload, "run_with_reloader"):
# Django 2.2. and above
autoreload.run_with_reloader(self._serve, **options)
else:
# Before Django 2.2.
autoreload.main(self._serve, None, options)
is_async = self.config.get('async', False)
if is_async is True:
self._serve_async(**options)
else:
self._serve(**options)
if options['autoreload'] is True:
self.stdout.write("ATTENTION! Autoreload is enabled!")
if hasattr(autoreload, "run_with_reloader"):
# Django 2.2. and above
autoreload.run_with_reloader(self._serve, **options)
else:
# Before Django 2.2.
autoreload.main(self._serve, None, options)
else:
self._serve(**options)

def _serve(self, max_workers, port, *args, **kwargs):
autoreload.raise_last_exception()
self.stdout.write("Starting server at %s" % datetime.datetime.now())
self.stdout.write("gRPC server starting at %s" % datetime.datetime.now())

server = create_server(max_workers, port)
server.start()

self.stdout.write("Server is listening port %s" % port)
self.stdout.write("gRPC server is listening port %s" % port)

if kwargs['list_handlers'] is True:
self.stdout.write("Registered handlers:")
for handler in extract_handlers(server):
self.stdout.write("* %s" % handler)

server.wait_for_termination()

def _serve_async(self, max_workers, port, *args, **kwargs):
self.stdout.write("gRPC async server starting at %s" % datetime.datetime.now())

server = create_server(max_workers, port)

async def _main_routine():
await server.start()
self.stdout.write("gRPC async server is listening port %s" % port)

if kwargs['list_handlers'] is True:
self.stdout.write("Registered handlers:")
for handler in extract_handlers(server):
self.stdout.write("* %s" % handler)

await server.wait_for_termination()

asyncio.get_event_loop().run_until_complete(_main_routine())
26 changes: 19 additions & 7 deletions django_grpc/utils.py
Expand Up @@ -3,7 +3,10 @@
from concurrent import futures

import grpc

from django.utils.module_loading import import_string
from django_grpc.signals.wrapper import SignalWrapper
from django.conf import settings

from django_grpc.signals.wrapper import SignalWrapper

Expand All @@ -12,21 +15,30 @@


def create_server(max_workers, port, interceptors=None):
from django.conf import settings
config = getattr(settings, 'GRPCSERVER', dict())
servicers_list = config.get('servicers', []) # callbacks to add servicers to the server
interceptors = load_interceptors(config.get('interceptors', []))
maximum_concurrent_rpcs = config.get('maximum_concurrent_rpcs', None)
options = config.get('options', [])
credentials = config.get('credentials', None)
is_async = config.get('async', False)


# create a gRPC server
server = grpc.server(
thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers),
interceptors=interceptors,
maximum_concurrent_rpcs=maximum_concurrent_rpcs,
options=options
)
if is_async is True:
server = grpc.aio.server(
interceptors=interceptors,
maximum_concurrent_rpcs=maximum_concurrent_rpcs,
options=options
)
else:
server = grpc.server(
thread_pool=futures.ThreadPoolExecutor(max_workers=max_workers),
interceptors=interceptors,
maximum_concurrent_rpcs=maximum_concurrent_rpcs,
options=options
)

add_servicers(server, servicers_list)

if credentials is None:
Expand Down
6 changes: 3 additions & 3 deletions requirements.txt
@@ -1,6 +1,6 @@
django>=2.0

# gRPC
protobuf==3.15.0
grpcio==1.25.0
grpcio-tools==1.25.0
protobuf==4.21.4
grpcio==1.47.0
grpcio-tools==1.47.0

0 comments on commit fcea029

Please sign in to comment.