forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
/
piputil.go
178 lines (164 loc) · 6.39 KB
/
piputil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bufio"
"bytes"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"github.com/apache/beam/sdks/go/pkg/beam/util/execx"
)
// pip is the pip executable passed to every install command in this file. It
// is computed once, when the package is initialized, by pipLocation.
var pip = pipLocation()
// pipLocation returns the pip executable to invoke.
//
// Users can set the 'pip' environment variable to use a custom pip path. When
// the variable is unset, or set to the empty string, the default command name
// "pip" is returned.
func pipLocation() string {
	// An empty value cannot name an executable, so it is treated the same as
	// an unset variable rather than being returned as the command to run.
	if v := os.Getenv("pip"); v != "" {
		return v
	}
	return "pip"
}
// pipInstallRequirements runs "pip install -r" against the requirements file
// called name inside dir. The install is attempted only when name appears in
// files; otherwise the function does nothing and returns nil.
func pipInstallRequirements(files []string, dir, name string) error {
	staged := false
	for _, f := range files {
		if f == name {
			staged = true
			break
		}
	}
	if !staged {
		return nil
	}

	reqPath := filepath.Join(dir, name)

	// The install runs in two rounds to avoid PyPI downloads as much as
	// possible. Round one is restricted by --no-index/--find-links to what is
	// staged in dir, and --no-deps keeps it from following dependencies.
	localOnly := []string{"install", "-r", reqPath, "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
	if err := execx.Execute(pip, localOnly...); err != nil {
		return err
	}

	// Round two opens the search up to PyPI and also installs dependencies.
	// If round one already installed every package, this is expected to be a
	// no-op.
	withIndex := []string{"install", "-r", reqPath, "--disable-pip-version-check", "--find-links", dir}
	return execx.Execute(pip, withIndex...)
}
// pipInstallPackage installs the package file called name from dir, if it is
// listed in files.
//
// Parameters:
//   - files: names of the available files; name must appear here to be installed.
//   - dir: directory that holds the package file.
//   - name: file name of the package to install.
//   - force: when true, the package itself is force-reinstalled first (without
//     touching dependencies) before the regular install runs.
//   - optional: when true, a package missing from files is not an error.
//   - extras: optional extras appended to the package spec as "name[a,b]".
//     A nil or empty slice adds no extras suffix.
//
// It returns nil when the package was installed, or when it is absent and
// optional; an error when a required package is absent or pip fails.
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
	found := false
	for _, file := range files {
		if file == name {
			found = true
			break
		}
	}
	if !found {
		if optional {
			return nil
		}
		return errors.New("package '" + name + "' not found")
	}

	packageSpec := name
	// Check the length rather than nil-ness: an empty, non-nil slice must not
	// produce a dangling "name[]" spec.
	if len(extras) > 0 {
		packageSpec += "[" + strings.Join(extras, ",") + "]"
	}
	target := filepath.Join(dir, packageSpec)

	if force {
		// We only use force reinstallation for packages specified using the
		// --extra_package flag. In this case, we always want to use the
		// user-specified package, overwriting any existing package already
		// installed. At the same time, we want to avoid reinstalling any
		// dependencies. The "pip install" command doesn't have a clean way to
		// do this, so we do this in two steps.
		//
		// First, "--upgrade --force-reinstall --no-deps" forces the package to
		// be reinstalled while avoiding reinstallation of dependencies. Any
		// needed dependencies that were not installed are still missing after
		// this step.
		args := []string{"install", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps", target}
		if err := execx.Execute(pip, args...); err != nil {
			return err
		}
		// Next, the regular install below runs without these flags. Since the
		// installed version matches the package specified, the package itself
		// will not be reinstalled, but its dependencies will now be resolved
		// and installed if necessary.
	}

	// Regular install: the only step when force is false, and the
	// dependency-resolving second step when force is true.
	return execx.Execute(pip, "install", "--disable-pip-version-check", target)
}
// installExtraPackages installs all the packages declared in the extra
// packages manifest file.
//
// The manifest is the file named extraPackagesFile inside dir and lists one
// package file name per line. If extraPackagesFile is not present in files,
// there is nothing to install and nil is returned. Surrounding whitespace on a
// line is ignored and blank lines are skipped. Each listed package is
// installed with a forced reinstall and must itself be present in files.
//
// It returns an error if the manifest cannot be read or scanned, or if any
// listed package fails to install.
func installExtraPackages(files []string, extraPackagesFile, dir string) error {
	// First check that extra packages manifest file is present.
	present := false
	for _, file := range files {
		if file == extraPackagesFile {
			present = true
			break
		}
	}
	if !present {
		return nil
	}

	// Found the manifest. Install extra packages.
	manifest, err := ioutil.ReadFile(filepath.Join(dir, extraPackagesFile))
	if err != nil {
		return fmt.Errorf("failed to read extra packages manifest file: %v", err)
	}

	s := bufio.NewScanner(bytes.NewReader(manifest))
	for s.Scan() {
		extraPackage := strings.TrimSpace(s.Text())
		if extraPackage == "" {
			// A blank line (e.g. a trailing newline pair) names no package.
			continue
		}
		log.Printf("Installing extra package: %s", extraPackage)
		if err := pipInstallPackage(files, dir, extraPackage, true, false, nil); err != nil {
			return fmt.Errorf("failed to install extra package %s: %v", extraPackage, err)
		}
	}
	// Scan stops early on errors such as an over-long line; surface that
	// instead of silently installing only part of the manifest.
	if err := s.Err(); err != nil {
		return fmt.Errorf("failed to scan extra packages manifest file: %v", err)
	}
	return nil
}
// findBeamSdkWhl returns the first entry of files whose name starts with
// "apache_beam" and ends with one of acceptableWhlSpecs, logging the match.
// It returns the empty string when no entry qualifies.
func findBeamSdkWhl(files []string, acceptableWhlSpecs []string) string {
	for _, candidate := range files {
		if !strings.HasPrefix(candidate, "apache_beam") {
			continue
		}
		for _, spec := range acceptableWhlSpecs {
			if !strings.HasSuffix(candidate, spec) {
				continue
			}
			log.Printf("Found Apache Beam SDK wheel: %v", candidate)
			return candidate
		}
	}
	return ""
}
// installSdk installs the Beam SDK with the "gcp" extra.
//
// It first looks among the staged files for a compiled wheel distribution of
// Apache Beam matching acceptableWhlSpecs. If one is found, we assume that the
// pipeline was started with the Beam SDK contained in that wheel and try to
// install it. If no wheel is found, or installing it fails, we fall back to
// installing the SDK from the source tarball sdkSrcFile in workDir.
//
// When required is false and the source tarball does not exist in workDir, the
// fallback is skipped and nil is returned.
func installSdk(files []string, workDir string, sdkSrcFile string, acceptableWhlSpecs []string, required bool) error {
	if whl := findBeamSdkWhl(files, acceptableWhlSpecs); whl != "" {
		whlErr := pipInstallPackage(files, workDir, whl, false, false, []string{"gcp"})
		if whlErr == nil {
			return nil
		}
		log.Printf("Could not install Apache Beam SDK from a wheel: %v, proceeding to install SDK from source tarball.", whlErr)
	}

	if !required {
		// An SDK that is not required may simply be absent; only a definite
		// "does not exist" result skips the source install.
		if _, statErr := os.Stat(filepath.Join(workDir, sdkSrcFile)); os.IsNotExist(statErr) {
			return nil
		}
	}

	return pipInstallPackage(files, workDir, sdkSrcFile, false, false, []string{"gcp"})
}