Skip to content
This repository was archived by the owner on Mar 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Dockerfile
**/*_pb2_*
12 changes: 12 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[flake8]
# Recommend matching the black line length (default 88),
# rather than using the flake8 default of 79:
max-line-length = 88
extend-ignore =
# See https://github.com/PyCQA/pycodestyle/issues/373
E203,
exclude =
**/*_pb2.py
**/*_pb2_grpc.py
venv
build
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,15 @@
/docs/project/project/
/docs/project/target/
/docs/target/
venv
/.venv/
/dist/
*_pb2.py
*_pb2_grpc.py
/build
**/*.egg*
*.pyc
*.iml
protobuf/frontend
protobuf/protocol
protobuf/example/shoppingcart
8 changes: 7 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@ language: python

python:
- "3.8"
services:
- docker

#before_install:
# - docker pull cloudstateio/cloudstate-proxy-dev-mode:latest

jobs:
include:

- stage: build
install:
- pip install -r requirements.txt
script: pytest
- pip install .
script: pytest --import-mode=importlib
deploy:
provider: pypi
username: "__token__"
Expand Down
22 changes: 22 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM python:3.8.0-slim

WORKDIR /python-support

RUN apt-get update && \
apt-get -y upgrade && \
apt-get install -y curl --no-install-recommends
COPY ./requirements.txt /python-support/requirements.txt
RUN pip install -r /python-support/requirements.txt
COPY ./scripts /python-support/scripts
COPY ./protobuf /python-support/protobuf
COPY ./cloudstate /python-support/cloudstate
COPY ./setup.py /python-support/setup.py
COPY ./Description.md /python-support/Description.md


RUN pip install . -vvv

WORKDIR /
ENTRYPOINT ["python", "-m", "cloudstate.test.tck_services"]

EXPOSE 8080
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,10 @@ python setup.py bdist_wheel

### local install
```
python -m pip install dist/cloudstate-0.1.0-py3-none-any.whl
python -m pip install dist/cloudstate-<the version>-py3-none-any.whl
```

### build and run tck, including provisional tests for stateless functions.
```
./extended_tck.sh
```
2 changes: 1 addition & 1 deletion cloudstate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
from .version import __version__

__all__ = [
'__version__',
"__version__",
]
73 changes: 48 additions & 25 deletions cloudstate/cloudstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,43 @@
Copyright 2020 Lightbend Inc.
Licensed under the Apache License, Version 2.0.
"""
from typing import Optional

from dataclasses import (dataclass, field)
from typing import List
import logging
import multiprocessing
import os

from concurrent import futures
from dataclasses import dataclass, field
from typing import List, Optional

import grpc

from cloudstate.evensourced_servicer import CloudStateEventSourcedServicer
from cloudstate.event_sourced_entity import EventSourcedEntity
from cloudstate.discovery_servicer import CloudStateEntityDiscoveryServicer
from cloudstate.entity_pb2_grpc import add_EntityDiscoveryServicer_to_server

import logging
import multiprocessing

from cloudstate.event_sourced_entity import EventSourcedEntity
from cloudstate.event_sourced_pb2_grpc import add_EventSourcedServicer_to_server
from cloudstate.eventsourced_servicer import CloudStateEventSourcedServicer
from cloudstate.function_pb2_grpc import add_StatelessFunctionServicer_to_server
from cloudstate.function_servicer import CloudStateStatelessFunctionServicer
from cloudstate.stateless_function_entity import StatelessFunction


@dataclass
class CloudState:
logging.basicConfig(format='%(asctime)s - %(filename)s - %(levelname)s: %(message)s', level=logging.INFO)
logging.root.setLevel(logging.NOTSET)

__address: str = ''
__host = '127.0.0.1'
__port = '8080'
logging.basicConfig(
format="%(asctime)s - %(filename)s - %(levelname)s: %(message)s",
level=logging.DEBUG,
)
logging.root.setLevel(logging.DEBUG)

__address: str = ""
__host = "127.0.0.1"
__port = "8080"
__workers = multiprocessing.cpu_count()
__event_sourced_entities: List[EventSourcedEntity] = field(default_factory=list)
__stateless_function_entities: List[StatelessFunction] = field(default_factory=list)

def host(self, address: str):
"""Set the address of the network Host.
Default Address is 127.0.0.1.
Default Address is 127.0.0.1.
"""
self.__host = address
return self
Expand All @@ -48,7 +52,7 @@ def port(self, port: str):

def max_workers(self, workers: Optional[int] = multiprocessing.cpu_count()):
"""Set the gRPC Server number of Workers.
Default is equal than number of CPU Cores in the machine.
Default is equal to the number of CPU Cores in the machine.
"""
self.__workers = workers
return self
Expand All @@ -58,20 +62,39 @@ def register_event_sourced_entity(self, entity: EventSourcedEntity):
self.__event_sourced_entities.append(entity)
return self

def register_stateless_function_entity(self, entity: StatelessFunction):
"""Registry the user Stateless Function entity."""
self.__stateless_function_entities.append(entity)
return self

def start(self):
"""Start the user function and gRPC Server."""

self.__address = '{}:{}'.format(os.environ.get('HOST', self.__host), os.environ.get('PORT', self.__port))
self.__address = "{}:{}".format(
os.environ.get("HOST", self.__host), os.environ.get("PORT", self.__port)
)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=self.__workers))
add_EntityDiscoveryServicer_to_server(CloudStateEntityDiscoveryServicer(self.__event_sourced_entities), server)
add_EventSourcedServicer_to_server(CloudStateEventSourcedServicer(self.__event_sourced_entities), server)

logging.info('Starting Cloudstate on address %s', self.__address)
# event sourced
add_EntityDiscoveryServicer_to_server(
CloudStateEntityDiscoveryServicer(
self.__event_sourced_entities, self.__stateless_function_entities
),
server,
)
add_EventSourcedServicer_to_server(
CloudStateEventSourcedServicer(self.__event_sourced_entities), server
)
add_StatelessFunctionServicer_to_server(
CloudStateStatelessFunctionServicer(self.__stateless_function_entities),
server,
)
logging.info("Starting Cloudstate on address %s", self.__address)
try:
server.add_insecure_port(self.__address)
server.start()
except IOError as e:
logging.error('Error on start Cloudstate %s', e.__cause__)
logging.error("Error on start Cloudstate %s", e.__cause__)

server.wait_for_termination()
return server
16 changes: 11 additions & 5 deletions cloudstate/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@
Licensed under the Apache License, Version 2.0.
"""

from dataclasses import dataclass, field
from typing import List

from cloudstate.entity_pb2 import ClientAction, Failure, Reply, Forward, SideEffect
from cloudstate.entity_pb2 import ClientAction, Failure, Forward, Reply, SideEffect


class Context:
"""Root class of all contexts."""

pass


class ClientActionContext(Context):
"""Context that provides client actions, which include failing and forwarding.
These contexts are typically made available in response to commands."""

def __init__(self,command_id: int):
def __init__(self, command_id: int):
self.command_id: int = command_id
self.errors: List[str] = []
self.effects:List[SideEffect] = []
self.effects: List[SideEffect] = []
self.forward: Forward = None

def fail(self, error_message: str):
Expand All @@ -38,9 +38,15 @@ def create_client_action(self, result, allow_reply):
failure.command_id = self.command_id
failure.description = str(self.errors)
client_action.failure.CopyFrom(failure)

return client_action

elif result:
if self.forward:
raise Exception("Both a reply was returned, and a forward message was sent, choose one or the other.")
raise Exception(
"Both a reply was returned, and a forward message was sent, "
"choose one or the other."
)
else:
reply = Reply()
reply.payload.Pack(result)
Expand Down
80 changes: 64 additions & 16 deletions cloudstate/discovery_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,109 @@

import platform
from dataclasses import dataclass
from logging import getLogger
from pprint import pprint
from typing import List

from google.protobuf.descriptor_pb2 import FileDescriptorSet, FileDescriptorProto
from google.protobuf.descriptor_pb2 import FileDescriptorProto, FileDescriptorSet
from google.protobuf.descriptor_pool import Default
from google.protobuf.empty_pb2 import Empty

from cloudstate import entity_pb2
from cloudstate.entity_pb2_grpc import EntityDiscoveryServicer
from cloudstate.event_sourced_entity import EventSourcedEntity
from cloudstate.stateless_function_entity import StatelessFunction

logger = getLogger()


@dataclass
class CloudStateEntityDiscoveryServicer(EntityDiscoveryServicer):
event_sourced_entities: List[EventSourcedEntity]
stateless_function_entities: List[StatelessFunction]

def discover(self, request, context):
logger.info("discovering.")
pprint(request)
descriptor_set = FileDescriptorSet()
for entity in self.event_sourced_entities:
for entity in self.event_sourced_entities + self.stateless_function_entities:
logger.info(f"entity: {entity.name()}")
for descriptor in entity.file_descriptors:
descriptor_set.file.append(FileDescriptorProto.FromString(descriptor.serialized_pb))
logger.info(f"discovering {descriptor.name}")
logger.info(f"SD: {entity.service_descriptor.full_name}")
from_string = FileDescriptorProto.FromString(descriptor.serialized_pb)
descriptor_set.file.append(from_string)

descriptor_set.file.append(
FileDescriptorProto.FromString(
Default().FindFileByName("google/protobuf/empty.proto").serialized_pb
)
)
descriptor_set.file.append(
FileDescriptorProto.FromString(
Default().FindFileByName("cloudstate/entity_key.proto").serialized_pb
)
)
descriptor_set.file.append(
FileDescriptorProto.FromString(
Default().FindFileByName("cloudstate/eventing.proto").serialized_pb
)
)
descriptor_set.file.append(
FileDescriptorProto.FromString(Default().FindFileByName('google/protobuf/empty.proto').serialized_pb)
FileDescriptorProto.FromString(
Default()
.FindFileByName("google/protobuf/descriptor.proto")
.serialized_pb
)
)
descriptor_set.file.append(
FileDescriptorProto.FromString(Default().FindFileByName('cloudstate/entity_key.proto').serialized_pb)
FileDescriptorProto.FromString(
Default().FindFileByName("google/api/annotations.proto").serialized_pb
)
)
descriptor_set.file.append(
FileDescriptorProto.FromString(Default().FindFileByName('google/protobuf/descriptor.proto').serialized_pb)
FileDescriptorProto.FromString(
Default().FindFileByName("google/api/http.proto").serialized_pb
)
)
descriptor_set.file.append(
FileDescriptorProto.FromString(Default().FindFileByName('google/api/annotations.proto').serialized_pb)
FileDescriptorProto.FromString(
Default().FindFileByName("google/api/httpbody.proto").serialized_pb
)
)
descriptor_set.file.append(
FileDescriptorProto.FromString(Default().FindFileByName('google/api/http.proto').serialized_pb)
FileDescriptorProto.FromString(
Default().FindFileByName("google/protobuf/any.proto").serialized_pb
)
)
spec = entity_pb2.EntitySpec(
service_info=entity_pb2.ServiceInfo(
service_version='0.1.0',
service_runtime='Python ' + platform.python_version() + ' [' + platform.python_implementation() + ' ' +
platform.python_compiler() + ']',
support_library_name='cloudstate-python-support',
support_library_version='0.1.0'
service_name="",
service_version="0.1.0",
service_runtime="Python "
+ platform.python_version()
+ " ["
+ platform.python_implementation()
+ " "
+ platform.python_compiler()
+ "]",
support_library_name="cloudstate-python-support",
support_library_version="0.1.0",
),
entities=[
entity_pb2.Entity(
entity_type=entity.entity_type(),
service_name=entity.service_descriptor.full_name,
persistence_id=entity.persistence_id,
)
for entity in self.event_sourced_entities],
proto=descriptor_set.SerializeToString()
for entity in self.event_sourced_entities
+ self.stateless_function_entities
],
proto=descriptor_set.SerializeToString(),
)
return spec

def reportError(self, request, context):
logger.error(f"Report error: {request}")
pprint(request)
return
return Empty()
Loading