Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log objects directly. #6

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/main/java/org/fluentd/logger/FluentLogger.java
Expand Up @@ -82,10 +82,18 @@ public boolean log(String tag, String key, Object value, long timestamp) {
}

public boolean log(String tag, Map<String, Object> data) {
return log(tag, data, 0);
return log(tag, (Object)data, 0);
}

public boolean log(String tag, Map<String, Object> data, long timestamp) {
return log(tag, (Object)data, timestamp);
}

public boolean log(String tag, Object data) {
return log(tag, data, 0);
}

public boolean log(String tag, Object data, long timestamp) {
String concatTag = null;
if (tagPrefix == null || tagPrefix.length() == 0) {
concatTag = tag;
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/fluentd/logger/sender/DefaultEventTemplate.java
@@ -0,0 +1,12 @@
package org.fluentd.logger.sender;

import java.io.IOException;

import org.msgpack.packer.Packer;

public class DefaultEventTemplate extends EventTemplate {
@Override
protected void doWriteData(Packer pk, Object data, boolean required) throws IOException {
pk.write(data);
}
}
52 changes: 3 additions & 49 deletions src/main/java/org/fluentd/logger/sender/Event.java
Expand Up @@ -17,30 +17,22 @@
//
package org.fluentd.logger.sender;

import java.io.IOException;
import java.util.Map;

import org.msgpack.MessageTypeException;
import org.msgpack.packer.Packer;
import org.msgpack.template.AbstractTemplate;
import org.msgpack.template.Templates;
import org.msgpack.unpacker.Unpacker;

public class Event {
public String tag;

public long timestamp;

public Map<String, Object> data;
public Object data;

public Event() {
}

public Event(String tag, Map<String, Object> data) {
public Event(String tag, Object data) {
this(tag, System.currentTimeMillis() / 1000, data);
}

public Event(String tag, long timestamp, Map<String, Object> data) {
public Event(String tag, long timestamp, Object data) {
this.tag = tag;
this.timestamp = timestamp;
this.data = data;
Expand All @@ -51,42 +43,4 @@ public String toString() {
return String.format("Event{tag=%s,timestamp=%d,data=%s}",
tag, timestamp, data.toString());
}

public static class EventTemplate extends AbstractTemplate<Event> {
public static EventTemplate INSTANCE = new EventTemplate();

public void write(Packer pk, Event v, boolean required) throws IOException {
if (v == null) {
if (required) {
throw new MessageTypeException("Attempted to write null");
}
pk.writeNil();
return;
}

pk.writeArrayBegin(3);
{
Templates.TString.write(pk, v.tag, required);
Templates.TLong.write(pk, v.timestamp, required);
pk.writeMapBegin(v.data.size());
{
for (Map.Entry<String, Object> entry : v.data.entrySet()) {
Templates.TString.write(pk, entry.getKey(), required);
try {
pk.write(entry.getValue());
} catch (MessageTypeException e) {
String val = entry.getValue().toString();
Templates.TString.write(pk, val, required);
}
}
}
pk.writeMapEnd();
}
pk.writeArrayEnd();
}

public Event read(Unpacker u, Event to, boolean required) throws IOException {
throw new UnsupportedOperationException("Don't need the operation");
}
}
}
38 changes: 38 additions & 0 deletions src/main/java/org/fluentd/logger/sender/EventTemplate.java
@@ -0,0 +1,38 @@
package org.fluentd.logger.sender;

import java.io.IOException;

import org.msgpack.MessageTypeException;
import org.msgpack.packer.Packer;
import org.msgpack.template.AbstractTemplate;
import org.msgpack.template.Templates;
import org.msgpack.unpacker.Unpacker;

public abstract class EventTemplate extends AbstractTemplate<Event> {
public static EventTemplate INSTANCE = new DefaultEventTemplate();

public void write(Packer pk, Event v, boolean required) throws IOException {
if (v == null) {
if (required) {
throw new MessageTypeException("Attempted to write null");
}
pk.writeNil();
return;
}

pk.writeArrayBegin(3);
{
Templates.TString.write(pk, v.tag, required);
Templates.TLong.write(pk, v.timestamp, required);
doWriteData(pk, v.data, required);
}
pk.writeArrayEnd();
}

protected abstract void doWriteData(Packer pk, Object data, boolean required)
throws IOException;

public Event read(Unpacker u, Event to, boolean required) throws IOException {
throw new UnsupportedOperationException("Don't need the operation");
}
}
75 changes: 75 additions & 0 deletions src/main/java/org/fluentd/logger/sender/MapStyleEventTemplate.java
@@ -0,0 +1,75 @@
package org.fluentd.logger.sender;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

import org.msgpack.MessageTypeException;
import org.msgpack.packer.Packer;
import org.msgpack.template.Templates;

public class MapStyleEventTemplate extends EventTemplate {
@Override
protected void doWriteData(Packer pk, Object data, boolean required) throws IOException {
if(data instanceof Map){
writeMap(pk, (Map<?, ?>)data, required);
} else{
try{
pk.write(data);
} catch (MessageTypeException e) {
writeObj(pk, data, required);
}
}
}

private <K, V> void writeMap(Packer pk, Map<K, V> map, boolean required) throws IOException {
pk.writeMapBegin(map.size());
{
for (Map.Entry<?, ?> entry : map.entrySet()) {
Templates.TString.write(pk, entry.getKey().toString(), required);
Object value = entry.getValue();
if(value instanceof Map<?, ?>){
writeMap(pk, (Map<?, ?>)value, required);
} else{
try {
pk.write(entry.getValue());
} catch (MessageTypeException e) {
writeObj(pk, entry.getValue(), required);
}
}
}
}
pk.writeMapEnd();
}

private void writeObj(Packer pk, Object data, boolean required) throws IOException {
Map<String, Object> map = new LinkedHashMap<String, Object>();
Class<?> clazz = data.getClass();
while(!clazz.equals(Object.class)){
for(Method m : clazz.getDeclaredMethods()){
if(m.getDeclaringClass().equals(Object.class)) continue;
if(m.getParameterTypes().length != 0) continue;
String name = null;
if(m.getName().startsWith("get")){
name = m.getName().substring(3);
} else if(m.getName().startsWith("is") && m.getReturnType().equals(boolean.class)){
name = m.getName().substring(2);
} else{
continue;
}
if(name.length() == 0) continue;
name = name.substring(0, 1).toLowerCase() + (name.length() == 1 ? "" : name.substring(1));
try {
map.put(name, m.invoke(data));
} catch (IllegalArgumentException e) {
} catch (IllegalAccessException e) {
} catch (InvocationTargetException e) {
}
}
clazz = clazz.getSuperclass();
}
writeMap(pk, map, required);
}
}
5 changes: 2 additions & 3 deletions src/main/java/org/fluentd/logger/sender/NullSender.java
Expand Up @@ -17,20 +17,19 @@
//
package org.fluentd.logger.sender;

import java.util.Map;

public class NullSender implements Sender {

public NullSender(String host, int port, int timeout, int bufferCapacity) {
}

@Override
public boolean emit(String tag, Map<String, Object> data) {
public boolean emit(String tag, Object data) {
return emit(tag, System.currentTimeMillis() / 1000, data);
}

@Override
public boolean emit(String tag, long timestamp, Map<String, Object> data) {
public boolean emit(String tag, long timestamp, Object data) {
return true;
}

Expand Down
7 changes: 3 additions & 4 deletions src/main/java/org/fluentd/logger/sender/RawSocketSender.java
Expand Up @@ -23,7 +23,6 @@
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.logging.Level;

import org.msgpack.MessagePack;
Expand Down Expand Up @@ -63,7 +62,7 @@ public RawSocketSender(String host, int port, int timeout, int bufferCapacity) {

public RawSocketSender(String host, int port, int timeout, int bufferCapacity, Reconnector reconnector) {
msgpack = new MessagePack();
msgpack.register(Event.class, Event.EventTemplate.INSTANCE);
msgpack.register(Event.class, EventTemplate.INSTANCE);
pendings = ByteBuffer.allocate(bufferCapacity);
server = new InetSocketAddress(host, port);
this.reconnector = reconnector;
Expand Down Expand Up @@ -125,11 +124,11 @@ public void close() {
}
}

public boolean emit(String tag, Map<String, Object> data) {
public boolean emit(String tag, Object data) {
return emit(tag, System.currentTimeMillis() / 1000, data);
}

public boolean emit(String tag, long timestamp, Map<String, Object> data) {
public boolean emit(String tag, long timestamp, Object data) {
return emit(new Event(tag, timestamp, data));
}

Expand Down
6 changes: 2 additions & 4 deletions src/main/java/org/fluentd/logger/sender/Sender.java
Expand Up @@ -17,12 +17,10 @@
//
package org.fluentd.logger.sender;

import java.util.Map;

public interface Sender {
boolean emit(String tag, Map<String, Object> data);
boolean emit(String tag, Object data);

boolean emit(String tag, long timestamp, Map<String, Object> data);
boolean emit(String tag, long timestamp, Object data);

void flush();

Expand Down