Skip to content

Commit

Permalink
feat: log upload/remove events (#553)
Browse files Browse the repository at this point in the history
* feat: log upload/remove events

* correct optional
  • Loading branch information
olenagerasimova committed Aug 3, 2023
1 parent 560d6d6 commit 3525edd
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 105 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ SOFTWARE.
<dependency>
<groupId>com.artipie</groupId>
<artifactId>http</artifactId>
<version>v1.2.3</version>
<version>v1.2.18</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/com/artipie/rpm/asto/AstoMetadataRemove.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.artipie.asto.misc.UncheckedIOScalar;
import com.artipie.asto.streams.StorageValuePipeline;
import com.artipie.rpm.RepoConfig;
import com.artipie.rpm.meta.PackageInfo;
import com.artipie.rpm.meta.XmlAlter;
import com.artipie.rpm.meta.XmlMaid;
import com.artipie.rpm.meta.XmlPackage;
Expand All @@ -20,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand All @@ -43,14 +45,31 @@ public final class AstoMetadataRemove {
*/
private final RepoConfig cnfg;

/**
* Collection with removed packages info if required.
*/
private final Optional<Collection<PackageInfo>> infos;

/**
* Ctor.
* @param asto Abstract storage
* @param cnfg Repos config
* @param infos Collection with removed packages info if required
*/
public AstoMetadataRemove(final Storage asto, final RepoConfig cnfg) {
public AstoMetadataRemove(final Storage asto, final RepoConfig cnfg,
final Optional<Collection<PackageInfo>> infos) {
this.asto = asto;
this.cnfg = cnfg;
this.infos = infos;
}

/**
* Ctor.
* @param asto Abstract storage
* @param cnfg Repos config
*/
public AstoMetadataRemove(final Storage asto, final RepoConfig cnfg) {
this(asto, cnfg, Optional.empty());
}

/**
Expand Down Expand Up @@ -119,7 +138,7 @@ private CompletionStage<Long> removePackages(
final InputStream input = opt.map(new UncheckedIOFunc<>(GZIPInputStream::new))
.get();
if (pckg == XmlPackage.PRIMARY) {
maid = new XmlPrimaryMaid.Stream(input, out);
maid = new XmlPrimaryMaid.Stream(input, out, this.infos);
} else {
maid = new XmlMaid.ByPkgidAttr.Stream(input, out);
}
Expand Down
82 changes: 54 additions & 28 deletions src/main/java/com/artipie/rpm/asto/AstoRepoAdd.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.artipie.asto.rx.RxStorageWrapper;
import com.artipie.rpm.RepoConfig;
import com.artipie.rpm.http.RpmUpload;
import com.artipie.rpm.meta.PackageInfo;
import com.artipie.rpm.pkg.HeaderTags;
import com.artipie.rpm.pkg.Package;
import com.jcabi.log.Logger;
import hu.akarnokd.rxjava2.interop.SingleInterop;
Expand All @@ -19,6 +21,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

/**
* Add packages to metadata and repository.
Expand Down Expand Up @@ -60,38 +63,60 @@ public AstoRepoAdd(final Storage asto, final RepoConfig cnfg) {
public CompletionStage<Void> perform() {
return this.read().thenCompose(
list -> new AstoMetadataAdd(this.asto, this.cnfg).perform(list)
).thenCompose(
temp -> new AstoCreateRepomd(this.asto, this.cnfg).perform(temp).thenCompose(
nothing -> new AstoMetadataNames(this.asto, this.cnfg).prepareNames(temp)
.thenCompose(
keys -> {
final StorageLock lock = new StorageLock(this.asto, AstoRepoAdd.META);
return lock.acquire()
.thenCompose(ignored -> this.remove(AstoRepoAdd.META))
).thenCompose(this::generateRepomdAndMoveXmls);
}

/**
* Performs whole workflow to add items, listed in {@link com.artipie.rpm.http.RpmUpload#TO_ADD}
* location, to the repository and metadata files. Returns list with info about added
* packages.
* @return Completable action with added packages info list
*/
public CompletionStage<List<PackageInfo>> performWithResult() {
return this.read().thenCompose(
list -> new AstoMetadataAdd(this.asto, this.cnfg).perform(list)
.thenCompose(this::generateRepomdAndMoveXmls)
.thenApply(
nothing -> list.stream()
.map(info -> new PackageInfo(new HeaderTags(info), info.size()))
.collect(Collectors.toList())
)
);
}

/**
* Creates repomd metadata file and moves all other metadata xmls to repository
* with storage lock.
* @param temp Temp location o metadata files
* @return Completable action
*/
private CompletionStage<Void> generateRepomdAndMoveXmls(final Key temp) {
return new AstoCreateRepomd(this.asto, this.cnfg).perform(temp).thenCompose(
nothing -> new AstoMetadataNames(this.asto, this.cnfg).prepareNames(temp).thenCompose(
keys -> {
final StorageLock lock = new StorageLock(this.asto, AstoRepoAdd.META);
return lock.acquire().thenCompose(ignored -> this.remove(AstoRepoAdd.META))
.thenCompose(
ignored -> CompletableFuture.allOf(
keys.entrySet().stream().map(
entry -> this.asto.move(entry.getKey(), entry.getValue())
).toArray(CompletableFuture[]::new)
)
).thenCompose(
ignored -> this.asto.list(RpmUpload.TO_ADD)
.thenCompose(
ignored -> CompletableFuture.allOf(
keys.entrySet().stream().map(
entry -> this.asto
.move(entry.getKey(), entry.getValue())
list -> CompletableFuture.allOf(
list.stream().map(
key -> this.asto.move(
key, AstoRepoAdd.removeTempPart(key)
)
).toArray(CompletableFuture[]::new)
)
)
.thenCompose(
ignored -> this.asto.list(RpmUpload.TO_ADD).thenCompose(
list -> CompletableFuture.allOf(
list.stream().map(
key -> this.asto.move(
key, AstoRepoAdd.removeTempPart(key)
)
).toArray(CompletableFuture[]::new)
)
)
)
.thenCompose(ignored -> lock.release()).thenCompose(
ignored -> this.remove(temp)
);
}
)
)
.thenCompose(ignored -> lock.release())
.thenCompose(ignored -> this.remove(temp));
}
)
);
}
Expand Down Expand Up @@ -144,4 +169,5 @@ private CompletableFuture<Void> remove(final Key key) {
private static Key removeTempPart(final Key key) {
return new KeyExcludeFirst(key, RpmUpload.TO_ADD.string());
}

}
79 changes: 55 additions & 24 deletions src/main/java/com/artipie/rpm/asto/AstoRepoRemove.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import com.artipie.asto.rx.RxStorageWrapper;
import com.artipie.rpm.RepoConfig;
import com.artipie.rpm.http.RpmRemove;
import com.artipie.rpm.meta.PackageInfo;
import hu.akarnokd.rxjava2.interop.SingleInterop;
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

Expand All @@ -42,14 +44,42 @@ public final class AstoRepoRemove {
*/
private final RepoConfig cnfg;

/**
* Collection with removed packages info if required.
*/
private final Optional<Collection<PackageInfo>> infos;

/**
* Ctor.
* @param asto Abstract storage
* @param cnfg Repository config
* @param infos Collection with removed packages info if required
*/
public AstoRepoRemove(final Storage asto, final RepoConfig cnfg) {
public AstoRepoRemove(final Storage asto, final RepoConfig cnfg,
final Optional<Collection<PackageInfo>> infos) {
this.asto = asto;
this.cnfg = cnfg;
this.infos = infos;
}

/**
* Ctor.
* @param asto Abstract storage
* @param cnfg Repository config
* @param infos Collection with removed packages info
*/
public AstoRepoRemove(final Storage asto, final RepoConfig cnfg,
final Collection<PackageInfo> infos) {
this(asto, cnfg, Optional.of(infos));
}

/**
* Ctor.
* @param asto Abstract storage
* @param cnfg Repository config
*/
public AstoRepoRemove(final Storage asto, final RepoConfig cnfg) {
this(asto, cnfg, Optional.empty());
}

/**
Expand All @@ -60,29 +90,30 @@ public AstoRepoRemove(final Storage asto, final RepoConfig cnfg) {
* @return Completable action
*/
public CompletionStage<Void> perform(final Collection<String> checksums) {
return new AstoMetadataRemove(this.asto, this.cnfg).perform(checksums).thenCompose(
temp -> new AstoCreateRepomd(this.asto, this.cnfg).perform(temp).thenCompose(
nothing -> new AstoMetadataNames(this.asto, this.cnfg).prepareNames(temp)
.thenCompose(
keys -> {
final StorageLock lock =
new StorageLock(this.asto, AstoRepoRemove.META);
return lock.acquire()
.thenCompose(ignored -> this.remove(AstoRepoRemove.META))
.thenCompose(
ignored -> CompletableFuture.allOf(
keys.entrySet().stream().map(
entry ->
this.asto.move(entry.getKey(), entry.getValue())
).toArray(CompletableFuture[]::new)
)
).thenCompose(ignored -> lock.release()).thenCompose(
ignored -> this.remove(temp)
);
}
)
)
);
return new AstoMetadataRemove(this.asto, this.cnfg, this.infos).perform(checksums)
.thenCompose(
temp -> new AstoCreateRepomd(this.asto, this.cnfg).perform(temp).thenCompose(
nothing -> new AstoMetadataNames(this.asto, this.cnfg).prepareNames(temp)
.thenCompose(
keys -> {
final StorageLock lock =
new StorageLock(this.asto, AstoRepoRemove.META);
return lock.acquire()
.thenCompose(ignored -> this.remove(AstoRepoRemove.META))
.thenCompose(
ignored -> CompletableFuture.allOf(
keys.entrySet().stream().map(
entry ->
this.asto.move(entry.getKey(), entry.getValue())
).toArray(CompletableFuture[]::new)
)
).thenCompose(ignored -> lock.release()).thenCompose(
ignored -> this.remove(temp)
);
}
)
)
);
}

/**
Expand Down
37 changes: 34 additions & 3 deletions src/main/java/com/artipie/rpm/http/RpmRemove.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@
import com.artipie.http.rs.RsWithStatus;
import com.artipie.rpm.RepoConfig;
import com.artipie.rpm.asto.AstoRepoRemove;
import com.artipie.rpm.meta.PackageInfo;
import com.artipie.scheduling.ArtifactEvent;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -56,14 +61,22 @@ public final class RpmRemove implements Slice {
*/
private final RepoConfig cnfg;

/**
* Artifact upload/remove events.
*/
private final Optional<Queue<ArtifactEvent>> events;

/**
* Ctor.
* @param asto Asto storage
* @param cnfg Repo config
* @param events Artifact events
*/
public RpmRemove(final Storage asto, final RepoConfig cnfg) {
public RpmRemove(final Storage asto, final RepoConfig cnfg,
final Optional<Queue<ArtifactEvent>> events) {
this.asto = asto;
this.cnfg = cnfg;
this.events = events;
}

@Override
Expand All @@ -81,8 +94,26 @@ public Response response(final String line, final Iterable<Map.Entry<String, Str
.completedFuture(RsStatus.ACCEPTED);
if (valid && this.cnfg.mode() == RepoConfig.UpdateMode.UPLOAD
&& !request.skipUpdate()) {
res = new AstoRepoRemove(this.asto, this.cnfg).perform()
.thenApply(ignored -> RsStatus.ACCEPTED);
res = this.events.map(
queue -> {
final Collection<PackageInfo> infos =
new ArrayList<>(1);
return new AstoRepoRemove(this.asto, this.cnfg, infos)
.perform().thenAccept(
nothing -> infos.forEach(
item -> queue.add(
new ArtifactEvent(
RpmUpload.REPO_TYPE,
this.cnfg.name(), item.name(),
item.version()
)
)
)
);
}
).orElseGet(
() -> new AstoRepoRemove(this.asto, this.cnfg).perform()
).thenApply(ignored -> RsStatus.ACCEPTED);
} else if (!valid) {
res = this.asto.delete(temp)
.thenApply(nothing -> RsStatus.BAD_REQUEST);
Expand Down
Loading

0 comments on commit 3525edd

Please sign in to comment.