Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

broadcast.SendSync field creation and deletion to all nodes #1132

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions broadcast.go
Expand Up @@ -132,6 +132,8 @@ const (
MessageTypeCreateInputDefinition = 6
MessageTypeDeleteInputDefinition = 7
MessageTypeDeleteView = 8
MessageTypeCreateField = 9
MessageTypeDeleteField = 10
)

// MarshalMessage encodes the protobuf message into a byte slice.
Expand All @@ -148,6 +150,10 @@ func MarshalMessage(m proto.Message) ([]byte, error) {
typ = MessageTypeCreateFrame
case *internal.DeleteFrameMessage:
typ = MessageTypeDeleteFrame
case *internal.CreateFieldMessage:
typ = MessageTypeCreateField
case *internal.DeleteFieldMessage:
typ = MessageTypeDeleteField
case *internal.CreateInputDefinitionMessage:
typ = MessageTypeCreateInputDefinition
case *internal.DeleteInputDefinitionMessage:
Expand Down Expand Up @@ -180,6 +186,10 @@ func UnmarshalMessage(buf []byte) (proto.Message, error) {
m = &internal.CreateFrameMessage{}
case MessageTypeDeleteFrame:
m = &internal.DeleteFrameMessage{}
case MessageTypeCreateField:
m = &internal.CreateFieldMessage{}
case MessageTypeDeleteField:
m = &internal.DeleteFieldMessage{}
case MessageTypeCreateInputDefinition:
m = &internal.CreateInputDefinitionMessage{}
case MessageTypeDeleteInputDefinition:
Expand Down
31 changes: 27 additions & 4 deletions handler.go
Expand Up @@ -648,7 +648,6 @@ func (h *Handler) handlePostFrame(w http.ResponseWriter, r *http.Request) {
}

h.Holder.Stats.CountWithCustomTags("createFrame", 1, 1.0, []string{fmt.Sprintf("index:%s", indexName)})

}

type _postFrameRequest postFrameRequest
Expand Down Expand Up @@ -800,17 +799,30 @@ func (h *Handler) handlePostFrameField(w http.ResponseWriter, r *http.Request) {
return
}

// Create new field.
if err := f.CreateField(&Field{
field := &Field{
Name: fieldName,
Type: req.Type,
Min: req.Min,
Max: req.Max,
}); err != nil {
}

// Create new field.
if err := f.CreateField(field); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// Send the create field message to all nodes.
err := h.Broadcaster.SendSync(
&internal.CreateFieldMessage{
Index: indexName,
Frame: frameName,
Field: encodeField(field),
})
if err != nil {
h.logger().Printf("problem sending CreateField message: %s", err)
}

// Encode response.
if err := json.NewEncoder(w).Encode(postFrameFieldResponse{}); err != nil {
h.logger().Printf("response encoding error: %s", err)
Expand Down Expand Up @@ -844,6 +856,17 @@ func (h *Handler) handleDeleteFrameField(w http.ResponseWriter, r *http.Request)
return
}

// Send the delete field message to all nodes.
err := h.Broadcaster.SendSync(
&internal.DeleteFieldMessage{
Index: indexName,
Frame: frameName,
Field: fieldName,
})
if err != nil {
h.logger().Printf("problem sending DeleteField message: %s", err)
}

// Encode response.
if err := json.NewEncoder(w).Encode(deleteFrameFieldResponse{}); err != nil {
h.logger().Printf("response encoding error: %s", err)
Expand Down