Judgment logic for events in Trigger #97

Closed
Tracked by #87
tpiperatgod opened this issue Aug 12, 2021 · 4 comments · Fixed by #99

@tpiperatgod
Member

No description provided.

@tpiperatgod
Member Author

/assign @tpiperatgod

@tpiperatgod tpiperatgod self-assigned this Aug 12, 2021
@tpiperatgod tpiperatgod added the enhancement New feature or request label Aug 12, 2021
@tpiperatgod
Member Author

Added the events-handlers repo

@tpiperatgod
Member Author

Overview

A condition is a logical expression over input names (input.name). The result of evaluating it determines whether the corresponding subscriber is triggered.

Design

Struct

TriggerEnvConfig is used to receive and process events passed by the OpenFunction events framework.

TriggerMgr stores information during the event triggering process and provides the ability to evaluate conditions.

type TriggerEnvConfig struct {
	BusComponent string                 `json:"busComponent"`
	Inputs       []*Input               `json:"busTopic,omitempty"`
	Subscribers  map[string]*Subscriber `json:"subscribers,omitempty"`
	Port         string                 `json:"port,omitempty"`
}

type Input struct {
	Name        string `json:"name"`
	Namespace   string `json:"namespace,omitempty"`
	EventSource string `json:"eventSource"`
	Event       string `json:"event"`
}

type Subscriber struct {
	SinkComponent   string `json:"sinkComponent,omitempty"`
	DLSinkComponent string `json:"deadLetterSinkComponent,omitempty"`
	Topic           string `json:"topic,omitempty"`
	DLTopic         string `json:"deadLetterTopic,omitempty"`
}

type TriggerMgr struct {
	// key: topic, value: *InputStatus
	TopicInputMap          *sync.Map
	TopicEventMap          map[string]chan *common.TopicEvent
	CelEnv                 *cel.Env
	// key: condition, value: *Subscriber
	ConditionSubscriberMap *sync.Map
}

type InputStatus struct {
	Name        string
	LastMsgTime int64
	LastEvent   *common.TopicEvent
	Status      bool
}

Logic

  1. Each topic is associated with exactly one input
  2. A goroutine and a channel are created for each input (i.e. topic)
  3. Incoming events are sent to the channel corresponding to their topic
  4. When the channel receives an event, the goroutine will:
    1. Reset the timer (default timeout: 60s)
    2. Update the status of the input to true
    3. Check whether any condition matches at this point
      1. If a condition matches, get the subscriber configuration corresponding to that condition
      2. Send the event to the target function according to the subscriber configuration
  5. When the timer expires, the input status is reset to false

Sample

Suppose we have defined a Trigger according to the following configuration:

apiVersion: events.openfunction.io/v1alpha1
kind: Trigger
metadata:
  name: trigger-a
spec:
  eventBus: "default"
  inputs:
    - name: "A"
      eventSourceName: "my-es-a"
      eventName: "event-a"
    - name: "B"
      eventSourceName: "my-es-b"
      eventName: "event-b"
  subscribers:
  - condition: A || B
    sink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: function-sample-serving-ksvc
        namespace: default
  - condition: A && B
    topic: "metrics"

According to the topic naming rules in the event bus ({namespace}-{eventSourceName}-{eventName}), the topic names used for the two inputs are as follows:

Input name: A -> Topic name: default-my-es-a-event-a

Input name: B -> Topic name: default-my-es-b-event-b
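
As a small sketch, the naming rule can be written as a helper. `topicName` is a hypothetical name used only for illustration; the format string follows the {namespace}-{eventSourceName}-{eventName} rule quoted above.

```go
package main

import "fmt"

// topicName builds an event bus topic name following the
// {namespace}-{eventSourceName}-{eventName} rule.
func topicName(namespace, eventSourceName, eventName string) string {
	return fmt.Sprintf("%s-%s-%s", namespace, eventSourceName, eventName)
}

func main() {
	fmt.Println(topicName("default", "my-es-a", "event-a")) // default-my-es-a-event-a
	fmt.Println(topicName("default", "my-es-b", "event-b")) // default-my-es-b-event-b
}
```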

Initialization

triggerManager.TopicInputMap:

&sync.Map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: 0, 
        LastEvent: nil, 
        Status: false, 
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: 0, 
        LastEvent: nil, 
        Status: false, 
        Name: "B",
    }
}

triggerManager.ConditionSubscriberMap:

&sync.Map{
    "A || B": &Subscriber{
        SinkComponent: "http-sink", 
    },
    "A && B": &Subscriber{
        Topic: "metrics",
    }
}
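
The literals above are a notation for the map contents; a sync.Map is actually filled with Store. The following is a rough sketch of how the two maps could be populated, assuming reduced versions of the structs (LastEvent is omitted because it depends on the Dapr `common.TopicEvent` type) and a hypothetical helper named `newMaps`.

```go
package main

import (
	"fmt"
	"sync"
)

// Reduced copies of the structs from the design, for illustration only.
type Input struct {
	Name, Namespace, EventSource, Event string
}

type Subscriber struct {
	SinkComponent, Topic string
}

type InputStatus struct {
	Name        string
	LastMsgTime int64
	Status      bool
}

// newMaps (hypothetical) builds the topic -> *InputStatus and
// condition -> *Subscriber lookup tables from the configuration.
func newMaps(inputs []*Input, subs map[string]*Subscriber) (topicInput, condSub *sync.Map) {
	topicInput, condSub = &sync.Map{}, &sync.Map{}
	for _, in := range inputs {
		topic := fmt.Sprintf("%s-%s-%s", in.Namespace, in.EventSource, in.Event)
		// Every input starts inactive: Status false, LastMsgTime 0.
		topicInput.Store(topic, &InputStatus{Name: in.Name})
	}
	for cond, sub := range subs {
		condSub.Store(cond, sub)
	}
	return topicInput, condSub
}

func main() {
	inputs := []*Input{
		{Name: "A", Namespace: "default", EventSource: "my-es-a", Event: "event-a"},
		{Name: "B", Namespace: "default", EventSource: "my-es-b", Event: "event-b"},
	}
	subs := map[string]*Subscriber{
		"A || B": {SinkComponent: "http-sink"},
		"A && B": {Topic: "metrics"},
	}
	topicInput, condSub := newMaps(inputs, subs)

	topicInput.Range(func(k, v interface{}) bool {
		fmt.Printf("%s -> %+v\n", k, *v.(*InputStatus))
		return true
	})
	condSub.Range(func(k, v interface{}) bool {
		fmt.Printf("%s -> %+v\n", k, *v.(*Subscriber))
		return true
	})
}
```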

When events arrive

Input A receives an event

triggerManager.TopicInputMap:

&sync.Map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: time.Now().Unix(), 
        LastEvent: event1, 
        Status: true, 
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: 0, 
        LastEvent: nil, 
        Status: false, 
        Name: "B",
    }
}

The condition "A || B" will be matched by CEL, and the event event1 will be sent to "http-sink".

Input B receives an event within 60s

triggerManager.TopicInputMap:

&sync.Map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: <Time when event1 is received>, 
        LastEvent: event1, 
        Status: true, 
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: time.Now().Unix(), 
        LastEvent: event2, 
        Status: true, 
        Name: "B",
    }
}

After CEL has evaluated the conditions, both "A || B" and "A && B" will be matched, and the events event1 and event2 will be sent to "http-sink" and "metrics".

Input B receives an event after 60s, but Input A does not

triggerManager.TopicInputMap:

&sync.Map{
    "default-my-es-a-event-a": &InputStatus{
        LastMsgTime: 0, 
        LastEvent: nil, 
        Status: false, 
        Name: "A",
    },
    "default-my-es-b-event-b": &InputStatus{
        LastMsgTime: time.Now().Unix(), 
        LastEvent: event2, 
        Status: true, 
        Name: "B",
    }
}

The condition "A || B" will be matched by CEL, and event2 will be sent to "http-sink".
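
The three cases above can be reproduced with a small evaluator. Note that this is not how the handler does it: the design uses CEL (TriggerMgr.CelEnv), while the sketch below swaps in Go's standard go/parser package as a stand-in so that it runs without third-party dependencies. `evalCondition` and `evalNode` are hypothetical names, and only `&&`, `||`, `!` and parentheses are supported.

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
	"go/token"
)

// evalCondition parses a condition such as "A && B" and evaluates it
// against the current status of each input (keyed by input name).
func evalCondition(cond string, status map[string]bool) (bool, error) {
	expr, err := parser.ParseExpr(cond)
	if err != nil {
		return false, err
	}
	return evalNode(expr, status)
}

// evalNode walks the parsed expression tree recursively.
func evalNode(n ast.Expr, status map[string]bool) (bool, error) {
	switch e := n.(type) {
	case *ast.Ident:
		v, ok := status[e.Name]
		if !ok {
			return false, fmt.Errorf("unknown input %q", e.Name)
		}
		return v, nil
	case *ast.ParenExpr:
		return evalNode(e.X, status)
	case *ast.UnaryExpr:
		if e.Op != token.NOT {
			return false, fmt.Errorf("unsupported operator %s", e.Op)
		}
		v, err := evalNode(e.X, status)
		return !v, err
	case *ast.BinaryExpr:
		l, err := evalNode(e.X, status)
		if err != nil {
			return false, err
		}
		r, err := evalNode(e.Y, status)
		if err != nil {
			return false, err
		}
		switch e.Op {
		case token.LAND:
			return l && r, nil
		case token.LOR:
			return l || r, nil
		}
		return false, fmt.Errorf("unsupported operator %s", e.Op)
	}
	return false, fmt.Errorf("unsupported expression %T", n)
}

func main() {
	conditions := []string{"A || B", "A && B"}
	cases := []map[string]bool{
		{"A": true, "B": false}, // only A has received an event
		{"A": true, "B": true},  // B arrives within the timeout
		{"A": false, "B": true}, // A timed out, B has a fresh event
	}
	for _, status := range cases {
		for _, cond := range conditions {
			matched, err := evalCondition(cond, status)
			fmt.Printf("A=%v B=%v %q -> %v (err: %v)\n", status["A"], status["B"], cond, matched, err)
		}
	}
}
```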

Performance

Map the domain names of the NATS server to its IP address in /etc/hosts on the node:

# nats
<svc address> nats.default nats-0.nats.default.svc.cluster.local

Import the configuration:

export CONFIG="eyJidXNDb21wb25lbnQiOiJ0cmlnZ2VyIiwiYnVzVG9waWMiOlt7Im5hbWUiOiJBIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImV2ZW50U291cmNlIjoibXktZXZlbnRzb3VyY2UiLCJldmVudCI6InNhbXBsZS1vbmUifSx7Im5hbWUiOiJCIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImV2ZW50U291cmNlIjoibXktZXZlbnRzb3VyY2UiLCJldmVudCI6InNhbXBsZS10d28ifV0sInN1YnNjcmliZXJzIjp7IkEgXHUwMDI2XHUwMDI2IEIiOnsidG9waWMiOiJtZXRyaWNzIn0sIkEgfHwgQiI6eyJzaW5rQ29tcG9uZW50IjoiaHR0cC1zaW5rIn19LCJwb3J0IjoiNTA1MCJ9"
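
The CONFIG value above appears to be base64-encoded JSON in the shape of TriggerEnvConfig (two inputs A and B, the subscribers "A && B" and "A || B", and port 5050). The sketch below shows one way such a value could be produced and read back. `encodeConfig` and `decodeConfig` are hypothetical helper names, and how the handler itself reads the variable is an assumption not confirmed by this issue.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

type Input struct {
	Name        string `json:"name"`
	Namespace   string `json:"namespace,omitempty"`
	EventSource string `json:"eventSource"`
	Event       string `json:"event"`
}

type Subscriber struct {
	SinkComponent   string `json:"sinkComponent,omitempty"`
	DLSinkComponent string `json:"deadLetterSinkComponent,omitempty"`
	Topic           string `json:"topic,omitempty"`
	DLTopic         string `json:"deadLetterTopic,omitempty"`
}

type TriggerEnvConfig struct {
	BusComponent string                 `json:"busComponent"`
	Inputs       []*Input               `json:"busTopic,omitempty"`
	Subscribers  map[string]*Subscriber `json:"subscribers,omitempty"`
	Port         string                 `json:"port,omitempty"`
}

// encodeConfig (hypothetical) serializes the config to JSON and base64.
func encodeConfig(cfg *TriggerEnvConfig) (string, error) {
	raw, err := json.Marshal(cfg)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}

// decodeConfig (hypothetical) reverses encodeConfig.
func decodeConfig(encoded string) (*TriggerEnvConfig, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, fmt.Errorf("decode base64: %w", err)
	}
	cfg := &TriggerEnvConfig{}
	if err := json.Unmarshal(raw, cfg); err != nil {
		return nil, fmt.Errorf("parse JSON: %w", err)
	}
	return cfg, nil
}

func main() {
	cfg := &TriggerEnvConfig{
		BusComponent: "trigger",
		Inputs: []*Input{
			{Name: "A", Namespace: "default", EventSource: "my-eventsource", Event: "sample-one"},
			{Name: "B", Namespace: "default", EventSource: "my-eventsource", Event: "sample-two"},
		},
		Subscribers: map[string]*Subscriber{
			"A && B": {Topic: "metrics"},
			"A || B": {SinkComponent: "http-sink"},
		},
		Port: "5050",
	}
	encoded, err := encodeConfig(cfg)
	if err != nil {
		panic(err)
	}
	decoded, err := decodeConfig(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded.BusComponent, len(decoded.Inputs), decoded.Port)
}
```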

Clone OpenFunction/events-handlers locally and change to the trigger/handler directory.

Start the program using the dapr command line, specifying the --enable-profiling and --profile-port flags:

dapr run --app-id trigger-handler --enable-profiling --profile-port 7777 --app-protocol grpc --app-port 5050 --components-path ../example/deploy/ go run ./main.go

Now that the connection has been established, we can use pprof to profile the Dapr runtime.

The following example will create a cpu.pprof file containing samples from a profile session that lasts 120 seconds:

curl "http://localhost:7777/debug/pprof/profile?seconds=120" > cpu.pprof

Use the following command to display the profile (you need to install graphviz first):

go tool pprof -http=":8081" cpu.pprof

You can refer to Profiling & Debugging to learn more.

@tpiperatgod
Member Author

pprof result of heap alloc-objects.pdf

@tpiperatgod tpiperatgod linked a pull request Aug 17, 2021 that will close this issue
sachinparihar pushed a commit to sachinparihar/OpenFunction that referenced this issue Nov 22, 2022
xwm1992 pushed a commit to xwm1992/OpenFunction that referenced this issue Aug 14, 2023
Previously we had been running conformance tests for background
function signature types. This enables these tests in the conformance
github action.

Running the tests uncovered one bug in which JSON fields with null values
were being dropped from the data param passed to RawBackgroundFunctions.
Fixing this is a non-breaking change so I bumped the minor version of
the maven package.