Skip to content

Commit

Permalink
Secure Dictionary Server Implementation
Browse files Browse the repository at this point in the history
Secure Dictionary Implementation Along with Non Secure.
  • Loading branch information
sounak authored and sounakr committed Jul 10, 2017
1 parent 0558c28 commit d0aca3a
Show file tree
Hide file tree
Showing 23 changed files with 729 additions and 284 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,82 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.dictionary.client;

import java.net.InetSocketAddress;
package org.apache.carbondata.core.dictionary.client;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

/**
* Dictionary client to connect to Dictionary server and generate dictionary values
*/
public class DictionaryClient {

private static final LogService LOGGER =
LogServiceFactory.getLogService(DictionaryClient.class.getName());

private DictionaryClientHandler dictionaryClientHandler = new DictionaryClientHandler();

private NioEventLoopGroup workerGroup;
public interface DictionaryClient {

/**
* start dictionary client
*
* @param address
* @param port
*/
public void startClient(String address, int port) {
LOGGER.audit("Starting client on " + address + " " + port);
long start = System.currentTimeMillis();
// Create an Event with 1 thread.
workerGroup = new NioEventLoopGroup(1);
Bootstrap clientBootstrap = new Bootstrap();
clientBootstrap.group(workerGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Based on length provided at header, it collects all packets
pipeline
.addLast("LengthDecoder",
new LengthFieldBasedFrameDecoder(1048576, 0,
2, 0, 2));
pipeline.addLast("DictionaryClientHandler", dictionaryClientHandler);
}
});
clientBootstrap.connect(new InetSocketAddress(address, port));
LOGGER.info(
"Dictionary client Started, Total time spent : " + (System.currentTimeMillis() - start));
}
public void startClient(String secretKey, String address, int port);

/**
* for client request
*
* @param key
* @return
*/
public DictionaryMessage getDictionary(DictionaryMessage key) {
return dictionaryClientHandler.getDictionary(key);
}
public void shutDown();

/**
* shutdown dictionary client
*/
public void shutDown() {
workerGroup.shutdownGracefully();
try {
workerGroup.terminationFuture().sync();
} catch (InterruptedException e) {
LOGGER.error(e);
}
}
public DictionaryMessage getDictionary(DictionaryMessage key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.dictionary.client;

import java.net.InetSocketAddress;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

/**
* Dictionary client to connect to Dictionary server and generate dictionary values
*/
public class NonSecureDictionaryClient implements DictionaryClient {

private static final LogService LOGGER =
LogServiceFactory.getLogService(NonSecureDictionaryClient.class.getName());

private NonSecureDictionaryClientHandler nonSecureDictionaryClientHandler =
new NonSecureDictionaryClientHandler();

private NioEventLoopGroup workerGroup;

/**
* start dictionary client
*
* @param address
* @param port
*/
@Override public void startClient(String secretKey, String address, int port) {
LOGGER.audit("Starting client on " + address + " " + port);
long start = System.currentTimeMillis();
// Create an Event with 1 thread.
workerGroup = new NioEventLoopGroup(1);
Bootstrap clientBootstrap = new Bootstrap();
clientBootstrap.group(workerGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// Based on length provided at header, it collects all packets
pipeline
.addLast("LengthDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 2, 0, 2));
pipeline.addLast("NonSecureDictionaryClientHandler", nonSecureDictionaryClientHandler);
}
});
clientBootstrap.connect(new InetSocketAddress(address, port));
LOGGER.info(
"Dictionary client Started, Total time spent : " + (System.currentTimeMillis() - start));
}

/**
* for client request
*
* @param key
* @return
*/
@Override
public DictionaryMessage getDictionary(
DictionaryMessage key) {
return nonSecureDictionaryClientHandler.getDictionary(key);
}

/**
* shutdown dictionary client
*/
@Override public void shutDown() {
workerGroup.shutdownGracefully();
try {
workerGroup.terminationFuture().sync();
} catch (InterruptedException e) {
LOGGER.error(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
/**
* Client handler to get data.
*/
public class DictionaryClientHandler extends ChannelInboundHandlerAdapter {
public class NonSecureDictionaryClientHandler extends ChannelInboundHandlerAdapter {

private static final LogService LOGGER =
LogServiceFactory.getLogService(DictionaryClientHandler.class.getName());
LogServiceFactory.getLogService(NonSecureDictionaryClientHandler.class.getName());

private final BlockingQueue<DictionaryMessage> responseMsgQueue = new LinkedBlockingQueue<>();

Expand All @@ -57,7 +57,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
try {
ByteBuf data = (ByteBuf) msg;
DictionaryMessage key = new DictionaryMessage();
key.readData(data);
key.readNonSecureData(data);
data.release();
responseMsgQueue.offer(key);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;


/**
* This is the dictionary generator for all tables. It generates dictionary
* based on @{@link DictionaryMessage}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonProperties;


/**
* Dictionary generation for table.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,32 @@ public class DictionaryMessage {
*/
private DictionaryMessageType type;

public void readData(ByteBuf byteBuf) {
public void readNonSecureData(ByteBuf byteBuf) {
byteBuf.resetReaderIndex();
byte[] tableBytes = new byte[byteBuf.readInt()];
byteBuf.readBytes(tableBytes);
tableUniqueName = new String(tableBytes);

byte[] colBytes = new byte[byteBuf.readInt()];
byteBuf.readBytes(colBytes);
columnName = new String(colBytes);

byte typeByte = byteBuf.readByte();
type = getKeyType(typeByte);

byte dataType = byteBuf.readByte();
if (dataType == 0) {
dictionaryValue = byteBuf.readInt();
} else {
byte[] dataBytes = new byte[byteBuf.readInt()];
byteBuf.readBytes(dataBytes);
data = new String(dataBytes);
}
}

public void readSecureData(ByteBuf byteBuf) {
byteBuf.resetReaderIndex();
short shtr = byteBuf.readShort();
byte[] tableBytes = new byte[byteBuf.readInt()];
byteBuf.readBytes(tableBytes);
tableUniqueName = new String(tableBytes);
Expand Down Expand Up @@ -103,6 +128,7 @@ public void writeData(ByteBuf byteBuf) {
byteBuf.setShort(startIndex, endIndex - startIndex - 2);
}


private DictionaryMessageType getKeyType(byte type) {
switch (type) {
case 1:
Expand Down

0 comments on commit d0aca3a

Please sign in to comment.