-
Notifications
You must be signed in to change notification settings - Fork 722
/
context.go
95 lines (85 loc) · 3.64 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package sensors
/*
Copyright 2020 BlackRock, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"net/http"
"time"
eventhubs "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Shopify/sarama"
"github.com/apache/openwhisk-client-go/whisk"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/aws/aws-sdk-go/service/lambda"
natslib "github.com/nats-io/go-nats"
"google.golang.org/grpc"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
sensormetrics "github.com/argoproj/argo-events/metrics"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)
// SensorContext contains execution context for Sensor
type SensorContext struct {
// kubeClient is the kubernetes client
kubeClient kubernetes.Interface
// dynamic clients.
dynamicClient dynamic.Interface
// Sensor object
sensor *v1alpha1.Sensor
// EventBus config
eventBusConfig *eventbusv1alpha1.BusConfig
// EventBus subject
eventBusSubject string
hostname string
// httpClients holds the reference to HTTP clients for HTTP triggers.
httpClients map[string]*http.Client
// customTriggerClients holds the references to the gRPC clients for the custom trigger servers
customTriggerClients map[string]*grpc.ClientConn
// http client to send slack messages.
slackHTTPClient *http.Client
// kafkaProducers holds references to the active kafka producers
kafkaProducers map[string]sarama.AsyncProducer
// pulsarProducers holds references to the active pulsar producers
pulsarProducers map[string]pulsar.Producer
// natsConnections holds the references to the active nats connections.
natsConnections map[string]*natslib.Conn
// awsLambdaClients holds the references to active AWS Lambda clients.
awsLambdaClients map[string]*lambda.Lambda
// openwhiskClients holds the references to active OpenWhisk clients.
openwhiskClients map[string]*whisk.Client
// azureEventHubsClients holds the references to active Azure Event Hub clients.
azureEventHubsClients map[string]*eventhubs.Hub
metrics *sensormetrics.Metrics
}
// NewSensorContext returns a new sensor execution context.
func NewSensorContext(kubeClient kubernetes.Interface, dynamicClient dynamic.Interface, sensor *v1alpha1.Sensor, eventBusConfig *eventbusv1alpha1.BusConfig, eventBusSubject, hostname string, metrics *sensormetrics.Metrics) *SensorContext {
return &SensorContext{
kubeClient: kubeClient,
dynamicClient: dynamicClient,
sensor: sensor,
eventBusConfig: eventBusConfig,
eventBusSubject: eventBusSubject,
hostname: hostname,
httpClients: make(map[string]*http.Client),
customTriggerClients: make(map[string]*grpc.ClientConn),
slackHTTPClient: &http.Client{
Timeout: time.Minute * 5,
},
kafkaProducers: make(map[string]sarama.AsyncProducer),
pulsarProducers: make(map[string]pulsar.Producer),
natsConnections: make(map[string]*natslib.Conn),
awsLambdaClients: make(map[string]*lambda.Lambda),
openwhiskClients: make(map[string]*whisk.Client),
azureEventHubsClients: make(map[string]*eventhubs.Hub),
metrics: metrics,
}
}