Skip to content
Permalink
Browse files
Sync gap fix for case there is a lot of non-modified builds (#139)
  • Loading branch information
dspavlov committed Jul 30, 2019
1 parent 7b242ed commit 397be3270491c3928d40d157491b71e436c7f653
Showing 8 changed files with 127 additions and 52 deletions.
@@ -172,6 +172,8 @@ protected String checkQueue(String srvId, List<ITrackedChain> chains) {

String selfLogin = creds.getUser(srvId);

tcIgn.actualizeRecentBuildRefs();

StringBuilder res = new StringBuilder();

for (ITrackedChain chain : chains) {
@@ -80,7 +80,7 @@ public static long tcSimpleDateToTimestamp(String date) {
return new SimpleDateFormat("yyyyMMdd'T'HHmmssZ").parse(date).getTime();
}
catch (ParseException e) {
System.err.println("Exception happened when TimeUtilo tried to convert date into timestamp [" +
System.err.println("Exception happened when TimeUtil tried to convert date into timestamp [" +
"date=" + date + ", err=" + e.getMessage() + ']');
}

@@ -124,7 +124,7 @@ public void init() {
Iterator<Cache.Entry<String, org.apache.ignite.ci.teamcity.ignited.IgniteStringCompactor.CompactorEntity>> iter = qryCursor.iterator();

if (!iter.hasNext()) {
System.err.println("Not found string by id " + id);
System.err.println("Error: String Not found string by id " + id);

return null;
}
@@ -17,32 +17,28 @@
package org.apache.ignite.tcignited;

import com.google.common.base.Strings;

import java.io.File;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import org.apache.ignite.ci.teamcity.ignited.BuildRefCompacted;
import org.apache.ignite.tcbot.common.conf.ITcServerConfig;
import org.apache.ignite.tcignited.history.IRunHistory;
import org.apache.ignite.tcignited.history.IRunStat;
import org.apache.ignite.tcignited.history.ISuiteRunHistory;
import org.apache.ignite.tcservice.model.agent.Agent;
import org.apache.ignite.tcservice.model.mute.MuteInfo;
import org.apache.ignite.tcservice.model.result.Build;
import org.apache.ignite.ci.teamcity.ignited.buildcondition.BuildCondition;
import org.apache.ignite.ci.teamcity.ignited.buildtype.BuildTypeCompacted;
import org.apache.ignite.ci.teamcity.ignited.buildtype.BuildTypeRefCompacted;
import org.apache.ignite.ci.teamcity.ignited.change.ChangeCompacted;
import org.apache.ignite.ci.teamcity.ignited.change.RevisionCompacted;
import org.apache.ignite.ci.teamcity.ignited.fatbuild.FatBuildCompacted;
import org.apache.ignite.tcbot.common.conf.ITcServerConfig;
import org.apache.ignite.tcignited.history.IRunHistory;
import org.apache.ignite.tcignited.history.ISuiteRunHistory;
import org.apache.ignite.tcservice.model.agent.Agent;
import org.apache.ignite.tcservice.model.mute.MuteInfo;
import org.apache.ignite.tcservice.model.result.Build;

/**
*
@@ -257,4 +253,10 @@ public default String getLatestCommitVersion(FatBuildCompacted build) {
}

@Nullable File downloadAndCacheBuildLog(int buildId);

/**
* Enforce reloading of recent build references for this server. At least queued/running builds from TC Bot DB
* should be re-synced.
*/
public void actualizeRecentBuildRefs();
}
@@ -33,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -60,10 +61,10 @@
import org.apache.ignite.tcignited.buildref.BranchEquivalence;
import org.apache.ignite.tcignited.buildref.BuildRefDao;
import org.apache.ignite.tcignited.buildref.BuildRefSync;
import org.apache.ignite.tcignited.history.BuildStartTimeStorage;
import org.apache.ignite.tcignited.history.HistoryCollector;
import org.apache.ignite.tcignited.history.IRunHistory;
import org.apache.ignite.tcignited.history.ISuiteRunHistory;
import org.apache.ignite.tcignited.history.BuildStartTimeStorage;
import org.apache.ignite.tcignited.history.SuiteInvocationHistoryDao;
import org.apache.ignite.tcignited.mute.MuteDao;
import org.apache.ignite.tcignited.mute.MuteSync;
@@ -494,8 +495,12 @@ public void ensureActualizeRequested() {
Map<String, Object> buildParms) {
Build build = conn.triggerBuild(buildTypeId, branchName, cleanRebuild, queueAtTop, buildParms);

List<BuildRef> deps = build.getSnapshotDependenciesNonNull();

Stream<Integer> allIds = Stream.concat(Stream.of(build.getId()), deps.stream().map(BuildRef::getId));

//todo may add additional parameter: load builds into DB in sync/async fashion
buildRefSync.runActualizeBuildRefs(srvCode, false, Sets.newHashSet(build.getId()), conn);
buildRefSync.runActualizeBuildRefs(srvCode, BuildRefSync.SyncMode.ULTRAFAST, allIds.collect(Collectors.toSet()), conn);

return build;
}
@@ -604,16 +609,16 @@ protected FatBuildCompacted getFatBuildFromIgnite(int buildId) {
return changes.values();
}

String actualizeRecentBuildRefs() {
return actualizeRecentBuildRefs(srvCode);
public void actualizeRecentBuildRefs() {
actualizeRecentBuildRefs(srvCode);
}

/**
*
* @param srvNme TC service name
*/
@SuppressWarnings("WeakerAccess")
@MonitoredTask(name = "Prepare Actualize BuildRefs(srv, full resync)", nameExtArgsIndexes = {0})
@MonitoredTask(name = "Prepare Actualize BuildRefs(srv, incremental sync)", nameExtArgsIndexes = {0})
protected String actualizeRecentBuildRefs(String srvNme) {
List<BuildRefCompacted> running = buildRefDao.getQueuedAndRunning(srvIdMaskHigh);

@@ -636,10 +641,10 @@ protected String actualizeRecentBuildRefs(String srvNme) {
//schedule direct reload for Fat Builds for all queued too-old builds
fatBuildSync.scheduleBuildsLoad(conn, directUpload);

buildRefSync.runActualizeBuildRefs(srvCode, false, paginateUntil, conn);
buildRefSync.runActualizeBuildRefs(srvCode, BuildRefSync.SyncMode.INCREMENTAL, paginateUntil, conn);

int freshButNotFoundByBuildsRefsScan = paginateUntil.size();
if (!paginateUntil.isEmpty()) {
if (freshButNotFoundByBuildsRefsScan > 0) {
//some builds may stuck in the queued or running, enforce loading now
fatBuildSync.doLoadBuilds(-1, srvCode, conn, paginateUntil);
}
@@ -663,6 +668,6 @@ private void sheduleResyncBuildRefs() {
*
*/
void fullReindex() {
buildRefSync.runActualizeBuildRefs(srvCode, true, null, conn);
buildRefSync.runActualizeBuildRefs(srvCode, BuildRefSync.SyncMode.FULL_REINDEX, null, conn);
}
}
@@ -315,9 +315,6 @@ public FatBuildCompacted loadBuild(ITeamcityConn conn, int buildId,

buildRefDao.save(srvIdMask, refCompacted);

//todo remove unused code
// runHistSync.saveToHistoryLater(srvCode, savedVer);

return savedVer;
}

@@ -240,9 +240,8 @@ public List<BuildRefCompacted> getQueuedAndRunning(int srvId) {
if (stateRunningId != null)
list.add(stateRunningId);


return compactedBuildsForServer(srvId)
.filter(e -> list.contains(e.state()) )
.filter(e -> list.contains(e.state()))
.collect(Collectors.toList());
}

@@ -272,6 +271,7 @@ public List<BuildRefCompacted> getBuildsForBranch(int srvId, int branchNameId) {
}

if (!list.isEmpty()) {

System.err.println(" Branch " + compactor.getStringFromId(branchNameId)
+ " builds " + list.size() + " (Overall) ");
}
@@ -16,21 +16,22 @@
*/
package org.apache.ignite.tcignited.buildref;

import org.apache.ignite.tcbot.common.interceptor.AutoProfiling;
import org.apache.ignite.tcbot.common.interceptor.MonitoredTask;
import org.apache.ignite.tcservice.model.hist.BuildRef;
import org.apache.ignite.tcignited.ITeamcityIgnited;
import org.apache.ignite.tcignited.build.ProactiveFatBuildSync;
import org.apache.ignite.tcservice.ITeamcityConn;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import org.apache.ignite.tcbot.common.interceptor.AutoProfiling;
import org.apache.ignite.tcbot.common.interceptor.MonitoredTask;
import org.apache.ignite.tcbot.common.util.TimeUtil;
import org.apache.ignite.tcignited.ITeamcityIgnited;
import org.apache.ignite.tcignited.build.ProactiveFatBuildSync;
import org.apache.ignite.tcservice.ITeamcityConn;
import org.apache.ignite.tcservice.model.hist.BuildRef;

/**
* This class checks all builds ocurred on a TC server.
@@ -42,31 +43,44 @@ public class BuildRefSync {
* Max builds to check during incremental sync. If this value is reached (50 pages) and some stuck builds still not
* found, then iteration stops
*/
public static final int MAX_INCREMENTAL_BUILDS_TO_CHECK = 5000;
public static final int MAX_INCREMENTAL_BUILDS_TO_CHECK = 7000;

/** Incremental builds WO modification to be found to stop iterating. */
public static final int INCREMENTAL_BUILDS_WO_MODIFICATION_TO_STOP = 1000;

/** Build reference DAO. */
@Inject private BuildRefDao buildRefDao;

/** Build Sync. */
@Inject private ProactiveFatBuildSync fatBuildSync;

public enum SyncMode {
ULTRAFAST,
FULL_REINDEX,
INCREMENTAL
}

/**
* List all builds (first pages or all available).
*
* @param srvId Server id.
* @param fullReindex Reindex all builds from TC history.
* @param syncMode Reindex all builds from TC history; reindex latets, only find mandatory
* @param mandatoryToReload [in/out] Build ID should be found before end of sync. Ignored if fullReindex mode.
* @param conn Teamcity to check builds
*/
@SuppressWarnings({"WeakerAccess", "UnusedReturnValue"})
@MonitoredTask(name = "Actualize BuildRefs(srv, full resync)", nameExtArgsIndexes = {0, 1})
@MonitoredTask(name = "Actualize BuildRefs(srv, syncmode)", nameExtArgsIndexes = {0, 1})
@AutoProfiling
public String runActualizeBuildRefs(String srvId, boolean fullReindex,
@Nullable Set<Integer> mandatoryToReload, ITeamcityConn conn) {
public String runActualizeBuildRefs(String srvId,
SyncMode syncMode,
@Nullable Set<Integer> mandatoryToReload,
ITeamcityConn conn) {

AtomicReference<String> outLinkNext = new AtomicReference<>();
List<BuildRef> tcDataFirstPage = conn.getBuildRefsPage(null, outLinkNext);

final int srvIdMaskHigh = ITeamcityIgnited.serverIdToInt(srvId);
long start = System.currentTimeMillis();
int srvIdMaskHigh = ITeamcityIgnited.serverIdToInt(srvId);
Set<Long> buildsUpdated = buildRefDao.saveChunk(srvIdMaskHigh, tcDataFirstPage);
int totalUpdated = buildsUpdated.size();
fatBuildSync.scheduleBuildsLoad(conn, cacheKeysToBuildIds(buildsUpdated));
@@ -79,6 +93,25 @@ public String runActualizeBuildRefs(String srvId, boolean fullReindex,
tcDataFirstPage.stream().map(BuildRef::getId).forEach(mandatoryToReload::remove);
}

if (syncMode == SyncMode.ULTRAFAST && isEmpty(mandatoryToReload)) {
return "Entries saved " +
totalUpdated +
" Builds checked " +
totalChecked +
" Needed to find " +
neededToFind +
" remained to find " +
mandatoryToReload.size();
}

long lastTimeUpdateFound = System.currentTimeMillis();
long maxMsWithoutChanges = Duration.ofHours(1).toMillis();

//reason for end for full sync
boolean timeoutForNewBuild = false;
//reason for end for incremental sync: decrementing counter of builds to find without modification to stop search.
int buildsCntrToStop = INCREMENTAL_BUILDS_WO_MODIFICATION_TO_STOP;

while (outLinkNext.get() != null) {
String nextPageUrl = outLinkNext.get();
outLinkNext.set(null);
@@ -90,26 +123,62 @@ public String runActualizeBuildRefs(String srvId, boolean fullReindex,
int savedCurChunk = curChunkBuildsSaved.size();

totalChecked += tcDataNextPage.size();

if (!fullReindex) {
if (mandatoryToReload != null && !mandatoryToReload.isEmpty())
if (savedCurChunk != 0) {
lastTimeUpdateFound = System.currentTimeMillis();

buildsCntrToStop = INCREMENTAL_BUILDS_WO_MODIFICATION_TO_STOP;
} else
buildsCntrToStop -= tcDataNextPage.size();

if (syncMode == SyncMode.ULTRAFAST && isEmpty(mandatoryToReload))
break;
else if (syncMode==SyncMode.FULL_REINDEX) {
timeoutForNewBuild = System.currentTimeMillis() > lastTimeUpdateFound + maxMsWithoutChanges;
if (timeoutForNewBuild
&& totalChecked > MAX_INCREMENTAL_BUILDS_TO_CHECK)
break;
}
else {
boolean noMandatoryBuildsLeft = isEmpty(mandatoryToReload);
if (!noMandatoryBuildsLeft)
tcDataNextPage.stream().map(BuildRef::getId).forEach(mandatoryToReload::remove);

if (savedCurChunk == 0 &&
(mandatoryToReload == null
|| mandatoryToReload.isEmpty()
|| totalChecked > MAX_INCREMENTAL_BUILDS_TO_CHECK)
) {
if (buildsCntrToStop <= 0
&& (noMandatoryBuildsLeft || totalChecked > MAX_INCREMENTAL_BUILDS_TO_CHECK)) {
// There are no modification at current page, hopefully no modifications at all
break;
}
}
}

int leftToFind = mandatoryToReload == null ? 0 : mandatoryToReload.size();
return "Entries saved " + totalUpdated + " Builds checked " + totalChecked + " Needed to find " + neededToFind + " remained to find " + leftToFind;
StringBuilder sb = new StringBuilder();
sb.append("Entries saved ");
sb.append(totalUpdated);
sb.append(" Builds checked ");
sb.append(totalChecked);
if (mandatoryToReload != null) {
sb.append(" Needed to find ");
sb.append(neededToFind);
int leftToFind = mandatoryToReload.size();
sb.append(" remained to find ");
sb.append(leftToFind);
}

sb.append(" Last time update found ");
sb.append(TimeUtil.millisToDurationPrintable(System.currentTimeMillis()- lastTimeUpdateFound));
sb.append(" ago");

if(timeoutForNewBuild) {
sb.append("TIMEOUT, total time: ");
sb.append(TimeUtil.millisToDurationPrintable(System.currentTimeMillis()- start));
}

return sb.toString();
}

public boolean isEmpty(@Nullable Set<Integer> mandatoryToReload) {
return mandatoryToReload == null || mandatoryToReload.isEmpty();
}

@Nonnull
private List<Integer> cacheKeysToBuildIds(Collection<Long> cacheKeysUpdated) {

0 comments on commit 397be32

Please sign in to comment.