Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue2878 #2938

Merged
merged 29 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8f915b1
#1390-document-projection-db-support: first draft
May 6, 2024
8192886
#1390-document-projection-db-support: issue1752: wip
May 21, 2024
4302085
#2878: empty commit
uweschaefer Jun 3, 2024
426f2d0
#2878: renamed/moved few classes
uweschaefer Jun 3, 2024
fab4b83
#2878: wip
uweschaefer Jun 4, 2024
34ca9be
#2878: wip
uweschaefer Jun 4, 2024
d9b858d
#2878: merged 0.8
uweschaefer Jun 4, 2024
a3d5ef5
#1390-document-projection-db-support: issue1390: working on text
Jun 4, 2024
cf38b63
#2878: wip
uweschaefer Jun 4, 2024
64ffeec
#2878: bring back the tests
uweschaefer Jun 4, 2024
13bf835
#1390-document-projection-db-support: issue1390: remove empty example
Jun 4, 2024
be5ff35
Apply suggestions from code review
bedaka Jun 4, 2024
de2cd3f
#2878: sonardance
uweschaefer Jun 4, 2024
7e2a8f4
#2878: sonardance
uweschaefer Jun 4, 2024
09b11cf
#1390-document-projection-db-support: issue1390: clarify locking
Jun 4, 2024
c39d0e4
#1390-document-projection-db-support: issue1390: merge changes
Jun 4, 2024
0e31aea
#1390-document-projection-db-support: issue1390: add some more details
Jun 4, 2024
04e909a
#2878: sonardance
uweschaefer Jun 4, 2024
0951eba
#2878: minor cleanup
uweschaefer Jun 13, 2024
7d2acdb
#2878: provide interfaces compatible to 0.7
uweschaefer Jun 13, 2024
f84ea56
#2878: minor cleanup
uweschaefer Jun 13, 2024
cef7175
#2878: interface cleanup
uweschaefer Jun 13, 2024
144912e
#2878: fixed setting FSP on non-transactional projections
uweschaefer Jun 13, 2024
8cd77c3
#2878: fixed setting FSP on non-transactional projections
uweschaefer Jun 13, 2024
8d31e6e
#2878: damn you sonar
uweschaefer Jun 13, 2024
8422331
Update extend-projection-db-support.md
uweschaefer Jun 13, 2024
e0e43f0
Apply formatter
uweschaefer Jun 13, 2024
28f6e73
Update factcast-site/documentation-docsy/content/en/Usage/Hitchhikers…
uweschaefer Jun 13, 2024
4e52f7c
Merge pull request #2945 from factcast/issue1390-document-projection-…
uweschaefer Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file removed build.trigger
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
package org.factcast.factus.redis;

import lombok.NonNull;
import org.factcast.factus.projection.ManagedProjection;
import org.redisson.api.RedissonClient;

public abstract class AbstractRedisManagedProjection extends AbstractRedisProjection
implements ManagedProjection {
public AbstractRedisManagedProjection(@NonNull RedissonClient redisson) {
implements RedisManagedProjection {
protected AbstractRedisManagedProjection(@NonNull RedissonClient redisson) {
super(redisson);

Check warning on line 24 in factcast-factus-redis/src/main/java/org/factcast/factus/redis/AbstractRedisManagedProjection.java

View check run for this annotation

Codecov / codecov/patch

factcast-factus-redis/src/main/java/org/factcast/factus/redis/AbstractRedisManagedProjection.java#L23-L24

Added lines #L23 - L24 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@
import org.factcast.factus.projection.Named;
import org.factcast.factus.projection.WriterToken;
import org.factcast.factus.projection.WriterTokenAware;
import org.factcast.factus.projection.tx.AbstractOpenTransactionAwareProjection;
import org.factcast.factus.redis.tx.RedisTransactional;
import org.redisson.api.*;

abstract class AbstractRedisProjection extends AbstractOpenTransactionAwareProjection<RTransaction>
@SuppressWarnings("java:S2142")
public abstract class AbstractRedisProjection
implements RedisProjection, FactStreamPositionAware, WriterTokenAware, Named {
@Getter protected final RedissonClient redisson;

private final RLock lock;
private final String stateBucketName;
protected final RLock lock;
protected final String stateBucketName;

@Getter private final String redisKey;
@Getter protected final String redisKey;

protected AbstractRedisProjection(@NonNull RedissonClient redisson) {
this.redisson = redisson;
Expand All @@ -50,37 +49,23 @@ protected AbstractRedisProjection(@NonNull RedissonClient redisson) {
}

@VisibleForTesting
RBucket<FactStreamPosition> stateBucket(@NonNull RTransaction tx) {
return tx.getBucket(stateBucketName, FactStreamPositionCodec.INSTANCE);
}

@VisibleForTesting
RBucket<FactStreamPosition> stateBucket() {
protected RBucket<FactStreamPosition> stateBucket() {
return redisson.getBucket(stateBucketName, FactStreamPositionCodec.INSTANCE);
}

@Override
public FactStreamPosition factStreamPosition() {
if (inTransaction()) {
return stateBucket(runningTransaction()).get();
} else {
return stateBucket().get();
}
return stateBucket().get();
}

@SuppressWarnings("ConstantConditions")
@Override
public void factStreamPosition(@Nullable FactStreamPosition position) {
if (inTransaction()) {
stateBucket(runningTransaction()).set(position);
} else {
stateBucket().set(position);
}
stateBucket().set(position);
}

@Override
public WriterToken acquireWriteToken(@NonNull Duration maxWait) {
assertNoRunningTransaction();
try {
if (lock.tryLock(maxWait.toMillis(), TimeUnit.MILLISECONDS))
return new RedisWriterToken(redisson, lock);
Expand All @@ -89,33 +74,4 @@ public WriterToken acquireWriteToken(@NonNull Duration maxWait) {
}
return null;
}

@Override
protected @NonNull RTransaction beginNewTransaction() {
return redisson().createTransaction(transactionOptions());
}

@Override
protected void commit(@NonNull RTransaction runningTransaction) {
runningTransaction.commit();
}

@Override
protected void rollback(@NonNull RTransaction runningTransaction) {
runningTransaction.rollback();
}

protected final @NonNull TransactionOptions transactionOptions() {
RedisTransactional tx = this.getClass().getAnnotation(RedisTransactional.class);
if (tx != null) return RedisTransactional.Defaults.with(tx);
else return TransactionOptions.defaults();
}

@Override
public final int maxBatchSizePerTransaction() {
RedisTransactional tx = this.getClass().getAnnotation(RedisTransactional.class);
if (tx != null) {
return tx.bulkSize();
} else return super.maxBatchSizePerTransaction();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

public abstract class AbstractRedisSubscribedProjection extends AbstractRedisProjection
implements SubscribedProjection {
public AbstractRedisSubscribedProjection(@NonNull RedissonClient redisson) {
protected AbstractRedisSubscribedProjection(@NonNull RedissonClient redisson) {
super(redisson);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright © 2017-2022 factcast.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.factcast.factus.redis;

import org.factcast.factus.projection.ManagedProjection;

public interface RedisManagedProjection extends ManagedProjection, RedisProjection {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright © 2017-2022 factcast.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.factcast.factus.redis.tx;

import lombok.NonNull;
import org.factcast.factus.projection.ManagedProjection;
import org.redisson.api.RedissonClient;

public abstract class AbstractRedisTxManagedProjection extends AbstractRedisTxProjection
implements ManagedProjection {
protected AbstractRedisTxManagedProjection(@NonNull RedissonClient redisson) {
super(redisson);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright © 2017-2022 factcast.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.factcast.factus.redis.tx;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import javax.annotation.Nullable;
import lombok.NonNull;
import lombok.experimental.Delegate;
import org.factcast.core.FactStreamPosition;
import org.factcast.factus.projection.WriterToken;
import org.factcast.factus.projection.tx.OpenTransactionAware;
import org.factcast.factus.projection.tx.TransactionBehavior;
import org.factcast.factus.redis.AbstractRedisProjection;
import org.factcast.factus.redis.FactStreamPositionCodec;
import org.redisson.api.*;

abstract class AbstractRedisTxProjection extends AbstractRedisProjection
implements OpenTransactionAware<RTransaction> {

@Delegate private final TransactionBehavior<RTransaction> tx;

protected AbstractRedisTxProjection(@NonNull RedissonClient redisson) {
super(redisson);
this.tx =
new TransactionBehavior<>(
new RedisTxAdapter(redisson, getClass().getAnnotation(RedisTransactional.class)));
}

@VisibleForTesting
protected RBucket<FactStreamPosition> stateBucket(@NonNull RTransaction tx) {
return tx.getBucket(stateBucketName, FactStreamPositionCodec.INSTANCE);
}

@Override
public FactStreamPosition factStreamPosition() {
if (inTransaction()) {
return stateBucket(runningTransaction()).get();
} else {
return super.factStreamPosition();
}
}

@SuppressWarnings("ConstantConditions")
@Override
public void factStreamPosition(@Nullable FactStreamPosition position) {
if (inTransaction()) {
stateBucket(runningTransaction()).set(position);
} else {
super.factStreamPosition(position);
}
}

@Override
public WriterToken acquireWriteToken(@NonNull Duration maxWait) {
assertNoRunningTransaction();
return super.acquireWriteToken(maxWait);
}

@Override
public AutoCloseable acquireWriteToken() {
assertNoRunningTransaction();
return super.acquireWriteToken();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright © 2017-2022 factcast.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.factcast.factus.redis.tx;

import lombok.NonNull;
import org.factcast.factus.projection.SubscribedProjection;
import org.redisson.api.RedissonClient;

public abstract class AbstractRedisTxSubscribedProjection extends AbstractRedisTxProjection
implements SubscribedProjection {
protected AbstractRedisTxSubscribedProjection(@NonNull RedissonClient redisson) {
super(redisson);
}

Check warning on line 26 in factcast-factus-redis/src/main/java/org/factcast/factus/redis/tx/AbstractRedisTxSubscribedProjection.java

View check run for this annotation

Codecov / codecov/patch

factcast-factus-redis/src/main/java/org/factcast/factus/redis/tx/AbstractRedisTxSubscribedProjection.java#L24-L26

Added lines #L24 - L26 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import lombok.experimental.UtilityClass;
import org.redisson.api.TransactionOptions;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Inherited
public @interface RedisTransactional {
int bulkSize() default 1000;

int DEFAULT_BULK_SIZE = 1000;

int bulkSize() default DEFAULT_BULK_SIZE;

long timeout() default Defaults.timeout;

Expand All @@ -34,6 +38,7 @@

long retryInterval() default Defaults.retryInterval;

@UtilityClass
class Defaults {
static final long timeout = 30000;
static final long responseTimeout = 5001;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright © 2017-2024 factcast.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.factcast.factus.redis.tx;

import com.google.common.annotations.VisibleForTesting;
import javax.annotation.Nullable;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.factcast.factus.projection.tx.TransactionAdapter;
import org.redisson.api.RTransaction;
import org.redisson.api.RedissonClient;
import org.redisson.api.TransactionOptions;

@RequiredArgsConstructor
public class RedisTxAdapter implements TransactionAdapter<RTransaction> {
@NonNull private final RedissonClient client;
@Nullable private final RedisTransactional annotation;

@Override
@NonNull
public RTransaction beginNewTransaction() {
return client.createTransaction(transactionOptions());
}

@VisibleForTesting
@NonNull
public TransactionOptions transactionOptions() {
if (annotation != null) return RedisTransactional.Defaults.with(annotation);
else return TransactionOptions.defaults();
}

@Override
public void commit(@NonNull RTransaction runningTransaction) {
runningTransaction.commit();
}

@Override
public void rollback(@NonNull RTransaction runningTransaction) {
runningTransaction.rollback();
}

@Override
public int maxBatchSizePerTransaction() {
if (annotation != null) {
return annotation.bulkSize();
} else return RedisTransactional.DEFAULT_BULK_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.factcast.factus.redis;
package org.factcast.factus.redis.tx;

import lombok.NonNull;
import org.factcast.factus.serializer.ProjectionMetaData;
import org.redisson.api.RedissonClient;

@ProjectionMetaData(revision = 1)
public class ARedisTransactionalManagedProjection extends AbstractRedisManagedProjection {
public ARedisTransactionalManagedProjection(@NonNull RedissonClient redisson) {
public class ARedisTxManagedProjection extends AbstractRedisTxManagedProjection {
public ARedisTxManagedProjection(@NonNull RedissonClient redisson) {
super(redisson);
}
}
Loading