New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka integration #831

Merged
merged 14 commits into from Oct 1, 2018

Conversation

Projects
None yet
5 participants
@vishal-biyani
Collaborator

vishal-biyani commented Jul 30, 2018

This change is Reviewable

@vishal-biyani vishal-biyani force-pushed the kafka_integration branch from b0e92b2 to f3f4888 Aug 9, 2018

@vishal-biyani vishal-biyani modified the milestones: 0.11.0, 0.10.0 Aug 9, 2018

@vishal-biyani vishal-biyani changed the title from [WIP] Kafka integration to Kafka integration Aug 9, 2018

@vishal-biyani vishal-biyani modified the milestones: 0.10.0, 0.11.0 Aug 9, 2018

@life1347

Reviewed 1 of 5 files at r1, 2 of 8 files at r2, 4 of 7 files at r3.
Reviewable status: 7 of 12 files reviewed, 18 unresolved discussions (waiting on @vishal-biyani and @life1347)


glide.lock, line 1 at r3 (raw file):

<<<<<<< HEAD

need to fix this


charts/fission-all/values.yaml, line 71 at r3 (raw file):

kafka:
  enabled: true
  brokers: 'broker.kafka:9092'

The value is different from charts/README.md, is it intentional?


charts/fission-all/templates/deployment.yaml, line 532 at r3 (raw file):

---
{{- if .Values.nats.enabled }}

need to add the configuration to values.yaml


charts/fission-all/templates/deployment.yaml, line 533 at r3 (raw file):

---
{{- if .Values.nats.enabled }}
{{- if eq .Values.messageQueue.type "nats-streaming" }}

remove this line and related config in values.yaml since we can use enabledinstead


charts/fission-all/templates/deployment.yaml, line 551 at r3 (raw file):

        image: nats-streaming
        args: [
          "--cluster_id", "{{ .Values.messagequeues.nats.clusterID }}",

it should be .Values.nats.clusterID and .Values.nats.authToken


charts/fission-all/templates/deployment.yaml, line 566 at r3 (raw file):

kind: Deployment
metadata:
  name: mqtrigger-nats-streaming

good change! This way user can access different type of message queues at the same time.


charts/fission-all/templates/deployment.yaml, line 590 at r3 (raw file):

          value: nats://{{ .Values.nats.authToken }}@nats-streaming:4222
        {{- else if eq .Values.messageQueue.type "azure-storage-queue" }}
        - name: AZURE_STORAGE_ACCOUNT_NAME

need to create another deployment for azure.


charts/fission-all/templates/svc.yaml, line 41 at r3 (raw file):

---
{{- if .Values.nats.enabled }}

Need to add svc for kafka


mqtrigger/messageQueue/kafka.go, line 26 at r3 (raw file):

	sarama "github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"github.com/fission/fission"

sort import


mqtrigger/messageQueue/kafka.go, line 94 at r3 (raw file):

		for msg := range consumer.Messages() {
			log.Infof("Calling message handler with value " + string(msg.Value[:]))
			if msgHandler1(&kafka, producer, trigger, string(msg.Value[:])) {

msgHandler"1" is that means we will have multiple msgHandler in future?


mqtrigger/messageQueue/kafka.go, line 114 at r3 (raw file):

	}

	url := kafka.routerUrl + "/" + strings.TrimPrefix(fission.UrlForFunction(trigger.Spec.FunctionReference.Name, "default"), "/")

instead of trigger.Spec.FunctionReference.Namespace?


mqtrigger/messageQueue/kafka.go, line 118 at r3 (raw file):

	headers := map[string]string{
		"X-Fission-MQTrigger-Topic":     trigger.Spec.Topic,
		"X-Fission-MQTrigger-RespTopic": trigger.Spec.ResponseTopic,

since nats support, we need to support error topic as well. X-Fission-MQTrigger-ErrorTopic.
I'm ok to open an issue to address this in other PR.


mqtrigger/messageQueue/kafka.go, line 122 at r3 (raw file):

	}
	// Create request
	req, err := http.NewRequest("POST", url, strings.NewReader(value))

need to handle the error


mqtrigger/messageQueue/kafka.go, line 126 at r3 (raw file):

		req.Header.Add(k, v)
	}
	// Make the request

add retry loop here trigger.Spec.MaxRetries. You can take a look at nats' implementation


mqtrigger/messageQueue/messageQueue.go, line 24 at r3 (raw file):

	"github.com/fission/fission"
	"github.com/fission/fission/crd"

sort import


mqtrigger/messageQueue/messageQueue.go, line 31 at r3 (raw file):

const (
	NATS  string = "nats-streaming"
	KAFKA string = "kafka"

use fv1.MessageQueueTypeNats instead


mqtrigger/messageQueue/messageQueue.go, line 234 at r3 (raw file):

	switch mqType {
	case NATS:
		return isTopicValidForNats(topic)

same above


pkg/apis/fission.io/v1/validation.go, line 168 at r3 (raw file):

		return len(topic) >= 3 && len(topic) <= 63 && validAzureQueueName.MatchString(topic)
	case MessageQueueTypeKafka:
		return nsUtil.IsChannelNameValid(topic, false)

need to fix this

@vishal-biyani

Reviewable status: 7 of 12 files reviewed, 18 unresolved discussions (waiting on @life1347 and @vishal-biyani)


glide.lock, line 1 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

need to fix this

Done.


charts/fission-all/values.yaml, line 71 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

The value is different from charts/README.md, is it intentional?

Done.


charts/fission-all/templates/deployment.yaml, line 532 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

need to add the configuration to values.yaml

Done.


charts/fission-all/templates/deployment.yaml, line 533 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

remove this line and related config in values.yaml since we can use enabledinstead

Yes, I figured out there is a lot of mixup in there. I have separated all 3 MQs - so that you can decide to deploy more than one of them at the same time.


charts/fission-all/templates/deployment.yaml, line 551 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

it should be .Values.nats.clusterID and .Values.nats.authToken

Done.


charts/fission-all/templates/deployment.yaml, line 566 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

good change! This way user can access different type of message queues at the same time.

Done.


charts/fission-all/templates/deployment.yaml, line 590 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

need to create another deployment for azure.

Done.


charts/fission-all/templates/svc.yaml, line 41 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

Need to add svc for kafka

This is for actual nats - and in case of Kafka we are not deploying it as a chart, so won't need a Svc. For MQTrigger there is no existing service - mostly because there is no API.


mqtrigger/messageQueue/kafka.go, line 26 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

sort import

Done.


mqtrigger/messageQueue/kafka.go, line 94 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

msgHandler"1" is that means we will have multiple msgHandler in future?

Fixed


mqtrigger/messageQueue/kafka.go, line 114 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

instead of trigger.Spec.FunctionReference.Namespace?

Done.


mqtrigger/messageQueue/kafka.go, line 118 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

since nats support, we need to support error topic as well. X-Fission-MQTrigger-ErrorTopic.
I'm ok to open an issue to address this in other PR.

Done.


mqtrigger/messageQueue/kafka.go, line 122 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

need to handle the error

Done.


mqtrigger/messageQueue/kafka.go, line 126 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

add retry loop here trigger.Spec.MaxRetries. You can take a look at nats' implementation

Done.


mqtrigger/messageQueue/messageQueue.go, line 24 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

sort import

Done.


mqtrigger/messageQueue/messageQueue.go, line 31 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

use fv1.MessageQueueTypeNats instead

Done.


mqtrigger/messageQueue/messageQueue.go, line 234 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

same above

Done.


pkg/apis/fission.io/v1/validation.go, line 168 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

need to fix this

I am sorry, what needs to be fixed here?

@vishal-biyani vishal-biyani force-pushed the kafka_integration branch from f3f4888 to c749770 Sep 11, 2018

@life1347

Reviewed 1 of 5 files at r1, 1 of 11 files at r4, 1 of 7 files at r5, 10 of 10 files at r6.
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @life1347 and @vishal-biyani)


charts/fission-all/templates/deployment.yaml, line 621 at r6 (raw file):

kind: Deployment
metadata:
  name: mqtrigger-nats-streaming

mqtrigger-azure-storage-queue or something else.


mqtrigger/messageQueue/kafka.go, line 26 at r3 (raw file):

Previously, vishal-biyani (Vishal) wrote…

Done.

fission import should be put at the end of imports


mqtrigger/messageQueue/kafka.go, line 114 at r3 (raw file):

Previously, vishal-biyani (Vishal) wrote…

Done.

Sorry, dunno what I was thinking at that time. it should be trigger.Metadata.Namespace.


mqtrigger/messageQueue/kafka.go, line 158 at r6 (raw file):

	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	log.Infof("Got response " + string(body))

We should send error message to trigger.Spec.ErrorTopic if err != nil || resp.StatusCode !=nil. Please refer how nats implements here


mqtrigger/messageQueue/messageQueue.go, line 24 at r3 (raw file):

Previously, vishal-biyani (Vishal) wrote…

Done.

fission imports should be put at bottom.


pkg/apis/fission.io/v1/validation.go, line 168 at r6 (raw file):

		return len(topic) >= 3 && len(topic) <= 63 && validAzureQueueName.MatchString(topic)
	case MessageQueueTypeKafka:
		return nsUtil.IsChannelNameValid(topic, false)

sorry, point to the wrong line.
nsUtil.IsChannelNameValid is for nats only.

@vishal-biyani

Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @life1347 and @vishal-biyani)


charts/fission-all/templates/deployment.yaml, line 621 at r6 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

mqtrigger-azure-storage-queue or something else.

Done.


mqtrigger/messageQueue/kafka.go, line 26 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

fission import should be put at the end of imports

Done


mqtrigger/messageQueue/kafka.go, line 114 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

Sorry, dunno what I was thinking at that time. it should be trigger.Metadata.Namespace.

I think the function's namespace is correct? As we are trying to form the URL for function?


mqtrigger/messageQueue/kafka.go, line 158 at r6 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

We should send error message to trigger.Spec.ErrorTopic if err != nil || resp.StatusCode !=nil. Please refer how nats implements here

Done.


mqtrigger/messageQueue/messageQueue.go, line 24 at r3 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

fission imports should be put at bottom.

Done


pkg/apis/fission.io/v1/validation.go, line 168 at r6 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

sorry, point to the wrong line.
nsUtil.IsChannelNameValid is for nats only.

Yes, but right now that also serves as a good enough check for Kafka topic name, so I am reusing it for now.

@vishal-biyani vishal-biyani force-pushed the kafka_integration branch from a809a2f to c1bb8af Sep 17, 2018

@life1347

Reviewed 1 of 7 files at r8, 10 of 10 files at r9.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @life1347)


mqtrigger/messageQueue/kafka.go, line 114 at r3 (raw file):

Previously, vishal-biyani (Vishal) wrote…

I think the function's namespace is correct? As we are trying to form the URL for function?

Currently function reference only contains the function name

FunctionReference struct {
// Type indicates whether this function reference is by name or selector. For now,
// the only supported reference type is by name. Future reference types:
// * Function by label or annotation
// * Branch or tag of a versioned function
// * A "rolling upgrade" from one version of a function to another
Type FunctionReferenceType `json:"type"`
// Name of the function.
Name string `json:"name"`
}


pkg/apis/fission.io/v1/validation.go, line 168 at r6 (raw file):

Previously, vishal-biyani (Vishal) wrote…

Yes, but right now that also serves as a good enough check for Kafka topic name, so I am reusing it for now.

I see, how about adding some comment to explain this so that others will know its intentional?

@vishal-biyani

Reviewable status: 11 of 14 files reviewed, 1 unresolved discussion (waiting on @life1347 and @vishal-biyani)


pkg/apis/fission.io/v1/validation.go, line 168 at r6 (raw file):

Previously, life1347 (Ta-Ching Chen) wrote…

I see, how about adding some comment to explain this so that others will know its intentional?

Done.

@vishal-biyani vishal-biyani force-pushed the kafka_integration branch from 9fac8a1 to e038599 Sep 19, 2018

@vishal-biyani vishal-biyani force-pushed the kafka_integration branch 6 times, most recently from be96d5c to 128117e Sep 24, 2018

@vishal-biyani vishal-biyani force-pushed the kafka_integration branch from 128117e to d656e31 Oct 1, 2018

@vishal-biyani vishal-biyani merged commit 96cbee1 into master Oct 1, 2018

1 of 2 checks passed

code-review/reviewable 16 files, 1 discussion left (life1347, vishal-biyani)
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@cpeeyush

This comment has been minimized.

cpeeyush commented Oct 1, 2018

  • When can we expect 0.11.0 release?
  • Will, it going to support using any existing Kafka installation (Not on kubernetes) as a trigger of fission functions?
@vishal-biyani

This comment has been minimized.

Collaborator

vishal-biyani commented Oct 1, 2018

@cpeeyush The 0.11.0 release is coming in a day or so approx. You can use any Kafka cluster as long as it is reachable from Kubernetes cluster.

@cpeeyush

This comment has been minimized.

cpeeyush commented Oct 2, 2018

@cpeeyush The 0.11.0 release is coming in a day or so approx. You can use any Kafka cluster as long as it is reachable from Kubernetes cluster.

Thanks @vishal-biyani , I am looking forward to it.

@life1347 life1347 deleted the kafka_integration branch Oct 2, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment