Skip to content

Commit

Permalink
Added Kafka AVRO support also on key. #2418
Browse files Browse the repository at this point in the history
  • Loading branch information
vertigo17 committed Dec 28, 2022
1 parent b87a730 commit cbe4a39
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,9 @@ public Answer create(AppService object) {
MessageEvent msg;
StringBuilder query = new StringBuilder()
.append("INSERT INTO appservice (`Service`, `Group`, `Application`, `Type`, `Method`, `ServicePath`, `isFollowRedir`, `Operation`, `ServiceRequest`, ")
.append(" `isAvroEnable`, `SchemaRegistryUrl`, `AvroSchema`, `ParentContentService`, `KafkaTopic`, `KafkaKey`, ")
.append(" `isAvroEnable`, `SchemaRegistryUrl`, `AvroSchemaKey`, `AvroSchemaValue`, `ParentContentService`, `KafkaTopic`, `KafkaKey`, ")
.append(" `KafkaFilterPath`, `KafkaFilterValue`, `KafkaFilterHeaderPath`, `KafkaFilterHeaderValue`, `AttachementURL`, `Description`, `FileName`, `UsrCreated`) ")
.append("VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
.append("VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");

LOG.debug(SQL_MESSAGE, query);

Expand All @@ -458,7 +458,8 @@ public Answer create(AppService object) {
preStat.setString(i++, object.getServiceRequest());
preStat.setBoolean(i++, object.isAvroEnable());
preStat.setString(i++, object.getSchemaRegistryURL());
preStat.setString(i++, object.getAvroSchema());
preStat.setString(i++, object.getAvroSchemaKey());
preStat.setString(i++, object.getAvroSchemaValue());
if (StringUtil.isNotEmpty(object.getParentContentService())) {
preStat.setString(i++, object.getParentContentService());
} else {
Expand Down Expand Up @@ -497,7 +498,7 @@ public Answer update(String service, AppService object) {
MessageEvent msg;
StringBuilder query = new StringBuilder()
.append("UPDATE appservice srv SET `Service` = ?, `Group` = ?, `ServicePath` = ?, `isFollowRedir` = ?, `Operation` = ?, ServiceRequest = ?, ")
.append("`isAvroEnable` = ?, `SchemaRegistryUrl` = ?, `AvroSchema` = ?, ParentContentService = ?, KafkaTopic = ?, KafkaKey = ?, ")
.append("`isAvroEnable` = ?, `SchemaRegistryUrl` = ?, `AvroSchemaKey` = ?, `AvroSchemaValue` = ?, ParentContentService = ?, KafkaTopic = ?, KafkaKey = ?, ")
.append("KafkaFilterPath = ?, KafkaFilterValue = ?, KafkaFilterHeaderPath = ?, KafkaFilterHeaderValue = ?, AttachementURL = ?, ")
.append("`Description` = ?, `Type` = ?, Method = ?, `UsrModif`= ?, `DateModif` = NOW(), `FileName` = ?");
if ((object.getApplication() != null) && (!object.getApplication().isEmpty())) {
Expand All @@ -523,7 +524,8 @@ public Answer update(String service, AppService object) {
preStat.setString(i++, object.getServiceRequest());
preStat.setBoolean(i++, object.isAvroEnable());
preStat.setString(i++, object.getSchemaRegistryURL());
preStat.setString(i++, object.getAvroSchema());
preStat.setString(i++, object.getAvroSchemaKey());
preStat.setString(i++, object.getAvroSchemaValue());
if (StringUtil.isEmpty(object.getParentContentService())) {
preStat.setString(i++, null);
} else {
Expand Down Expand Up @@ -646,10 +648,11 @@ public AppService loadFromResultSet(ResultSet rs) throws SQLException {
boolean isFollowRedir = rs.getBoolean("srv.isFollowRedir");
boolean isAvroEnable = rs.getBoolean("srv.isAvroEnable");
String schemaRegistryURL = ParameterParserUtil.parseStringParam(rs.getString("srv.SchemaRegistryUrl"), "");
String avroSchema = ParameterParserUtil.parseStringParam(rs.getString("srv.AvroSchema"), "");
String avroSchemaKey = ParameterParserUtil.parseStringParam(rs.getString("srv.AvroSchemaKey"), "");
String avroSchemaValue = ParameterParserUtil.parseStringParam(rs.getString("srv.AvroSchemaValue"), "");
String parentContentService = ParameterParserUtil.parseStringParam(rs.getString("srv.ParentContentService"), "");
return factoryAppService.create(service, type, method, application, group, serviceRequest, kafkaTopic, kafkaKey, kafkaFilterPath, kafkaFilterValue, kafkaFilterHeaderPath, kafkaFilterHeaderValue,
description, servicePath, isFollowRedir, attachementURL, operation, isAvroEnable, schemaRegistryURL, avroSchema, parentContentService, usrCreated, dateCreated, usrModif, dateModif, fileName);
description, servicePath, isFollowRedir, attachementURL, operation, isAvroEnable, schemaRegistryURL, avroSchemaKey, avroSchemaValue, parentContentService, usrCreated, dateCreated, usrModif, dateModif, fileName);
}

private static void deleteFolder(File folder, boolean deleteit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class AppService {
private String kafkaFilterHeaderValue;
private boolean isAvroEnable;
private String schemaRegistryURL;
private String avroSchema;
private String avroSchemaValue;
private String avroSchemaKey;
private String parentContentService;
private String group; // Information in order to group the services in order to organise them
private String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public interface IFactoryAppService {
* @param operation
* @param isAvroEnable
* @param schemaRegistryUrl
* @param avroSchema
* @param avroSchemaKey
* @param avroSchemaValue
* @param parentContentService
* @param usrCreated
* @param dateCreated
Expand All @@ -60,6 +61,6 @@ public interface IFactoryAppService {
*/
AppService create(String service, String type, String method, String application, String group, String serviceRequest,
String kafkaTopic, String kafkaKey, String kafkaFilterPath, String kafkaFilterValue, String kafkaFilterHeaderPath, String kafkaFilterHeaderValue,
String description, String servicePath, boolean isFollowRedir, String attachementURL, String operation, boolean isAvroEnable, String schemaRegistryUrl, String avroSchema, String parentContentService,
String description, String servicePath, boolean isFollowRedir, String attachementURL, String operation, boolean isAvroEnable, String schemaRegistryUrl, String avroSchemaKey, String avroSchemaValue, String parentContentService,
String usrCreated, Timestamp dateCreated, String usrModif, Timestamp dateModif, String fileName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class FactoryAppService implements IFactoryAppService {
public AppService create(String service, String type, String method, String application, String group, String serviceRequest, String kafkaTopic, String kafkaKey,
String kafkaFilterPath, String kafkaFilterValue, String kafkaFilterHeaderPath, String kafkaFilterHeaderValue,
String description,
String servicePath, boolean isFollowRedir, String attachementURL, String operation, boolean isAvroEnable, String schemaRegistryUrl, String avroSchema, String parentContentService,
String servicePath, boolean isFollowRedir, String attachementURL, String operation, boolean isAvroEnable, String schemaRegistryUrl, String avroSchemaKey, String avroSchemaValue, String parentContentService,
String usrCreated, Timestamp dateCreated, String usrModif, Timestamp dateModif, String fileName) {
AppService s = new AppService();
s.setService(service);
Expand Down Expand Up @@ -71,7 +71,8 @@ public AppService create(String service, String type, String method, String appl
s.setKafkaFilterHeaderValue(kafkaFilterHeaderValue);
s.setAvroEnable(isAvroEnable);
s.setSchemaRegistryURL(schemaRegistryUrl);
s.setAvroSchema(avroSchema);
s.setAvroSchemaKey(avroSchemaKey);
s.setAvroSchemaValue(avroSchemaValue);
s.setParentContentService(parentContentService);
s.setRecordTraceFile(true);
s.setFollowRedir(isFollowRedir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ private MessageEvent doActionGetRobotFile(TestCaseExecution execution, TestCaseS
}
LOG.debug(contentJSON.toString(1));

AppService appSrv = factoryAppService.create("", "", "", "", "", "", "", "", "", "", "", "", "", "", false, "", "", false, "", "", "", "", null, "", null, "");
AppService appSrv = factoryAppService.create("", "", "", "", "", "", "", "", "", "", "", "", "", "", false, "", "", false, "", "", "", "", "", null, "", null, "");
JSONObject contentJSONnew = new JSONObject();

// We copy the header values for the service answered.
Expand Down Expand Up @@ -1893,7 +1893,7 @@ private MessageEvent doActionSetNetworkTrafficContent(TestCaseExecution exe, Tes

har = harService.enrichWithStats(har, exe.getCountryEnvironmentParameters().getDomain(), exe.getSystem(), exe.getNetworkTrafficIndexList());

AppService appSrv = factoryAppService.create("", AppService.TYPE_REST, AppService.METHOD_HTTPGET, "", "", "", "", "", "", "", "", "", "", "", true, "", "", false, "", "", "", "", null, "", null, null);
AppService appSrv = factoryAppService.create("", AppService.TYPE_REST, AppService.METHOD_HTTPGET, "", "", "", "", "", "", "", "", "", "", "", true, "", "", false, "", "", "", "", "", null, "", null, null);
appSrv.setResponseHTTPBody(har.toString());
appSrv.setResponseHTTPBodyContentType(AppService.RESPONSEHTTPBODYCONTENTTYPE_JSON);
appSrv.setRecordTraceFile(false);
Expand Down Expand Up @@ -1974,7 +1974,7 @@ private MessageEvent doActionSetConsoleContent(TestCaseExecution exe, TestCaseSt
consoleStat = consolelogService.enrichWithStats(consoleLogs);
consoleRecap.put("stat", consoleStat);

AppService appSrv = factoryAppService.create("", "", "", "", "", "", "", "", "", "", "", "", "", "", false, "", "", false, "", "", null, "", null, "", null, "");
AppService appSrv = factoryAppService.create("", "", "", "", "", "", "", "", "", "", "", "", "", "", false, "", "", false, "", "", "", null, "", null, "", null, "");
appSrv.setResponseHTTPBody(consoleRecap.toString());
appSrv.setResponseHTTPBodyContentType(AppService.RESPONSEHTTPBODYCONTENTTYPE_JSON);
appSrv.setRecordTraceFile(false);
Expand Down Expand Up @@ -2007,7 +2007,7 @@ private MessageEvent doActionSetContent(TestCaseExecution exe, TestCaseStepActio
*/
LOG.debug("Setting static content.");

AppService appSrv = factoryAppService.create("", "", "", "", "", "", "", "", "", "", "", "", "", "", false, "", "", false, "", "", "", "", null, "", null, "");
AppService appSrv = factoryAppService.create("", "", "", "", "", "", "", "", "", "", "", "", "", "", false, "", "", false, "", "", "", "", "", null, "", null, "");
appSrv.setResponseHTTPBody(textContent);
appSrv.setResponseHTTPBodyContentType(appServiceService.guessContentType(appSrv, AppService.RESPONSEHTTPBODYCONTENTTYPE_JSON));
appSrv.setRecordTraceFile(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public AnswerItem<AppService> callService(String service, String targetNbEvents,
if (StringUtil.isEmpty(service)) {
LOG.debug("Creating AppService from parameters.");
appService = factoryAppService.create("null", AppService.TYPE_SOAP, "", "", "", request, "", "", "", "", "", "", "Automatically created Service from datalib.",
servicePathParam, true, "", operation, false, "", "", null, null, null, null, null, null);
servicePathParam, true, "", operation, false, "", "", "", null, null, null, null, null, null);
service = "null";

} else {
Expand Down Expand Up @@ -471,7 +471,7 @@ public AnswerItem<AppService> callService(String service, String targetNbEvents,
* Call REST and store it into the execution.
*/
result = kafkaService.produceEvent(decodedTopic, decodedKey, decodedRequest, decodedServicePath, appService.getHeaderList(), appService.getContentList(),
token, appService.isAvroEnable(), decodedSchemaRegistryURL, appService.getAvroSchema(), timeOutMs);
token, appService.isAvroEnable(), decodedSchemaRegistryURL, appService.getAvroSchemaKey(), appService.getAvroSchemaValue(), timeOutMs);
message = result.getResultMessage();
break;

Expand Down Expand Up @@ -566,7 +566,7 @@ public AnswerItem<AppService> callService(String service, String targetNbEvents,
String kafkaKey = kafkaService.getKafkaConsumerKey(decodedTopic, decodedServicePath);
AnswerItem<String> resultSearch = kafkaService.searchEvent(tCExecution.getKafkaLatestOffset().get(kafkaKey), decodedTopic, decodedServicePath,
appService.getHeaderList(), appService.getContentList(), decodedFilterPath, decodedFilterValue, decodedFilterHeaderPath, decodedFilterHeaderValue,
appService.isAvroEnable(), decodedSchemaRegistryURL, targetNbEventsInt, targetNbSecInt);
appService.isAvroEnable(), decodedSchemaRegistryURL, StringUtil.isNotEmpty(appService.getAvroSchemaKey()), StringUtil.isNotEmpty(appService.getAvroSchemaValue()), targetNbEventsInt, targetNbSecInt);

if (!(resultSearch.isCodeStringEquals("OK"))) {
message = new MessageEvent(MessageEventEnum.ACTION_FAILED_CALLSERVICE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public AnswerItem<AppService> callFTP(String chain, String system, String conten

FTPClient ftp = new FTPClient();
AppService myResponse = factoryAppService.create(service, AppService.TYPE_FTP,
method, "", "", content, "", "", "", "", "", "", "", informations.get("path"), true, "", "", false, "", "", "", "", null, "", null, filePath);
method, "", "", content, "", "", "", "", "", "", "", informations.get("path"), true, "", "", false, "", "", "", "", "", null, "", null, filePath);

try {
if (proxyService.useProxy(StringUtil.getURLFromString(informations.get("host"), "", "", "ftp://"), system)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,22 @@ public interface IKafkaService {
* @param token
* @param isAvroEnable
* @param schemaRegistryURL
* @param avroSchema
* @param avroSchemaKey
* @param avroSchemaValue
* @param timeoutMs
* @return
*/
public AnswerItem<AppService> produceEvent(String topic, String key, String eventMessage,
String bootstrapServers, List<AppServiceHeader> serviceHeader, List<AppServiceContent> serviceContent, String token, boolean isAvroEnable, String schemaRegistryURL, String avroSchema, int timeoutMs);
String bootstrapServers, List<AppServiceHeader> serviceHeader, List<AppServiceContent> serviceContent, String token, boolean isAvroEnable, String schemaRegistryURL, String avroSchemaKey, String avroSchemaValue, int timeoutMs);

/**
* Get the last offset of every partition.
*
* @param topic
* @param bootstrapServers
* @param serviceHeader
* @param timeoutMs
* @return
* @return a map that contain the last offset of every partition.
* @throws InterruptedException
* @throws ExecutionException
*/
Expand All @@ -90,16 +92,20 @@ public AnswerItem<Map<TopicPartition, Long>> seekEvent(String topic, String boot
* @param targetNbEventsInt
* @param filterHeaderValue
* @param avroEnable
* @param avroEnableKey
* @param avroEnableValue
* @param schemaRegistryURL
* @param targetNbSecInt
* @return
*/
public AnswerItem<String> searchEvent(Map<TopicPartition, Long> mapOffsetPosition, String topic, String bootstrapServers,
List<AppServiceHeader> serviceHeader, List<AppServiceContent> serviceContent, String filterPath, String filterValue, String filterHeaderPath, String filterHeaderValue,
boolean avroEnable, String schemaRegistryURL, int targetNbEventsInt, int targetNbSecInt);
List<AppServiceHeader> serviceHeader, List<AppServiceContent> serviceContent, String filterPath, String filterValue, String filterHeaderPath, String filterHeaderValue,
boolean avroEnable, String schemaRegistryURL, boolean avroEnableKey, boolean avroEnableValue, int targetNbEventsInt, int targetNbSecInt);

/**
* Get the latest Offset of all partitions.
* Get the latest Offset of all partitions. This is triggered at the
* beginning of the execution only when at least a SEARCH KAFKA service is
* called.
*
* @param mainExecutionTestCaseStepList
* @param tCExecution
Expand Down
Loading

0 comments on commit cbe4a39

Please sign in to comment.