Skip to content

Conversation

tpiperatgod
Copy link
Member

@tpiperatgod tpiperatgod commented Jul 22, 2021

What this PR do

In the proposal Add event framework we discuss the feasibility and implementation of the OpenFunction event framework. And in this PR, I have completed the basic functionality of the above event framework.

The specific changes are as follows:

  1. Added three basic CRDs:

    According to the design in the proposal, I added EventSource, EventBus and Trigger CRDs. Here are some examples:

    • EventSource

      apiVersion: event.openfunction.io/v1alpha1
      kind: EventSource
      metadata:
        # name of event source
        name: kafka-es
      spec:
        # kind of event source
        kafka:
          # name of event
          example-kafka:
            version: v1
            type: bindings.kafka
            metadata:
              - name: brokers
                value: dapr-kafka.kafka:9092
              - name: topics
                value: sample
              - name: consumerGroup
                value: group1
              - name: publishTopic
                value: sample
              - name: authRequired
                value: "false"
        # sink means that event source will call the target function synchronously
        # not setting sink means that events from the event source are sent to the event bus
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: function-sample-serving-ksvc
            namespace: default
    • EventBus

      apiVersion: event.openfunction.io/v1alpha1
      kind: EventBus
      metadata:
        # name of event bus
        name: default
      spec:
        # kind of event bus, the default will be to use nats streaming
        nats:
          example-nats:
            version: v1
            type: pubsub.natsstreaming
            metadata:
              - name: natsURL
                value: "nats://nats.default:4222"
              - name: natsStreamingClusterID
                value: "stan"
              - name: subscriptionType
                value: queue
    • Trigger

      apiVersion: event.openfunction.io/v1alpha1
      kind: Trigger
      metadata:
        # name of trigger
        name: my-trigger
      spec:
        # name of EventBus CR
        eventBusName: default
        # an array of event source
        sources:
            # name of this source
          - name: "kafka"
            # name of EventSource CR
            eventSourceName: "kafka-es"
            # name of event
            eventName: "example-kafka"
        # an array of subscriber
        subscribers:
            # determination conditions for triggering events
          - condition: "pending"
            # sink means that trigger will call the target function synchronously
            sink:
              ref:
                apiVersion: serving.knative.dev/v1
                kind: Service
                name: function-sample-serving-ksvc
                namespace: default
            # send the filtered events to a specific topic of the event bus
            topic: "metrics"
  2. Added three corresponding controllers:

    • EventSource Controller flow
      1. Retrieve the configuration of the event bus based on the value of spec.eventBusNames (Used to connect event sources to event bus)
      2. Generate dapr component specs (source and sink) for the event source
      3. Create dapr component by using specs above
      4. Generate an event source configuration called SourceEnvConfig
      5. Create event source handler workload (by passing SourceEnvConfig)
    • EventBus Controller flow
      1. Generate dapr component specs for the event bus
      2. Create dapr component by using specs above
      3. Set the annotation to assign the name of the dapr component associated with the event bus to "component-name"
    • Trigger Controller flow
      1. Create dapr component spec for event bus
      2. Parse subscribers, create dapr component specs for sink and deadLetterSink
      3. Create dapr component by using specs above
      4. Generate an trigger configuration called TriggerEnvConfig
      5. Create trigger handler workload (by passing TriggerEnvConfig)

What can it currently do

  • Ability to handle nats streaming type EventBus
  • Ability to handle redis, kafka type EventSources
  • EventSources can trigger Knative Service synchronization calls
  • The EventSource can send events to the EventBus
  • Triggers can associate EventBus and then trigger target functions (including Knative Service function and OpenFuncAsync function)

TODO:

  • Improve the state management mechanism of EventSource and Trigger

    For example, when the EventBus associated with the EventSource is deleted, we need to change the status of the EventSource to "unavailable"

  • Judgment logic for events in Trigger

  • Abstraction of the specification of the dapr component in CRD

  • Using OpenFunction Serving to drive the workloads of EventSource and Trigger

  • Add more kinds of event sources, event buses and triggers

  • Add samples && concept docs

Signed-off-by: laminar fangtian@kubesphere.io

@benjaminhuo
Copy link
Member

This is a fantastic PR! @tpiperatgod

@tpiperatgod
Copy link
Member Author

I have modified the reconcile logic of EventSource and Trigger:

EventSource
Reconcile conditions:

  1. EventSource CR changed
  2. Associated EventBus CR changed
  3. Owned Components changed
  4. Owned Deployments changed

Controller flow:

  1. Generate a dapr component specification for the EventBus associated with the EventSource (if spec.eventBus is set) and create the dapr component (will check if it needs to be updated) and set the SourceConfig.EventBusComponentName, SourceConfig.EventBusTopic, SourceConfig.EventBusSpecEncode.
  2. Generate a dapr component specification for the Sink set in EventSource (if spec.sink is set) and create the dapr component (will check if it needs to be updated) and set the SourceConfig.SinkComponentName, SourceConfig.SinkSpecEncode.
  3. Generate dapr component specifications for the event sources and create the dapr components (will check if they need to be updated) and set the SourceConfig.EventSourceComponentName, SourceConfig.EventSourceTopic, SourceConfig.EventSourceSpecEncode.
  4. Generate SourceConfig and convert it to a base64-encoded string.
  5. Create an EventSource workload for each event source (will check if they need to be updated) and pass in the SourceConfig as an environment variable.

Trigger
Reconcile conditions:

  1. Trigger CR changed
  2. Associated EventBus CR changed
  3. Owned Components changed
  4. Owned Deployments changed

Controller flow:

  1. Generate a dapr component specification for the EventBus associated with the Trigger and create the dapr component (will check if it needs to be updated) and set the TriggerConfig.EventBusComponentName, TriggerConfig.EventBusTopic, TriggerConfig.EventBusSpecEncode.
  2. Generate dapr component specifications for the subscribers and create the dapr components (will check if they need to be updated) and set the TriggerConfig.Subscribers, TriggerConfig.SinkSpecEncode.
  3. Create a workload for Trigger (will check if they need to be updated) and pass in the TriggerConfig as an environment variable.

Signed-off-by: laminar <fangtian@kubesphere.io>
@tpiperatgod
Copy link
Member Author

New updates:

  1. EventSource can specify a topic name for each source, which is used to get events from the specified topic when using dapr pub/sub mode
  2. Send events to the topic of EventBus according to the following topic naming template:
    EventBusTopicNameTmpl => {namespace}-{eventSourceName}-{sourceKind}-{eventName}
  3. Similarly, the Trigger will fetch events from the corresponding EventBus topic according to the configuration in its spec.inputs

@tpiperatgod
Copy link
Member Author

Added cron and redis type event sources, now can be adapted to kafka, redis and cron three types of event sources

@benjaminhuo
Copy link
Member

  • EventBusTopicNameTmpl => {namespace}-{eventSourceName}-{sourceKind}-{eventName}

@tpiperatgod I think we can remove sourceKind like this, user needn't add kafka when sending to Kafka:
EventBusTopicNameTmpl => {namespace}-{eventSourceName}-{eventName}

@tpiperatgod
Copy link
Member Author

  • EventBusTopicNameTmpl => {namespace}-{eventSourceName}-{sourceKind}-{eventName}

@tpiperatgod I think we can remove sourceKind like this, user needn't add kafka when sending to Kafka:
EventBusTopicNameTmpl => {namespace}-{eventSourceName}-{eventName}

Agree

@tpiperatgod
Copy link
Member Author

ClusterEventBus CRD has been added to provide global event bus capability.

It (after changes) will trigger reconcile of EventSource and Trigger after confirming that there is no EventBus of the same name in the namespace.

Signed-off-by: laminar <fangtian@kubesphere.io>
@tpiperatgod
Copy link
Member Author

  1. Added logic to clear deprecated resources (components, workloads)
  2. Name each Kafka component with a separate consumerGroup name
  3. Added documentation

@benjaminhuo benjaminhuo merged commit d5ca805 into OpenFunction:main Aug 2, 2021
@tpiperatgod tpiperatgod deleted the add_events branch August 3, 2021 09:48
@tpiperatgod tpiperatgod linked an issue Aug 3, 2021 that may be closed by this pull request
6 tasks
sachinparihar pushed a commit to sachinparihar/OpenFunction that referenced this pull request Nov 22, 2022
xwm1992 pushed a commit to xwm1992/OpenFunction that referenced this pull request Aug 14, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add a todo list for OpenFunction events framework
2 participants