Skip to content
This repository has been archived by the owner on Apr 22, 2022. It is now read-only.

Support mapping native Avro timestamps. #245

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions config/findbugs/findbugs-excludes.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,18 @@
</Match>
<Match>
<!--
~ This method uses intentional fall-through from switch labels.
~ These methods use intentional fall-through from switch labels.
-->
<Class name="io.divolte.server.ChunkyByteBuffer"/>
<Method name="getBufferUsed"/>
<Or>
<And>
<Class name="io.divolte.server.ChunkyByteBuffer"/>
<Method name="getBufferUsed"/>
</And>
<And>
<Class name="io.divolte.server.ServerTestUtils$EventDecoder"/>
<Method name="decodeSchemaAsStrings"/>
</And>
</Or>
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>
<Match>
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/io/divolte/server/AvroRecordBuffer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 GoDataDriven B.V.
* Copyright 2019 GoDataDriven B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,16 +28,25 @@
import javax.annotation.ParametersAreNonnullByDefault;

import com.google.common.base.MoreObjects;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

@ParametersAreNonnullByDefault
public final class AvroRecordBuffer {
private static final int INITIAL_BUFFER_SIZE = 100;
private static final AtomicInteger BUFFER_SIZE = new AtomicInteger(INITIAL_BUFFER_SIZE);
static final GenericData AVRO_GENERIC_DATA;

static {
AVRO_GENERIC_DATA = new GenericData();
AVRO_GENERIC_DATA.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
AVRO_GENERIC_DATA.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
}

private final DivolteIdentifier partyId;
private final DivolteIdentifier sessionId;
Expand All @@ -64,7 +73,7 @@ private AvroRecordBuffer(final DivolteIdentifier partyId,
* will also allocate the larger size array from that point onward.
*/
final ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE.get());
final DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(record.getSchema());
final DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema(), AVRO_GENERIC_DATA);
final Encoder encoder = EncoderFactory.get().directBinaryEncoder(new ByteBufferOutputStream(byteBuffer), null);

writer.write(record, encoder);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 GoDataDriven B.V.
* Copyright 2019 GoDataDriven B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,17 +41,20 @@
import io.undertow.server.handlers.Cookie;
import io.undertow.util.Headers;
import net.sf.uadetector.ReadableUserAgent;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecordBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.net.*;
import java.time.Instant;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -270,12 +273,12 @@ public BooleanValueProducer duplicate() {
return new BooleanValueProducer("duplicate()", (e,c) -> Optional.ofNullable(e.exchange.getAttachment(DUPLICATE_EVENT_KEY)));
}

public ValueProducer<Long> timestamp() {
return new PrimitiveValueProducer<>("timestamp()", Long.class, (e,c) -> Optional.of(e.requestStartTime.toEpochMilli()));
public ValueProducer<Instant> timestamp() {
return new InstantValueProducer("timestamp()", (e,c) -> Optional.of(e.requestStartTime));
}

public ValueProducer<Long> clientTimestamp() {
return new PrimitiveValueProducer<>("clientTimestamp()", Long.class, (e,c) -> Optional.of(e.clientTime.toEpochMilli()));
public ValueProducer<Instant> clientTimestamp() {
return new InstantValueProducer("clientTimestamp()", (e,c) -> Optional.of(e.clientTime));
}

public ValueProducer<String> remoteHost() {
Expand Down Expand Up @@ -1183,6 +1186,38 @@ public BooleanValueProducer negate() {
}
}

/**
 * A value producer that yields {@link Instant} values, intended for fields
 * whose Avro schema is a {@code long} carrying a timestamp logical type.
 */
public static class InstantValueProducer extends ValueProducer<Instant> {

    /**
     * @param identifier the identifier passed through to the superclass.
     * @param supplier   the supplier that produces the instant for an event.
     */
    InstantValueProducer(final String identifier,
                         final FieldSupplier<Instant> supplier) {
        // NOTE(review): the meaning of the trailing 'false' flag is defined by
        // the ValueProducer constructor, which is not visible here.
        super(identifier, Instant.class, supplier, false);
    }

    /**
     * Checks whether a logical type is one of the supported timestamp types.
     *
     * @param type the logical type of a schema, or {@code null} if it has none.
     * @return {@code true} only for "timestamp-millis" and "timestamp-micros".
     */
    private static boolean isTimestamp(final @Nullable LogicalType type) {
        if (null == type) {
            return false;
        }
        final String logicalTypeName = type.getName();
        return logicalTypeName.equals("timestamp-millis")
            || logicalTypeName.equals("timestamp-micros");
    }

    /** Accepts only LONG schemas that are annotated with a timestamp logical type. */
    private static boolean isTimestampSchema(final Schema candidate) {
        return candidate.getType() == Type.LONG && isTimestamp(candidate.getLogicalType());
    }

    /**
     * Validates that the target field can receive an instant.
     *
     * @param target the field being mapped to.
     * @return whatever {@code validateTrivialUnion} reports for the field's
     *         schema, using the timestamp-schema check as the predicate.
     */
    @Override
    Optional<ValidationError> validateTypes(final Field target) {
        return validateTrivialUnion(target.schema(),
                                    InstantValueProducer::isTimestampSchema,
                                    "Timestamps can only be mapped to timestamp logical types");
    }
}

static Optional<ValidationError> validateTrivialUnion(final Schema targetSchema,
final Function<Schema, Boolean> validator,
final String messageIfInvalid,
Expand Down
48 changes: 42 additions & 6 deletions src/test/java/io/divolte/server/DslRecordMapperTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 GoDataDriven B.V.
* Copyright 2019 GoDataDriven B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,7 +36,6 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.junit.After;
import org.junit.Test;

Expand All @@ -50,6 +49,7 @@
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Stream;

Expand Down Expand Up @@ -100,7 +100,7 @@ public void shouldPopulateFlatFields() throws InterruptedException, IOException
assertEquals(true, record.get("sessionStart"));
assertEquals(true, record.get("unreliable"));
assertEquals(false, record.get("dupe"));
assertEquals(event.requestStartTime.toEpochMilli(), record.get("ts"));
assertEquals(event.requestStartTime, record.get("ts"));
assertEquals("https://example.com/", record.get("location"));
assertEquals("http://example.com/", record.get("referer"));

Expand Down Expand Up @@ -163,7 +163,43 @@ public void shouldMapClientTimestamp() throws IOException, InterruptedException
final EventPayload event = request("https://example.com/", "http://example.com/");
final GenericRecord record = event.record;

assertEquals(ClientSideCookieEventHandler.tryParseBase36Long(CLIENT_SIDE_TIME), record.get("ts"));
assertEquals(Instant.ofEpochMilli(Long.parseLong(CLIENT_SIDE_TIME, 36)), record.get("ts"));
}

/**
 * Checks the "ts" and "tsMicros" fields produced by the
 * {@code timestamp-from-long.groovy} mapping against fixed instants.
 */
@Test
public void shouldMapTimestampFromLong() throws IOException, InterruptedException {
    setupServer("timestamp-from-long.groovy");

    final GenericRecord mapped = request("https://example.com/", "http://example.com/").record;

    // Same second for both fields; they differ only in sub-second precision.
    final Instant expectedMillis = Instant.ofEpochSecond(1558101158L, 679000000L);
    final Instant expectedMicros = Instant.ofEpochSecond(1558101158L, 679123000L);
    assertEquals(expectedMillis, mapped.get("ts"));
    assertEquals(expectedMicros, mapped.get("tsMicros"));
}

/**
 * Checks that the {@code timestamp-as-millis.groovy} mapping populates "ts"
 * with the instant encoded (base 36, epoch milliseconds) in CLIENT_SIDE_TIME.
 */
@Test
public void shouldMapTimestampMillis() throws IOException, InterruptedException {
    setupServer("timestamp-as-millis.groovy");

    final GenericRecord mapped = request("https://example.com/", "http://example.com/").record;

    final Instant expected = Instant.ofEpochMilli(Long.parseLong(CLIENT_SIDE_TIME, 36));
    assertEquals(expected, mapped.get("ts"));
}

/**
 * Checks that the {@code timestamp-as-micros.groovy} mapping populates
 * "tsMicros" with the instant encoded in CLIENT_SIDE_TIME.
 */
@Test
public void shouldMapTimestampMicros() throws IOException, InterruptedException {
    setupServer("timestamp-as-micros.groovy");

    final GenericRecord mapped = request("https://example.com/", "http://example.com/").record;

    // True microsecond precision is hard to verify here: the tests have no way
    // to inject a specific microsecond-precision timestamp into an event. (The
    // server timestamp has that precision, but cannot be forced to a known
    // value.) The client timestamp is used instead, and we only check that it
    // is interpreted correctly.
    final Instant expected = Instant.ofEpochMilli(Long.parseLong(CLIENT_SIDE_TIME, 36));
    assertEquals(expected, mapped.get("tsMicros"));
}

@Test(expected=SchemaMappingException.class)
Expand All @@ -185,7 +221,7 @@ public void shouldApplyActionsInClosureWhenEqualToConditionHolds() throws IOExce

assertEquals("locationmatch", event.record.get("eventType"));
assertEquals("referermatch", event.record.get("client"));
assertEquals(new Utf8("not set"), event.record.get("queryparam"));
assertEquals("not set", event.record.get("queryparam"));

assertEquals("absent", event.record.get("event"));
assertEquals("present", event.record.get("pageview"));
Expand All @@ -195,7 +231,7 @@ public void shouldApplyActionsInClosureWhenEqualToConditionHolds() throws IOExce
public void shouldChainValueProducersWithIntermediateNull() throws IOException, InterruptedException {
setupServer("chained-na-mapping.groovy");
final EventPayload event = request("http://www.exmaple.com/");
assertEquals(new Utf8("not set"), event.record.get("queryparam"));
assertEquals("not set", event.record.get("queryparam"));
}

@Test
Expand Down
98 changes: 92 additions & 6 deletions src/test/java/io/divolte/server/ServerTestUtils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 GoDataDriven B.V.
* Copyright 2019 GoDataDriven B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,14 @@
import io.divolte.server.config.ValidatedConfiguration;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.ServerConnection;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,13 +42,13 @@
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -118,7 +125,7 @@ public static final class EventPayload {
final AvroRecordBuffer buffer;
final GenericRecord record;

public EventPayload(final DivolteEvent event,
private EventPayload(final DivolteEvent event,
final AvroRecordBuffer buffer,
final GenericRecord record) {
this.event = Objects.requireNonNull(event);
Expand All @@ -127,6 +134,84 @@ public EventPayload(final DivolteEvent event,
}
}

/**
 * Test helper that re-reads the serialized form of each event's Avro record
 * and hands the decoded result on to a consumer.
 *
 * Records are decoded with the schema marked so that strings (and map keys)
 * come back as Java strings instead of Avro Utf8 instances, which is a
 * convenience for test assertions.
 */
@ParametersAreNonnullByDefault
private static final class EventDecoder {
    /** Cache of readers, keyed by the (already marked-up) schema they decode. */
    private final Map<Schema, DatumReader<GenericRecord>> readers = new ConcurrentHashMap<>();
    private final Consumer<EventPayload> payloadConsumer;

    /**
     * @param payloadConsumer receives a payload for every event that decodes successfully.
     */
    private EventDecoder(final Consumer<EventPayload> payloadConsumer) {
        this.payloadConsumer = Objects.requireNonNull(payloadConsumer);
    }

    /**
     * Recursively descend through a schema marking all the strings (and keys in maps)
     * so that they're decoded as Java strings instead of Avro Utf8 instances.
     * This is intended as a convenience for tests.
     *
     * @param schema the schema to mark up in place.
     */
    private static void decodeSchemaAsStrings(final Schema schema) {
        // Record schemas may refer to themselves (directly or indirectly), so the
        // records already visited are tracked to guarantee termination. The set is
        // identity-based because marking a schema changes its hash code.
        decodeSchemaAsStrings(schema, Collections.newSetFromMap(new IdentityHashMap<>()));
    }

    /**
     * Worker for {@link #decodeSchemaAsStrings(Schema)}.
     *
     * @param schema         the schema to mark up in place.
     * @param visitedRecords the record schemas already descended into during this walk.
     */
    private static void decodeSchemaAsStrings(final Schema schema, final Set<Schema> visitedRecords) {
        switch (schema.getType()) {
            case MAP:
                decodeSchemaAsStrings(schema.getValueType(), visitedRecords);
                // Intentional fall-through: the map itself is marked so its keys are strings.
            case STRING:
                GenericData.setStringType(schema, GenericData.StringType.String);
                break;
            case UNION:
                schema.getTypes().forEach(member -> decodeSchemaAsStrings(member, visitedRecords));
                break;
            case ARRAY:
                decodeSchemaAsStrings(schema.getElementType(), visitedRecords);
                break;
            case RECORD:
                // Only descend the first time a record is seen; a repeat visit means
                // its fields have been (or are being) processed already.
                if (visitedRecords.add(schema)) {
                    schema.getFields()
                          .stream()
                          .map(Schema.Field::schema)
                          .forEach(fieldSchema -> decodeSchemaAsStrings(fieldSchema, visitedRecords));
                }
                break;
            default:
                // Nothing to do for other schema types.
                break;
        }
    }

    /**
     * Returns the cached reader for a schema, creating it on first use.
     *
     * @param schema the schema of the records to be decoded; may be marked up as a side effect.
     * @return a reader that uses the schema as both writer and reader schema.
     */
    private DatumReader<GenericRecord> getReader(final Schema schema) {
        /*
         * This is complicated, but the idea is to avoid marking a schema recursively to
         * produce normal strings. We also want to cache the readers.
         *
         * A complication is that recursively marking the schema involves setting
         * properties, which means the hash code changes.
         *
         * Our steps are:
         *  1. See if the schema already has a reader.
         *     If so, it's already marked for strings and we're done.
         *  2. No reader? Mark the schema up for string conversions.
         *     This changes the hash code if it wasn't already marked up.
         *  3. Use compute-if-absent to atomically build a reader if there still isn't
         *     one.
         *
         * (What we're doing here smells a bit like double-checked locking, but isn't;
         * instead it's more like opportunistic locking.)
         */
        DatumReader<GenericRecord> reader = readers.get(schema);
        if (null == reader) {
            decodeSchemaAsStrings(schema);
            reader = readers.computeIfAbsent(schema,
                                             s -> new GenericDatumReader<>(s, s, AvroRecordBuffer.AVRO_GENERIC_DATA));
        }
        return reader;
    }

    /**
     * Decodes the serialized record in the buffer and forwards it to the consumer.
     *
     * @param event  the event that was processed.
     * @param buffer the buffer holding the serialized record.
     * @param record the original record; only its schema is used here.
     */
    private void onEvent(final DivolteEvent event, final AvroRecordBuffer buffer, final GenericRecord record) {
        final DatumReader<GenericRecord> reader = getReader(record.getSchema());
        final ByteBufferInputStream serialized =
            new ByteBufferInputStream(Collections.singletonList(buffer.getByteBuffer()));
        final Decoder decoder = DecoderFactory.get().binaryDecoder(serialized, null);
        try {
            final GenericRecord normalizedRecord = reader.read(null, decoder);
            payloadConsumer.accept(new EventPayload(event, buffer, normalizedRecord));
        } catch (final IOException e) {
            // Discard the event if we can't decode the record.
            logger.error("Cannot decode Avro record", e);
        }
    }
}

@ParametersAreNonnullByDefault
public static final class TestServer {
final Config config;
Expand Down Expand Up @@ -159,10 +244,11 @@ private TestServer(final InetSocketAddress hostPort, final Config config) {
.withValue("divolte.global.server.port", ConfigValueFactory.fromAnyRef(port));

events = new ArrayBlockingQueue<>(100);
final EventDecoder eventDecoder = new EventDecoder(events::add);
final ValidatedConfiguration vc = new ValidatedConfiguration(() -> this.config);
Preconditions.checkArgument(vc.isValid(),
"Invalid test server configuration: %s", vc.errors());
server = new Server(vc, (event, buffer, record) -> events.add(new EventPayload(event, buffer, record)));
server = new Server(vc, eventDecoder::onEvent);
try {
server.run();
} catch (final RuntimeException e) {
Expand Down
Loading