Skip to content
This repository has been archived by the owner on Sep 12, 2018. It is now read-only.

Commit

Permalink
Merge pull request #820 from stevvooe/ng-storagedriver-updates
Browse files Browse the repository at this point in the history
StorageDriver interface changes and cleanup
  • Loading branch information
dmp42 committed Dec 8, 2014
2 parents 2ab1de3 + e364e71 commit f3f70d1
Show file tree
Hide file tree
Showing 28 changed files with 991 additions and 316 deletions.
7 changes: 5 additions & 2 deletions circle.yml
Expand Up @@ -21,8 +21,11 @@ test:
- test -z $(gofmt -s -l . | tee /dev/stderr)
- go vet ./...
- test -z $(golint ./... | tee /dev/stderr)
- go test -race -test.v ./...:
timeout: 600
- go test -test.v ./...

# Disabling the race detector due to massive memory usage.
# - go test -race -test.v ./...:
# timeout: 600

# TODO(stevvooe): The following is an attempt at using goveralls but it
# just doesn't work. goveralls requires a single profile file to be
Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Expand Up @@ -91,7 +91,7 @@ func TestPush(t *testing.T) {
}

handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "PUT",
Route: "/v2/" + name + "/manifest/" + tag,
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestPull(t *testing.T) {
}

handler := testutil.NewHandler(append(blobRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/manifest/" + tag,
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestPullResume(t *testing.T) {

for i := 0; i < 3; i++ {
layerRequestResponseMappings = append(layerRequestResponseMappings, testutil.RequestResponseMap{
testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: "GET",
Route: "/v2/" + name + "/manifest/" + tag,
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry-storagedriver-azure/main.go
@@ -1,3 +1,5 @@
// +build ignore

package main

import (
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry-storagedriver-filesystem/main.go
@@ -1,3 +1,5 @@
// +build ignore

package main

import (
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry-storagedriver-inmemory/main.go
@@ -1,3 +1,5 @@
// +build ignore

package main

import (
Expand Down
2 changes: 2 additions & 0 deletions cmd/registry-storagedriver-s3/main.go
@@ -1,3 +1,5 @@
// +build ignore

package main

import (
Expand Down
1 change: 0 additions & 1 deletion cmd/registry/main.go
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/docker/docker-registry/configuration"
_ "github.com/docker/docker-registry/storagedriver/filesystem"
_ "github.com/docker/docker-registry/storagedriver/inmemory"
_ "github.com/docker/docker-registry/storagedriver/s3"
)

func main() {
Expand Down
21 changes: 14 additions & 7 deletions storage/filereader.go
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"time"

"github.com/docker/docker-registry/storagedriver"
)
Expand All @@ -16,8 +17,9 @@ type fileReader struct {
driver storagedriver.StorageDriver

// identifying fields
path string
size int64 // size is the total layer size, must be set.
path string
size int64 // size is the total layer size, must be set.
modtime time.Time

// mutable fields
rc io.ReadCloser // remote read closer
Expand All @@ -28,16 +30,21 @@ type fileReader struct {

func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
// Grab the size of the layer file, ensuring existence.
size, err := driver.CurrentSize(path)
fi, err := driver.Stat(path)

if err != nil {
return nil, err
}

if fi.IsDir() {
return nil, fmt.Errorf("cannot read a directory")
}

return &fileReader{
driver: driver,
path: path,
size: int64(size),
driver: driver,
path: path,
size: fi.Size(),
modtime: fi.ModTime(),
}, nil
}

Expand Down Expand Up @@ -126,7 +133,7 @@ func (fr *fileReader) reader() (io.Reader, error) {
}

// If we don't have a reader, open one up.
rc, err := fr.driver.ReadStream(fr.path, uint64(fr.offset))
rc, err := fr.driver.ReadStream(fr.path, fr.offset)

if err != nil {
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions storage/layerreader.go
Expand Up @@ -11,9 +11,8 @@ import (
type layerReader struct {
fileReader

name string // repo name of this layer
digest digest.Digest
createdAt time.Time
name string // repo name of this layer
digest digest.Digest
}

var _ Layer = &layerReader{}
Expand All @@ -27,5 +26,5 @@ func (lrs *layerReader) Digest() digest.Digest {
}

func (lrs *layerReader) CreatedAt() time.Time {
return lrs.createdAt
return lrs.modtime
}
7 changes: 0 additions & 7 deletions storage/layerstore.go
@@ -1,8 +1,6 @@
package storage

import (
"time"

"github.com/docker/docker-registry/digest"
"github.com/docker/docker-registry/storagedriver"
)
Expand Down Expand Up @@ -55,11 +53,6 @@ func (ls *layerStore) Fetch(name string, digest digest.Digest) (Layer, error) {
fileReader: *fr,
name: name,
digest: digest,

// TODO(stevvooe): Storage backend does not support modification time
// queries yet. Layers "never" change, so just return the zero value
// plus a nano-second.
createdAt: (time.Time{}).Add(time.Nanosecond),
}, nil
}

Expand Down
27 changes: 14 additions & 13 deletions storage/layerupload.go
Expand Up @@ -107,9 +107,13 @@ func (luc *layerUploadController) Finish(size int64, digest digest.Digest) (Laye
return nil, err
}

if err := luc.writeLayer(fp, size, digest); err != nil {
if nn, err := luc.writeLayer(fp, digest); err != nil {
// Cleanup?
return nil, err
} else if nn != size {
// TODO(stevvooe): Short write. Will have to delete the location and
// report an error. This error needs to be reported to the client.
return nil, fmt.Errorf("short write writing layer")
}

// Yes! We have written some layer data. Let's make it visible. Link the
Expand Down Expand Up @@ -281,19 +285,20 @@ func (luc *layerUploadController) validateLayer(fp layerFile, size int64, dgst d
return dgst, nil
}

// writeLayer actually writes the the layer file into its final destination.
// The layer should be validated before commencing the write.
func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst digest.Digest) error {
// writeLayer actually writes the the layer file into its final destination,
// identified by dgst. The layer should be validated before commencing the
// write.
func (luc *layerUploadController) writeLayer(fp layerFile, dgst digest.Digest) (nn int64, err error) {
blobPath, err := luc.layerStore.pathMapper.path(blobPathSpec{
digest: dgst,
})

if err != nil {
return err
return 0, err
}

// Check for existence
if _, err := luc.layerStore.driver.CurrentSize(blobPath); err != nil {
if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
// TODO(stevvooe): This check is kind of problematic and very racy.
switch err := err.(type) {
case storagedriver.PathNotFoundError:
Expand All @@ -303,22 +308,18 @@ func (luc *layerUploadController) writeLayer(fp layerFile, size int64, dgst dige
// content addressable and we should just use this to ensure we
// have it written. Although, we do need to verify that the
// content that is there is the correct length.
return err
return 0, err
}
}

// Seek our local layer file back now.
if _, err := fp.Seek(0, os.SEEK_SET); err != nil {
// Cleanup?
return err
return 0, err
}

// Okay: we can write the file to the blob store.
if err := luc.layerStore.driver.WriteStream(blobPath, 0, uint64(size), fp); err != nil {
return err
}

return nil
return luc.layerStore.driver.WriteStream(blobPath, 0, fp)
}

// linkLayer links a valid, written layer blob into the registry under the
Expand Down
15 changes: 12 additions & 3 deletions storage/manifeststore.go
Expand Up @@ -22,12 +22,21 @@ func (ms *manifestStore) Exists(name, tag string) (bool, error) {
return false, err
}

size, err := ms.driver.CurrentSize(p)
fi, err := ms.driver.Stat(p)
if err != nil {
return false, err
switch err.(type) {
case storagedriver.PathNotFoundError:
return false, nil
default:
return false, err
}
}

if fi.IsDir() {
return false, fmt.Errorf("unexpected directory at path: %v, name=%s tag=%s", p, name, tag)
}

if size == 0 {
if fi.Size() == 0 {
return false, nil
}

Expand Down
14 changes: 8 additions & 6 deletions storagedriver/azure/azure.go
@@ -1,3 +1,5 @@
// +build ignore

// Package azure provides a storagedriver.StorageDriver implementation to
// store blobs in Microsoft Azure Blob Storage Service.
package azure
Expand Down Expand Up @@ -103,7 +105,7 @@ func (d *Driver) PutContent(path string, contents []byte) error {

// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
func (d *Driver) ReadStream(path string, offset int64) (io.ReadCloser, error) {
if ok, err := d.client.BlobExists(d.container, path); err != nil {
return nil, err
} else if !ok {
Expand All @@ -115,7 +117,7 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {
return nil, err
}

if offset >= size {
if offset >= int64(size) {
return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset}
}

Expand All @@ -129,10 +131,10 @@ func (d *Driver) ReadStream(path string, offset uint64) (io.ReadCloser, error) {

// WriteStream stores the contents of the provided io.ReadCloser at a location
// designated by the given path.
func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
func (d *Driver) WriteStream(path string, offset, size int64, reader io.ReadCloser) error {
var (
lastBlockNum int
resumableOffset uint64
resumableOffset int64
blocks []azure.Block
)

Expand All @@ -153,12 +155,12 @@ func (d *Driver) WriteStream(path string, offset, size uint64, reader io.ReadClo
return fmt.Errorf("Cannot parse block name as number '%s': %s", lastBlock.Name, err.Error())
}

var totalSize uint64
var totalSize int64
for _, v := range parts.CommittedBlocks {
blocks = append(blocks, azure.Block{
Id: v.Name,
Status: azure.BlockStatusCommitted})
totalSize += uint64(v.Size)
totalSize += int64(v.Size)
}

// NOTE: Azure driver currently supports only append mode (resumable
Expand Down
2 changes: 2 additions & 0 deletions storagedriver/azure/azure_test.go
@@ -1,3 +1,5 @@
// +build ignore

package azure

import (
Expand Down
26 changes: 16 additions & 10 deletions storagedriver/factory/factory.go
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/docker/docker-registry/storagedriver"
"github.com/docker/docker-registry/storagedriver/ipc"
)

// driverFactories stores an internal mapping between storage driver names and their respective
Expand Down Expand Up @@ -41,16 +40,23 @@ func Register(name string, factory StorageDriverFactory) {
func Create(name string, parameters map[string]string) (storagedriver.StorageDriver, error) {
driverFactory, ok := driverFactories[name]
if !ok {
return nil, InvalidStorageDriverError{name}

// NOTE(stevvooe): We are disabling storagedriver ipc for now, as the
// server and client need to be updated for the changed API calls and
// there were some problems libchan hanging. We'll phase this
// functionality back in over the next few weeks.

// No registered StorageDriverFactory found, try ipc
driverClient, err := ipc.NewDriverClient(name, parameters)
if err != nil {
return nil, InvalidStorageDriverError{name}
}
err = driverClient.Start()
if err != nil {
return nil, err
}
return driverClient, nil
// driverClient, err := ipc.NewDriverClient(name, parameters)
// if err != nil {
// return nil, InvalidStorageDriverError{name}
// }
// err = driverClient.Start()
// if err != nil {
// return nil, err
// }
// return driverClient, nil
}
return driverFactory.Create(parameters)
}
Expand Down

0 comments on commit f3f70d1

Please sign in to comment.