Skip to content

Commit

Permalink
Merge pull request #25 from agorapulse/fix/conversion-service-regression
Browse files Browse the repository at this point in the history
using Environment instead of ConversionService
  • Loading branch information
musketyr committed Dec 19, 2023
2 parents ab33ca9 + 40784ea commit 40eba36
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.convert.ConversionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Optional;

@Factory
@Requires(classes = { SimpleQueueService.class }, beans = { SimpleQueueService.class, AWSCredentialsProvider.class })
Expand All @@ -48,7 +46,6 @@ public JobQueues sqsQueues(
AWSCredentialsProvider provider,
ObjectMapper mapper,
SimpleQueueService service,
Optional<ConversionService<?>> conversionService,
Environment environment
) {
try {
Expand All @@ -60,7 +57,7 @@ public JobQueues sqsQueues(
} else if (LOGGER.isInfoEnabled()) {
LOGGER.info("AWS SDK is not authenticated correctly, Using local job queues");
}
return new LocalQueues(conversionService);
return new LocalQueues(environment);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ class SqsQueuesFactorySpec extends Specification {

void 'return sqs if there is no issue'() {
expect:
factory.sqsQueues(provider, mapper, simpleQueueService, Optional.empty(), environment) instanceof SqsQueues
factory.sqsQueues(provider, mapper, simpleQueueService, environment) instanceof SqsQueues
}

void 'return local if there is issue'() {
when:
factory.sqsQueues(provider, mapper, simpleQueueService, Optional.empty(), environment) instanceof LocalQueues
factory.sqsQueues(provider, mapper, simpleQueueService, environment) instanceof LocalQueues

then:
1 * provider.credentials >> { throw new SdkClientException('login failed') }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.convert.ConversionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkClientException;

import javax.inject.Named;
import javax.inject.Singleton;
import java.util.Optional;

@Factory
@Requires(classes = { SimpleQueueService.class }, beans = { SimpleQueueService.class, AwsCredentialsProvider.class })
Expand All @@ -48,7 +46,6 @@ public JobQueues sqsQueues(
AwsCredentialsProvider provider,
ObjectMapper mapper,
SimpleQueueService service,
Optional<ConversionService<?>> conversionService,
Environment environment
) {
try {
Expand All @@ -60,7 +57,7 @@ public JobQueues sqsQueues(
} else if (LOGGER.isInfoEnabled()) {
LOGGER.info("AWS SDK is not authenticated correctly, Using local job queues");
}
return new LocalQueues(conversionService);
return new LocalQueues(environment);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ class SqsQueuesFactorySpec extends Specification {

void 'return sqs if there is no issue'() {
expect:
factory.sqsQueues(provider, mapper, simpleQueueService, Optional.empty(), environment) instanceof SqsQueues
factory.sqsQueues(provider, mapper, simpleQueueService, environment) instanceof SqsQueues
}

void 'return local if there is issue'() {
when:
factory.sqsQueues(provider, mapper, simpleQueueService, Optional.empty(), environment) instanceof LocalQueues
factory.sqsQueues(provider, mapper, simpleQueueService, environment) instanceof LocalQueues

then:
1 * provider.resolveCredentials() >> { throw SdkClientException.create('login failed') }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@

import com.agorapulse.worker.queue.JobQueues;
import io.micronaut.context.annotation.Secondary;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.context.env.Environment;
import io.micronaut.core.type.Argument;

import javax.inject.Named;
import javax.inject.Singleton;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -37,10 +36,10 @@
public class LocalQueues implements JobQueues {

private final ConcurrentMap<String, ConcurrentLinkedDeque<Object>> queues = new ConcurrentHashMap<>();
private final ConversionService<?> conversionService;
private final Environment environment;

public LocalQueues(Optional<ConversionService<?>> conversionService) {
this.conversionService = conversionService.orElse(ConversionService.SHARED);
public LocalQueues(Environment environment) {
this.environment = environment;
}

@Override
Expand All @@ -51,7 +50,7 @@ public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration
}

for (int i = 0; i < maxNumberOfMessages && !objects.isEmpty(); i++) {
action.accept(conversionService.convertRequired(objects.removeFirst(), argument));
action.accept(environment.convertRequired(objects.removeFirst(), argument));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import com.agorapulse.worker.configuration.MutableJobConfiguration;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.BeanContext;
import io.micronaut.context.env.Environment;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.naming.NameUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.BeanDefinition;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class MethodJobProcessor implements ExecutableMethodProcessor<Job> {
private final JobManager jobManager;
private final ApplicationConfiguration applicationConfiguration;
private final JobScheduler jobScheduler;
private final ConversionService<?> conversionService;
private final Environment environment;
private final WorkerConfiguration workerConfiguration;

/**
Expand All @@ -95,7 +95,7 @@ public MethodJobProcessor(
JobManager jobManager,
ApplicationConfiguration applicationConfiguration,
JobScheduler jobScheduler,
Optional<ConversionService<?>> optionalConversionService,
Environment environment,
WorkerConfiguration workerConfiguration
) {
this.beanContext = beanContext;
Expand All @@ -104,7 +104,7 @@ public MethodJobProcessor(
this.jobManager = jobManager;
this.applicationConfiguration = applicationConfiguration;
this.jobScheduler = jobScheduler;
this.conversionService = optionalConversionService.orElse(ConversionService.SHARED);
this.environment = environment;
this.workerConfiguration = workerConfiguration;
}

Expand Down Expand Up @@ -252,7 +252,7 @@ private String extractQueueNameFromMethod(ExecutableMethod<?, ?> method) {
}

private Duration convertDuration(String jobName, String durationString, String humanReadableProperty) {
Optional<Duration> converted = conversionService.convert(durationString, Duration.class);
Optional<Duration> converted = environment.convert(durationString, Duration.class);

if (converted.isPresent()) {
return converted.get();
Expand Down

0 comments on commit 40eba36

Please sign in to comment.