From 0e69c0cc6c0cf02ff169729aaa162e5ae849c6ea Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 28 Apr 2026 17:27:31 +0300 Subject: [PATCH 1/6] feat(go): add delete segments support Expose the delete segments command in the Go SDK so clients can manage partition segment cleanup over TCP. --- .../go/client/tcp/tcp_segment_management.go | 38 ++++++++++++ foreign/go/contracts/client.go | 9 +++ foreign/go/internal/command/code.go | 1 + foreign/go/internal/command/segment.go | 58 +++++++++++++++++++ foreign/go/internal/command/segment_test.go | 56 ++++++++++++++++++ 5 files changed, 162 insertions(+) create mode 100644 foreign/go/client/tcp/tcp_segment_management.go create mode 100644 foreign/go/internal/command/segment.go create mode 100644 foreign/go/internal/command/segment_test.go diff --git a/foreign/go/client/tcp/tcp_segment_management.go b/foreign/go/client/tcp/tcp_segment_management.go new file mode 100644 index 0000000000..b4b4e79969 --- /dev/null +++ b/foreign/go/client/tcp/tcp_segment_management.go @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp + +import ( + iggcon "github.com/apache/iggy/foreign/go/contracts" + "github.com/apache/iggy/foreign/go/internal/command" +) + +func (c *IggyTcpClient) DeleteSegments( + streamId iggcon.Identifier, + topicId iggcon.Identifier, + partitionId uint32, + segmentsCount uint32, +) error { + _, err := c.do(&command.DeleteSegments{ + StreamId: streamId, + TopicId: topicId, + PartitionId: partitionId, + SegmentsCount: segmentsCount, + }) + return err +} diff --git a/foreign/go/contracts/client.go b/foreign/go/contracts/client.go index f3b6dac655..40fb82d469 100644 --- a/foreign/go/contracts/client.go +++ b/foreign/go/contracts/client.go @@ -195,6 +195,15 @@ type Client interface { partitionsCount uint32, ) error + // DeleteSegments deletes N segments from a topic partition by stream and topic unique IDs or names. + // Authentication is required, and the permission to manage the partitions. + DeleteSegments( + streamId Identifier, + topicId Identifier, + partitionId uint32, + segmentsCount uint32, + ) error + // GetUser get the info about a specific user by unique ID or username. // Authentication is required, and the permission to read the users, unless the provided user ID is the same as the authenticated user. GetUser(identifier Identifier) (*UserInfoDetails, error) diff --git a/foreign/go/internal/command/code.go b/foreign/go/internal/command/code.go index e4775eb2fc..644abf2dd0 100644 --- a/foreign/go/internal/command/code.go +++ b/foreign/go/internal/command/code.go @@ -57,6 +57,7 @@ const ( UpdateTopicCode Code = 304 CreatePartitionsCode Code = 402 DeletePartitionsCode Code = 403 + DeleteSegmentsCode Code = 503 GetGroupCode Code = 600 GetGroupsCode Code = 601 CreateGroupCode Code = 602 diff --git a/foreign/go/internal/command/segment.go b/foreign/go/internal/command/segment.go new file mode 100644 index 0000000000..f3cfdcda80 --- /dev/null +++ b/foreign/go/internal/command/segment.go @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package command + +import ( + "encoding/binary" + + iggcon "github.com/apache/iggy/foreign/go/contracts" +) + +type DeleteSegments struct { + StreamId iggcon.Identifier `json:"streamId"` + TopicId iggcon.Identifier `json:"topicId"` + PartitionId uint32 `json:"partitionId"` + SegmentsCount uint32 `json:"segmentsCount"` +} + +func (d *DeleteSegments) Code() Code { + return DeleteSegmentsCode +} + +func (d *DeleteSegments) MarshalBinary() ([]byte, error) { + streamIdBytes, err := d.StreamId.MarshalBinary() + if err != nil { + return nil, err + } + topicIdBytes, err := d.TopicId.MarshalBinary() + if err != nil { + return nil, err + } + + bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+8) + position := 0 + copy(bytes[position:], streamIdBytes) + position += len(streamIdBytes) + copy(bytes[position:], topicIdBytes) + position += len(topicIdBytes) + binary.LittleEndian.PutUint32(bytes[position:position+4], d.PartitionId) + position += 4 + binary.LittleEndian.PutUint32(bytes[position:position+4], d.SegmentsCount) + + return bytes, nil +} diff --git a/foreign/go/internal/command/segment_test.go b/foreign/go/internal/command/segment_test.go new file mode 100644 index 0000000000..25c6300447 --- /dev/null +++ b/foreign/go/internal/command/segment_test.go @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package command + +import ( + "bytes" + "testing" + + iggcon "github.com/apache/iggy/foreign/go/contracts" +) + +func TestSerialize_DeleteSegments(t *testing.T) { + streamId, _ := iggcon.NewIdentifier("stream") + topicId, _ := iggcon.NewIdentifier(uint32(1)) + request := DeleteSegments{ + StreamId: streamId, + TopicId: topicId, + PartitionId: 2, + SegmentsCount: 3, + } + + serialized, err := request.MarshalBinary() + if err != nil { + t.Errorf("Failed to serialize DeleteSegments: %v", err) + } + + expected := []byte{ + 0x02, // StreamId Kind (StringId) + 0x06, // StreamId Length (6) + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, // StreamId Value ("stream") + 0x01, // TopicId Kind (NumericId) + 0x04, // TopicId Length (4) + 0x01, 0x00, 0x00, 0x00, // TopicId Value (1) + 0x02, 0x00, 0x00, 0x00, // PartitionId (2) + 0x03, 0x00, 0x00, 0x00, // SegmentsCount (3) + } + + if !bytes.Equal(serialized, expected) { + t.Errorf("Test case failed. \nExpected:\t%v\nGot:\t\t%v", expected, serialized) + } +} From db4e36e2b04e8b5e7f4905e49eb259d6aa389263 Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 28 Apr 2026 18:14:22 +0300 Subject: [PATCH 2/6] test(go): cover delete segments SDK command Add focused coverage for the TCP delete segments wrapper and command code while simplifying payload serialization through the shared identifier helper. --- .../client/tcp/tcp_segment_management_test.go | 89 +++++++++++++++++++ foreign/go/internal/command/segment.go | 18 +--- foreign/go/internal/command/segment_test.go | 8 ++ 3 files changed, 100 insertions(+), 15 deletions(-) create mode 100644 foreign/go/client/tcp/tcp_segment_management_test.go diff --git a/foreign/go/client/tcp/tcp_segment_management_test.go b/foreign/go/client/tcp/tcp_segment_management_test.go new file mode 100644 index 0000000000..d2ef313bd7 --- /dev/null +++ b/foreign/go/client/tcp/tcp_segment_management_test.go @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp + +import ( + "bytes" + "encoding/binary" + "io" + "net" + "testing" + + iggcon "github.com/apache/iggy/foreign/go/contracts" + "github.com/apache/iggy/foreign/go/internal/command" +) + +func TestIggyTcpClient_DeleteSegments(t *testing.T) { + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + + streamId, _ := iggcon.NewIdentifier("stream") + topicId, _ := iggcon.NewIdentifier(uint32(1)) + + expectedCommand := command.DeleteSegments{ + StreamId: streamId, + TopicId: topicId, + PartitionId: 2, + SegmentsCount: 3, + } + expectedPayload, err := expectedCommand.MarshalBinary() + if err != nil { + t.Fatalf("Failed to serialize expected command: %v", err) + } + + done := make(chan error, 1) + go func() { + defer serverConn.Close() + + header := make([]byte, 8) + if _, err := io.ReadFull(serverConn, header); err != nil { + done <- err + return + } + + messageLength := binary.LittleEndian.Uint32(header[:4]) + commandCode := binary.LittleEndian.Uint32(header[4:8]) + payload := make([]byte, messageLength-4) + if _, err := io.ReadFull(serverConn, payload); err != nil { + done <- err + return + } + + if commandCode != uint32(command.DeleteSegmentsCode) { + t.Errorf("Expected command code %d, got %d", command.DeleteSegmentsCode, commandCode) + } + if !bytes.Equal(payload, expectedPayload) { + t.Errorf("Expected payload %v, got %v", expectedPayload, payload) + } + + response := make([]byte, ResponseInitialBytesLength) + _, err := serverConn.Write(response) + done <- err + }() + + client := &IggyTcpClient{ + conn: clientConn, + } + + if err := client.DeleteSegments(streamId, topicId, 2, 3); err != nil { + t.Fatalf("DeleteSegments failed: %v", err) + } + if err := <-done; err != nil { + t.Fatalf("Server side failed: %v", err) + } +} diff --git a/foreign/go/internal/command/segment.go b/foreign/go/internal/command/segment.go index f3cfdcda80..2c25ae6f2b 100644 --- a/foreign/go/internal/command/segment.go +++ b/foreign/go/internal/command/segment.go @@ -35,24 +35,12 @@ func (d *DeleteSegments) Code() Code { } func (d *DeleteSegments) MarshalBinary() ([]byte, error) { - streamIdBytes, err := d.StreamId.MarshalBinary() + bytes, err := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId) if err != nil { return nil, err } - topicIdBytes, err := d.TopicId.MarshalBinary() - if err != nil { - return nil, err - } - - bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+8) - position := 0 - copy(bytes[position:], streamIdBytes) - position += len(streamIdBytes) - copy(bytes[position:], topicIdBytes) - position += len(topicIdBytes) - binary.LittleEndian.PutUint32(bytes[position:position+4], d.PartitionId) - position += 4 - binary.LittleEndian.PutUint32(bytes[position:position+4], d.SegmentsCount) + bytes = binary.LittleEndian.AppendUint32(bytes, d.PartitionId) + bytes = binary.LittleEndian.AppendUint32(bytes, d.SegmentsCount) return bytes, nil } diff --git a/foreign/go/internal/command/segment_test.go b/foreign/go/internal/command/segment_test.go index 25c6300447..e87a95622b 100644 --- a/foreign/go/internal/command/segment_test.go +++ b/foreign/go/internal/command/segment_test.go @@ -54,3 +54,11 @@ func TestSerialize_DeleteSegments(t *testing.T) { t.Errorf("Test case failed. \nExpected:\t%v\nGot:\t\t%v", expected, serialized) } } + +func TestDeleteSegments_Code(t *testing.T) { + request := DeleteSegments{} + + if request.Code() != DeleteSegmentsCode { + t.Errorf("Expected command code %v, got %v", DeleteSegmentsCode, request.Code()) + } +} From 213a199e8c9b27b347b0073373b5ef0fbd84532c Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 28 Apr 2026 19:00:58 +0300 Subject: [PATCH 3/6] test(go): satisfy errcheck in segment test Explicitly ignore pipe close errors in the delete segments TCP test so Go lint passes under CI. --- foreign/go/client/tcp/tcp_segment_management_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/foreign/go/client/tcp/tcp_segment_management_test.go b/foreign/go/client/tcp/tcp_segment_management_test.go index d2ef313bd7..220f50661e 100644 --- a/foreign/go/client/tcp/tcp_segment_management_test.go +++ b/foreign/go/client/tcp/tcp_segment_management_test.go @@ -30,7 +30,9 @@ import ( func TestIggyTcpClient_DeleteSegments(t *testing.T) { clientConn, serverConn := net.Pipe() - defer clientConn.Close() + defer func() { + _ = clientConn.Close() + }() streamId, _ := iggcon.NewIdentifier("stream") topicId, _ := iggcon.NewIdentifier(uint32(1)) @@ -48,7 +50,9 @@ func TestIggyTcpClient_DeleteSegments(t *testing.T) { done := make(chan error, 1) go func() { - defer serverConn.Close() + defer func() { + _ = serverConn.Close() + }() header := make([]byte, 8) if _, err := io.ReadFull(serverConn, header); err != nil { From 3f05cc4fb5d603056ad20117238c3f9f24f3f1eb Mon Sep 17 00:00:00 2001 From: matanper Date: Tue, 28 Apr 2026 19:03:17 +0300 Subject: [PATCH 4/6] test(go): remove unreachable segment coverage branch Avoid an impossible identifier serialization branch in delete segments payload encoding so patch coverage reflects the exercised command behavior. --- foreign/go/internal/command/segment.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/foreign/go/internal/command/segment.go b/foreign/go/internal/command/segment.go index 2c25ae6f2b..86f96e09fc 100644 --- a/foreign/go/internal/command/segment.go +++ b/foreign/go/internal/command/segment.go @@ -35,11 +35,7 @@ func (d *DeleteSegments) Code() Code { } func (d *DeleteSegments) MarshalBinary() ([]byte, error) { - bytes, err := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId) - if err != nil { - return nil, err - } - + bytes, _ := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId) bytes = binary.LittleEndian.AppendUint32(bytes, d.PartitionId) bytes = binary.LittleEndian.AppendUint32(bytes, d.SegmentsCount) return bytes, nil From 2f9740ee8b0c5ce02f46b6e96ba6e35174579e26 Mon Sep 17 00:00:00 2001 From: matanper Date: Thu, 30 Apr 2026 11:38:59 +0300 Subject: [PATCH 5/6] fix(go): propagate MarshalIdentifiers errors in DeleteSegments --- foreign/go/internal/command/segment.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/foreign/go/internal/command/segment.go b/foreign/go/internal/command/segment.go index 86f96e09fc..9aaac24f13 100644 --- a/foreign/go/internal/command/segment.go +++ b/foreign/go/internal/command/segment.go @@ -35,7 +35,10 @@ func (d *DeleteSegments) Code() Code { } func (d *DeleteSegments) MarshalBinary() ([]byte, error) { - bytes, _ := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId) + bytes, err := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId) + if err != nil { + return nil, err + } bytes = binary.LittleEndian.AppendUint32(bytes, d.PartitionId) bytes = binary.LittleEndian.AppendUint32(bytes, d.SegmentsCount) return bytes, nil From 301398cd5d76352584d338dca6f6aeb0e5874894 Mon Sep 17 00:00:00 2001 From: matanper Date: Thu, 30 Apr 2026 11:48:30 +0300 Subject: [PATCH 6/6] test(go): move delete segments coverage to BDD Cover delete segments through the Go TCP BDD suite so server-backed SDK behavior follows the existing test layout. --- .../tests/tcp_test/segments_feature_delete.go | 88 ++++++++++++++++++ .../client/tcp/tcp_segment_management_test.go | 93 ------------------- 2 files changed, 88 insertions(+), 93 deletions(-) create mode 100644 bdd/go/tests/tcp_test/segments_feature_delete.go delete mode 100644 foreign/go/client/tcp/tcp_segment_management_test.go diff --git a/bdd/go/tests/tcp_test/segments_feature_delete.go b/bdd/go/tests/tcp_test/segments_feature_delete.go new file mode 100644 index 0000000000..011ac9e611 --- /dev/null +++ b/bdd/go/tests/tcp_test/segments_feature_delete.go @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp_test + +import ( + iggcon "github.com/apache/iggy/foreign/go/contracts" + ierror "github.com/apache/iggy/foreign/go/errors" + "github.com/onsi/ginkgo/v2" +) + +var _ = ginkgo.Describe("DELETE SEGMENTS:", func() { + prefix := "DeleteSegments" + ginkgo.When("User is logged in", func() { + ginkgo.Context("and tries to delete zero segments for existing topic partition", func() { + client := createAuthorizedConnection() + streamId, _ := successfullyCreateStream(prefix, client) + defer deleteStreamAfterTests(streamId, client) + topicId, _ := successfullyCreateTopic(streamId, client) + + streamIdentifier, _ := iggcon.NewIdentifier(streamId) + topicIdentifier, _ := iggcon.NewIdentifier(topicId) + err := client.DeleteSegments( + streamIdentifier, + topicIdentifier, + 0, + 0, + ) + + itShouldNotReturnError(err) + }) + + ginkgo.Context("and tries to delete segments for a non existing stream", func() { + client := createAuthorizedConnection() + err := client.DeleteSegments( + randomU32Identifier(), + randomU32Identifier(), + 0, + 1, + ) + + itShouldReturnSpecificError(err, ierror.ErrStreamIdNotFound) + }) + + ginkgo.Context("and tries to delete segments for a non existing topic", func() { + client := createAuthorizedConnection() + streamId, _ := successfullyCreateStream(prefix, client) + defer deleteStreamAfterTests(streamId, client) + streamIdentifier, _ := iggcon.NewIdentifier(streamId) + err := client.DeleteSegments( + streamIdentifier, + randomU32Identifier(), + 0, + 1, + ) + + itShouldReturnSpecificError(err, ierror.ErrTopicIdNotFound) + }) + }) + + ginkgo.When("User is not logged in", func() { + ginkgo.Context("and tries to delete segments", func() { + client := createClient() + err := client.DeleteSegments( + randomU32Identifier(), + randomU32Identifier(), + 0, + 1, + ) + + itShouldReturnUnauthenticatedError(err) + }) + }) +}) diff --git a/foreign/go/client/tcp/tcp_segment_management_test.go b/foreign/go/client/tcp/tcp_segment_management_test.go deleted file mode 100644 index 220f50661e..0000000000 --- a/foreign/go/client/tcp/tcp_segment_management_test.go +++ /dev/null @@ -1,93 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package tcp - -import ( - "bytes" - "encoding/binary" - "io" - "net" - "testing" - - iggcon "github.com/apache/iggy/foreign/go/contracts" - "github.com/apache/iggy/foreign/go/internal/command" -) - -func TestIggyTcpClient_DeleteSegments(t *testing.T) { - clientConn, serverConn := net.Pipe() - defer func() { - _ = clientConn.Close() - }() - - streamId, _ := iggcon.NewIdentifier("stream") - topicId, _ := iggcon.NewIdentifier(uint32(1)) - - expectedCommand := command.DeleteSegments{ - StreamId: streamId, - TopicId: topicId, - PartitionId: 2, - SegmentsCount: 3, - } - expectedPayload, err := expectedCommand.MarshalBinary() - if err != nil { - t.Fatalf("Failed to serialize expected command: %v", err) - } - - done := make(chan error, 1) - go func() { - defer func() { - _ = serverConn.Close() - }() - - header := make([]byte, 8) - if _, err := io.ReadFull(serverConn, header); err != nil { - done <- err - return - } - - messageLength := binary.LittleEndian.Uint32(header[:4]) - commandCode := binary.LittleEndian.Uint32(header[4:8]) - payload := make([]byte, messageLength-4) - if _, err := io.ReadFull(serverConn, payload); err != nil { - done <- err - return - } - - if commandCode != uint32(command.DeleteSegmentsCode) { - t.Errorf("Expected command code %d, got %d", command.DeleteSegmentsCode, commandCode) - } - if !bytes.Equal(payload, expectedPayload) { - t.Errorf("Expected payload %v, got %v", expectedPayload, payload) - } - - response := make([]byte, ResponseInitialBytesLength) - _, err := serverConn.Write(response) - done <- err - }() - - client := &IggyTcpClient{ - conn: clientConn, - } - - if err := client.DeleteSegments(streamId, topicId, 2, 3); err != nil { - t.Fatalf("DeleteSegments failed: %v", err) - } - if err := <-done; err != nil { - t.Fatalf("Server side failed: %v", err) - } -}