Skip to content

Commit

Permalink
Fix jet default configs (#731)
Browse files Browse the repository at this point in the history
* convert the config type fields which have no default value to pointer

* update test configs
  • Loading branch information
semihbkgr committed May 29, 2023
1 parent 33b50ed commit b96a73e
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 26 deletions.
13 changes: 8 additions & 5 deletions api/v1alpha1/hazelcast_types.go
Expand Up @@ -229,8 +229,10 @@ type JetEngineConfiguration struct {

type JetInstance struct {
// The number of threads Jet creates in its cooperative multithreading pool.
// Its default value is the number of cores
// +kubebuilder:validation:Minimum:=1
// +optional
CooperativeThreadCount int32 `json:"cooperativeThreadCount,omitempty"`
CooperativeThreadCount *int32 `json:"cooperativeThreadCount,omitempty"`

// The duration of the interval between flow-control packets.
// +kubebuilder:default:=100
Expand All @@ -254,8 +256,9 @@ type JetInstance struct {
LosslessRestartEnabled bool `json:"losslessRestartEnabled"`

// Specifies the maximum number of records that can be accumulated by any single processor instance.
// Default value is Long.MAX_VALUE
// +optional
MaxProcessorAccumulatedRecords int64 `json:"maxProcessorAccumulatedRecords,omitempty"`
MaxProcessorAccumulatedRecords *int64 `json:"maxProcessorAccumulatedRecords,omitempty"`
}

// Returns true if Jet Instance section is configured.
Expand All @@ -266,15 +269,15 @@ func (j *JetInstance) IsConfigured() bool {
type JetEdgeDefaults struct {
// Sets the capacity of processor-to-processor concurrent queues.
// +optional
QueueSize int32 `json:"queueSize,omitempty"`
QueueSize *int32 `json:"queueSize,omitempty"`

// Limits the size of the packet in bytes.
// +optional
PacketSizeLimit int32 `json:"packetSizeLimit,omitempty"`
PacketSizeLimit *int32 `json:"packetSizeLimit,omitempty"`

// Sets the scaling factor used by the adaptive receive window sizing function.
// +optional
ReceiveWindowMultiplier int8 `json:"receiveWindowMultiplier,omitempty"`
ReceiveWindowMultiplier *int8 `json:"receiveWindowMultiplier,omitempty"`
}

// Returns true if Jet Instance Edge Defaults is configured.
Expand Down
29 changes: 27 additions & 2 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions config/crd/bases/hazelcast.com_hazelcasts.yaml
Expand Up @@ -283,8 +283,10 @@ spec:
type: integer
cooperativeThreadCount:
description: The number of threads Jet creates in its cooperative
multithreading pool.
multithreading pool. Its default value is the number of
cores
format: int32
minimum: 1
type: integer
flowControlPeriodMillis:
default: 100
Expand All @@ -299,7 +301,8 @@ spec:
type: boolean
maxProcessorAccumulatedRecords:
description: Specifies the maximum number of records that
can be accumulated by any single processor instance.
can be accumulated by any single processor instance. Default
value is Long.MAX_VALUE
format: int64
type: integer
scaleUpDelayMillis:
Expand Down
12 changes: 6 additions & 6 deletions controllers/hazelcast/hazelcast_resources.go
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/go-logr/logr"
proto "github.com/hazelcast/hazelcast-go-client"
keystore "github.com/pavlo-v-chernykh/keystore-go/v4"
"github.com/pavlo-v-chernykh/keystore-go/v4"
"golang.org/x/mod/semver"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -727,21 +727,21 @@ func hazelcastBasicConfig(h *hazelcastv1alpha1.Hazelcast) config.Hazelcast {
if h.Spec.JetEngineConfiguration.Instance.IsConfigured() {
i := h.Spec.JetEngineConfiguration.Instance
cfg.Jet.Instance = config.JetInstance{
CooperativeThreadCount: &i.CooperativeThreadCount,
CooperativeThreadCount: i.CooperativeThreadCount,
FlowControlPeriodMillis: &i.FlowControlPeriodMillis,
BackupCount: &i.BackupCount,
ScaleUpDelayMillis: &i.ScaleUpDelayMillis,
LosslessRestartEnabled: &i.LosslessRestartEnabled,
MaxProcessorAccumulatedRecords: &i.MaxProcessorAccumulatedRecords,
MaxProcessorAccumulatedRecords: i.MaxProcessorAccumulatedRecords,
}
}

if h.Spec.JetEngineConfiguration.EdgeDefaults.IsConfigured() {
e := h.Spec.JetEngineConfiguration.EdgeDefaults
cfg.Jet.EdgeDefaults = config.EdgeDefaults{
QueueSize: &e.QueueSize,
PacketSizeLimit: &e.PacketSizeLimit,
ReceiveWindowMultiplier: &e.ReceiveWindowMultiplier,
QueueSize: e.QueueSize,
PacketSizeLimit: e.PacketSizeLimit,
ReceiveWindowMultiplier: e.ReceiveWindowMultiplier,
}
}
}
Expand Down
Expand Up @@ -539,8 +539,10 @@ spec:
type: integer
cooperativeThreadCount:
description: The number of threads Jet creates in its cooperative
multithreading pool.
multithreading pool. Its default value is the number of
cores
format: int32
minimum: 1
type: integer
flowControlPeriodMillis:
default: 100
Expand All @@ -555,7 +557,8 @@ spec:
type: boolean
maxProcessorAccumulatedRecords:
description: Specifies the maximum number of records that
can be accumulated by any single processor instance.
can be accumulated by any single processor instance. Default
value is Long.MAX_VALUE
format: int64
type: integer
scaleUpDelayMillis:
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/config/hazelcast/config.go
Expand Up @@ -296,8 +296,8 @@ var (
},
Instance: &hazelcastv1alpha1.JetInstance{
LosslessRestartEnabled: true,
CooperativeThreadCount: 1,
MaxProcessorAccumulatedRecords: 1000000000,
CooperativeThreadCount: pointer.Int32(1),
MaxProcessorAccumulatedRecords: pointer.Int64(1000000000),
},
},
Persistence: &hazelcastv1alpha1.HazelcastPersistenceConfiguration{
Expand Down Expand Up @@ -329,8 +329,8 @@ var (
ResourceUploadEnabled: true,
Instance: &hazelcastv1alpha1.JetInstance{
LosslessRestartEnabled: true,
CooperativeThreadCount: 1,
MaxProcessorAccumulatedRecords: 1000000000,
CooperativeThreadCount: pointer.Int32(1),
MaxProcessorAccumulatedRecords: pointer.Int64(1000000000),
},
},
Persistence: &hazelcastv1alpha1.HazelcastPersistenceConfiguration{
Expand Down
10 changes: 5 additions & 5 deletions test/integration/hazelcast_test.go
Expand Up @@ -1833,17 +1833,17 @@ var _ = Describe("Hazelcast CR", func() {
Enabled: ptr.Bool(true),
ResourceUploadEnabled: false,
Instance: &hazelcastv1alpha1.JetInstance{
CooperativeThreadCount: 1,
CooperativeThreadCount: ptr.Int32(1),
FlowControlPeriodMillis: 1,
BackupCount: 1,
ScaleUpDelayMillis: 1,
LosslessRestartEnabled: false,
MaxProcessorAccumulatedRecords: 1,
MaxProcessorAccumulatedRecords: ptr.Int64(1),
},
EdgeDefaults: &hazelcastv1alpha1.JetEdgeDefaults{
QueueSize: 1,
PacketSizeLimit: 1,
ReceiveWindowMultiplier: 1,
QueueSize: ptr.Int32(1),
PacketSizeLimit: ptr.Int32(1),
ReceiveWindowMultiplier: ptr.Int8(1),
},
}
hz := &hazelcastv1alpha1.Hazelcast{
Expand Down

0 comments on commit b96a73e

Please sign in to comment.