Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - feat(atc): support volume size awareness for volume streaming #8693

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Binary file added atc/db/migration/mig
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Removes the size column from the volumes table.
-- NOTE(review): presumably the down migration paired with the
-- "ADD COLUMN size bigint" change -- confirm against the migration file name.
ALTER TABLE volumes
DROP COLUMN size;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Adds a size column of type bigint to the volumes table. No NOT NULL
-- constraint or DEFAULT is declared, so rows that never had a size recorded
-- hold NULL in this column.
ALTER TABLE volumes
ADD COLUMN size bigint;
4 changes: 4 additions & 0 deletions atc/db/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type creatingVolume struct {
workerName string
handle string
path string
size int
teamID int
typ VolumeType
containerHandle string
Expand Down Expand Up @@ -150,6 +151,7 @@ func (volume *creatingVolume) InitializeArtifact() (WorkerArtifact, error) {
type CreatedVolume interface {
Handle() string
Path() string
Size() int
Type() VolumeType
TeamID() int
WorkerArtifactID() int
Expand All @@ -176,6 +178,7 @@ type createdVolume struct {
workerName string
handle string
path string
size int
teamID int
typ VolumeType
containerHandle string
Expand All @@ -197,6 +200,7 @@ type VolumeResourceType struct {

// Handle returns the handle stored on this created volume.
func (volume *createdVolume) Handle() string {
	return volume.handle
}

// Path returns the path stored on this created volume.
func (volume *createdVolume) Path() string {
	return volume.path
}

// Size returns the size stored on this created volume.
// NOTE(review): the unit of this value is not visible here -- confirm
// against whatever writes the size column.
func (volume *createdVolume) Size() int {
	return volume.size
}

// WorkerName returns the name of the worker stored on this created volume.
func (volume *createdVolume) WorkerName() string {
	return volume.workerName
}

// Type returns the VolumeType stored on this created volume.
func (volume *createdVolume) Type() VolumeType {
	return volume.typ
}

// TeamID returns the team ID stored on this created volume.
func (volume *createdVolume) TeamID() int {
	return volume.teamID
}
Expand Down
37 changes: 37 additions & 0 deletions atc/db/volume_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type VolumeRepository interface {
RemoveDestroyingVolumes(workerName string, handles []string) (int, error)

UpdateVolumesMissingSince(workerName string, handles []string) error
UpdateVolumeSize(handle string, size int) error
RemoveMissingVolumes(gracePeriod time.Duration) (removed int, err error)

DestroyUnknownVolumes(workerName string, handles []string) (int, error)
Expand Down Expand Up @@ -145,6 +146,32 @@ func (repository *volumeRepository) UpdateVolumesMissingSince(workerName string,
return tx.Commit()
}

// UpdateVolumeSize writes size into the "size" column of the volumes row
// whose handle equals handle, skipping rows whose state is
// VolumeStateCreating.
//
// The statement runs inside its own transaction, which is committed on
// success. The result of the UPDATE is discarded, so a handle that matches
// no row is not reported as an error. Any error from beginning the
// transaction, building the statement, executing it, or committing is
// returned unchanged.
func (repository *volumeRepository) UpdateVolumeSize(handle string, size int) error {
	tx, err := repository.conn.Begin()
	if err != nil {
		return err
	}

	// Rollback is deferred unconditionally; the function's own return value
	// comes from Commit on the success path.
	defer Rollback(tx)

	// Only volumes that have left the creating state get a size recorded.
	target := sq.And{
		sq.Eq{"handle": handle},
		sq.NotEq{"state": VolumeStateCreating},
	}

	builder := psql.Update("volumes").Set("size", size).Where(target)

	stmt, params, err := builder.ToSql()
	if err != nil {
		return err
	}

	if _, err = tx.Exec(stmt, params...); err != nil {
		return err
	}

	return tx.Commit()
}

// Removes any volumes that exist in the database but are missing on the worker
// for over the designated grace time period.
func (repository *volumeRepository) RemoveMissingVolumes(gracePeriod time.Duration) (int, error) {
Expand Down Expand Up @@ -797,6 +824,7 @@ var volumeColumns = []string{
"v.state",
"w.name",
"v.path",
"v.size",
"c.handle",
"pv.handle",
"v.team_id",
Expand All @@ -823,6 +851,7 @@ func scanVolume(row sq.RowScanner, conn Conn) (CreatingVolume, CreatedVolume, De
var state string
var workerName string
var sqPath sql.NullString
var sqSize sql.NullInt64
var sqContainerHandle sql.NullString
var sqParentHandle sql.NullString
var sqTeamID sql.NullInt64
Expand All @@ -840,6 +869,7 @@ func scanVolume(row sq.RowScanner, conn Conn) (CreatingVolume, CreatedVolume, De
&state,
&workerName,
&sqPath,
&sqSize,
&sqContainerHandle,
&sqParentHandle,
&sqTeamID,
Expand All @@ -860,6 +890,11 @@ func scanVolume(row sq.RowScanner, conn Conn) (CreatingVolume, CreatedVolume, De
path = sqPath.String
}

var size int
if sqSize.Valid {
size = int(sqSize.Int64)
}

var containerHandle string
if sqContainerHandle.Valid {
containerHandle = sqContainerHandle.String
Expand Down Expand Up @@ -912,6 +947,7 @@ func scanVolume(row sq.RowScanner, conn Conn) (CreatingVolume, CreatedVolume, De
handle: handle,
typ: volumeType,
path: path,
size: size,
teamID: teamID,
workerName: workerName,
containerHandle: containerHandle,
Expand All @@ -930,6 +966,7 @@ func scanVolume(row sq.RowScanner, conn Conn) (CreatingVolume, CreatedVolume, De
handle: handle,
typ: volumeType,
path: path,
size: size,
teamID: teamID,
workerName: workerName,
containerHandle: containerHandle,
Expand Down
3 changes: 2 additions & 1 deletion atc/engine/build_step_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,14 @@ func (delegate *buildStepDelegate) SelectedWorker(logger lager.Logger, worker st
}
}

func (delegate *buildStepDelegate) StreamingVolume(logger lager.Logger, volume string, sourceWorker string, destWorker string) {
func (delegate *buildStepDelegate) StreamingVolume(logger lager.Logger, volume string, sourceWorker string, destWorker string, volumeSize int) {
err := delegate.build.SaveEvent(event.StreamingVolume{
Time: time.Now().Unix(),
Origin: event.Origin{
ID: event.OriginID(delegate.planID),
},
Volume: volume,
VolumeSize: volumeSize,
SourceWorker: sourceWorker,
DestWorker: destWorker,
})
Expand Down
1 change: 1 addition & 0 deletions atc/event/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ type StreamingVolume struct {
Time int64 `json:"time"`
Origin Origin `json:"origin"`
Volume string `json:"volume"`
VolumeSize int `json:"volume_size"`
SourceWorker string `json:"source_worker"`
DestWorker string `json:"dest_worker"`
}
Expand Down
2 changes: 1 addition & 1 deletion atc/exec/build_step_delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type BuildStepDelegate interface {
BeforeSelectWorker(lager.Logger) error
WaitingForWorker(lager.Logger)
SelectedWorker(lager.Logger, string)
StreamingVolume(lager.Logger, string, string, string)
StreamingVolume(lager.Logger, string, string, string, int)
WaitingForStreamedVolume(lager.Logger, string, string)
BuildStartTime() time.Time

Expand Down
2 changes: 1 addition & 1 deletion atc/exec/check_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type CheckDelegate interface {
UpdateScopeLastCheckStartTime(db.ResourceConfigScope, bool) (bool, int, error)
UpdateScopeLastCheckEndTime(db.ResourceConfigScope, bool) (bool, error)

StreamingVolume(lager.Logger, string, string, string)
StreamingVolume(lager.Logger, string, string, string, int)
}

func NewCheckStep(
Expand Down
2 changes: 1 addition & 1 deletion atc/exec/get_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type GetDelegate interface {
BeforeSelectWorker(lager.Logger) error
WaitingForWorker(lager.Logger)
SelectedWorker(lager.Logger, string)
StreamingVolume(lager.Logger, string, string, string)
StreamingVolume(lager.Logger, string, string, string, int)
WaitingForStreamedVolume(lager.Logger, string, string)

UpdateResourceVersion(lager.Logger, string, resource.VersionResult)
Expand Down
2 changes: 1 addition & 1 deletion atc/exec/put_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type PutDelegate interface {
BeforeSelectWorker(lager.Logger) error
WaitingForWorker(lager.Logger)
SelectedWorker(lager.Logger, string)
StreamingVolume(lager.Logger, string, string, string)
StreamingVolume(lager.Logger, string, string, string, int)
WaitingForStreamedVolume(lager.Logger, string, string)
BuildStartTime() time.Time

Expand Down
2 changes: 1 addition & 1 deletion atc/exec/task_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type TaskDelegate interface {
BeforeSelectWorker(lager.Logger) error
WaitingForWorker(lager.Logger)
SelectedWorker(lager.Logger, string)
StreamingVolume(lager.Logger, string, string, string)
StreamingVolume(lager.Logger, string, string, string, int)
WaitingForStreamedVolume(lager.Logger, string, string)
BuildStartTime() time.Time
}
Expand Down
2 changes: 1 addition & 1 deletion atc/runtime/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type ContainerSpec struct {
}

type BuildStepDelegate interface {
StreamingVolume(lager.Logger, string, string, string)
StreamingVolume(lager.Logger, string, string, string, int)
WaitingForStreamedVolume(lager.Logger, string, string)
BuildStartTime() time.Time
}
Expand Down
24 changes: 23 additions & 1 deletion atc/worker/gardenruntime/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,11 +715,33 @@ func (worker *Worker) findOrStreamVolume(
return Volume{}, err
}

delegate.StreamingVolume(logger, inputPath, artifact.Source(), streamedVolume.DBVolume().WorkerName())
// if volume size not cached, get source volume size through LookupVolumeWithSize
var bcSourceVolume baggageclaim.Volume
if volume.DBVolume().Size() <= 0 {
bcSourceVolume, _, err = volume.worker.bcClient.LookupVolumeWithSize(ctx, volume.DBVolume().Handle())
if err != nil {
logger.Error("failed-to-find-source-volume-for-streaming", err)
return Volume{}, err
}
if inputPath == "for image" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should open this feature up to all volumes. If you only want it to apply to image volumes, then this check should move to line 720.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the metrics, streamed image volumes account for more than 80% of all streamed volumes (image and non-image).
Also, calculating and caching the volume size makes more sense for image volumes than for non-image volumes, because a non-image volume's size may change quickly, whereas an image volume's size can be considered unchanged.

volume.worker.db.VolumeRepo.UpdateVolumeSize(volume.DBVolume().Handle(), bcSourceVolume.Size())
}

logger.Info("update-worker-volume-size", lager.Data{
"worker": volume.worker.DBWorker().Name(),
"size": bcSourceVolume.Size(),
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

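When volume.DBVolume().Size() > 0, where is bcSourceVolume initialized? It is only assigned inside the `Size() <= 0` branch, so the later bcSourceVolume.Size() calls would be made on a nil value.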

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


delegate.StreamingVolume(logger, inputPath, artifact.Source(), streamedVolume.DBVolume().WorkerName(), bcSourceVolume.Size())
if err := worker.streamer.Stream(ctx, artifact, streamedVolume); err != nil {
logger.Error("failed-to-stream-artifact", err)
return Volume{}, err
}
if inputPath == "for image" {
worker.db.VolumeRepo.UpdateVolumeSize(streamedVolume.DBVolume().Handle(), bcSourceVolume.Size())
}

logger.Debug("streamed-non-local-volume")
return streamedVolume, nil
}
Expand Down
4 changes: 2 additions & 2 deletions web/elm/src/Build/Output/Output.elm
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ handleEvent event ( model, effects ) =
, effects
)

StreamingVolume origin volume src time ->
( updateStep origin.id (setRunning << appendStepLog ("\u{001B}[1mstreaming volume \u{001B}[0m" ++ volume ++ " \u{001B}[1mfrom\u{001B}[0m " ++ src ++ "\n") time) model
StreamingVolume origin volume volume_size src time ->
( updateStep origin.id (setRunning << appendStepLog ("\u{001B}[1mstreaming volume \u{001B}[0m" ++ volume ++ " (" ++ String.fromInt volume_size ++ "MB)" ++ " \u{001B}[1mfrom\u{001B}[0m " ++ src ++ "\n") time) model
, effects
)

Expand Down
2 changes: 1 addition & 1 deletion web/elm/src/Build/StepTree/Models.elm
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ type BuildEvent
| Log Origin String (Maybe Time.Posix)
| WaitingForWorker Origin (Maybe Time.Posix)
| SelectedWorker Origin String (Maybe Time.Posix)
| StreamingVolume Origin String String (Maybe Time.Posix)
| StreamingVolume Origin String Int String (Maybe Time.Posix)
| WaitingForStreamedVolume Origin String (Maybe Time.Posix)
| Error Origin String Time.Posix
| ImageCheck Origin Concourse.BuildPlan
Expand Down
3 changes: 2 additions & 1 deletion web/elm/src/Concourse/BuildEvents.elm
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ decodeBuildEvent =
"streaming-volume" ->
Json.Decode.field
"data"
(Json.Decode.map4 StreamingVolume
(Json.Decode.map5 StreamingVolume
(Json.Decode.field "origin" <| Json.Decode.lazy (\_ -> decodeOrigin))
(Json.Decode.field "volume" Json.Decode.string)
(Json.Decode.field "volume_size" Json.Decode.int)
(Json.Decode.field "source_worker" Json.Decode.string)
(Json.Decode.maybe <| Json.Decode.field "time" <| Json.Decode.map dateFromSeconds Json.Decode.int)
)
Expand Down
1 change: 1 addition & 0 deletions worker/baggageclaim/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func NewHandler(
baggageclaim.CreateVolumeAsyncCheck: http.HandlerFunc(volumeServer.CreateVolumeAsyncCheck),
baggageclaim.ListVolumes: http.HandlerFunc(volumeServer.ListVolumes),
baggageclaim.GetVolume: http.HandlerFunc(volumeServer.GetVolume),
baggageclaim.GetVolumeWithSize: http.HandlerFunc(volumeServer.GetVolumeWithSize),
baggageclaim.SetProperty: http.HandlerFunc(volumeServer.SetProperty),
baggageclaim.GetPrivileged: http.HandlerFunc(volumeServer.GetPrivileged),
baggageclaim.SetPrivileged: http.HandlerFunc(volumeServer.SetPrivileged),
Expand Down
12 changes: 10 additions & 2 deletions worker/baggageclaim/api/volume_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (vs *VolumeServer) ListVolumes(w http.ResponseWriter, req *http.Request) {
}
}

func (vs *VolumeServer) GetVolume(w http.ResponseWriter, req *http.Request) {
func (vs *VolumeServer) getVolume(w http.ResponseWriter, req *http.Request, withSize bool) {
w.Header().Set("Content-Type", "application/json")

handle := rata.Param(req, "handle")
Expand All @@ -333,7 +333,7 @@ func (vs *VolumeServer) GetVolume(w http.ResponseWriter, req *http.Request) {

ctx := lagerctx.NewContext(req.Context(), hLog)

vol, found, err := vs.volumeRepo.GetVolume(ctx, handle)
vol, found, err := vs.volumeRepo.GetVolume(ctx, handle, withSize)
if err != nil {
hLog.Error("failed-to-get-volume", err)
RespondWithError(w, ErrGetVolumeFailed, http.StatusInternalServerError)
Expand All @@ -351,6 +351,14 @@ func (vs *VolumeServer) GetVolume(w http.ResponseWriter, req *http.Request) {
}
}

// GetVolume is the HTTP handler for looking up a single volume. It forwards
// the request to getVolume with the withSize flag set to false; getVolume
// passes that flag on to the volume repository's GetVolume call.
func (vs *VolumeServer) GetVolume(w http.ResponseWriter, req *http.Request) {
	const withSize = false
	vs.getVolume(w, req, withSize)
}

// GetVolumeWithSize is the HTTP handler for looking up a single volume with
// size information requested. It forwards the request to getVolume with the
// withSize flag set to true; getVolume passes that flag on to the volume
// repository's GetVolume call.
func (vs *VolumeServer) GetVolumeWithSize(w http.ResponseWriter, req *http.Request) {
	const withSize = true
	vs.getVolume(w, req, withSize)
}

func (vs *VolumeServer) SetProperty(w http.ResponseWriter, req *http.Request) {
handle := rata.Param(req, "handle")
propertyName := rata.Param(req, "property")
Expand Down
12 changes: 12 additions & 0 deletions worker/baggageclaim/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ type Client interface {
// or an error as to why the volume could not be found.
LookupVolume(context.Context, string) (Volume, bool, error)

// LookupVolumeWithSize finds a volume that is present on the server, requesting that its size be included.
// It takes a string that corresponds to the Handle of the Volume.
//
// You are required to pass in a logger to the call to retain context across
// the library boundary.
//
// LookupVolumeWithSize returns the volume, a bool reporting whether a volume with the matching handle
// was found, and an error explaining why the lookup failed, if it did.
LookupVolumeWithSize(context.Context, string) (Volume, bool, error)

// DestroyVolumes deletes the list of volumes that is present on the server. It takes
// a string of volumes
//
Expand Down Expand Up @@ -83,6 +93,8 @@ type Volume interface {
// supplied to other systems in order to let them use the volume.
Path() string

Size() int

// SetProperty sets a property on the Volume. Properties can be used to
// filter the results in the ListVolumes call above.
SetProperty(ctx context.Context, key string, value string) error
Expand Down
23 changes: 20 additions & 3 deletions worker/baggageclaim/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,19 @@ func (c *client) ListVolumes(ctx context.Context, properties baggageclaim.Volume
}

func (c *client) LookupVolume(ctx context.Context, handle string) (baggageclaim.Volume, bool, error) {
volumeResponse, found, err := c.getVolumeResponse(ctx, handle)
volumeResponse, found, err := c.getVolumeResponse(ctx, handle, false)
if err != nil {
return nil, false, err
}
if !found {
return nil, found, nil
}

return c.newVolume(volumeResponse), true, nil
}

func (c *client) LookupVolumeWithSize(ctx context.Context, handle string) (baggageclaim.Volume, bool, error) {
volumeResponse, found, err := c.getVolumeResponse(ctx, handle, true)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -243,6 +255,7 @@ func (c *client) newVolume(apiVolume baggageclaim.VolumeResponse) baggageclaim.V
volume := &clientVolume{
handle: apiVolume.Handle,
path: apiVolume.Path,
size: apiVolume.Size,

bcClient: c,
}
Expand Down Expand Up @@ -428,8 +441,12 @@ func getError(response *http.Response) error {
return errors.New(errorResponse.Message)
}

func (c *client) getVolumeResponse(ctx context.Context, handle string) (baggageclaim.VolumeResponse, bool, error) {
request, err := c.generateRequest(ctx, baggageclaim.GetVolume, rata.Params{
func (c *client) getVolumeResponse(ctx context.Context, handle string, requestSize bool) (baggageclaim.VolumeResponse, bool, error) {
requestName := baggageclaim.GetVolume
if requestSize {
requestName = baggageclaim.GetVolumeWithSize
}
request, err := c.generateRequest(ctx, requestName, rata.Params{
"handle": handle,
}, nil)
if err != nil {
Expand Down