Skip to content

Commit

Permalink
Basic working version of Kafka integration
Browse files Browse the repository at this point in the history
  • Loading branch information
vishal-biyani committed Jul 30, 2018
1 parent 04c6b20 commit b0e92b2
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 10 deletions.
7 changes: 3 additions & 4 deletions charts/fission-all/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,8 @@ spec:
# args: ["proxy", "--port", "8001", "--address", "127.0.0.1"]
# serviceAccount: fission-svc


{{- if .Values.messagequeues.nats.enabled }}
---
{{- if .Values.nats.enabled }}
{{- if eq .Values.messageQueue.type "nats-streaming" }}
apiVersion: extensions/v1beta1
kind: Deployment
Expand Down Expand Up @@ -598,8 +597,8 @@ spec:
{{- end }}
serviceAccount: fission-svc
{{- end }}
{{- if .Values.kafka.enabled }}
---
{{- if .Values.kafka.enabled }}
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
Expand All @@ -624,7 +623,7 @@ spec:
- name: MESSAGE_QUEUE_TYPE
value: kafka
- name: MESSAGE_QUEUE_URL
value: "{{.Values.messagequeues.kafka.brokers}}"
value: "{{.Values.kafka.brokers}}"
serviceAccount: fission-svc
{{- end }}
---
Expand Down
5 changes: 3 additions & 2 deletions charts/fission-all/templates/svc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ spec:
selector:
svc: controller

{{- if .Values.messagequeues.nats.enabled }}
---
{{- if .Values.nats.enabled }}
{{- if eq .Values.messageQueue.type "nats-streaming" }}
apiVersion: v1
kind: Service
Expand All @@ -58,6 +58,7 @@ spec:
selector:
svc: nats-streaming
{{- end }}
{{- end }}
---
apiVersion: v1
kind: Service
Expand All @@ -73,4 +74,4 @@ spec:
- port: 80
targetPort: 8000
selector:
svc: storagesvc
svc: storagesvc
4 changes: 2 additions & 2 deletions charts/fission-all/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ azureStorageQueue:
accountName: ""

kafka:
enabled: false
brokers: 'kafka:9092'
enabled: true
brokers: 'broker.kafka:9092'
## Persist data to a persistent volume.
persistence:
enabled: true
Expand Down
2 changes: 1 addition & 1 deletion mqtrigger/messageQueue/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func isTopicValidForKafka(topic string) bool {

func (kafka Kafka) subscribe(trigger *crd.MessageQueueTrigger) (messageQueueSubscription, error) {
log.Infof("Inside kakfa subscribe", trigger)
log.Infof("borkers set to ", kafka.brokers)
log.Infof("brokers set to ", kafka.brokers)

// Create new consumer
consumerConfig := cluster.NewConfig()
Expand Down
4 changes: 3 additions & 1 deletion pkg/apis/fission.io/v1/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ func IsTopicValid(mqType MessageQueueType, topic string) bool {
return nsUtil.IsChannelNameValid(topic, false)
case MessageQueueTypeASQ:
return len(topic) >= 3 && len(topic) <= 63 && validAzureQueueName.MatchString(topic)
case MessageQueueTypeKafka:
return nsUtil.IsChannelNameValid(topic, false)
}
return false
}
Expand Down Expand Up @@ -430,7 +432,7 @@ func (spec MessageQueueTriggerSpec) Validate() error {
result = multierror.Append(result, spec.FunctionReference.Validate())

switch spec.MessageQueueType {
case MessageQueueTypeNats, MessageQueueTypeASQ: // no op
case MessageQueueTypeNats, MessageQueueTypeASQ, MessageQueueTypeKafka: // no op
default:
result = multierror.Append(result, MakeValidationErr(ErrorUnsupportedType, "MessageQueueTriggerSpec.MessageQueueType", spec.MessageQueueType, "not a supported message queue type"))
}
Expand Down

0 comments on commit b0e92b2

Please sign in to comment.