Skip to content

arquivei/gomsgprocessor

Repository files navigation

GoMsgProcessor

A Golang library for parallel processing of messages to structured documents


Table of Contents

GoMsgProcessor is a generic library that reads messages recursively and in parallel, requiring only a builder to transform them into final documents. Multiple builders can be registered, each associated with a message type, which lets the library work with messages from different sources. Through namespaces, it can also work with different targets. In addition, a deduplication function can be injected to clean up the resulting slice of documents after processing.

Stack Version
Golang v1.18
golangci-lint v1.46.2
    • Any Go version installed; 1.18 or later is preferred.
  • go get -u github.com/arquivei/gomsgprocessor
    
  • go mod vendor
    go mod tidy
    
    • Import the package

      import (
          "github.com/arquivei/gomsgprocessor"
      )
    • Define an incoming message struct.

      // ExampleMessage is the incoming message handled by ExampleBuilder.
      // The json tags describe its field names when it is decoded from JSON.
      type ExampleMessage struct {
          // ID is combined with each child name to form the document IDs.
          ID            int      `json:"id"`
          Name          string   `json:"name"`
          // Age is subtracted from the current year to derive ParentBirthYear.
          Age           int      `json:"age"`
          City          string   `json:"city"`
          State         string   `json:"state"`
          // ChildrenNames yields one document per entry; an empty list yields none.
          ChildrenNames []string `json:"childrenNames"`
          // Namespace is exposed through GetNamespace.
          Namespace     string   `json:"namespace"`
      }
    • Implement the Message interface, which is the input of ParallelProcessor's MakeDocuments.

      // GetNamespace returns the message's namespace, a logical separator used to
      // group messages while they are being processed.
      func (e *ExampleMessage) GetNamespace() gomsgprocessor.Namespace {
          ns := gomsgprocessor.Namespace(e.Namespace)
          return ns
      }
      
      // GetType returns the MessageType used to pick the DocumentBuilder for this
      // message. Every ExampleMessage reports the same fixed type.
      func (e *ExampleMessage) GetType() gomsgprocessor.MessageType {
          const exampleType = "typeExample"
          return gomsgprocessor.MessageType(exampleType)
      }
      
      // UpdateLogWithData is an optional hook. It enriches the logger returned by
      // log.Ctx(ctx) with the message's fields, each under a "msg_" prefixed key.
      func (e *ExampleMessage) UpdateLogWithData(ctx context.Context) {
          logger := log.Ctx(ctx)
          logger.UpdateContext(func(c zerolog.Context) zerolog.Context {
              c = c.Int("msg_id", e.ID).Str("msg_name", e.Name)
              c = c.Strs("msg_children_names", e.ChildrenNames)
              c = c.Int("msg_age", e.Age).Str("msg_city", e.City).Str("msg_state", e.State)
              return c.Str("msg_type", string(e.GetType())).Str("msg_namespace", e.Namespace)
          })
      }
    • Define an outgoing document struct, which is the result of a DocumentBuilder's Build.

      // ExampleDocument is the output document produced by ExampleBuilder, one per
      // child of an ExampleMessage. It has no json tags, so JSON output uses the
      // Go field names.
      type ExampleDocument struct {
          // ID is "<message ID>_<child name>".
          ID              string
          // CreatedAt is the time the document was built.
          CreatedAt       time.Time
          ParentName      string
          // ParentBirthYear is the build year minus the parent's age.
          ParentBirthYear int
          ChildName       string
          // CityAndState is "<City> - <State>".
          CityAndState    string
          Namespace       string
      }
    • Implement the DocumentBuilder interface, which transforms a Message into a slice of Documents.

      // ExampleBuilder is the DocumentBuilder for ExampleMessage values.
      type ExampleBuilder struct{}
      
      // Build transforms a Message into []Document, emitting one ExampleDocument
      // per entry in ChildrenNames.
      //
      // It returns an error if msg is not a non-nil *ExampleMessage. A message
      // without children yields (nil, nil); the Parallel Processor will ignore it.
      func (b *ExampleBuilder) Build(_ context.Context, msg gomsgprocessor.Message) ([]gomsgprocessor.Document, error) {
          exampleMsg, ok := msg.(*ExampleMessage)
          if !ok {
              return nil, errors.New("failed to cast message")
          }
          // A typed nil pointer passes the assertion above; guard it so it does
          // not panic on the field accesses below.
          if exampleMsg == nil {
              return nil, errors.New("nil message")
          }
      
          // Parallel Processor will ignore this message
          if len(exampleMsg.ChildrenNames) == 0 {
              return nil, nil
          }
      
          // Read the clock once. Every document from this message then shares one
          // CreatedAt, and the birth year is derived from that same instant.
          now := time.Now()
          parentBirthYear := now.Year() - exampleMsg.Age
          cityAndState := exampleMsg.City + " - " + exampleMsg.State
          idPrefix := strconv.Itoa(exampleMsg.ID) + "_"
      
          documents := make([]gomsgprocessor.Document, 0, len(exampleMsg.ChildrenNames))
          for _, childName := range exampleMsg.ChildrenNames {
              documents = append(documents, ExampleDocument{
                  ID:              idPrefix + childName,
                  CreatedAt:       now,
                  ParentName:      exampleMsg.Name,
                  CityAndState:    cityAndState,
                  ChildName:       childName,
                  ParentBirthYear: parentBirthYear,
                  Namespace:       exampleMsg.Namespace,
              })
          }
      
          return documents, nil
      }
    • Define an (optional) function used to deduplicate the slice of documents.

        // ExampleDeduplicateDocuments removes documents that share the same ID.
        //
        // When several documents have the same ID, the last one wins. Each
        // surviving document sits at the position where its ID first appeared,
        // so the output order is deterministic rather than map-iteration order.
        // It returns an error if any element is not an ExampleDocument.
        func ExampleDeduplicateDocuments(documents []gomsgprocessor.Document) ([]gomsgprocessor.Document, error) {
            // positionByID maps a document ID to its index in deduplicatedDocuments.
            positionByID := make(map[string]int, len(documents))
            deduplicatedDocuments := make([]gomsgprocessor.Document, 0, len(documents))
            for _, document := range documents {
                exampleDocument, ok := document.(ExampleDocument)
                if !ok {
                    return nil, errors.New("failed to cast document")
                }
                if position, seen := positionByID[exampleDocument.ID]; seen {
                    // Later duplicates overwrite earlier ones (last wins).
                    deduplicatedDocuments[position] = exampleDocument
                    continue
                }
                positionByID[exampleDocument.ID] = len(deduplicatedDocuments)
                deduplicatedDocuments = append(deduplicatedDocuments, exampleDocument)
            }
            return deduplicatedDocuments, nil
        }
    • And now, it's time!

      func main() {
      
      	// NewParallelProcessor returns a new ParallelProcessor with a map of
      	// DocumentBuilder for each MessageType.
      	//
      	// A list of Option is also available for this method. See option.go for more
      	// information.
      	parallelProcessor := gomsgprocessor.NewParallelProcessor(
      		map[gomsgprocessor.MessageType]gomsgprocessor.DocumentBuilder{
      			"typeExample": &ExampleBuilder{},
      		},
      		gomsgprocessor.WithDeduplicateDocumentsOption(ExampleDeduplicateDocuments),
      	)
      
      	messages := []gomsgprocessor.Message{
      		&ExampleMessage{
      			ID:            1,
      			Name:          "John",
      			Age:           30,
      			City:          "New York",
      			State:         "NY",
      			ChildrenNames: []string{"John", "Jane", "Mary"},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            2,
      			Name:          "Poul",
      			Age:           25,
      			City:          "New Jersey",
      			State:         "NY",
      			ChildrenNames: []string{},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            3,
      			Name:          "Chris",
      			Age:           35,
      			City:          "Washington",
      			State:         "DC",
      			ChildrenNames: []string{"Bob"},
      			Namespace:     "namespace1",
      		},
      		&ExampleMessage{
      			ID:            3,
      			Name:          "Chris",
      			Age:           35,
      			City:          "Washington",
      			State:         "DC",
      			ChildrenNames: []string{"Bob"},
      			Namespace:     "namespace2",
      		},
      		&ExampleMessage{
      			ID:            1,
      			Name:          "John",
      			Age:           30,
      			City:          "New York",
      			State:         "NY",
      			ChildrenNames: []string{"John", "Jane", "Mary"},
      			Namespace:     "namespace1",
      		},
      	}
      
      	// MakeDocuments creates in parallel a slice of Document for given []Message
      	// using the map of DocumentBuilder (see NewParallelProcessor).
      	//
      	// This method returns a []Document and a (foundationkit/errors).Error.
      	// If not nil, this error has a (foundationkit/errors).Code associated with and
      	// can be a ErrCodeBuildDocuments or a ErrCodeDeduplicateDocuments.
      	documents, err := parallelProcessor.MakeDocuments(context.Background(), messages)
      	if err != nil {
      		panic(err)
      	}
      
      	examplesDocuments := make([]ExampleDocument, 0, len(documents))
      	for _, document := range documents {
      		exampleDocument, ok := document.(ExampleDocument)
      		if !ok {
      			panic("failed to cast document")
      		}
      		examplesDocuments = append(examplesDocuments, exampleDocument)
      	}
      
      	// Check the marshal error explicitly. Passing both return values straight
      	// to fmt.Println would print a trailing "<nil>" and hide any failure.
      	output, err := JSONMarshal(examplesDocuments)
      	if err != nil {
      		panic(err)
      	}
      	// The JSON encoder already terminates its output with a newline.
      	fmt.Print(output)
      }
      
      // JSONMarshal encodes t as JSON indented with two spaces, without escaping
      // HTML-sensitive characters such as <, > and &. The result ends with the
      // newline the encoder appends. If encoding fails, the error is returned
      // together with whatever text had been buffered.
      func JSONMarshal(t interface{}) (string, error) {
      	var out bytes.Buffer
      	enc := json.NewEncoder(&out)
      	enc.SetIndent("", "  ")
      	enc.SetEscapeHTML(false)
      	if encodeErr := enc.Encode(t); encodeErr != nil {
      		return out.String(), encodeErr
      	}
      	return out.String(), nil
      }
  • GoMsgProcessor 0.1.0 (May 20, 2022)

    • [New] Decoupling this package from Arquivei's API projects.
    • [New] Setting github's workflow with golangci-lint
    • [New] Example for usage.
    • [New] Documents: Code of Conduct, Contributing, License and Readme.

Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.

We use Semantic Versioning for versioning. For the versions available, see the tags on this repository.

This project is licensed under the BSD 3-Clause - see the LICENSE.md file for details.

Contacts can be made by email: rilder.almeida@arquivei.com.br