Skip to content

Commit

Permalink
Merge pull request #84 from arenadata/feature/ADBDEV-4978
Browse files Browse the repository at this point in the history
Feature/adbdev 4978
  • Loading branch information
iamlapa committed Mar 18, 2024
2 parents 07062d4 + 68f389d commit 1a66599
Show file tree
Hide file tree
Showing 33 changed files with 815 additions and 111 deletions.
2 changes: 1 addition & 1 deletion automation/arenadata/scripts/compile_pxf_without_test.sh
Expand Up @@ -8,7 +8,7 @@ GPHOME=/usr/local/greenplum-db-devel
bash --login -c "
export PXF_HOME=${GPHOME}/pxf
make -C '${PWD}/pxf_src/external-table' install
make -C '${PWD}/pxf_src/cli/go/src/pxf-cli' install
make -C '${PWD}/pxf_src/cli' install
make -C '${PWD}/pxf_src/server' install-server
"

Expand Down
10 changes: 9 additions & 1 deletion cli/cmd/cluster.go
Expand Up @@ -49,8 +49,12 @@ var (
restartCmd = createCobraCommand("restart", "Restart the PXF server on coordinator, standby coordinator, and all segment hosts", &RestartCommand)
prepareCmd = createCobraCommand("prepare", "Prepares a new base directory specified by the $PXF_BASE environment variable", &PrepareCommand)
migrateCmd = createCobraCommand("migrate", "Migrates configurations from older installations of PXF", &MigrateCommand)
reloadCmd = createCobraCommand("reload", "Reload the PXF caches on coordinator, standby coordinator, and all segment hosts for profiles and terminate all related queries.", &ReloadCommand)
// DeleteOnSync is a boolean for determining whether to use rsync with --delete, exported for tests
DeleteOnSync bool
DeleteOnSync bool
ReloadProfileName string
ReloadServerName string
ReloadAutoConfirm bool
)

func init() {
Expand All @@ -66,6 +70,10 @@ func init() {
clusterCmd.AddCommand(restartCmd)
clusterCmd.AddCommand(prepareCmd)
clusterCmd.AddCommand(migrateCmd)
clusterCmd.AddCommand(reloadCmd)
reloadCmd.Flags().StringVarP(&ReloadProfileName, "profile", "p", "", "profile that PXF uses to access the data")
reloadCmd.Flags().StringVarP(&ReloadServerName, "server", "s", "", "the name of a directory residing in $PXF_BASE/servers/")
reloadCmd.Flags().BoolVarP(&ReloadAutoConfirm, "auto", "a", false, "auto confirm the action of reload")
}

func exitWithReturnCode(err error) {
Expand Down
47 changes: 47 additions & 0 deletions cli/cmd/pxf.go
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/greenplum-db/gp-common-go-libs/cluster"
"github.com/greenplum-db/gp-common-go-libs/gplog"
)

type envVar string
Expand All @@ -20,6 +21,9 @@ const (
javaHome envVar = "JAVA_HOME"
// For pxf migrate
pxfConf envVar = "PXF_CONF"
// For pxf profile reload
pxfHost envVar = "PXF_HOST"
pxfPort envVar = "PXF_PORT"
)

type messageType int
Expand Down Expand Up @@ -70,6 +74,35 @@ func (cmd *command) GetFunctionToExecute() (func(string) string, error) {
hostname,
inputs[pxfBase])
}, nil
case reload:
pxfDefaultHost := "localhost"
pxfDefaultPort := "5888"
var pxfHostStr string
var pxfPortStr string
reloadCommandTemplate := "curl --silent --fail --show-error --request POST http://%s:%s/pxf/reload --header \"Content-Type: application/json\" --data '{\"profile\":\"%s\",\"server\":\"%s\"}'"

// Set pxf host
pxfHostStr, isPxfHostSet := os.LookupEnv(string(pxfHost))
if !isPxfHostSet {
pxfHostStr = pxfDefaultHost
}

// Set pxf port
pxfPortStr, isPxfPortSet := os.LookupEnv(string(pxfPort))
if !isPxfPortSet {
pxfPortStr = pxfDefaultPort
}

reloadCommand := fmt.Sprintf(reloadCommandTemplate, pxfHostStr, pxfPortStr, ReloadProfileName, ReloadServerName)
if !ReloadAutoConfirm {
cmd.warn = true
err := cmd.Warn(os.Stdin)
if err != nil {
return nil, err
}
}
gplog.Info(fmt.Sprintf("Execute command: %s", reloadCommand))
return func(_ string) string { return reloadCommand }, nil
default:
var effectivePxfBase string

Expand Down Expand Up @@ -127,6 +160,7 @@ const (
restart = "restart"
prepare = "prepare"
migrate = "migrate"
reload = "reload"
)

// The pxf cli commands, exported for testing
Expand Down Expand Up @@ -190,6 +224,19 @@ var (
// since the files are already on coordinator, we exclude coordinator but include standby coordinator
whereToRun: cluster.ON_LOCAL | cluster.ON_HOSTS | cluster.EXCLUDE_MASTER | cluster.INCLUDE_MIRRORS,
}
ReloadCommand = command{
name: reload,
messages: map[messageType]string{
success: "PXF successfully reloaded profiles on %d out of %d host%s\n",
status: "PXF is reloading profiles on coordinator host%s and %d segment host%s...\n",
standby: " standby coordinator host and",
err: "PXF failed to reload profile on %d out of %d host%s. Check the PXF logs located in the '$PXF_BASE/logs' directory\n",
warning: "Do you really want to reload profile(s) and terminate all related queries? Yy|Nn (default=N):",
},
warn: false,
envVars: []envVar{pxfBase},
whereToRun: cluster.ON_REMOTE | cluster.ON_HOSTS | cluster.INCLUDE_COORDINATOR | cluster.INCLUDE_MIRRORS,
}
StatusCommand = command{
name: statuses,
messages: map[messageType]string{
Expand Down
@@ -0,0 +1,36 @@
package org.greenplum.pxf.api.model;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

public interface CancelableOperation {
/**
* Cancel read operation. Might contain additional logic comparing with {@link Accessor#closeForRead()}
*
* @throws Exception if the cancel operation failed
*/
void cancelRead() throws Exception;

/**
* Cancel write operation. Might contain additional logic comparing with {@link Accessor#closeForWrite()}
*
* @throws Exception if the cancel the operation failed
*/
void cancelWrite() throws Exception;
}
@@ -0,0 +1,27 @@
package org.greenplum.pxf.api.model;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

public interface Reloader {

void reloadAll();

void reload(String server);
}
Expand Up @@ -16,7 +16,6 @@
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.greenplum.pxf.api.OneRow;
import org.greenplum.pxf.api.error.PxfRuntimeException;
import org.greenplum.pxf.api.filter.FilterParser;
import org.greenplum.pxf.api.filter.Node;
import org.greenplum.pxf.api.filter.Operator;
Expand Down

0 comments on commit 1a66599

Please sign in to comment.