Change priority, host, timestamp accessors to functions.
Scalar functions priority(), priority_level(), host(), and event_timestamp()
now provide access to the properties of the current event, eliminating
the #priority, #host, and #timestamp fields which shadowed potential
user-defined attributes.

ScalarFunc API changed to have access to the entire current EventWrapper
in addition to its ordinary argument array.

Improved string representation of timestamps in OutputElement.
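
The ScalarFunc change described above can be illustrated with a minimal, self-contained sketch. The names `EventStub`, `ScalarFuncStub`, `HOST`, and `STRLEN` are hypothetical stand-ins invented for this example; they are not the real `EventWrapper` or `ScalarFunc` classes, and only the shape of `eval(event, args...)` is taken from the diff below.

```java
public class ScalarFuncSketch {
  /** Hypothetical stand-in for the current event; not the real EventWrapper. */
  static final class EventStub {
    final String host;
    EventStub(String host) { this.host = host; }
  }

  /** Stand-in contract: the current event first, then the ordinary arguments. */
  interface ScalarFuncStub {
    Object eval(EventStub event, Object... args);
  }

  /** Zero-argument accessor: reads only from the event, in the style of host(). */
  static final ScalarFuncStub HOST = (event, args) -> event.host;

  /** Ordinary function: ignores the event and works on args[0] only. */
  static final ScalarFuncStub STRLEN = (event, args) -> {
    Object arg0 = args[0];
    return (arg0 == null) ? null : Integer.valueOf(arg0.toString().length());
  };

  public static void main(String[] args) {
    EventStub e = new EventStub("web01");
    System.out.println(HOST.eval(e));
    // A caller with no current event can pass null, as OutputElement does for bin2str.
    System.out.println(STRLEN.eval(null, "abc"));
  }
}
```

Functions that only need their arguments are unaffected apart from the extra parameter, which is why the existing built-ins in this commit change their signature but not their bodies.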
kimballa committed May 11, 2011
1 parent 6dd8aed commit 5dec46b
Showing 20 changed files with 282 additions and 79 deletions.
6 changes: 0 additions & 6 deletions TODO
@@ -1,11 +1,5 @@


Before releasing: is #host, #priority such a good idea? This aliases
with the namespace of attributes. Longer names (#flumebase.host) would
require more typing, and don't really solve the problem. But reserved
keywords (a. la. 'rownum' in Oracle) are established.. It doesn't seem
unreasonable to have reserved attr names too. -- review with gwu.

-- BINARY: Need tests for equality, comparison operators
-- What does ByteBuffer.equals() do? Should we be using a separate array
equality comparator? Ensure that we're not coercing all the way to string for this.
56 changes: 35 additions & 21 deletions src/docbkx/UserGuide.xml
@@ -1243,42 +1243,56 @@ rtsql&gt; <userinput>SELECT COUNT(*) FROM foo OVER mywin</userinput>
named attributes.
</para>
<para>
Pre-defined system columns allow you to access each of these:
Named attributes of an event can be accessed with the syntax
"<userinput>#attrname</userinput>". This acts like a column of
type <literal>BINARY</literal>.
</para>
<table><caption>System columns in rtsql streams</caption>
<para>
For example, to select events with the <literal>interesting</literal> attribute:
<screen>
rtsql&gt; <userinput>SELECT * FROM foo WHERE #interesting IS NOT NULL;</userinput>
</screen>
</para>
<para>
Event attributes are defined as a STRING key and a BINARY value. To
use these values as strings, use the <literal>bin2str()</literal>
function:
<screen>
rtsql&gt; <userinput>SELECT * FROM foo WHERE bin2str(#x) = 'abc';</userinput>
</screen>
</para>
<para>
A set of functions allow you to access the host, priority, and timestamp
properties of each event:
</para>
<table><caption>Event property accessor functions</caption>
<thead>
<tr><td>column</td><td>accesses</td><td>type</td></tr>
<tr><td>function</td><td>accesses</td><td>type</td></tr>
</thead>
<tbody>
<tr><td><literal>#timestamp</literal></td><td>Event
timestamp</td><td>TIMESTAMP NOT NULL</td></tr>
<tr><td><literal>#host</literal></td><td>Event origin
<tr><td><literal>event_timestamp()</literal></td><td>Event
<literal>timestamp</literal> and <literal>nanos</literal>
properties</td><td>TIMESTAMP NOT NULL</td></tr>
<tr><td><literal>host()</literal></td><td>Event origin
host</td><td>STRING NOT NULL</td></tr>
<tr><td><literal>#priority</literal></td><td>Event priority</td>
<tr><td><literal>priority()</literal></td><td>Event priority label</td>
<td>STRING NOT NULL</td></tr>
<tr><td><userinput>#attrname</userinput></td><td>An attribute
named <literal>attrname</literal></td>
<td>BINARY</td></tr>
<tr><td><literal>priority_level()</literal></td><td>Event priority as an integer</td>
<td>INT NOT NULL</td></tr>
</tbody>
</table>
<para>
For example, to select events only at the ERROR priority level:
<screen>
rtsql&gt; <userinput>SELECT * FROM foo WHERE #priority = 'ERROR';</userinput>
</screen>
</para>
<para>
Or to select events with the <literal>interesting</literal> attribute:
<screen>
rtsql&gt; <userinput>SELECT * FROM foo WHERE #interesting IS NOT NULL;</userinput>
rtsql&gt; <userinput>SELECT * FROM foo WHERE priority() = 'ERROR';</userinput>
</screen>
</para>
<para>
Event attributes are defined as a STRING key and a BINARY value. To
use these values as strings, use the <literal>bin2str()</literal>
function:
The priority field is also available as an integer. More urgent priorities have
lower ordinal values (<constant>'FATAL'</constant> is <constant>0</constant>).
To select events at the <constant>WARN</constant> level and more urgent:
<screen>
rtsql&gt; <userinput>SELECT * FROM foo WHERE bin2str(#x) = 'abc';</userinput>
rtsql&gt; <userinput>SELECT * FROM foo WHERE priority_level() &lt;= 2;</userinput>
</screen>
</para>
</section>
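
The ordinal convention described in the guide text above (more urgent priorities have lower values, with FATAL at 0 and WARN at 2) can be sketched as follows. This is an illustration only: the `Priority` enum here is a local stand-in whose full ordering is an assumption, and the actual `priority_level` implementation is not shown in this excerpt.

```java
public class PriorityLevelSketch {
  /**
   * Local stand-in enum, most urgent first. Only FATAL = 0 and WARN = 2 are
   * stated by the guide text; the rest of the ordering is assumed.
   */
  enum Priority { FATAL, ERROR, WARN, INFO, DEBUG, TRACE }

  /** Label form, analogous to what priority() is documented to return. */
  static String label(Priority p) { return p.toString(); }

  /** Integer form, analogous to what priority_level() is documented to return. */
  static int level(Priority p) { return p.ordinal(); }

  /** Mirrors the predicate "priority_level() <= 2" from the example query. */
  static boolean warnOrMoreUrgent(Priority p) { return level(p) <= 2; }

  public static void main(String[] args) {
    for (Priority p : Priority.values()) {
      System.out.println(label(p) + " " + level(p) + " " + warnOrMoreUrgent(p));
    }
  }
}
```

Comparing on the integer form lets a single range predicate cover several levels, where the string form would need one equality test per label.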
@@ -18,16 +18,15 @@
package com.odiago.flumebase.exec;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.odiago.flumebase.exec.builtins.*;
import com.odiago.flumebase.exec.builtins.bin2str;

import com.odiago.flumebase.lang.Function;
import com.odiago.flumebase.lang.Type;
@@ -42,15 +41,19 @@ public class BuiltInSymbolTable extends SymbolTable {

private static Map<String, Symbol> BUILTINS;
static {
BUILTINS = new HashMap<String, Symbol>();
BUILTINS = new TreeMap<String, Symbol>();
// Add symbols for all built-in objects in the system.
loadBuiltinFunction(avg.class);
loadBuiltinFunction(bin2str.class);
loadBuiltinFunction(count.class);
loadBuiltinFunction(current_timestamp.class);
loadBuiltinFunction(event_timestamp.class);
loadBuiltinFunction(host.class);
loadBuiltinFunction(length.class);
loadBuiltinFunction(min.class);
loadBuiltinFunction(max.class);
loadBuiltinFunction(priority.class);
loadBuiltinFunction(priority_level.class);
loadBuiltinFunction(square.class);
loadBuiltinFunction(sum.class);
BUILTINS = Collections.unmodifiableMap(BUILTINS);
18 changes: 17 additions & 1 deletion src/main/java/com/odiago/flumebase/exec/OutputElement.java
@@ -57,6 +57,8 @@
import com.odiago.flumebase.io.AvroEventParser;

import com.odiago.flumebase.lang.StreamType;
import com.odiago.flumebase.lang.Timestamp;
import com.odiago.flumebase.lang.TimestampBase;
import com.odiago.flumebase.lang.Type;

import com.odiago.flumebase.parser.FormatSpec;
@@ -323,11 +325,25 @@ public void takeEvent(EventWrapper e) throws IOException, InterruptedException {
for (TypedField field : mInputFields) {
sb.append('\t');
Object fieldVal = e.getField(field);
if (null != fieldVal) {
LOG.debug("Printing val of class " + fieldVal.getClass().getName());
}

// If we get any GenericRecord types, convert them to our own specific types
// if we can figure out which to use. This makes toString'ing prettier.
if (fieldVal instanceof GenericRecord) {
GenericRecord record = (GenericRecord) fieldVal;
if (record.getSchema().equals(TimestampBase.SCHEMA$)) {
fieldVal = new Timestamp((Long) record.get("milliseconds"),
(Long) record.get("nanos"));
}
}

if (null == fieldVal) {
sb.append("null");
} else if (fieldVal instanceof ByteBuffer) {
sb.append("B[");
String toStr = (String) BIN2STR_FN.eval(fieldVal);
String toStr = (String) BIN2STR_FN.eval(null, fieldVal);
sb.append(toStr);
sb.append("]");
} else {
@@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.List;

import com.odiago.flumebase.exec.EventWrapper;

import com.odiago.flumebase.lang.ScalarFunc;
import com.odiago.flumebase.lang.Type;

@@ -44,7 +46,7 @@ public Type getReturnType() {
}

@Override
public Object eval(Object... args) {
public Object eval(EventWrapper event, Object... args) {
Object arg0 = args[0];
if (null == arg0) {
return null;
@@ -17,12 +17,13 @@

package com.odiago.flumebase.exec.builtins;

import java.sql.Timestamp;

import java.util.Collections;
import java.util.List;

import com.odiago.flumebase.exec.EventWrapper;

import com.odiago.flumebase.lang.ScalarFunc;
import com.odiago.flumebase.lang.Timestamp;
import com.odiago.flumebase.lang.Type;

/**
@@ -35,7 +36,7 @@ public Type getReturnType() {
}

@Override
public Object eval(Object... args) {
public Object eval(EventWrapper event, Object... args) {
return new Timestamp(System.currentTimeMillis());
}

@@ -0,0 +1,50 @@
/**
* Licensed to Odiago, Inc. under one or more contributor license
* agreements. See the NOTICE.txt file distributed with this work for
* additional information regarding copyright ownership. Odiago, Inc.
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.odiago.flumebase.exec.builtins;

import java.util.Collections;
import java.util.List;

import com.cloudera.flume.core.Event;

import com.odiago.flumebase.exec.EventWrapper;

import com.odiago.flumebase.lang.ScalarFunc;
import com.odiago.flumebase.lang.Timestamp;
import com.odiago.flumebase.lang.Type;

/**
* Return the timestamp field of the current event as a TIMESTAMP.
*/
public class event_timestamp extends ScalarFunc {
@Override
public Type getReturnType() {
return Type.getPrimitive(Type.TypeName.TIMESTAMP);
}

@Override
public Object eval(EventWrapper event, Object... args) {
Event e = event.getEvent();
return new Timestamp(e.getTimestamp(), e.getNanos());
}

@Override
public List<Type> getArgumentTypes() {
return Collections.emptyList();
}
}
49 changes: 49 additions & 0 deletions src/main/java/com/odiago/flumebase/exec/builtins/host.java
@@ -0,0 +1,49 @@
/**
* Licensed to Odiago, Inc. under one or more contributor license
* agreements. See the NOTICE.txt file distributed with this work for
* additional information regarding copyright ownership. Odiago, Inc.
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.odiago.flumebase.exec.builtins;

import java.util.Collections;
import java.util.List;

import com.cloudera.flume.core.Event;

import com.odiago.flumebase.exec.EventWrapper;

import com.odiago.flumebase.lang.ScalarFunc;
import com.odiago.flumebase.lang.Type;

/**
* Return the host field of the current event as a STRING.
*/
public class host extends ScalarFunc {
@Override
public Type getReturnType() {
return Type.getPrimitive(Type.TypeName.STRING);
}

@Override
public Object eval(EventWrapper event, Object... args) {
Event e = event.getEvent();
return e.getHost();
}

@Override
public List<Type> getArgumentTypes() {
return Collections.emptyList();
}
}
4 changes: 3 additions & 1 deletion src/main/java/com/odiago/flumebase/exec/builtins/length.java
@@ -20,6 +20,8 @@
import java.util.Collections;
import java.util.List;

import com.odiago.flumebase.exec.EventWrapper;

import com.odiago.flumebase.lang.ScalarFunc;
import com.odiago.flumebase.lang.Type;

@@ -33,7 +35,7 @@ public Type getReturnType() {
}

@Override
public Object eval(Object... args) {
public Object eval(EventWrapper event, Object... args) {
Object arg0 = args[0];
if (null == arg0) {
return null;
49 changes: 49 additions & 0 deletions src/main/java/com/odiago/flumebase/exec/builtins/priority.java
@@ -0,0 +1,49 @@
/**
* Licensed to Odiago, Inc. under one or more contributor license
* agreements. See the NOTICE.txt file distributed with this work for
* additional information regarding copyright ownership. Odiago, Inc.
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.odiago.flumebase.exec.builtins;

import java.util.Collections;
import java.util.List;

import com.cloudera.flume.core.Event;

import com.odiago.flumebase.exec.EventWrapper;

import com.odiago.flumebase.lang.ScalarFunc;
import com.odiago.flumebase.lang.Type;

/**
* Return the priority field of the current event as a STRING.
*/
public class priority extends ScalarFunc {
@Override
public Type getReturnType() {
return Type.getPrimitive(Type.TypeName.STRING);
}

@Override
public Object eval(EventWrapper event, Object... args) {
Event e = event.getEvent();
return e.getPriority().toString();
}

@Override
public List<Type> getArgumentTypes() {
return Collections.emptyList();
}
}
