Skip to content

Commit

Permalink
Move 'asset-subsystems' to root; remove dev hacks
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-paladin committed Jun 24, 2024
1 parent 3cbdd8c commit 213bdd0
Show file tree
Hide file tree
Showing 30 changed files with 247 additions and 90 deletions.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.paladincloud.common;

import com.paladincloud.common.assets.AssetGroups;
import com.paladincloud.common.assets.Assets;
import com.paladincloud.common.aws.Database;
import com.paladincloud.common.aws.S3;
Expand Down Expand Up @@ -32,13 +33,19 @@ S3 provideS3() {

@Singleton
@Provides
AssetTypes provideAssetTypes(ElasticSearch elasticSearch, Database database) {
return new AssetTypes(elasticSearch, database);
AssetTypes provideAssetTypes(ElasticSearch elasticSearch, Database database, AssetGroups assetGroups) {
return new AssetTypes(elasticSearch, database, assetGroups);
}

@Singleton
@Provides
Assets provideAssets(ElasticSearch elasticSearch, AssetTypes assetTypes, S3 s3, Database database) {
return new Assets(elasticSearch, assetTypes, s3, database);
}

@Singleton
@Provides
AssetGroups provideAssetGroups(ElasticSearch elasticSearch, Database database) {
return new AssetGroups(elasticSearch, database);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package com.paladincloud.common.assets;

import com.paladincloud.common.aws.Database;
import com.paladincloud.common.errors.JobException;
import com.paladincloud.common.search.ElasticSearch;
import com.paladincloud.common.search.ElasticSearch.HttpMethod;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class AssetGroups {

private static final Logger LOGGER = LogManager.getLogger(AssetGroups.class);
private static final String ASSET_GROUP_FOR_ALL_SOURCES = "all-sources";
private static final String ROW_EXISTS = "row_exists";
private static final Map<String, List<Map<String, String>>> databaseCache = new HashMap<>();
private static final List<String> dataSourceCache = new ArrayList<>();
private final ElasticSearch elasticSearch;
private final Database database;

@Inject
public AssetGroups(ElasticSearch elasticSearch, Database database) {
this.elasticSearch = elasticSearch;
this.database = database;
}

public void createDefaultGroup(String dataSource) {
try {
String queryForAllResources = STR."""
SELECT EXISTS (SELECT 1 FROM cf_AssetGroupDetails
WHERE groupName = '\{ASSET_GROUP_FOR_ALL_SOURCES}') AS \{ROW_EXISTS}
""";
var response = database.executeQuery(queryForAllResources).stream().findFirst();
var isAllSourcesMissing =
response.isPresent() && response.get().get(ROW_EXISTS).equals("0");
if (isAllSourcesMissing) {
// Creates ASSET_GROUP_FOR_ALL_SOURCES if not present
String aliasQuery = generateDefaultAssetGroupAliasQuery(dataSource);
insertDefaultAssetGroup(aliasQuery);
elasticSearch.invoke(HttpMethod.POST, "_aliases/", aliasQuery);
LOGGER.info("Created default asset group: {}", ASSET_GROUP_FOR_ALL_SOURCES);
}
} catch (Exception e) {
throw new JobException("Failed creating default asset group", e);
}
}

public void updateImpactedAliases(List<String> aliases, String dataSource) {
if (aliases.isEmpty()) {
return;
}

String query = STR."""
SELECT DISTINCT agd.groupId, agd.groupName, agd.groupType, agd.aliasQuery FROM cf_AssetGroupDetails AS agd
WHERE (agd.groupId NOT IN (
SELECT DISTINCT agcd.groupId
FROM cf_AssetGroupCriteriaDetails AS agcd
WHERE agcd.attributeName = 'CloudType') AND agd.groupType <> 'user' AND agd.groupType <> 'system'
AND agd.groupName <> '\{ASSET_GROUP_FOR_ALL_SOURCES}' and aliasQuery like '%_*%')
OR (agd.groupType = 'user')
OR (agd.groupName = '\{dataSource}' AND agd.groupType = 'system')
OR (agd.groupName = '\{ASSET_GROUP_FOR_ALL_SOURCES}')
OR (agd.groupType <> 'user'
AND agd.groupType <> 'system' AND agd.groupName <> '\{ASSET_GROUP_FOR_ALL_SOURCES}' and aliasQuery like '\{dataSource}_*%')
""".trim();

// var assetGroupsList = database.executeQuery()

LOGGER.warn("Updating aliases for {}: {}", dataSource, aliases);
}

private List<String> getCachedDataSourcesOrFetch(String dataSource) {
if (dataSourceCache.isEmpty()) {
dataSourceCache.addAll(getEnabledDataSources(dataSource));
}
return dataSourceCache;
}

private List<String> getEnabledDataSources(String dataSource) {
var dataSourceList = getCachedResultOrFetch(
"SELECT DISTINCT dataSourceName FROM cf_Target");
if (dataSourceList.isEmpty()) {
LOGGER.error("There are NO data sources");
return new ArrayList<>();
} else {
var enabledSources = dataSourceList.stream().map(row -> {
// Map to just the data source name
return row.get("dataSourceName");
}).filter(source -> {
var response = getCachedResultOrFetch(
STR."SELECT `value` FROM pac_config_properties WHERE cfkey = '\{source}.enabled'").stream()
.findFirst();
return response.isPresent() && response.get().get("value").equals("true");
}).collect(Collectors.toSet());
if (enabledSources.isEmpty()) {
LOGGER.error("There are NO enabled data sources");
}

// Assume the current data source is enabled
enabledSources.add(dataSource);
return new ArrayList<>(enabledSources);
}
}

private List<Map<String, String>> getCachedResultOrFetch(String query) {
if (databaseCache.containsKey(query)) {
return databaseCache.get(query);
} else {
var response = database.executeQuery(query);
databaseCache.put(query, response);
return response;
}
}

private void insertDefaultAssetGroup(String aliasQuery) {
Map<String, String> data = new HashMap<>();
data.put("groupId", UUID.randomUUID().toString());
data.put("groupName", ASSET_GROUP_FOR_ALL_SOURCES);
data.put("dataSource", "");
data.put("displayName", "All Sources");
data.put("groupType", "system");
data.put("createdBy", "admin@paladincloud.io");
data.put("createdUser", "admin@paladincloud.io");
data.put("modifiedUser", "admin@paladincloud.io");
data.put("description", "All assets from all Sources");
data.put("aliasQuery", aliasQuery);
data.put("isVisible", "1");
database.insert("cf_AssetGroupDetails", data);
}

private String generateDefaultAssetGroupAliasQuery(String dataSource) {
var actions = getCachedDataSourcesOrFetch(dataSource).stream().map(sourceName -> STR."""
{
"add": {
"index": "\{sourceName.toLowerCase().trim()}_*",
"alias": "\{ASSET_GROUP_FOR_ALL_SOURCES}"
}
}
""".trim());
return STR."""
{
"actions": [\{String.join(",", actions.toList())}]
}
""".trim();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,6 @@ private static void addTags(Map<String, Object> doc) {
doc.put(STR."\{AssetDocumentFields.TAGS_PREFIX}\{firstChar}\{remainder}", value);
});
}

}

private List<Map<String, Object>> fetchFromS3(String bucket, String path,
String dataSource, String type) {
try {
return s3.fetchData(bucket, path);
} catch (IOException e) {
throw new JobException(
STR."Exception fetching asset data for \{dataSource} from \{type}; path=\{path}",
e);
}
}

private static String assetsPathPrefix(String dataSource) {
Expand Down Expand Up @@ -220,6 +208,17 @@ private static void override(Map<String, Object> document,
}
}

private List<Map<String, Object>> fetchFromS3(String bucket, String path, String dataSource,
String type) {
try {
return s3.fetchData(bucket, path);
} catch (IOException e) {
throw new JobException(
STR."Exception fetching asset data for \{dataSource} from \{type}; path=\{path}",
e);
}
}

private void setMissingAccountName(Map<String, Object> newDocument,
Map<String, String> accountIdNameMap) {
String accountId = Stream.of(newDocument.get(AssetDocumentFields.PROJECT_ID),
Expand All @@ -244,11 +243,6 @@ public void upload(String dataSource) {
var allFilenames = s3.listObjects(ConfigService.get(ConfigConstants.S3.BUCKET_NAME),
assetsPathPrefix(dataSource));
var types = assetTypes.getTypesWithDisplayName(dataSource);
// TODO: DON'T CHECK THIS DEV HACK IN!
types = types.entrySet().stream().filter(e -> e.getKey().equals("cloudfunction"))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
LOGGER.error("LIMITED TYPES TO {}", types);

var fileTypes = FilesAndTypes.matchFilesAndTypes(allFilenames, types.keySet());
if (!fileTypes.unknownFiles.isEmpty()) {
LOGGER.warn("Unknown files: {}", fileTypes.unknownFiles);
Expand Down Expand Up @@ -307,10 +301,11 @@ public void upload(String dataSource) {
tags.size(), existingDocuments.size());
if (newDocuments.isEmpty()) {
// ERROR condition - update elastic index, it looks like
// TODO: Properly handle no discovered assets
throw new RuntimeException("Handle no discovered assets");
} else {
var updatableFields = database.executeQuery(
STR."select updatableFields from cf_pac_updatable_fields where resourceType ='\{type}'");
STR."select updatableFields from cf_pac_updatable_fields where resourceType ='\{type}'");
var overrides = database.executeQuery(
STR."select _resourceid,fieldname,fieldvalue from pacman_field_override where resourcetype = '\{type}'");
var overridesMap = overrides.parallelStream().collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.paladincloud.common.errors.JobException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -13,6 +14,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;

Expand Down Expand Up @@ -55,4 +57,19 @@ public List<Map<String, String>> executeQuery(String query) {

return results;
}

public void insert(String tableName, Map<String, String> row) {
var placeholders = String.join(",", Stream.generate(() -> "?").limit(row.size()).toList());
var columns = row.keySet().stream().toList();
var query = STR."INSERT INTO \{tableName} (\{String.join(",", columns)}) VALUES (\{placeholders})";
try (Connection conn = getConnection(); PreparedStatement statement = conn.prepareStatement(
query)) {
for (var index = 0; index < columns.size(); index++) {
statement.setString(index, row.get(columns.get(index)));
}
statement.executeUpdate();
} catch (SQLException e) {
throw new JobException("Error inserting row", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ public class AssetTypes {

private final ElasticSearch elasticSearch;
private final Database database;
private final AssetGroups assetGroups;


@Inject
public AssetTypes(ElasticSearch elasticSearch, Database database) {
public AssetTypes(ElasticSearch elasticSearch, Database database, AssetGroups assetGroups) {
this.elasticSearch = elasticSearch;
this.database = database;
this.assetGroups = assetGroups;
}

public Set<String> getTypes(String dataSource) {
Expand Down Expand Up @@ -66,16 +68,18 @@ public String getResourceNameType(String ds, String type) {
}

private Map<String, Map<String, String>> getTypeConfig(String dataSource) {
var targetTypesInclude = StringUtils.split(
ConfigService.get(Config.CONFIG_TARGET_TYPE_INCLUDE), ",", true);
var targetTypesExclude = StringUtils.split(
ConfigService.get(Config.CONFIG_TARGET_TYPE_EXCLUDE), ",", true);
var assetTypeOverride = StringUtils.split(
ConfigService.get(ConfigConstants.Dev.ASSET_TYPE_OVERRIDE), ",", true);
var targetTypesInclude = StringUtils.split(ConfigService.get(Config.TARGET_TYPE_INCLUDE),
",", true);
var targetTypesExclude = StringUtils.split(ConfigService.get(Config.TARGET_TYPE_EXCLUDE),
",", true);

if (typeInfo == null) {
typeInfo = new HashMap<>();

var query = ConfigService.get(Config.CONFIG_TYPES_QUERY)
+ STR." and dataSourceName = '\{dataSource}'";
var query =
ConfigService.get(Config.TYPES_QUERY) + STR." and dataSourceName = '\{dataSource}'";
var typeList = database.executeQuery(query);
try {
for (var type : typeList) {
Expand All @@ -85,8 +89,13 @@ private Map<String, Map<String, String>> getTypeConfig(String dataSource) {
new TypeReference<Map<String, String>>() {
});
config.put("displayName", displayName);
if ((targetTypesInclude.isEmpty() || targetTypesInclude.contains(typeName))
&& (!targetTypesExclude.contains(typeName))) {
if (!assetTypeOverride.isEmpty()) {
if (assetTypeOverride.contains(typeName)) {
typeInfo.put(typeName, config);
}
} else if (
(targetTypesInclude.isEmpty() || targetTypesInclude.contains(typeName))
&& (!targetTypesExclude.contains(typeName))) {
typeInfo.put(typeName, config);
}
}
Expand All @@ -95,6 +104,10 @@ private Map<String, Map<String, String>> getTypeConfig(String dataSource) {
}
}

if (!assetTypeOverride.isEmpty()) {
LOGGER.warn("Asset types overridden (requested = {}); actual = {}", assetTypeOverride,
typeInfo.keySet());
}
return typeInfo;
}

Expand All @@ -104,10 +117,6 @@ public void setupIndexAndTypes(String dataSource) {
for (var type : types) {
var indexName = StringExtras.indexName(dataSource, type);
if (elasticSearch.indexMissing(indexName)) {
if (!indexName.equals("kvt_gcp_cloudfunction")) {
LOGGER.error("Skipping index creation for {}", indexName);
continue;
}
newAssets.add(indexName);

var payload = STR."""
Expand Down Expand Up @@ -150,8 +159,8 @@ public void setupIndexAndTypes(String dataSource) {
}
}

AssetGroups.createDefaultGroup(dataSource);
AssetGroups.updateImpactedAliases(newAssets.stream().toList(), dataSource);
assetGroups.createDefaultGroup(dataSource);
assetGroups.updateImpactedAliases(newAssets.stream().toList(), dataSource);

try {
elasticSearch.createIndex("exceptions");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@

public interface ConfigConstants {

interface Dev {
String INDEX_PREFIX = "param.index-prefix";
String ASSET_TYPE_OVERRIDE = "param.asset-type-override";
}

interface Config {

String CONFIG_TYPES_QUERY = "param.config-query";
String CONFIG_TARGET_TYPE_INCLUDE = "param.target-type-include";
String CONFIG_TARGET_TYPE_EXCLUDE = "param.target-type-exclude";
String TYPES_QUERY = "param.config-query";
String TARGET_TYPE_INCLUDE = "param.target-type-include";
String TARGET_TYPE_EXCLUDE = "param.target-type-exclude";
}

interface Elastic {

String ELASTIC_HOST = "batch.elastic-search.host";
String ELASTIC_PORT = "batch.elastic-search.port";
String HOST = "batch.elastic-search.host";
String PORT = "batch.elastic-search.port";
}

interface RDS {
Expand Down
Loading

0 comments on commit 213bdd0

Please sign in to comment.