Skip to content

Commit

Permalink
cache reporter (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
lucamrgs committed May 6, 2024
1 parent 50bb786 commit d828dd0
Show file tree
Hide file tree
Showing 19 changed files with 1,265 additions and 38 deletions.
49 changes: 43 additions & 6 deletions docs/content/en/docs/core-components/reporting.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ SOARCA utilizes push-based reporting to provide information on the instantiation

For the execution of a playbook, a *Decomposer* and invoked *Executor*s are injected with a *Reporter*. The *Reporter* maintains the reporting logic that reports execution information to a set of specified and available targets.

A reporting target can be internal to SOARCA, such as the [database](https://cossas.github.io/SOARCA/docs/core-components/database/), and the [report] API.
A reporting target can also be a third-party tool, such as an external SOAR/ SIEM, or incident case management system.
A reporting target can be internal to SOARCA, such as a [Cache](#cache-reporter). A reporting target can also be a third-party tool, such as an external SOAR/ SIEM, or incident case management system.

Upon execution trigger for a playbook, information about the chain of playbook steps to be executed will be pushed to the targets via dedicated reporting classes.

Along the execution of the workflow steps, the reporting classes will dynamically update the steps execution information such as output variables, and step execution success or failure.

The reporting features will enable the population and updating of views and data concerning workflow composition and its dynamic execution results. This data can be transmitted to SOARCA internal reporting components such as databases and APIs, as well as to third-party tools.
The reporting features will enable the population and updating of views and data concerning workflow composition and its dynamic execution results. This data can be transmitted to SOARCA internal reporting components such as a cache, as well as to third-party tools.

The schema below represents the architecture concept.

Expand Down Expand Up @@ -52,7 +51,6 @@ class Reporter {
ReportStep()
}
class Database
class Cache
class 3PTool
Expand All @@ -66,8 +64,6 @@ Reporter .up.|> IStepReporter
Reporter .up.|> IWorkflowReporter
Reporter -right-> IDownStreamReporter
Database .up.|> IDownStreamReporter
Cache .up.|> IDownStreamReporter
3PTool .up.|> IDownStreamReporter
Expand All @@ -86,3 +82,44 @@ The *DownStream* reporters will implement push-based reporting functions specifi
At this stage, third-party tools integrations may be built in SOARCA via packages implementing reporting logic for the specific tools. Alternatively, third-party tools may implement pull-based mechanisms (via the API) to get information from the execution of a playbook via SOARCA.

In the near future, we will (also) make available a SOARCA Report API that can establish a WebSocket connection to a third-party tool. As such, this will thus allow SOARCA to push execution updates as they come to third-party tools, without external tools having to poll SOARCA.

## Native Reporters

SOARCA implements internally reporting modules to handle database and caches reporting.

### Cache reporter

The *Cache* reporter mediates between [decomposer](https://cossas.github.io/SOARCA/docs/core-components/decomposer/) and [executors](https://cossas.github.io/SOARCA/docs/core-components/executer/), [database](https://cossas.github.io/SOARCA/docs/core-components/database/), and reporting APIs. As *DownStreamReporter*, the *Cache* stores workflow and step reports in-memory for an ongoing execution. As *IExecutionInformant*, the *Cache* provides information to the reporting API. The schema below shows how it is positioned in the SOARCA architecture.

```plantuml
@startuml
protocol /reporter
interface IDownStreamReporter {
ReportWorkflow() error
ReportStep() error
}
interface IDatabase
interface IExecutionInformer
class ReporterApi
class Reporter
class Cache {
cache []ExecutionEntry
}
"/reporter" -right-> ReporterApi
Reporter -> IDownStreamReporter
Cache -left-> IDatabase
Cache .up.|> IDownStreamReporter
Cache .up.|> IExecutionInformer
ReporterApi -down-> IExecutionInformer
```

The *Cache* thus reports the execution information downstream both in the database, and in memory. Upon execution information requests from the `/reporter` API, the cache can provide information fetching either from memory, or querying the database.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module soarca

go 1.22
go 1.22.0

require (
github.com/eclipse/paho.mqtt.golang v1.4.3
Expand Down
4 changes: 3 additions & 1 deletion internal/decomposer/decomposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ func (decomposer *Decomposer) Execute(playbook cacao.Playbook) (*ExecutionDetail
variables.Merge(playbook.PlaybookVariables)

// Reporting workflow instantiation
decomposer.reporter.ReportWorkflow(decomposer.details.ExecutionId, playbook)
decomposer.reporter.ReportWorkflowStart(decomposer.details.ExecutionId, playbook)

outputVariables, err := decomposer.ExecuteBranch(stepId, variables)

decomposer.details.Variables = outputVariables
// Reporting workflow end
decomposer.reporter.ReportWorkflowEnd(decomposer.details.ExecutionId, playbook, err)
return &decomposer.details, err
}

Expand Down
7 changes: 5 additions & 2 deletions internal/executors/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ type Executor struct {

func (executor *Executor) Execute(meta execution.Metadata, metadata PlaybookStepMetadata) (cacao.Variables, error) {

executor.reporter.ReportStepStart(meta.ExecutionId, metadata.Step, metadata.Variables)

if metadata.Step.Type != cacao.StepTypeAction {
err := errors.New("the provided step type is not compatible with this executor")
log.Error(err)
executor.reporter.ReportStepEnd(meta.ExecutionId, metadata.Step, cacao.NewVariables(), err)
return cacao.NewVariables(), err
}
returnVariables := cacao.NewVariables()
Expand Down Expand Up @@ -76,14 +79,14 @@ func (executor *Executor) Execute(meta execution.Metadata, metadata PlaybookStep

if err != nil {
log.Error("Error executing Command ", err)
executor.reporter.ReportStep(meta.ExecutionId, metadata.Step, returnVariables, err)
executor.reporter.ReportStepEnd(meta.ExecutionId, metadata.Step, returnVariables, err)
return cacao.NewVariables(), err
} else {
log.Debug("Command executed")
}
}
}
executor.reporter.ReportStep(meta.ExecutionId, metadata.Step, returnVariables, nil)
executor.reporter.ReportStepEnd(meta.ExecutionId, metadata.Step, returnVariables, nil)
return returnVariables, nil
}

Expand Down
7 changes: 5 additions & 2 deletions internal/executors/playbook_action/playbook_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ func (playbookAction *PlaybookAction) Execute(metadata execution.Metadata,
variables cacao.Variables) (cacao.Variables, error) {
log.Trace(metadata.ExecutionId)

playbookAction.reporter.ReportStepStart(metadata.ExecutionId, step, variables)

if step.Type != cacao.StepTypePlaybookAction {
err := errors.New(fmt.Sprint("step type is not of type ", cacao.StepTypePlaybookAction))
log.Error(err)
playbookAction.reporter.ReportStepEnd(metadata.ExecutionId, step, cacao.NewVariables(), nil)
return cacao.NewVariables(), err
}

Expand All @@ -56,10 +59,10 @@ func (playbookAction *PlaybookAction) Execute(metadata execution.Metadata,
if err != nil {
err = errors.New(fmt.Sprint("execution of playbook failed with error: ", err))
log.Error(err)
playbookAction.reporter.ReportStep(metadata.ExecutionId, step, playbook.PlaybookVariables, err)
playbookAction.reporter.ReportStepEnd(metadata.ExecutionId, step, playbook.PlaybookVariables, err)
return cacao.NewVariables(), err
}
playbookAction.reporter.ReportStep(metadata.ExecutionId, step, playbook.PlaybookVariables, nil)
playbookAction.reporter.ReportStepEnd(metadata.ExecutionId, step, playbook.PlaybookVariables, nil)
return details.Variables, nil

}
203 changes: 203 additions & 0 deletions internal/reporter/downstream_reporter/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package cache

import (
"errors"
"fmt"
"reflect"
"soarca/logger"
"soarca/models/cacao"
"soarca/models/report"
"soarca/utils"
itime "soarca/utils/time"
"strconv"
"time"

"github.com/google/uuid"
)

var component = reflect.TypeOf(Cache{}).PkgPath()
var log *logger.Log

func init() {
log = logger.Logger(component, logger.Info, "", logger.Json)
}

const MaxExecutions int = 10
const MaxSteps int = 10

type Cache struct {
Size int
timeUtil itime.ITime
Cache map[string]report.ExecutionEntry // Cached up to max
fifoRegister []string // Used for O(1) FIFO cache management
}

func New(timeUtil itime.ITime) *Cache {
maxExecutions, _ := strconv.Atoi(utils.GetEnv("MAX_EXECUTIONS", strconv.Itoa(MaxExecutions)))
return &Cache{
Size: maxExecutions,
Cache: make(map[string]report.ExecutionEntry),
timeUtil: timeUtil,
}
}

func (cacheReporter *Cache) getExecution(executionKey uuid.UUID) (report.ExecutionEntry, error) {
executionKeyStr := executionKey.String()
executionEntry, ok := cacheReporter.Cache[executionKeyStr]
if !ok {
err := errors.New("execution is not in cache")
log.Warning("execution is not in cache. consider increasing cache size.")
return report.ExecutionEntry{}, err
// TODO Retrieve from database
}
return executionEntry, nil
}
func (cacheReporter *Cache) getExecutionStep(executionKey uuid.UUID, stepKey string) (report.StepResult, error) {
executionEntry, err := cacheReporter.getExecution(executionKey)
if err != nil {
return report.StepResult{}, err
}
executionStep, ok := executionEntry.StepResults[stepKey]
if !ok {
err := errors.New("execution step is not in cache")
return report.StepResult{}, err
}
return executionStep, nil
}

// Adding executions in FIFO logic
func (cacheReporter *Cache) addExecution(newExecutionEntry report.ExecutionEntry) error {

if !(len(cacheReporter.fifoRegister) == len(cacheReporter.Cache)) {
return errors.New("cache fifo register and content are desynchronized")
}

newExecutionEntryKey := newExecutionEntry.ExecutionId.String()

if len(cacheReporter.fifoRegister) >= cacheReporter.Size {
firstExecution := cacheReporter.fifoRegister[0]
cacheReporter.fifoRegister = cacheReporter.fifoRegister[1:]
delete(cacheReporter.Cache, firstExecution)

cacheReporter.fifoRegister = append(cacheReporter.fifoRegister, newExecutionEntryKey)
cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry
return nil
}

cacheReporter.fifoRegister = append(cacheReporter.fifoRegister, newExecutionEntryKey)
cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry
return nil
}

func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook cacao.Playbook) error {
newExecutionEntry := report.ExecutionEntry{
ExecutionId: executionId,
PlaybookId: playbook.ID,
Started: cacheReporter.timeUtil.Now(),
Ended: time.Time{},
StepResults: map[string]report.StepResult{},
Status: report.Ongoing,
}
err := cacheReporter.addExecution(newExecutionEntry)
if err != nil {
return err
}
return nil
}

func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook cacao.Playbook, workflowError error) error {

executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if workflowError != nil {
executionEntry.PlaybookResult = workflowError
executionEntry.Status = report.Failed
} else {
executionEntry.Status = report.SuccessfullyExecuted
}
executionEntry.Ended = cacheReporter.timeUtil.Now()
cacheReporter.Cache[executionId.String()] = executionEntry

return nil
}

func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.Step, variables cacao.Variables) error {
executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if executionEntry.Status != report.Ongoing {
return errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
}

fmt.Println(executionEntry)

_, alreadyThere := executionEntry.StepResults[step.ID]
if alreadyThere {
log.Warning("a step execution was already reported for this step. overwriting.")
}

newStepEntry := report.StepResult{
ExecutionId: executionId,
StepId: step.ID,
Started: cacheReporter.timeUtil.Now(),
Ended: time.Time{},
Variables: variables,
Status: report.Ongoing,
Error: nil,
}
executionEntry.StepResults[step.ID] = newStepEntry
return nil
}

func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error {
executionEntry, err := cacheReporter.getExecution(executionId)
if err != nil {
return err
}

if executionEntry.Status != report.Ongoing {
return errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
}

executionStepResult, err := cacheReporter.getExecutionStep(executionId, step.ID)
if err != nil {
return err
}

if executionStepResult.Status != report.Ongoing {
return errors.New("trying to report on the execution of a step that was already reported completed or failed")
}

if stepError != nil {
executionStepResult.Error = stepError
executionStepResult.Status = report.ServerSideError
} else {
executionStepResult.Status = report.SuccessfullyExecuted
}
executionStepResult.Ended = cacheReporter.timeUtil.Now()
executionStepResult.Variables = returnVars
executionEntry.StepResults[step.ID] = executionStepResult

return nil
}

func (cacheReporter *Cache) GetExecutionsIDs() []string {
executions := make([]string, len(cacheReporter.fifoRegister))
_ = copy(executions, cacheReporter.fifoRegister)
return executions
}

func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (report.ExecutionEntry, error) {
executionEntry, err := cacheReporter.getExecution(executionKey)
if err != nil {
return report.ExecutionEntry{}, err
}
report := executionEntry

return report, nil
}
7 changes: 5 additions & 2 deletions internal/reporter/downstream_reporter/downstream_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
)

type IDownStreamReporter interface {
ReportWorkflow(executionId uuid.UUID, playbook cacao.Playbook) error
ReportStep(executionId uuid.UUID, step cacao.Step, stepResults cacao.Variables, err error) error
ReportWorkflowStart(executionId uuid.UUID, playbook cacao.Playbook) error
ReportWorkflowEnd(executionId uuid.UUID, playbook cacao.Playbook, err error) error

ReportStepStart(executionId uuid.UUID, step cacao.Step, stepResults cacao.Variables) error
ReportStepEnd(executionId uuid.UUID, step cacao.Step, stepResults cacao.Variables, err error) error
}
Loading

0 comments on commit d828dd0

Please sign in to comment.