Skip to content

Commit

Permalink
ignite-757 - direct marshalling for H2 types
Browse files Browse the repository at this point in the history
  • Loading branch information
S.Vladykin committed Apr 27, 2015
1 parent 59ec2f9 commit 351001b
Show file tree
Hide file tree
Showing 37 changed files with 2,472 additions and 221 deletions.
Expand Up @@ -18,7 +18,6 @@
package org.apache.ignite.codegen;

import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.datastreamer.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
Expand All @@ -45,6 +44,9 @@ public class MessageCodeGenerator {
/** */
private static final String DFLT_SRC_DIR = U.getIgniteHome() + "/modules/core/src/main/java";

/** */
private static final String INDEXING_SRC_DIR = U.getIgniteHome() + "/modules/indexing/src/main/java";

/** */
private static final Class<?> BASE_CLS = Message.class;

Expand Down Expand Up @@ -140,7 +142,7 @@ public static void main(String[] args) throws Exception {

MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);

gen.generateAndWrite(DataStreamerEntry.class);
// gen.generateAndWrite(DataStreamerEntry.class);

// gen.generateAndWrite(GridDistributedLockRequest.class);
// gen.generateAndWrite(GridDistributedLockResponse.class);
Expand Down Expand Up @@ -172,6 +174,25 @@ public static void main(String[] args) throws Exception {
// gen.generateAndWrite(GridQueryNextPageResponse.class);
// gen.generateAndWrite(GridQueryRequest.class);
// gen.generateAndWrite(GridCacheSqlQuery.class);

// gen.generateAndWrite(GridH2Null.class);
// gen.generateAndWrite(GridH2Boolean.class);
// gen.generateAndWrite(GridH2Byte.class);
// gen.generateAndWrite(GridH2Short.class);
// gen.generateAndWrite(GridH2Integer.class);
// gen.generateAndWrite(GridH2Long.class);
// gen.generateAndWrite(GridH2Decimal.class);
// gen.generateAndWrite(GridH2Double.class);
// gen.generateAndWrite(GridH2Float.class);
// gen.generateAndWrite(GridH2Time.class);
// gen.generateAndWrite(GridH2Date.class);
// gen.generateAndWrite(GridH2Timestamp.class);
// gen.generateAndWrite(GridH2Bytes.class);
// gen.generateAndWrite(GridH2String.class);
// gen.generateAndWrite(GridH2Array.class);
// gen.generateAndWrite(GridH2JavaObject.class);
// gen.generateAndWrite(GridH2Uuid.class);
// gen.generateAndWrite(GridH2Geometry.class);
}

/**
Expand Down
Expand Up @@ -17,10 +17,13 @@

package org.apache.ignite.internal;

import org.apache.ignite.plugin.extensions.communication.*;

import java.lang.annotation.*;

/**
* Annotates iterable fields.
* Note that for any {@link Message} implementations it is enough to set item type to {@code Message.class}.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
Expand Down
Expand Up @@ -55,6 +55,9 @@
* Grid communication manager.
*/
public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>> {
/** Empty array of message factories. */
public static final MessageFactory[] EMPTY = {};

/** Max closed topics to store. */
public static final int MAX_CLOSED_TOPICS = 10240;

Expand Down Expand Up @@ -224,7 +227,25 @@ public void resetMetrics() {
};
}

msgFactory = new GridIoMessageFactory(ctx.plugins().extensions(MessageFactory.class));
MessageFactory[] msgs = ctx.plugins().extensions(MessageFactory.class);

if (msgs == null)
msgs = EMPTY;

MessageFactory qryMsgs = null;

try {
qryMsgs = U.newInstance( // TODO fix this dirty hack
"org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory");
}
catch (IgniteCheckedException e) {
// No-op.
}

if (qryMsgs != null)
msgs = F.concat(msgs, qryMsgs);

msgFactory = new GridIoMessageFactory(msgs);

if (log.isDebugEnabled())
log.debug(startInfo());
Expand Down
Expand Up @@ -595,6 +595,14 @@ public GridIoMessageFactory(MessageFactory[] ext) {

break;

case 112:
msg = new GridCacheSqlQuery();

break;

// [-3..112] - this
// [120..123] - DR
// [-4..-21] - SQL
default:
if (ext != null) {
for (MessageFactory factory : ext) {
Expand Down
Expand Up @@ -17,35 +17,43 @@

package org.apache.ignite.internal.processors.cache.query;

import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;

import java.io.*;
import java.nio.*;

/**
* Query.
*/
public class GridCacheSqlQuery implements Externalizable {
public class GridCacheSqlQuery implements Message {
/** */
private static final long serialVersionUID = 0L;

/** */
public static final Object[] EMPTY_PARAMS = {};

/** */
String alias;
private String alias;

/** */
@GridToStringInclude
String qry;
private String qry;

/** */
@GridToStringInclude
Object[] params;
@GridDirectTransient
private Object[] params;

/** */
private byte[] paramsBytes;

/**
* For {@link Externalizable}.
* For {@link Message}.
*/
public GridCacheSqlQuery() {
// No-op.
Expand Down Expand Up @@ -86,25 +94,116 @@ public Object[] parameters() {
return params;
}

/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, alias);
U.writeString(out, qry);
U.writeArray(out, params);
/**
* @param m Marshaller.
* @throws IgniteCheckedException If failed.
*/
public void marshallParams(Marshaller m) throws IgniteCheckedException {
if (paramsBytes != null)
return;

assert params != null;

paramsBytes = m.marshal(params);
}

/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
alias = U.readString(in);
qry = U.readString(in);
params = U.readArray(in);
/**
* @param m Marshaller.
* @throws IgniteCheckedException If failed.
*/
public void unmarshallParams(Marshaller m) throws IgniteCheckedException {
if (params != null)
return;

assert paramsBytes != null;

if (F.isEmpty(params))
params = EMPTY_PARAMS;
params = m.unmarshal(paramsBytes, null);
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheSqlQuery.class, this);
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);

if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType(), fieldsCount()))
return false;

writer.onHeaderWritten();
}

switch (writer.state()) {
case 0:
if (!writer.writeString("alias", alias))
return false;

writer.incrementState();

case 1:
if (!writer.writeByteArray("paramsBytes", paramsBytes))
return false;

writer.incrementState();

case 2:
if (!writer.writeString("qry", qry))
return false;

writer.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);

if (!reader.beforeMessageRead())
return false;

switch (reader.state()) {
case 0:
alias = reader.readString("alias");

if (!reader.isLastRead())
return false;

reader.incrementState();

case 1:
paramsBytes = reader.readByteArray("paramsBytes");

if (!reader.isLastRead())
return false;

reader.incrementState();

case 2:
qry = reader.readString("qry");

if (!reader.isLastRead())
return false;

reader.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public byte directType() {
return 112;
}

/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 3;
}
}
Expand Up @@ -23,16 +23,12 @@
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;
import java.util.*;

/**
* Two step map-reduce style query.
*/
public class GridCacheTwoStepQuery implements Serializable {
/** */
private static final long serialVersionUID = 0L;

public class GridCacheTwoStepQuery {
/** */
public static final int DFLT_PAGE_SIZE = 1000;

Expand Down Expand Up @@ -95,7 +91,7 @@ public GridCacheSqlQuery reduceQuery() {
* @return Map queries.
*/
public Collection<GridCacheSqlQuery> mapQueries() {
return new ArrayList<>(mapQrys.values()); // Copy to make it Serializable.
return mapQrys.values();
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.spi.indexing.*;
import org.jetbrains.annotations.*;

Expand Down Expand Up @@ -224,4 +225,11 @@ public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Cach
* @return Backup filter.
*/
public IndexingQueryFilter backupFilter();

/**
* Gets message factory.
*
* @return Message factory.
*/
public MessageFactory messageFactory();
}
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.query.*;
Expand All @@ -35,6 +36,7 @@
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.spi.indexing.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
Expand Down Expand Up @@ -649,6 +651,13 @@ public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(GridCacheContext<?,?> cctx, S
}
}

/**
* @return Message factory for {@link GridIoManager}.
*/
public MessageFactory messageFactory() {
return idx == null ? null : idx.messageFactory();
}

/**
* Closeable iterator.
*/
Expand Down

0 comments on commit 351001b

Please sign in to comment.