Skip to content

Commit

Permalink
Checkpoint - switch WebSocket over to new UpgradeProcessorInternal
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1662694 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
markt-asf committed Feb 27, 2015
1 parent b497aa9 commit 12d8024
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 133 deletions.
9 changes: 8 additions & 1 deletion java/org/apache/coyote/http11/AbstractHttp11Protocol.java
Expand Up @@ -23,7 +23,9 @@

import org.apache.coyote.AbstractProtocol;
import org.apache.coyote.Processor;
import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
import org.apache.coyote.http11.upgrade.UpgradeProcessorExternal;
import org.apache.coyote.http11.upgrade.UpgradeProcessorInternal;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.SocketWrapperBase;

Expand Down Expand Up @@ -283,7 +285,12 @@ protected Processor createUpgradeProcessor(
SocketWrapperBase<?> socket, ByteBuffer leftoverInput,
HttpUpgradeHandler httpUpgradeHandler)
throws IOException {
return new UpgradeProcessorExternal(socket, leftoverInput, httpUpgradeHandler);
if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) {
return new UpgradeProcessorInternal(socket, leftoverInput,
(InternalHttpUpgradeHandler) httpUpgradeHandler);
} else {
return new UpgradeProcessorExternal(socket, leftoverInput, httpUpgradeHandler);
}
}
}
}
Expand Up @@ -20,6 +20,7 @@

import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapperBase;


/**
Expand All @@ -34,4 +35,6 @@
public interface InternalHttpUpgradeHandler extends HttpUpgradeHandler {

SocketState upgradeDispatch(SocketStatus status);

void setSocketWrapper(SocketWrapperBase<?> wrapper);
}
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Executor;

import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.WebConnection;

import org.apache.coyote.Processor;
import org.apache.coyote.Request;
Expand All @@ -29,7 +30,7 @@
import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapperBase;

public abstract class UpgradeProcessorBase implements Processor {
public abstract class UpgradeProcessorBase implements Processor, WebConnection {

public UpgradeProcessorBase(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput) {
wrapper.unRead(leftOverInput);
Expand Down
Expand Up @@ -22,7 +22,6 @@
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.WebConnection;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
Expand All @@ -31,7 +30,7 @@
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.res.StringManager;

public class UpgradeProcessorExternal extends UpgradeProcessorBase implements WebConnection {
public class UpgradeProcessorExternal extends UpgradeProcessorBase {

private static final int INFINITE_TIMEOUT = -1;

Expand Down
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote.http11.upgrade;

import java.io.IOException;
import java.nio.ByteBuffer;

import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;

import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapperBase;

public class UpgradeProcessorInternal extends UpgradeProcessorBase {

private final InternalHttpUpgradeHandler internalHttpUpgradeHandler;

public UpgradeProcessorInternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput,
InternalHttpUpgradeHandler internalHttpUpgradeHandler) {
super(wrapper, leftOverInput);
this.internalHttpUpgradeHandler = internalHttpUpgradeHandler;
internalHttpUpgradeHandler.setSocketWrapper(wrapper);
}


@Override
public SocketState upgradeDispatch(SocketStatus status) {
return internalHttpUpgradeHandler.upgradeDispatch(status);
}


// --------------------------------------------------- AutoCloseable methods

@Override
public void close() throws Exception {
internalHttpUpgradeHandler.destroy();
}


// --------------------------------------------------- WebConnection methods

@Override
public ServletInputStream getInputStream() throws IOException {
return null;
}

@Override
public ServletOutputStream getOutputStream() throws IOException {
return null;
}
}
15 changes: 7 additions & 8 deletions java/org/apache/tomcat/websocket/server/WsFrameServer.java
Expand Up @@ -18,22 +18,21 @@

import java.io.IOException;

import javax.servlet.ServletInputStream;

import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.websocket.Transformation;
import org.apache.tomcat.websocket.WsFrameBase;
import org.apache.tomcat.websocket.WsSession;

public class WsFrameServer extends WsFrameBase {

private final ServletInputStream sis;
private final SocketWrapperBase<?> socketWrapper;
private final Object connectionReadLock = new Object();


public WsFrameServer(ServletInputStream sis, WsSession wsSession,
public WsFrameServer(SocketWrapperBase<?> socketWrapper, WsSession wsSession,
Transformation transformation) {
super(wsSession, transformation);
this.sis = sis;
this.socketWrapper = socketWrapper;
}


Expand All @@ -45,10 +44,10 @@ public WsFrameServer(ServletInputStream sis, WsSession wsSession,
*/
public void onDataAvailable() throws IOException {
synchronized (connectionReadLock) {
while (isOpen() && sis.isReady()) {
while (isOpen() && socketWrapper.isReadyForRead()) {
// Fill up the input buffer with as much data as we can
int read = sis.read(
inputBuffer, writePos, inputBuffer.length - writePos);
int read = socketWrapper.read(
false, inputBuffer, writePos, inputBuffer.length - writePos);
if (read <= 0) {
return;
}
Expand Down

0 comments on commit 12d8024

Please sign in to comment.