From 2a90df0f44041279a726e2665b2d4ffeb7c0286f Mon Sep 17 00:00:00 2001 From: Jason Chionh Date: Wed, 3 Apr 2019 10:38:03 -0700 Subject: [PATCH] Initial commit of the source code --- .gitignore | 31 ++ LICENSE | 28 ++ .../bundle_support/3p/archiver/NOTICE | 24 ++ .../bundle_support/3p/archiver/archiver.go | 135 ++++++++ .../bundle_support/3p/archiver/tar.go | 244 ++++++++++++++ .../bundle_support/3p/archiver/targz.go | 98 ++++++ .../bundle_support/archive/bundle_archive.go | 89 +++++ .../archive/bundle_processor.go | 34 ++ .../archive/bundle_processor_test.go | 38 +++ .../archive/bundle_processor_v1.go | 33 ++ .../archive/bundle_processor_v1_test.go | 71 ++++ .../archive/bundle_processor_v2.go | 148 ++++++++ .../bundle_support/archive/overlay.go | 17 + .../robomaker/bundle_support/bundle/bundle.go | 106 ++++++ .../bundle_support/bundle/bundle_test.go | 95 ++++++ .../robomaker/bundle_support/bundle_error.go | 37 ++ .../bundle_support/bundle_provider.go | 149 ++++++++ .../extractors/bundle_v1_extractor.go | 72 ++++ .../extractors/bundle_v1_extractor_test.go | 24 ++ .../bundle_support/extractors/extractor.go | 40 +++ .../extractors/targz_extractor.go | 65 ++++ .../extractors/targz_extractor_test.go | 73 ++++ .../bundle_support/file_system/file_system.go | 65 ++++ .../robomaker/bundle_support/s3/s3_reader.go | 216 ++++++++++++ .../bundle_support/s3/s3_reader_test.go | 194 +++++++++++ .../bundle_support/store/bundle_store.go | 238 +++++++++++++ .../bundle_support/store/bundle_store_test.go | 319 ++++++++++++++++++ .../bundle_support/stream/path_to_stream.go | 139 ++++++++ .../stream/path_to_stream_test.go | 214 ++++++++++++ .../robomaker/example/s3_download/main.go | 67 ++++ .../robomaker/example/test_app/main.go | 86 +++++ 31 files changed, 3189 insertions(+) create mode 100644 .gitignore create mode 100644 src/go.amzn.com/robomaker/bundle_support/3p/archiver/NOTICE create mode 100644 src/go.amzn.com/robomaker/bundle_support/3p/archiver/archiver.go create mode 100644 
src/go.amzn.com/robomaker/bundle_support/3p/archiver/tar.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/3p/archiver/targz.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/archive/bundle_archive.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_test.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v1.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v1_test.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v2.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/archive/overlay.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/bundle/bundle.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/bundle/bundle_test.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/bundle_error.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/bundle_provider.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/extractors/bundle_v1_extractor.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/extractors/bundle_v1_extractor_test.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/extractors/extractor.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/extractors/targz_extractor.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/extractors/targz_extractor_test.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/file_system/file_system.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/s3/s3_reader.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/s3/s3_reader_test.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/store/bundle_store.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/store/bundle_store_test.go create mode 100644 
src/go.amzn.com/robomaker/bundle_support/stream/path_to_stream.go create mode 100644 src/go.amzn.com/robomaker/bundle_support/stream/path_to_stream_test.go create mode 100644 src/go.amzn.com/robomaker/example/s3_download/main.go create mode 100644 src/go.amzn.com/robomaker/example/test_app/main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ae1d0df --- /dev/null +++ b/.gitignore @@ -0,0 +1,31 @@ +/eclipse-bin/ +/bin +/build +/pkg +.classpath +.project +.settings +.vscode/ +.DS_Store +*.swp +*.swo +*.orig +mock_*.go +### JetBrains+all Patch ### +# Ignores the whole .idea folder and all .iml files +# See https://github.com/joeblau/gitignore.io/issues/186 and https://github.com/joeblau/gitignore.io/issues/360 + +.idea/ + +# Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-249601023 + +*.iml +modules.xml +.idea/misc.xml +*.ipr + +**/*.swp + +assets + +# End of https://www.gitignore.io/api/jetbrains+all \ No newline at end of file diff --git a/LICENSE b/LICENSE index 67db858..4d976c2 100644 --- a/LICENSE +++ b/LICENSE @@ -173,3 +173,31 @@ defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2019 Amazon.com, Inc. or its affiliates. 
+ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + \ No newline at end of file diff --git a/src/go.amzn.com/robomaker/bundle_support/3p/archiver/NOTICE b/src/go.amzn.com/robomaker/bundle_support/3p/archiver/NOTICE new file mode 100644 index 0000000..17c72db --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/3p/archiver/NOTICE @@ -0,0 +1,24 @@ +** github.com/mholt/archiver; version 3.1.1 -- https://github.com/mholt/archiver +Copyright (c) 2016 Matthew Holt + +MIT License + +Copyright (c) 2016 Matthew Holt + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/src/go.amzn.com/robomaker/bundle_support/3p/archiver/archiver.go b/src/go.amzn.com/robomaker/bundle_support/3p/archiver/archiver.go new file mode 100644 index 0000000..6cac0cf --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/3p/archiver/archiver.go @@ -0,0 +1,135 @@ +package archiver + +import ( + "fmt" + "io" + "log" + "os" + "path/filepath" + "runtime" + "strings" +) + +// Archiver represent a archive format +type Archiver interface { + // Match checks supported files + Match(filename string) bool + // Make makes an archive file on disk. + Make(destination string, sources []string) error + // Open extracts an archive file on disk. + Open(source, destination string) error + // Write writes an archive to a Writer. + Write(output io.Writer, sources []string) error + // Read reads an archive from a Reader. 
+ Read(input io.Reader, destination string) error +} + +// SupportedFormats contains all supported archive formats +var SupportedFormats = map[string]Archiver{} + +// RegisterFormat adds a supported archive format +func RegisterFormat(name string, format Archiver) { + if _, ok := SupportedFormats[name]; ok { + log.Printf("Format %s already exists, skip!\n", name) + return + } + SupportedFormats[name] = format +} + +// MatchingFormat returns the first archive format that matches +// the given file, or nil if there is no match +func MatchingFormat(fpath string) Archiver { + for _, fmt := range SupportedFormats { + if fmt.Match(fpath) { + return fmt + } + } + return nil +} + +func writeNewFile(fpath string, in io.Reader, fm os.FileMode) error { + err := os.MkdirAll(filepath.Dir(fpath), 0755) + if err != nil { + return fmt.Errorf("%s: making directory for file: %v", fpath, err) + } + + out, err := os.Create(fpath) + if err != nil { + return fmt.Errorf("%s: creating new file: %v", fpath, err) + } + defer out.Close() + + err = out.Chmod(fm) + if err != nil && runtime.GOOS != "windows" { + return fmt.Errorf("%s: changing file mode: %v", fpath, err) + } + + _, err = io.Copy(out, in) + if err != nil { + return fmt.Errorf("%s: writing file: %v", fpath, err) + } + return nil +} + +func writeNewSymbolicLink(fpath string, target string) error { + err := os.MkdirAll(filepath.Dir(fpath), 0755) + if err != nil { + return fmt.Errorf("%s: making directory for file: %v", fpath, err) + } + + _, err = os.Lstat(fpath) + if err == nil { + err = os.Remove(fpath) + if err != nil { + return fmt.Errorf("%s: failed to unlink: %+v", fpath, err) + } + } + + err = os.Symlink(target, fpath) + if err != nil { + return fmt.Errorf("%s: making symbolic link for: %v", fpath, err) + } + + return nil +} + +func writeNewHardLink(fpath string, target string) error { + err := os.MkdirAll(filepath.Dir(fpath), 0755) + if err != nil { + return fmt.Errorf("%s: making directory for file: %v", fpath, err) + } + 
+ _, err = os.Lstat(fpath) + if err == nil { + err = os.Remove(fpath) + if err != nil { + return fmt.Errorf("%s: failed to unlink: %+v", fpath, err) + } + } + + err = os.Link(target, fpath) + if err != nil { + return fmt.Errorf("%s: making hard link for: %v", fpath, err) + } + + return nil +} + +func mkdir(dirPath string) error { + err := os.MkdirAll(dirPath, 0755) + if err != nil { + return fmt.Errorf("%s: making directory: %v", dirPath, err) + } + return nil +} + +func sanitizeExtractPath(filePath string, destination string) error { + // to avoid zip slip (writing outside of the destination), we resolve + // the target path, and make sure it's nested in the intended + // destination, or bail otherwise. + destpath := filepath.Join(destination, filePath) + if !strings.HasPrefix(destpath, filepath.Clean(destination)) { + return fmt.Errorf("%s: illegal file path", filePath) + } + return nil +} diff --git a/src/go.amzn.com/robomaker/bundle_support/3p/archiver/tar.go b/src/go.amzn.com/robomaker/bundle_support/3p/archiver/tar.go new file mode 100644 index 0000000..ee0c443 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/3p/archiver/tar.go @@ -0,0 +1,244 @@ +package archiver + +import ( + "archive/tar" + "bytes" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "strings" +) + +// Tar is for Tar format +var Tar tarFormat + +func init() { + RegisterFormat("Tar", Tar) +} + +type tarFormat struct{} + +func (tarFormat) Match(filename string) bool { + return strings.HasSuffix(strings.ToLower(filename), ".tar") || isTar(filename) +} + +const tarBlockSize int = 512 + +// isTar checks the file has the Tar format header by reading its beginning +// block. 
+func isTar(tarPath string) bool { + f, err := os.Open(tarPath) + if err != nil { + return false + } + defer f.Close() + + buf := make([]byte, tarBlockSize) + if _, err = io.ReadFull(f, buf); err != nil { + return false + } + + return hasTarHeader(buf) +} + +// hasTarHeader checks passed bytes has a valid tar header or not. buf must +// contain at least 512 bytes and if not, it always returns false. +func hasTarHeader(buf []byte) bool { + if len(buf) < tarBlockSize { + return false + } + + b := buf[148:156] + b = bytes.Trim(b, " \x00") // clean up all spaces and null bytes + if len(b) == 0 { + return false // unknown format + } + hdrSum, err := strconv.ParseUint(string(b), 8, 64) + if err != nil { + return false + } + + // According to the go official archive/tar, Sun tar uses signed byte + // values so this calcs both signed and unsigned + var usum uint64 + var sum int64 + for i, c := range buf { + if 148 <= i && i < 156 { + c = ' ' // checksum field itself is counted as branks + } + usum += uint64(uint8(c)) + sum += int64(int8(c)) + } + + if hdrSum != usum && int64(hdrSum) != sum { + return false // invalid checksum + } + + return true +} + +// Write outputs a .tar file to a Writer containing the +// contents of files listed in filePaths. File paths can +// be those of regular files or directories. Regular +// files are stored at the 'root' of the archive, and +// directories are recursively added. +func (tarFormat) Write(output io.Writer, filePaths []string) error { + return writeTar(filePaths, output, "") +} + +// Make creates a .tar file at tarPath containing the +// contents of files listed in filePaths. File paths can +// be those of regular files or directories. Regular +// files are stored at the 'root' of the archive, and +// directories are recursively added. 
+func (tarFormat) Make(tarPath string, filePaths []string) error { + out, err := os.Create(tarPath) + if err != nil { + return fmt.Errorf("error creating %s: %v", tarPath, err) + } + defer out.Close() + + return writeTar(filePaths, out, tarPath) +} + +func writeTar(filePaths []string, output io.Writer, dest string) error { + tarWriter := tar.NewWriter(output) + defer tarWriter.Close() + + return tarball(filePaths, tarWriter, dest) +} + +// tarball writes all files listed in filePaths into tarWriter, which is +// writing into a file located at dest. +func tarball(filePaths []string, tarWriter *tar.Writer, dest string) error { + for _, fpath := range filePaths { + err := tarFile(tarWriter, fpath, dest) + if err != nil { + return err + } + } + return nil +} + +// tarFile writes the file at source into tarWriter. It does so +// recursively for directories. +func tarFile(tarWriter *tar.Writer, source, dest string) error { + sourceInfo, err := os.Stat(source) + if err != nil { + return fmt.Errorf("%s: stat: %v", source, err) + } + + var baseDir string + if sourceInfo.IsDir() { + baseDir = filepath.Base(source) + } + + return filepath.Walk(source, func(path string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("error walking to %s: %v", path, err) + } + + header, err := tar.FileInfoHeader(info, path) + if err != nil { + return fmt.Errorf("%s: making header: %v", path, err) + } + + if baseDir != "" { + header.Name = filepath.ToSlash(filepath.Join(baseDir, strings.TrimPrefix(path, source))) + } + + if header.Name == dest { + // our new tar file is inside the directory being archived; skip it + return nil + } + + if info.IsDir() { + header.Name += "/" + } + + err = tarWriter.WriteHeader(header) + if err != nil { + return fmt.Errorf("%s: writing header: %v", path, err) + } + + if info.IsDir() { + return nil + } + + if header.Typeflag == tar.TypeReg { + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("%s: open: %v", path, err) + } 
+ defer file.Close() + + _, err = io.CopyN(tarWriter, file, info.Size()) + if err != nil && err != io.EOF { + return fmt.Errorf("%s: copying contents: %v", path, err) + } + } + return nil + }) +} + +// Read untars a .tar file read from a Reader and puts +// the contents into destination. +func (tarFormat) Read(input io.Reader, destination string) error { + return untar(tar.NewReader(input), destination) +} + +// Open untars source and puts the contents into destination. +func (tarFormat) Open(source, destination string) error { + f, err := os.Open(source) + if err != nil { + return fmt.Errorf("%s: failed to open archive: %v", source, err) + } + defer f.Close() + + return Tar.Read(f, destination) +} + +// untar un-tarballs the contents of tr into destination. +func untar(tr *tar.Reader, destination string) error { + for { + header, err := tr.Next() + if err == io.EOF { + break + } else if err != nil { + return err + } + + if err := untarFile(tr, header, destination); err != nil { + return err + } + } + return nil +} + +// untarFile untars a single file from tr with header header into destination. 
+func untarFile(tr *tar.Reader, header *tar.Header, destination string) error { + err := sanitizeExtractPath(header.Name, destination) + if err != nil { + return err + } + + destpath := filepath.Join(destination, header.Name) + + switch header.Typeflag { + case tar.TypeDir: + return mkdir(destpath) + case tar.TypeReg, tar.TypeRegA, tar.TypeChar, tar.TypeBlock, tar.TypeFifo: + return writeNewFile(destpath, tr, header.FileInfo().Mode()) + case tar.TypeSymlink: + return writeNewSymbolicLink(destpath, header.Linkname) + case tar.TypeLink: + return writeNewHardLink(destpath, filepath.Join(destination, header.Linkname)) + case tar.TypeXGlobalHeader: + // ignore the pax global header from git generated tarballs + return nil + default: + return fmt.Errorf("%s: unknown type flag: %c", header.Name, header.Typeflag) + } +} diff --git a/src/go.amzn.com/robomaker/bundle_support/3p/archiver/targz.go b/src/go.amzn.com/robomaker/bundle_support/3p/archiver/targz.go new file mode 100644 index 0000000..6751d49 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/3p/archiver/targz.go @@ -0,0 +1,98 @@ +package archiver + +import ( + "compress/gzip" + "fmt" + "io" + "os" + "strings" +) + +// TarGz is for TarGz format +var TarGz tarGzFormat + +func init() { + RegisterFormat("TarGz", TarGz) +} + +type tarGzFormat struct{} + +func (tarGzFormat) Match(filename string) bool { + return strings.HasSuffix(strings.ToLower(filename), ".tar.gz") || + strings.HasSuffix(strings.ToLower(filename), ".tgz") || + isTarGz(filename) +} + +// isTarGz checks the file has the gzip compressed Tar format header by reading +// its beginning block. 
+func isTarGz(targzPath string) bool { + f, err := os.Open(targzPath) + if err != nil { + return false + } + defer f.Close() + + gzr, err := gzip.NewReader(f) + if err != nil { + return false + } + defer gzr.Close() + + buf := make([]byte, tarBlockSize) + n, err := gzr.Read(buf) + if err != nil || n < tarBlockSize { + return false + } + + return hasTarHeader(buf) +} + +// Write outputs a .tar.gz file to a Writer containing +// the contents of files listed in filePaths. It works +// the same way Tar does, but with gzip compression. +func (tarGzFormat) Write(output io.Writer, filePaths []string) error { + return writeTarGz(filePaths, output, "") +} + +// Make creates a .tar.gz file at targzPath containing +// the contents of files listed in filePaths. It works +// the same way Tar does, but with gzip compression. +func (tarGzFormat) Make(targzPath string, filePaths []string) error { + out, err := os.Create(targzPath) + if err != nil { + return fmt.Errorf("error creating %s: %v", targzPath, err) + } + defer out.Close() + + return writeTarGz(filePaths, out, targzPath) +} + +func writeTarGz(filePaths []string, output io.Writer, dest string) error { + gzw := gzip.NewWriter(output) + defer gzw.Close() + + return writeTar(filePaths, gzw, dest) +} + +// Read untars a .tar.gz file read from a Reader and decompresses +// the contents into destination. +func (tarGzFormat) Read(input io.Reader, destination string) error { + gzr, err := gzip.NewReader(input) + if err != nil { + return fmt.Errorf("error decompressing: %v", err) + } + defer gzr.Close() + + return Tar.Read(gzr, destination) +} + +// Open untars source and decompresses the contents into destination. 
+func (tarGzFormat) Open(source, destination string) error { + f, err := os.Open(source) + if err != nil { + return fmt.Errorf("%s: failed to open archive: %v", source, err) + } + defer f.Close() + + return TarGz.Read(f, destination) +} diff --git a/src/go.amzn.com/robomaker/bundle_support/archive/bundle_archive.go b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_archive.go new file mode 100644 index 0000000..20d0c18 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_archive.go @@ -0,0 +1,89 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package archive + +import ( + "archive/tar" + "errors" + "fmt" + "go.amzn.com/robomaker/bundle_support/bundle" + "go.amzn.com/robomaker/bundle_support/extractors" + "go.amzn.com/robomaker/bundle_support/store" + "io" +) + +const ( + VersionFileName = "version" +) + +// BundleArchive's responsibility is to be able to extract bundles of all versions, v1, v2, etc. +type BundleArchive interface { + + // What version is this bundle? 
+ Version() string + + // Extract everything into the cache + Extract(bundleCache store.BundleStore) (bundle.Bundle, error) +} + +type bundleArchive struct { + version string + inputStream io.ReadSeeker + bundleProcessor BundleProcessor +} + +func NewBundleArchive(inputStream io.ReadSeeker) (BundleArchive, error) { + // read version to determine bundle version + tarReader := extractors.TarReaderFromStream(inputStream) + version, versionErr := ReadVersionFromBundle(tarReader) + + if versionErr != nil { + return nil, fmt.Errorf("unable to read version from bundle: %v", versionErr) + } + + // get the appropriate bundle processor for the version + bundleProcessor := BundleProcessorForVersion(version) + if bundleProcessor == nil { + return nil, fmt.Errorf("unsuppported bundle processor version: %s", version) + } + + // reset seek position to start of the stream and init with the stream + inputStream.Seek(0, io.SeekStart) + return &bundleArchive{ + version: version, + inputStream: inputStream, + bundleProcessor: bundleProcessor, + }, nil +} + +func (b *bundleArchive) Version() string { + return b.version +} + +func (b *bundleArchive) Extract(bundleStore store.BundleStore) (bundle.Bundle, error) { + return b.bundleProcessor.Extract(b.inputStream, bundleStore) +} + +func ReadVersionFromBundle(tarReader *tar.Reader) (string, error) { + header, headerErr := tarReader.Next() + if headerErr != nil { + fmt.Printf("Error parsing tar: %v", headerErr) + return "", headerErr + } + + if header.Name != VersionFileName { + err := errors.New("invalid bundle format, first file should be a version file") + return "", err + } + + versionData := make([]byte, header.Size) + _, readVersionErr := tarReader.Read(versionData) + // We need to read a second time to get the io.EOF message + _, readVersionErr = tarReader.Read(nil) + if readVersionErr != io.EOF { + return "", fmt.Errorf("unable to read version: %v", readVersionErr) + } + + return string(versionData), nil +} diff --git 
a/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor.go b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor.go new file mode 100644 index 0000000..9a7d3d4 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor.go @@ -0,0 +1,34 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package archive + +import ( + "go.amzn.com/robomaker/bundle_support/bundle" + "go.amzn.com/robomaker/bundle_support/store" + "io" +) + +const ( + BundleProcessorVersion1 = "1" + BundleProcessorVersion2 = "2" +) + +// BundleProcessor's responsibility is to take a bundle stream and knows how to process/handle the Bundle File +// This includes the knowledge on how to process v1, v2, etc. +type BundleProcessor interface { + + // Extract takes the bundle bytes and extracts everything into the bundle store + Extract(inputStream io.ReadSeeker, bundleCache store.BundleStore) (bundle.Bundle, error) +} + +func BundleProcessorForVersion(version string) BundleProcessor { + switch version { + case BundleProcessorVersion1: + return NewBundleProcessorV1() + case BundleProcessorVersion2: + return NewBundleProcessorV2() + default: + return nil + } +} diff --git a/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_test.go b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_test.go new file mode 100644 index 0000000..3d9ef61 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_test.go @@ -0,0 +1,38 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package archive + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestBundleProcessorForVersion_V1_ShouldReturnV1(t *testing.T) { + t.Parallel() + processor := BundleProcessorForVersion(BundleProcessorVersion1) + + // type assert that this is v1 + _, ok := processor.(*bundleProcessorV1) + + assert.NotNil(t, processor) + assert.True(t, ok) +} + +func TestBundleProcessorForVersion_V2_ShouldReturnV2(t *testing.T) { + t.Parallel() + processor := BundleProcessorForVersion(BundleProcessorVersion2) + + // type assert that this is v2 + _, ok := processor.(*bundleProcessorV2) + + assert.NotNil(t, processor) + assert.True(t, ok) +} + +func TestBundleProcessorForVersion_Unsupported_ShouldReturnNil(t *testing.T) { + t.Parallel() + processor := BundleProcessorForVersion("NoVersion") + + assert.Nil(t, processor) +} diff --git a/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v1.go b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v1.go new file mode 100644 index 0000000..b866216 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v1.go @@ -0,0 +1,33 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package archive + +import ( + "github.com/google/uuid" + "go.amzn.com/robomaker/bundle_support/bundle" + "go.amzn.com/robomaker/bundle_support/extractors" + "go.amzn.com/robomaker/bundle_support/store" + "io" +) + +func NewBundleProcessorV1() BundleProcessor { + return &bundleProcessorV1{} +} + +// Bundle v1 simply extracts tar.gz +type bundleProcessorV1 struct{} + +func (b *bundleProcessorV1) Extract(inputStream io.ReadSeeker, bundleStore store.BundleStore) (bundle.Bundle, error) { + // create a bundle extractor that knows how to extract the bundle + bundleExtractor := extractors.NewBundleV1Extractor(inputStream) + + bundleKey := uuid.New().String() + // put it into the store + // for bundle v1, we plan to ask the higher-up caller for the key, use 12345 for now + _, putErr := bundleStore.Put(bundleKey, bundleExtractor) + if putErr != nil { + return nil, putErr + } + return bundle.NewBundle(bundleStore, []string{bundleKey}), nil +} diff --git a/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v1_test.go b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v1_test.go new file mode 100644 index 0000000..f164c0e --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v1_test.go @@ -0,0 +1,71 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package archive + +import ( + "fmt" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.amzn.com/robomaker/bundle_support/extractors" + "go.amzn.com/robomaker/bundle_support/store" + "testing" +) + +const ( + rootPath = "/testing_root" +) + +// Matcher that tests for BundleV1Extractor +type ofExtractorV1 struct { +} + +func OfExtractorV1() gomock.Matcher { + return &ofExtractorV1{} +} + +func (o *ofExtractorV1) Matches(x interface{}) bool { + _, ok := x.(*extractors.BundleV1Extractor) + return ok +} + +func (o *ofExtractorV1) String() string { + return "expected type: *extractors.BundleV1Extractor" +} + +func TestBundleProcessorV1_Extract_ShouldPutIntoStore(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + path := "testPath" + + mockBundleStore := store.NewMockBundleStore(ctrl) + mockBundleStore.EXPECT().Put(gomock.Any(), OfExtractorV1()).Return(path, nil) + + extractor := NewBundleProcessorV1() + bundle, err := extractor.Extract(nil, mockBundleStore) + + assert.NotNil(t, bundle) + assert.Nil(t, err) +} + +func TestBundleProcessorV1_Extract_WithError_ShouldReturnError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + path := "" + + expectedError := fmt.Errorf("TestError") + + mockBundleStore := store.NewMockBundleStore(ctrl) + mockBundleStore.EXPECT().Put(gomock.Any(), OfExtractorV1()).Return(path, expectedError) + + extractor := NewBundleProcessorV1() + bundle, err := extractor.Extract(nil, mockBundleStore) + + assert.Nil(t, bundle) + assert.NotNil(t, err) + assert.Equal(t, expectedError, err) +} diff --git a/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v2.go b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v2.go new file mode 100644 index 0000000..05e87cb --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/archive/bundle_processor_v2.go @@ -0,0 +1,148 @@ +// 
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package archive + +import ( + "archive/tar" + "compress/gzip" + "encoding/json" + "fmt" + "go.amzn.com/robomaker/bundle_support/bundle" + "go.amzn.com/robomaker/bundle_support/extractors" + "go.amzn.com/robomaker/bundle_support/store" + "io" + "io/ioutil" +) + +const ( + metadataFileName = "metadata.tar.gz" + overlaysFileName = "overlays.json" +) + +var EmptyOverlays Overlays + +func NewBundleProcessorV2() BundleProcessor { + return &bundleProcessorV2{} +} + +// Bundle v2 processor knows how to parse overlays and process them accordingly +type bundleProcessorV2 struct { +} + +func (b *bundleProcessorV2) Extract(inputStream io.ReadSeeker, bundleStore store.BundleStore) (bundle.Bundle, error) { + + // obtain the metadata from the bundle bytes + metadataTarReader, metadataErr := getMetadataTarReader(inputStream) + if metadataErr != nil { + return nil, metadataErr + } + + // get the list of overlays from the metadata + overlays, overalysErr := getOverlays(metadataTarReader) + if overalysErr != nil { + return nil, overalysErr + } + + var itemKeys []string + + // for every overlay, extract them into the bundle store + for _, overlay := range overlays.Overlays { + + fmt.Printf("Processing overlay: %+v\n", overlay) + + overlayReader, overlayErr := getReaderForOverlay(overlay, inputStream) + if overlayErr != nil { + return nil, overlayErr + } + + tarGzExtractor := extractors.ExtractorFromFileName(overlayReader, overlay.FileName) + if tarGzExtractor == nil { + return nil, fmt.Errorf("cannot create extractor for overlay: %s", overlay.FileName) + } + + // now, put into the bundle store, the store will take care of not extracting if it already exists + _, putError := bundleStore.Put(overlay.Sha256, tarGzExtractor) + if putError != nil { + return nil, putError + } + itemKeys = append(itemKeys, overlay.Sha256) + } + + //Seek to the end of the stream to expose 
completion to clients monitoring progress (we might not read everything) + _, _ = inputStream.Seek(0, io.SeekEnd) + + // create a new bundle with item paths + return bundle.NewBundle(bundleStore, itemKeys), nil +} + +// from the input stream get the metadata tar reader +func getMetadataTarReader(inputStream io.ReadSeeker) (*tar.Reader, error) { + tarReader := extractors.TarReaderFromStream(inputStream) + // skip past the version file and get to the metadata.tar.gz file + tarReader.Next() + metadataHeader, metadataErr := tarReader.Next() + if metadataErr != nil { + return nil, metadataErr + } + + // ensure that we are now pointing to the metadata file + if metadataHeader.Name != metadataFileName { + return nil, fmt.Errorf("unexpected metadata file: %s", metadataHeader.Name) + } + + // create a limit reader in order to extract this metadata file + metadataReader := io.LimitReader(tarReader, metadataHeader.Size) + + // now, get a tar reader from this metadataReader + // we know that this is a .tar.gz file + metadataTarGzReader, gzErr := gzip.NewReader(metadataReader) + if gzErr != nil { + return nil, gzErr + } + // transform it into a tarReader + return tar.NewReader(metadataTarGzReader), nil +} + +func getOverlays(metadataTarReader *tar.Reader) (Overlays, error) { + + // iterate headers in the metadata tar file and process each file in the tar + for { + header, err := metadataTarReader.Next() + if err == io.EOF { + // we there are no more headers, we finish + break + } else if err != nil { + return EmptyOverlays, err + } + + // if we find the overlays file, read the bytes and parse it + if header.Name == overlaysFileName { + overlayBytes, overlayBytesErr := ioutil.ReadAll(metadataTarReader) + if overlayBytesErr != nil { + return EmptyOverlays, overlayBytesErr + } + + var overlays Overlays + // unmarshal json + jsonErr := json.Unmarshal(overlayBytes, &overlays) + if jsonErr != nil { + return EmptyOverlays, fmt.Errorf("unable to parse JSON of the overlays file: %s", 
jsonErr) + } + return overlays, nil + } + } + return EmptyOverlays, fmt.Errorf("overlays file not find in metadata") +} + +func getReaderForOverlay(overlay Overlay, inputStream io.ReadSeeker) (io.Reader, error) { + // now we seek and create a limit reader, and get extractor + _, seekError := inputStream.Seek(int64(overlay.Offset), io.SeekStart) + + if seekError != nil { + return nil, fmt.Errorf("seekError: %v for %s", seekError, overlay.FileName) + } + + // create a limit reader to read part of a file + return io.LimitReader(inputStream, int64(overlay.Size)), nil +} diff --git a/src/go.amzn.com/robomaker/bundle_support/archive/overlay.go b/src/go.amzn.com/robomaker/bundle_support/archive/overlay.go new file mode 100644 index 0000000..cbabaa2 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/archive/overlay.go @@ -0,0 +1,17 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package archive + +// Overlays and Overlay mirror the json structure of the overlays documented in V2 format here: +// https://github.com/colcon/colcon-bundle/blob/master/BUNDLE_FORMAT.md +type Overlays struct { + Overlays []Overlay `json:"overlays"` +} + +type Overlay struct { + FileName string `json:"name"` + Sha256 string `json:"sha256"` + Offset int `json:"offset"` + Size int `json:"size"` +} diff --git a/src/go.amzn.com/robomaker/bundle_support/bundle/bundle.go b/src/go.amzn.com/robomaker/bundle_support/bundle/bundle.go new file mode 100644 index 0000000..74c625e --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/bundle/bundle.go @@ -0,0 +1,106 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package bundle + +//go:generate $MOCKGEN -destination=mock_bundle.go -package=bundle go.amzn.com/robomaker/bundle_support/bundle Bundle + +import ( + "fmt" + "go.amzn.com/robomaker/bundle_support/store" + "path/filepath" +) + +const ( + sourceCommandFormat = "BUNDLE_CURRENT_PREFIX=%s source %s/setup.sh;" + standardPosixSourceCommandFormat = "BUNDLE_CURRENT_PREFIX=%s . %s/setup.sh;" +) + +// Bundle's responsibility is to provide the correct source commands for the application to execute. +// The contents of the directories in the source commands are guaranteed to be available on disk by the BundleCache. +type Bundle interface { + + // List of commands that should be executed + // to insert the bundle's contents into + // a shell environment + SourceCommands() []string + + // List of commands that should be executed + // to insert the bundle's contents into + // a POSIX-standard environment + PosixSourceCommands() []string + + // List of commands that should be executed + // to insert the bundle's contents into + // a shell/POSIX-standard environment + // + // The root path in the source commands will be replaced with location. + // This is useful if you mount the store in a container want to access them in the container's mounted location. + SourceCommandsUsingLocation(location string) []string + PosixSourceCommandsUsingLocation(location string) []string + + // Releases all resources that this Bundle holds + Release() +} + +// Create a new bundle. Give it an array of item paths. 
Bundle knows how to construct source commands +// from the item paths +func NewBundle(bundleStore store.BundleStore, itemKeys []string) Bundle { + return &bundle{ + bundleStore: bundleStore, + itemKeys: itemKeys, + } +} + +type bundle struct { + bundleStore store.BundleStore + itemKeys []string +} + +func (b *bundle) SourceCommands() []string { + var sourceCommands []string + bundleStorePath := b.bundleStore.RootPath() + for _, itemKey := range b.itemKeys { + itemPath := filepath.Join(bundleStorePath, itemKey) + sourceCommand := fmt.Sprintf(sourceCommandFormat, itemPath, itemPath) + sourceCommands = append(sourceCommands, sourceCommand) + } + return sourceCommands +} + +func (b *bundle) PosixSourceCommands() []string { + var sourceCommands []string + bundleStorePath := b.bundleStore.RootPath() + for _, itemKey := range b.itemKeys { + itemPath := filepath.Join(bundleStorePath, itemKey) + sourceCommand := fmt.Sprintf(standardPosixSourceCommandFormat, itemPath, itemPath) + sourceCommands = append(sourceCommands, sourceCommand) + } + return sourceCommands +} + +func (b *bundle) SourceCommandsUsingLocation(location string) []string { + var sourceCommands []string + for _, itemKey := range b.itemKeys { + itemPath := filepath.Join(location, itemKey) + sourceCommand := fmt.Sprintf(sourceCommandFormat, itemPath, itemPath) + sourceCommands = append(sourceCommands, sourceCommand) + } + return sourceCommands +} + +func (b *bundle) PosixSourceCommandsUsingLocation(location string) []string { + var sourceCommands []string + for _, itemKey := range b.itemKeys { + itemPath := filepath.Join(location, itemKey) + sourceCommand := fmt.Sprintf(standardPosixSourceCommandFormat, itemPath, itemPath) + sourceCommands = append(sourceCommands, sourceCommand) + } + return sourceCommands +} + +func (b *bundle) Release() { + for _, itemKey := range b.itemKeys { + b.bundleStore.Release(itemKey) + } +} diff --git a/src/go.amzn.com/robomaker/bundle_support/bundle/bundle_test.go 
b/src/go.amzn.com/robomaker/bundle_support/bundle/bundle_test.go new file mode 100644 index 0000000..68d7c25 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/bundle/bundle_test.go @@ -0,0 +1,95 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package bundle + +import ( + "fmt" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.amzn.com/robomaker/bundle_support/store" + "testing" +) + +const ( + rootPath = "/testing_root" + containerRootPath = "/container_root" +) + +var itemKeys = []string{ + "item1", + "item2", + "item3", +} + +var itemPaths = []string{ + "/testing_root/item1", + "/testing_root/item2", + "/testing_root/item3", +} + +var containerItemPaths = []string{ + "/container_root/item1", + "/container_root/item2", + "/container_root/item3", +} + +func TestBundle_SourceCommands_GivesExpectedCommands(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockBundleStore := store.NewMockBundleStore(ctrl) + mockBundleStore.EXPECT().RootPath().Return(rootPath).AnyTimes() + + bundle := NewBundle(mockBundleStore, itemKeys) + sourceCommands := bundle.SourceCommands() + + assert.Equal(t, 3, len(sourceCommands)) + assert.Equal(t, fmt.Sprintf(sourceCommandFormat, itemPaths[0], itemPaths[0]), sourceCommands[0]) + assert.Equal(t, fmt.Sprintf(sourceCommandFormat, itemPaths[1], itemPaths[1]), sourceCommands[1]) + assert.Equal(t, fmt.Sprintf(sourceCommandFormat, itemPaths[2], itemPaths[2]), sourceCommands[2]) + + posixCommands := bundle.PosixSourceCommands() + assert.Equal(t, 3, len(posixCommands)) + assert.Equal(t, fmt.Sprintf(standardPosixSourceCommandFormat, itemPaths[0], itemPaths[0]), posixCommands[0]) + assert.Equal(t, fmt.Sprintf(standardPosixSourceCommandFormat, itemPaths[1], itemPaths[1]), posixCommands[1]) + assert.Equal(t, fmt.Sprintf(standardPosixSourceCommandFormat, itemPaths[2], itemPaths[2]), posixCommands[2]) +} 
+ +func TestBundle_SourceCommandsUsingLocation_GivesExpectedCommands(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockBundleStore := store.NewMockBundleStore(ctrl) + + bundle := NewBundle(mockBundleStore, itemKeys) + sourceCommands := bundle.SourceCommandsUsingLocation(containerRootPath) + + assert.Equal(t, 3, len(sourceCommands)) + assert.Equal(t, fmt.Sprintf(sourceCommandFormat, containerItemPaths[0], containerItemPaths[0]), sourceCommands[0]) + assert.Equal(t, fmt.Sprintf(sourceCommandFormat, containerItemPaths[1], containerItemPaths[1]), sourceCommands[1]) + assert.Equal(t, fmt.Sprintf(sourceCommandFormat, containerItemPaths[2], containerItemPaths[2]), sourceCommands[2]) + + posixCommands := bundle.PosixSourceCommandsUsingLocation(containerRootPath) + assert.Equal(t, 3, len(posixCommands)) + assert.Equal(t, fmt.Sprintf(standardPosixSourceCommandFormat, containerItemPaths[0], containerItemPaths[0]), posixCommands[0]) + assert.Equal(t, fmt.Sprintf(standardPosixSourceCommandFormat, containerItemPaths[1], containerItemPaths[1]), posixCommands[1]) + assert.Equal(t, fmt.Sprintf(standardPosixSourceCommandFormat, containerItemPaths[2], containerItemPaths[2]), posixCommands[2]) + +} + +func TestBundle_Release_ShouldReleaseItemKeys(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockBundleStore := store.NewMockBundleStore(ctrl) + mockBundleStore.EXPECT().Release(itemKeys[0]) + mockBundleStore.EXPECT().Release(itemKeys[1]) + mockBundleStore.EXPECT().Release(itemKeys[2]) + + bundle := NewBundle(mockBundleStore, itemKeys) + bundle.Release() +} diff --git a/src/go.amzn.com/robomaker/bundle_support/bundle_error.go b/src/go.amzn.com/robomaker/bundle_support/bundle_error.go new file mode 100644 index 0000000..5a595ca --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/bundle_error.go @@ -0,0 +1,37 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
// SPDX-License-Identifier: Apache-2.0

// Coarse categories describing which stage of bundle acquisition failed.
const (
	ERROR_TYPE_CONTENT_ID = "CONTENT_ID"
	ERROR_TYPE_SOURCE     = "SOURCE"
	ERROR_TYPE_FORMAT     = "FORMAT"
	ERROR_TYPE_EXTRACTION = "EXTRACTION"
)

// BundleError pairs an underlying error with one of the ERROR_TYPE_* categories
// so callers can tell which phase of bundle handling failed.
type BundleError struct {
	cause     error
	errorType string
}

// Error renders the category tag followed by the wrapped cause.
func (e *BundleError) Error() string {
	return fmt.Sprintf("[%s] bundle error: %v", e.errorType, e.cause)
}

// GetCause exposes the wrapped underlying error.
func (e *BundleError) GetCause() error {
	return e.cause
}

// GetErrorType exposes the ERROR_TYPE_* category string.
func (e *BundleError) GetErrorType() string {
	return e.errorType
}

// NewBundleError wraps err with the given category.
func NewBundleError(err error, errorType string) *BundleError {
	return &BundleError{
		cause:     err,
		errorType: errorType,
	}
}
*/
type BundleProvider interface {
	// With the URL to the bundle, extract bundle files to the cache, and return a useable Bundle
	GetBundle(url string) (bundle.Bundle, error)

	// Get the bundle from the URL, and expect the content id to match expectedContentId.
	// If it doesn't match, an error is thrown and we will get nil for bundle
	// NOTE: Any double quotes character in expectedContentId will be ignored.
	GetVersionedBundle(url string, expectedContentId string) (bundle.Bundle, error)

	// Set a progress callback function so that it will be called when we have a progress tick
	SetProgressCallback(callback ProgressCallback)

	// Set the rate at which the progress callback will be called.
	SetProgressCallbackRate(rateSeconds int)

	// Set a S3 client so we can use it to download from S3
	SetS3Client(s3Client s3iface.S3API)
}

// ProgressCallback receives the percentage of the bundle stream consumed so far
// and the time elapsed since reading began.
type ProgressCallback func(percentDone float32, timeElapsed time.Duration)

// proxyReadSeeker wraps the bundle's ReadSeeker and invokes the progress
// callback at most once per callbackRateInSeconds (and on completion).
type proxyReadSeeker struct {
	r                     io.ReadSeeker
	contentLength         int64 // total stream size, used to compute percentDone
	readStartTime         time.Time
	lastUpdated           time.Time // last time the callback fired
	callback              ProgressCallback
	callbackRateInSeconds int
}

// bundleProvider is the concrete BundleProvider backed by a BundleStore.
type bundleProvider struct {
	s3Client                      s3iface.S3API
	bundleStore                   store.BundleStore
	progressCallback              ProgressCallback
	progressCallbackRateInSeconds int
}

// NewBundleProvider creates a provider that extracts bundles into bundleStore.
// The progress callback rate defaults to 1 second; no callback is set by default.
func NewBundleProvider(bundleStore store.BundleStore) BundleProvider {
	return &bundleProvider{
		bundleStore:                   bundleStore,
		progressCallbackRateInSeconds: 1,
	}
}

func (b *bundleProvider) SetProgressCallback(callback ProgressCallback) {
	b.progressCallback = callback
}

func (b *bundleProvider) SetS3Client(s3Client s3iface.S3API) {
	b.s3Client = s3Client
}

func (b *bundleProvider) SetProgressCallbackRate(rateSeconds int) {
	b.progressCallbackRateInSeconds = rateSeconds
}

func (b *bundleProvider) GetVersionedBundle(url string, expectedContentId string) (bundle.Bundle, error) {
	return b.getBundle(url, expectedContentId)
}

func (b *bundleProvider) GetBundle(url string) (bundle.Bundle, error) {
	// empty expectedContentId disables the content-id check in getBundle
	return b.getBundle(url, "")
}

// getBundle resolves url to a seekable stream, optionally verifies its content
// id, wraps the stream for progress reporting when a callback is registered,
// and hands it to the bundle archive for extraction into the store.
func (b *bundleProvider) getBundle(url string, expectedContentId string) (bundle.Bundle, error) {
	// convert our URL to a readable seekable stream
	stream, contentId, contentLength, streamErr := stream.PathToStream(url, b.s3Client)
	if streamErr != nil {
		return nil, NewBundleError(streamErr, ERROR_TYPE_SOURCE)
	}

	if expectedContentId != "" && expectedContentId != contentId {
		return nil, NewBundleError(fmt.Errorf("Expected content ID [%v] does not match actual content ID [%v]", expectedContentId, contentId), ERROR_TYPE_CONTENT_ID)
	}

	// only pay for progress tracking when a callback has been registered
	if b.progressCallback != nil {
		stream = &proxyReadSeeker{
			r:                     stream,
			contentLength:         contentLength,
			callback:              b.progressCallback,
			callbackRateInSeconds: b.progressCallbackRateInSeconds,
			readStartTime:         time.Now(),
			lastUpdated:           time.Now(),
		}
	}

	// create a bundle archive for the stream
	bundleArchive, bundleArchiveErr := archive.NewBundleArchive(stream)
	if bundleArchiveErr != nil {
		return nil, NewBundleError(bundleArchiveErr, ERROR_TYPE_FORMAT)
	}

	// ask our bundle archive to extract
	bundle, extractErr := bundleArchive.Extract(b.bundleStore)
	if extractErr != nil {
		return nil, NewBundleError(extractErr, ERROR_TYPE_EXTRACTION)
	}

	return bundle, nil
}

// Read delegates to the wrapped stream, then fires the progress callback when
// the configured interval has elapsed or the stream has been fully consumed.
func (r *proxyReadSeeker) Read(p []byte) (n int, err error) {
	n, err = r.r.Read(p)

	// current position obtained via a zero-byte relative seek;
	// the seek error is ignored here (progress is best-effort)
	currentPos, _ := r.Seek(0, io.SeekCurrent)
	percentDone := float32((float64(currentPos) / float64(r.contentLength)) * 100)

	if time.Since(r.lastUpdated).Seconds() > float64(r.callbackRateInSeconds) || currentPos == r.contentLength {
		r.callback(percentDone, time.Since(r.readStartTime))
		r.lastUpdated = time.Now()
	}

	return
}

// Seek delegates to the wrapped stream. A seek to the end is treated as a
// completion signal and reported to the callback as 100% done — the v2 bundle
// processor seeks to the end after extraction for exactly this purpose.
func (r *proxyReadSeeker) Seek(offset int64, whence int) (newOffset int64, err error) {
	if whence == io.SeekEnd {
		r.callback(100.0, time.Since(r.readStartTime))
	}

	return r.r.Seek(offset, whence)
}
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package extractors

import (
	"archive/tar"
	"go.amzn.com/robomaker/bundle_support/file_system"
	"io"
)

const (
	metadataFileName = "metadata.tar"
	bundleFileName   = "bundle.tar"
)

// expectedFiles lists the only tar entries a v1 bundle should contain; anything
// else encountered in the outer archive is skipped.
var expectedFiles = [...]string{metadataFileName, bundleFileName}

// BundleV1Extractor knows how to extract all files from a v1 bundle.
type BundleV1Extractor struct {

	// the stream where the bundle's bytes are read from
	readStream io.ReadSeeker
}

// NewBundleV1Extractor wraps reader in an Extractor for the v1 bundle layout.
func NewBundleV1Extractor(reader io.ReadSeeker) Extractor {
	return &BundleV1Extractor{
		readStream: reader,
	}
}

// Extract reads the bundle stream (plain tar or tar.gz — TarReaderFromStream
// handles both) and extracts the expected inner archives into extractLocation.
func (e *BundleV1Extractor) Extract(extractLocation string, fs file_system.FileSystem) error {
	return e.ExtractWithTarReader(TarReaderFromStream(e.readStream), extractLocation, fs)
}

// ExtractWithTarReader walks tarReader and, for each expected entry
// (metadata.tar, bundle.tar), extracts that inner tar into extractLocation.
func (e *BundleV1Extractor) ExtractWithTarReader(tarReader *tar.Reader, extractLocation string, fs file_system.FileSystem) error {
	// create the extract location if it doesn't exist
	extractLocationErr := fs.MkdirAll(extractLocation, DefaultFileMode)
	if extractLocationErr != nil {
		return extractLocationErr
	}

	// iterate headers and process each file in the tar
	for {
		header, err := tarReader.Next()

		if err == io.EOF {
			// there are no more headers, we finish
			break
		} else if err != nil {
			return err
		}

		// we only extract when they are expected files; the tar reader is
		// positioned at the entry's data, so the inner extractor reads it in place
		if isExpectedFile(header.Name) {
			extractErr := NewTarExtractor(tarReader).Extract(extractLocation, fs)
			if extractErr != nil {
				return extractErr
			}
		}
	}
	return nil
}

// isExpectedFile reports whether fileName is one of the entries we extract.
func isExpectedFile(fileName string) bool {
	for _, tarFile := range expectedFiles {
		if fileName == tarFile {
			return true
		}
	}
	return false
}

// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package extractors

import (
	"github.com/stretchr/testify/assert"
	"testing"
)

func TestIsExpectedFile_WhenExpectedBundle_ShouldReturnTrue(t *testing.T) {
	t.Parallel()
	assert.True(t, isExpectedFile("bundle.tar"))
}

func TestIsExpectedFile_WhenExpectedMetadata_ShouldReturnTrue(t *testing.T) {
	t.Parallel()
	assert.True(t, isExpectedFile("metadata.tar"))
}

func TestIsExpectedFile_WhenNotExpected_ShouldReturnFalse(t *testing.T) {
	t.Parallel()
	assert.False(t, isExpectedFile("unknown.txt"))
}

// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0 + +package extractors + +//go:generate $MOCKGEN -destination=mock_extractor.go -package=extractors go.amzn.com/robomaker/bundle_support/extractors Extractor +//go:generate $MOCKGEN -destination=mock_archiver.go -package=extractors go.amzn.com/robomaker/bundle_support/3p/archiver Archiver + +import ( + "archive/tar" + "compress/gzip" + "go.amzn.com/robomaker/bundle_support/file_system" + "io" +) + +const ( + DefaultFileMode file_system.FileMode = 0755 +) + +// Extractor's responsibility is to extract all its contents into the target extract location +type Extractor interface { + Extract(extractLocation string, fs file_system.FileSystem) error +} + +func TarReaderFromStream(inputStream io.ReadSeeker) *tar.Reader { + var tarReader *tar.Reader = nil + + // try as a gzReader + gzReader, gzErr := gzip.NewReader(inputStream) + if gzErr == nil { + // this is a valid gz file + // create the tar reader from the gzReader + tarReader = tar.NewReader(gzReader) + } else { + // it wasn't a gz file, let's try to create the tar reader from the input stream + inputStream.Seek(0, io.SeekStart) + tarReader = tar.NewReader(inputStream) + } + return tarReader +} diff --git a/src/go.amzn.com/robomaker/bundle_support/extractors/targz_extractor.go b/src/go.amzn.com/robomaker/bundle_support/extractors/targz_extractor.go new file mode 100644 index 0000000..9bcecc4 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/extractors/targz_extractor.go @@ -0,0 +1,65 @@ +package extractors + +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +import ( + "go.amzn.com/robomaker/bundle_support/3p/archiver" + "go.amzn.com/robomaker/bundle_support/file_system" + "io" +) + +// knows how to extract from a tar.gz or a tar using archiver.Archiver interface +type tarGzExtractor struct { + // the stream where the tar.gz bytes are read from + readStream io.Reader + archiverInterface archiver.Archiver +} + +func NewTarGzExtractor(reader io.Reader) Extractor { + return newTarGzExtractor(reader, archiver.TarGz) +} + +func NewTarExtractor(reader io.Reader) Extractor { + return newTarGzExtractor(reader, archiver.Tar) +} + +func newTarGzExtractor(reader io.Reader, archiverInterface archiver.Archiver) Extractor { + return &tarGzExtractor{ + readStream: reader, + archiverInterface: archiverInterface, + } +} + +func ExtractorFromFileName(reader io.Reader, fileName string) Extractor { + archiverInterface := archiver.MatchingFormat(fileName) + + if archiverInterface == nil { + return nil + } + + return &tarGzExtractor{ + readStream: reader, + archiverInterface: archiverInterface, + } +} + +func (e *tarGzExtractor) Extract(extractLocation string, fs file_system.FileSystem) error { + return e.ExtractWithArchiver(extractLocation, fs, e.archiverInterface) +} + +func (e *tarGzExtractor) ExtractWithArchiver(extractLocation string, fs file_system.FileSystem, archiverInterface archiver.Archiver) error { + // crete the extract location if it doesn't exist + extractLocationErr := fs.MkdirAll(extractLocation, DefaultFileMode) + if extractLocationErr != nil { + return extractLocationErr + } + + // Now, extract the bytes + extractErr := archiverInterface.Read(e.readStream, extractLocation) + if extractErr != nil { + return extractErr + } + + return nil +} diff --git a/src/go.amzn.com/robomaker/bundle_support/extractors/targz_extractor_test.go b/src/go.amzn.com/robomaker/bundle_support/extractors/targz_extractor_test.go new file mode 100644 index 0000000..391d0d6 --- /dev/null +++ 
b/src/go.amzn.com/robomaker/bundle_support/extractors/targz_extractor_test.go @@ -0,0 +1,73 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package extractors + +import ( + "errors" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.amzn.com/robomaker/bundle_support/file_system" + "testing" +) + +const ( + extractLocation = "/extractLocation" + expectedFileMode file_system.FileMode = 0755 +) + +func TestTarGzExtractor_Extract_WithNoErrors_ShouldExtract(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockArchiver := NewMockArchiver(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + mockFileSystem.EXPECT().MkdirAll(extractLocation, expectedFileMode).Return(nil) + mockArchiver.EXPECT().Read(nil, extractLocation).Return(nil) + + extractor := tarGzExtractor{} + extractErr := extractor.ExtractWithArchiver(extractLocation, mockFileSystem, mockArchiver) + + assert.Nil(t, extractErr) +} + +func TestTarGzExtractor_Extract_WithMkDirAllErrors_ShouldErrorAndNotExtract(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockArchiver := NewMockArchiver(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + mkdirAllError := errors.New("MkDirAll Error") + + mockFileSystem.EXPECT().MkdirAll(extractLocation, expectedFileMode).Return(mkdirAllError) + + extractor := tarGzExtractor{} + extractErr := extractor.ExtractWithArchiver(extractLocation, mockFileSystem, mockArchiver) + + assert.NotNil(t, extractErr) + assert.Equal(t, mkdirAllError, extractErr) +} + +func TestTarGzExtractor_Extract_WithExtractErrors_ShouldError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockArchiver := NewMockArchiver(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + tarGzErr := errors.New("tarGzErr") + + 
mockFileSystem.EXPECT().MkdirAll(extractLocation, expectedFileMode).Return(nil) + mockArchiver.EXPECT().Read(nil, extractLocation).Return(tarGzErr) + + extractor := tarGzExtractor{} + extractErr := extractor.ExtractWithArchiver(extractLocation, mockFileSystem, mockArchiver) + + assert.NotNil(t, extractErr) + assert.Equal(t, tarGzErr, extractErr) +} diff --git a/src/go.amzn.com/robomaker/bundle_support/file_system/file_system.go b/src/go.amzn.com/robomaker/bundle_support/file_system/file_system.go new file mode 100644 index 0000000..b43a097 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/file_system/file_system.go @@ -0,0 +1,65 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package file_system + +//go:generate $MOCKGEN -destination=mock_file_system.go -package=file_system go.amzn.com/robomaker/bundle_support/file_system FileSystem +//go:generate $MOCKGEN -destination=mock_file.go -package=file_system go.amzn.com/robomaker/bundle_support/file_system File +//go:generate $MOCKGEN -destination=mock_file_info.go -package=file_system go.amzn.com/robomaker/bundle_support/file_system FileInfo + +import ( + "io" + "io/ioutil" + "os" + "time" +) + +type FileSystem interface { + NewFile(fd uintptr, name string) File + Create(name string) (File, error) + Open(name string) (File, error) + Stat(name string) (FileInfo, error) + RemoveAll(name string) error + MkdirAll(name string, mode FileMode) error + ReadFile(filename string) ([]byte, error) + WriteFile(filename string, data []byte, mode FileMode) error +} + +type File interface { + io.Closer + io.Reader + io.ReaderAt + io.Seeker + io.Writer + io.WriterAt + Stat() (os.FileInfo, error) +} + +type FileInfo interface { + Name() string // base name of the file + Size() int64 // length in bytes for regular files; system-dependent for others + Mode() os.FileMode // file mode bits + ModTime() time.Time // modification time + IsDir() bool // 
abbreviation for Mode().IsDir() + Sys() interface{} // underlying data source (can return nil) +} + +type FileMode os.FileMode + +// osFS implements FileSystem using the local disk. +type osFS struct{} + +func (osFS) NewFile(fd uintptr, name string) File { return os.NewFile(fd, name) } +func (osFS) Create(name string) (File, error) { return os.Create(name) } +func (osFS) Open(name string) (File, error) { return os.Open(name) } +func (osFS) Stat(name string) (FileInfo, error) { return os.Stat(name) } +func (osFS) RemoveAll(name string) error { return os.RemoveAll(name) } +func (osFS) MkdirAll(name string, mode FileMode) error { return os.MkdirAll(name, os.FileMode(mode)) } +func (osFS) ReadFile(filename string) ([]byte, error) { return ioutil.ReadFile(filename) } +func (osFS) WriteFile(filename string, data []byte, mode FileMode) error { + return ioutil.WriteFile(filename, data, os.FileMode(mode)) +} + +func NewLocalFS() FileSystem { + return &osFS{} +} diff --git a/src/go.amzn.com/robomaker/bundle_support/s3/s3_reader.go b/src/go.amzn.com/robomaker/bundle_support/s3/s3_reader.go new file mode 100644 index 0000000..37963eb --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/s3/s3_reader.go @@ -0,0 +1,216 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package s3 + +//go:generate $MOCKGEN -destination=mock_s3.go -package=s3 github.com/aws/aws-sdk-go/service/s3/s3iface S3API + +import ( + "fmt" + "io" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" +) + +type S3ReaderConfig struct { + NumRetries int + RetryWait time.Duration + BufferSize int64 +} + +func NewS3ReaderConfig() S3ReaderConfig { + //Retry up to 20 seconds at 5 second intervals + return S3ReaderConfig{ + NumRetries: 4, + RetryWait: time.Duration(5) * time.Second, + BufferSize: 5 * 1024 * 1024, //5MB, + } +} + +//Implements io.ReadSeeker +type S3Reader struct { + config S3ReaderConfig + buffer []byte + bufferStart int64 + bufferEnd int64 + bucket string + key string + s3 s3iface.S3API + offset int64 + ContentLength int64 + Etag string +} + +func newS3Reader(s3Api s3iface.S3API, bucket string, key string, contentLength int64, etag string, config S3ReaderConfig) *S3Reader { + return &S3Reader{ + buffer: make([]byte, config.BufferSize), + bufferStart: 0, + bufferEnd: 0, + bucket: bucket, + key: key, + s3: s3Api, + offset: 0, + ContentLength: contentLength, + Etag: etag, + config: config, + } +} + +type S3ReadError struct { + err error +} + +func (e *S3ReadError) Error() string { + return e.err.Error() +} + +func NewS3Reader(s3Api s3iface.S3API, bucket string, key string) (*S3Reader, error) { + return NewS3ReaderWithConfig(s3Api, bucket, key, NewS3ReaderConfig()) +} + +func NewS3ReaderWithConfig(s3Api s3iface.S3API, bucket string, key string, config S3ReaderConfig) (*S3Reader, error) { + resp, err := s3Api.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + + if err != nil { + return nil, err + } + + return newS3Reader(s3Api, bucket, key, *resp.ContentLength, *resp.ETag, config), nil +} + +/* + * Read up to 
len(p) bytes by peforming a Ranged Get request. + * This is useful for large objects and/or spotty connections, + * where connection issues may occur when reading from the Body + */ +func (r *S3Reader) Read(p []byte) (n int, err error) { + //Retry to handle dropped / spotty connections + //AWS SDK retry strategy will only handle failed API calls, but not failed reads on the underlying stream + //AWS SDK will also not retry on client errors, e.g. no network connection is present + for i := r.config.NumRetries; i >= 0; i-- { + n, err = r.read(p) + + //Retry on all request errors - worst case this adds 20 seconds to the deployment + shouldRetry := false + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "RequestError" { + shouldRetry = true + } + } + + if _, ok := err.(*S3ReadError); ok { + shouldRetry = true + } + + if shouldRetry { + fmt.Printf("Error in S3Reader.Read: (%v). Retrying...\n", err) + time.Sleep(r.config.RetryWait) + continue + } + + break + } + + return +} + +func (r *S3Reader) read(p []byte) (n int, err error) { + if r.bufferEnd != 0 && r.bufferStart != r.bufferEnd { + return r.copyInto(p) + } + + r.resetBuffer() + + bufferBytes := int64(len(r.buffer)) + bytesLeft := r.ContentLength - r.offset + bytesToRead := min(bufferBytes, bytesLeft) + + resp, getObjectErr := r.s3.GetObjectWithContext( + aws.BackgroundContext(), + &s3.GetObjectInput{ + Bucket: aws.String(r.bucket), + Key: aws.String(r.key), + IfMatch: aws.String(r.Etag), + Range: aws.String(fmt.Sprintf("bytes=%v-%v", r.offset, r.offset+bytesToRead-1)), + }, + request.WithResponseReadTimeout(5*time.Second)) //Sets a timeout on the underlying Body.Read() calls. 
By default, there is no timeout on this read + + if getObjectErr != nil { + return 0, getObjectErr + } + + defer resp.Body.Close() + for { + bytesRead, readErr := resp.Body.Read(r.buffer[r.bufferEnd:]) + r.bufferEnd += int64(bytesRead) + + if readErr == io.EOF { + break + } + + if readErr != nil { + return n, &S3ReadError{readErr} + } + } + + return r.copyInto(p) +} + +func (r *S3Reader) copyInto(p []byte) (n int, err error) { + n = copy(p, r.buffer[r.bufferStart:r.bufferEnd]) + r.bufferStart += int64(n) + + r.offset += int64(n) + if r.offset == r.ContentLength { + return n, io.EOF + } + + return n, nil +} + +func (r *S3Reader) resetBuffer() { + r.bufferStart = 0 + r.bufferEnd = 0 +} + +func (r *S3Reader) Seek(offset int64, whence int) (newOffset int64, err error) { + oldPos := r.offset + + switch whence { + default: + return 0, fmt.Errorf("Seek: invalid whence %v", whence) + case io.SeekStart: + r.offset = offset + case io.SeekCurrent: + r.offset += offset + case io.SeekEnd: + r.offset = r.ContentLength + } + + if r.offset > r.ContentLength { + r.offset = r.ContentLength + } + + //Reset buffer when seeking + //Special case: Dont reset if the position hasn't changed + if oldPos != r.offset { + r.resetBuffer() + } + + return r.offset, nil +} + +func min(x, y int64) int64 { + if x > y { + return y + } + return x +} diff --git a/src/go.amzn.com/robomaker/bundle_support/s3/s3_reader_test.go b/src/go.amzn.com/robomaker/bundle_support/s3/s3_reader_test.go new file mode 100644 index 0000000..c538cd3 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/s3/s3_reader_test.go @@ -0,0 +1,194 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package s3 + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +const ( + testBucket string = "testBucket" + testKey string = "testString" + testEtag string = "testEtag" + testBodyContent string = "hello world" +) + +type errReader struct { +} + +func (e *errReader) Read(p []byte) (n int, err error) { + return 0, errors.New("Read failed!") +} + +func setupS3MockExpects(mockS3Client *MockS3API) { + mockS3Client.EXPECT().HeadObject(gomock.Any()).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(int64(len(testBodyContent))), + ETag: aws.String(testEtag), + }, nil).Times(1) + + mockS3Client.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(strings.NewReader(testBodyContent[:2])), + }, nil).Times(1) +} + +func TestS3Reader_Read_ReadFails_Retries(t *testing.T) { + const message = "Yo" + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockS3Client := NewMockS3API(ctrl) + mockS3Client.EXPECT().HeadObject(gomock.Any()).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(int64(len(testBodyContent))), + ETag: aws.String(testEtag), + }, nil).Times(1) + + mockS3Client.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Return( + &s3.GetObjectOutput{ + Body: ioutil.NopCloser(&errReader{}), + }, nil).Times(3) + + mockS3Client.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Return( + &s3.GetObjectOutput{ + Body: ioutil.NopCloser(strings.NewReader(message)), + }, nil).Times(1) + + config := NewS3ReaderConfig() + config.RetryWait = 1 * time.Nanosecond + config.NumRetries = 3 + s3Reader, _ := NewS3ReaderWithConfig(mockS3Client, testBucket, testKey, config) + + content := make([]byte, 2) + _, err := s3Reader.Read(content) + 
assert.True(t, err == nil) + assert.Equal(t, message, string(content)) +} + +func TestS3Reader_Read_ReadFails_ExhaustsAllRetries(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockS3Client := NewMockS3API(ctrl) + mockS3Client.EXPECT().HeadObject(gomock.Any()).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(int64(len(testBodyContent))), + ETag: aws.String(testEtag), + }, nil).Times(1) + + mockS3Client.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Return( + &s3.GetObjectOutput{ + Body: ioutil.NopCloser(&errReader{}), + }, nil).Times(4) + + config := NewS3ReaderConfig() + config.RetryWait = 1 * time.Nanosecond + config.NumRetries = 3 + s3Reader, _ := NewS3ReaderWithConfig(mockS3Client, testBucket, testKey, config) + + _, err := s3Reader.Read(make([]byte, 1)) + assert.True(t, err != nil) +} + +func TestS3Reader_Read_CallFails_ReturnsError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockS3Client := NewMockS3API(ctrl) + mockS3Client.EXPECT().HeadObject(gomock.Any()).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(int64(len(testBodyContent))), + ETag: aws.String(testEtag), + }, nil).Times(1) + + mockS3Client.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Return( + &s3.GetObjectOutput{}, fmt.Errorf("Call failed")).Times(1) + + s3Reader, _ := NewS3Reader(mockS3Client, testBucket, testKey) + + _, err := s3Reader.Read(make([]byte, 1)) + assert.True(t, err != nil) +} + +func TestS3Reader_Read_ShouldBuffer(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockS3Client := NewMockS3API(ctrl) + setupS3MockExpects(mockS3Client) + + mockS3Client.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any(), gomock.Any()).Return(&s3.GetObjectOutput{ + Body: ioutil.NopCloser(strings.NewReader(testBodyContent[2:4])), + }, nil).Times(1) + + config := NewS3ReaderConfig() + config.BufferSize = 2 + s3Reader, _ := NewS3ReaderWithConfig(mockS3Client, 
testBucket, testKey, config) + + //First call will read from s3 since the buffer is empty. + //Second call should read from internal buffer + //Third call should read from s3 again + s3Reader.Read(make([]byte, 1)) + s3Reader.Read(make([]byte, 1)) + s3Reader.Read(make([]byte, 1)) +} + +func TestS3Reader_Seek_ResetsBuffer(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockS3Client := NewMockS3API(ctrl) + setupS3MockExpects(mockS3Client) + s3Reader, _ := NewS3Reader(mockS3Client, testBucket, testKey) + + //Read to fill the buffer + s3Reader.Read(make([]byte, 1)) + assert.True(t, s3Reader.bufferEnd != 0) + + s3Reader.Seek(1, io.SeekCurrent) + assert.True(t, s3Reader.bufferEnd == 0) +} + +func TestS3Reader_SeekNoop_DoesNotResetBuffer(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockS3Client := NewMockS3API(ctrl) + setupS3MockExpects(mockS3Client) + s3Reader, _ := NewS3Reader(mockS3Client, testBucket, testKey) + + //Read to fill the buffer + s3Reader.Read(make([]byte, 1)) + assert.True(t, s3Reader.bufferEnd != 0) + + s3Reader.Seek(0, io.SeekCurrent) + assert.True(t, s3Reader.bufferEnd != 0) +} + +func TestS3Reader_Seek_SeeksToCorrectPosition(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockS3Client := NewMockS3API(ctrl) + mockS3Client.EXPECT().HeadObject(gomock.Any()).Return(&s3.HeadObjectOutput{ + ContentLength: aws.Int64(int64(len(testBodyContent))), + ETag: aws.String(testEtag), + }, nil).Times(1) + s3Reader, _ := NewS3Reader(mockS3Client, testBucket, testKey) + + s3Reader.Seek(5, io.SeekCurrent) + assert.True(t, s3Reader.offset == 5) + + s3Reader.Seek(1, io.SeekCurrent) + assert.True(t, s3Reader.offset == 6) + + s3Reader.Seek(0, io.SeekStart) + assert.True(t, s3Reader.offset == 0) + + s3Reader.Seek(0, io.SeekEnd) + assert.True(t, s3Reader.offset == s3Reader.ContentLength) + + s3Reader.Seek(100000, io.SeekCurrent) + assert.True(t, s3Reader.offset == s3Reader.ContentLength) +} diff 
--git a/src/go.amzn.com/robomaker/bundle_support/store/bundle_store.go b/src/go.amzn.com/robomaker/bundle_support/store/bundle_store.go new file mode 100644 index 0000000..6af824f --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/store/bundle_store.go @@ -0,0 +1,238 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package store + +//go:generate $MOCKGEN -destination=mock_bundle_store.go -package=store go.amzn.com/robomaker/bundle_support/store BundleStore + +import ( + "fmt" + "go.amzn.com/robomaker/bundle_support/extractors" + "go.amzn.com/robomaker/bundle_support/file_system" + "os" + "path/filepath" + "sync" +) + +// BundleStore's responsibility is to manage bundle files on disk. +// Every entry in the storeItems is a directory containing extracted files that bundle uses. +// The key is a key that uniquely identifies the group of extracted files. +// Each entry can be a versioned sub-part or a versioned full part of a bundle. +type BundleStore interface { + + // Put a key into the storeItems, with it's corresponding contents. + // If a key already exists, we ignore the Put command, in order to prevent double work. + // A reader is passed in, so that BundleStore can write the contents, and extract to disk. + // an extractor is passed in so that bundle can call the extractor to extract + // + // By default, a Put will set the item to "protected" + // + // Returns: + // result for GetPath for this item that is put. + // error if there are extract errors + Put(key string, extractor extractors.Extractor) (string, error) + + // Load existing keys into memory from disk. + // The initial key load into memory refcount is 0 + // Return: + // error if key doesn't exist on disk + Load(keys []string) error + + // Given a key, get the root path to the extracted files. + // An empty string "" is returned if the key doesn't exist. 
+ GetPath(key string) string + + // Given a key, does it exist in the storeItems? + Exists(key string) bool + + // Return the root path of the store + RootPath() string + + // Get keys from in use (refcount > 0) storeItems + GetInUseItemKeys() []string + + // Tell the store that we're done with this item + Release(key string) error + + // Deletes storage space of items that are unreferenced + Cleanup() +} + +func NewSimpleStore(rootPath string, fileSystem file_system.FileSystem) BundleStore { + return &simpleStore{ + rootPath: rootPath, + storeItems: make(map[string]storeItem), + fileSystem: fileSystem, + } +} + +// Store Item records a key that has been put into the store. +// protected: boolean. Set to true when we first put into the storeItems. Able to set to false by API. +// when cleaning up the storeItems, items with protected set to true will not be cleaned up. +type storeItem struct { + key string + refCount int + pathToItem string +} + +type simpleStore struct { + rootPath string + storeItems map[string]storeItem + fileSystem file_system.FileSystem + mutex sync.Mutex +} + +func (s *simpleStore) Load(keys []string) error { + // ensure that Load is an atomic operation + s.mutex.Lock() + defer s.mutex.Unlock() + + for _, key := range keys { + if _, exists := s.storeItems[key]; exists { + continue + } + + itemPath, err := s.getPathToItemAndExistCheck(key) + + if err != nil { + return err + } + + // create a storeItem from this, 0 refcount from initial + newItem := storeItem{ + key: key, + refCount: 0, + pathToItem: itemPath, + } + s.storeItems[key] = newItem + } + return nil +} + +func (s *simpleStore) Put(key string, extractor extractors.Extractor) (string, error) { + // ensure that Put is an atomic operation + s.mutex.Lock() + defer s.mutex.Unlock() + + // there already exists an item, don't extract + if item, exists := s.storeItems[key]; exists { + // increment the item's refcount + item.refCount++ + s.storeItems[key] = item + return item.pathToItem, nil + 
} + + // figure location to extract the files to and make the dir + itemPath := s.getPathToItem(key) + + // create a storeItem from this + newItem := storeItem{ + key: key, + refCount: 1, + pathToItem: itemPath, + } + + // now try to extract to the destination path + extractErr := extractor.Extract(itemPath, s.fileSystem) + if extractErr != nil { + return "", extractErr + } + + // no error, let's add it to the storeItems + s.storeItems[key] = newItem + return itemPath, nil +} + +func (s *simpleStore) GetPath(key string) string { + // ensure that GetPath is an atomic operation + s.mutex.Lock() + defer s.mutex.Unlock() + + item, exists := s.storeItems[key] + + if !exists { + return "" + } + + return item.pathToItem +} + +func (s *simpleStore) Exists(key string) bool { + // ensure that Exists is an atomic operation + s.mutex.Lock() + defer s.mutex.Unlock() + + _, exists := s.storeItems[key] + return exists +} + +func (s *simpleStore) RootPath() string { + return s.rootPath +} + +// Internally, since we're using refCount, Release will decrement the refCount by 1 +func (s *simpleStore) Release(key string) error { + // ensure that Release is an atomic operation + s.mutex.Lock() + defer s.mutex.Unlock() + + item, exists := s.storeItems[key] + + if !exists { + return fmt.Errorf("key: %s does not exist", key) + } + + item.refCount-- + + // put the item back into the storeItems + s.storeItems[key] = item + return nil +} + +func (s *simpleStore) Cleanup() { + // ensure that Cleanup is an atomic operation + s.mutex.Lock() + defer s.mutex.Unlock() + + // iterate all keys in our map, and only delete unprotected + var unreferencedItems []storeItem + for _, item := range s.storeItems { + if item.refCount < 1 { + unreferencedItems = append(unreferencedItems, item) + } + } + + // now, delete the unprotected items + for _, item := range unreferencedItems { + s.fileSystem.RemoveAll(item.pathToItem) + delete(s.storeItems, item.key) + } +} + +func (s *simpleStore) GetInUseItemKeys() 
[]string { + // ensure that GetInUseItemKeys is an atomic operation + s.mutex.Lock() + defer s.mutex.Unlock() + + var inUseKeys []string + for _, item := range s.storeItems { + if item.refCount > 0 { + inUseKeys = append(inUseKeys, item.key) + } + } + + return inUseKeys +} + +func (s *simpleStore) getPathToItem(itemKey string) string { + return filepath.Join(s.rootPath, itemKey) +} + +func (s *simpleStore) getPathToItemAndExistCheck(itemKey string) (string, error) { + + itemPath := filepath.Join(s.rootPath, itemKey) + if _, err := s.fileSystem.Stat(itemPath); os.IsNotExist(err) { + return "", fmt.Errorf("itemPath: %s does not exist", itemPath) + } + return itemPath, nil +} diff --git a/src/go.amzn.com/robomaker/bundle_support/store/bundle_store_test.go b/src/go.amzn.com/robomaker/bundle_support/store/bundle_store_test.go new file mode 100644 index 0000000..d4d85e8 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/store/bundle_store_test.go @@ -0,0 +1,319 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package store + +import ( + "errors" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.amzn.com/robomaker/bundle_support/extractors" + "go.amzn.com/robomaker/bundle_support/file_system" + "os" + "path/filepath" + "testing" +) + +const ( + cacheRootPath = "/rootPath" + sha256First = "1" + sha256Second = "2" + sha256Third = "3" + expectedExtractLocationForFirst = "/rootPath/1" +) + +func TestSimpleStore_Put_WithValidItem_ShouldPut(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockExtractor := extractors.NewMockExtractor(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + // assert that this is called only once + mockExtractor.EXPECT().Extract(expectedExtractLocationForFirst, mockFileSystem).Return(nil).Times(1) + + internalCache := make(map[string]storeItem) + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: internalCache, + fileSystem: mockFileSystem, + } + + // assert doesn't exist in the storeItems + assert.False(t, bundleStore.Exists(sha256First)) + + // put + putPath, putError := bundleStore.Put(sha256First, mockExtractor) + + item, exist := internalCache[sha256First] + + assert.Nil(t, putError) + assert.True(t, bundleStore.Exists(sha256First)) + assert.Equal(t, expectedExtractLocationForFirst, putPath) + assert.True(t, exist) + assert.Equal(t, 1, item.refCount) + + // put a second time, and assert that no extraction is called + secondPutPath, secondPutErr := bundleStore.Put(sha256First, mockExtractor) + + item2nd, exist := internalCache[sha256First] + + assert.Nil(t, secondPutErr) + // assert it STILL exists + assert.True(t, bundleStore.Exists(sha256First)) + assert.Equal(t, expectedExtractLocationForFirst, secondPutPath) + assert.True(t, exist) + assert.Equal(t, 2, item2nd.refCount) +} + +func TestSimpleStore_Put_WithExtractError_ShouldNotPut(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + 
defer ctrl.Finish() + + mockExtractor := extractors.NewMockExtractor(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + extractError := errors.New("Extraction Error") + + // Return an extraction error + mockExtractor.EXPECT().Extract(expectedExtractLocationForFirst, mockFileSystem).Return(extractError).Times(1) + + bundleStore := NewSimpleStore(cacheRootPath, mockFileSystem) + + // assert doesn't exist in the storeItems + assert.False(t, bundleStore.Exists(sha256First)) + + // put + putPath, putError := bundleStore.Put(sha256First, mockExtractor) + + assert.NotNil(t, putError) + assert.False(t, bundleStore.Exists(sha256First)) + assert.Equal(t, "", putPath) +} + +func TestSimpleStore_Load_WhenKeyDoesNotExist_ShouldLoad(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + mockFileSystem.EXPECT().Stat(gomock.Any()).Return(nil, nil).Times(1) + + internalCache := make(map[string]storeItem) + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: internalCache, + fileSystem: mockFileSystem, + } + // assert doesn't exist in the storeItems + assert.False(t, bundleStore.Exists(sha256First)) + + loadError := bundleStore.Load([]string{sha256First}) + + assert.Nil(t, loadError) + assert.True(t, bundleStore.Exists(sha256First)) + assert.Equal(t, 0, internalCache[sha256First].refCount) +} + +func TestSimpleStore_Load_WhenKeyExist_ShouldLoad(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + mockFileSystem.EXPECT().Stat(gomock.Any()).Return(nil, nil).Times(3) + + internalCache := make(map[string]storeItem) + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: internalCache, + fileSystem: mockFileSystem, + } + + bundleStore.Load([]string{sha256First, sha256Second, sha256Third}) + assert.True(t, bundleStore.Exists(sha256First)) + assert.True(t, 
bundleStore.Exists(sha256Second)) + assert.True(t, bundleStore.Exists(sha256Third)) + + loadError := bundleStore.Load([]string{sha256First}) + assert.Nil(t, loadError) +} + +func TestSimpleStore_Load_WhenKeyNotExistOnDisk_ShouldNotLoad(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + mockFileSystem.EXPECT().Stat(gomock.Any()).Return(nil, os.ErrNotExist).Times(1) + + internalCache := make(map[string]storeItem) + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: internalCache, + fileSystem: mockFileSystem, + } + + loadError := bundleStore.Load([]string{sha256First}) + assert.NotNil(t, loadError) +} + +func TestSimpleStore_Get_WhenDoesNotExist_ShouldReturnEmptyString(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + bundleStore := NewSimpleStore(cacheRootPath, mockFileSystem) + + assert.Equal(t, "", bundleStore.GetPath(sha256First)) +} + +func TestSimpleStore_Get_WhenExist_ShouldReturnExpectedPath(t *testing.T) { + t.Parallel() + item := storeItem{ + key: sha256First, + refCount: 1, + pathToItem: filepath.Join(cacheRootPath, sha256First), + } + internalCache := make(map[string]storeItem) + internalCache[sha256First] = item + + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: internalCache, + } + + assert.Equal(t, expectedExtractLocationForFirst, bundleStore.GetPath(sha256First)) +} + +func TestSimpleStore_GetInUseItemKeys_ShouldReturnInUseKeyOnly(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + item_notInUse := storeItem{ + key: sha256First, + refCount: 0, + pathToItem: filepath.Join(cacheRootPath, sha256First), + } + + item_InUse := storeItem{ + key: sha256Second, + refCount: 1, + pathToItem: filepath.Join(cacheRootPath, sha256Second), 
+ } + + internalCache := make(map[string]storeItem) + internalCache[sha256First] = item_notInUse + internalCache[sha256Second] = item_InUse + + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: internalCache, + fileSystem: mockFileSystem, + } + + keysInUse := bundleStore.GetInUseItemKeys() + + assert.Contains(t, keysInUse, sha256Second) + assert.NotContains(t, keysInUse, sha256First) +} + +func TestSimpleStore_RootPath_ReturnsRootPath(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + rootPath := "testRootPath" + bundleStore := NewSimpleStore(rootPath, mockFileSystem) + + assert.Equal(t, rootPath, bundleStore.RootPath()) +} + +func TestSimpleStore_Release_KeyNotFound_ShouldReturnError(t *testing.T) { + t.Parallel() + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: make(map[string]storeItem), + } + + err := bundleStore.Release(sha256First) + + assert.NotNil(t, err) +} + +func TestSimpleStore_Release_KeyFound_ShouldRelease(t *testing.T) { + t.Parallel() + existingRefCount := 4 + + item1 := storeItem{ + key: sha256First, + refCount: existingRefCount, + pathToItem: filepath.Join(cacheRootPath, sha256First), + } + internalCache := make(map[string]storeItem) + internalCache[sha256First] = item1 + + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: internalCache, + } + + err := bundleStore.Release(sha256First) + + item, exists := internalCache[sha256First] + + assert.True(t, exists) + assert.Equal(t, existingRefCount-1, item.refCount) + assert.Nil(t, err) +} + +func TestSimpleStore_Cleanup_ShouldCleanupOnlyUnprotectedItems(t *testing.T) { + t.Parallel() + item1 := storeItem{ + key: sha256First, + refCount: 1, + pathToItem: filepath.Join(cacheRootPath, sha256First), + } + item2 := storeItem{ + key: sha256Second, + refCount: 0, + pathToItem: filepath.Join(cacheRootPath, sha256Second), + } + item3 := storeItem{ + key: 
sha256Third, + refCount: 0, + pathToItem: filepath.Join(cacheRootPath, sha256Third), + } + internalCache := make(map[string]storeItem) + internalCache[sha256First] = item1 + internalCache[sha256Second] = item2 + internalCache[sha256Third] = item3 + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + mockFileSystem.EXPECT().RemoveAll(item2.pathToItem) + mockFileSystem.EXPECT().RemoveAll(item3.pathToItem) + + bundleStore := simpleStore{ + rootPath: cacheRootPath, + storeItems: internalCache, + fileSystem: mockFileSystem, + } + + bundleStore.Cleanup() +} diff --git a/src/go.amzn.com/robomaker/bundle_support/stream/path_to_stream.go b/src/go.amzn.com/robomaker/bundle_support/stream/path_to_stream.go new file mode 100644 index 0000000..efa14d6 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/stream/path_to_stream.go @@ -0,0 +1,139 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package stream + +//go:generate $MOCKGEN -destination=mock_s3_client.go -package=stream github.com/aws/aws-sdk-go/service/s3/s3iface S3API + +import ( + "crypto/md5" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + awsS3 "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "go.amzn.com/robomaker/bundle_support/file_system" + "go.amzn.com/robomaker/bundle_support/s3" + "io" + "os" + "regexp" + "strings" +) + +const ( + s3Prefix = "s3://" //Looks like: s3:/// + s3HttpsPrefix = "https://s3" //Looks like: https://s3-.amazonaws.com// + httpPrefix = "http://" + httpsPrefix = "https://" +) + +type md5Func func(filePath string, fileSystem file_system.FileSystem) (string, error) + +// PathToStream converts a URL (currently supports: S3 URL and local file system) +// into a io.ReadSeeker for reading and seeking the contents +// return: +// 1. the io.ReadSeeker +// 2. 
a hash of the file -- for S3, etag. for local file, md5sum +// 3. the length of the stream in bytes +// 4. error if any +func PathToStream(url string, s3Client s3iface.S3API) (io.ReadSeeker, string, int64, error) { + return pathToStream(url, s3Client, file_system.NewLocalFS(), md5SumFile) +} + +func pathToStream(url string, s3Client s3iface.S3API, fileSystem file_system.FileSystem, computeMd5 md5Func) (io.ReadSeeker, string, int64, error) { + // is this s3? + if strings.HasPrefix(url, s3Prefix) || strings.HasPrefix(url, s3HttpsPrefix) { + region, bucket, key, err := parseS3Url(url) + if err != nil { + return nil, "", 0, err + } + return pathToS3Stream(region, bucket, key, s3Client) + } + + if strings.HasPrefix(url, httpsPrefix) || strings.HasPrefix(url, httpPrefix) { + return nil, "", 0, fmt.Errorf("http/s url not yet supported") + } + + // Assume this is local file + return fileToStream(url, fileSystem, computeMd5) +} + +func fileToStream(filePath string, fileSystem file_system.FileSystem, computeMd5 md5Func) (io.ReadSeeker, string, int64, error) { + // compute md5sum + md5Sum, md5Err := computeMd5(filePath, file_system.NewLocalFS()) + if md5Err != nil { + return nil, "", 0, md5Err + } + + file, openErr := fileSystem.Open(filePath) + if openErr != nil { + return nil, "", 0, openErr + } + + fileInfo, statErr := file.Stat() + if statErr != nil { + return nil, "", 0, statErr + } + + return file, md5Sum, fileInfo.Size(), nil +} + +func pathToS3Stream(region, bucket, key string, s3Client s3iface.S3API) (io.ReadSeeker, string, int64, error) { + var svc s3iface.S3API + + // if we're given a s3 client, use it + if s3Client != nil { + svc = s3Client + } else { + // else, try to create a s3 client + if region == "" { + region = os.Getenv("REGION") + } + + if region == "" { + return nil, "", 0, fmt.Errorf("Could not determine region for s3 bundle") + } + sess := session.Must(session.NewSession(&aws.Config{ + Region: aws.String(region), + })) + svc = awsS3.New(sess) + } + + 
s3Reader, err := s3.NewS3Reader(svc, bucket, key) + if err != nil { + return nil, "", 0, err + } + + return s3Reader, s3Reader.Etag, s3Reader.ContentLength, nil +} + +func md5SumFile(filePath string, fileSystem file_system.FileSystem) (string, error) { + file, openErr := fileSystem.Open(filePath) + if openErr != nil { + return "", openErr + } + defer file.Close() + + hash := md5.New() + + _, hashErr := io.Copy(hash, file) + if hashErr != nil { + return "", hashErr + } + + return fmt.Sprintf("%x", hash.Sum(nil)), nil +} + +func parseS3Url(s3Url string) (region string, bucket string, key string, err error) { + r1 := regexp.MustCompile(`https://s3-(.*)\.amazonaws\.com/(.*?)/(.*)`) + r2 := regexp.MustCompile(`s3://(.*?)/(.*)`) + result1 := r1.FindStringSubmatch(s3Url) + result2 := r2.FindStringSubmatch(s3Url) + if result1 != nil { + return result1[1], result1[2], result1[3], nil + } else if result2 != nil { + return "", result2[1], result2[2], nil + } + + return "", "", "", fmt.Errorf("Url %v is not a valid s3 url", s3Url) +} diff --git a/src/go.amzn.com/robomaker/bundle_support/stream/path_to_stream_test.go b/src/go.amzn.com/robomaker/bundle_support/stream/path_to_stream_test.go new file mode 100644 index 0000000..c881283 --- /dev/null +++ b/src/go.amzn.com/robomaker/bundle_support/stream/path_to_stream_test.go @@ -0,0 +1,214 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package stream + +import ( + "fmt" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.amzn.com/robomaker/bundle_support/file_system" + "testing" +) + +type ofHeadObjectInput struct { + Bucket string + Key string +} + +func OfHeadObjectInput(bucket string, key string) gomock.Matcher { + return &ofHeadObjectInput{Bucket: bucket, Key: key} +} + +func (o *ofHeadObjectInput) Matches(x interface{}) bool { + that, ok := x.(*s3.HeadObjectInput) + if !ok { + return false + } + return *that.Bucket == o.Bucket && *that.Key == o.Key +} + +func (o *ofHeadObjectInput) String() string { + return "Bucket: " + o.Bucket + " Key: " + o.Key +} + +func TestPathToStream_WithLocalFile_ShouldReturnStreamAndMd5(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + expectedMd5 := "12345" + filePath := "/test/stream" + + mockS3Client := NewMockS3API(ctrl) + mockFile := file_system.NewMockFile(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + mockFileInfo := file_system.NewMockFileInfo(ctrl) + mockFileSystem.EXPECT().Open(filePath).Return(mockFile, nil) + mockFile.EXPECT().Stat().Return(mockFileInfo, nil) + mockFileInfo.EXPECT().Size().Return(int64(1)) + + computeMd5 := func(filePath string, fileSystem file_system.FileSystem) (string, error) { + return expectedMd5, nil + } + + stream, md5, contentLength, err := pathToStream(filePath, mockS3Client, mockFileSystem, computeMd5) + + assert.Equal(t, mockFile, stream) + assert.Equal(t, expectedMd5, md5) + assert.Equal(t, int64(1), contentLength) + assert.Nil(t, err) +} + +func TestPathToStream_WithInvalidLocalFile_ShouldReturnError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + filePath := "/test/stream" + expectedErr := fmt.Errorf("error") + + mockS3Client := NewMockS3API(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + 
mockFileSystem.EXPECT().Open(filePath).Return(nil, expectedErr) + + computeMd5 := func(filePath string, fileSystem file_system.FileSystem) (string, error) { + return "", nil + } + + stream, md5, _, err := pathToStream(filePath, mockS3Client, mockFileSystem, computeMd5) + + assert.Nil(t, stream) + assert.Equal(t, "", md5) + assert.NotNil(t, err) + assert.Equal(t, expectedErr, err) +} + +func TestPathToStream_WithS3Url_ShouldReturnUnsupportedError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + filePath := "s3://test/stream" + err := fmt.Errorf("test error") + + mockS3Client := NewMockS3API(ctrl) + mockS3Client.EXPECT().HeadObject(gomock.Any()).Return(nil, err) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + computeMd5 := func(filePath string, fileSystem file_system.FileSystem) (string, error) { + assert.Fail(t, "computeMd5 should not be called.") + return "", nil + } + + stream, md5, _, err := pathToStream(filePath, mockS3Client, mockFileSystem, computeMd5) + + assert.Nil(t, stream) + assert.Equal(t, "", md5) + assert.NotNil(t, err) +} + +func TestPathToStream_WithValidS3URL_ShouldReturnValidStream(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // s3etag is wrapped in quotes + s3Etag := "\"abcdefg\"" + + // we expect our final etag to have quotes stripped + expectedEtag := "\"abcdefg\"" + + var expectedContentLength int64 = 12345 + + expectedHeadObjectOutput := &s3.HeadObjectOutput{ + ETag: &s3Etag, + ContentLength: &expectedContentLength, + } + + mockS3Client := NewMockS3API(ctrl) + mockS3Client.EXPECT().HeadObject(OfHeadObjectInput("test", "stream")).Return(expectedHeadObjectOutput, nil) + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + filePath := "s3://test/stream" + + computeMd5 := func(filePath string, fileSystem file_system.FileSystem) (string, error) { + assert.Fail(t, "computeMd5 should not be called.") + return "", nil + } + + stream, etag, 
contentLength, err := pathToStream(filePath, mockS3Client, mockFileSystem, computeMd5) + + assert.NotNil(t, stream) + assert.Equal(t, expectedEtag, etag) + assert.Equal(t, expectedContentLength, contentLength) + assert.Nil(t, err) +} + +func TestPathToStream_WithS3UrlWithoutClientAndWithoutRegion_ShouldReturnUnsupportedError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + filePath := "s3://test/stream" + err := fmt.Errorf("test error") + + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + computeMd5 := func(filePath string, fileSystem file_system.FileSystem) (string, error) { + assert.Fail(t, "computeMd5 should not be called.") + return "", nil + } + + stream, md5, _, err := pathToStream(filePath, nil, mockFileSystem, computeMd5) + + assert.Nil(t, stream) + assert.Equal(t, "", md5) + assert.NotNil(t, err) +} + +func TestPathToStream_WithHttpUrl_ShouldReturnUnsupportedError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + filePath := "http://test/stream" + + mockS3Client := NewMockS3API(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + computeMd5 := func(filePath string, fileSystem file_system.FileSystem) (string, error) { + assert.Fail(t, "computeMd5 should not be called.") + return "", nil + } + + stream, md5, _, err := pathToStream(filePath, mockS3Client, mockFileSystem, computeMd5) + + assert.Nil(t, stream) + assert.Equal(t, "", md5) + assert.NotNil(t, err) +} + +func TestPathToStream_WithHttpsUrl_ShouldReturnUnsupportedError(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + filePath := "https://test/stream" + + mockS3Client := NewMockS3API(ctrl) + mockFileSystem := file_system.NewMockFileSystem(ctrl) + + computeMd5 := func(filePath string, fileSystem file_system.FileSystem) (string, error) { + assert.Fail(t, "computeMd5 should not be called.") + return "", nil + } + + stream, md5, _, err := pathToStream(filePath, 
mockS3Client, mockFileSystem, computeMd5)
+
+	assert.Nil(t, stream)
+	assert.Equal(t, "", md5)
+	assert.NotNil(t, err)
+}
diff --git a/src/go.amzn.com/robomaker/example/s3_download/main.go b/src/go.amzn.com/robomaker/example/s3_download/main.go
new file mode 100644
index 0000000..32e30fa
--- /dev/null
+++ b/src/go.amzn.com/robomaker/example/s3_download/main.go
@@ -0,0 +1,67 @@
+// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package main
+
+import (
+	"fmt"
+	"io"
+	"os"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	awsS3 "github.com/aws/aws-sdk-go/service/s3"
+	"go.amzn.com/robomaker/bundle_support/s3"
+)
+
+const (
+	// Replace these values with actual location on S3
+
+	// AwsS3BundleBucketName is the AWS S3 bucket name where the bundle is stored.
+	AwsS3BundleBucketName = "my-aws-bucket-name"
+
+	// AwsS3BundleKey is the AWS S3 key, including the folder name and the file name of the bundle.
+	// If the bundle file is directly in the bucket without sub-folders, just use the bundle file name.
+	AwsS3BundleKey = "my-folder/my-bundle-file.tar"
+)
+
+// main downloads a bundle from S3 through the buffered s3 reader and writes
+// it to a local file.
+func main() {
+	sess := session.Must(session.NewSession(&aws.Config{
+		Region: aws.String("us-west-2"),
+	}))
+	svc := awsS3.New(sess)
+	s3Reader, err := s3.NewS3Reader(svc, AwsS3BundleBucketName, AwsS3BundleKey)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+
+	fmt.Printf("Etag is %v\n", s3Reader.Etag)
+
+	// In this example, a small (1kb) intermediate buffer size is used.
+	// Internally, the s3 reader will buffer 5MB at a time, so this won't call
+	// to s3 for each 1kb of data, which would be slow.
+	buffer := make([]byte, 1*1024)
+
+	f, fileErr := os.Create("/tmp/my-bundle-file.tar")
+	check(fileErr)
+	// Close the file on every exit path; the original closed it only on the
+	// io.EOF path, leaking the handle whenever check() panicked mid-loop.
+	defer f.Close()
+
+	for {
+		bytesRead, readErr := s3Reader.Read(buffer)
+		// Write the data before inspecting the error: the io.Reader contract
+		// allows the final Read to return bytes together with io.EOF.
+		if bytesRead > 0 {
+			_, writeErr := f.Write(buffer[:bytesRead])
+			check(writeErr)
+		}
+		if readErr == io.EOF {
+			break
+		}
+		check(readErr)
+	}
+}
+
+// check panics on a non-nil error; acceptable for a short example program.
+func check(e error) {
+	if e != nil {
+		panic(e)
+	}
+}
diff --git a/src/go.amzn.com/robomaker/example/test_app/main.go b/src/go.amzn.com/robomaker/example/test_app/main.go
new file mode 100644
index 0000000..df0cd1a
--- /dev/null
+++ b/src/go.amzn.com/robomaker/example/test_app/main.go
@@ -0,0 +1,86 @@
+// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+package main
+
+import (
+	"fmt"
+	"time"
+
+	"go.amzn.com/robomaker/bundle_support"
+	"go.amzn.com/robomaker/bundle_support/file_system"
+	"go.amzn.com/robomaker/bundle_support/store"
+)
+
+const (
+	// Replace these values with actual location on your disk
+
+	// BundleStoreRootLocation is the root directory on local disk where bundles will extract to
+	BundleStoreRootLocation = "/my-bundle-store-root"
+
+	// BundleFileLocation is the location of your bundle file. This can be local disk or a S3 URL.
+	BundleFileLocation = "/my-bundleFile"
+)
+
+// main demonstrates extracting a bundle twice: a cold extract, then a warm
+// one served from the bundle store.
+func main() {
+	// Create a bundle store pointing to a local directory
+	localFileSystem := file_system.NewLocalFS()
+	bundleStore := store.NewSimpleStore(BundleStoreRootLocation, localFileSystem)
+
+	bundleProvider := bundle_support.NewBundleProvider(bundleStore)
+	bundleProvider.SetProgressCallback(printProgress)
+
+	// Start measuring the time it takes to extract
+	startTime := time.Now()
+
+	// Call the Bundle Provider to extract and get the bundle.
+ bundle, bundleErr := bundleProvider.GetBundle(BundleFileLocation) + + if bundleErr != nil { + fmt.Printf("Get Bundle Error: %v\n", bundleErr) + return + } + + elapsedTime := time.Since(startTime) + fmt.Printf("Time taken to download and extract: %s\n", elapsedTime) + fmt.Printf("SourceCommands: %v\n", bundle.SourceCommands()) + + // Release will mark the current bundle as unused, but will not be deleted. + bundle.Release() + + // Now extract a second time, because everything is in the BundleStore, this will only take a short amount of time + startTime = time.Now() + bundle, bundleErr = bundleProvider.GetBundle(BundleFileLocation) + if bundleErr != nil { + fmt.Printf("Get Bundle Error: %v\n", bundleErr) + return + } + + elapsedTime = time.Since(startTime) + fmt.Printf("Time taken to download and extract: %s\n", elapsedTime) + fmt.Printf("SourceCommands: %v\n", bundle.SourceCommandsUsingLocation("/abc")) + + bundle.Release() + + // Cleanup will delete all bundles that are marked unused. + bundleStore.Cleanup() +} + +func printProgress(percentDone float32, timeElapsed time.Duration) { + percentLeft := 100.0 - percentDone + + //This is only "accurate" as long as download speed doesn't fluctuate too much. For better results, the estimate + //should be based on the time taken for the last few percent done, not the entire percent done + estimatedTimeRemaining := time.Duration((float32(timeElapsed.Seconds())/percentDone)*percentLeft) * time.Second + + //Round to the nearest second to avoid long strings like 1.794995859s + timeElapsedRounded := time.Duration(timeElapsed.Nanoseconds()/time.Second.Nanoseconds()) * time.Second + estimatedTimeRemainingRounded := time.Duration(estimatedTimeRemaining.Nanoseconds()/time.Second.Nanoseconds()) * time.Second + + //Don't show time left for the first few percent, it'll be inaccurate since we are using a rolling average + if percentDone < 3.0 { + fmt.Printf("Percent done: %.2f. Time elapsed: %v. 
Estimated time remaining: --\n", percentDone, timeElapsedRounded) + } else { + fmt.Printf("Percent done: %.2f. Time elapsed: %v. Estimated time remaining: %v\n", percentDone, timeElapsedRounded, estimatedTimeRemainingRounded) + } +}