Skip to content

Commit

Permalink
feat: add initial implementation of cluster/partitions package
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Dec 16, 2020
1 parent 2875c0d commit 907acd8
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 1 deletion.
63 changes: 63 additions & 0 deletions internal/cluster/partitions/partition.go
@@ -0,0 +1,63 @@
// Copyright 2018-2020 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package partitions

import (
"sync"
"sync/atomic"

"github.com/buraksezer/olric/internal/discovery"
)

// Partition is a basic, logical storage unit in Olric and stores DMaps in a sync.Map.
type Partition struct {
sync.RWMutex

id uint64
backup bool
m sync.Map
owners atomic.Value
}

// owner returns partition owner. It's not thread-safe.
func (p *Partition) owner() discovery.Member {
if p.backup {
// programming error. it cannot occur at production!
panic("cannot call this if backup is true")
}
owners := p.owners.Load().([]discovery.Member)
if len(owners) == 0 {
panic("owners list cannot be empty")
}
return owners[len(owners)-1]
}

// ownerCount returns the current owner count of a partition.
func (p *Partition) ownerCount() int {
owners := p.owners.Load()
if owners == nil {
return 0
}
return len(owners.([]discovery.Member))
}

// loadOwners loads the partition owners from atomic.value and returns.
func (p *Partition) loadOwners() []discovery.Member {
owners := p.owners.Load()
if owners == nil {
return []discovery.Member{}
}
return owners.([]discovery.Member)
}
72 changes: 72 additions & 0 deletions internal/cluster/partitions/partitions.go
@@ -0,0 +1,72 @@
// Copyright 2018-2020 Burak Sezer
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package partitions

import (
"unsafe"

"github.com/buraksezer/olric/hasher"
"github.com/buraksezer/olric/internal/discovery"
)

const (
PRIMARY = iota + 1
BACKUP
)

type Partitions struct {
count uint64
kind int
hasher hasher.Hasher
m map[uint64]*Partition
}

func New(count uint64, kind int, hs hasher.Hasher) *Partitions {
return &Partitions{
kind: kind,
count: count,
hasher: hs,
m: make(map[uint64]*Partition),
}
}

// PartitionIdByHKey returns partition ID for a given HKey.
func (ps *Partitions) PartitionIdByHKey(hkey uint64) uint64 {
return hkey % ps.count
}

// PartitionByHKey loads the owner partition for a given hkey.
func (ps *Partitions) PartitionByHKey(hkey uint64) *Partition {
partID := ps.PartitionIdByHKey(hkey)
return ps.m[partID]
}

// PartitionOwners loads the partition owners list for a given hkey.
func (ps *Partitions) PartitionOwners(hkey uint64) []discovery.Member {
part := ps.PartitionByHKey(hkey)
return part.owners.Load().([]discovery.Member)
}

// HKey returns hash-key, a.k.a hkey, for a key on a dmap.
func (ps *Partitions) HKey(name, key string) uint64 {
tmp := name + key
return ps.hasher.Sum64(*(*[]byte)(unsafe.Pointer(&tmp)))
}

// findPartitionOwner finds the partition owner for a key on a dmap.
func (ps *Partitions) PartitionOwner(name, key string) (discovery.Member, uint64) {
hkey := ps.HKey(name, key)
return ps.PartitionByHKey(hkey).owner(), hkey
}
4 changes: 4 additions & 0 deletions olric.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/buraksezer/olric/config"
"github.com/buraksezer/olric/hasher"
"github.com/buraksezer/olric/internal/bufpool"
"github.com/buraksezer/olric/internal/cluster/partitions"
"github.com/buraksezer/olric/internal/discovery"
"github.com/buraksezer/olric/internal/kvstore"
"github.com/buraksezer/olric/internal/locker"
Expand Down Expand Up @@ -131,6 +132,9 @@ type Olric struct {
partitions map[uint64]*partition
backups map[uint64]*partition

npartitions *partitions.Partitions
nbackups *partitions.Partitions

// Matches opcodes to functions. It's somewhat like an HTTP request multiplexer
operations map[protocol.OpCode]func(w, r protocol.EncodeDecoder)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/config_test.go
Expand Up @@ -59,4 +59,4 @@ func Test_Config(t *testing.T) {
t.Fatalf("New config is not idential with the previous one")
}
})
}
}

0 comments on commit 907acd8

Please sign in to comment.