Skip to content

Commit

Permalink
feat: Add base64 Processor (#4)
Browse files Browse the repository at this point in the history
* feat: add base64 processor

* chore: reformatted b64 processor

* feat: added b64 processor to factories

* docs: added b64 processor

* chore: updated README

* docs: fixed README example
  • Loading branch information
jshlbrd committed Apr 26, 2022
1 parent cdd2999 commit cc76318
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 3 deletions.
24 changes: 22 additions & 2 deletions process/README.md
Expand Up @@ -43,8 +43,9 @@ In Substation applications, processors adhere to these rules:
- operate on JSON data

## processors
| Processor | Description |
| --- | --- |
| Processor | Description |
| --- | --- |
| [Base64](#base64) | Encodes and decodes bytes to and from base64 |
| [Capture](#capture) | Applies a capturing regular expression |
| [Case](#case) | Modifies the case of a string |
| [Concat](#concat) | Modifies the case of a string |
Expand All @@ -66,6 +67,25 @@ In Substation applications, processors adhere to these rules:
| [Time](#time) | Converts time values between formats |
| [Zip](#zip) | Concatenates arrays into tuples or JSON objects |

### base64
Processes data by encoding it to or decoding it from base64. This processor should be used for converting entire JSON objects. The processor supports these base64 alphabets:
- std: https://www.rfc-editor.org/rfc/rfc4648.html#section-4
- url: https://www.rfc-editor.org/rfc/rfc4648.html#section-5

The processor uses this Jsonnet configuration:
```
{
// if the input is `eyJoZWxsbyI6IndvcmxkIn0=`, then the output is `{"hello":"world"}`
type: 'base64',
settings: {
options: {
direction: 'from',
alphabet: 'std', // defaults to std
}
},
}
```

### capture
Processes data by applying a capturing regular expression. This processor is array-aware and can output one or many values that are automatically stored as values or arrays of elements.

Expand Down
134 changes: 134 additions & 0 deletions process/base64.go
@@ -0,0 +1,134 @@
package process

import (
"context"
"encoding/base64"

"github.com/brexhq/substation/condition"
"github.com/brexhq/substation/internal/errors"
)

// Base64InvalidDirection is used when an invalid direction setting is given to the processor
const Base64InvalidDirection = errors.Error("Base64InvalidDirection")

// Base64InvalidAlphabet is used when an invalid alphabet setting is given to the processor
const Base64InvalidAlphabet = errors.Error("Base64InvalidAlphabet")

/*
Base64Options contain custom options settings for this processor.
Direction: the direction of the encoding, either to (encode) or from (decode) base64.
Alphabet (optional): the base64 alphabet to use, either std (https://www.rfc-editor.org/rfc/rfc4648.html#section-4) or url (https://www.rfc-editor.org/rfc/rfc4648.html#section-5); defaults to std.
*/
type Base64Options struct {
Direction string `mapstructure:"direction"`
Alphabet string `mapstructure:"alphabet"`
}

// Base64 implements the Byter and Channeler interfaces and converts bytes to and from Base64. More information is available in the README.
type Base64 struct {
Condition condition.OperatorConfig `mapstructure:"condition"`
Options Base64Options `mapstructure:"options"`
}

// Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.
func (p Base64) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error) {
var array [][]byte

op, err := condition.OperatorFactory(p.Condition)
if err != nil {
return nil, err
}

for data := range ch {
ok, err := op.Operate(data)
if err != nil {
return nil, err
}

if !ok {
array = append(array, data)
continue
}

processed, err := p.Byte(ctx, data)
if err != nil {
return nil, err
}
array = append(array, processed)
}

output := make(chan []byte, len(array))
for _, x := range array {
output <- x
}
close(output)
return output, nil
}

// Byte processes a byte slice with this processor.
func (p Base64) Byte(ctx context.Context, data []byte) ([]byte, error) {
if p.Options.Alphabet == "" {
p.Options.Alphabet = "std"
}

if p.Options.Direction == "from" {
res, err := fromBase64(data, p.Options.Alphabet)
if err != nil {
return nil, err
}

return res, nil
} else if p.Options.Direction == "to" {
res, err := toBase64(data, p.Options.Alphabet)
if err != nil {
return nil, err
}

return res, nil
} else {
return nil, Base64InvalidDirection
}
}

func fromBase64(data []byte, alphabet string) ([]byte, error) {
len := len(string(data))

switch s := alphabet; s {
case "std":
res := make([]byte, base64.StdEncoding.DecodedLen(len))
n, err := base64.StdEncoding.Decode(res, data)
if err != nil {
return nil, err
}

return res[:n], nil
case "url":
res := make([]byte, base64.URLEncoding.DecodedLen(len))
n, err := base64.URLEncoding.Decode(res, data)
if err != nil {
return nil, err
}

return res[:n], nil
default:
return nil, Base64InvalidAlphabet
}
}

func toBase64(data []byte, alphabet string) ([]byte, error) {
len := len(data)

switch s := alphabet; s {
case "std":
res := make([]byte, base64.StdEncoding.EncodedLen(len))
base64.StdEncoding.Encode(res, data)
return res, nil
case "url":
res := make([]byte, base64.URLEncoding.EncodedLen(len))
base64.URLEncoding.Encode(res, data)
return res, nil
default:
return nil, Base64InvalidAlphabet
}
}
74 changes: 74 additions & 0 deletions process/base64_test.go
@@ -0,0 +1,74 @@
package process

import (
"bytes"
"context"
"testing"
)

var base64Tests = []struct {
proc Base64
test []byte
expected []byte
}{
// decode std base64
{
Base64{
Options: Base64Options{
Direction: "from",
Alphabet: "std",
},
},
[]byte(`YWJjMTIzIT8kKiYoKSctPUB+`),
[]byte(`abc123!?$*&()'-=@~`),
},
// decode url base64
{
Base64{
Options: Base64Options{
Direction: "from",
Alphabet: "url",
},
},
[]byte(`YWJjMTIzIT8kKiYoKSctPUB-`),
[]byte(`abc123!?$*&()'-=@~`),
},
// encode std base64
{
Base64{
Options: Base64Options{
Direction: "to",
Alphabet: "std",
},
},
[]byte(`abc123!?$*&()'-=@~`),
[]byte(`YWJjMTIzIT8kKiYoKSctPUB+`),
},
// encode url base64
{
Base64{
Options: Base64Options{
Direction: "to",
Alphabet: "url",
},
},
[]byte(`abc123!?$*&()'-=@~`),
[]byte(`YWJjMTIzIT8kKiYoKSctPUB-`),
},
}

func TestBase64(t *testing.T) {
ctx := context.TODO()
for _, test := range base64Tests {
res, err := test.proc.Byte(ctx, test.test)
if err != nil {
t.Logf("%v", err)
t.Fail()
}

if c := bytes.Compare(res, test.expected); c != 0 {
t.Logf("expected %s, got %s", test.expected, res)
t.Fail()
}
}
}
8 changes: 8 additions & 0 deletions process/process.go
Expand Up @@ -82,6 +82,10 @@ func ReadOnlyChannel(ch chan []byte) <-chan []byte {
// ByterFactory loads a Byter from a Config. This is the recommended function for retrieving ready-to-use Byters.
func ByterFactory(cfg Config) (Byter, error) {
switch t := cfg.Type; t {
case "base64":
var p Base64
mapstructure.Decode(cfg.Settings, &p)
return p, nil
case "capture":
var p Capture
mapstructure.Decode(cfg.Settings, &p)
Expand Down Expand Up @@ -158,6 +162,10 @@ func ByterFactory(cfg Config) (Byter, error) {
// ChannelerFactory loads Channeler from a Config. This is the recommended function for retrieving ready-to-use Channelers.
func ChannelerFactory(cfg Config) (Channeler, error) {
switch t := cfg.Type; t {
case "base64":
var p Base64
mapstructure.Decode(cfg.Settings, &p)
return p, nil
case "capture":
var p Capture
mapstructure.Decode(cfg.Settings, &p)
Expand Down
25 changes: 24 additions & 1 deletion process/process_test.go
Expand Up @@ -58,9 +58,32 @@ var processTests = []struct {
[]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 170, 86, 202, 72, 205, 201, 201, 87, 178, 82, 74, 207, 207, 79, 73, 170, 76, 85, 170, 5, 4, 0, 0, 255, 255, 214, 182, 196, 150, 19, 0, 0, 0},
[]byte(`{"hello":"goodbye"}`),
},
{
[]Config{
{
Type: "base64",
Settings: map[string]interface{}{
"condition": struct {
Operator string
}{
Operator: "all",
},
"options": struct {
Direction string
Alphabet string
}{
Direction: "from",
Alphabet: "std",
},
},
},
},
[]byte(`eyJoZWxsbyI6IndvcmxkIn0=`),
[]byte(`{"hello":"world"}`),
},
}

func TestByteorAll(t *testing.T) {
func TestByterAll(t *testing.T) {
ctx := context.TODO()

for _, test := range processTests {
Expand Down

0 comments on commit cc76318

Please sign in to comment.