From d606e4f65f112f737a9da53a299bbef0aa9a63ab Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 27 Jan 2023 16:14:33 +0000 Subject: [PATCH 1/6] Support downloading plugins from other cloudquery repos --- registry/download.go | 78 ++++++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/registry/download.go b/registry/download.go index 792caf091e..3cf7e2af1d 100644 --- a/registry/download.go +++ b/registry/download.go @@ -3,6 +3,7 @@ package registry import ( "archive/zip" "context" + "errors" "fmt" "io" "net/http" @@ -11,8 +12,6 @@ import ( "runtime" "time" - "github.com/avast/retry-go/v4" - "github.com/schollz/progressbar/v3" ) @@ -30,10 +29,20 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, downloadDir := filepath.Dir(localPath) pluginZipPath := localPath + ".zip" // https://github.com/cloudquery/cloudquery/releases/download/plugins-source-test-v1.1.5/test_darwin_amd64.zip - downloadURL := fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH) - if org != "cloudquery" { + var urls []string + if org == "cloudquery" { + urls = []string{ + // monorepo plugin + fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH), + // community plugin under CloudQuery org + fmt.Sprintf("https://github.com/cloudquery/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH), + } + } else { // https://github.com/yevgenypats/cq-source-test/releases/download/v1.0.1/cq-source-test_darwin_amd64.zip - downloadURL = fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH) + urls = []string{ + // community plugin under user org + fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH), + } } if _, err := os.Stat(localPath); err == nil { @@ -44,7 +53,7 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, return fmt.Errorf("failed to create plugin directory %s: %w", downloadDir, err) } - err := downloadFile(ctx, pluginZipPath, downloadURL) + urlIndex, err := downloadFile(ctx, pluginZipPath, urls...) if err != nil { return fmt.Errorf("failed to download plugin: %w", err) } @@ -55,8 +64,10 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, } defer archive.Close() - pathInArchive := fmt.Sprintf("plugins/%s/%s", typ, name) - if org != "cloudquery" { + var pathInArchive string + if org == "cloudquery" && urlIndex == 0 { + pathInArchive = fmt.Sprintf("plugins/%s/%s", typ, name) + } else { pathInArchive = fmt.Sprintf("cq-%s-%s", typ, name) } pathInArchive = WithBinarySuffix(pathInArchive) @@ -80,34 +91,37 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, return nil } -func downloadFile(ctx context.Context, localPath string, url string) (err error) { +func downloadFile(ctx context.Context, localPath string, urls ...string) (urlIndex int, err error) { // Create the file out, err := os.Create(localPath) if err != nil { - return fmt.Errorf("failed to create file %s: %w", localPath, err) + return 0, fmt.Errorf("failed to create file %s: %w", localPath, err) } defer out.Close() - // Get the data - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return fmt.Errorf("failed create request %s: %w", url, err) - } + for r := 0; r < RetryAttempts; r++ { + for i, url := range urls { + // Get the data + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return 0, fmt.Errorf("failed create request %s: %w", url, err) + } - err = retry.Do( - func() error { // Do http request resp, err := http.DefaultClient.Do(req) if err != nil { - return fmt.Errorf("failed to get url %s: %w", url, err) + return 0, fmt.Errorf("failed to get url %s: %w", url, err) } - // Check server response - if resp.StatusCode != http.StatusOK { + if resp.StatusCode == http.StatusNotFound && i < len(urls)-1 { + // check alternative url + resp.Body.Close() + continue + } else if resp.StatusCode != http.StatusOK { fmt.Printf("Failed downloading %s with status code %d. Retrying\n", url, resp.StatusCode) - return fmt.Errorf("statusCode != 200") + resp.Body.Close() + break } - defer resp.Body.Close() fmt.Printf("Downloading %s\n", url) bar := downloadProgressBar(resp.ContentLength, "Downloading") @@ -115,23 +129,15 @@ func downloadFile(ctx context.Context, localPath string, url string) (err error) // Writer the body to file _, err = io.Copy(io.MultiWriter(out, bar), resp.Body) if err != nil { - return fmt.Errorf("failed to copy body to file %s: %w", localPath, err) + return 0, fmt.Errorf("failed to copy body to file %s: %w", localPath, err) } - - return nil - }, - retry.RetryIf(func(err error) bool { - return err.Error() == "statusCode != 200" - }), - retry.Attempts(RetryAttempts), - retry.Delay(RetryWaitTime), - ) - - if err != nil { - return fmt.Errorf("failed downloading: %s", url) + resp.Body.Close() + return i, nil + } + time.Sleep(RetryWaitTime) } - return nil + return 0, errors.New("failed to download plugin") } func downloadProgressBar(maxBytes int64, description ...string) *progressbar.ProgressBar { From c8ab535ac332396e4120fc94c1d88af636b2d4bc Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Fri, 27 Jan 2023 16:36:02 +0000 Subject: [PATCH 2/6] Simplify logic --- registry/download.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/registry/download.go b/registry/download.go index 3cf7e2af1d..f7bb4cea38 100644 --- a/registry/download.go +++ b/registry/download.go @@ -29,19 +29,16 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, downloadDir := filepath.Dir(localPath) pluginZipPath := localPath + ".zip" // https://github.com/cloudquery/cloudquery/releases/download/plugins-source-test-v1.1.5/test_darwin_amd64.zip - var urls []string + urls := []string{ + // community plugin format + fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH), + } if org == "cloudquery" { urls = []string{ - // monorepo plugin + // CloudQuery monorepo plugin fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH), - // community plugin under CloudQuery org - fmt.Sprintf("https://github.com/cloudquery/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH), - } - } else { - // https://github.com/yevgenypats/cq-source-test/releases/download/v1.0.1/cq-source-test_darwin_amd64.zip - urls = []string{ - // community plugin under user org - fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH), + // fall back to community plugin format if the plugin is not found in the monorepo + urls[0], } } From 928d0aec43b2d818042f4ae0e3d55344f346c111 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 30 Jan 2023 09:59:34 +0000 Subject: [PATCH 3/6] Refactor and add test --- registry/download.go | 83 +++++++++++++++++++++++---------------- registry/download_test.go | 31 +++++++++++++++ 2 files changed, 80 insertions(+), 34 deletions(-) create mode 100644 registry/download_test.go diff --git a/registry/download.go b/registry/download.go index f7bb4cea38..8ec9171e36 100644 --- a/registry/download.go +++ b/registry/download.go @@ -12,6 +12,7 @@ import ( "runtime" "time" + "github.com/avast/retry-go/v4" "github.com/schollz/progressbar/v3" ) @@ -92,49 +93,63 @@ func downloadFile(ctx context.Context, localPath string, urls ...string) (urlInd // Create the file out, err := os.Create(localPath) if err != nil { - return 0, fmt.Errorf("failed to create file %s: %w", localPath, err) + return -1, fmt.Errorf("failed to create file %s: %w", localPath, err) } defer out.Close() - for r := 0; r < RetryAttempts; r++ { - for i, url := range urls { - // Get the data - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return 0, fmt.Errorf("failed create request %s: %w", url, err) +urlLoop: + for i, url := range urls { + err = downloadFileFromURL(ctx, out, url) + if err != nil { + for _, e := range err.(retry.Error) { + if e.Error() == "not found" { + continue urlLoop + } } + } + return i, err + } - // Do http request - resp, err := http.DefaultClient.Do(req) - if err != nil { - return 0, fmt.Errorf("failed to get url %s: %w", url, err) - } - // Check server response - if resp.StatusCode == http.StatusNotFound && i < len(urls)-1 { - // check alternative url - resp.Body.Close() - continue - } else if resp.StatusCode != http.StatusOK { - fmt.Printf("Failed downloading %s with status code %d. Retrying\n", url, resp.StatusCode) - resp.Body.Close() - break - } + return -1, errors.New("failed to download plugin") +} - fmt.Printf("Downloading %s\n", url) - bar := downloadProgressBar(resp.ContentLength, "Downloading") +func downloadFileFromURL(ctx context.Context, out *os.File, url string) (err error) { + return retry.Do(func() error { + // Get the data + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("failed create request %s: %w", url, err) + } - // Writer the body to file - _, err = io.Copy(io.MultiWriter(out, bar), resp.Body) - if err != nil { - return 0, fmt.Errorf("failed to copy body to file %s: %w", localPath, err) - } - resp.Body.Close() - return i, nil + // Do http request + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("failed to get url %s: %w", url, err) } - time.Sleep(RetryWaitTime) - } + defer resp.Body.Close() + // Check server response + if resp.StatusCode == http.StatusNotFound { + return errors.New("not found") + } else if resp.StatusCode != http.StatusOK { + fmt.Printf("Failed downloading %s with status code %d. Retrying\n", url, resp.StatusCode) + return errors.New("statusCode != 200") + } + + fmt.Printf("Downloading %s\n", url) + bar := downloadProgressBar(resp.ContentLength, "Downloading") - return 0, errors.New("failed to download plugin") + // Writer the body to file + _, err = io.Copy(io.MultiWriter(out, bar), resp.Body) + if err != nil { + return fmt.Errorf("failed to copy body to file %s: %w", out.Name(), err) + } + return nil + }, retry.RetryIf(func(err error) bool { + return err.Error() == "statusCode != 200" + }), + retry.Attempts(RetryAttempts), + retry.Delay(RetryWaitTime), + ) } func downloadProgressBar(maxBytes int64, description ...string) *progressbar.ProgressBar { diff --git a/registry/download_test.go b/registry/download_test.go new file mode 100644 index 0000000000..a4dc128abb --- /dev/null +++ b/registry/download_test.go @@ -0,0 +1,31 @@ +package registry + +import ( + "context" + "path" + "testing" +) + +func TestDownloadPluginFromGithubIntegration(t *testing.T) { + tmp := t.TempDir() + cases := []struct { + name string + org string + plugin string + version string + pluginType PluginType + }{ + {name: "monorepo source", org: "cloudquery", plugin: "hackernews", version: "v1.1.4", pluginType: PluginTypeSource}, + {name: "many repo source", org: "cloudquery", plugin: "simple-analytics", version: "v1.0.0", pluginType: PluginTypeSource}, + {name: "monorepo destination", org: "cloudquery", plugin: "postgresql", version: "v2.0.7", pluginType: PluginTypeDestination}, + {name: "community source", org: "hermanschaaf", plugin: "simple-analytics", version: "v1.0.0", pluginType: PluginTypeSource}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := DownloadPluginFromGithub(context.Background(), path.Join(tmp, tc.name), tc.org, tc.plugin, tc.version, tc.pluginType) + if err != nil { + t.Fatal(err) + } + }) + } +} From babf374532d12512a0bae592ccc6792e8193dcdb Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 30 Jan 2023 12:44:55 +0000 Subject: [PATCH 4/6] Change format --- registry/download.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/registry/download.go b/registry/download.go index 8ec9171e36..04f2e55542 100644 --- a/registry/download.go +++ b/registry/download.go @@ -35,12 +35,12 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH), } if org == "cloudquery" { - urls = []string{ + urls = append( // CloudQuery monorepo plugin - fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH), + []string{fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH)}, // fall back to community plugin format if the plugin is not found in the monorepo - urls[0], - } + urls..., + ) } if _, err := os.Stat(localPath); err == nil { From 8eabf50cf2cd2ceb7095823cf54b7ab4727a4201 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Mon, 30 Jan 2023 12:53:52 +0000 Subject: [PATCH 5/6] use url struct --- registry/download.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/registry/download.go b/registry/download.go index 04f2e55542..2994320df3 100644 --- a/registry/download.go +++ b/registry/download.go @@ -26,18 +26,23 @@ const ( RetryWaitTime = 1 * time.Second ) +type pluginUrl struct { + url string + monorepo bool +} + func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, name string, version string, typ PluginType) error { downloadDir := filepath.Dir(localPath) pluginZipPath := localPath + ".zip" // https://github.com/cloudquery/cloudquery/releases/download/plugins-source-test-v1.1.5/test_darwin_amd64.zip - urls := []string{ + urls := []pluginUrl{ // community plugin format - fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH), + pluginUrl{url: fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH)}, } if org == "cloudquery" { urls = append( // CloudQuery monorepo plugin - []string{fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH)}, + []pluginUrl{{url: fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH), monorepo: true}}, // fall back to community plugin format if the plugin is not found in the monorepo urls..., ) @@ -51,7 +56,7 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, return fmt.Errorf("failed to create plugin directory %s: %w", downloadDir, err) } - urlIndex, err := downloadFile(ctx, pluginZipPath, urls...) + used, err := downloadFile(ctx, pluginZipPath, urls...) if err != nil { return fmt.Errorf("failed to download plugin: %w", err) } @@ -63,7 +68,7 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, defer archive.Close() var pathInArchive string - if org == "cloudquery" && urlIndex == 0 { + if used.monorepo { pathInArchive = fmt.Sprintf("plugins/%s/%s", typ, name) } else { pathInArchive = fmt.Sprintf("cq-%s-%s", typ, name) @@ -89,17 +94,17 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, return nil } -func downloadFile(ctx context.Context, localPath string, urls ...string) (urlIndex int, err error) { +func downloadFile(ctx context.Context, localPath string, urls ...pluginUrl) (used pluginUrl, err error) { // Create the file out, err := os.Create(localPath) if err != nil { - return -1, fmt.Errorf("failed to create file %s: %w", localPath, err) + return pluginUrl{}, fmt.Errorf("failed to create file %s: %w", localPath, err) } defer out.Close() urlLoop: - for i, url := range urls { - err = downloadFileFromURL(ctx, out, url) + for _, url := range urls { + err = downloadFileFromURL(ctx, out, url.url) if err != nil { for _, e := range err.(retry.Error) { if e.Error() == "not found" { @@ -107,10 +112,10 @@ urlLoop: } } } - return i, err + return url, err } - return -1, errors.New("failed to download plugin") + return pluginUrl{}, errors.New("failed to download plugin") } func downloadFileFromURL(ctx context.Context, out *os.File, url string) (err error) { From 179ab0611cc9d0ce40c2a91e3779b1cb06509dbf Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 31 Jan 2023 09:28:02 +0000 Subject: [PATCH 6/6] Fix naming --- registry/download.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/registry/download.go b/registry/download.go index a57da3393e..8767dfb92f 100644 --- a/registry/download.go +++ b/registry/download.go @@ -26,7 +26,7 @@ const ( RetryWaitTime = 1 * time.Second ) -type pluginUrl struct { +type pluginURL struct { url string monorepo bool } @@ -35,14 +35,14 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, downloadDir := filepath.Dir(localPath) pluginZipPath := localPath + ".zip" // https://github.com/cloudquery/cloudquery/releases/download/plugins-source-test-v1.1.5/test_darwin_amd64.zip - urls := []pluginUrl{ + urls := []pluginURL{ // community plugin format - pluginUrl{url: fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH)}, + {url: fmt.Sprintf("https://github.com/%s/cq-%s-%s/releases/download/%s/cq-%s-%s_%s_%s.zip", org, typ, name, version, typ, name, runtime.GOOS, runtime.GOARCH)}, } if org == "cloudquery" { urls = append( // CloudQuery monorepo plugin - []pluginUrl{{url: fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH), monorepo: true}}, + []pluginURL{{url: fmt.Sprintf("https://github.com/cloudquery/cloudquery/releases/download/plugins-%s-%s-%s/%s_%s_%s.zip", typ, name, version, name, runtime.GOOS, runtime.GOARCH), monorepo: true}}, // fall back to community plugin format if the plugin is not found in the monorepo urls..., ) @@ -94,11 +94,11 @@ func DownloadPluginFromGithub(ctx context.Context, localPath string, org string, return nil } -func downloadFile(ctx context.Context, localPath string, urls ...pluginUrl) (used pluginUrl, err error) { +func downloadFile(ctx context.Context, localPath string, urls ...pluginURL) (used pluginURL, err error) { // Create the file out, err := os.Create(localPath) if err != nil { - return pluginUrl{}, fmt.Errorf("failed to create file %s: %w", localPath, err) + return pluginURL{}, fmt.Errorf("failed to create file %s: %w", localPath, err) } defer out.Close() @@ -109,7 +109,7 @@ func downloadFile(ctx context.Context, localPath string, urls ...pluginUrl) (use } return url, err } - return pluginUrl{}, fmt.Errorf("failed downloading from URL %v. Error %w", urls, err) + return pluginURL{}, fmt.Errorf("failed downloading from URL %v. Error %w", urls, err) } func downloadFileFromURL(ctx context.Context, out *os.File, url string) error {