From 27b2949c53bbf18c3281caf4f832d4b0385301e7 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Wed, 21 Feb 2024 13:43:48 +0200 Subject: [PATCH 01/24] ADBDEV-4980: Implement reload and reloadAll methods for profiles - Add methods to the Plugin interface; - Implement methods for JDBC plugin. --- .../org/greenplum/pxf/api/model/BasePlugin.java | 8 ++++++++ .../java/org/greenplum/pxf/api/model/Plugin.java | 4 ++++ .../pxf/plugins/jdbc/JdbcBasePlugin.java | 11 +++++++++++ .../plugins/jdbc/utils/ConnectionManager.java | 16 ++++++++++++++++ .../pxf/plugins/jdbc/utils/PoolDescriptor.java | 3 ++- 5 files changed, 41 insertions(+), 1 deletion(-) diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/BasePlugin.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/BasePlugin.java index fe5b937638..ae4289f316 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/BasePlugin.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/BasePlugin.java @@ -34,6 +34,14 @@ public void setRequestContext(RequestContext context) { public void afterPropertiesSet() { } + @Override + public void reloadAll() { + } + + @Override + public void reload(String server) { + } + /** * When DEBUG mode is enabled, logs the total number of rows read, the * amount of time it took to read the file, and the average read speed diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Plugin.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Plugin.java index 9cbfefba6c..c280ac0da9 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Plugin.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Plugin.java @@ -16,4 +16,8 @@ public interface Plugin { * Invoked after the {@code RequestContext} has been bound */ void afterPropertiesSet(); + + void reloadAll(); + + void reload(String server); } diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java index 2169f7f4f2..703e8d2b8a 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java @@ -100,6 +100,7 @@ public class JdbcBasePlugin extends BasePlugin { private static final String MYSQL_DRIVER_PREFIX = "com.mysql."; private static final String JDBC_DATE_WIDE_RANGE = "jdbc.date.wideRange"; private static final String JDBC_DATE_WIDE_RANGE_LEGACY = "jdbc.date.wide-range"; + private enum TransactionIsolation { READ_UNCOMMITTED(1), READ_COMMITTED(2), @@ -493,6 +494,16 @@ public static void closeStatementAndConnection(Statement statement) throws SQLEx } } + @Override + public void reloadAll() { + connectionManager.reloadCache(poolDescriptor -> true); + } + + @Override + public void reload(String server) { + connectionManager.reloadCache(poolDescriptor -> poolDescriptor.getServer().equals(server)); + } + /** * For a Kerberized Hive JDBC connection, it creates a connection as the loginUser. * Otherwise, it returns a new connection. diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java index 72ef7f0781..b775a4c3b4 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java @@ -11,6 +11,7 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.greenplum.pxf.plugins.jdbc.PxfJdbcProperties; import org.slf4j.Logger; @@ -24,14 +25,18 @@ import java.sql.SQLException; import java.sql.SQLTimeoutException; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; /** * Responsible for obtaining and maintaining JDBC connections to databases. If configured for a given server, * uses Hikari Connection Pool to pool database connections. */ +@Slf4j @Component public class ConnectionManager { @@ -78,6 +83,17 @@ public ConnectionManager(DataSourceFactory factory, Ticker ticker, PxfJdbcProper .build(CacheLoader.from(factory::createDataSource)); } + public void reloadCache(Predicate poolDescriptorFilter) { + Set poolDescriptorSet = dataSources.asMap().keySet().stream() + .filter(poolDescriptorFilter) + .collect(Collectors.toSet()); + poolDescriptorSet.forEach(pd -> { + log.info("Invalidate cache for pool descriptor {}", pd); + dataSources.invalidate(pd); + }); + cleanCache(); + } + /** * Explicitly runs cache maintenance operations. */ diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/PoolDescriptor.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/PoolDescriptor.java index 3026a04bf0..7685175a8a 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/PoolDescriptor.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/PoolDescriptor.java @@ -90,7 +90,8 @@ public int hashCode() { @Override public String toString() { return "PoolDescriptor{" + - "jdbcUrl=" + jdbcUrl + + "server=" + server + + ", jdbcUrl=" + jdbcUrl + ", user=" + user + ", password=" + ConnectionManager.maskPassword(password) + ", connectionConfig=" + connectionConfig + From c4972b875c11c947603999dddc1d43f71043a42f Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Mon, 26 Feb 2024 11:14:06 +0200 Subject: [PATCH 02/24] ADBDEV-4985: Implemented a new endpoint POST /pxf/reload --- .../service/profile/ProfileReloadService.java | 8 +++++++ .../profile/ProfileReloadServiceImpl.java | 12 ++++++++++ .../rest/MaintenanceRestController.java | 23 +++++++++++++++++++ .../rest/dto/ProfileReloadRequestDto.java | 11 +++++++++ 4 files changed, 54 insertions(+) create mode 100644 server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadService.java create mode 100644 server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java create mode 100644 server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java create mode 100644 server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/dto/ProfileReloadRequestDto.java diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadService.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadService.java new file mode 100644 index 0000000000..83efd74dc6 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadService.java @@ -0,0 +1,8 @@ +package org.greenplum.pxf.service.profile; + +import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; + +public interface ProfileReloadService { + + void reloadProfile(ProfileReloadRequestDto reloadRequestDto); +} \ No newline at end of file diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java new file mode 100644 index 0000000000..e116f67020 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java @@ -0,0 +1,12 @@ +package org.greenplum.pxf.service.profile; + +import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; +import org.springframework.stereotype.Service; + +@Service +public class ProfileReloadServiceImpl implements ProfileReloadService { + @Override + public void reloadProfile(ProfileReloadRequestDto reloadRequestDto) { + // Will be implemented as a part of the ADBDEV-4986 task + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java new file mode 100644 index 0000000000..924387bdfb --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java @@ -0,0 +1,23 @@ +package org.greenplum.pxf.service.rest; + +import org.greenplum.pxf.service.profile.ProfileReloadService; +import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/pxf") +public class MaintenanceRestController { + private final ProfileReloadService profileReloadService; + + public MaintenanceRestController(final ProfileReloadService profileReloadService) { + this.profileReloadService = profileReloadService; + } + + @PostMapping(value = "/reload") + public void reload(@RequestBody ProfileReloadRequestDto reloadRequestDto) { + profileReloadService.reloadProfile(reloadRequestDto); + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/dto/ProfileReloadRequestDto.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/dto/ProfileReloadRequestDto.java new file mode 100644 index 0000000000..ab3440fe05 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/dto/ProfileReloadRequestDto.java @@ -0,0 +1,11 @@ +package org.greenplum.pxf.service.rest.dto; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public class ProfileReloadRequestDto { + private final String profile; + private final String server; +} From 597551f380133c2a0b0e7eaf7d1e8768534868a1 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Tue, 27 Feb 2024 17:52:43 +0200 Subject: [PATCH 03/24] ADBDEV-4980: Add check for null and blank --- .../pxf/plugins/jdbc/JdbcBasePlugin.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java index 703e8d2b8a..adffd5e831 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java @@ -22,6 +22,7 @@ import io.arenadata.security.encryption.client.service.DecryptClient; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.greenplum.pxf.api.error.PxfRuntimeException; import org.greenplum.pxf.api.model.BasePlugin; import org.greenplum.pxf.api.model.RequestContext; import org.greenplum.pxf.api.security.SecureLogin; @@ -40,12 +41,8 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import java.util.stream.Collectors; -import java.util.Objects; import static org.greenplum.pxf.api.security.SecureLogin.CONFIG_KEY_SERVICE_USER_IMPERSONATION; @@ -496,12 +493,22 @@ public static void closeStatementAndConnection(Statement statement) throws SQLEx @Override public void reloadAll() { - connectionManager.reloadCache(poolDescriptor -> true); + if (Objects.nonNull(connectionManager)) { + connectionManager.reloadCache(poolDescriptor -> true); + } else { + throw new PxfRuntimeException("Failed to reload profile. Connection manager is null."); + } } @Override public void reload(String server) { - connectionManager.reloadCache(poolDescriptor -> poolDescriptor.getServer().equals(server)); + if (Objects.isNull(connectionManager)) { + throw new PxfRuntimeException("Failed to reload profile. Connection manager is null."); + } else if (StringUtils.isBlank(server)) { + throw new PxfRuntimeException("Failed to reload profile. Parameter server is blank."); + } else { + connectionManager.reloadCache(poolDescriptor -> poolDescriptor.getServer().equals(server)); + } } /** From 20f23de6f2bb234a81f2aad33f557e33183ee457 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Tue, 27 Feb 2024 20:20:22 +0200 Subject: [PATCH 04/24] ADBDEV-4985: Add @RequiredArgsConstructor annotation --- .../pxf/service/rest/MaintenanceRestController.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java index 924387bdfb..e7bc588d28 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java @@ -1,5 +1,6 @@ package org.greenplum.pxf.service.rest; +import lombok.RequiredArgsConstructor; import org.greenplum.pxf.service.profile.ProfileReloadService; import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; import org.springframework.web.bind.annotation.PostMapping; @@ -8,14 +9,11 @@ import org.springframework.web.bind.annotation.RestController; @RestController +@RequiredArgsConstructor @RequestMapping("/pxf") public class MaintenanceRestController { private final ProfileReloadService profileReloadService; - public MaintenanceRestController(final ProfileReloadService profileReloadService) { - this.profileReloadService = profileReloadService; - } - @PostMapping(value = "/reload") public void reload(@RequestBody ProfileReloadRequestDto reloadRequestDto) { profileReloadService.reloadProfile(reloadRequestDto); From 786ed838a79d1aa0c55b39bbf3b2a6eb22489c3b Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Tue, 27 Feb 2024 21:26:04 +0200 Subject: [PATCH 05/24] ADBDEV-4980: Split ConnectionManager#reloadCache on two methods --- .../pxf/plugins/jdbc/JdbcBasePlugin.java | 10 +++++++--- .../plugins/jdbc/utils/ConnectionManager.java | 19 ++++++++++++++----- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java index adffd5e831..b0610894c5 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java @@ -41,7 +41,11 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; import java.util.stream.Collectors; import static org.greenplum.pxf.api.security.SecureLogin.CONFIG_KEY_SERVICE_USER_IMPERSONATION; @@ -494,7 +498,7 @@ public static void closeStatementAndConnection(Statement statement) throws SQLEx @Override public void reloadAll() { if (Objects.nonNull(connectionManager)) { - connectionManager.reloadCache(poolDescriptor -> true); + connectionManager.reloadCache(); } else { throw new PxfRuntimeException("Failed to reload profile. Connection manager is null."); } @@ -507,7 +511,7 @@ public void reload(String server) { } else if (StringUtils.isBlank(server)) { throw new PxfRuntimeException("Failed to reload profile. Parameter server is blank."); } else { - connectionManager.reloadCache(poolDescriptor -> poolDescriptor.getServer().equals(server)); + connectionManager.reloadCacheIf(poolDescriptor -> poolDescriptor.getServer().equals(server)); } } diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java index b775a4c3b4..25c1011b2a 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java @@ -24,6 +24,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.SQLTimeoutException; +import java.util.HashSet; import java.util.Properties; import java.util.Set; import java.util.concurrent.Executor; @@ -83,17 +84,25 @@ public ConnectionManager(DataSourceFactory factory, Ticker ticker, PxfJdbcProper .build(CacheLoader.from(factory::createDataSource)); } - public void reloadCache(Predicate poolDescriptorFilter) { + public void reloadCache() { + Set poolDescriptorSet = new HashSet<>(dataSources.asMap().keySet()); + poolDescriptorSet.forEach(this::invalidateCache); + cleanCache(); + } + + public void reloadCacheIf(Predicate poolDescriptorFilter) { Set poolDescriptorSet = dataSources.asMap().keySet().stream() .filter(poolDescriptorFilter) .collect(Collectors.toSet()); - poolDescriptorSet.forEach(pd -> { - log.info("Invalidate cache for pool descriptor {}", pd); - dataSources.invalidate(pd); - }); + poolDescriptorSet.forEach(this::invalidateCache); cleanCache(); } + private void invalidateCache(PoolDescriptor poolDescriptor) { + log.info("Invalidate cache for pool descriptor {}", poolDescriptor); + dataSources.invalidate(poolDescriptor); + } + /** * Explicitly runs cache maintenance operations. */ From 061408515a7d55ee1c2bbeb428150799185baa84 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Wed, 28 Feb 2024 10:48:38 +0200 Subject: [PATCH 06/24] ADBDEV-4980: Refactoring reloadCache methods --- .../plugins/jdbc/utils/ConnectionManager.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java index 25c1011b2a..8a5a45e7ca 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java @@ -24,9 +24,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.SQLTimeoutException; -import java.util.HashSet; import java.util.Properties; -import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -85,24 +83,22 @@ public ConnectionManager(DataSourceFactory factory, Ticker ticker, PxfJdbcProper } public void reloadCache() { - Set poolDescriptorSet = new HashSet<>(dataSources.asMap().keySet()); - poolDescriptorSet.forEach(this::invalidateCache); + log.info("Invalidate cache of all pool descriptors}"); + dataSources.invalidateAll(); cleanCache(); } public void reloadCacheIf(Predicate poolDescriptorFilter) { - Set poolDescriptorSet = dataSources.asMap().keySet().stream() + dataSources.asMap().keySet().stream() .filter(poolDescriptorFilter) - .collect(Collectors.toSet()); - poolDescriptorSet.forEach(this::invalidateCache); + .collect(Collectors.toSet()) + .forEach(poolDescriptor -> { + log.info("Invalidate cache of the pool descriptor {}", poolDescriptor); + dataSources.invalidate(poolDescriptor); + }); cleanCache(); } - private void invalidateCache(PoolDescriptor poolDescriptor) { - log.info("Invalidate cache for pool descriptor {}", poolDescriptor); - dataSources.invalidate(poolDescriptor); - } - /** * Explicitly runs cache maintenance operations. */ From 424d2d8125f67fa39fdd470d813e0c2465d8245f Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Wed, 28 Feb 2024 10:48:38 +0200 Subject: [PATCH 07/24] ADBDEV-4980: Refactoring reloadCache methods --- .../plugins/jdbc/utils/ConnectionManager.java | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java index 25c1011b2a..772f4945f4 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java @@ -24,9 +24,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.SQLTimeoutException; -import java.util.HashSet; import java.util.Properties; -import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -85,24 +83,22 @@ public ConnectionManager(DataSourceFactory factory, Ticker ticker, PxfJdbcProper } public void reloadCache() { - Set poolDescriptorSet = new HashSet<>(dataSources.asMap().keySet()); - poolDescriptorSet.forEach(this::invalidateCache); + log.info("Invalidate cache of all pool descriptors"); + dataSources.invalidateAll(); cleanCache(); } public void reloadCacheIf(Predicate poolDescriptorFilter) { - Set poolDescriptorSet = dataSources.asMap().keySet().stream() + dataSources.asMap().keySet().stream() .filter(poolDescriptorFilter) - .collect(Collectors.toSet()); - poolDescriptorSet.forEach(this::invalidateCache); + .collect(Collectors.toSet()) + .forEach(poolDescriptor -> { + log.info("Invalidate cache of the pool descriptor {}", poolDescriptor); + dataSources.invalidate(poolDescriptor); + }); cleanCache(); } - private void invalidateCache(PoolDescriptor poolDescriptor) { - log.info("Invalidate cache for pool descriptor {}", poolDescriptor); - dataSources.invalidate(poolDescriptor); - } - /** * Explicitly runs cache maintenance operations. */ From 2fc31559c331a9d1062531fdfd01b25208c847a5 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Wed, 28 Feb 2024 11:59:18 +0200 Subject: [PATCH 08/24] ADBDEV-4986: Implemented service to reload profile --- .../profile/ProfileReloadServiceImpl.java | 85 ++++++++++++++++++- .../pxf/service/profile/ProfilesConf.java | 6 +- .../service/utilities/BasePluginFactory.java | 25 +++--- 3 files changed, 102 insertions(+), 14 deletions(-) diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java index e116f67020..6bc3fa468b 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java @@ -1,12 +1,91 @@ package org.greenplum.pxf.service.profile; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.greenplum.pxf.api.error.PxfRuntimeException; +import org.greenplum.pxf.api.model.Plugin; import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; -import org.springframework.stereotype.Service; +import org.greenplum.pxf.service.utilities.BasePluginFactory; +import org.springframework.stereotype.Component; -@Service +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +@Slf4j +@Component +@RequiredArgsConstructor public class ProfileReloadServiceImpl implements ProfileReloadService { + private final static String ACCESSOR_KEY = "ACCESSOR"; + private final ProfilesConf profileConf; + private final BasePluginFactory basePluginFactory; + @Override public void reloadProfile(ProfileReloadRequestDto reloadRequestDto) { - // Will be implemented as a part of the ADBDEV-4986 task + String profile = reloadRequestDto.getProfile(); + String server = reloadRequestDto.getServer(); + log.info("Received a request to reload a profile with the parameters: profile={}, server={}", profile, server); + + if (StringUtils.isBlank(profile)) { + if (StringUtils.isBlank(server)) { + // TODO: Terminate all active queries before reload (ADBDEV-4987) + profileConf.getProfilesMap().keySet() + .forEach(this::reloadAll); + } else { + throw new IllegalArgumentException(String.format("The provided parameters (profile=%s, server=%s) " + + "are not correct. Please add profile", profile, server)); + } + } else if (StringUtils.isBlank(server)) { + // TODO: Terminate all active queries before reload (ADBDEV-4987) + reloadAll(profile); + } else { + // TODO: Terminate all active queries before reload (ADBDEV-4987) + reload(profile, server); + } + } + + private void reloadAll(String profile) { + Plugin accessor = getAccessor(profile); + if (Objects.nonNull(accessor)) { + log.info("Reload profile '{}' for all servers with accessor {}", profile, accessor); + accessor.reloadAll(); + } else { + log.info("Skipping reloading profile '{}' for all servers", profile); + } + } + + private void reload(String profile, String server) { + Plugin accessor = getAccessor(profile); + if (Objects.nonNull(accessor)) { + log.info("Reload profile '{}' for server '{}' with accessor {}", profile, server, accessor); + accessor.reload(server); + } else { + log.info("Skipping reloading profile '{}' for server '{}'", profile, server); + } + } + + private Plugin getAccessor(String profile) { + try { + Map pluginMap = profileConf.getPlugins(profile); + return Optional.ofNullable(pluginMap) + .map(plugins -> getAccessorInstance(plugins.get(ACCESSOR_KEY))) + .orElse(null); + } catch (Exception e) { + String message = String.format("Failed to get plugin. %s", e.getMessage()); + log.error(message); + throw new PxfRuntimeException(message, e); + } + } + + private Plugin getAccessorInstance(String accessorClassName) { + try { + Plugin instance = basePluginFactory.getPluginInstance(accessorClassName); + log.debug("Initialize instance {} of the accessor class {}", instance, accessorClassName); + return instance; + } catch (Exception e) { + log.error("Failed to initialize instance of the accessor class {}. {}", accessorClassName, e.getMessage()); + return null; + } } } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfilesConf.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfilesConf.java index 28f51694d7..e9f8d80a68 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfilesConf.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfilesConf.java @@ -57,7 +57,7 @@ public class ProfilesConf implements PluginConf { private final String externalProfilesFilename; // maps a profileName --> Profile object - private Map profilesMap; + private final Map profilesMap; private Pattern dynamicProfilePattern; /** @@ -86,6 +86,10 @@ public ProfilesConf( @Value( "${pxf.profile.dynamic.regex}" ) String dynamicProf LOG.info("PXF profiles loaded: {}", profilesMap.keySet()); } + public Map getProfilesMap() { + return profilesMap; + } + @Override public Map getOptionMappings(String profileName) { Profile profile = getProfile(profileName); diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/utilities/BasePluginFactory.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/utilities/BasePluginFactory.java index 27ece3b33b..0e7ffea8c6 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/utilities/BasePluginFactory.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/utilities/BasePluginFactory.java @@ -16,6 +16,20 @@ public class BasePluginFactory { public T getPlugin(RequestContext context, String pluginClassName) { + Plugin instance = getPluginInstance(pluginClassName); + + // initialize the instance + instance.setRequestContext(context); + instance.afterPropertiesSet(); + + // cast into a target type + @SuppressWarnings("unchecked") + T castInstance = (T) instance; + + return castInstance; + } + + public Plugin getPluginInstance(String pluginClassName) { // get the class name of the plugin if (StringUtils.isBlank(pluginClassName)) { throw new RuntimeException("Could not determine plugin class name"); @@ -52,15 +66,6 @@ public T getPlugin(RequestContext context, String pluginClass } catch (Exception e) { throw new RuntimeException(String.format("Class %s could not be instantiated", pluginClassName), e); } - - // initialize the instance - instance.setRequestContext(context); - instance.afterPropertiesSet(); - - // cast into a target type - @SuppressWarnings("unchecked") - T castInstance = (T) instance; - - return castInstance; + return instance; } } From de429e40a4a7bcab96afc777f66270227a0dc9ed Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Thu, 29 Feb 2024 11:00:43 +0200 Subject: [PATCH 09/24] ADBDEV-4986: Replace null by Optional --- .../profile/ProfileReloadServiceImpl.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java index 6bc3fa468b..0b12f5d79e 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java @@ -10,7 +10,6 @@ import org.springframework.stereotype.Component; import java.util.Map; -import java.util.Objects; import java.util.Optional; @Slf4j @@ -46,33 +45,32 @@ public void reloadProfile(ProfileReloadRequestDto reloadRequestDto) { } private void reloadAll(String profile) { - Plugin accessor = getAccessor(profile); - if (Objects.nonNull(accessor)) { - log.info("Reload profile '{}' for all servers with accessor {}", profile, accessor); - accessor.reloadAll(); + Optional accessor = getAccessor(profile); + if (accessor.isPresent()) { + log.info("Reload profile '{}' for all servers with accessor {}", profile, accessor.get()); + accessor.get().reloadAll(); } else { log.info("Skipping reloading profile '{}' for all servers", profile); } } private void reload(String profile, String server) { - Plugin accessor = getAccessor(profile); - if (Objects.nonNull(accessor)) { - log.info("Reload profile '{}' for server '{}' with accessor {}", profile, server, accessor); - accessor.reload(server); + Optional accessor = getAccessor(profile); + if (accessor.isPresent()) { + log.info("Reload profile '{}' for server '{}' with accessor {}", profile, server, accessor.get()); + accessor.get().reload(server); } else { log.info("Skipping reloading profile '{}' for server '{}'", profile, server); } } - private Plugin getAccessor(String profile) { + private Optional getAccessor(String profile) { try { Map pluginMap = profileConf.getPlugins(profile); return Optional.ofNullable(pluginMap) - .map(plugins -> getAccessorInstance(plugins.get(ACCESSOR_KEY))) - .orElse(null); + .map(plugins -> getAccessorInstance(plugins.get(ACCESSOR_KEY))); } catch (Exception e) { - String message = String.format("Failed to get plugin. %s", e.getMessage()); + String message = String.format("Failed to get plugin of the profile '%s'. %s", profile, e.getMessage()); log.error(message); throw new PxfRuntimeException(message, e); } From efe84d21d76716064b43459283511f4102c06c08 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Mon, 4 Mar 2024 11:59:22 +0200 Subject: [PATCH 10/24] ADBDEV-4980: Refactoring reloader`s methods - Add new interface Reloader; - Moved all methods from the Plugin interface to the Reloader; - Add implementation of the Reloader interface to the JdbcBasePlugin; - Refactor ConnectionManager#reloadCacheIf method --- .../org/greenplum/pxf/api/model/BasePlugin.java | 8 -------- .../org/greenplum/pxf/api/model/Plugin.java | 4 ---- .../org/greenplum/pxf/api/model/Reloader.java | 8 ++++++++ .../pxf/plugins/jdbc/JdbcBasePlugin.java | 3 ++- .../plugins/jdbc/utils/ConnectionManager.java | 17 +++++++++-------- 5 files changed, 19 insertions(+), 21 deletions(-) create mode 100644 server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Reloader.java diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/BasePlugin.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/BasePlugin.java index ae4289f316..fe5b937638 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/BasePlugin.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/BasePlugin.java @@ -34,14 +34,6 @@ public void setRequestContext(RequestContext context) { public void afterPropertiesSet() { } - @Override - public void reloadAll() { - } - - @Override - public void reload(String server) { - } - /** * When DEBUG mode is enabled, logs the total number of rows read, the * amount of time it took to read the file, and the average read speed diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Plugin.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Plugin.java index c280ac0da9..9cbfefba6c 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Plugin.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Plugin.java @@ -16,8 +16,4 @@ public interface Plugin { * Invoked after the {@code RequestContext} has been bound */ void afterPropertiesSet(); - - void reloadAll(); - - void reload(String server); } diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Reloader.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Reloader.java new file mode 100644 index 0000000000..89a71aa217 --- /dev/null +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Reloader.java @@ -0,0 +1,8 @@ +package org.greenplum.pxf.api.model; + +public interface Reloader { + + void reloadAll(); + + void reload(String server); +} diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java index b0610894c5..84a6d75f02 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcBasePlugin.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.greenplum.pxf.api.error.PxfRuntimeException; import org.greenplum.pxf.api.model.BasePlugin; +import org.greenplum.pxf.api.model.Reloader; import org.greenplum.pxf.api.model.RequestContext; import org.greenplum.pxf.api.security.SecureLogin; import org.greenplum.pxf.api.utilities.ColumnDescriptor; @@ -55,7 +56,7 @@ *

* Implemented subclasses: {@link JdbcAccessor}, {@link JdbcResolver}. */ -public class JdbcBasePlugin extends BasePlugin { +public class JdbcBasePlugin extends BasePlugin implements Reloader { private static final Logger LOG = LoggerFactory.getLogger(JdbcBasePlugin.class); diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java index 772f4945f4..52a14468a4 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java @@ -25,6 +25,7 @@ import java.sql.SQLException; import java.sql.SQLTimeoutException; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -83,20 +84,20 @@ public ConnectionManager(DataSourceFactory factory, Ticker ticker, PxfJdbcProper } public void reloadCache() { - log.info("Invalidate cache of all pool descriptors"); dataSources.invalidateAll(); + log.info("Invalidate cache of all pool descriptors"); cleanCache(); } public void reloadCacheIf(Predicate poolDescriptorFilter) { - dataSources.asMap().keySet().stream() + Set poolDescriptorSet = dataSources.asMap().keySet().stream() .filter(poolDescriptorFilter) - .collect(Collectors.toSet()) - .forEach(poolDescriptor -> { - log.info("Invalidate cache of the pool descriptor {}", poolDescriptor); - dataSources.invalidate(poolDescriptor); - }); - cleanCache(); + .collect(Collectors.toSet()); + if (!poolDescriptorSet.isEmpty()) { + dataSources.invalidateAll(poolDescriptorSet); + log.info("Invalidate cache of the pool descriptor(s): {}", poolDescriptorSet); + cleanCache(); + } } /** From 3a943b6df68d85f84428b541508f91bc61103a7e Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Mon, 4 Mar 2024 17:33:49 +0200 Subject: [PATCH 11/24] ADBDEV-4986: Refactoring service to reload profile -Add ProfileReloadConfiguration to initialize map with profiles which support reloading; --- .../profile/ProfileReloadConfiguration.java | 58 ++++++++++++ .../profile/ProfileReloadServiceImpl.java | 93 ++++++++----------- 2 files changed, 95 insertions(+), 56 deletions(-) create mode 100644 server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadConfiguration.java diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadConfiguration.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadConfiguration.java new file mode 100644 index 0000000000..524e2a47f5 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadConfiguration.java @@ -0,0 +1,58 @@ +package org.greenplum.pxf.service.profile; + +import lombok.extern.slf4j.Slf4j; +import org.greenplum.pxf.api.model.Plugin; +import org.greenplum.pxf.api.model.Reloader; +import org.greenplum.pxf.service.utilities.BasePluginFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +@Slf4j +@Configuration +public class ProfileReloadConfiguration { + private final static String ACCESSOR_KEY = "ACCESSOR"; + + @Lazy + @Bean("profileReloaderMap") + public Map getProfileReloaderMap(ProfilesConf profileConf, BasePluginFactory basePluginFactory) { + Map profileReloaderMap = new HashMap<>(); + profileConf.getProfilesMap().keySet() + .forEach(profile -> getReloader(profile, profileConf, basePluginFactory) + .ifPresent(reloader -> profileReloaderMap.put(profile, reloader))); + return profileReloaderMap; + } + + private Optional getReloader(String profile, + ProfilesConf profileConf, + BasePluginFactory basePluginFactory) { + try { + Map pluginMap = profileConf.getPlugins(profile); + return Optional.ofNullable(pluginMap) + .map(plugins -> getReloaderInstance(plugins.get(ACCESSOR_KEY), basePluginFactory, profile)); + } catch (Exception e) { + log.warn("Profile '{}': Failed to get plugin map", profile); + } + return Optional.empty(); + } + + private Reloader getReloaderInstance(String accessorClassName, BasePluginFactory basePluginFactory, String profile) { + try { + Plugin instance = basePluginFactory.getPluginInstance(accessorClassName); + log.debug("Profile '{}': Initialize instance {} of the accessor class {}", profile, instance, accessorClassName); + if (Reloader.class.isAssignableFrom(instance.getClass())) { + return (Reloader) instance; + } else { + log.debug("Profile '{}': Accessor class {} doesn't implement Reloader interface", profile, accessorClassName); + } + } catch (Exception e) { + log.warn("Profile '{}': Failed to initialize instance of the accessor class {}. {}", + profile, accessorClassName, e.getMessage()); + } + return null; + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java index 0b12f5d79e..b5ca05d2aa 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java @@ -1,89 +1,70 @@ package org.greenplum.pxf.service.profile; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.greenplum.pxf.api.error.PxfRuntimeException; -import org.greenplum.pxf.api.model.Plugin; +import org.greenplum.pxf.api.model.Reloader; import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; -import org.greenplum.pxf.service.utilities.BasePluginFactory; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import java.util.Map; -import java.util.Optional; +import java.util.Objects; +import java.util.function.Predicate; @Slf4j @Component -@RequiredArgsConstructor public class ProfileReloadServiceImpl implements ProfileReloadService { - private final static String ACCESSOR_KEY = "ACCESSOR"; - private final ProfilesConf profileConf; - private final BasePluginFactory basePluginFactory; + private final Map profileReloaderMap; + + public ProfileReloadServiceImpl(@Lazy Map profileReloaderMap) { + this.profileReloaderMap = profileReloaderMap; + } @Override public void reloadProfile(ProfileReloadRequestDto reloadRequestDto) { String profile = reloadRequestDto.getProfile(); String server = reloadRequestDto.getServer(); log.info("Received a request to reload a profile with the parameters: profile={}, server={}", profile, server); - - if (StringUtils.isBlank(profile)) { - if (StringUtils.isBlank(server)) { - // TODO: Terminate all active queries before reload (ADBDEV-4987) - profileConf.getProfilesMap().keySet() - .forEach(this::reloadAll); - } else { - throw new IllegalArgumentException(String.format("The provided parameters (profile=%s, server=%s) " + - "are not correct. Please add profile", profile, server)); - } - } else if (StringUtils.isBlank(server)) { - // TODO: Terminate all active queries before reload (ADBDEV-4987) + if (StringUtils.isBlank(server)) { reloadAll(profile); - } else { - // TODO: Terminate all active queries before reload (ADBDEV-4987) + } else if (StringUtils.isNotBlank(profile)) { reload(profile, server); + } else { + throw new IllegalArgumentException(String.format("The provided parameters (profile=%s, server=%s) " + + "are not correct. Please add profile", profile, server)); } } private void reloadAll(String profile) { - Optional accessor = getAccessor(profile); - if (accessor.isPresent()) { - log.info("Reload profile '{}' for all servers with accessor {}", profile, accessor.get()); - accessor.get().reloadAll(); - } else { - log.info("Skipping reloading profile '{}' for all servers", profile); - } + Predicate profilePredicate = key -> (StringUtils.isBlank(profile) || key.equals(profile)); + profileReloaderMap.forEach((profileName, reloader) -> { + if (profilePredicate.test(profileName)) { + // TODO: Terminate all active queries before reload (ADBDEV-4987) + reloader.reloadAll(); + log.info("Reload profile '{}' for all servers with reloader {}", profileName, reloader); + } else { + String message = String.format( + "Profile '%s' doesn't support reloading methods. Skipping reloading for all servers. " + + "Profiles with supporting reloading methods: %s", profile, profileReloaderMap.keySet()); + log.error(message); + throw new PxfRuntimeException(message); + } + }); } private void reload(String profile, String server) { - Optional accessor = getAccessor(profile); - if (accessor.isPresent()) { - log.info("Reload profile '{}' for server '{}' with accessor {}", profile, server, accessor.get()); - accessor.get().reload(server); + Reloader reloader = profileReloaderMap.get(profile); + if (Objects.nonNull(reloader)) { + // TODO: Terminate all active queries before reload (ADBDEV-4987) + reloader.reload(server); + log.info("Reload profile '{}' for server '{}' with reloader {}", profile, server, reloader); } else { - log.info("Skipping reloading profile '{}' for server '{}'", profile, server); - } - } - - private Optional getAccessor(String profile) { - try { - Map pluginMap = profileConf.getPlugins(profile); - return Optional.ofNullable(pluginMap) - .map(plugins -> getAccessorInstance(plugins.get(ACCESSOR_KEY))); - } catch (Exception e) { - String message = String.format("Failed to get plugin of the profile '%s'. %s", profile, e.getMessage()); + String message = String.format( + "Profile '%s' doesn't support reloading methods. Skipping reloading for server '%s'. " + + "Profiles with supporting reloading methods: %s", profile, server, profileReloaderMap.keySet()); log.error(message); - throw new PxfRuntimeException(message, e); - } - } - - private Plugin getAccessorInstance(String accessorClassName) { - try { - Plugin instance = basePluginFactory.getPluginInstance(accessorClassName); - log.debug("Initialize instance {} of the accessor class {}", instance, accessorClassName); - return instance; - } catch (Exception e) { - log.error("Failed to initialize instance of the accessor class {}. {}", accessorClassName, e.getMessage()); - return null; + throw new PxfRuntimeException(message); } } } From e315480b18d53f33ac92c4a5c01fc56622d15d7a Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Wed, 6 Mar 2024 19:35:51 +0200 Subject: [PATCH 12/24] ADBDEV-4980: Add closing hikari data source Additionally, we need to close hikari data source. It might take for a long time to close in case of active connections. --- .../plugins/jdbc/utils/ConnectionManager.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java index 52a14468a4..b4495b4e41 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/utils/ConnectionManager.java @@ -25,12 +25,10 @@ import java.sql.SQLException; import java.sql.SQLTimeoutException; import java.util.Properties; -import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; -import java.util.stream.Collectors; /** * Responsible for obtaining and maintaining JDBC connections to databases. If configured for a given server, @@ -84,20 +82,25 @@ public ConnectionManager(DataSourceFactory factory, Ticker ticker, PxfJdbcProper } public void reloadCache() { - dataSources.invalidateAll(); - log.info("Invalidate cache of all pool descriptors"); - cleanCache(); + dataSources.asMap().forEach(this::invalidateAndCloseDaraSource); } public void reloadCacheIf(Predicate poolDescriptorFilter) { - Set poolDescriptorSet = dataSources.asMap().keySet().stream() - .filter(poolDescriptorFilter) - .collect(Collectors.toSet()); - if (!poolDescriptorSet.isEmpty()) { - dataSources.invalidateAll(poolDescriptorSet); - log.info("Invalidate cache of the pool descriptor(s): {}", poolDescriptorSet); - cleanCache(); - } + dataSources.asMap() + .forEach((poolDescriptor, hikariDataSource) -> { + if (poolDescriptorFilter.test(poolDescriptor)) { + invalidateAndCloseDaraSource(poolDescriptor, hikariDataSource); + } + }); + } + + private void invalidateAndCloseDaraSource(PoolDescriptor poolDescriptor, HikariDataSource hds) { + log.debug("Datasource: {}; Number of active connection: {}; Pool descriptor: {}", + hds.getPoolName(), hds.getHikariPoolMXBean().getActiveConnections(), poolDescriptor); + dataSources.invalidate(poolDescriptor); + hds.close(); + cleanCache(); + log.info("Invalidated and closed datasource {}. Pool descriptor: {}", hds.getPoolName(), poolDescriptor); } /** @@ -260,7 +263,5 @@ public Connection getConnection(String url, java.util.Properties info) throws SQ public Driver getDriver(String url) throws SQLException { return DriverManager.getDriver(url); } - } } - From c9ea907117375645e2aa168d20a460e8d656b418 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Wed, 6 Mar 2024 21:00:34 +0200 Subject: [PATCH 13/24] ADBDEV-4980: Fix Pool Descriptor unit test --- .../greenplum/pxf/plugins/jdbc/utils/PoolDescriptorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/pxf-jdbc/src/test/java/org/greenplum/pxf/plugins/jdbc/utils/PoolDescriptorTest.java b/server/pxf-jdbc/src/test/java/org/greenplum/pxf/plugins/jdbc/utils/PoolDescriptorTest.java index 7f4e393d0c..cb6ac6fda4 100644 --- a/server/pxf-jdbc/src/test/java/org/greenplum/pxf/plugins/jdbc/utils/PoolDescriptorTest.java +++ b/server/pxf-jdbc/src/test/java/org/greenplum/pxf/plugins/jdbc/utils/PoolDescriptorTest.java @@ -132,9 +132,9 @@ public void testPoolDescriptorHashCodeAndEquals() { @Test public void testPoolDescriptorToString() { poolDescriptor = new PoolDescriptor("test-server", "test-jdbcUrl", connConfig, poolConfig, null); - assertEquals("PoolDescriptor{jdbcUrl=test-jdbcUrl, user=test-user, password=*************, connectionConfig={test-other-property=test-other-property-value}, poolConfig={test-pool-property=test-pool-property-value}, qualifier=null}", poolDescriptor.toString()); + assertEquals("PoolDescriptor{server=test-server, jdbcUrl=test-jdbcUrl, user=test-user, password=*************, connectionConfig={test-other-property=test-other-property-value}, poolConfig={test-pool-property=test-pool-property-value}, qualifier=null}", poolDescriptor.toString()); poolDescriptor = new PoolDescriptor("test-server", "test-jdbcUrl", connConfig, poolConfig, "foo"); - assertEquals("PoolDescriptor{jdbcUrl=test-jdbcUrl, user=test-user, password=*************, connectionConfig={test-other-property=test-other-property-value}, poolConfig={test-pool-property=test-pool-property-value}, qualifier=foo}", poolDescriptor.toString()); + assertEquals("PoolDescriptor{server=test-server, jdbcUrl=test-jdbcUrl, user=test-user, password=*************, connectionConfig={test-other-property=test-other-property-value}, poolConfig={test-pool-property=test-pool-property-value}, qualifier=foo}", poolDescriptor.toString()); } private void testInvalidProperty(String property) { From d7e005c5a8848f6972bc3b6f85de7d7a5b3dfe4a Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Wed, 6 Mar 2024 20:55:16 +0200 Subject: [PATCH 14/24] ADBDEV-4987: Implement additional methods to cancel read and write requests - Add cancelIteration() method to Bridge interface; - Add cancelWrite() and cancelRead() methods to Accessor interface; - Implement additional logic to cancelWrite() and cancelRead() methods in JdbcAccessor. --- .../pxf/api/examples/DemoAccessor.java | 18 +++ .../org/greenplum/pxf/api/model/Accessor.java | 14 ++ .../pxf/api/utilities/UtilitiesTest.java | 15 ++ .../diagnostic/UserDataVerifyAccessor.java | 9 ++ .../pxf/plugins/hbase/HBaseAccessor.java | 16 +++ .../pxf/plugins/hdfs/AvroFileAccessor.java | 8 ++ .../plugins/hdfs/HdfsAtomicDataAccessor.java | 8 ++ .../hdfs/HdfsSplittableDataAccessor.java | 8 ++ .../pxf/plugins/hdfs/LineBreakAccessor.java | 8 ++ .../pxf/plugins/hdfs/ParquetFileAccessor.java | 16 +++ .../plugins/hdfs/QuotedLineBreakAccessor.java | 8 ++ .../plugins/hdfs/SequenceFileAccessor.java | 8 ++ .../hdfs/orc/ORCVectorizedAccessor.java | 17 ++- .../hdfs/HdfsSplittableDataAccessorTest.java | 4 + .../pxf/plugins/hive/HiveAccessor.java | 8 ++ .../pxf/plugins/jdbc/JdbcAccessor.java | 128 +++++++++++------- .../writercallable/BatchWriterCallable.java | 2 + .../writercallable/SimpleWriterCallable.java | 2 + .../pxf/plugins/s3/S3SelectAccessor.java | 13 ++ .../greenplum/pxf/service/bridge/Bridge.java | 6 + .../pxf/service/bridge/ReadBridge.java | 14 +- .../pxf/service/bridge/WriteBridge.java | 14 +- .../pxf/service/bridge/BaseBridgeTest.java | 4 + 23 files changed, 295 insertions(+), 53 deletions(-) diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/examples/DemoAccessor.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/examples/DemoAccessor.java index b6c35b8a01..08a03677b6 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/examples/DemoAccessor.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/examples/DemoAccessor.java @@ -84,6 +84,14 @@ public void closeForRead() throws Exception { /* Demo close doesn't do anything */ } + /** + * cancel read operation. no action here + */ + @Override + public void cancelRead() throws Exception { + /* Demo cancel doesn't do anything */ + } + /** * Opens the resource for write. * @@ -116,4 +124,14 @@ public boolean writeNextObject(OneRow onerow) throws Exception { public void closeForWrite() throws Exception { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } + + /** + * Cancel write operation. + * + * @throws Exception if the cancel operation failed + */ + @Override + public void cancelWrite() throws Exception { + throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); + } } diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Accessor.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Accessor.java index 733c6e9fb9..64675f65dd 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Accessor.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Accessor.java @@ -49,6 +49,13 @@ public interface Accessor extends Plugin { */ void closeForRead() throws Exception; + /** + * Cancel read operation. Might contain additional logic comparing with {@link Accessor#closeForRead()} + * + * @throws Exception if the cancel operation failed + */ + void cancelRead() throws Exception; + /** * Opens the resource for write. * @@ -72,4 +79,11 @@ public interface Accessor extends Plugin { * @throws Exception if closing the resource failed */ void closeForWrite() throws Exception; + + /** + * Cancel write operation. Might contain additional logic comparing with {@link Accessor#closeForWrite()} + * + * @throws Exception if the cancel the operation failed + */ + void cancelWrite() throws Exception; } diff --git a/server/pxf-api/src/test/java/org/greenplum/pxf/api/utilities/UtilitiesTest.java b/server/pxf-api/src/test/java/org/greenplum/pxf/api/utilities/UtilitiesTest.java index 15bf89f349..b8721c9448 100644 --- a/server/pxf-api/src/test/java/org/greenplum/pxf/api/utilities/UtilitiesTest.java +++ b/server/pxf-api/src/test/java/org/greenplum/pxf/api/utilities/UtilitiesTest.java @@ -62,6 +62,10 @@ public OneRow readNextObject() { public void closeForRead() { } + @Override + public void cancelRead() { + } + @Override public boolean openForWrite() { return false; @@ -74,7 +78,10 @@ public boolean writeNextObject(OneRow onerow) { @Override public void closeForWrite() { + } + @Override + public void cancelWrite() { } @Override @@ -111,6 +118,10 @@ public OneRow readNextObject() { public void closeForRead() { } + @Override + public void cancelRead() { + } + @Override public boolean openForWrite() { return false; @@ -125,6 +136,10 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { } + @Override + public void cancelWrite() { + } + @Override public void setRequestContext(RequestContext context) { } diff --git a/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java b/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java index 48aa6da3cc..31bdc4deba 100644 --- a/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java +++ b/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java @@ -62,6 +62,10 @@ public OneRow readNextObject() { public void closeForRead() { } + @Override + public void cancelRead() { + } + @Override public boolean openForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); @@ -76,4 +80,9 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } + + @Override + public void cancelWrite() { + throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); + } } diff --git a/server/pxf-hbase/src/main/java/org/greenplum/pxf/plugins/hbase/HBaseAccessor.java b/server/pxf-hbase/src/main/java/org/greenplum/pxf/plugins/hbase/HBaseAccessor.java index 5eef76a69f..912fa62a5f 100644 --- a/server/pxf-hbase/src/main/java/org/greenplum/pxf/plugins/hbase/HBaseAccessor.java +++ b/server/pxf-hbase/src/main/java/org/greenplum/pxf/plugins/hbase/HBaseAccessor.java @@ -146,6 +146,14 @@ public void closeForRead() throws Exception { HBaseUtilities.closeConnection(null, connection); } + /** + * Cancel read operation. + */ + @Override + public void cancelRead() { + throw new UnsupportedOperationException("Cancel read operation is not supported"); + } + /** * Opens the resource for write. * @@ -175,6 +183,14 @@ public void closeForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() { + throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); + } + /** * Returns the next row in the HBase table, null if end of fragment. */ diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroFileAccessor.java index 99bcf8d25f..35dccc0005 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroFileAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroFileAccessor.java @@ -218,6 +218,14 @@ public void closeForWrite() throws Exception { context.getServerName()); } + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() { + throw new UnsupportedOperationException("Cancel write operation is not supported"); + } + /** * Closes the resource for write. * diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java index 6ae1bea85d..c164db88f2 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java @@ -105,6 +105,14 @@ public void closeForRead() throws Exception { } } + /** + * Cancel read operation. + */ + @Override + public void cancelRead() { + throw new UnsupportedOperationException("Cancel read operation is not supported"); + } + /* * Making sure that only the segment that got assigned the first data * fragment will read the (whole) file. diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java index b0a9657a9f..c0ec3e0873 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java @@ -142,6 +142,14 @@ public void closeForRead() throws Exception { } } + /** + * Cancel read operation. + */ + @Override + public void cancelRead() { + throw new UnsupportedOperationException("Cancel read operation is not supported"); + } + /** * Helper routine to get compression codec by class name or alias. * diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java index a6dc865874..b06f00664b 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java @@ -195,6 +195,14 @@ public void closeForWrite() throws IOException { } } + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() { + throw new UnsupportedOperationException("Cancel write operation is not supported"); + } + /* * Creates output stream from given file. If compression codec is provided, * wrap it around stream. diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java index f5c529f2a2..566c424dc7 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java @@ -210,6 +210,14 @@ public void closeForRead() throws IOException { } } + /** + * Cancel read operation. + */ + @Override + public void cancelRead() throws IOException { + throw new UnsupportedOperationException("Cancel read operation is not supported"); + } + /** * Opens the resource for write. * Uses compression codec based on user input which @@ -294,6 +302,14 @@ public void closeForWrite() throws IOException, InterruptedException { context.getServerName()); } + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() { + throw new UnsupportedOperationException("Cancel write operation is not supported"); + } + /** * Returns the parquet record filter for the given filter string * diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/QuotedLineBreakAccessor.java index 9610ec9244..6110341e68 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/QuotedLineBreakAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/QuotedLineBreakAccessor.java @@ -179,4 +179,12 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { throw new UnsupportedOperationException(String.format(UNSUPPORTED_ERR_MESSAGE, context.getProfile())); } + + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() { + throw new UnsupportedOperationException(String.format(UNSUPPORTED_ERR_MESSAGE, context.getProfile())); + } } diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java index cb4836fe0f..1f9e5884da 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java @@ -201,6 +201,14 @@ public void closeForWrite() throws Exception { } } + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() { + throw new UnsupportedOperationException("Cancel write operation is not supported"); + } + public CompressionType getCompressionType() { return compressionType; } diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/orc/ORCVectorizedAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/orc/ORCVectorizedAccessor.java index 33f0d2e487..7c610a6774 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/orc/ORCVectorizedAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/orc/ORCVectorizedAccessor.java @@ -16,7 +16,6 @@ import org.apache.orc.TypeDescription; import org.apache.orc.Writer; import org.greenplum.pxf.api.OneRow; -import org.greenplum.pxf.api.error.PxfRuntimeException; import org.greenplum.pxf.api.filter.FilterParser; import org.greenplum.pxf.api.filter.Node; import org.greenplum.pxf.api.filter.Operator; @@ -164,6 +163,14 @@ public void closeForRead() throws IOException { } } + /** + * Cancel read operation. + */ + @Override + public void cancelRead() { + throw new UnsupportedOperationException("Cancel read operation is not supported"); + } + @Override public boolean openForWrite() throws IOException { HcfsType hcfsType = HcfsType.getHcfsType(context); @@ -218,6 +225,14 @@ public void closeForWrite() throws IOException { } } + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() { + throw new UnsupportedOperationException("Cancel write operation is not supported"); + } + /** * Given a filter string, builds the SearchArgument object to perform * predicated pushdown for ORC diff --git a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessorTest.java b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessorTest.java index 44e0d6d48b..5ca525cf80 100644 --- a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessorTest.java +++ b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessorTest.java @@ -39,6 +39,10 @@ public void closeForWrite() throws Exception { } + @Override + public void cancelWrite() { + } + @Override protected Object getReader(JobConf jobConf, InputSplit split) throws IOException { return null; diff --git a/server/pxf-hive/src/main/java/org/greenplum/pxf/plugins/hive/HiveAccessor.java b/server/pxf-hive/src/main/java/org/greenplum/pxf/plugins/hive/HiveAccessor.java index cc17c5217d..1a4973b366 100644 --- a/server/pxf-hive/src/main/java/org/greenplum/pxf/plugins/hive/HiveAccessor.java +++ b/server/pxf-hive/src/main/java/org/greenplum/pxf/plugins/hive/HiveAccessor.java @@ -337,6 +337,14 @@ public void closeForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() { + throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); + } + /** * Creates the RecordReader suitable for this given split. * diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java index e6eb85a33b..a596ef1d56 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java @@ -22,6 +22,7 @@ import io.arenadata.security.encryption.client.service.DecryptClient; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.greenplum.pxf.api.OneRow; import org.greenplum.pxf.api.error.PxfRuntimeException; import org.greenplum.pxf.api.model.Accessor; @@ -41,10 +42,9 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.SQLTimeoutException; import java.sql.Statement; -import java.util.LinkedList; -import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -70,7 +70,8 @@ public class JdbcAccessor extends JdbcBasePlugin implements Accessor { private WriterCallableFactory writerCallableFactory = null; private WriterCallable writerCallable = null; private ExecutorService executorServiceWrite = null; - private List> poolTasks = null; + private ConcurrentLinkedQueue> poolTasks = null; + private Exception firstException = null; /** * Creates a new instance of the JdbcAccessor @@ -94,11 +95,10 @@ public JdbcAccessor() { * Create query, open JDBC connection, execute query and store the result into resultSet * * @return true if successful - * @throws SQLException if a database access error occurs - * @throws SQLTimeoutException if a problem with the connection occurs + * @throws SQLException if a database access error occurs */ @Override - public boolean openForRead() throws SQLException, SQLTimeoutException { + public boolean openForRead() throws SQLException { if (statementRead != null && !statementRead.isClosed()) { return true; } @@ -182,16 +182,25 @@ public void closeForRead() throws SQLException { closeStatementAndConnection(statementRead); } + /** + * Cancel read operation. + * + * @throws SQLException if the cancel operation failed + */ + @Override + public void cancelRead() throws SQLException { + closeForRead(); + } + /** * openForWrite() implementation * Create query template and open JDBC connection * * @return true if successful - * @throws SQLException if a database access error occurs - * @throws SQLTimeoutException if a problem with the connection occurs + * @throws SQLException if a database access error occurs */ @Override - public boolean openForWrite() throws SQLException, SQLTimeoutException { + public boolean openForWrite() throws SQLException { if (queryName != null) { throw new IllegalArgumentException("specifying query name in data path is not supported for JDBC writable external tables"); } @@ -231,7 +240,7 @@ public boolean openForWrite() throws SQLException, SQLTimeoutException { } if (poolSize > 1) { executorServiceWrite = Executors.newFixedThreadPool(poolSize); - poolTasks = new LinkedList<>(); + poolTasks = new ConcurrentLinkedQueue<>(); } // Setup WriterCallableFactory @@ -268,17 +277,22 @@ public boolean writeNextObject(OneRow row) throws Exception { if (writerCallable.isCallRequired()) { if (poolSize > 1) { // Pooling is used. Create new writerCallable - poolTasks.add(executorServiceWrite.submit(writerCallable)); - writerCallable = writerCallableFactory.get(); + if (!executorServiceWrite.isShutdown() && !executorServiceWrite.isTerminated()) { + poolTasks.add(executorServiceWrite.submit(writerCallable)); + writerCallable = writerCallableFactory.get(); + } else { + firstException = new PxfRuntimeException("Writer executor service pool was shutdown or terminated"); + throw firstException; + } } else { // Pooling is not used, call directly and process potential error SQLException e = writerCallable.call(); if (e != null) { - throw e; + firstException = e; + throw firstException; } } } - return true; } @@ -294,53 +308,59 @@ public void closeForWrite() throws Exception { } try { - if (poolSize > 1) { - // Process thread pool - Exception firstException = null; - for (Future task : poolTasks) { - // We need this construction to ensure that we try to close all connections opened by pool threads - try { - SQLException currentSqlException = task.get(); - if (currentSqlException != null) { - if (firstException == null) { - firstException = currentSqlException; + if (firstException == null) { + if (poolSize > 1) { + // Process thread pool + for (Future task : poolTasks) { + Exception exception; + // We need this construction to ensure that we try to close all connections opened by pool threads + try { + exception = task.get(); + if (exception != null) { + LOG.error("A SQLException in a pool thread occurred: {}", ExceptionUtils.getMessage(exception)); } - LOG.error( - "A SQLException in a pool thread occurred: " + currentSqlException.getClass() + " " + currentSqlException.getMessage() - ); + } catch (Exception e) { + String message = String.format("Failed to complete some tasks within writer executor service: %s", + ExceptionUtils.getMessage(e)); + LOG.error(message, e); + exception = new PxfRuntimeException(message, e); } - } catch (Exception e) { - // This exception must have been caused by some thread execution error. However, there may be other exception (maybe of class SQLException) that happened in one of threads that were not examined yet. That is why we do not modify firstException - if (LOG.isDebugEnabled()) { - LOG.debug( - "A runtime exception in a thread pool occurred: " + e.getClass() + " " + e.getMessage() - ); + if (exception != null) { + throw exception; } } } - try { - executorServiceWrite.shutdown(); - executorServiceWrite.shutdownNow(); - } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("executorServiceWrite.shutdown() or .shutdownNow() threw an exception: " + e.getClass() + " " + e.getMessage()); - } - } - if (firstException != null) { - throw firstException; - } - } - // Send data that is left - SQLException e = writerCallable.call(); - if (e != null) { - throw e; + // Send data that is left + SQLException e = writerCallable.call(); + if (e != null) { + throw e; + } + } else { + throw firstException; } } finally { closeStatementAndConnection(statementWrite); + if (Objects.nonNull(executorServiceWrite)) { + shutdownExecutorService(); + } } } + /** + * Cancel write operation. + */ + @Override + public void cancelWrite() throws SQLException { + LOG.debug("Accessor starts cancelWrite()"); + closeStatementAndConnection(statementWrite); + if (poolSize > 1) { + LOG.debug("Number of tasks to be canceled: {}", poolTasks.size()); + poolTasks.forEach(task -> task.cancel(true)); + LOG.debug("Shutdown writer executor service"); + shutdownExecutorService(); + } + } /** * Gets the text of the query by reading the file from the server configuration directory. The name of the file @@ -382,4 +402,12 @@ private String getQueryText() { private boolean parseJdbcUsePreparedStatementProperty() { return Utilities.parseBooleanProperty(configuration, JDBC_READ_PREPARED_STATEMENT_PROPERTY_NAME, false); } + + private void shutdownExecutorService() { + try { + executorServiceWrite.shutdownNow(); + } catch (Exception e) { + LOG.debug("Failed to shutdown writer executor service: {}", ExceptionUtils.getMessage(e)); + } + } } diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java index a355844508..6f931fa91b 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/writercallable/BatchWriterCallable.java @@ -71,6 +71,8 @@ public SQLException call() throws IOException, SQLException, ClassNotFoundExcept try { statement.executeBatch(); + if (Thread.interrupted()) + throw new SQLException("Writer was interrupted by timeout or by request"); } catch (BatchUpdateException bue) { SQLException cause = bue.getNextException(); diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java index 0632bd351a..13ce40c81b 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/writercallable/SimpleWriterCallable.java @@ -65,6 +65,8 @@ public SQLException call() throws IOException, SQLException, ClassNotFoundExcept try { statement.executeUpdate(); + if (Thread.interrupted()) + throw new SQLException("Writer was interrupted by timeout or by request"); } catch (SQLException e) { return e; } finally { diff --git a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3SelectAccessor.java b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3SelectAccessor.java index e5b79be27d..70f0a06345 100644 --- a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3SelectAccessor.java +++ b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3SelectAccessor.java @@ -145,6 +145,14 @@ public void closeForRead() throws IOException { } } + /** + * Cancel read operation. + */ + @Override + public void cancelRead() { + throw new UnsupportedOperationException("Cancel read operation is not supported"); + } + /** * Generates the {@link SelectObjectContentRequest} object from * the request context. @@ -330,4 +338,9 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } + + @Override + public void cancelWrite() { + throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); + } } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/Bridge.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/Bridge.java index c6ae2ad108..5703e5b0ab 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/Bridge.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/Bridge.java @@ -59,4 +59,10 @@ public interface Bridge { * @throws Exception when an error occurs during the operation */ void endIteration() throws Exception; + + /** + * Cancel iteration for data access. Immediately cancel and close the connection to the external system. + * @throws Exception when an error occurs during the operation + */ + void cancelIteration() throws Exception; } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java index d6ce412f7d..e639a5a99b 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java @@ -146,6 +146,19 @@ protected boolean isDataException(IOException ex) { || ex instanceof UTFDataFormatException || ex instanceof ZipException); } + /** + * {@inheritDoc} + */ + @Override + public void cancelIteration() throws Exception { + try { + accessor.cancelRead(); + } catch (Exception e) { + LOG.error("Failed to cancel read bridge iteration: {}", e.getMessage()); + throw e; + } + } + /** * {@inheritDoc} */ @@ -153,5 +166,4 @@ protected boolean isDataException(IOException ex) { public boolean setNext(DataInputStream inputStream) { throw new UnsupportedOperationException("Write operation is not supported."); } - } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java index 92feeed7d8..ab2c2f8653 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java @@ -109,6 +109,19 @@ public void endIteration() throws Exception { } } + /** + * {@inheritDoc} + */ + @Override + public void cancelIteration() throws Exception { + try { + accessor.cancelWrite(); + } catch (Exception e) { + LOG.error("Failed to cancel write bridge iteration: {}", e.getMessage()); + throw e; + } + } + /** * {@inheritDoc} */ @@ -116,5 +129,4 @@ public void endIteration() throws Exception { public Writable getNext() { throw new UnsupportedOperationException("Current operation is not supported"); } - } diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/BaseBridgeTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/BaseBridgeTest.java index fab02dae0c..4b10d16d3b 100644 --- a/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/BaseBridgeTest.java +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/BaseBridgeTest.java @@ -84,6 +84,10 @@ public boolean setNext(DataInputStream inputStream) { public void endIteration() { } + @Override + public void cancelIteration() { + } + Accessor getAccessor() { return accessor; } From b7c34d686a731e0e3c7b6b8f3769b28f01ec8fc3 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Wed, 6 Mar 2024 01:25:00 +0200 Subject: [PATCH 15/24] ADBDEV-4987: Implement queries termination --- .../pxf/service/controller/ReadService.java | 8 +++ .../service/controller/ReadServiceImpl.java | 59 ++++++++----------- .../service/controller/RequestIdentifier.java | 29 +++++++++ .../pxf/service/controller/WriteService.java | 17 ++++++ .../service/controller/WriteServiceImpl.java | 48 ++++++++++++++- .../profile/ProfileReloadServiceImpl.java | 25 ++++++-- .../pxf/service/bridge/TestAccessor.java | 7 +++ .../controller/ReadServiceImplTest.java | 2 +- .../pxf/service/rest/PxfResourceIT.java | 4 ++ 9 files changed, 157 insertions(+), 42 deletions(-) create mode 100644 server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/RequestIdentifier.java diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadService.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadService.java index cdf60c66ec..4f917d2067 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadService.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadService.java @@ -26,4 +26,12 @@ public interface ReadService { * @return if read request cancellation succeeded */ boolean cancelRead(RequestContext context); + + /** + * Tries to cancel active read requests to the external system with specified profile and server. + * + * @param profile the name of the profile defined in the external table. For example, jdbc, hdfs, s3, etc... + * @param server the named server configuration that PXF uses to access the data. PXF uses the default server if not specified. + */ + void cancelReadExecutions(String profile, String server); } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadServiceImpl.java index b511488270..893952d144 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadServiceImpl.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/ReadServiceImpl.java @@ -1,10 +1,6 @@ package org.greenplum.pxf.service.controller; import com.google.common.io.CountingOutputStream; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.greenplum.pxf.api.io.Writable; @@ -27,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; /** * Implementation of the ReadService. @@ -66,7 +63,24 @@ public void readData(RequestContext context, OutputStream outputStream) { @Override public boolean cancelRead(RequestContext context) { - return cancelExecution(context); + RequestIdentifier requestIdentifier = new RequestIdentifier(context); + Bridge bridge = readExecutionMap.remove(requestIdentifier); + return cancelExecution(requestIdentifier, bridge); + } + + @Override + public void cancelReadExecutions(String profile, String server) { + Predicate identifierFilter = getIdentifierFilter(profile, server); + readExecutionMap.forEach((requestIdentifier, bridge) -> { + if (identifierFilter.test(requestIdentifier)) { + cancelExecution(requestIdentifier, bridge); + } + }); + } + + private Predicate getIdentifierFilter(String profile, String server) { + return key -> (StringUtils.isBlank(profile) || key.getProfile().equals(profile)) + && (StringUtils.isBlank(server) || key.getServer().equals(server)); } /** @@ -139,43 +153,30 @@ private OperationResult writeStream(RequestContext context, OutputStream outputS } private void registerExecution(RequestContext context, Bridge readBridge) { - RequestIdentifier requestIdentifier = getRequestIdentifier(context); + RequestIdentifier requestIdentifier = new RequestIdentifier(context); readExecutionMap.put(requestIdentifier, readBridge); } private void removeExecution(RequestContext context) { - RequestIdentifier requestIdentifier = getRequestIdentifier(context); + RequestIdentifier requestIdentifier = new RequestIdentifier(context); readExecutionMap.remove(requestIdentifier); } - private boolean cancelExecution(RequestContext context) { - RequestIdentifier requestIdentifier = getRequestIdentifier(context); - Bridge bridge = readExecutionMap.remove(requestIdentifier); + private boolean cancelExecution(RequestIdentifier requestIdentifier, Bridge bridge) { if (bridge == null) { log.debug("Couldn't cancel read request, request {} not found", requestIdentifier); return false; } try { log.debug("Cancelling read request {}", requestIdentifier); - bridge.endIteration(); + bridge.cancelIteration(); } catch (Exception e) { - log.warn("Ignoring error encountered during bridge.endIteration()", e); + log.warn("Ignoring error encountered during bridge.cancelIteration()", e); return false; } return true; } - private RequestIdentifier getRequestIdentifier(RequestContext context) { - return new RequestIdentifier( - context.getTransactionId(), - context.getSegmentId(), - context.getSchemaName(), - context.getTableName(), - context.getClientPort() - ); - } - - /** * Processes a single fragment identified in the RequestContext and updates query statistics. * @@ -258,16 +259,4 @@ private void updateProfile(RequestContext context, String profile) { context.setProfileScheme(profileProtocol); } } - - @RequiredArgsConstructor - @Getter - @EqualsAndHashCode - @ToString - private static class RequestIdentifier { - private final String transactionId; - private final int segmentId; - private final String schemaName; - private final String tableName; - private final int remotePort; - } } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/RequestIdentifier.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/RequestIdentifier.java new file mode 100644 index 0000000000..675370f80d --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/RequestIdentifier.java @@ -0,0 +1,29 @@ +package org.greenplum.pxf.service.controller; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.greenplum.pxf.api.model.RequestContext; + +@Getter +@EqualsAndHashCode +@ToString +public class RequestIdentifier { + private final String transactionId; + private final int segmentId; + private final String schemaName; + private final String tableName; + private final int remotePort; + private final String profile; + private final String server; + + public RequestIdentifier(RequestContext context) { + this.transactionId = context.getTransactionId(); + this.segmentId = context.getSegmentId(); + this.schemaName = context.getSchemaName(); + this.tableName = context.getTableName(); + this.remotePort = context.getClientPort(); + this.profile = context.getProfile(); + this.server = context.getServerName(); + } +} diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/WriteService.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/WriteService.java index 828eb0b761..a7ab06b7d8 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/WriteService.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/WriteService.java @@ -19,4 +19,21 @@ public interface WriteService { * @throws Exception if any error happened during processing */ String writeData(RequestContext context, InputStream inputStream) throws Exception; + + /** + * Tries to cancel active write request to the external system specified by the RequestContext. + * Returns true if write request was found and cancelled, false otherwise + * + * @param context request context + * @return if write request cancellation succeeded + */ + boolean cancelWrite(RequestContext context); + + /** + * Tries to cancel active write requests to the external system with specified profile and server. + * + * @param profile the name of the profile defined in the external table. For example, jdbc, hdfs, s3, etc... + * @param server the named server configuration that PXF uses to access the data. PXF uses the default server if not specified. + */ + void cancelWriteExecutions(String profile, String server); } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/WriteServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/WriteServiceImpl.java index 7b686e1281..3b27896d79 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/WriteServiceImpl.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/WriteServiceImpl.java @@ -2,6 +2,7 @@ import com.google.common.io.CountingInputStream; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; import org.greenplum.pxf.api.model.ConfigurationFactory; import org.greenplum.pxf.api.model.RequestContext; import org.greenplum.pxf.api.utilities.Utilities; @@ -13,6 +14,9 @@ import java.io.DataInputStream; import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; /** * Implementation of the WriteService. @@ -21,6 +25,8 @@ @Slf4j public class WriteServiceImpl extends BaseServiceImpl implements WriteService { + private final Map writeExecutionMap = new ConcurrentHashMap<>(); + /** * Creates a new instance. * @@ -46,6 +52,28 @@ public String writeData(RequestContext context, InputStream inputStream) throws return returnMsg; } + @Override + public boolean cancelWrite(RequestContext context) { + RequestIdentifier requestIdentifier = new RequestIdentifier(context); + Bridge bridge = writeExecutionMap.remove(requestIdentifier); + return cancelExecution(requestIdentifier, bridge); + } + + @Override + public void cancelWriteExecutions(String profile, String server) { + Predicate identifierFilter = getIdentifierFilter(profile, server); + writeExecutionMap.forEach((requestIdentifier, bridge) -> { + if (identifierFilter.test(requestIdentifier)) { + cancelExecution(requestIdentifier, bridge); + } + }); + } + + private Predicate getIdentifierFilter(String profile, String server) { + return key -> (StringUtils.isBlank(profile) || key.getProfile().equals(profile)) + && (StringUtils.isBlank(server) || key.getServer().equals(server)); + } + /** * Reads the input stream, iteratively submits data from the stream to created bridge. * @@ -59,6 +87,9 @@ private OperationResult readStream(RequestContext context, InputStream inputStre OperationStats operationStats = new OperationStats(OperationStats.Operation.WRITE, metricsReporter, context); OperationResult operationResult = new OperationResult(); + RequestIdentifier requestIdentifier = new RequestIdentifier(context); + writeExecutionMap.put(requestIdentifier, bridge); + // dataStream (and inputStream as the result) will close automatically at the end of the try block CountingInputStream countingInputStream = new CountingInputStream(inputStream); try (DataInputStream dataStream = new DataInputStream(countingInputStream)) { @@ -77,7 +108,7 @@ private OperationResult readStream(RequestContext context, InputStream inputStre operationResult.setException(e); } } - + writeExecutionMap.remove(requestIdentifier); // in the case where we fail to report a record due to an exception, // report the number of bytes that we were able to read before failure operationStats.setByteCount(countingInputStream.getCount()); @@ -87,4 +118,19 @@ private OperationResult readStream(RequestContext context, InputStream inputStre return operationResult; } + + private boolean cancelExecution(RequestIdentifier requestIdentifier, Bridge bridge) { + if (bridge == null) { + log.debug("Couldn't cancel write request, request {} not found", requestIdentifier); + return false; + } + try { + log.debug("Cancelling write request {}", requestIdentifier); + bridge.cancelIteration(); + } catch (Exception e) { + log.warn("Ignoring error encountered during bridge.cancelIteration()", e); + return false; + } + return true; + } } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java index b5ca05d2aa..c8404aedc8 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java @@ -4,6 +4,8 @@ import org.apache.commons.lang.StringUtils; import org.greenplum.pxf.api.error.PxfRuntimeException; import org.greenplum.pxf.api.model.Reloader; +import org.greenplum.pxf.service.controller.ReadService; +import org.greenplum.pxf.service.controller.WriteService; import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; @@ -15,9 +17,15 @@ @Slf4j @Component public class ProfileReloadServiceImpl implements ProfileReloadService { + private final ReadService readService; + private final WriteService writeService; private final Map profileReloaderMap; - public ProfileReloadServiceImpl(@Lazy Map profileReloaderMap) { + public ProfileReloadServiceImpl(ReadService readService, + WriteService writeService, + @Lazy Map profileReloaderMap) { + this.readService = readService; + this.writeService = writeService; this.profileReloaderMap = profileReloaderMap; } @@ -40,9 +48,10 @@ private void reloadAll(String profile) { Predicate profilePredicate = key -> (StringUtils.isBlank(profile) || key.equals(profile)); profileReloaderMap.forEach((profileName, reloader) -> { if (profilePredicate.test(profileName)) { - // TODO: Terminate all active queries before reload (ADBDEV-4987) + cancelQueryExecutions(profileName, null); + log.info("Canceled running queries with profile '{}' for all servers", profileName); reloader.reloadAll(); - log.info("Reload profile '{}' for all servers with reloader {}", profileName, reloader); + log.info("Reloaded profile '{}' for all servers with reloader {}", profileName, reloader); } else { String message = String.format( "Profile '%s' doesn't support reloading methods. Skipping reloading for all servers. " + @@ -56,9 +65,10 @@ private void reloadAll(String profile) { private void reload(String profile, String server) { Reloader reloader = profileReloaderMap.get(profile); if (Objects.nonNull(reloader)) { - // TODO: Terminate all active queries before reload (ADBDEV-4987) + cancelQueryExecutions(profile, server); + log.info("Canceled running queries with profile '{}' and server '{}'", profile, server); reloader.reload(server); - log.info("Reload profile '{}' for server '{}' with reloader {}", profile, server, reloader); + log.info("Reloaded profile '{}' for server '{}' with reloader {}", profile, server, reloader); } else { String message = String.format( "Profile '%s' doesn't support reloading methods. Skipping reloading for server '%s'. " + @@ -67,4 +77,9 @@ private void reload(String profile, String server) { throw new PxfRuntimeException(message); } } + + private void cancelQueryExecutions(String profile, String server) { + readService.cancelReadExecutions(profile, server); + writeService.cancelWriteExecutions(profile, server); + } } diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/TestAccessor.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/TestAccessor.java index 52c320eae1..11c7361edd 100644 --- a/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/TestAccessor.java +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/TestAccessor.java @@ -23,7 +23,10 @@ public OneRow readNextObject() { @Override public void closeForRead() { + } + @Override + public void cancelRead() { } @Override @@ -40,6 +43,10 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { } + @Override + public void cancelWrite() { + } + @Override public void setRequestContext(RequestContext context) { } diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/controller/ReadServiceImplTest.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/controller/ReadServiceImplTest.java index b91043c73e..7a14cb88fe 100644 --- a/server/pxf-service/src/test/java/org/greenplum/pxf/service/controller/ReadServiceImplTest.java +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/controller/ReadServiceImplTest.java @@ -280,7 +280,7 @@ public void testReadWithCancel() throws Exception { inOrder.verify(mockOutputStream).write("hello".getBytes(StandardCharsets.UTF_8), 0, 5); inOrder.verify(mockMetricReporter).reportCounter(MetricsReporter.PxfMetric.RECORDS_SENT, 1, mockContext); inOrder.verify(mockMetricReporter).reportCounter(MetricsReporter.PxfMetric.BYTES_SENT, 5, mockContext); - inOrder.verify(mockBridge1).endIteration(); + inOrder.verify(mockBridge1).cancelIteration(); inOrder.verifyNoMoreInteractions(); } diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/rest/PxfResourceIT.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/rest/PxfResourceIT.java index bddd4742a0..d18a7666f2 100644 --- a/server/pxf-service/src/test/java/org/greenplum/pxf/service/rest/PxfResourceIT.java +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/rest/PxfResourceIT.java @@ -134,6 +134,10 @@ public void readData(RequestContext context, OutputStream out) { public boolean cancelRead(RequestContext context) { return true; } + + @Override + public void cancelReadExecutions(String profile,String server) { + } }; } } From a3d1d3c8d661b45016dc6c641948452b051edbe7 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Thu, 7 Mar 2024 20:52:16 +0200 Subject: [PATCH 16/24] ADBDEV-4979: Implement pxf profile reload CLI --- cli/cmd/cluster.go | 10 +++++++++- cli/cmd/pxf.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/cli/cmd/cluster.go b/cli/cmd/cluster.go index bb56384d07..c2ab14908a 100644 --- a/cli/cmd/cluster.go +++ b/cli/cmd/cluster.go @@ -49,8 +49,12 @@ var ( restartCmd = createCobraCommand("restart", "Restart the PXF server on coordinator, standby coordinator, and all segment hosts", &RestartCommand) prepareCmd = createCobraCommand("prepare", "Prepares a new base directory specified by the $PXF_BASE environment variable", &PrepareCommand) migrateCmd = createCobraCommand("migrate", "Migrates configurations from older installations of PXF", &MigrateCommand) + reloadCmd = createCobraCommand("reload", "Reload the PXF caches on coordinator, standby coordinator, and all segment hosts for profiles and terminate all related queries.", &ReloadCommand) // DeleteOnSync is a boolean for determining whether to use rsync with --delete, exported for tests - DeleteOnSync bool + DeleteOnSync bool + ReloadProfileName string + ReloadServerName string + ReloadAutoConfirm bool ) func init() { @@ -66,6 +70,10 @@ func init() { clusterCmd.AddCommand(restartCmd) clusterCmd.AddCommand(prepareCmd) clusterCmd.AddCommand(migrateCmd) + clusterCmd.AddCommand(reloadCmd) + reloadCmd.Flags().StringVarP(&ReloadProfileName, "profile", "p", "", "profile that PXF uses to access the data") + reloadCmd.Flags().StringVarP(&ReloadServerName, "server", "s", "", "the name of a directory residing in $PXF_BASE/servers/") + reloadCmd.Flags().BoolVarP(&ReloadAutoConfirm, "auto", "a", false, "auto confirm the action of reload") } func exitWithReturnCode(err error) { diff --git a/cli/cmd/pxf.go b/cli/cmd/pxf.go index e7850ed7a0..d2796229c8 100644 --- a/cli/cmd/pxf.go +++ b/cli/cmd/pxf.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/greenplum-db/gp-common-go-libs/cluster" + "github.com/greenplum-db/gp-common-go-libs/gplog" ) type envVar string @@ -20,6 +21,9 @@ const ( javaHome envVar = "JAVA_HOME" // For pxf migrate pxfConf envVar = "PXF_CONF" + // For pxf profile reload + pxfHost envVar = "PXF_HOST" + pxfPort envVar = "PXF_PORT" ) type messageType int @@ -70,6 +74,35 @@ func (cmd *command) GetFunctionToExecute() (func(string) string, error) { hostname, inputs[pxfBase]) }, nil + case reload: + pxfDefaultHost := "localhost" + pxfDefaultPort := "5888" + var pxfHostStr string + var pxfPortStr string + reloadCommandTemplate := "curl --silent --fail --show-error --request POST http://%s:%s/pxf/reload --header \"Content-Type: application/json\" --data '{\"profile\":\"%s\",\"server\":\"%s\"}'" + + // Set pxf host + pxfHostStr, isPxfHostSet := os.LookupEnv(string(pxfHost)) + if !isPxfHostSet { + pxfHostStr = pxfDefaultHost + } + + // Set pxf port + pxfPortStr, isPxfPortSet := os.LookupEnv(string(pxfPort)) + if !isPxfPortSet { + pxfPortStr = pxfDefaultPort + } + + reloadCommand := fmt.Sprintf(reloadCommandTemplate, pxfHostStr, pxfPortStr, ReloadProfileName, ReloadServerName) + if !ReloadAutoConfirm { + cmd.warn = true + err := cmd.Warn(os.Stdin) + if err != nil { + return nil, err + } + } + gplog.Info(fmt.Sprintf("Execute command: %s", reloadCommand)) + return func(_ string) string { return reloadCommand }, nil default: var effectivePxfBase string @@ -127,6 +160,7 @@ const ( restart = "restart" prepare = "prepare" migrate = "migrate" + reload = "reload" ) // The pxf cli commands, exported for testing @@ -190,6 +224,19 @@ var ( // since the files are already on coordinator, we exclude coordinator but include standby coordinator whereToRun: cluster.ON_LOCAL | cluster.ON_HOSTS | cluster.EXCLUDE_MASTER | cluster.INCLUDE_MIRRORS, } + ReloadCommand = command{ + name: reload, + messages: map[messageType]string{ + success: "PXF successfully reloaded profiles on %d out of %d host%s\n", + status: "PXF is reloading profiles on coordinator host%s and %d segment host%s...\n", + standby: " standby coordinator host and", + err: "PXF failed to reload profile on %d out of %d host%s. Check the PXF logs located in the '$PXF_BASE/logs' directory\n", + warning: "Do you really want to reload profile(s) and terminate all related queries? Yy|Nn (default=N):", + }, + warn: false, + envVars: []envVar{pxfBase}, + whereToRun: cluster.ON_REMOTE | cluster.ON_HOSTS | cluster.INCLUDE_COORDINATOR | cluster.INCLUDE_MIRRORS, + } StatusCommand = command{ name: statuses, messages: map[messageType]string{ From c19114c4fcd741496aa6f78235d5f3a95b2bb220 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Thu, 7 Mar 2024 20:52:48 +0200 Subject: [PATCH 17/24] ADBDEV-4979: Fix make cli in the test environment --- automation/arenadata/scripts/compile_pxf_without_test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/automation/arenadata/scripts/compile_pxf_without_test.sh b/automation/arenadata/scripts/compile_pxf_without_test.sh index c14c5a8d42..cc36fb285f 100644 --- a/automation/arenadata/scripts/compile_pxf_without_test.sh +++ b/automation/arenadata/scripts/compile_pxf_without_test.sh @@ -8,7 +8,7 @@ GPHOME=/usr/local/greenplum-db-devel bash --login -c " export PXF_HOME=${GPHOME}/pxf make -C '${PWD}/pxf_src/external-table' install - make -C '${PWD}/pxf_src/cli/go/src/pxf-cli' install + make -C '${PWD}/pxf_src/cli' install make -C '${PWD}/pxf_src/server' install-server " From 22d18da7ba35090ae67e2687f78e54aa654ec556 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Mon, 11 Mar 2024 16:08:33 +0200 Subject: [PATCH 18/24] ADBDEV-4987: Add flag to cancel read or write request --- .../diagnostic/UserDataVerifyAccessor.java | 1 + .../pxf/plugins/jdbc/JdbcAccessor.java | 22 +++++++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java b/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java index 31bdc4deba..685eeda124 100644 --- a/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java +++ b/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java @@ -64,6 +64,7 @@ public void closeForRead() { @Override public void cancelRead() { + throw new UnsupportedOperationException("Cancel read operation is not supported"); } @Override diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java index a596ef1d56..627117b6c8 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java @@ -72,6 +72,7 @@ public class JdbcAccessor extends JdbcBasePlugin implements Accessor { private ExecutorService executorServiceWrite = null; private ConcurrentLinkedQueue> poolTasks = null; private Exception firstException = null; + private boolean isCanceled; /** * Creates a new instance of the JdbcAccessor @@ -168,6 +169,12 @@ private boolean openForReadInner(Connection connection) throws SQLException { */ @Override public OneRow readNextObject() throws SQLException { + if (isCanceled) { + String message = "The read operation was canceled"; + LOG.warn(message); + throw new PxfRuntimeException(message); + } + if (resultSetRead.next()) { return new OneRow(resultSetRead); } @@ -184,12 +191,12 @@ public void closeForRead() throws SQLException { /** * Cancel read operation. - * - * @throws SQLException if the cancel operation failed */ @Override - public void cancelRead() throws SQLException { - closeForRead(); + public void cancelRead() { + // We don't need to close statement and connection here because closeForRead() will be invoked anyway + // Sometimes resultSetRead.next() has unpredictable behaviour while closing statement and connection + isCanceled = true; } /** @@ -272,6 +279,12 @@ public boolean writeNextObject(OneRow row) throws Exception { if (writerCallable == null) { throw new IllegalStateException("The JDBC connection was not properly initialized (writerCallable is null)"); } + if (isCanceled) { + String message = "The write operation was canceled"; + LOG.warn(message); + firstException = new PxfRuntimeException(message); + throw firstException; + } writerCallable.supply(row); if (writerCallable.isCallRequired()) { @@ -353,6 +366,7 @@ public void closeForWrite() throws Exception { @Override public void cancelWrite() throws SQLException { LOG.debug("Accessor starts cancelWrite()"); + isCanceled = true; closeStatementAndConnection(statementWrite); if (poolSize > 1) { LOG.debug("Number of tasks to be canceled: {}", poolTasks.size()); From 99b6ed857437c666d7ed57d6db9a2cf4d9bc1b56 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Mon, 11 Mar 2024 18:55:41 +0200 Subject: [PATCH 19/24] ADBDEV-5095: Add spring security to protect endpoints --- server/pxf-service/build.gradle | 1 + .../pxf/service/security/SecurityConfig.java | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+) create mode 100644 server/pxf-service/src/main/java/org/greenplum/pxf/service/security/SecurityConfig.java diff --git a/server/pxf-service/build.gradle b/server/pxf-service/build.gradle index db1dfda931..aded450a36 100644 --- a/server/pxf-service/build.gradle +++ b/server/pxf-service/build.gradle @@ -36,6 +36,7 @@ dependencies { implementation("commons-collections:commons-collections") implementation("commons-lang:commons-lang") implementation("org.springframework.boot:spring-boot-starter-log4j2") + implementation("org.springframework.boot:spring-boot-starter-security") implementation("org.apache.logging.log4j:log4j-spring-boot") implementation('org.springframework.boot:spring-boot-starter-actuator') implementation('io.micrometer:micrometer-registry-prometheus') diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/security/SecurityConfig.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/security/SecurityConfig.java new file mode 100644 index 0000000000..766688d289 --- /dev/null +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/security/SecurityConfig.java @@ -0,0 +1,25 @@ +package org.greenplum.pxf.service.security; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.security.config.annotation.web.builders.HttpSecurity; +import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; +import org.springframework.security.web.SecurityFilterChain; + +@Configuration +@EnableWebSecurity +public class SecurityConfig { + + private static final String LOCALHOST_IP_ADDRESS = "127.0.0.1"; + + @Bean + public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { + + http + .csrf().disable() + .authorizeRequests() + .antMatchers("/pxf/reload").hasIpAddress(LOCALHOST_IP_ADDRESS) + .antMatchers("/**").permitAll(); + return http.build(); + } +} From 089dd9d4956693425f6615004fea36872e464583 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Tue, 12 Mar 2024 11:44:20 +0200 Subject: [PATCH 20/24] ADBDEV-4987: Add CancelableOperation interface to support canceling read and write operations --- .../pxf/api/examples/DemoAccessor.java | 18 ------------------ .../org/greenplum/pxf/api/model/Accessor.java | 14 -------------- .../pxf/api/model/CancelableOperation.java | 17 +++++++++++++++++ .../pxf/api/utilities/UtilitiesTest.java | 15 --------------- .../pxf/diagnostic/UserDataVerifyAccessor.java | 10 ---------- .../pxf/plugins/hbase/HBaseAccessor.java | 16 ---------------- .../pxf/plugins/hdfs/AvroFileAccessor.java | 8 -------- .../plugins/hdfs/HdfsAtomicDataAccessor.java | 8 -------- .../hdfs/HdfsSplittableDataAccessor.java | 8 -------- .../pxf/plugins/hdfs/LineBreakAccessor.java | 8 -------- .../pxf/plugins/hdfs/ParquetFileAccessor.java | 16 ---------------- .../plugins/hdfs/QuotedLineBreakAccessor.java | 8 -------- .../pxf/plugins/hdfs/SequenceFileAccessor.java | 8 -------- .../hdfs/orc/ORCVectorizedAccessor.java | 16 ---------------- .../hdfs/HdfsSplittableDataAccessorTest.java | 4 ---- .../pxf/plugins/hive/HiveAccessor.java | 8 -------- .../pxf/plugins/jdbc/JdbcAccessor.java | 3 ++- .../pxf/plugins/s3/S3SelectAccessor.java | 13 ------------- .../pxf/service/bridge/ReadBridge.java | 7 ++++++- .../pxf/service/bridge/WriteBridge.java | 7 ++++++- .../pxf/service/bridge/TestAccessor.java | 7 ------- 21 files changed, 31 insertions(+), 188 deletions(-) create mode 100644 server/pxf-api/src/main/java/org/greenplum/pxf/api/model/CancelableOperation.java diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/examples/DemoAccessor.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/examples/DemoAccessor.java index 08a03677b6..b6c35b8a01 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/examples/DemoAccessor.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/examples/DemoAccessor.java @@ -84,14 +84,6 @@ public void closeForRead() throws Exception { /* Demo close doesn't do anything */ } - /** - * cancel read operation. no action here - */ - @Override - public void cancelRead() throws Exception { - /* Demo cancel doesn't do anything */ - } - /** * Opens the resource for write. * @@ -124,14 +116,4 @@ public boolean writeNextObject(OneRow onerow) throws Exception { public void closeForWrite() throws Exception { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } - - /** - * Cancel write operation. - * - * @throws Exception if the cancel operation failed - */ - @Override - public void cancelWrite() throws Exception { - throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); - } } diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Accessor.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Accessor.java index 64675f65dd..733c6e9fb9 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Accessor.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Accessor.java @@ -49,13 +49,6 @@ public interface Accessor extends Plugin { */ void closeForRead() throws Exception; - /** - * Cancel read operation. Might contain additional logic comparing with {@link Accessor#closeForRead()} - * - * @throws Exception if the cancel operation failed - */ - void cancelRead() throws Exception; - /** * Opens the resource for write. * @@ -79,11 +72,4 @@ public interface Accessor extends Plugin { * @throws Exception if closing the resource failed */ void closeForWrite() throws Exception; - - /** - * Cancel write operation. Might contain additional logic comparing with {@link Accessor#closeForWrite()} - * - * @throws Exception if the cancel the operation failed - */ - void cancelWrite() throws Exception; } diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/CancelableOperation.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/CancelableOperation.java new file mode 100644 index 0000000000..cd21ef1d35 --- /dev/null +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/CancelableOperation.java @@ -0,0 +1,17 @@ +package org.greenplum.pxf.api.model; + +public interface CancelableOperation { + /** + * Cancel read operation. Might contain additional logic comparing with {@link Accessor#closeForRead()} + * + * @throws Exception if the cancel operation failed + */ + void cancelRead() throws Exception; + + /** + * Cancel write operation. Might contain additional logic comparing with {@link Accessor#closeForWrite()} + * + * @throws Exception if the cancel the operation failed + */ + void cancelWrite() throws Exception; +} diff --git a/server/pxf-api/src/test/java/org/greenplum/pxf/api/utilities/UtilitiesTest.java b/server/pxf-api/src/test/java/org/greenplum/pxf/api/utilities/UtilitiesTest.java index b8721c9448..15bf89f349 100644 --- a/server/pxf-api/src/test/java/org/greenplum/pxf/api/utilities/UtilitiesTest.java +++ b/server/pxf-api/src/test/java/org/greenplum/pxf/api/utilities/UtilitiesTest.java @@ -62,10 +62,6 @@ public OneRow readNextObject() { public void closeForRead() { } - @Override - public void cancelRead() { - } - @Override public boolean openForWrite() { return false; @@ -78,10 +74,7 @@ public boolean writeNextObject(OneRow onerow) { @Override public void closeForWrite() { - } - @Override - public void cancelWrite() { } @Override @@ -118,10 +111,6 @@ public OneRow readNextObject() { public void closeForRead() { } - @Override - public void cancelRead() { - } - @Override public boolean openForWrite() { return false; @@ -136,10 +125,6 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { } - @Override - public void cancelWrite() { - } - @Override public void setRequestContext(RequestContext context) { } diff --git a/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java b/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java index 685eeda124..48aa6da3cc 100644 --- a/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java +++ b/server/pxf-diagnostic/src/main/java/org/greenplum/pxf/diagnostic/UserDataVerifyAccessor.java @@ -62,11 +62,6 @@ public OneRow readNextObject() { public void closeForRead() { } - @Override - public void cancelRead() { - throw new UnsupportedOperationException("Cancel read operation is not supported"); - } - @Override public boolean openForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); @@ -81,9 +76,4 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } - - @Override - public void cancelWrite() { - throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); - } } diff --git a/server/pxf-hbase/src/main/java/org/greenplum/pxf/plugins/hbase/HBaseAccessor.java b/server/pxf-hbase/src/main/java/org/greenplum/pxf/plugins/hbase/HBaseAccessor.java index 912fa62a5f..5eef76a69f 100644 --- a/server/pxf-hbase/src/main/java/org/greenplum/pxf/plugins/hbase/HBaseAccessor.java +++ b/server/pxf-hbase/src/main/java/org/greenplum/pxf/plugins/hbase/HBaseAccessor.java @@ -146,14 +146,6 @@ public void closeForRead() throws Exception { HBaseUtilities.closeConnection(null, connection); } - /** - * Cancel read operation. - */ - @Override - public void cancelRead() { - throw new UnsupportedOperationException("Cancel read operation is not supported"); - } - /** * Opens the resource for write. * @@ -183,14 +175,6 @@ public void closeForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } - /** - * Cancel write operation. - */ - @Override - public void cancelWrite() { - throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); - } - /** * Returns the next row in the HBase table, null if end of fragment. */ diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroFileAccessor.java index 35dccc0005..99bcf8d25f 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroFileAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/AvroFileAccessor.java @@ -218,14 +218,6 @@ public void closeForWrite() throws Exception { context.getServerName()); } - /** - * Cancel write operation. - */ - @Override - public void cancelWrite() { - throw new UnsupportedOperationException("Cancel write operation is not supported"); - } - /** * Closes the resource for write. * diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java index c164db88f2..6ae1bea85d 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java @@ -105,14 +105,6 @@ public void closeForRead() throws Exception { } } - /** - * Cancel read operation. - */ - @Override - public void cancelRead() { - throw new UnsupportedOperationException("Cancel read operation is not supported"); - } - /* * Making sure that only the segment that got assigned the first data * fragment will read the (whole) file. diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java index c0ec3e0873..b0a9657a9f 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java @@ -142,14 +142,6 @@ public void closeForRead() throws Exception { } } - /** - * Cancel read operation. - */ - @Override - public void cancelRead() { - throw new UnsupportedOperationException("Cancel read operation is not supported"); - } - /** * Helper routine to get compression codec by class name or alias. * diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java index b06f00664b..a6dc865874 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java @@ -195,14 +195,6 @@ public void closeForWrite() throws IOException { } } - /** - * Cancel write operation. - */ - @Override - public void cancelWrite() { - throw new UnsupportedOperationException("Cancel write operation is not supported"); - } - /* * Creates output stream from given file. If compression codec is provided, * wrap it around stream. diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java index 566c424dc7..f5c529f2a2 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java @@ -210,14 +210,6 @@ public void closeForRead() throws IOException { } } - /** - * Cancel read operation. - */ - @Override - public void cancelRead() throws IOException { - throw new UnsupportedOperationException("Cancel read operation is not supported"); - } - /** * Opens the resource for write. * Uses compression codec based on user input which @@ -302,14 +294,6 @@ public void closeForWrite() throws IOException, InterruptedException { context.getServerName()); } - /** - * Cancel write operation. - */ - @Override - public void cancelWrite() { - throw new UnsupportedOperationException("Cancel write operation is not supported"); - } - /** * Returns the parquet record filter for the given filter string * diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/QuotedLineBreakAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/QuotedLineBreakAccessor.java index 6110341e68..9610ec9244 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/QuotedLineBreakAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/QuotedLineBreakAccessor.java @@ -179,12 +179,4 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { throw new UnsupportedOperationException(String.format(UNSUPPORTED_ERR_MESSAGE, context.getProfile())); } - - /** - * Cancel write operation. - */ - @Override - public void cancelWrite() { - throw new UnsupportedOperationException(String.format(UNSUPPORTED_ERR_MESSAGE, context.getProfile())); - } } diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java index 1f9e5884da..cb4836fe0f 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java @@ -201,14 +201,6 @@ public void closeForWrite() throws Exception { } } - /** - * Cancel write operation. - */ - @Override - public void cancelWrite() { - throw new UnsupportedOperationException("Cancel write operation is not supported"); - } - public CompressionType getCompressionType() { return compressionType; } diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/orc/ORCVectorizedAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/orc/ORCVectorizedAccessor.java index 7c610a6774..645fe935f5 100644 --- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/orc/ORCVectorizedAccessor.java +++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/orc/ORCVectorizedAccessor.java @@ -163,14 +163,6 @@ public void closeForRead() throws IOException { } } - /** - * Cancel read operation. - */ - @Override - public void cancelRead() { - throw new UnsupportedOperationException("Cancel read operation is not supported"); - } - @Override public boolean openForWrite() throws IOException { HcfsType hcfsType = HcfsType.getHcfsType(context); @@ -225,14 +217,6 @@ public void closeForWrite() throws IOException { } } - /** - * Cancel write operation. - */ - @Override - public void cancelWrite() { - throw new UnsupportedOperationException("Cancel write operation is not supported"); - } - /** * Given a filter string, builds the SearchArgument object to perform * predicated pushdown for ORC diff --git a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessorTest.java b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessorTest.java index 5ca525cf80..44e0d6d48b 100644 --- a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessorTest.java +++ b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/HdfsSplittableDataAccessorTest.java @@ -39,10 +39,6 @@ public void closeForWrite() throws Exception { } - @Override - public void cancelWrite() { - } - @Override protected Object getReader(JobConf jobConf, InputSplit split) throws IOException { return null; diff --git a/server/pxf-hive/src/main/java/org/greenplum/pxf/plugins/hive/HiveAccessor.java b/server/pxf-hive/src/main/java/org/greenplum/pxf/plugins/hive/HiveAccessor.java index 1a4973b366..cc17c5217d 100644 --- a/server/pxf-hive/src/main/java/org/greenplum/pxf/plugins/hive/HiveAccessor.java +++ b/server/pxf-hive/src/main/java/org/greenplum/pxf/plugins/hive/HiveAccessor.java @@ -337,14 +337,6 @@ public void closeForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } - /** - * Cancel write operation. - */ - @Override - public void cancelWrite() { - throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); - } - /** * Creates the RecordReader suitable for this given split. * diff --git a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java index 627117b6c8..d2b308d879 100644 --- a/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java +++ b/server/pxf-jdbc/src/main/java/org/greenplum/pxf/plugins/jdbc/JdbcAccessor.java @@ -26,6 +26,7 @@ import org.greenplum.pxf.api.OneRow; import org.greenplum.pxf.api.error.PxfRuntimeException; import org.greenplum.pxf.api.model.Accessor; +import org.greenplum.pxf.api.model.CancelableOperation; import org.greenplum.pxf.api.model.ConfigurationFactory; import org.greenplum.pxf.api.security.SecureLogin; import org.greenplum.pxf.api.utilities.Utilities; @@ -57,7 +58,7 @@ * The INSERT queries are processed by {@link java.sql.PreparedStatement} and * built-in JDBC batches of arbitrary size */ -public class JdbcAccessor extends JdbcBasePlugin implements Accessor { +public class JdbcAccessor extends JdbcBasePlugin implements Accessor, CancelableOperation { private static final Logger LOG = LoggerFactory.getLogger(JdbcAccessor.class); diff --git a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3SelectAccessor.java b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3SelectAccessor.java index 70f0a06345..e5b79be27d 100644 --- a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3SelectAccessor.java +++ b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3SelectAccessor.java @@ -145,14 +145,6 @@ public void closeForRead() throws IOException { } } - /** - * Cancel read operation. - */ - @Override - public void cancelRead() { - throw new UnsupportedOperationException("Cancel read operation is not supported"); - } - /** * Generates the {@link SelectObjectContentRequest} object from * the request context. @@ -338,9 +330,4 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); } - - @Override - public void cancelWrite() { - throw new UnsupportedOperationException(UNSUPPORTED_ERR_MESSAGE); - } } diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java index e639a5a99b..99a95e6915 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java @@ -22,6 +22,7 @@ import org.greenplum.pxf.api.OneRow; import org.greenplum.pxf.api.error.BadRecordException; import org.greenplum.pxf.api.io.Writable; +import org.greenplum.pxf.api.model.CancelableOperation; import org.greenplum.pxf.api.model.RequestContext; import org.greenplum.pxf.service.BridgeOutputBuilder; import org.greenplum.pxf.service.utilities.BasePluginFactory; @@ -152,7 +153,11 @@ protected boolean isDataException(IOException ex) { @Override public void cancelIteration() throws Exception { try { - accessor.cancelRead(); + if (accessor instanceof CancelableOperation) { + ((CancelableOperation) accessor).cancelRead(); + } else { + throw new UnsupportedOperationException("Accessor does not support canceling read operation"); + } } catch (Exception e) { LOG.error("Failed to cancel read bridge iteration: {}", e.getMessage()); throw e; diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java index ab2c2f8653..f771372e3e 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java @@ -22,6 +22,7 @@ import org.greenplum.pxf.api.OneField; import org.greenplum.pxf.api.OneRow; import org.greenplum.pxf.api.io.Writable; +import org.greenplum.pxf.api.model.CancelableOperation; import org.greenplum.pxf.api.model.InputStreamHandler; import org.greenplum.pxf.api.model.OutputFormat; import org.greenplum.pxf.api.model.RequestContext; @@ -115,7 +116,11 @@ public void endIteration() throws Exception { @Override public void cancelIteration() throws Exception { try { - accessor.cancelWrite(); + if (accessor instanceof CancelableOperation) { + ((CancelableOperation) accessor).cancelWrite(); + } else { + throw new UnsupportedOperationException("Accessor does not support canceling write operation"); + } } catch (Exception e) { LOG.error("Failed to cancel write bridge iteration: {}", e.getMessage()); throw e; diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/TestAccessor.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/TestAccessor.java index 11c7361edd..52c320eae1 100644 --- a/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/TestAccessor.java +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/bridge/TestAccessor.java @@ -23,10 +23,7 @@ public OneRow readNextObject() { @Override public void closeForRead() { - } - @Override - public void cancelRead() { } @Override @@ -43,10 +40,6 @@ public boolean writeNextObject(OneRow onerow) { public void closeForWrite() { } - @Override - public void cancelWrite() { - } - @Override public void setRequestContext(RequestContext context) { } From d3206e5e1b3489861d8b645c2ff541c94cc4b74c Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Tue, 12 Mar 2024 11:45:27 +0200 Subject: [PATCH 21/24] ADBDEV-5095: Fix endpoint unit tests after adding spring security --- .../java/org/greenplum/pxf/service/rest/PxfResourceIT.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/pxf-service/src/test/java/org/greenplum/pxf/service/rest/PxfResourceIT.java b/server/pxf-service/src/test/java/org/greenplum/pxf/service/rest/PxfResourceIT.java index d18a7666f2..1ce8b205a1 100644 --- a/server/pxf-service/src/test/java/org/greenplum/pxf/service/rest/PxfResourceIT.java +++ b/server/pxf-service/src/test/java/org/greenplum/pxf/service/rest/PxfResourceIT.java @@ -7,6 +7,7 @@ import org.greenplum.pxf.service.RequestParser; import org.greenplum.pxf.service.controller.ReadService; import org.greenplum.pxf.service.controller.WriteService; +import org.greenplum.pxf.service.security.SecurityConfig; import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.springframework.beans.factory.annotation.Autowired; @@ -14,6 +15,7 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; import org.springframework.http.MediaType; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.ResultActions; @@ -34,6 +36,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @WebMvcTest({PxfReadResource.class, PxfWriteResource.class, PxfLegacyResource.class}) +@Import(SecurityConfig.class) public class PxfResourceIT { @Autowired From 4ec984a70559707f54c743328dd6d0c929343821 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Tue, 12 Mar 2024 13:17:12 +0200 Subject: [PATCH 22/24] ADBDEV-4987: Change logging in ReadBridge#cancelIteration and WriteBridge#cancelIteration --- .../main/java/org/greenplum/pxf/service/bridge/ReadBridge.java | 2 +- .../main/java/org/greenplum/pxf/service/bridge/WriteBridge.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java index 99a95e6915..8360852776 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/ReadBridge.java @@ -156,7 +156,7 @@ public void cancelIteration() throws Exception { if (accessor instanceof CancelableOperation) { ((CancelableOperation) accessor).cancelRead(); } else { - throw new UnsupportedOperationException("Accessor does not support canceling read operation"); + LOG.debug("Accessor [{}] does not support canceling read operation", accessor.getClass().getSimpleName()); } } catch (Exception e) { LOG.error("Failed to cancel read bridge iteration: {}", e.getMessage()); diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java index f771372e3e..7f56951a95 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/bridge/WriteBridge.java @@ -119,7 +119,7 @@ public void cancelIteration() throws Exception { if (accessor instanceof CancelableOperation) { ((CancelableOperation) accessor).cancelWrite(); } else { - throw new UnsupportedOperationException("Accessor does not support canceling write operation"); + LOG.debug("Accessor [{}] does not support canceling write operation", accessor.getClass().getSimpleName()); } } catch (Exception e) { LOG.error("Failed to cancel write bridge iteration: {}", e.getMessage()); From 51af1ea4a628c0c75861c574744fa9c8dee99c5c Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Tue, 12 Mar 2024 18:08:52 +0200 Subject: [PATCH 23/24] ADBDEV-4987: Add license agreement --- .../pxf/api/model/CancelableOperation.java | 19 +++++++++++++++++++ .../org/greenplum/pxf/api/model/Reloader.java | 19 +++++++++++++++++++ .../service/controller/RequestIdentifier.java | 19 +++++++++++++++++++ .../profile/ProfileReloadConfiguration.java | 19 +++++++++++++++++++ .../service/profile/ProfileReloadService.java | 19 +++++++++++++++++++ .../profile/ProfileReloadServiceImpl.java | 19 +++++++++++++++++++ .../rest/MaintenanceRestController.java | 19 +++++++++++++++++++ .../rest/dto/ProfileReloadRequestDto.java | 19 +++++++++++++++++++ .../pxf/service/security/SecurityConfig.java | 19 +++++++++++++++++++ 9 files changed, 171 insertions(+) diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/CancelableOperation.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/CancelableOperation.java index cd21ef1d35..2c4cbb7484 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/CancelableOperation.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/CancelableOperation.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.api.model; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + public interface CancelableOperation { /** * Cancel read operation. Might contain additional logic comparing with {@link Accessor#closeForRead()} diff --git a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Reloader.java b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Reloader.java index 89a71aa217..be92ff39b9 100644 --- a/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Reloader.java +++ b/server/pxf-api/src/main/java/org/greenplum/pxf/api/model/Reloader.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.api.model; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + public interface Reloader { void reloadAll(); diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/RequestIdentifier.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/RequestIdentifier.java index 675370f80d..0d63ed183f 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/RequestIdentifier.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/controller/RequestIdentifier.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.service.controller; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadConfiguration.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadConfiguration.java index 524e2a47f5..d11e4600ad 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadConfiguration.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadConfiguration.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.service.profile; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import lombok.extern.slf4j.Slf4j; import org.greenplum.pxf.api.model.Plugin; import org.greenplum.pxf.api.model.Reloader; diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadService.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadService.java index 83efd74dc6..d6beb8dd61 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadService.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadService.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.service.profile; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; public interface ProfileReloadService { diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java index c8404aedc8..7196d1f491 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.service.profile; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.greenplum.pxf.api.error.PxfRuntimeException; diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java index e7bc588d28..d6e364251f 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/MaintenanceRestController.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.service.rest; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import lombok.RequiredArgsConstructor; import org.greenplum.pxf.service.profile.ProfileReloadService; import org.greenplum.pxf.service.rest.dto.ProfileReloadRequestDto; diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/dto/ProfileReloadRequestDto.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/dto/ProfileReloadRequestDto.java index ab3440fe05..4856f4748a 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/dto/ProfileReloadRequestDto.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/rest/dto/ProfileReloadRequestDto.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.service.rest.dto; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import lombok.Getter; import lombok.RequiredArgsConstructor; diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/security/SecurityConfig.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/security/SecurityConfig.java index 766688d289..f558d0f53a 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/security/SecurityConfig.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/security/SecurityConfig.java @@ -1,5 +1,24 @@ package org.greenplum.pxf.service.security; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.security.config.annotation.web.builders.HttpSecurity; From 5d8667d86a79d0c8c9251f2e2a8dad0aaf709929 Mon Sep 17 00:00:00 2001 From: Roman Zolotov Date: Fri, 15 Mar 2024 12:27:40 +0200 Subject: [PATCH 24/24] ADBDEV-5139: Convert profile to lower case When PXF parses the profile which comes from the external table definition it also converts the profile name to lower case --- .../pxf/service/profile/ProfileReloadServiceImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java index 7196d1f491..0657ff88ab 100644 --- a/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java +++ b/server/pxf-service/src/main/java/org/greenplum/pxf/service/profile/ProfileReloadServiceImpl.java @@ -50,7 +50,7 @@ public ProfileReloadServiceImpl(ReadService readService, @Override public void reloadProfile(ProfileReloadRequestDto reloadRequestDto) { - String profile = reloadRequestDto.getProfile(); + String profile = reloadRequestDto.getProfile() == null ? null : reloadRequestDto.getProfile().toLowerCase(); String server = reloadRequestDto.getServer(); log.info("Received a request to reload a profile with the parameters: profile={}, server={}", profile, server); if (StringUtils.isBlank(server)) { @@ -58,8 +58,10 @@ public void reloadProfile(ProfileReloadRequestDto reloadRequestDto) { } else if (StringUtils.isNotBlank(profile)) { reload(profile, server); } else { - throw new IllegalArgumentException(String.format("The provided parameters (profile=%s, server=%s) " + + String message = String.format(String.format("The provided parameters (profile=%s, server=%s) " + "are not correct. Please add profile", profile, server)); + log.error(message); + throw new IllegalArgumentException(message); } }