Skip to content

Commit

Permalink
[CN-438] Use Agent asynchronous upload API
Browse files Browse the repository at this point in the history
  • Loading branch information
dzeromski-hazelcast committed Jul 14, 2022
1 parent 75effe0 commit bc8411b
Show file tree
Hide file tree
Showing 13 changed files with 586 additions and 319 deletions.
6 changes: 3 additions & 3 deletions config/samples/_v1alpha1_hazelcast_persistence_agent.yaml
Expand Up @@ -3,13 +3,13 @@ kind: Hazelcast
metadata:
name: hazelcast
spec:
clusterSize: 3
clusterSize: 1
repository: 'docker.io/hazelcast/hazelcast-enterprise'
version: '5.1.2'
licenseKeySecret: hazelcast-license-key
agent:
repository: hazelcast/platform-operator-agent
version: 0.1.0
repository: docker.io/dzeromskhazelcast/platform-operator-agent
version: latest
persistence:
backupType: "External"
baseDir: "/data/hot-restart/"
Expand Down
4 changes: 2 additions & 2 deletions config/samples/_v1alpha1_hotbackup_agent.yaml
Expand Up @@ -4,11 +4,11 @@ metadata:
name: hot-backup
spec:
hazelcastResourceName: hazelcast
bucketURI: "s3://operator-backup"
bucketURI: "s3://operator-e2e-external-backup"
secret: "br-secret-s3"

# bucketURI: "gs://operator-agent-backup"
# secret: "br-secret-gcp"

# bucketURI: "azblob://backup"
# secret: "br-secret-az"
# secret: "br-secret-az"
95 changes: 0 additions & 95 deletions controllers/hazelcast/br_agent_rest_client.go
@@ -1,96 +1 @@
package hazelcast

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/hazelcast/hazelcast-platform-operator/api/v1alpha1"
n "github.com/hazelcast/hazelcast-platform-operator/internal/naming"
)

// Section contains the REST API endpoints.
const (
uploadBackup = "/upload"
)

type uploadRequest struct {
BucketURL string `json:"bucket_url"`
BackupFolderPath string `json:"backup_folder_path"`
HazelcastCRName string `json:"hz_cr_name"`
SecretName string `json:"secret_name"`
MemberUUID string `json:"member_uuid"`
}

type AgentRestClient struct {
h *v1alpha1.Hazelcast
bucketURL string
backupFolderPath string
hazelcastCRName string
secretName string
}

func NewAgentRestClient(h *v1alpha1.Hazelcast, hb *v1alpha1.HotBackup) *AgentRestClient {
return &AgentRestClient{
h: h,
bucketURL: hb.Spec.BucketURI,
backupFolderPath: h.Spec.Persistence.BaseDir,
hazelcastCRName: hb.Spec.HazelcastResourceName,
secretName: hb.Spec.Secret,
}
}

func (ac *AgentRestClient) UploadBackup(ctx context.Context) error {
ctxT, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
for _, member := range ac.h.Status.Members {
address := fmt.Sprintf("%s:%d", member.Ip, n.DefaultAgentPort)
upload := uploadRequest{
BucketURL: ac.bucketURL,
BackupFolderPath: ac.backupFolderPath + "/hot-backup",
HazelcastCRName: ac.hazelcastCRName,
SecretName: ac.secretName,
MemberUUID: member.Uid,
}
reqBody, err := json.Marshal(upload)
if err != nil {
return err
}
req, err := postRequestWithBody(ctxT, reqBody, address, uploadBackup)
if err != nil {
return fmt.Errorf("request creation failed: %s, address --> %q , URL --> %q ", err, address, address+uploadBackup)
}
res, err := ac.executeRequest(req)
if err != nil {
return err
}
defer res.Body.Close()
}
return nil
}

func (ac *AgentRestClient) executeRequest(req *http.Request) (*http.Response, error) {
res, err := http.DefaultClient.Do(req)
if err != nil {
return res, err
}
if res.StatusCode < http.StatusOK || res.StatusCode >= http.StatusBadRequest {
buf := new(strings.Builder)
_, _ = io.Copy(buf, res.Body)
return res, fmt.Errorf("unexpected HTTP error: %s, %s", res.Status, buf.String())
}
return res, nil
}

func postRequestWithBody(ctx context.Context, body []byte, address string, endpoint string) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("http://%s%s", address, endpoint), bytes.NewBuffer(body))
if err != nil {
return nil, err
}
return req, nil
}
5 changes: 3 additions & 2 deletions controllers/hazelcast/hazelcast_config_local_run.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hazelcast/hazelcast-go-client/logger"

hazelcastv1alpha1 "github.com/hazelcast/hazelcast-platform-operator/api/v1alpha1"
n "github.com/hazelcast/hazelcast-platform-operator/internal/naming"
)

const localUrl = "127.0.0.1:8000"
Expand All @@ -33,6 +34,6 @@ func restUrl(h *hazelcastv1alpha1.Hazelcast) string {
return fmt.Sprintf("http://%s", hazelcastUrl(h))
}

func hazelcastUrl(_ *hazelcastv1alpha1.Hazelcast) string {
return localUrl
func hazelcastUrl(h *hazelcastv1alpha1.Hazelcast) string {
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", h.Name, h.Namespace, n.DefaultHzPort)
}
49 changes: 49 additions & 0 deletions controllers/hazelcast/hazelcast_rest_client.go
Expand Up @@ -157,3 +157,52 @@ func postRequest(ctx context.Context, data string, url string, endpoint string)
}
return req, nil
}

// func (s *ClusterService) HotBackup(ctx context.Context, clusterName string) (*http.Response, error) {
// u := "hazelcast/rest/management/cluster/hotBackup"

// req, err := s.client.NewRequest("POST", u, clusterName)
// if err != nil {
// return nil, err
// }

// resp, err := s.client.Do(ctx, req, nil)
// if err != nil {
// return resp, err
// }

// return resp, nil
// }

// func (s *ClusterService) State(ctx context.Context) (*State, *http.Response, error) {
// u := "hazelcast/rest/management/cluster/state"

// req, err := s.client.NewRequest("GET", u, nil)
// if err != nil {
// return nil, nil, err
// }

// state := new(State)
// resp, err := s.client.Do(ctx, req, state)
// if err != nil {
// return nil, resp, err
// }

// return resp, nil
// }

// func (s *ClusterService) ForceStart(ctx context.Context, clusterName string) (*http.Response, error) {
// u := "hazelcast/rest/management/cluster/forceStart"

// req, err := s.client.NewRequest("POST", u, clusterName)
// if err != nil {
// return nil, err
// }

// resp, err := s.client.Do(ctx, req, nil)
// if err != nil {
// return resp, err
// }

// return resp, nil
// }

0 comments on commit bc8411b

Please sign in to comment.