Skip to content
This repository has been archived by the owner on Sep 12, 2018. It is now read-only.

StorageDriver interface changes and cleanup #820

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions circle.yml
Expand Up @@ -21,8 +21,11 @@ test:
- test -z $(gofmt -s -l . | tee /dev/stderr)
- go vet ./...
- test -z $(golint ./... | tee /dev/stderr)
- go test -race -test.v ./...:
timeout: 600
- go test -test.v ./...

# Disabling the race detector due to massive memory usage.
# - go test -race -test.v ./...:
# timeout: 600

# TODO(stevvooe): The following is an attempt at using goveralls but it
# just doesn't work. goveralls requires a single profile file to be
Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Expand Up @@ -91,7 +91,7 @@ func TestPush(t *testing.T) {
}

handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "PUT",
Route: "/v2/" + name + "/manifest/" + tag,
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestPull(t *testing.T) {
}

handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/manifest/" + tag,
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestPullResume(t *testing.T) {

for i := 0; i < 3; i++ {
layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/manifest/" + tag,
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry-storagedriver-azure/main.go
@@ -1,3 +1,5 @@
// +build ignore

package main

import (
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry-storagedriver-filesystem/main.go
@@ -1,3 +1,5 @@
// +build ignore

package main

import (
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry-storagedriver-inmemory/main.go
@@ -1,3 +1,5 @@
// +build ignore

package main

import (
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry-storagedriver-s3/main.go
@@ -1,3 +1,5 @@
// +build ignore

package main

import (
Expand Down
1 change: 0 additions & 1 deletion cmd/registry/main.go
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/docker/docker-registry/configuration"
_ "github.com/docker/docker-registry/storagedriver/filesystem"
_ "github.com/docker/docker-registry/storagedriver/inmemory"
_ "github.com/docker/docker-registry/storagedriver/s3"
)

func main() {
Expand Down
21 changes: 14 additions & 7 deletions storage/filereader.go
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"time"

"github.com/docker/docker-registry/storagedriver"
)
Expand All @@ -16,8 +17,9 @@ type fileReader struct {
driver storagedriver.StorageDriver

// identifying fields
path string
size int64 // size is the total layer size, must be set.
path string
size int64 // size is the total layer size, must be set.
modtime time.Time

// mutable fields
rc io.ReadCloser // remote read closer
Expand All @@ -28,16 +30,21 @@ type fileReader struct {

func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
// Grab the size of the layer file, ensuring existence.
size, err := driver.CurrentSize(path)
fi, err := driver.Stat(path)

if err != nil {
return nil, err
}

if fi.IsDir() {
return nil, fmt.Errorf("cannot read a directory")
}

return &fileReader{
driver: driver,
path: path,
size: int64(size),
driver: driver,
path: path,
size: fi.Size(),
modtime: fi.ModTime(),
}, nil
}

Expand Down Expand Up @@ -126,7 +133,7 @@ func (fr *fileReader) reader() (io.Reader, error) {
}

// If we don't have a reader, open one up.
rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset))
rc, err := fr.driver.ReadStream(fr.path, fr.offset)

if err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions storage/layerreader.go
Expand Up @@ -11,9 +11,8 @@ import (
type layerReader struct {
fileReader

name string // repo name of this layer
digest digest.Digest
createdAt time.Time
name string // repo name of this layer
digest digest.Digest
}

var _ Layer = &layerReader{}
Expand All @@ -27,5 +26,5 @@ func (lrs *layerReader) Digest() digest.Digest {
}

func (lrs *layerReader) CreatedAt() time.Time {
return lrs.createdAt
return lrs.modtime
}
7 changes: 0 additions & 7 deletions storage/layerstore.go
@@ -1,8 +1,6 @@
package storage

import (
"time"

"github.com/docker/docker-registry/digest"
"github.com/docker/docker-registry/storagedriver"
)
Expand Down Expand Up @@ -55,11 +53,6 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) {
fileReader: *fr,
name: name,
digest: digest,

// TODO(stevvooe): Storage backend does not support modification time
// queries yet. Layers "never" change, so just return the zero value
// plus a nano-second.
createdAt: (time.Time{}).Add(time.Nanosecond),
}, nil
}

Expand Down
27 changes: 14 additions & 13 deletions storage/layerupload.go
Expand Up @@ -107,9 +107,13 @@ func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Laye
return nil, err
}

if err := luc.writeLayer(fp, size, digest); err != nil {
if nn, err := luc.writeLayer(fp, digest); err != nil {
// Cleanup?
return nil, err
} else if nn != size {
// TODO(stevvooe): Short write. Will have to delete the location and
// report an error. This error needs to be reported to the client.
return nil, fmt.Errorf("short write writing layer")
}

// Yes! We have written some layer data. Let's make it visible. Link the
Expand Down Expand Up @@ -281,19 +285,20 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d
return dgst, nil
}

// writeLayer actually writes the layer file into its final destination.
// The layer should be validated before commencing the write.
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst digest.Digest) error {
// writeLayer actually writes the layer file into its final destination,
// identified by dgst. The layer should be validated before commencing the
// write.
func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
digest: dgst,
})

if err != nil {
return err
return 0, err
}

// Check for existence
if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
// TODO(stevvooe): This check is kind of problematic and very racy.
switch err := err.(type) {
case storagedriver.PathNotFoundError:
Expand All @@ -303,22 +308,18 @@ func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst dige
// content addressable and we should just use this to ensure we
// have it written. Although, we do need to verify that the
// content that is there is the correct length.
return err
return 0, err
}
}

// Seek our local layer file back now.
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
// Cleanup?
return err
return 0, err
}

// Okay: we can write the file to the blob store.
if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
return err
}

return nil
return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
}

// linkLayer links a valid, written layer blob into the registry under the
Expand Down
15 changes: 12 additions & 3 deletions storage/manifeststore.go
Expand Up @@ -22,12 +22,21 @@ func (ms *manifestStore) Exists(name, tag string) (bool, error) {
return false, err
}

size, err := ms.driver.CurrentSize(p)
fi, err := ms.driver.Stat(p)
if err != nil {
return false, err
switch err.(type) {
case storagedriver.PathNotFoundError:
return false, nil
default:
return false, err
}
}

if fi.IsDir() {
return false, fmt.Errorf("unexpected directory at path: %v, name=%s tag=%s", p, name, tag)
}

if size == 0 {
if fi.Size() == 0 {
return false, nil
}

Expand Down
14 changes: 8 additions & 6 deletions storagedriver/azure/azure.go
@@ -1,3 +1,5 @@
// +build ignore

// Package azure provides a storagedriver.StorageDriver implementation to
// store blobs in Microsoft Azure Blob Storage Service.
package azure
Expand Down Expand Up @@ -103,7 +105,7 @@ func (d *Driver) PutContent(path string, contents []byte) error {

// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
if ok, err := d.client.BlobExists(d.container, path); err != nil {
return nil, err
} else if !ok {
Expand All @@ -115,7 +117,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
return nil, err
}

if offset >= size {
if offset >= int64(size) {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}

Expand All @@ -129,10 +131,10 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {

// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
var (
lastBlockNum int
resumableOffset uint64
resumableOffset int64
blocks []azure.Block
)

Expand All @@ -153,12 +155,12 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo
return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
}

var totalSize uint64
var totalSize int64
for _, v := range parts.CommittedBlocks {
blocks = append(blocks, azure.Block{
Id: v.Name,
Status: azure.BlockStatusCommitted})
totalSize += uint64(v.Size)
totalSize += int64(v.Size)
}

// NOTE: Azure driver currently supports only append mode (resumable
Expand Down
2 changes: 2 additions & 0 deletions storagedriver/azure/azure_test.go
@@ -1,3 +1,5 @@
// +build ignore

package azure

import (
Expand Down
26 changes: 16 additions & 10 deletions storagedriver/factory/factory.go
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/ipc"
)

// driverFactories stores an internal mapping between storage driver names and their respective
Expand Down Expand Up @@ -41,16 +40,23 @@ func Register(name string, factory StorageDriverFactory) {
func Create(name string, parameters map[string]string) (storagedriver.StorageDriver, error) {
driverFactory, ok := driverFactories[name]
if !ok {
return nil, InvalidStorageDriverError{name}

// NOTE(stevvooe): We are disabling storagedriver ipc for now, as the
// server and client need to be updated for the changed API calls and
// there were some problems libchan hanging. We'll phase this
// functionality back in over the next few weeks.

// No registered StorageDriverFactory found, try ipc
driverClient, err := ipc.NewDriverClient(name, parameters)
if err != nil {
return nil, InvalidStorageDriverError{name}
}
err = driverClient.Start()
if err != nil {
return nil, err
}
return driverClient, nil
// driverClient, err := ipc.NewDriverClient(name, parameters)
// if err != nil {
// return nil, InvalidStorageDriverError{name}
// }
// err = driverClient.Start()
// if err != nil {
// return nil, err
// }
// return driverClient, nil
}
return driverFactory.Create(parameters)
}
Expand Down