diff --git a/bdd/go/tests/tcp_test/segments_feature_delete.go b/bdd/go/tests/tcp_test/segments_feature_delete.go new file mode 100644 index 0000000000..011ac9e611 --- /dev/null +++ b/bdd/go/tests/tcp_test/segments_feature_delete.go @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp_test + +import ( + iggcon "github.com/apache/iggy/foreign/go/contracts" + ierror "github.com/apache/iggy/foreign/go/errors" + "github.com/onsi/ginkgo/v2" +) + +var _ = ginkgo.Describe("DELETE SEGMENTS:", func() { + prefix := "DeleteSegments" + ginkgo.When("User is logged in", func() { + ginkgo.Context("and tries to delete zero segments for existing topic partition", func() { + client := createAuthorizedConnection() + streamId, _ := successfullyCreateStream(prefix, client) + defer deleteStreamAfterTests(streamId, client) + topicId, _ := successfullyCreateTopic(streamId, client) + + streamIdentifier, _ := iggcon.NewIdentifier(streamId) + topicIdentifier, _ := iggcon.NewIdentifier(topicId) + err := client.DeleteSegments( + streamIdentifier, + topicIdentifier, + 0, + 0, + ) + + itShouldNotReturnError(err) + }) + + ginkgo.Context("and tries to delete segments for a non existing stream", func() { + client := createAuthorizedConnection() + err := client.DeleteSegments( + randomU32Identifier(), + randomU32Identifier(), + 0, + 1, + ) + + itShouldReturnSpecificError(err, ierror.ErrStreamIdNotFound) + }) + + ginkgo.Context("and tries to delete segments for a non existing topic", func() { + client := createAuthorizedConnection() + streamId, _ := successfullyCreateStream(prefix, client) + defer deleteStreamAfterTests(streamId, client) + streamIdentifier, _ := iggcon.NewIdentifier(streamId) + err := client.DeleteSegments( + streamIdentifier, + randomU32Identifier(), + 0, + 1, + ) + + itShouldReturnSpecificError(err, ierror.ErrTopicIdNotFound) + }) + }) + + ginkgo.When("User is not logged in", func() { + ginkgo.Context("and tries to delete segments", func() { + client := createClient() + err := client.DeleteSegments( + randomU32Identifier(), + randomU32Identifier(), + 0, + 1, + ) + + itShouldReturnUnauthenticatedError(err) + }) + }) +}) diff --git a/foreign/go/client/tcp/tcp_segment_management.go b/foreign/go/client/tcp/tcp_segment_management.go new file mode 100644 index 0000000000..b4b4e79969 --- /dev/null +++ b/foreign/go/client/tcp/tcp_segment_management.go @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tcp + +import ( + iggcon "github.com/apache/iggy/foreign/go/contracts" + "github.com/apache/iggy/foreign/go/internal/command" +) + +func (c *IggyTcpClient) DeleteSegments( + streamId iggcon.Identifier, + topicId iggcon.Identifier, + partitionId uint32, + segmentsCount uint32, +) error { + _, err := c.do(&command.DeleteSegments{ + StreamId: streamId, + TopicId: topicId, + PartitionId: partitionId, + SegmentsCount: segmentsCount, + }) + return err +} diff --git a/foreign/go/contracts/client.go b/foreign/go/contracts/client.go index f3b6dac655..40fb82d469 100644 --- a/foreign/go/contracts/client.go +++ b/foreign/go/contracts/client.go @@ -195,6 +195,15 @@ type Client interface { partitionsCount uint32, ) error + // DeleteSegments deletes N segments from a topic partition by stream and topic unique IDs or names. + // Authentication is required, and the permission to manage the partitions. + DeleteSegments( + streamId Identifier, + topicId Identifier, + partitionId uint32, + segmentsCount uint32, + ) error + // GetUser get the info about a specific user by unique ID or username. // Authentication is required, and the permission to read the users, unless the provided user ID is the same as the authenticated user. GetUser(identifier Identifier) (*UserInfoDetails, error) diff --git a/foreign/go/internal/command/code.go b/foreign/go/internal/command/code.go index e4775eb2fc..644abf2dd0 100644 --- a/foreign/go/internal/command/code.go +++ b/foreign/go/internal/command/code.go @@ -57,6 +57,7 @@ const ( UpdateTopicCode Code = 304 CreatePartitionsCode Code = 402 DeletePartitionsCode Code = 403 + DeleteSegmentsCode Code = 503 GetGroupCode Code = 600 GetGroupsCode Code = 601 CreateGroupCode Code = 602 diff --git a/foreign/go/internal/command/segment.go b/foreign/go/internal/command/segment.go new file mode 100644 index 0000000000..9aaac24f13 --- /dev/null +++ b/foreign/go/internal/command/segment.go @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package command + +import ( + "encoding/binary" + + iggcon "github.com/apache/iggy/foreign/go/contracts" +) + +type DeleteSegments struct { + StreamId iggcon.Identifier `json:"streamId"` + TopicId iggcon.Identifier `json:"topicId"` + PartitionId uint32 `json:"partitionId"` + SegmentsCount uint32 `json:"segmentsCount"` +} + +func (d *DeleteSegments) Code() Code { + return DeleteSegmentsCode +} + +func (d *DeleteSegments) MarshalBinary() ([]byte, error) { + bytes, err := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId) + if err != nil { + return nil, err + } + bytes = binary.LittleEndian.AppendUint32(bytes, d.PartitionId) + bytes = binary.LittleEndian.AppendUint32(bytes, d.SegmentsCount) + return bytes, nil +} diff --git a/foreign/go/internal/command/segment_test.go b/foreign/go/internal/command/segment_test.go new file mode 100644 index 0000000000..e87a95622b --- /dev/null +++ b/foreign/go/internal/command/segment_test.go @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package command + +import ( + "bytes" + "testing" + + iggcon "github.com/apache/iggy/foreign/go/contracts" +) + +func TestSerialize_DeleteSegments(t *testing.T) { + streamId, _ := iggcon.NewIdentifier("stream") + topicId, _ := iggcon.NewIdentifier(uint32(1)) + request := DeleteSegments{ + StreamId: streamId, + TopicId: topicId, + PartitionId: 2, + SegmentsCount: 3, + } + + serialized, err := request.MarshalBinary() + if err != nil { + t.Errorf("Failed to serialize DeleteSegments: %v", err) + } + + expected := []byte{ + 0x02, // StreamId Kind (StringId) + 0x06, // StreamId Length (6) + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, // StreamId Value ("stream") + 0x01, // TopicId Kind (NumericId) + 0x04, // TopicId Length (4) + 0x01, 0x00, 0x00, 0x00, // TopicId Value (1) + 0x02, 0x00, 0x00, 0x00, // PartitionId (2) + 0x03, 0x00, 0x00, 0x00, // SegmentsCount (3) + } + + if !bytes.Equal(serialized, expected) { + t.Errorf("Test case failed. \nExpected:\t%v\nGot:\t\t%v", expected, serialized) + } +} + +func TestDeleteSegments_Code(t *testing.T) { + request := DeleteSegments{} + + if request.Code() != DeleteSegmentsCode { + t.Errorf("Expected command code %v, got %v", DeleteSegmentsCode, request.Code()) + } +}