Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions bdd/go/tests/tcp_test/segments_feature_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package tcp_test

import (
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
"github.com/onsi/ginkgo/v2"
)

var _ = ginkgo.Describe("DELETE SEGMENTS:", func() {
prefix := "DeleteSegments"
ginkgo.When("User is logged in", func() {
ginkgo.Context("and tries to delete zero segments for existing topic partition", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
topicId, _ := successfullyCreateTopic(streamId, client)

streamIdentifier, _ := iggcon.NewIdentifier(streamId)
topicIdentifier, _ := iggcon.NewIdentifier(topicId)
err := client.DeleteSegments(
streamIdentifier,
topicIdentifier,
0,
0,
)

itShouldNotReturnError(err)
})

ginkgo.Context("and tries to delete segments for a non existing stream", func() {
client := createAuthorizedConnection()
err := client.DeleteSegments(
randomU32Identifier(),
randomU32Identifier(),
0,
1,
)

itShouldReturnSpecificError(err, ierror.ErrStreamIdNotFound)
})

ginkgo.Context("and tries to delete segments for a non existing topic", func() {
client := createAuthorizedConnection()
streamId, _ := successfullyCreateStream(prefix, client)
defer deleteStreamAfterTests(streamId, client)
streamIdentifier, _ := iggcon.NewIdentifier(streamId)
err := client.DeleteSegments(
streamIdentifier,
randomU32Identifier(),
0,
1,
)

itShouldReturnSpecificError(err, ierror.ErrTopicIdNotFound)
})
})

ginkgo.When("User is not logged in", func() {
ginkgo.Context("and tries to delete segments", func() {
client := createClient()
err := client.DeleteSegments(
randomU32Identifier(),
randomU32Identifier(),
0,
1,
)

itShouldReturnUnauthenticatedError(err)
})
})
})
38 changes: 38 additions & 0 deletions foreign/go/client/tcp/tcp_segment_management.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package tcp

import (
iggcon "github.com/apache/iggy/foreign/go/contracts"
"github.com/apache/iggy/foreign/go/internal/command"
)

func (c *IggyTcpClient) DeleteSegments(
streamId iggcon.Identifier,
topicId iggcon.Identifier,
partitionId uint32,
segmentsCount uint32,
) error {
_, err := c.do(&command.DeleteSegments{
StreamId: streamId,
TopicId: topicId,
PartitionId: partitionId,
SegmentsCount: segmentsCount,
})
return err
}
9 changes: 9 additions & 0 deletions foreign/go/contracts/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ type Client interface {
partitionsCount uint32,
) error

// DeleteSegments deletes N segments from a topic partition by stream and topic unique IDs or names.
// Authentication is required, and the permission to manage the partitions.
DeleteSegments(
streamId Identifier,
topicId Identifier,
partitionId uint32,
segmentsCount uint32,
) error

// GetUser get the info about a specific user by unique ID or username.
// Authentication is required, and the permission to read the users, unless the provided user ID is the same as the authenticated user.
GetUser(identifier Identifier) (*UserInfoDetails, error)
Expand Down
1 change: 1 addition & 0 deletions foreign/go/internal/command/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
UpdateTopicCode Code = 304
CreatePartitionsCode Code = 402
DeletePartitionsCode Code = 403
DeleteSegmentsCode Code = 503
GetGroupCode Code = 600
GetGroupsCode Code = 601
CreateGroupCode Code = 602
Expand Down
45 changes: 45 additions & 0 deletions foreign/go/internal/command/segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package command

import (
"encoding/binary"

iggcon "github.com/apache/iggy/foreign/go/contracts"
)

type DeleteSegments struct {
StreamId iggcon.Identifier `json:"streamId"`
TopicId iggcon.Identifier `json:"topicId"`
PartitionId uint32 `json:"partitionId"`
SegmentsCount uint32 `json:"segmentsCount"`
}

func (d *DeleteSegments) Code() Code {
return DeleteSegmentsCode
}

func (d *DeleteSegments) MarshalBinary() ([]byte, error) {
bytes, err := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId)
if err != nil {
return nil, err
}
bytes = binary.LittleEndian.AppendUint32(bytes, d.PartitionId)
bytes = binary.LittleEndian.AppendUint32(bytes, d.SegmentsCount)
return bytes, nil
}
64 changes: 64 additions & 0 deletions foreign/go/internal/command/segment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package command

import (
"bytes"
"testing"

iggcon "github.com/apache/iggy/foreign/go/contracts"
)

func TestSerialize_DeleteSegments(t *testing.T) {
streamId, _ := iggcon.NewIdentifier("stream")
topicId, _ := iggcon.NewIdentifier(uint32(1))
request := DeleteSegments{
StreamId: streamId,
TopicId: topicId,
PartitionId: 2,
SegmentsCount: 3,
}

serialized, err := request.MarshalBinary()
if err != nil {
t.Errorf("Failed to serialize DeleteSegments: %v", err)
}

expected := []byte{
0x02, // StreamId Kind (StringId)
0x06, // StreamId Length (6)
0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, // StreamId Value ("stream")
0x01, // TopicId Kind (NumericId)
0x04, // TopicId Length (4)
0x01, 0x00, 0x00, 0x00, // TopicId Value (1)
0x02, 0x00, 0x00, 0x00, // PartitionId (2)
0x03, 0x00, 0x00, 0x00, // SegmentsCount (3)
}

if !bytes.Equal(serialized, expected) {
t.Errorf("Test case failed. \nExpected:\t%v\nGot:\t\t%v", expected, serialized)
}
}

func TestDeleteSegments_Code(t *testing.T) {
request := DeleteSegments{}

if request.Code() != DeleteSegmentsCode {
t.Errorf("Expected command code %v, got %v", DeleteSegmentsCode, request.Code())
}
}