Skip to content

Commit

Permalink
SONAR-9480 fix support of multiple docs to recover with same id
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Brandhof committed Jul 11, 2017
1 parent 6f107dc commit 75e926d
Show file tree
Hide file tree
Showing 24 changed files with 461 additions and 153 deletions.
Expand Up @@ -138,6 +138,12 @@ private void defineRulesXoo(Context context) {
.setGapDescription("A certified architect will need roughly half an hour to start working on removal of modules, " + .setGapDescription("A certified architect will need roughly half an hour to start working on removal of modules, " +
"then it's about one hour per module."); "then it's about one hour per module.");


repo
.createRule("xoo-template")
.setTemplate(true)
.setName("Template of rule")
.setHtmlDescription("Template to be overridden by custom rules");

repo.done(); repo.done();


} }
Expand Down
Expand Up @@ -42,7 +42,7 @@ public void define_xoo_rules() {
assertThat(repo).isNotNull(); assertThat(repo).isNotNull();
assertThat(repo.name()).isEqualTo("Xoo"); assertThat(repo.name()).isEqualTo("Xoo");
assertThat(repo.language()).isEqualTo("xoo"); assertThat(repo.language()).isEqualTo("xoo");
assertThat(repo.rules()).hasSize(17); assertThat(repo.rules()).hasSize(18);


RulesDefinition.Rule rule = repo.rule(OneIssuePerLineSensor.RULE_KEY); RulesDefinition.Rule rule = repo.rule(OneIssuePerLineSensor.RULE_KEY);
assertThat(rule.name()).isNotEmpty(); assertThat(rule.name()).isNotEmpty();
Expand Down
19 changes: 0 additions & 19 deletions server/sonar-db-dao/src/main/java/org/sonar/db/es/EsQueueDto.java
Expand Up @@ -93,25 +93,6 @@ public String toString() {
return sb.toString(); return sb.toString();
} }


@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof EsQueueDto)) {
return false;
}

EsQueueDto that = (EsQueueDto) o;

return uuid.equals(that.uuid);
}

@Override
public int hashCode() {
return uuid.hashCode();
}

public static EsQueueDto create(Type docType, String docUuid) { public static EsQueueDto create(Type docType, String docUuid) {
return new EsQueueDto().setDocType(docType).setDocId(docUuid); return new EsQueueDto().setDocType(docType).setDocId(docUuid);
} }
Expand Down
Expand Up @@ -29,7 +29,7 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.lang.math.RandomUtils;
import org.sonar.api.Startable; import org.sonar.api.Startable;
import org.sonar.api.config.Settings; import org.sonar.api.config.Configuration;
import org.sonar.api.utils.System2; import org.sonar.api.utils.System2;
import org.sonar.api.utils.log.Logger; import org.sonar.api.utils.log.Logger;
import org.sonar.api.utils.log.Loggers; import org.sonar.api.utils.log.Loggers;
Expand Down Expand Up @@ -63,18 +63,18 @@ public class RecoveryIndexer implements Startable {
.setNameFormat("RecoveryIndexer-%d") .setNameFormat("RecoveryIndexer-%d")
.build()); .build());
private final System2 system2; private final System2 system2;
private final Settings settings; private final Configuration config;
private final DbClient dbClient; private final DbClient dbClient;
private final UserIndexer userIndexer; private final UserIndexer userIndexer;
private final RuleIndexer ruleIndexer; private final RuleIndexer ruleIndexer;
private final ActiveRuleIndexer activeRuleIndexer; private final ActiveRuleIndexer activeRuleIndexer;
private final long minAgeInMs; private final long minAgeInMs;
private final long loopLimit; private final long loopLimit;


public RecoveryIndexer(System2 system2, Settings settings, DbClient dbClient, public RecoveryIndexer(System2 system2, Configuration config, DbClient dbClient,
UserIndexer userIndexer, RuleIndexer ruleIndexer, ActiveRuleIndexer activeRuleIndexer) { UserIndexer userIndexer, RuleIndexer ruleIndexer, ActiveRuleIndexer activeRuleIndexer) {
this.system2 = system2; this.system2 = system2;
this.settings = settings; this.config = config;
this.dbClient = dbClient; this.dbClient = dbClient;
this.userIndexer = userIndexer; this.userIndexer = userIndexer;
this.ruleIndexer = ruleIndexer; this.ruleIndexer = ruleIndexer;
Expand Down Expand Up @@ -161,10 +161,7 @@ private static ListMultimap<EsQueueDto.Type, EsQueueDto> groupItemsByType(Collec
} }


private long getSetting(String key, long defaultValue) { private long getSetting(String key, long defaultValue) {
long val = settings.getLong(key); long val = config.getLong(key).orElse(defaultValue);
if (val <= 0) {
val = defaultValue;
}
LOGGER.debug(LOG_PREFIX + "{}={}", key, val); LOGGER.debug(LOG_PREFIX + "{}={}", key, val);
return val; return val;
} }
Expand Down
Expand Up @@ -19,17 +19,15 @@
*/ */
package org.sonar.server.es; package org.sonar.server.es;


import com.google.common.collect.Multimap;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.function.Function; import java.util.function.Function;
import org.sonar.core.util.stream.MoreCollectors; import org.sonar.core.util.stream.MoreCollectors;
import org.sonar.db.DbClient; import org.sonar.db.DbClient;
import org.sonar.db.DbSession; import org.sonar.db.DbSession;
import org.sonar.db.es.EsQueueDto; import org.sonar.db.es.EsQueueDto;


import static java.util.stream.Collectors.toMap;

/** /**
* Clean-up the db table es_queue when documents * Clean-up the db table es_queue when documents
* are successfully indexed so that the recovery * are successfully indexed so that the recovery
Expand All @@ -50,11 +48,12 @@ public ResiliencyIndexingListener(DbClient dbClient, DbSession dbSession, Collec
@Override @Override
public void onSuccess(Collection<String> docIds) { public void onSuccess(Collection<String> docIds) {
if (!docIds.isEmpty()) { if (!docIds.isEmpty()) {
Map<String, EsQueueDto> itemsById = items.stream().collect(toMap(EsQueueDto::getDocId, Function.identity())); Multimap<String, EsQueueDto> itemsById = items.stream().collect(MoreCollectors.index(EsQueueDto::getDocId, Function.identity()));


Collection<EsQueueDto> itemsToDelete = docIds Collection<EsQueueDto> itemsToDelete = docIds
.stream() .stream()
.map(itemsById::get) .map(itemsById::get)
.flatMap(Collection::stream)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(MoreCollectors.toArrayList(docIds.size())); .collect(MoreCollectors.toArrayList(docIds.size()));
dbClient.esQueueDao().delete(dbSession, itemsToDelete); dbClient.esQueueDao().delete(dbSession, itemsToDelete);
Expand Down
Expand Up @@ -81,8 +81,7 @@ public boolean update(DbSession dbSession, RuleUpdate update, OrganizationDto or
apply(update, rule, userSession); apply(update, rule, userSession);
update(dbSession, rule); update(dbSession, rule);
updateParameters(dbSession, organization, update, rule); updateParameters(dbSession, organization, update, rule);
ruleIndexer.commitAndIndex(dbSession, rule.getKey()); ruleIndexer.commitAndIndex(dbSession, rule.getKey(), organization);
ruleIndexer.commitAndIndex(dbSession, organization, rule.getKey());


return true; return true;
} }
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.sonar.api.rule.RuleKey; import org.sonar.api.rule.RuleKey;
import org.sonar.db.rule.RuleExtensionForIndexingDto; import org.sonar.db.rule.RuleExtensionForIndexingDto;
import org.sonar.db.rule.RuleForIndexingDto; import org.sonar.db.rule.RuleForIndexingDto;
import org.sonar.db.rule.RuleMetadataDto;
import org.sonar.server.es.BaseDoc; import org.sonar.server.es.BaseDoc;


public class RuleExtensionDoc extends BaseDoc { public class RuleExtensionDoc extends BaseDoc {
Expand All @@ -41,7 +40,7 @@ public RuleExtensionDoc() {


@Override @Override
public String getId() { public String getId() {
return getRuleKey() + "|" + getScope().getScope(); return idOf(getRuleKey(), getScope());
} }


@Override @Override
Expand Down Expand Up @@ -81,13 +80,6 @@ public RuleExtensionDoc setTags(Set<String> tags) {
return this; return this;
} }


public static RuleExtensionDoc of(RuleKey key, RuleExtensionScope scope, RuleMetadataDto ruleExtension) {
return new RuleExtensionDoc()
.setRuleKey(key)
.setScope(scope)
.setTags(ruleExtension.getTags());
}

public static RuleExtensionDoc of(RuleForIndexingDto rule) { public static RuleExtensionDoc of(RuleForIndexingDto rule) {
return new RuleExtensionDoc() return new RuleExtensionDoc()
.setRuleKey(rule.getRuleKey()) .setRuleKey(rule.getRuleKey())
Expand All @@ -102,6 +94,10 @@ public static RuleExtensionDoc of(RuleExtensionForIndexingDto rule) {
.setTags(rule.getTagsAsSet()); .setTags(rule.getTagsAsSet());
} }


public static String idOf(RuleKey ruleKey, RuleExtensionScope scope) {
return ruleKey + "|" + scope.getScope();
}

@Override @Override
public String toString() { public String toString() {
return ReflectionToStringBuilder.toString(this); return ReflectionToStringBuilder.toString(this);
Expand Down
Expand Up @@ -45,9 +45,10 @@
import org.sonar.server.es.StartupIndexer; import org.sonar.server.es.StartupIndexer;


import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.sonar.core.util.stream.MoreCollectors.toHashSet; import static org.sonar.core.util.stream.MoreCollectors.toArrayList;
import static org.sonar.server.rule.index.RuleIndexDefinition.INDEX_TYPE_RULE; import static org.sonar.server.rule.index.RuleIndexDefinition.INDEX_TYPE_RULE;
import static org.sonar.server.rule.index.RuleIndexDefinition.INDEX_TYPE_RULE_EXTENSION; import static org.sonar.server.rule.index.RuleIndexDefinition.INDEX_TYPE_RULE_EXTENSION;


Expand Down Expand Up @@ -95,78 +96,70 @@ public void commitAndIndex(DbSession dbSession, RuleKey ruleKey) {
commitAndIndex(dbSession, singletonList(ruleKey)); commitAndIndex(dbSession, singletonList(ruleKey));
} }


public void commitAndIndex(DbSession dbSession, Collection<RuleKey> ruleDtos) { public void commitAndIndex(DbSession dbSession, Collection<RuleKey> ruleKeys) {
List<EsQueueDto> items = ruleDtos.stream() List<EsQueueDto> items = ruleKeys.stream()
.map(key -> EsQueueDto.create(EsQueueDto.Type.RULE, key.toString())) .map(RuleIndexer::createQueueDtoForRule)
.collect(MoreCollectors.toArrayList()); .collect(MoreCollectors.toArrayList());


dbClient.esQueueDao().insert(dbSession, items); dbClient.esQueueDao().insert(dbSession, items);
dbSession.commit(); dbSession.commit();
postCommit(dbSession, ruleDtos, items); postCommit(dbSession, items);
} }


public void commitAndIndex(DbSession dbSession, OrganizationDto organizationDto, RuleKey ruleKey) { /**
List<EsQueueDto> items = singletonList(EsQueueDto.create(EsQueueDto.Type.RULE_EXTENSION, ruleKey + "|" + organizationDto.getUuid())); * Commit a change on a rule and its extension on the given organization

*/
public void commitAndIndex(DbSession dbSession, RuleKey ruleKey, OrganizationDto organization) {
List<EsQueueDto> items = asList(createQueueDtoForRule(ruleKey), createQueueDtoForRuleExtension(ruleKey, organization));
dbClient.esQueueDao().insert(dbSession, items); dbClient.esQueueDao().insert(dbSession, items);
dbSession.commit(); dbSession.commit();
postCommit(dbSession, ruleKey, organizationDto, items); postCommit(dbSession, items);
} }


/** /**
* Entry point for Byteman tests. See directory tests/resilience. * This method is used by the Byteman script of integration tests.
* The parameter "ruleKeys" is used only by the Byteman script.
*/ */
private void postCommit(DbSession dbSession, Collection<RuleKey> ruleKeys, Collection<EsQueueDto> items) { private void postCommit(DbSession dbSession, List<EsQueueDto> items) {
index(dbSession, items);
}

private void postCommit(DbSession dbSession, RuleKey ruleKeys, OrganizationDto organizationDto, Collection<EsQueueDto> items) {
index(dbSession, items); index(dbSession, items);
} }


@Override @Override
public IndexingResult index(DbSession dbSession, Collection<EsQueueDto> items) { public IndexingResult index(DbSession dbSession, Collection<EsQueueDto> items) {
if (items.isEmpty()) {
return new IndexingResult();
}

IndexingResult result = new IndexingResult(); IndexingResult result = new IndexingResult();

if (!items.isEmpty()) {
ListMultimap<EsQueueDto.Type, EsQueueDto> itemsByType = groupItemsByType(items); ListMultimap<EsQueueDto.Type, EsQueueDto> itemsByType = groupItemsByType(items);

result.add(doIndexRules(dbSession, itemsByType.get(EsQueueDto.Type.RULE)));
result.add(doIndexRules(dbSession, itemsByType.get(EsQueueDto.Type.RULE))); result.add(doIndexRuleExtensions(dbSession, itemsByType.get(EsQueueDto.Type.RULE_EXTENSION)));
result.add(doIndexRuleExtensions(dbSession, itemsByType.get(EsQueueDto.Type.RULE_EXTENSION))); }

return result; return result;
} }


private IndexingResult doIndexRules(DbSession dbSession, List<EsQueueDto> items) { private IndexingResult doIndexRules(DbSession dbSession, List<EsQueueDto> items) {
BulkIndexer bulkIndexer = createBulkIndexer(Size.REGULAR, new ResiliencyIndexingListener(dbClient, dbSession, items)); BulkIndexer bulkIndexer = createBulkIndexer(Size.REGULAR, new ResiliencyIndexingListener(dbClient, dbSession, items));
bulkIndexer.start(); bulkIndexer.start();


Set<RuleKey> rules = items List<RuleKey> ruleKeys = items
.stream() .stream()
.filter(i -> { .filter(i -> {
requireNonNull(i.getDocId(), () -> "BUG - " + i + " has not been persisted before indexing"); requireNonNull(i.getDocId(), () -> "BUG - " + i + " has not been persisted before indexing");
return i.getDocType() == EsQueueDto.Type.RULE; return i.getDocType() == EsQueueDto.Type.RULE;
}) })
.map(i -> RuleKey.parse(i.getDocId())) .map(i -> RuleKey.parse(i.getDocId()))
.collect(toHashSet(items.size())); .collect(toArrayList(items.size()));


dbClient.ruleDao().scrollIndexingRulesByKeys(dbSession, rules, dbClient.ruleDao().scrollIndexingRulesByKeys(dbSession, ruleKeys,
// only index requests, no deletion requests. // only index requests, no deletion requests.
// Deactivated users are not deleted but updated. // Deactivated users are not deleted but updated.
r -> { r -> {
rules.remove(r.getRuleKey());
bulkIndexer.add(newRuleDocIndexRequest(r)); bulkIndexer.add(newRuleDocIndexRequest(r));
bulkIndexer.add(newRuleExtensionDocIndexRequest(r)); bulkIndexer.add(newRuleExtensionDocIndexRequest(r));
ruleKeys.remove(r.getRuleKey());
}); });


// the remaining items reference rows that don't exist in db. They must // the remaining items reference rows that don't exist in db. They must
// be deleted from index. // be deleted from index.
rules.forEach(r -> bulkIndexer.addDeletion(RuleIndexDefinition.INDEX_TYPE_RULE, r.toString())); ruleKeys.forEach(r -> bulkIndexer.addDeletion(RuleIndexDefinition.INDEX_TYPE_RULE, r.toString(), r.toString()));
rules.forEach(r -> bulkIndexer.addDeletion(RuleIndexDefinition.INDEX_TYPE_RULE_EXTENSION, r.toString())); ruleKeys.forEach(r -> bulkIndexer.addDeletion(RuleIndexDefinition.INDEX_TYPE_RULE_EXTENSION, RuleExtensionDoc.idOf(r, RuleExtensionScope.system()), r.toString()));


return bulkIndexer.stop(); return bulkIndexer.stop();
} }
Expand All @@ -175,26 +168,30 @@ private IndexingResult doIndexRuleExtensions(DbSession dbSession, List<EsQueueDt
BulkIndexer bulkIndexer = createBulkIndexer(Size.REGULAR, new ResiliencyIndexingListener(dbClient, dbSession, items)); BulkIndexer bulkIndexer = createBulkIndexer(Size.REGULAR, new ResiliencyIndexingListener(dbClient, dbSession, items));
bulkIndexer.start(); bulkIndexer.start();


Set<RuleExtensionId> docIds = items List<RuleExtensionId> docIds = items
.stream() .stream()
.filter(i -> { .filter(i -> {
requireNonNull(i.getDocId(), () -> "BUG - " + i + " has not been persisted before indexing"); requireNonNull(i.getDocId(), () -> "BUG - " + i + " has not been persisted before indexing");
return i.getDocType() == EsQueueDto.Type.RULE_EXTENSION; return i.getDocType() == EsQueueDto.Type.RULE_EXTENSION;
}) })
.map(RuleIndexer::explodeRuleExtensionDocId) .map(RuleIndexer::explodeRuleExtensionDocId)
.collect(toHashSet(items.size())); .collect(toArrayList(items.size()));


dbClient.ruleDao().scrollIndexingRuleExtensionsByIds(dbSession, docIds, dbClient.ruleDao().scrollIndexingRuleExtensionsByIds(dbSession, docIds,
// only index requests, no deletion requests. // only index requests, no deletion requests.
// Deactivated users are not deleted but updated. // Deactivated users are not deleted but updated.
r -> { r -> {
docIds.remove(new RuleExtensionId(r.getOrganizationUuid(), r.getPluginName(), r.getPluginRuleKey())); RuleExtensionId docId = new RuleExtensionId(r.getOrganizationUuid(), r.getPluginName(), r.getPluginRuleKey());
docIds.remove(docId);
bulkIndexer.add(newRuleExtensionDocIndexRequest(r)); bulkIndexer.add(newRuleExtensionDocIndexRequest(r));
}); });


// the remaining items reference rows that don't exist in db. They must // the remaining items reference rows that don't exist in db. They must
// be deleted from index. // be deleted from index.
docIds.forEach(r -> bulkIndexer.addDeletion(RuleIndexDefinition.INDEX_TYPE_RULE_EXTENSION, r.getId())); docIds.forEach(docId -> {
RuleKey ruleKey = RuleKey.of(docId.getRepositoryName(), docId.getRuleKey());
bulkIndexer.addDeletion(RuleIndexDefinition.INDEX_TYPE_RULE_EXTENSION, docId.getId(), ruleKey.toString());
});


return bulkIndexer.stop(); return bulkIndexer.stop();
} }
Expand Down Expand Up @@ -239,4 +236,14 @@ private static RuleExtensionId explodeRuleExtensionDocId(EsQueueDto esQueueDto)
checkArgument(esQueueDto.getDocType() == EsQueueDto.Type.RULE_EXTENSION); checkArgument(esQueueDto.getDocType() == EsQueueDto.Type.RULE_EXTENSION);
return new RuleExtensionId(esQueueDto.getDocId()); return new RuleExtensionId(esQueueDto.getDocId());
} }

private static EsQueueDto createQueueDtoForRule(RuleKey ruleKey) {
return EsQueueDto.create(EsQueueDto.Type.RULE, ruleKey.toString(), null, ruleKey.toString());
}

private static EsQueueDto createQueueDtoForRuleExtension(RuleKey ruleKey, OrganizationDto organization) {
String docId = RuleExtensionDoc.idOf(ruleKey, RuleExtensionScope.organization(organization));
return EsQueueDto.create(EsQueueDto.Type.RULE_EXTENSION, docId, null, ruleKey.toString());
}

} }
Expand Up @@ -38,14 +38,14 @@
import org.sonar.server.es.EsClient; import org.sonar.server.es.EsClient;
import org.sonar.server.es.IndexType; import org.sonar.server.es.IndexType;
import org.sonar.server.es.IndexingListener; import org.sonar.server.es.IndexingListener;
import org.sonar.server.es.IndexingResult;
import org.sonar.server.es.ResiliencyIndexingListener; import org.sonar.server.es.ResiliencyIndexingListener;
import org.sonar.server.es.ResilientIndexer; import org.sonar.server.es.ResilientIndexer;
import org.sonar.server.es.IndexingResult;
import org.sonar.server.es.StartupIndexer; import org.sonar.server.es.StartupIndexer;


import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.sonar.core.util.stream.MoreCollectors.toHashSet; import static org.sonar.core.util.stream.MoreCollectors.toArrayList;
import static org.sonar.server.user.index.UserIndexDefinition.INDEX_TYPE_USER; import static org.sonar.server.user.index.UserIndexDefinition.INDEX_TYPE_USER;


public class UserIndexer implements StartupIndexer, ResilientIndexer { public class UserIndexer implements StartupIndexer, ResilientIndexer {
Expand Down Expand Up @@ -113,14 +113,14 @@ public IndexingResult index(DbSession dbSession, Collection<EsQueueDto> items) {
if (items.isEmpty()) { if (items.isEmpty()) {
return new IndexingResult(); return new IndexingResult();
} }
Set<String> logins = items List<String> logins = items
.stream() .stream()
.filter(i -> { .filter(i -> {
requireNonNull(i.getDocId(), () -> "BUG - " + i + " has not been persisted before indexing"); requireNonNull(i.getDocId(), () -> "BUG - " + i + " has not been persisted before indexing");
return i.getDocType() == EsQueueDto.Type.USER; return i.getDocType() == EsQueueDto.Type.USER;
}) })
.map(EsQueueDto::getDocId) .map(EsQueueDto::getDocId)
.collect(toHashSet(items.size())); .collect(toArrayList(items.size()));


ListMultimap<String, String> organizationUuidsByLogin = ArrayListMultimap.create(); ListMultimap<String, String> organizationUuidsByLogin = ArrayListMultimap.create();
dbClient.organizationMemberDao().selectForUserIndexing(dbSession, logins, organizationUuidsByLogin::put); dbClient.organizationMemberDao().selectForUserIndexing(dbSession, logins, organizationUuidsByLogin::put);
Expand Down

0 comments on commit 75e926d

Please sign in to comment.