Skip to content
This repository has been archived by the owner on Sep 27, 2023. It is now read-only.

Commit

Permalink
+ custom codecs tutorial (replaying decoder)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bruno de Carvalho committed Dec 5, 2010
1 parent 0e6d0e0 commit cafb5d6
Show file tree
Hide file tree
Showing 13 changed files with 758 additions and 1 deletion.
4 changes: 4 additions & 0 deletions customcodecs/README.md
@@ -0,0 +1,4 @@
Custom codecs tutorial
======================

This is the backing code for a basic tutorial on netty's replaying decoder at... Well, I'm still writing it :x
119 changes: 119 additions & 0 deletions customcodecs/pom.xml
@@ -0,0 +1,119 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.biasedbit.nettytutorials</groupId>
<artifactId>customcodecs</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>

<name>netty tutorial - customcodecs</name>
<url>http://bruno.biasedbit.com/blag/</url>
<description>
Netty tutorials - custom codecs
</description>
<inceptionYear>2010</inceptionYear>
<developers>
<developer>
<name>Bruno de Carvalho</name>
<email>bruno@biasedbit.com</email>
<url>http://bruno.biasedbit.com</url>
</developer>
</developers>

<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0</url>
</license>
</licenses>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<repositories>
<repository>
<id>repository.jboss.org</id>
<url>http://repository.jboss.org/nexus/payload/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.3.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
<scope>compile</scope>
</dependency>

<!-- optionals, logging -->
<dependency>
<optional>true</optional>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<optional>true</optional>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Compiler configuration -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>

<!-- tests -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<forkMode>never</forkMode>
<excludes>
<exclude>**/Abstract*</exclude>
<exclude>**/*TestUtil*</exclude>
</excludes>
</configuration>
</plugin>

<!-- IDEA integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-idea-plugin</artifactId>
<version>2.2</version>
<configuration>
<downloadSources>true</downloadSources>
<jdkLevel>7</jdkLevel>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,148 @@
package com.biasedbit.nettytutorials.customcodecs.client;

import com.biasedbit.nettytutorials.customcodecs.common.Envelope;
import com.biasedbit.nettytutorials.customcodecs.common.Decoder;
import com.biasedbit.nettytutorials.customcodecs.common.Encoder;
import com.biasedbit.nettytutorials.customcodecs.common.Type;
import com.biasedbit.nettytutorials.customcodecs.common.Version;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class Client implements ClientHandlerListener {

// configuration --------------------------------------------------------------------------------------------------

private final String host;
private final int port;
private final int messages;
private final int floods;

// internal vars --------------------------------------------------------------------------------------------------

private ChannelFactory clientFactory;
private ChannelGroup channelGroup;
private ClientHandler handler;
private final AtomicInteger received;
private int flood;
private long startTime;

// constructors ---------------------------------------------------------------------------------------------------

public Client(String host, int port, int messages, int floods) {
this.host = host;
this.port = port;
this.messages = messages;
this.floods = floods;
this.received = new AtomicInteger(0);
this.flood = 0;
}

// ClientHandlerListener ------------------------------------------------------------------------------------------

@Override
public void messageReceived(Envelope message) {
// System.err.println("Received message " + message);
if (this.received.incrementAndGet() == this.messages) {
long stopTime = System.currentTimeMillis();
float timeInSeconds = (stopTime - this.startTime) / 1000f;
System.err.println("Sent and received " + this.messages + " in " + timeInSeconds + "s");
System.err.println("That's " + (this.messages / timeInSeconds) + " echoes per second!");

// ideally, this should be sent to another thread, since this method is called by a netty worker thread.
if (this.flood < this.floods) {
this.received.set(0);
this.flood++;
this.flood();
}
}
}

// public methods -------------------------------------------------------------------------------------------------

public boolean start() {

// For production scenarios, use limited sized thread pools
this.clientFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");
this.handler = new ClientHandler(this, this.channelGroup);
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {

@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("encoder", Encoder.getInstance());
pipeline.addLast("decoder", new Decoder());
pipeline.addLast("handler", handler);
return pipeline;
}
};

ClientBootstrap bootstrap = new ClientBootstrap(this.clientFactory);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
bootstrap.setPipelineFactory(pipelineFactory);


boolean connected = bootstrap.connect(new InetSocketAddress(host, port)).awaitUninterruptibly().isSuccess();
if (!connected) {
this.stop();
}

return connected;
}

public void stop() {
if (this.channelGroup != null) {
this.channelGroup.close();
}
if (this.clientFactory != null) {
this.clientFactory.releaseExternalResources();
}
}

private void flood() {
if ((this.channelGroup == null) || (this.clientFactory == null)) {
return;
}

this.startTime = System.currentTimeMillis();
for (int i = 0; i < this.messages; i++) {
this.handler.sendMessage(new Envelope(Version.VERSION1, Type.REQUEST, new byte[175]));
}
}

// main -----------------------------------------------------------------------------------------------------------

public static void main(String[] args) throws InterruptedException {
final Client client = new Client("localhost", 9999, 100000, 10);

if (!client.start()) {

System.exit(-1);
return; // not really needed...
}

System.out.println("Client started...");

client.flood();

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
client.stop();
}
});
}
}
@@ -0,0 +1,50 @@
package com.biasedbit.nettytutorials.customcodecs.client;

import com.biasedbit.nettytutorials.customcodecs.common.Envelope;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;

public class ClientHandler extends SimpleChannelUpstreamHandler {

// internal vars --------------------------------------------------------------------------------------------------

private final ClientHandlerListener listener;
private final ChannelGroup channelGroup;
private Channel channel;

// constructors ---------------------------------------------------------------------------------------------------

public ClientHandler(ClientHandlerListener listener, ChannelGroup channelGroup) {
this.listener = listener;
this.channelGroup = channelGroup;
}

// SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() instanceof Envelope) {
this.listener.messageReceived((Envelope) e.getMessage());
} else {
super.messageReceived(ctx, e);
}
}

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
this.channel = e.getChannel();
this.channelGroup.add(e.getChannel());
}

// public methods -------------------------------------------------------------------------------------------------

public void sendMessage(Envelope envelope) {
if (this.channel != null) {
this.channel.write(envelope);
}
}
}
@@ -0,0 +1,8 @@
package com.biasedbit.nettytutorials.customcodecs.client;

import com.biasedbit.nettytutorials.customcodecs.common.Envelope;

public interface ClientHandlerListener {

void messageReceived(Envelope message);
}

0 comments on commit cafb5d6

Please sign in to comment.