Commit

Init project
Quan Phuong committed Jul 22, 2021
0 parents commit ebfc167
Showing 29 changed files with 3,204 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
example/certs/*
vendor
251 changes: 251 additions & 0 deletions README.md
@@ -0,0 +1,251 @@
# Confluent Platform Client

- Maintainer: Quan Phuong <quanpc294@gmail.com>
- Provides a Go client for Confluent Platform; see the [Confluent REST API documentation](https://docs.confluent.io/platform/current/kafka-rest/api.html) and the [Confluent Metadata (MDS) API documentation](https://docs.confluent.io/platform/current/security/rbac/mds-api.html)

## Context

We run Confluent Platform (not Confluent Cloud) on our own infrastructure, and we couldn't find any existing client for integrating with the Confluent Platform API. This project targets a Confluent Platform deployment with the following configuration:

- SASL authentication with username and password (supports both basic auth and LDAP users)
- Kafka brokers with an embedded MDS server
- RBAC enabled

## Usage

### Installation

```
go get github.com/wayarmy/gonfluent
```

### Implementation example

- Gonfluent talks to a single Confluent cluster through two clients: an HTTP client for the REST/MDS APIs and a [Sarama client](https://github.com/Shopify/sarama) for Kafka, so when you initialize `Gonfluent` you need to set up both authentication methods.

```go
package main

import (
	"fmt"
	"os"

	confluent "github.com/wayarmy/gonfluent"
)

const (
	ClientVersion = "0.1"
	UserAgent     = "confluent-client-go-sdk-" + ClientVersion
)
func main() {
	baseUrl := "https://localhost:8090"
	username := os.Getenv("CONFLUENT_USER")
	password := os.Getenv("CONFLUENT_PASSWORD")

	// Initialize the client
	httpClient := confluent.NewDefaultHttpClient(baseUrl, username, password)
	httpClient.UserAgent = UserAgent

	bootstrapServer := []string{
		"localhost:9093",
	}
	kConfig := &confluent.Config{
		BootstrapServers: &bootstrapServer,
		CACert:           "certs/ca.pem",
		ClientCert:       "certs/cert.pem",
		ClientCertKey:    "certs/key.pem",
		SkipTLSVerify:    true,
		SASLMechanism:    "plain",
		TLSEnabled:       true,
		Timeout:          120,
	}
	kClient, err := confluent.NewSaramaClient(kConfig)
	if err != nil {
		panic(err)
	}

	client := confluent.NewClient(httpClient, kClient)
	bearerToken, err := client.Login()
	if err != nil {
		panic(err)
	}
	httpClient.Token = bearerToken

	// Get the list of clusters in Confluent platform
	listCluster, err := client.ListKafkaCluster()
	if err != nil {
		panic(err)
	}
	fmt.Println(listCluster)

	// Get the cluster information
	clusterId := listCluster[0].ClusterID
	cluster, err := client.GetKafkaCluster(clusterId)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%#v", cluster)
	fmt.Println(cluster)
	// topicConfig, err := client.GetTopicConfigs(clusterId, "example_topic_name")
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Printf("%#v", topicConfig)

	// topic, err := client.GetTopic(clusterId, "test7-terraform-confluent-provider")
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println(topic.Partitions)

	// Create a new topic:
	// partitionConfig := []confluent.TopicConfig{
	// 	{
	// 		Name:  "compression.type",
	// 		Value: "gzip",
	// 	},
	// 	{
	// 		Name:  "cleanup.policy",
	// 		Value: "compact",
	// 	},
	// 	// {
	// 	// 	Name:  "retention.ms",
	// 	// 	Value: "20000",
	// 	// },
	// }
	// err = client.CreateTopic(clusterId, "test_topic_name", 3, 3, partitionConfig, nil)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Topic Created!")
	// Update the configs of an existing topic
	d := []confluent.TopicConfig{
		{
			Name:  "compression.type",
			Value: "gzip",
		},
		{
			Name:  "cleanup.policy",
			Value: "compact",
		},
		{
			Name:  "retention.ms",
			Value: "300000",
		},
	}
	err = client.UpdateTopicConfigs(clusterId, "test_topic_name", d)
	if err != nil {
		panic(err)
	}
	fmt.Println("Topic Updated!")
	// Create a principal (we don't have a clear use case for it yet):
	// testPrincipals := []confluent.UserPrincipalAction{
	// 	{
	// 		Scope: confluent.Scope{
	// 			Clusters: confluent.AuthorClusters{
	// 				KafkaCluster: clusterId,
	// 			},
	// 		},
	// 		ResourceName: "Testing-Principal",
	// 		ResourceType: "Cluster",
	// 		Operation:    "Read",
	// 	},
	// }
	// newPrincipal, err := client.CreatePrincipal("User:system-platform", testPrincipals)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Printf("%s", newPrincipal)

	// Get the topic information:
	// getTopic, err := client.GetTopic(clusterId, "test2_topic_name")
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Printf("%v", getTopic)

	// Delete an existing topic:
	// deleteTopic := client.DeleteTopic(clusterId, "test_topic_name")
	// if deleteTopic != nil {
	// 	panic(deleteTopic)
	// }
	// fmt.Println("Topic deleted")

	// Add a role assignment:
	// c := confluent.ClusterDetails{}
	// c.Clusters.KafkaCluster = clusterId
	// err = client.BindPrincipalToRole("User:manh.do", "Operator", c)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Role Binded!")

	// Add a role assignment for a principal to a topic:
	// r := confluent.UpdateRoleBinding{
	// 	Scope: c,
	// }
	// r.ResourcePatterns = []confluent.RoleBindings{
	// 	{
	// 		ResourceType: "Topic",
	// 		Name:         "system-platform",
	// 		PatternType:  "PREFIXED",
	// 	},
	// }
	// principalCN := "User:CN=common-name.example.com"
	// err = client.IncreaseRoleBinding(principalCN, "ResourceOwner", r)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Role Binded!")

	// Update partitions and replication factor:
	// t := confluent.Topic{
	// 	Name:              "test_topic_name",
	// 	Partitions:        10,
	// 	ReplicationFactor: 4,
	// }
	// err = client.UpdatePartitions(t)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Update partition successfully")
	// err = client.UpdateReplicationsFactor(t)
	// if err != nil {
	// 	panic(err)
	// }
	// fmt.Println("Update RF successfully")
}
```

## Contributing

- Clone this project

```
git clone ...
```

- Install the dependencies

```
go mod download
```

- Add your code and logic
- Push to a new branch and submit a merge request once all tests pass

### Testing

`go test`
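
For unit tests that don't need a live cluster, the REST side can be stubbed with `net/http/httptest`. The sketch below is illustrative only: the file name, the `cluster-1` id, and the assumption that `NewClient` tolerates a nil Kafka (Sarama) client for HTTP-only calls are not confirmed by the library and would need to be verified.

```go
// acls_stub_test.go — hypothetical test in a consumer project; sketch only.
package client_test

import (
	"net/http"
	"net/http/httptest"
	"testing"

	confluent "github.com/wayarmy/gonfluent"
)

func TestListAclsAgainstStub(t *testing.T) {
	// Stub the Confluent REST API with a canned ACL response, regardless of path.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte(`{"data":[{"resource_name":"system-platform","operation":"READ"}]}`))
	}))
	defer server.Close()

	httpClient := confluent.NewDefaultHttpClient(server.URL, "user", "password")
	// Assumption: a nil Kafka (Sarama) client is acceptable for HTTP-only calls.
	client := confluent.NewClient(httpClient, nil)

	acls, err := client.ListAcls("cluster-1")
	if err != nil {
		t.Fatal(err)
	}
	if len(acls) != 1 || acls[0].ResourceName != "system-platform" {
		t.Fatalf("unexpected ACLs: %#v", acls)
	}
}
```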
70 changes: 70 additions & 0 deletions acls.go
@@ -0,0 +1,70 @@
package confluent

import (
	"bytes"
	"encoding/json"
)

const (
	aclsPath = "acls"
)

type Acl struct {
	ClusterId    string `json:"cluster_id,omitempty"`
	ResourceType string `json:"resource_type,omitempty"`
	ResourceName string `json:"resource_name,omitempty"`
	PatternType  string `json:"pattern_type,omitempty"`
	Principal    string `json:"principal,omitempty"`
	Host         string `json:"host,omitempty"`
	Operation    string `json:"operation,omitempty"`
	Permission   string `json:"permission,omitempty"`
}

// ListAcls returns the ACLs defined on the given Kafka cluster.
func (c *Client) ListAcls(clusterId string) ([]Acl, error) {
	u := "/clusters/" + clusterId + "/" + aclsPath
	r, err := c.DoRequest("GET", u, nil)
	if err != nil {
		return nil, err
	}

	body := struct {
		Data []Acl `json:"data"`
	}{}

	err = json.Unmarshal(r, &body)
	if err != nil {
		return nil, err
	}

	return body.Data, nil
}

// CreateAcl creates a new ACL on the given Kafka cluster.
func (c *Client) CreateAcl(clusterId string, aclConfig *Acl) error {
	u := "/clusters/" + clusterId + "/" + aclsPath

	payloadBuf := new(bytes.Buffer)
	if err := json.NewEncoder(payloadBuf).Encode(aclConfig); err != nil {
		return err
	}

	_, err := c.DoRequest("POST", u, payloadBuf)
	if err != nil {
		return err
	}

	return nil
}

// DeleteAcl deletes the ACLs matching the given resource name on the cluster.
func (c *Client) DeleteAcl(clusterId, resourceName string) error {
	u := "/clusters/" + clusterId + "/" + aclsPath

	aclDelete := Acl{
		ResourceName: resourceName,
	}

	payloadBuf := new(bytes.Buffer)
	if err := json.NewEncoder(payloadBuf).Encode(aclDelete); err != nil {
		return err
	}

	_, err := c.DoRequest("DELETE", u, payloadBuf)
	if err != nil {
		return err
	}

	return nil
}