Skip to content

Commit

Permalink
Added Segment logging, blocking for functions (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Jul 22, 2021
1 parent 82cb009 commit acfcd03
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,43 @@

import com.agorapulse.micronaut.segment.builder.*;
import com.segment.analytics.Analytics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class DefaultSegmentService implements SegmentService {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSegmentService.class);

private final Analytics analytics;
private final SegmentConfiguration config;
private final ExecutorService segmentNetworkExecutor;
private final boolean blocking;

public DefaultSegmentService(Analytics analytics, SegmentConfiguration config) {
public DefaultSegmentService(Analytics analytics, SegmentConfiguration config, ExecutorService segmentNetworkExecutor, boolean blocking) {
this.analytics = analytics;
this.config = config;
this.segmentNetworkExecutor = segmentNetworkExecutor;
this.blocking = blocking;
}

@Override
public void flush() {
analytics.flush();
if (blocking) {
try {
LOGGER.debug("Waiting for messages being flushed");
// give some time to enqueue the batch
Thread.sleep(10);
segmentNetworkExecutor.submit(() -> LOGGER.trace("Wating task has been executed")).get(10, TimeUnit.SECONDS);
LOGGER.debug("Messages should be flushed now!");
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new IllegalStateException("Exception while waiting for flushing to happen");
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,104 @@
*/
package com.agorapulse.micronaut.segment;

import com.segment.analytics.Analytics;
import com.segment.analytics.MessageInterceptor;
import com.segment.analytics.MessageTransformer;
import com.agorapulse.micronaut.segment.util.Slf4jSegmentLog;
import com.jakewharton.retrofit.Ok3Client;
import com.segment.analytics.*;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import okhttp3.OkHttpClient;
import retrofit.client.Client;

import javax.annotation.Nullable;
import javax.inject.Named;
import javax.inject.Singleton;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static java.lang.Thread.MIN_PRIORITY;

@Factory
public class SegmentFactory {

@Bean
private static final String THREAD_NAME = "Analytics";

@Bean(preDestroy = "shutdown")
@Singleton
@Requires(beans = SegmentConfiguration.class)
public Analytics analytics(
SegmentConfiguration configuration,
List<MessageInterceptor> messageInterceptor,
List<MessageTransformer> messageTransformers
List<MessageTransformer> messageTransformers,
List<Callback> callbacks,
@Named("segment") Client client,
@Named("segment") ThreadFactory threadFactory,
@Named("segmentNetworkExecutor") ExecutorService segmentNetworkExecutor
) {
Analytics.Builder builder = Analytics.builder(configuration.getApiKey());

messageInterceptor.forEach(builder::messageInterceptor);

messageTransformers.forEach(builder::messageTransformer);

callbacks.forEach(builder::callback);

builder.log(new Slf4jSegmentLog())
.threadFactory(threadFactory)
.networkExecutor(segmentNetworkExecutor)
.client(client);

return builder.build();
}

@Bean
@Singleton
public SegmentService segmentService(@Nullable Analytics analytics, @Nullable SegmentConfiguration configuration) {
public SegmentService segmentService(
@Nullable Analytics analytics,
@Nullable SegmentConfiguration configuration,
@Nullable @Named("segmentNetworkExecutor") ExecutorService segmentNetworkExecutor,
Environment environment
) {
if (analytics != null) {
return new DefaultSegmentService(analytics, configuration);
return new DefaultSegmentService(analytics, configuration, segmentNetworkExecutor, environment.getActiveNames().contains(Environment.FUNCTION));
}
return new NoOpSegmentService();
}

@Bean
@Singleton
@Named("segment")
@Requires(beans = SegmentConfiguration.class)
Client defaultClient() {
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(15, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
.writeTimeout(15, TimeUnit.SECONDS)
.build();
return new Ok3Client(client);
}

@Bean
@Singleton
@Named("segmentNetworkExecutor")
@Requires(beans = SegmentConfiguration.class)
ExecutorService defaultNetworkExecutor(@Named("segment") ThreadFactory threadFactory) {
return Executors.newSingleThreadExecutor(threadFactory);
}

@Bean
@Singleton
@Named("segment")
@Requires(beans = SegmentConfiguration.class)
ThreadFactory defaultThreadFactory() {
return r -> new Thread(() -> {
Thread.currentThread().setPriority(MIN_PRIORITY);
r.run();
}, THREAD_NAME);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2020-2021 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.micronaut.segment.util;

import com.segment.analytics.Analytics;
import com.segment.analytics.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jSegmentLog implements Log {

private static final Logger LOGGER = LoggerFactory.getLogger(Analytics.class);

@Override
public void print(Level level, String format, Object... args) {
switch (level) {
case VERBOSE:
LOGGER.trace(format, args);
break;
case DEBUG:
LOGGER.debug(format, args);
break;
case ERROR:
LOGGER.error(format, args);
break;
}
}

@Override
public void print(Level level, Throwable error, String format, Object... args) {
switch (level) {
case VERBOSE:
LOGGER.trace(format, args);
LOGGER.trace("", error);
break;
case DEBUG:
LOGGER.debug(format, args);
LOGGER.debug("", error);
break;
case ERROR:
LOGGER.error(format, args);
LOGGER.error("", error);
break;
}
}
}

0 comments on commit acfcd03

Please sign in to comment.