From ebfc167c5816dcc4dc00b64db6bd80ffcb2c2192 Mon Sep 17 00:00:00 2001 From: Quan Phuong Date: Thu, 22 Jul 2021 23:52:52 +0700 Subject: [PATCH] Init project --- .gitignore | 2 + README.md | 251 ++++++++++++++++++++++++++++++++ acls.go | 70 +++++++++ acls_test.go | 121 ++++++++++++++++ authorize.go | 51 +++++++ authorize_test.go | 72 ++++++++++ clusters.go | 72 ++++++++++ clusters_test.go | 129 +++++++++++++++++ configs.go | 62 ++++++++ configs_test.go | 170 ++++++++++++++++++++++ confluent.go | 32 +++++ confluent_test.go | 27 ++++ example/README.md | 6 + example/main.go | 199 ++++++++++++++++++++++++++ go.mod | 8 ++ go.sum | 79 ++++++++++ helper.go | 145 +++++++++++++++++++ http_client.go | 64 +++++++++ kafka.go | 140 ++++++++++++++++++ kafka_client.go | 121 ++++++++++++++++ kafka_client_test.go | 99 +++++++++++++ login.go | 36 +++++ login_test.go | 57 ++++++++ partitions.go | 27 ++++ partitions_test.go | 124 ++++++++++++++++ rbac.go | 117 +++++++++++++++ rbac_test.go | 304 +++++++++++++++++++++++++++++++++++++++ topics.go | 334 +++++++++++++++++++++++++++++++++++++++++++ topics_test.go | 285 ++++++++++++++++++++++++++++++++++++ 29 files changed, 3204 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 acls.go create mode 100644 acls_test.go create mode 100644 authorize.go create mode 100644 authorize_test.go create mode 100644 clusters.go create mode 100644 clusters_test.go create mode 100644 configs.go create mode 100644 configs_test.go create mode 100644 confluent.go create mode 100644 confluent_test.go create mode 100644 example/README.md create mode 100644 example/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 helper.go create mode 100644 http_client.go create mode 100644 kafka.go create mode 100644 kafka_client.go create mode 100644 kafka_client_test.go create mode 100644 login.go create mode 100644 login_test.go create mode 100644 partitions.go create mode 100644 partitions_test.go create mode 100644 rbac.go create mode 100644 rbac_test.go create mode 100644 topics.go create mode 100644 topics_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a127117 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +example/certs/* +vendor \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..4e39402 --- /dev/null +++ b/README.md @@ -0,0 +1,251 @@ +# Confluent Platform Client + +- Maintainer: Quan Phuong +- Provide Go client for Confluent platform, reference [Confluent API document](https://docs.confluent.io/platform/current/kafka-rest/api.html) and [Confluent Metadata API document](https://docs.confluent.io/platform/current/security/rbac/mds-api.html) + +## Context + +We're running Confluent Platform (Not Confluent Cloud) that hosted on our infrastructure, and we can't find any clients that support us to integrate with Confluent Platform API. This project for our Confluent Platform with some configurations: + +- SASL Authentication with username + password (Support both basic auth and LDAP user) +- Kafka Brokers embeded MDS server +- RBAC enabled + +## Usages: + +### Installation + +``` +go get github.com/wayarmny/gonfluent +``` + +### Implementation example + +- I'm using 2 clients to connect with Confluent: HTTP clients and [Sarama Client](https://github.com/Shopify/sarama) to connect with 1 Confluent cluster, so that when you do initiate `Gonfluent`, you need to initiate 2 authentications methods. + +``` +package main + +import ( + "fmt" + "os" + + confluent "github.com/wayarmy/gonfluent" +) + +const ( + ClientVersion = "0.1" + UserAgent = "confluent-client-go-sdk-" + ClientVersion +) + +func main() { + baseUrl := "https://localhost:8090" + + username := os.Getenv("CONFLUENT_USER") + password := os.Getenv("CONFLUENT_PASSWORD") + + // Initialize the client + httpClient := confluent.NewDefaultHttpClient(baseUrl, username, password) + httpClient.UserAgent = UserAgent + bootstrapServer := []string{ + "localhost:9093", + } + kConfig := &confluent.Config{ + BootstrapServers: &bootstrapServer, + CACert: "certs/ca.pem", + ClientCert: "certs/cert.pem", + ClientCertKey: "certs/key.pem", + SkipTLSVerify: true, + SASLMechanism: "plain", + TLSEnabled: true, + Timeout: 120, + } + kClient, err := confluent.NewSaramaClient(kConfig) + if err != nil { + panic(err) + } + + client := confluent.NewClient(httpClient, kClient) + bearerToken, err := client.Login() + if err != nil { + panic(err) + } + httpClient.Token = bearerToken + + // Get the list of clusters in Confluent platform + listCluster, err := client.ListKafkaCluster() + if err != nil { + panic(err) + } + + fmt.Println(listCluster) + + // Get the cluster information + clusterId := listCluster[0].ClusterID + cluster, err := client.GetKafkaCluster(clusterId) + if err != nil { + panic(err) + } + fmt.Printf("%#v", cluster) + fmt.Println(cluster) + // + //topicConfig, err := client.GetTopicConfigs(clusterId, "example_topic_name") + //if err != nil { + // panic(err) + //} + //fmt.Printf("%#v", topicConfig) + + + //topic, err := client.GetTopic(clusterId, "test7-terraform-confluent-provider") + //if err != nil { + // panic(err) + //} + //fmt.Println(topic.Partitions) + + + ////Create a new topic + //partitionConfig := []confluent.TopicConfig{ + // { + // Name: "compression.type", + // Value: "gzip", + // }, + // { + // Name: "cleanup.policy", + // Value: "compact", + // }, + // //{ + // // Name: "retention.ms", + // // Value: "20000", + // //}, + //} + //err = client.CreateTopic(clusterId, "test_topic_name", 3, 3, partitionConfig, nil) + //if err != nil { + // panic(err) + //} + //fmt.Println("Topic Created!") + + d := []confluent.TopicConfig{ + { + Name: "compression.type", + Value: "gzip", + }, + { + Name: "cleanup.policy", + Value: "compact", + }, + { + Name: "retention.ms", + Value: "300000", + }, + } + + err = client.UpdateTopicConfigs(clusterId, "test_topic_name", d) + if err != nil { + panic(err) + } + fmt.Println("Topic Updated!") + // + + + // // Create Principal but don't know use case for it + // testPrincipals := []confluent.UserPrincipalAction{ + // { + // Scope: confluent.Scope{ + // Clusters: confluent.AuthorClusters{ + // KafkaCluster: clusterId, + // }, + // }, + // ResourceName: "Testing-Principal", + // ResourceType: "Cluster", + // Operation: "Read", + // }, + // } + + // newPrincipal, err := client.CreatePrincipal("User:system-platform", testPrincipals) + // if err != nil { + // panic(err) + // } + // fmt.Printf("%s", newPrincipal) + + // + + // Get the topics information + //getTopic, err := client.GetTopic(clusterId, "test2_topic_name") + //if err != nil { + // panic(err) + //} + //fmt.Printf("%v", getTopic) + + //Delete the existing topic + //deleteTopic := client.DeleteTopic(clusterId, "test_topic_name") + //if deleteTopic != nil { + // panic(deleteTopic) + //} + //fmt.Println("Topic deleted") + + //// Add role assignment + //c := confluent.ClusterDetails{} + //c.Clusters.KafkaCluster = clusterId + //err = client.BindPrincipalToRole("User:manh.do", "Operator", c) + //if err != nil { + // panic(err) + //} + //fmt.Println("Role Binded!") + + // // Add role assignment for principal to topic + // r := confluent.UpdateRoleBinding{ + // Scope: c, + // } + // r.ResourcePatterns = []confluent.RoleBindings{ + // { + // ResourceType: "Topic", + // Name: "system-platform", + // PatternType: "PREFIXED", + // }, + // } + // principalCN := "User:CN=common-name.example.com" + // err = client.IncreaseRoleBinding(principalCN, "ResourceOwner", r) + // if err != nil { + // panic(err) + // } + // fmt.Println("Role Binded!") + //t := confluent.Topic{ + // Name: "test_topic_name", + // Partitions: 10, + // ReplicationFactor: 4, + //} + //err = client.UpdatePartitions(t) + //if err != nil { + // panic(err) + //} + //fmt.Println("Update partition successfully") + //err = client.UpdateReplicationsFactor(t) + //if err != nil { + // panic(err) + //} + //fmt.Println("Update RF successfully") + +} + +``` + +## Contributing + +- Clone this project + +``` +git clone ... +``` + +- Install any requirements + +``` +go mod download +``` + +- Add your code and logics +- Push to new branch and submit merge request after all test success + +### Testing + +`go test` \ No newline at end of file diff --git a/acls.go b/acls.go new file mode 100644 index 0000000..ebb5f5c --- /dev/null +++ b/acls.go @@ -0,0 +1,70 @@ +package confluent + +import ( + "bytes" + "encoding/json" +) + +const ( + aclsPath = "acls" +) + +type Acl struct { + ClusterId string `json:"cluster_id,omitempty"` + ResourceType string `json:"resource_type,omitempty"` + ResourceName string `json:"resource_name,omitempty"` + PatternType string `json:"pattern_type,omitempty"` + Principal string `json:"principal,omitempty"` + Host string `json:"host,omitempty"` + Operation string `json:"operation,omitempty"` + Permission string `json:"permission,omitempty"` +} + +func (c *Client) ListAcls(clusterId string) ([]Acl, error) { + u := "/clusters/" + clusterId + "/" + aclsPath + r, err := c.DoRequest("GET", u, nil) + if err != nil { + return nil, err + } + + body := struct{ + Data []Acl `json:"data"` + }{} + + err = json.Unmarshal(r, &body) + if err != nil { + return nil, err + } + + return body.Data, nil +} + +func (c *Client) CreateAcl(clusterId string, aclConfig *Acl) error { + u := "/clusters/" + clusterId + "/" + aclsPath + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(aclConfig) + _, err := c.DoRequest("POST", u, payloadBuf) + if err != nil { + return err + } + + return nil +} + +func (c *Client) DeleteAcl(clusterId, resourceName string) error { + u := "/clusters/" + clusterId + "/" + aclsPath + + aclDetele := Acl{ + ResourceName: resourceName, + } + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(aclDetele) + _, err := c.DoRequest("DELETE", u, payloadBuf) + if err != nil { + return err + } + + return nil +} diff --git a/acls_test.go b/acls_test.go new file mode 100644 index 0000000..bd5ca9f --- /dev/null +++ b/acls_test.go @@ -0,0 +1,121 @@ +package confluent + +import ( + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAcls_ListAclsSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/clusters/cluster-1/acls", uri) + return []byte(` + { + "kind": "KafkaAclList", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/acls?principal=alice" + }, + "data": [ + { + "kind": "KafkaAcl", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/acls?resource_type=TOPIC&resource_name=topic-&pattern_type=PREFIXED&principal=alice&host=*&operation=ALL&permission=ALLOW" + }, + "cluster_id": "cluster-1", + "resource_type": "TOPIC", + "resource_name": "topic-", + "pattern_type": "PREFIXED", + "principal": "alice", + "host": "*", + "operation": "ALL", + "permission": "ALLOW" + }, + { + "kind": "KafkaAcl", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/acls?resource_type=CLUSTER&resource_name=cluster-1&pattern_type=LITERAL&principal=bob&host=*&operation=DESCRIBE&permission=DENY" + }, + "cluster_id": "cluster-1", + "resource_type": "CLUSTER", + "resource_name": "cluster-2", + "pattern_type": "LITERAL", + "principal": "alice", + "host": "*", + "operation": "DESCRIBE", + "permission": "DENY" + } + ] + } +`), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + acls, err := c.ListAcls("cluster-1") + assert.NoError(t, err) + assert.Equal(t, 2, len(acls)) + assert.Equal(t, "alice", acls[0].Principal) +} + +func TestAcls_CreateAclsSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/clusters/cluster-1/acls", uri) + return []byte(``), 201, "201", nil + } + c := NewClient(&mock, &mk) + aclConfig := Acl{} + err := c.CreateAcl("cluster-1", &aclConfig) + assert.NoError(t, err) +} + +func TestAcls_DeleteAclSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodDelete, method, "Expected method 'Delete', got %s", method) + assert.Equal(t, "/clusters/cluster-1/acls", uri) + return []byte(` + { + "data": [ + { + "kind": "KafkaAcl", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/acls?resource_type=TOPIC&resource_name=topic-&pattern_type=PREFIXED&principal=alice&host=*&operation=ALL&permission=ALLOW" + }, + "cluster_id": "cluster-1", + "resource_type": "TOPIC", + "resource_name": "topic-", + "pattern_type": "PREFIXED", + "principal": "alice", + "host": "*", + "operation": "ALL", + "permission": "ALLOW" + }, + { + "kind": "KafkaAcl", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/acls?resource_type=CLUSTER&resource_name=cluster-1&pattern_type=LITERAL&principal=bob&host=*&operation=DESCRIBE&permission=DENY" + }, + "cluster_id": "cluster-1", + "resource_type": "CLUSTER", + "resource_name": "cluster-2", + "pattern_type": "LITERAL", + "principal": "alice", + "host": "*", + "operation": "DESCRIBE", + "permission": "DENY" + } + ] + } +`), 200, "200", nil + } + c := NewClient(&mock, &mk) + err := c.DeleteAcl("cluster-1", "") + assert.NoError(t, err) +} diff --git a/authorize.go b/authorize.go new file mode 100644 index 0000000..85ea475 --- /dev/null +++ b/authorize.go @@ -0,0 +1,51 @@ +package confluent + +import ( + "bytes" + "encoding/json" +) + +const ( + authorPath = "/security/1.0/authorize" +) + +type AuthorClusters struct { + KafkaCluster string `json:"kafka-cluster"` +} + +type Scope struct { + Clusters AuthorClusters `json:"clusters"` +} + +type UserPrincipalAction struct { + Scope Scope `json:"scope"` + ResourceName string `json:"resourceName"` + ResourceType string `json:"resourceType"` + Operation string `json:"operation"` +} + +type UserPrincipal struct { + // UserPrincipal example: User: + UserPrincipal string `json:"userPrincipal"` + + // Actions allow or deny for this principal + Actions []UserPrincipalAction `json:"actions"` +} + +func (c *Client) CreatePrincipal(userPrincipal string, principals []UserPrincipalAction) (*UserPrincipal, error) { + u := authorPath + + principal := &UserPrincipal{ + UserPrincipal: userPrincipal, + Actions: principals, + } + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(principal) + + _, err := c.DoRequest("PUT", u, payloadBuf) + if err != nil { + return nil, err + } + return principal, nil +} diff --git a/authorize_test.go b/authorize_test.go new file mode 100644 index 0000000..86e738f --- /dev/null +++ b/authorize_test.go @@ -0,0 +1,72 @@ +package confluent + +import ( + "errors" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +var ( + testPrincipals = []UserPrincipalAction{ + { + Scope: Scope{ + Clusters: AuthorClusters{ + KafkaCluster: clusterId, + }, + }, + ResourceName: "Testing-Principal", + ResourceType: "Cluster", + Operation: "ClusterAdmin", + }, + } +) + +func TestAuthorize_CreatePrincipalSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPut, method, "Expected method 'PUT', got %s", method) + assert.Equal(t, "/security/1.0/authorize", uri) + return []byte(` + [ + "ALLOWED", + "DENIED" + ] +`), 200, "200", nil + } + c := NewClient(&mock, &mk) + newPrincipal, err := c.CreatePrincipal("User:testing", testPrincipals) + assert.NoError(t, err) + assert.Equal(t, "Testing-Principal", newPrincipal.Actions[0].ResourceName) +} + +func TestAuthorize_CreatePrincipalFail(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPut, method, "Expected method 'PUT', got %s", method) + assert.Equal(t, "/security/1.0/authorize", uri) + return []byte(` + { + "status_code": 400, + "error_code": 0, + "type": "INVALID REQUEST DATA", + "message": "Bad request", + "errors": [ + { + "error_type": "string", + "message": "INVALID REQUEST DATA" + } + ] + } +`), 400, "400 Bad Request", nil + } + c := NewClient(&mock, &mk) + newPrincipal, err := c.CreatePrincipal("User:testing", testPrincipals) + assert.NotNil(t, err) + assert.Nil(t, newPrincipal) + assert.Equal(t, errors.New("error with status: 400 Bad Request INVALID REQUEST DATA"), err) +} diff --git a/clusters.go b/clusters.go new file mode 100644 index 0000000..c46ad76 --- /dev/null +++ b/clusters.go @@ -0,0 +1,72 @@ +package confluent + +import ( + "encoding/json" +) + +const ( + clusterUri = "/kafka/v3/clusters" +) + +//Clusters active in Confluent system +// Support: +// - kafka cluster +// - Kafka connect cluster +// - KSql cluster +// - Schema Registry cluster +type Clusters struct { + // Kafka cluster ID + KafkaCluster string `json:"kafka-cluster,omitempty"` + + // Kafka Connect Cluster ID + ConnectCluster string `json:"connect-cluster,omitempty"` + + // kSQL cluster ID + KSqlCluster string `json:"ksql-cluster,omitempty"` + + // Schema Registry Cluster ID + SchemaRegistryCluster string `json:"schema-registry-cluster,omitempty"` +} + +type KafkaCluster struct { + ClusterID string `json:"cluster_id"` +} + +type Related struct { + Related string `json:"related"` +} + +func (c *Client) ListKafkaCluster() ([]KafkaCluster, error) { + resp, err := c.DoRequest("GET", clusterUri, nil) + if err != nil { + return nil, err + } + + body := struct{ + Data []KafkaCluster `json:"data"` + }{} + + err = json.Unmarshal(resp, &body) + if err != nil { + return nil, err + } + + return body.Data, nil +} + +func (c *Client) GetKafkaCluster(clusterId string) (*KafkaCluster, error) { + pathUri := clusterUri + "/" + clusterId + resp, err := c.DoRequest("GET", pathUri, nil) + if err != nil { + return nil, err + } + + var cluster *KafkaCluster + + err = json.Unmarshal(resp, &cluster) + if err != nil { + return nil, err + } + + return cluster, nil +} diff --git a/clusters_test.go b/clusters_test.go new file mode 100644 index 0000000..05ddbe1 --- /dev/null +++ b/clusters_test.go @@ -0,0 +1,129 @@ +package confluent + +import ( + "errors" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestClusters_ListKafkaCluster(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters", uri) + return []byte(` +{ + "kind": "KafkaClusterList", + "metadata": { + "self": "http://localhost:9391/v3/clusters", + "next": null + }, + "data": [ + { + "kind": "KafkaCluster", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1", + "resource_name": "crn:///kafka=cluster-1" + }, + "cluster_id": "cluster-1", + "controller": { + "related": "http://localhost:9391/v3/clusters/cluster-1/brokers/1" + }, + "acls": { + "related": "http://localhost:9391/v3/clusters/cluster-1/acls" + }, + "brokers": { + "related": "http://localhost:9391/v3/clusters/cluster-1/brokers" + }, + "broker_configs": { + "related": "http://localhost:9391/v3/clusters/cluster-1/broker-configs" + }, + "consumer_groups": { + "related": "http://localhost:9391/v3/clusters/cluster-1/consumer-groups" + }, + "topics": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics" + }, + "partition_reassignments": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/-/partitions/-/reassignment" + } + } + ] + } +`), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + clusters, err := c.ListKafkaCluster() + if assert.NoError(t, err) { + assert.Equal(t, 1, len(clusters)) + assert.Equal(t, "cluster-1", clusters[0].ClusterID) + } +} + +func TestClusters_GetNonExistingKafkaCluster(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1", uri) + return []byte(` + { + "error_code": 404, + "message": "HTTP 404 Not Found" + } +`), 404, "404 Not Found", nil + } + c := NewClient(&mock, &mk) + cluster, err := c.GetKafkaCluster("cluster-1") + assert.Equal(t, errors.New("error with status: 404 Not Found HTTP 404 Not Found"), err) + assert.Nil(t, cluster) +} + +func TestClusters_GetExistingKafkaCluster(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1", uri) + return []byte(` + { + "kind": "KafkaCluster", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1", + "resource_name": "crn:///kafka=cluster-1" + }, + "cluster_id": "cluster-1", + "controller": { + "related": "http://localhost:9391/v3/clusters/cluster-1/brokers/1" + }, + "acls": { + "related": "http://localhost:9391/v3/clusters/cluster-1/acls" + }, + "brokers": { + "related": "http://localhost:9391/v3/clusters/cluster-1/brokers" + }, + "broker_configs": { + "related": "http://localhost:9391/v3/clusters/cluster-1/broker-configs" + }, + "consumer_groups": { + "related": "http://localhost:9391/v3/clusters/cluster-1/consumer-groups" + }, + "topics": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics" + }, + "partition_reassignments": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/-/partitions/-/reassignment" + } + } +`), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + cluster, err := c.GetKafkaCluster("cluster-1") + if assert.NoError(t, err) { + assert.Equal(t, "cluster-1", cluster.ClusterID) + } +} diff --git a/configs.go b/configs.go new file mode 100644 index 0000000..47ac6b9 --- /dev/null +++ b/configs.go @@ -0,0 +1,62 @@ +package confluent + +import ( + "bytes" + "encoding/json" +) + +type TopicConfig struct { + ClusterId string `json:"cluster_id,omitempty"` + TopicName string `json:"topic_name,omitempty"` + Name string `json:"name"` + Value string `json:"value"` + IsDefault bool `json:"is_default,omitempty"` + IsReadOnly bool `json:"is_read_only,omitempty"` + IsSensitive bool `json:"is_sensitive,omitempty"` + Source string `json:"source,omitempty"` + Synonyms []Synonyms `json:"synonyms,omitempty"` +} + +type Synonyms struct { + Name string `json:"name"` + Value string `json:"value,omitempty"` + Source string `json:"source,omitempty"` + Operation string `json:"operation,omitempty"` +} + +func (c *Client) GetTopicConfigs(clusterId string, topicName string) ([]TopicConfig, error) { + u := "/kafka/v3/clusters/" + clusterId + "/topics/" + topicName + "/configs" + + r, err := c.DoRequest("GET", u, nil) + if err != nil { + return nil, err + } + + res := struct{ + Data []TopicConfig `json:"data"` + }{} + err = json.Unmarshal(r, &res) + if err != nil { + return nil, err + } + return res.Data, nil +} + +func (c *Client) UpdateTopicConfigs(clusterId string, topicName string, data []TopicConfig) error { + u := "/kafka/v3/clusters/" + clusterId + "/topics/" + topicName + "/configs:alter" + + reqBody := struct{ + Data []TopicConfig `json:"data"` + }{} + reqBody.Data = data + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(reqBody) + + _, err := c.DoRequest("POST", u, payloadBuf) + if err != nil { + return err + } + return nil +} + diff --git a/configs_test.go b/configs_test.go new file mode 100644 index 0000000..0a4c1b3 --- /dev/null +++ b/configs_test.go @@ -0,0 +1,170 @@ +package confluent + +import ( + "github.com/stretchr/testify/assert" + "io" + "net/http" + "testing" +) + +func TestConfigs_GetTopicConfigs(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-1/configs", uri) + return []byte(` + { + "kind": "KafkaTopicConfigList", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/configs", + "next": null + }, + "data": [ + { + "kind": "KafkaTopicConfig", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/configs/cleanup.policy", + "resource_name": "crn:///kafka=cluster-1/topic=topic-1/config=cleanup.policy" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-1", + "name": "cleanup.policy", + "value": "compact", + "is_default": false, + "is_read_only": false, + "is_sensitive": false, + "source": "DYNAMIC_TOPIC_CONFIG", + "synonyms": [ + { + "name": "cleanup.policy", + "value": "compact", + "source": "DYNAMIC_TOPIC_CONFIG" + }, + { + "name": "cleanup.policy", + "value": "delete", + "source": "DEFAULT_CONFIG" + } + ] + }, + { + "kind": "KafkaTopicConfig", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/configs/compression.type", + "resource_name": "crn:///kafka=cluster-1/topic=topic-1/config=compression.type" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-1", + "name": "compression.type", + "value": "gzip", + "is_default": false, + "is_read_only": false, + "is_sensitive": false, + "source": "DYNAMIC_TOPIC_CONFIG", + "synonyms": [ + { + "name": "compression.type", + "value": "gzip", + "source": "DYNAMIC_TOPIC_CONFIG" + }, + { + "name": "compression.type", + "value": "producer", + "source": "DEFAULT_CONFIG" + } + ] + } + ] + } +`), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + configs, err := c.GetTopicConfigs("cluster-1", "topic-1") + if assert.NoError(t, err) { + assert.Equal(t, 2, len(configs)) + assert.Equal(t, "cluster-1", configs[0].ClusterId) + } +} + +func TestConfigs_GetTopicConfigsFailWithResourceNotFound(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-1/configs", uri) + return nil, 404, "404 Not Found", nil + } + c := NewClient(&mock, &mk) + _, err := c.GetTopicConfigs("cluster-1", "topic-1") + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), "404 Not Found") + } +} + +func TestConfigs_GetTopicConfigsFailWithWrongData(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-1/configs", uri) + return []byte("<"), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + _, err := c.GetTopicConfigs("cluster-1", "topic-1") + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), "invalid character '<'") + } +} + +func TestConfigs_UpdateTopicConfigs(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-1/configs:alter", uri) + return nil, 204, "204 Accepted", nil + } + + c := NewClient(&mock, &mk) + mockSynonyms := []TopicConfig{ + { + Name: "cleanup.policy", + Value: "DELETE", + }, + { + Name: "compression.type", + Value: "gzip", + }, + } + + err := c.UpdateTopicConfigs("cluster-1", "topic-1", mockSynonyms) + assert.Nil(t, err) +} + +func TestConfigs_UpdateTopicConfigsFailWithTopicNotFound(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-1/configs:alter", uri) + return nil, 404, "404 Not Found", nil + } + + c := NewClient(&mock, &mk) + mockSynonyms := []TopicConfig{ + { + Name: "cleanup.policy", + Value: "DELETE", + }, + { + Name: "compression.type", + Value: "gzip", + }, + } + + err := c.UpdateTopicConfigs("cluster-1", "topic-1", mockSynonyms) + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), "404 Not Found") + } +} \ No newline at end of file diff --git a/confluent.go b/confluent.go new file mode 100644 index 0000000..d1d5419 --- /dev/null +++ b/confluent.go @@ -0,0 +1,32 @@ +package confluent + +const ( + clientVersion = "0.1" + userAgent = "confluent-client-go-sdk-" + clientVersion +) + +type Client struct { + httpClient HttpClient + saramaClient KClient +} + +type ErrorResponse struct { + StatusCode int `json:"status_code,omitempty"` + ErrorCode int `json:"error_code"` + Type string `json:"type,omitempty"` + Message string `json:"message"` + Errors []struct { + ErrorType string `json:"error_type,omitempty"` + Message string `json:"message,omitempty"` + } `json:"errors,omitempty"` +} + +type Metadata struct { + Self string `json:"self"` + ResourceName string `json:"resource_name,omitempty"` + Next string `json:"next,omitempty"` +} + +func NewClient(httpClient HttpClient, kClient KClient) *Client { + return &Client{httpClient: httpClient, saramaClient: kClient} +} diff --git a/confluent_test.go b/confluent_test.go new file mode 100644 index 0000000..d1a2dcb --- /dev/null +++ b/confluent_test.go @@ -0,0 +1,27 @@ +package confluent + +import ( + "io" +) + +var ( + partitionConfig = []TopicConfig{ + { + Name: "compression.type", + Value: "gzip", + }, + { + Name: "cleanup.policy", + Value: "compact", + }, + } + clusterId = "cluster-1" +) + +type MockHttpClient struct { + DoRequestFn func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) +} +func (mock *MockHttpClient) DoRequest(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + return mock.DoRequestFn(method, uri, reqBody) +} + diff --git a/example/README.md b/example/README.md new file mode 100644 index 0000000..3b3c64e --- /dev/null +++ b/example/README.md @@ -0,0 +1,6 @@ +# Run testcase + +```bash +go mod tidy +go run main.go +``` diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..01ed64e --- /dev/null +++ b/example/main.go @@ -0,0 +1,199 @@ +package main + +import ( + "fmt" + "os" + + confluent "github.com/wayarmy/gonfluent" +) + +const ( + ClientVersion = "0.1" + UserAgent = "confluent-client-go-sdk-" + ClientVersion +) + +func main() { + baseUrl := "https://localhost:8090" + + username := os.Getenv("CONFLUENT_USER") + password := os.Getenv("CONFLUENT_PASSWORD") + + // Initialize the client + httpClient := confluent.NewDefaultHttpClient(baseUrl, username, password) + httpClient.UserAgent = UserAgent + bootstrapServer := []string{ + "localhost:9093", + } + kConfig := &confluent.Config{ + BootstrapServers: &bootstrapServer, + CACert: "certs/ca.pem", + ClientCert: "certs/cert.pem", + ClientCertKey: "certs/key.pem", + SkipTLSVerify: true, + SASLMechanism: "plain", + TLSEnabled: true, + Timeout: 120, + } + kClient, err := confluent.NewSaramaClient(kConfig) + if err != nil { + panic(err) + } + + client := confluent.NewClient(httpClient, kClient) + bearerToken, err := client.Login() + if err != nil { + panic(err) + } + httpClient.Token = bearerToken + + // Get the list of clusters in Confluent platform + listCluster, err := client.ListKafkaCluster() + if err != nil { + panic(err) + } + + fmt.Println(listCluster) + + // Get the cluster information + clusterId := listCluster[0].ClusterID + cluster, err := client.GetKafkaCluster(clusterId) + if err != nil { + panic(err) + } + fmt.Printf("%#v", cluster) + fmt.Println(cluster) + // + //topicConfig, err := client.GetTopicConfigs(clusterId, "example_topic_name") + //if err != nil { + // panic(err) + //} + //fmt.Printf("%#v", topicConfig) + + //topic, err := client.GetTopic(clusterId, "test7-terraform-confluent-provider") + //if err != nil { + // panic(err) + //} + //fmt.Println(topic.Partitions) + + ////Create a new topic + //partitionConfig := []confluent.TopicConfig{ + // { + // Name: "compression.type", + // Value: "gzip", + // }, + // { + // Name: "cleanup.policy", + // Value: "compact", + // }, + // //{ + // // Name: "retention.ms", + // // Value: "20000", + // //}, + //} + //err = client.CreateTopic(clusterId, "test_topic_name", 3, 3, partitionConfig, nil) + //if err != nil { + // panic(err) + //} + //fmt.Println("Topic Created!") + + d := []confluent.TopicConfig{ + { + Name: "compression.type", + Value: "gzip", + }, + { + Name: "cleanup.policy", + Value: "compact", + }, + { + Name: "retention.ms", + Value: "300000", + }, + } + + err = client.UpdateTopicConfigs(clusterId, "test_topic_name", d) + if err != nil { + panic(err) + } + fmt.Println("Topic Updated!") + // + + // // Create Principal but don't know use case for it + // testPrincipals := []confluent.UserPrincipalAction{ + // { + // Scope: confluent.Scope{ + // Clusters: confluent.AuthorClusters{ + // KafkaCluster: clusterId, + // }, + // }, + // ResourceName: "Testing-Principal", + // ResourceType: "Cluster", + // Operation: "Read", + // }, + // } + + // newPrincipal, err := client.CreatePrincipal("User:system-platform", testPrincipals) + // if err != nil { + // panic(err) + // } + // fmt.Printf("%s", newPrincipal) + + // + + // Get the topics information + //getTopic, err := client.GetTopic(clusterId, "test2_topic_name") + //if err != nil { + // panic(err) + //} + //fmt.Printf("%v", getTopic) + + //Delete the existing topic + //deleteTopic := client.DeleteTopic(clusterId, "test_topic_name") + //if deleteTopic != nil { + // panic(deleteTopic) + //} + //fmt.Println("Topic deleted") + + //// Add role assignment + //c := confluent.ClusterDetails{} + //c.Clusters.KafkaCluster = clusterId + //err = client.BindPrincipalToRole("User:manh.do", "Operator", c) + //if err != nil { + // panic(err) + //} + //fmt.Println("Role Binded!") + + // // Add role assignment for principal to topic + // r := confluent.UpdateRoleBinding{ + // Scope: c, + // } + // r.ResourcePatterns = []confluent.RoleBindings{ + // { + // ResourceType: "Topic", + // Name: "system-platform", + // PatternType: "PREFIXED", + // }, + // } + // principalCN := "User:CN=common-name.example.com" + // err = client.IncreaseRoleBinding(principalCN, "ResourceOwner", r) + // if err != nil { + // panic(err) + // } + // fmt.Println("Role Binded!") + //t := confluent.Topic{ + // Name: "test_topic_name", + // Partitions: 10, + // ReplicationFactor: 4, + //} + //err = client.UpdatePartitions(t) + //if err != nil { + // panic(err) + //} + //fmt.Println("Update partition successfully") + //err = client.UpdateReplicationsFactor(t) + //if err != nil { + // panic(err) + //} + //fmt.Println("Update RF successfully") + +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d3e2e1f --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/wayarmy/gonfluent + +go 1.16 + +require ( + github.com/Shopify/sarama v1.29.1 + github.com/stretchr/testify v1.7.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..63cca80 --- /dev/null +++ b/go.sum @@ -0,0 +1,79 @@ +github.com/Shopify/sarama v1.29.1 h1:wBAacXbYVLmWieEA/0X/JagDdCZ8NVFOfS6l6+2u5S0= +github.com/Shopify/sarama v1.29.1/go.mod h1:mdtqvCSg8JOxk8PmpTNGyo6wzd4BMm4QXSfDnTXmgkE= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q= +github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= +github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= +github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8= +github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xdg/scram v1.0.3/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/helper.go b/helper.go new file mode 100644 index 0000000..12f8b97 --- /dev/null +++ b/helper.go @@ -0,0 +1,145 @@ +package confluent + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "io/ioutil" + "log" + "time" + + "github.com/Shopify/sarama" +) + +func (co *Config) saslEnabled() bool { + return co.SASLUsername != "" || co.SASLPassword != "" +} + +func (co *Config) newKafkaConfig() (*sarama.Config, error) { + kafkaConfig := sarama.NewConfig() + kafkaConfig.Version = sarama.V2_4_0_0 + kafkaConfig.ClientID = "confluent-go-client" + kafkaConfig.Admin.Timeout = time.Duration(co.Timeout) * time.Second + kafkaConfig.Metadata.Full = true // the default, but just being clear + + if co.saslEnabled() { + switch co.SASLMechanism { + case "scram-sha512": + case "scram-sha256": + case "plain": + default: + return nil, errors.New("[ERROR] Invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"" + co.SASLMechanism) + } + kafkaConfig.Net.SASL.Enable = true + kafkaConfig.Net.SASL.Password = co.SASLPassword + kafkaConfig.Net.SASL.User = co.SASLUsername + kafkaConfig.Net.SASL.Handshake = true + } else { + log.Printf("[WARN] SASL disabled username: '%s', password '%s'", co.SASLUsername, "****") + } + + if co.TLSEnabled { + tlsConfig, err := newTLSConfig( + co.ClientCert, + co.ClientCertKey, + co.CACert, + co.ClientCertKeyPassphrase, + ) + + if err != nil { + return kafkaConfig, err + } + kafkaConfig.Net.TLS.Enable = true + kafkaConfig.Net.TLS.Config = tlsConfig + kafkaConfig.Net.TLS.Config.InsecureSkipVerify = co.SkipTLSVerify + } + + return kafkaConfig, nil +} + +//func NewTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase string) (*tls.Config, error) { +// return newTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase) +//} + +func parsePemOrLoadFromFile(input string) (*pem.Block, []byte, error) { + // attempt to parse + var inputBytes = []byte(input) + inputBlock, _ := pem.Decode(inputBytes) + + if inputBlock == nil { + //attempt to load from file + log.Printf("[INFO] Attempting to load from file '%s'", input) + var err error + inputBytes, err = ioutil.ReadFile(input) + if err != nil { + return nil, nil, err + } + inputBlock, _ = pem.Decode(inputBytes) + if inputBlock == nil { + return nil, nil, fmt.Errorf("[ERROR] Error unable to decode pem") + } + } + return inputBlock, inputBytes, nil +} + +func newTLSConfig(clientCert, clientKey, caCert, clientKeyPassphrase string) (*tls.Config, error) { + tlsConfig := tls.Config{} + + if clientCert != "" && clientKey != "" { + keyBlock, keyBytes, err := parsePemOrLoadFromFile(clientKey) + if err != nil { + return &tlsConfig, err + } + + keyBytes = pem.EncodeToMemory(&pem.Block{ + Type: keyBlock.Type, + Bytes: keyBytes, + }) + + cert, err := tls.LoadX509KeyPair(clientCert, clientKey) + if err != nil { + return &tlsConfig, err + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + if caCert == "" { + return &tlsConfig, nil + } + + caCertPool, _ := x509.SystemCertPool() + if caCertPool == nil { + caCertPool = x509.NewCertPool() + } + + _, caBytes, err := parsePemOrLoadFromFile(caCert) + if err != nil { + return &tlsConfig, err + } + ok := caCertPool.AppendCertsFromPEM(caBytes) + if !ok { + return &tlsConfig, errors.New("couldn't add the caPem") + } + + tlsConfig.RootCAs = caCertPool + return &tlsConfig, nil +} + +func (co *Config) copyWithMaskedSensitiveValues() Config { + c := Config{ + co.BootstrapServers, + co.Timeout, + co.CACert, + co.ClientCert, + "*****", + "*****", + co.TLSEnabled, + co.SkipTLSVerify, + co.SASLUsername, + "*****", + co.SASLMechanism, + } + return c +} diff --git a/http_client.go b/http_client.go new file mode 100644 index 0000000..d565d22 --- /dev/null +++ b/http_client.go @@ -0,0 +1,64 @@ +package confluent + +import ( + "crypto/tls" + "encoding/base64" + "io" + "io/ioutil" + "net/http" +) + +type HttpClient interface { + DoRequest(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) +} +type DefaultHttpClient struct { + // BaseURL : https://localhost:8090 + // API endpoint of Confluent platform + BaseUrl string + + // Define the user-agent would be sent to confluent api + // Default: confluent-client-go-sdk + Username string + + Password string + Token string + UserAgent string +} +func NewDefaultHttpClient(baseUrl string, username string, password string) *DefaultHttpClient { + return &DefaultHttpClient{ + BaseUrl: baseUrl, + Username: username, + Password: password, + UserAgent: userAgent, + } +} +func (c *DefaultHttpClient) DoRequest(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + client := http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + req, err := http.NewRequest(method, c.BaseUrl+uri, reqBody) + if err != nil { + return nil, 0, "", err + } + if c.Token != "" { + req.Header.Set("Authorization", "Bearer "+c.Token) + } else { + auth := c.Username + ":" + c.Password + token := base64.StdEncoding.EncodeToString([]byte(auth)) + req.Header.Set("Authorization", "Basic " + token) + } + req.Header.Set("User-Agent", c.UserAgent) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + + res, respErr := client.Do(req) + + if respErr != nil { + return nil, 0, "", respErr + } + + respBody, bodyErr := ioutil.ReadAll(res.Body) + return respBody, res.StatusCode, res.Status, bodyErr +} diff --git a/kafka.go b/kafka.go new file mode 100644 index 0000000..5f1d8e3 --- /dev/null +++ b/kafka.go @@ -0,0 +1,140 @@ +package confluent + +import ( + "errors" + "github.com/Shopify/sarama" + "log" +) + +type Config struct { + BootstrapServers *[]string + Timeout int + CACert string + ClientCert string + ClientCertKey string + ClientCertKeyPassphrase string + TLSEnabled bool + SkipTLSVerify bool + SASLUsername string + SASLPassword string + SASLMechanism string +} + +type void struct{} + +var member void + +type SaramaClient struct { + client sarama.Client + kafkaConfig *sarama.Config + config *Config + supportedAPIs map[int]int + topics map[string]void +} + +func (k *SaramaClient) extractTopics() error { + topics, err := k.client.Topics() + if err != nil { + return err + } + k.topics = make(map[string]void) + for _, t := range topics { + k.topics[t] = member + } + return nil +} + +func (k *SaramaClient) populateAPIVersions() error { + ch := make(chan []*sarama.ApiVersionsResponseBlock) + errCh := make(chan error) + + brokers := k.client.Brokers() + kafkaConfig := k.kafkaConfig + for _, broker := range brokers { + go apiVersionsFromBroker(broker, kafkaConfig, ch, errCh) + } + + clusterApiVersions := make(map[int][2]int) // valid api version intervals across all brokers + errs := make([]error, 0) + for i := 0; i < len(brokers); i++ { + select { + case brokerApiVersions := <-ch: + updateClusterApiVersions(&clusterApiVersions, brokerApiVersions) + case err := <-errCh: + errs = append(errs, err) + } + } + + if len(errs) != 0 { + return errors.New(sarama.MultiError{Errors: &errs}.PrettyError()) + } + + k.supportedAPIs = make(map[int]int, len(clusterApiVersions)) + for apiKey, versionMinMax := range clusterApiVersions { + versionMin := versionMinMax[0] + versionMax := versionMinMax[1] + + if versionMax >= versionMin { + k.supportedAPIs[apiKey] = versionMax + } + } + + return nil +} + +func updateClusterApiVersions(clusterApiVersions *map[int][2]int, brokerApiVersions []*sarama.ApiVersionsResponseBlock) { + cluster := *clusterApiVersions + + for _, apiBlock := range brokerApiVersions { + apiKey := int(apiBlock.ApiKey) + brokerMin := int(apiBlock.MinVersion) + brokerMax := int(apiBlock.MaxVersion) + + clusterMinMax, exists := cluster[apiKey] + if !exists { + cluster[apiKey] = [2]int{brokerMin, brokerMax} + } else { + // shrink the cluster interval according to + // the broker interval + + clusterMin := clusterMinMax[0] + clusterMax := clusterMinMax[1] + + if brokerMin > clusterMin { + clusterMinMax[0] = brokerMin + } + + if brokerMax < clusterMax { + clusterMinMax[1] = brokerMax + } + + cluster[apiKey] = clusterMinMax + } + } +} + +func apiVersionsFromBroker(broker *sarama.Broker, config *sarama.Config, ch chan<- []*sarama.ApiVersionsResponseBlock, errCh chan<- error) { + resp, err := rawApiVersionsRequest(broker, config) + + if err != nil { + errCh <- err + } else if resp.Err != sarama.ErrNoError { + errCh <- errors.New(resp.Err.Error()) + } else { + ch <- resp.ApiVersions + } +} + +func rawApiVersionsRequest(broker *sarama.Broker, config *sarama.Config) (*sarama.ApiVersionsResponse, error) { + if err := broker.Open(config); err != nil && err != sarama.ErrAlreadyConnected { + return nil, err + } + + defer func() { + if err := broker.Close(); err != nil && err != sarama.ErrNotConnected { + log.Fatal(err) + } + }() + + return broker.ApiVersions(&sarama.ApiVersionsRequest{}) +} diff --git a/kafka_client.go b/kafka_client.go new file mode 100644 index 0000000..7657e46 --- /dev/null +++ b/kafka_client.go @@ -0,0 +1,121 @@ +package confluent + +import ( + "errors" + "github.com/Shopify/sarama" +) + +type KafkaClusterAdminFn interface { + ListPartitionReassignments(topic string, partitions []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error) + AlterPartitionReassignments(topic string, assignment [][]int32) error +} + +type KafkaAdmin struct { + Fn KafkaClusterAdminFn +} + +type KClient interface { + Controller() (*sarama.Broker, error) + Config() *sarama.Config + RefreshMetadata() error + Partitions(topic string) ([]int32, error) + Brokers() []*sarama.Broker + Replicas(topic string, partitionId int32) ([]int32, error) + Client() sarama.Client + NewClusterAdminFromClient() (*KafkaAdmin, error) + ID(broker *sarama.Broker) int32 +} + +type ClusterAdmin struct { + adminClient sarama.ClusterAdmin +} + +func (ca *ClusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error) { + return ca.adminClient.ListPartitionReassignments(topic, partitions) +} + +func (ca *ClusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error { + return ca.adminClient.AlterPartitionReassignments(topic, assignment) +} + +func (k *SaramaClient) Client() sarama.Client { + return k.client +} + +func (k *SaramaClient) Replicas(topic string, partitionId int32) ([]int32, error) { + return k.client.Replicas(topic, partitionId) +} + +func (k *SaramaClient) Controller() (*sarama.Broker, error) { + return k.client.Controller() +} + +func (k *SaramaClient) Config() *sarama.Config { + return k.client.Config() +} + +func (k *SaramaClient) RefreshMetadata() error { + return k.client.RefreshMetadata() +} + +func (k *SaramaClient) Partitions(topic string) ([]int32, error) { + return k.client.Partitions(topic) +} + +func (k *SaramaClient) Brokers() []*sarama.Broker { + return k.client.Brokers() +} + +func (k *SaramaClient) ID(broker *sarama.Broker) int32 { + return broker.ID() +} + +func (k *SaramaClient) NewClusterAdminFromClient() (*KafkaAdmin, error) { + a, err := sarama.NewClusterAdminFromClient(k.client) + if err != nil { + return nil, err + } + admin := ClusterAdmin{ + adminClient: a, + } + return &KafkaAdmin{ + Fn: &admin, + }, nil +} + +// Init kafka client point to sarama client +func NewSaramaClient(config *Config) (*SaramaClient, error) { + if config == nil { + return nil, errors.New("Cannot create client without kafka config") + } + + if config.BootstrapServers == nil { + return nil, errors.New("No bootstrap_servers provided") + } + + kc, err := config.newKafkaConfig() + if err != nil { + return nil, err + } + + bootstrapServers := *(config.BootstrapServers) + c, err := sarama.NewClient(bootstrapServers, kc) + if err != nil { + return nil, err + } + + kafkaClient := &SaramaClient{ + client: c, + config: config, + kafkaConfig: kc, + } + + err = kafkaClient.populateAPIVersions() + if err != nil { + return kafkaClient, err + } + + err = kafkaClient.extractTopics() + + return kafkaClient, err +} diff --git a/kafka_client_test.go b/kafka_client_test.go new file mode 100644 index 0000000..8edb84b --- /dev/null +++ b/kafka_client_test.go @@ -0,0 +1,99 @@ +package confluent + +import ( + "github.com/Shopify/sarama" + "testing" +) + +var t *testing.T + +type MockKafkaAdmin struct{ + TopicNameExpected string + PartitionExpected int32 + AssignmentExpected [][]int32 +} + +func (mca *MockKafkaAdmin) ListPartitionReassignments(topic string, partitions []int32) (map[string]map[int32]*sarama.PartitionReplicaReassignmentsStatus, error) { + if topic != mca.TopicNameExpected { + t.Fatal("not expected: ", sarama.ErrInvalidTopic) + } + return nil, nil +} + +func (mca *MockKafkaAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error { + return nil +} + +type MockKafkaClient struct { + MockBrokers *sarama.MockBroker + MockVersion sarama.KafkaVersion + client sarama.Client + TopicNameExpected string + PartitionExpected int32 + AssignmentExpected [][]int32 +} + +func (mk *MockKafkaClient) InitKafkaClient() sarama.Client { + seedBroker := mk.MockBrokers + client, err := sarama.NewClient([]string{seedBroker.Addr()}, mk.newTestConfig()) + if err != nil { + t.Fatal(err) + } + return client +} + +func (mk *MockKafkaClient) newTestConfig() *sarama.Config { + config := sarama.NewConfig() + config.Version = mk.MockVersion + return config +} + +func (mk *MockKafkaClient) Client() sarama.Client { + return mk.client +} + +func (mk *MockKafkaClient) Replicas(topic string, partitionId int32) ([]int32, error) { + if partitionId == 1 { + return []int32{1}, nil + } + if partitionId == 2 { + return []int32{2}, nil + } + return nil, nil +} + +func (mk *MockKafkaClient) NewClusterAdminFromClient() (*KafkaAdmin, error) { + return &KafkaAdmin{ + Fn: &MockKafkaAdmin{ + TopicNameExpected: mk.TopicNameExpected, + PartitionExpected: mk.PartitionExpected, + AssignmentExpected: mk.AssignmentExpected, + }, + }, nil +} + +func (mk *MockKafkaClient) Controller() (*sarama.Broker, error) { + c := mk.InitKafkaClient() + return c.Controller() +} + +func (mk *MockKafkaClient) Config() *sarama.Config { + c := mk.InitKafkaClient() + return c.Config() +} + +func (mk *MockKafkaClient) RefreshMetadata() error { + return nil +} + +func (mk *MockKafkaClient) Partitions(topic string) ([]int32, error) { + return []int32{1,2}, nil +} + +func (mk *MockKafkaClient) Brokers() []*sarama.Broker { + return nil +} + +func (mk *MockKafkaClient) ID(broker *sarama.Broker) int32 { + return 3 +} diff --git a/login.go b/login.go new file mode 100644 index 0000000..e85b293 --- /dev/null +++ b/login.go @@ -0,0 +1,36 @@ +package confluent + +import ( + "encoding/json" +) + +type Authenticate struct { + AuthToken string `json:"auth_token"` + TokenType string `json:"token_type"` + ExpiresIn int `json:"expires_in"` +} + +type AuthenticateError struct { + StatusCode int `json:"status_code"` + ErrrorCode int `json:"errror_code"` + Type string `json:"type"` + Message string `json:"message"` + Errors []struct { + ErrorType string `json:"error_type"` + Message string `json:"message"` + } `json:"errors"` +} + +func (c *Client) Login() (string, error) { + u := "/security/1.0/authenticate" + authenReq, err := c.DoRequest("GET", u, nil) + if err != nil { + return "", err + } + var authenticate *Authenticate + err = json.Unmarshal(authenReq, &authenticate) + if err != nil { + return "", err + } + return authenticate.AuthToken, nil +} diff --git a/login_test.go b/login_test.go new file mode 100644 index 0000000..dc77282 --- /dev/null +++ b/login_test.go @@ -0,0 +1,57 @@ +package confluent + +import ( + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLogin_Success(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/security/1.0/authenticate", uri) + return []byte(` + { + "auth_token": "abcdefghizk", + "auth_type": "Basic Auth", + "expires_in": 3600 + } +`), 204, "204", nil + } + c := NewClient(&mock, &mk) + token, err := c.Login() + assert.Nil(t, err) + assert.Equal(t, "abcdefghizk", token) +} + +func TestLogin_Fail(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/security/1.0/authenticate", uri) + return nil, 403, "403", nil + } + c := NewClient(&mock, &mk) + _, err := c.Login() + assert.NotNil(t, err) +} + +func TestLogin_FailWithWrongResponse(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/security/1.0/authenticate", uri) + return []byte(` + < +`), 200, "200", nil + } + c := NewClient(&mock, &mk) + _, err := c.Login() + assert.NotNil(t, err) +} diff --git a/partitions.go b/partitions.go new file mode 100644 index 0000000..2d8c492 --- /dev/null +++ b/partitions.go @@ -0,0 +1,27 @@ +package confluent + +import "encoding/json" + +type Partition struct { + ClusterID string `json:"cluster_id"` + TopicName string `json:"topic_name"` + PartitionId int `json:"partition_id"` +} + +func (c *Client) GetTopicPartitions(clusterId, topicName string) ([]Partition, error) { + u := "/kafka/v3/clusters/"+clusterId+"/topics/"+topicName+"/partitions" + r, err := c.DoRequest("GET", u, nil) + if err != nil { + return nil, err + } + + body := struct{ + Data []Partition `json:"data"` + }{} + + err = json.Unmarshal(r, &body) + if err != nil { + return nil, err + } + return body.Data, nil +} diff --git a/partitions_test.go b/partitions_test.go new file mode 100644 index 0000000..1779445 --- /dev/null +++ b/partitions_test.go @@ -0,0 +1,124 @@ +package confluent + +import ( + "errors" + "github.com/stretchr/testify/assert" + "io" + "net/http" + "testing" +) + +func TestRBac_GetTopicPartitionsSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-1/partitions", uri) + return []byte(` + { + "kind": "KafkaPartitionList", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions", + "next": null + }, + "data": [ + { + "kind": "KafkaPartition", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/1", + "resource_name": "crn:///kafka=cluster-1/topic=topic-1/partition=1" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-1", + "partition_id": 1, + "leader": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/1/replicas/1" + }, + "replicas": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/1/replicas" + }, + "reassignment": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/1/reassignment" + } + }, + { + "kind": "KafkaPartition", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/2", + "resource_name": "crn:///kafka=cluster-1/topic=topic-1/partition=2" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-1", + "partition_id": 2, + "leader": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/2/replicas/2" + }, + "replicas": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/2/replicas" + }, + "reassignment": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/2/reassignment" + } + }, + { + "kind": "KafkaPartition", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/3", + "resource_name": "crn:///kafka=cluster-1/topic=topic-1/partition=3" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-1", + "partition_id": 3, + "leader": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/3/replicas/3" + }, + "replicas": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/3/replicas" + }, + "reassignment": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/3/reassignment" + } + } + ] + } +`), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + partitions, err := c.GetTopicPartitions("cluster-1", "topic-1") + if assert.Nil(t, err) { + assert.Equal(t, 3, len(partitions)) + assert.Equal(t, "topic-1", partitions[0].TopicName) + assert.Equal(t, 3, partitions[2].PartitionId) + } +} + +func TestRBac_GetTopicPartitionsFailWithNotExist(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-1/partitions", uri) + return nil, 404, "404 Not Found", nil + } + c := NewClient(&mock, &mk) + _, err := c.GetTopicPartitions("cluster-1", "topic-1") + + if assert.NotNil(t, err) { + assert.Equal(t, errors.New("error with status: 404 Not Found"), err) + } +} + +func TestRBac_GetTopicPartitionsFailWithWrongResponse(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-1/partitions", uri) + return []byte(` + < +`), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + _, err := c.GetTopicPartitions("cluster-1", "topic-1") + assert.NotNil(t, err) +} diff --git a/rbac.go b/rbac.go new file mode 100644 index 0000000..803f90a --- /dev/null +++ b/rbac.go @@ -0,0 +1,117 @@ +package confluent + +import ( + "bytes" + "encoding/json" +) + +const ( + principalPath = "/security/1.0/principals/" +) + +type ClusterDetails struct { + ClusterName string `json:"clusterName,omitempty"` + Clusters Clusters `json:"clusters"` +} + +type ResourcePattern struct { + ResourceType string `json:"resourceType,omitempty"` + Name string `json:"name,omitempty"` + PatternType string `json:"patternType,omitempty"` +} + +type RoleBinding struct { + Scope ClusterDetails `json:"scope"` + ResourcePatterns []ResourcePattern `json:"resourcePatterns"` +} + +// BindPrincipalToRole will bind the principal to a cluster-scoped role for a specific cluster or in a given scope +func (c *Client) BindPrincipalToRole(principal, roleName string, cDetails ClusterDetails) error { + u := principalPath + principal + "/roles/" + roleName + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(cDetails) + + _, err := c.DoRequest("POST", u, payloadBuf) + if err != nil { + return err + } + return nil +} + +// DeleteRoleBinding remove the role (cluster or resource scoped) from the principal at the give scope/cluster +func (c *Client) DeleteRoleBinding(principal, roleName string, cDetails ClusterDetails) error { + u := principalPath + principal + "/roles/" + roleName + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(cDetails) + + _, err := c.DoRequest("DELETE", u, payloadBuf) + if err != nil { + return err + } + return nil +} + +// LookupRoleBinding will lookup the role-bindings for the principal at the given scope/cluster using the given role +func (c *Client) LookupRoleBinding(principal, roleName string, cDetails ClusterDetails) ([]ResourcePattern, error) { + u := principalPath + principal + "/roles/" + roleName + "/resources" + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(cDetails) + + r, err := c.DoRequest("POST", u, payloadBuf) + if err != nil { + return nil, err + } + + var patterns []ResourcePattern + + err = json.Unmarshal(r, &patterns) + if err != nil { + return nil, err + } + return patterns, nil +} + +// IncreaseRoleBinding : incrementally grant the resources to the principal at the given scope/cluster using the given role +func (c *Client) IncreaseRoleBinding(principal, roleName string, uRoleBinding RoleBinding) error { + u := principalPath + principal + "/roles/" + roleName + "/bindings" + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(uRoleBinding) + + _, err := c.DoRequest("POST", u, payloadBuf) + if err != nil { + return err + } + return nil +} + +// DecreaseRoleBinding : Incrementally remove the resources from the principal at the given scope/cluster using the given role +func (c *Client) DecreaseRoleBinding(principal, roleName string, uRoleBinding RoleBinding) error { + u := principalPath + principal + "/roles/" + roleName + "/bindings" + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(uRoleBinding) + + _, err := c.DoRequest("DELETE", u, payloadBuf) + if err != nil { + return err + } + return nil +} + +// OverwriteRoleBinding will overwrite existing resource grants +func (c *Client) OverwriteRoleBinding(principal, roleName string, uRoleBinding RoleBinding) error { + u := principalPath + principal + "/roles/" + roleName + "/bindings" + + payloadBuf := new(bytes.Buffer) + json.NewEncoder(payloadBuf).Encode(uRoleBinding) + + _, err := c.DoRequest("PUT", u, payloadBuf) + if err != nil { + return err + } + return nil +} diff --git a/rbac_test.go b/rbac_test.go new file mode 100644 index 0000000..da9ce76 --- /dev/null +++ b/rbac_test.go @@ -0,0 +1,304 @@ +package confluent + +import ( + "errors" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +var ( + cDetails = ClusterDetails{ + Clusters: Clusters{ + KafkaCluster: clusterId, + }, + } + principal = "User:confluent-test" + roleName = "Operator" + uRoleBinding = RoleBinding{ + Scope: cDetails, + ResourcePatterns: []ResourcePattern{ + { + ResourceType: "TOPIC", + Name: "Test-binding", + PatternType: "PREFIXED", + }, + }, + } +) + +func TestRBac_BindPrincipalToRoleSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator", uri) + return []byte(``), 204, "204", nil + } + c := NewClient(&mock, &mk) + err := c.BindPrincipalToRole("User:confluent-test", "Operator", cDetails) + assert.Nil(t, err) +} + +func TestRBac_BindPrincipalToRoleFailWithNonExistedRole(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator", uri) + return []byte(` + { + "status_code": 422, + "error_code": 4221, + "type": "NonExisted", + "message": "Cannot find role Operator", + "errors": [ + { + "error_type": "string", + "message": "Cannot find role Operator" + } + ] + } + `), 422, "422 Unprocessable Entity", nil + } + c := NewClient(&mock, &mk) + err := c.BindPrincipalToRole("User:confluent-test", "Operator", cDetails) + assert.NotNil(t, err) + assert.Equal(t, errors.New("error with status: 422 Unprocessable Entity Cannot find role Operator"), err) +} + +func TestRBac_DeleteRoleBindingSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodDelete, method, "Expected method 'DELETE', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator", uri) + return []byte(``), 204, "204", nil + } + c := NewClient(&mock, &mk) + err := c.DeleteRoleBinding(principal, roleName, cDetails) + assert.Nil(t, err) +} + +func TestRBac_DeleteRoleBindingFailWithNonExistedRole(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodDelete, method, "Expected method 'DELETE', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator", uri) + return []byte(` + { + "status_code": 422, + "error_code": 4221, + "type": "NonExisted", + "message": "Cannot find role Operator", + "errors": [ + { + "error_type": "string", + "message": "Cannot find role Operator" + } + ] + } +`), 422, "422 Unprocessable Entity", nil + } + c := NewClient(&mock, &mk) + err := c.DeleteRoleBinding("User:confluent-test", "Operator", ClusterDetails{ + Clusters: Clusters{ + KafkaCluster: "cluster-1", + }, + }) + assert.NotNil(t, err) + assert.Equal(t, errors.New("error with status: 422 Unprocessable Entity Cannot find role Operator"), err) +} + +func TestRBac_LookupRoleBindingSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator/resources", uri) + return []byte(` + [ + { + "resourceType": "Topic", + "name": "clicksTopic1", + "patternType": "LITERAL" + }, + { + "resourceType": "Topic", + "name": "orders-2019", + "patternType": "PREFIXED" + } + ] +`), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + roleBinding, err := c.LookupRoleBinding(principal, roleName, cDetails) + if assert.Nil(t, err) { + assert.Equal(t, 2, len(roleBinding)) + assert.Equal(t, "clicksTopic1", roleBinding[0].Name) + } +} + +func TestRBac_LookupRoleBindingFailWithNonExistedRole(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator/resources", uri) + return []byte(` + { + "status_code": 422, + "error_code": 4221, + "type": "NonExisted", + "message": "Cannot find role `+roleName+`", + "errors": [ + { + "error_type": "string", + "message": "Cannot find role `+roleName+`" + } + ] + } +`), 400, "404 Not Found", nil + } + c := NewClient(&mock, &mk) + roleBinding, err := c.LookupRoleBinding(principal, roleName, cDetails) + assert.NotNil(t, err) + assert.Equal(t, errors.New("error with status: 404 Not Found Cannot find role "+roleName), err) + assert.Nil(t, roleBinding) +} + +func TestRBac_IncreaseRoleBindingSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator/bindings", uri) + return []byte(``), 200, "200 OK", nil + } + c := NewClient(&mock, &mk) + err := c.IncreaseRoleBinding(principal, roleName, uRoleBinding) + assert.Nil(t, err) +} + +func TestRBac_IncreaseRoleBindingFailWithNonExistedRole(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator/bindings", uri) + return []byte(` + { + "status_code": 422, + "error_code": 4221, + "type": "NonExisted", + "message": "Cannot find role `+roleName+`", + "errors": [ + { + "error_type": "string", + "message": "Cannot find role `+roleName+`" + } + ] + } +`), 404, "404 Not Found", nil + } + c := NewClient(&mock, &mk) + err := c.IncreaseRoleBinding(principal, roleName, uRoleBinding) + assert.NotNil(t, err) + assert.Equal(t, errors.New("error with status: 404 Not Found Cannot find role "+roleName), err) +} + +func TestRBac_DecreaseRoleBindingSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodDelete, method, "Expected method 'DELETE', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator/bindings", uri) + return []byte(``), 204, "204", nil + } + c := NewClient(&mock, &mk) + err := c.DecreaseRoleBinding(principal, roleName, uRoleBinding) + assert.Nil(t, err) +} + +func TestRBac_DecreaseRoleBindingFailWithNonExistedRole(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodDelete, method, "Expected method 'DELETE', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator/bindings", uri) + return []byte(` + { + "status_code": 422, + "error_code": 4221, + "type": "NonExisted", + "message": "Cannot find role `+roleName+`", + "errors": [ + { + "error_type": "string", + "message": "Cannot find role `+roleName+`" + } + ] + } +`), 404, "404 Not Found", nil + } + c := NewClient(&mock, &mk) + err := c.DecreaseRoleBinding(principal, roleName, uRoleBinding) + assert.NotNil(t, err) + assert.Equal(t, errors.New("error with status: 404 Not Found Cannot find role "+roleName), err) +} + +func TestRBac_OverwriteRoleBindingSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPut, method, "Expected method 'PUT', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator/bindings", uri) + return []byte(` + { + "status_code": 422, + "error_code": 4221, + "type": "NonExisted", + "message": "Cannot find role `+roleName+`", + "errors": [ + { + "error_type": "string", + "message": "Cannot find role `+roleName+`" + } + ] + } +`), 204, "", nil + } + c := NewClient(&mock, &mk) + err := c.OverwriteRoleBinding(principal, roleName, uRoleBinding) + assert.Nil(t, err) +} + +func TestRBac_OverwriteRoleBindingFailWithNonExistedRole(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPut, method, "Expected method 'PUT', got %s", method) + assert.Equal(t, "/security/1.0/principals/User:confluent-test/roles/Operator/bindings", uri) + return []byte(` + { + "status_code": 422, + "error_code": 4221, + "type": "NonExisted", + "message": "Cannot find role Operator", + "errors": [ + { + "error_type": "string", + "message": "Cannot find role Operator" + } + ] + } +`), 404, "404 Not Found", nil + } + c := NewClient(&mock, &mk) + err := c.OverwriteRoleBinding(principal, roleName, uRoleBinding) + assert.NotNil(t, err) + assert.Equal(t, errors.New("error with status: 404 Not Found Cannot find role "+roleName), err) +} diff --git a/topics.go b/topics.go new file mode 100644 index 0000000..7b26450 --- /dev/null +++ b/topics.go @@ -0,0 +1,334 @@ +package confluent + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/Shopify/sarama" + "io" + "math/rand" + "time" +) + +const ( + topicPath = "topics" +) + +type relatedTopic struct { + ClusterID string `json:"cluster_id,omitempty"` + TopicName string `json:"topic_name,omitempty"` + IsInternal bool `json:"is_internal,omitempty"` + ReplicationFactor int16 `json:"replication_factor,omitempty"` + Partitions Related `json:"partitions,omitempty"` + Configs Related `json:"configs,omitempty"` + PartitionReassignments Related `json:"partition_reassignments,omitempty"` +} + +type ReplicasAssignment struct { + ParitionId int `json:"partition_id,omitempty"` + BrokerIds []int `json:"broker_ids,omitempty"` +} + +type Topic struct { + ClusterID string `json:"cluster_id,omitempty"` + IsInternal bool `json:"is_internal,omitempty"` + Name string `json:"topic_name"` + Partitions int32 `json:"partitions_count,omitempty"` + ReplicationFactor int16 `json:"replication_factor,omitempty"` + Config []TopicConfig `json:"configs,omitempty"` + ReplicasAssignments []ReplicasAssignment `json:"replicas_assignments,omitempty"` + PartitionsDetails []Partition `json:"partitions_details,omitempty"` +} + +func (c *Client) DoRequest(method string, uri string, reqBody io.Reader) ([]byte, error) { + respBody, statusCode, status, err := c.httpClient.DoRequest(method, uri, reqBody) + if err != nil { + return respBody, err + } + if statusCode > 204 { + var errorBody *ErrorResponse + err = json.Unmarshal(respBody, &errorBody) + if err != nil { + return nil, errors.New("error with status: " + status) + } + + if errorBody.Errors != nil { + return nil, errors.New("error with status: " + status + " " + errorBody.Errors[0].Message) + } + return nil, errors.New("error with status: " + status + " " + errorBody.Message) + } + return respBody, nil +} + +func (c *Client) ListTopics(clusterId string) ([]Topic, error) { + uri := "/kafka/v3/clusters/" + clusterId + "/" + topicPath + r, err := c.DoRequest("GET", uri, nil) + if err != nil { + return nil, err + } + + body := struct { + Data []relatedTopic `json:"data"` + }{} + + err = json.Unmarshal(r, &body) + if err != nil { + return nil, err + } + var topics []Topic + for _, v := range body.Data { + topics = append(topics, Topic{ + ClusterID: v.ClusterID, + IsInternal: v.IsInternal, + Name: v.TopicName, + ReplicationFactor: v.ReplicationFactor, + }) + } + + return topics, nil +} + +func (c *Client) GetTopic(clusterId, topicName string) (*Topic, error) { + uri := "/kafka/v3/clusters/" + clusterId + "/" + topicPath + "/" + topicName + r, err := c.DoRequest("GET", uri, nil) + if err != nil { + return nil, err + } + + body := relatedTopic{} + + err = json.Unmarshal(r, &body) + if err != nil { + return nil, err + } + topic := Topic{ + ClusterID: body.ClusterID, + Name: body.TopicName, + IsInternal: body.IsInternal, + ReplicationFactor: body.ReplicationFactor, + } + + p, err := c.GetTopicPartitions(clusterId, topicName) + if err != nil { + return nil, err + } + topic.Partitions = int32(len(p)) + topic.PartitionsDetails = p + + config, err := c.GetTopicConfigs(clusterId, topicName) + if err != nil { + return nil, err + } + topic.Config = config + + return &topic, nil +} + +func (c *Client) CreateTopic(clusterId, topicName string, partitionsCount, replicationFactor int, configs []TopicConfig, replicasAssignments []ReplicasAssignment) error { + uri := "/kafka/v3/clusters/" + clusterId + "/" + topicPath + topicConfig := &Topic{ + Name: topicName, + Partitions: int32(partitionsCount), + ReplicationFactor: int16(replicationFactor), + Config: configs, + ReplicasAssignments: replicasAssignments, + } + payloadBuf := new(bytes.Buffer) + err := json.NewEncoder(payloadBuf).Encode(topicConfig) + if err != nil { + return err + } + + _, err = c.DoRequest("POST", uri, payloadBuf) + if err != nil { + return err + } + + return nil +} + +func (c *Client) DeleteTopic(clusterId, topicName string) error { + uri := "/kafka/v3/clusters/" + clusterId + "/" + topicPath + "/" + topicName + _, err := c.DoRequest("DELETE", uri, nil) + if err != nil { + return err + } + + return nil +} + +func (c *Client) UpdatePartitions(t Topic) error { + broker, err := c.saramaClient.Controller() + if err != nil { + return err + } + + timeout := time.Duration(c.saramaClient.Config().Admin.Timeout) * time.Second + tp := map[string]*sarama.TopicPartition{ + t.Name: &sarama.TopicPartition{ + Count: int32(t.Partitions), + }, + } + + req := &sarama.CreatePartitionsRequest{ + TopicPartitions: tp, + Timeout: timeout, + ValidateOnly: false, + } + res, err := broker.CreatePartitions(req) + if err == nil { + for _, e := range res.TopicPartitionErrors { + if e.Err != sarama.ErrNoError { + return e.Err + } + } + } + + return err +} + +func (c *Client) UpdateReplicationsFactor(t Topic) error { + if err := c.saramaClient.RefreshMetadata(); err != nil { + return err + } + + admin, err := c.saramaClient.NewClusterAdminFromClient() + if err != nil { + return err + } + + assignment, err := c.buildAssignment(t) + if err != nil { + return err + } + + return admin.Fn.AlterPartitionReassignments(t.Name, *assignment) +} + +func (c *Client) IsReplicationFactorUpdating(topic string) (bool, error) { + if err := c.saramaClient.RefreshMetadata(); err != nil { + return false, err + } + + partitions, err := c.saramaClient.Partitions(topic) + if err != nil { + return false, err + } + + admin, err := c.saramaClient.NewClusterAdminFromClient() + if err != nil { + return false, err + } + + statusMap, err := admin.Fn.ListPartitionReassignments(topic, partitions) + if err != nil { + return false, err + } + + for _, status := range statusMap[topic] { + if isPartitionRFChanging(status) { + return true, nil + } + } + + return false, nil +} + +func (c *Client) allReplicas() *[]int32 { + brokers := c.saramaClient.Brokers() + replicas := make([]int32, 0, len(brokers)) + + for _, b := range brokers { + id := c.saramaClient.ID(b) + fmt.Println(id) + if id != -1 { + replicas = append(replicas, id) + } + } + + return &replicas +} + +func (c *Client) buildAssignment(t Topic) (*[][]int32, error) { + partitions, err := c.saramaClient.Partitions(t.Name) + if err != nil { + return nil, err + } + + allReplicas := c.allReplicas() + newRF := t.ReplicationFactor + rand.Seed(time.Now().UnixNano()) + + assignment := make([][]int32, len(partitions)) + for _, p := range partitions { + oldReplicas, err := c.saramaClient.Replicas(t.Name, p) + if err != nil { + return &assignment, err + } + + oldRF := int16(len(oldReplicas)) + deltaRF := int16(newRF) - oldRF + newReplicas, err := buildNewReplicas(allReplicas, &oldReplicas, deltaRF) + if err != nil { + return &assignment, err + } + + assignment[p] = *newReplicas + } + + return &assignment, nil +} + +func isPartitionRFChanging(status *sarama.PartitionReplicaReassignmentsStatus) bool { + return len(status.AddingReplicas) != 0 || len(status.RemovingReplicas) != 0 +} + +func buildNewReplicas(allReplicas *[]int32, usedReplicas *[]int32, deltaRF int16) (*[]int32, error) { + usedCount := int16(len(*usedReplicas)) + + if deltaRF == 0 { + return usedReplicas, nil + } else if deltaRF < 0 { + end := usedCount + deltaRF + if end < 1 { + return nil, errors.New("dropping too many replicas") + } + + head := (*usedReplicas)[:end] + return &head, nil + } else { + extraCount := int16(len(*allReplicas)) - usedCount + if extraCount < deltaRF { + return nil, errors.New("not enough brokers") + } + + unusedReplicas := *findUnusedReplicas(allReplicas, usedReplicas, extraCount) + newReplicas := *usedReplicas + for i := int16(0); i < deltaRF; i++ { + j := rand.Intn(len(unusedReplicas)) + newReplicas = append(newReplicas, unusedReplicas[j]) + unusedReplicas[j] = unusedReplicas[len(unusedReplicas)-1] + unusedReplicas = unusedReplicas[:len(unusedReplicas)-1] + } + + return &newReplicas, nil + } +} + +func findUnusedReplicas(allReplicas *[]int32, usedReplicas *[]int32, extraCount int16) *[]int32 { + usedMap := make(map[int32]bool, len(*usedReplicas)) + for _, r := range *usedReplicas { + usedMap[r] = true + } + + unusedReplicas := make([]int32, 0, extraCount) + for _, r := range *allReplicas { + _, exists := usedMap[r] + if !exists { + unusedReplicas = append(unusedReplicas, r) + } + } + + return &unusedReplicas +} diff --git a/topics_test.go b/topics_test.go new file mode 100644 index 0000000..a9b4572 --- /dev/null +++ b/topics_test.go @@ -0,0 +1,285 @@ +package confluent + +import ( + "errors" + "github.com/Shopify/sarama" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTopics_CreateTopicsSuccess(t *testing.T) { + mk := MockKafkaClient{} + mock := MockHttpClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics", uri) + return []byte(`{ + "kind": "KafkaTopic", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-X", + "resource_name": "crn:///kafka=cluster-1/topic=topic-X" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-X", + "is_internal": false, + "replication_factor": 3, + "partitions": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-X/partitions" + }, + "configs": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-X/configs" + }, + "partition_reassignments": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-X/partitions/-/reassignments" + } + } + `), 204, "204 Accepted", nil + } + c := NewClient(&mock, &mk) + err := c.CreateTopic(clusterId, "topic-X", 3, 3, partitionConfig, nil) + assert.NoError(t, err) +} + +func TestTopics_CreateExistingTopic(t *testing.T) { + mk := MockKafkaClient{} + mock := MockHttpClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodPost, method, "Expected method 'POST', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics", uri) + return []byte(` + { + "error_code": 400, + "message": "Topic 'topic-X' already exists" + } + `), 400, "400 Bad Request", nil + } + c := NewClient(&mock, &mk) + err := c.CreateTopic(clusterId, "topic-X", 3, 3, partitionConfig, nil) + assert.Equal(t, errors.New("error with status: 400 Bad Request Topic 'topic-X' already exists"), err) +} + +func TestTopics_GetNonExistingTopic(t *testing.T) { + mk := MockKafkaClient{} + mock := MockHttpClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics/topic-X", uri) + return []byte(` + { + "error_code": 404, + "message": "This server does not host this topic-partition" + } + `), 404, "404 Not Found", nil + } + c := NewClient(&mock, &mk) + newTopic, err := c.GetTopic(clusterId, "topic-X") + assert.Equal(t, errors.New("error with status: 404 Not Found This server does not host this topic-partition"), err) + assert.Nil(t, newTopic) +} + + +func TestTopics_GetAllTopics(t *testing.T) { + mk := MockKafkaClient{} + mock := MockHttpClient{} + mock.DoRequestFn = func(method string, uri string, reqBody io.Reader) (responseBody []byte, statusCode int, status string, err error) { + assert.Equal(t, http.MethodGet, method, "Expected method 'GET', got %s", method) + assert.Equal(t, "/kafka/v3/clusters/cluster-1/topics", uri) + return []byte(` + { + "kind": "KafkaTopicList", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics", + "next": null + }, + "data": [ + { + "kind": "KafkaTopic", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1", + "resource_name": "crn:///kafka=cluster-1/topic=topic-1" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-1", + "is_internal": false, + "replication_factor": 3, + "partitions": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions" + }, + "configs": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/configs" + }, + "partition_reassignments": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-1/partitions/-/reassignments" + } + }, + { + "kind": "KafkaTopic", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-2", + "resource_name": "crn:///kafka=cluster-1/topic=topic-2" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-2", + "is_internal": true, + "replication_factor": 4, + "partitions": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-2/partitions" + }, + "configs": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-2/configs" + }, + "partition_reassignments": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-2/partitions/-/reassignments" + } + }, + { + "kind": "KafkaTopic", + "metadata": { + "self": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-3", + "resource_name": "crn:///kafka=cluster-1/topic=topic-3" + }, + "cluster_id": "cluster-1", + "topic_name": "topic-3", + "is_internal": false, + "replication_factor": 5, + "partitions": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-3/partitions" + }, + "configs": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-3/configs" + }, + "partition_reassignments": { + "related": "http://localhost:9391/v3/clusters/cluster-1/topics/topic-3/partitions/-/reassignments" + } + } + ] + } + `), 200, "OK", nil + } + c := NewClient(&mock, &mk) + topics, err := c.ListTopics(clusterId) + if assert.NoError(t, err) { + assert.Equal(t, 3, len(topics)) + assert.Equal(t, "topic-2", topics[1].Name) + } +} + +func TestTopics_UpdatePartitionsSuccess(t *testing.T) { + mock := MockHttpClient{} + + seedBroker := sarama.NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreatePartitionsRequest": sarama.NewMockCreatePartitionsResponse(t), + }) + + mk := MockKafkaClient{} + mk.MockBrokers = seedBroker + mk.MockVersion = sarama.V2_4_0_0 + + c := NewClient(&mock, &mk) + + mockTopic := Topic{ + Name: "my-topic", + Partitions: 3, + } + err := c.UpdatePartitions(mockTopic) + assert.Nil(t, err) +} + +func TestTopics_UpdatePartitionsWithoutAuthorize(t *testing.T) { + mock := MockHttpClient{} + seedBroker := sarama.NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "CreatePartitionsRequest": sarama.NewMockCreatePartitionsResponse(t), + }) + + mk := MockKafkaClient{} + mk.MockBrokers = seedBroker + mk.MockVersion = sarama.V1_0_0_0 + + c := NewClient(&mock, &mk) + + mockTopic := Topic{ + Name: "_internal_topic", + Partitions: 3, + } + want := "The client is not authorized to access this topic" + err := c.UpdatePartitions(mockTopic) + assert.Contains(t, err.Error(), want) +} + +func TestTopics_GetUpdatingSuccess(t *testing.T) { + mock := MockHttpClient{} + mk := MockKafkaClient{} + mk.TopicNameExpected = "my-topic" + + c := NewClient(&mock, &mk) + + mockTopic := Topic{ + Name: "my-topic", + Partitions: 3, + } + + _, err := c.IsReplicationFactorUpdating(mockTopic.Name) + if err != nil { + t.Fatal(err) + } +} + +func TestTopics_IsPartitionRFChanging(t *testing.T) { + var status sarama.PartitionReplicaReassignmentsStatus + status.AddingReplicas = []int32{} + status.RemovingReplicas = []int32{} + assert.Equal(t, isPartitionRFChanging(&status), false) + + status.AddingReplicas = []int32{0, 2} + + assert.Equal(t, isPartitionRFChanging(&status), true) + + status.RemovingReplicas = []int32{0, 2} + + assert.Equal(t, isPartitionRFChanging(&status), true) +} + +func TestTopics_BuildNewReplicas(t *testing.T) { + allReplicas := []int32{1, 2, 3, 4} + usedReplicas := []int32{5, 6, 7} + deltaRF := int16(0) + + b,_ := buildNewReplicas(&allReplicas, &usedReplicas, deltaRF) + assert.Equal(t, &usedReplicas, b) + + deltaRF = int16(-4) + _, err := buildNewReplicas(&allReplicas, &usedReplicas, deltaRF) + assert.NotNil(t, err) + assert.Equal(t, errors.New("dropping too many replicas"), err) + + deltaRF = int16(-1) + b, err = buildNewReplicas(&allReplicas, &usedReplicas, deltaRF) + assert.Nil(t, err) + assert.Equal(t, usedReplicas[:2], *b) + + deltaRF = int16(1) + b, err = buildNewReplicas(&allReplicas, &usedReplicas, deltaRF) + assert.Nil(t, err) + assert.Equal(t, []int32{5, 6, 7, 4}, *b) + + deltaRF = int16(2) + b, err = buildNewReplicas(&allReplicas, &usedReplicas, deltaRF) + assert.NotNil(t, err) + assert.Equal(t, errors.New("not enough brokers"), err) +} +