Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rclone third-party copy push #3491

Merged
merged 9 commits into from
Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/rclone-tpc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: implement rclone third-party copy push option

This enhancement gives the option to use third-party copy push with rclone between two different user accounts.

https://github.com/cs3org/reva/pull/3491
39 changes: 28 additions & 11 deletions examples/datatx/datatx.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
# example data transfer service configuration
# Example data transfer service configuration
[grpc.services.datatx]
# rclone is the default data transfer driver
# Rclone is the default data transfer driver
txdriver = "rclone"
# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
# The shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
tx_shares_file = ""
# base folder of the data transfers (default: /home/DataTransfers)
# Base folder of the data transfers (default: /home/DataTransfers)
data_transfers_folder = ""

# rclone data transfer driver
# Rclone data transfer driver
[grpc.services.datatx.txdrivers.rclone]
# rclone endpoint
# Rclone endpoint
endpoint = "http://..."
# basic auth is used
# Basic auth is used
auth_user = "...rcloneuser"
auth_pass = "...rcloneusersecret"
# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
# The authentication scheme to use in the src and dest requests by rclone (follows the endpoints' authentication methods)
# Valid values:
# "bearer" (default) will result in rclone using request header: Authorization: "Bearer ...token..."
# "x-access-token" will result in rclone using request header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
auth_header = "x-access-token"
# The transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
file = ""
# check status job interval in milliseconds
# Check status job interval in milliseconds
job_status_check_interval = 2000
# the job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000
# The job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000

[http.services.ocdav]
# Rclone supports third-party copy push; for that to work with reva enable this setting
enable_http_tpc = true
# The authentication scheme reva uses for the tpc push call (the call to Destination).
# Follows the destination endpoint authentication method.
# Valid values:
# "bearer" (default) will result in header: Authorization: "Bearer ...token..."
# "x-access-token" will result in header: X-Access-Token: "...token..."
# If not set "bearer" is assumed
http_tpc_push_auth_header = "x-access-token"
52 changes: 2 additions & 50 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ package datatx
import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"sync"

Expand Down Expand Up @@ -77,13 +75,6 @@ type txShare struct {
Opaque *types.Opaque `json:"opaque"`
}

type webdavEndpoint struct {
filePath string
endpoint string
endpointScheme string
token string
}

func (c *config) init() {
if c.TxDriver == "" {
c.TxDriver = "rclone"
Expand Down Expand Up @@ -156,23 +147,7 @@ func (s *service) UnprotectedEndpoints() []string {
}

func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) {
srcEp, err := s.extractEndpointInfo(ctx, req.SrcTargetUri)
if err != nil {
return nil, err
}
srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint)
srcPath := srcEp.filePath
srcToken := srcEp.token

destEp, err := s.extractEndpointInfo(ctx, req.DestTargetUri)
if err != nil {
return nil, err
}
dstRemote := fmt.Sprintf("%s://%s", destEp.endpointScheme, destEp.endpoint)
dstPath := destEp.filePath
dstToken := destEp.token

txInfo, startTransferErr := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken)
txInfo, startTransferErr := s.txManager.CreateTransfer(ctx, req.SrcTargetUri, req.DestTargetUri)

// we always save the transfer regardless of start transfer outcome
// only then, if starting fails, can we try to restart it
Expand Down Expand Up @@ -205,7 +180,7 @@ func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequ
return &datatx.PullTransferResponse{
Status: status.NewOK(ctx),
TxInfo: txInfo,
}, err
}, nil
}

func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) {
Expand Down Expand Up @@ -307,29 +282,6 @@ func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRe
}, nil
}

func (s *service) extractEndpointInfo(ctx context.Context, targetURL string) (*webdavEndpoint, error) {
if targetURL == "" {
return nil, errtypes.BadRequest("datatx service: ref target is an empty uri")
}

uri, err := url.Parse(targetURL)
if err != nil {
return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL)
}

m, err := url.ParseQuery(uri.RawQuery)
if err != nil {
return nil, errors.Wrap(err, "datatx service: error parsing target resource name")
}

return &webdavEndpoint{
filePath: m["name"][0],
endpoint: uri.Host + uri.Path,
endpointScheme: uri.Scheme,
token: uri.User.String(),
}, nil
}

func loadOrCreate(file string) (*txShareModel, error) {
_, err := os.Stat(file)
if os.IsNotExist(err) {
Expand Down
7 changes: 6 additions & 1 deletion internal/http/services/owncloud/ocdav/ocdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ type Config struct {
Timeout int64 `mapstructure:"timeout"`
Insecure bool `mapstructure:"insecure" docs:"false;Whether to skip certificate checks when sending requests."`
// If true, HTTP COPY will expect the HTTP-TPC (third-party copy) headers
EnableHTTPTpc bool `mapstructure:"enable_http_tpc"`
EnableHTTPTpc bool `mapstructure:"enable_http_tpc"`
// The authentication scheme to use for the tpc push call when userinfo part is specified in the Destination header uri. Default value is 'bearer'.
// Possible values:
// "bearer" results in header: Authorization: Bearer ...token...
// "x-access-token": results in header: X-Access-Token: ...token...
HTTPTpcPushAuthHeader string `mapstructure:"http_tpc_push_auth_header"`
PublicURL string `mapstructure:"public_url"`
FavoriteStorageDriver string `mapstructure:"favorite_storage_driver"`
FavoriteStorageDrivers map[string]map[string]interface{} `mapstructure:"favorite_storage_drivers"`
Expand Down
37 changes: 34 additions & 3 deletions internal/http/services/owncloud/ocdav/tpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -392,9 +393,28 @@ func (s *svc) performHTTPPush(ctx context.Context, client gateway.GatewayAPIClie
return err
}

// add authentication header and content length
bearerHeader := r.Header.Get(HeaderTransferAuth)
req.Header.Add("Authorization", bearerHeader)
// Check if there is userinfo to be found in the destination URI
// This should be the token to use in the HTTP push call
userInfo, err := s.extractUserInfo(ctx, dst)
if err != nil {
sublog.Debug().Msgf("tpc push: error: %v", err)
}
if len(userInfo) > 0 {
sublog.Debug().Msg("tpc push: userinfo part found in destination url, using userinfo as token for the HTTP push request authorization header")
if s.c.HTTPTpcPushAuthHeader == "x-access-token" {
req.Header.Add(s.c.HTTPTpcPushAuthHeader, userInfo)
sublog.Debug().Msgf("tpc push: using authentication scheme: %v", s.c.HTTPTpcPushAuthHeader)
} else { // Bearer is the default
req.Header.Add("Authorization", "Bearer "+userInfo)
sublog.Debug().Msg("tpc push: using authentication scheme: bearer")
}
} else {
sublog.Debug().Msg("tpc push: no userinfo part found in destination url, using token from the COPY request authorization header")
// add authorization header; single token tpc
bearerHeader := r.Header.Get(HeaderTransferAuth)
req.Header.Add("Authorization", bearerHeader)
}
// add content length
req.ContentLength = int64(srcInfo.GetSize())

// do Upload
Expand All @@ -412,3 +432,14 @@ func (s *svc) performHTTPPush(ctx context.Context, client gateway.GatewayAPIClie

return nil
}

// Extracts and returns the userinfo part of the specified target URL (https://[userinfo]@www.example.com:123/...path).
// Returns an empty string if no userinfo part is found.
func (s *svc) extractUserInfo(ctx context.Context, targetURL string) (string, error) {
parsedURL, err := url.Parse(targetURL)
if err != nil {
return "", errtypes.BadRequest("tpc: error extracting userinfo part - error parsing url: " + targetURL)
}

return parsedURL.User.String(), nil
}
5 changes: 3 additions & 2 deletions pkg/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (

// Manager the interface any transfer driver should implement.
type Manager interface {
// StartTransfer initiates a transfer job and returns a TxInfo object including a unique transfer id, and error if any.
StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error)
// CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id.
// Specified target URIs are of form scheme://userinfo@host:port?name={path}
CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error)
// GetTransferStatus returns a TxInfo object including the current status, and error if any.
GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error)
// CancelTransfer cancels the transfer and returns a TxInfo object and error if any.
Expand Down
74 changes: 69 additions & 5 deletions pkg/datatx/manager/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cs3org/reva/pkg/appctx"
txdriver "github.com/cs3org/reva/pkg/datatx"
registry "github.com/cs3org/reva/pkg/datatx/manager/registry"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rhttp"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -64,6 +65,7 @@ type config struct {
Endpoint string `mapstructure:"endpoint"`
AuthUser string `mapstructure:"auth_user"` // rclone basicauth user
AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass
AuthHeader string `mapstructure:"auth_header"`
File string `mapstructure:"file"`
JobStatusCheckInterval int `mapstructure:"job_status_check_interval"`
JobTimeout int `mapstructure:"job_timeout"`
Expand Down Expand Up @@ -118,6 +120,13 @@ var txEndStatuses = map[string]int32{
"STATUS_TRANSFER_EXPIRED": 10,
}

type endpoint struct {
filePath string
endpoint string
endpointScheme string
token string
}

// New returns a new rclone driver.
func New(m map[string]interface{}) (txdriver.Manager, error) {
c, err := parseConfig(m)
Expand Down Expand Up @@ -207,9 +216,32 @@ func (m *transferModel) saveTransfer(e error) error {
return e
}

// StartTransfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id.
func (driver *rclone) StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) {
return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, destRemote, destPath, destToken)
// CreateTransfer creates a transfer job and returns a TxInfo object that includes a unique transfer id.
// Specified target URIs are of form scheme://userinfo@host:port?name={path}
func (driver *rclone) CreateTransfer(ctx context.Context, srcTargetURI string, dstTargetURI string) (*datatx.TxInfo, error) {
logger := appctx.GetLogger(ctx)

srcEp, err := driver.extractEndpointInfo(ctx, srcTargetURI)
if err != nil {
return nil, err
}
srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint)
srcPath := srcEp.filePath
srcToken := srcEp.token

destEp, err := driver.extractEndpointInfo(ctx, dstTargetURI)
if err != nil {
return nil, err
}
dstPath := destEp.filePath
dstToken := destEp.token
// we always set the userinfo part of the destination url for rclone tpc push support
dstRemote := fmt.Sprintf("%s://%s@%s", destEp.endpointScheme, dstToken, destEp.endpoint)

logger.Debug().Msgf("destination target URI: %v", dstTargetURI)
logger.Debug().Msgf("destination remote: %v", dstRemote)

return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken)
}

// startJob starts a transfer job. Retries a previous job if transferID is specified.
Expand Down Expand Up @@ -281,8 +313,17 @@ func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote
// DstToken string `json:"destToken"`
Async bool `json:"_async"`
}
srcFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", srcToken, srcRemote, srcPath)
dstFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", destToken, destRemote, destPath)
// bearer is the default authentication scheme for reva
srcAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", srcToken)
if driver.config.AuthHeader == "x-access-token" {
srcAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", srcToken)
}
srcFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", srcAuthHeader, srcRemote, srcPath)
destAuthHeader := fmt.Sprintf("bearer_token=\"%v\"", destToken)
if driver.config.AuthHeader == "x-access-token" {
destAuthHeader = fmt.Sprintf("headers=\"x-access-token,%v\"", destToken)
}
dstFs := fmt.Sprintf(":webdav,%v,url=\"%v\":%v", destAuthHeader, destRemote, destPath)
rcloneReq := &rcloneAsyncReqJSON{
SrcFs: srcFs,
DstFs: dstFs,
Expand Down Expand Up @@ -829,3 +870,26 @@ func (driver *rclone) remotePathIsFolder(remote string, remotePath string, remot
// in all other cases the remote path is a directory
return true, nil
}

func (driver *rclone) extractEndpointInfo(ctx context.Context, targetURL string) (*endpoint, error) {
if targetURL == "" {
return nil, errtypes.BadRequest("datatx service: ref target is an empty uri")
}

uri, err := url.Parse(targetURL)
if err != nil {
return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL)
}

m, err := url.ParseQuery(uri.RawQuery)
if err != nil {
return nil, errors.Wrap(err, "datatx service: error parsing target resource name")
}

return &endpoint{
filePath: m["name"][0],
gmgigi96 marked this conversation as resolved.
Show resolved Hide resolved
endpoint: uri.Host + uri.Path,
endpointScheme: uri.Scheme,
token: uri.User.String(),
}, nil
}