Skip to content

Commit

Permalink
Enable connection to Spock buckets (HELLO, SELECT_BUCKET)
Browse files Browse the repository at this point in the history
close #38
  • Loading branch information
amoudi87 authored and avsej committed May 8, 2017
1 parent 455e5c9 commit aa91229
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 29 deletions.
7 changes: 7 additions & 0 deletions src/main/java/com/couchbase/client/dcp/Client.java
Expand Up @@ -97,6 +97,7 @@ private Client(Builder builder) {
.setClusterAt(builder.clusterAt)
.setConnectionNameGenerator(builder.connectionNameGenerator)
.setBucket(builder.bucket)
.setUsername(builder.username == null ? builder.bucket : builder.username)
.setPassword(builder.password)
.setDcpControl(builder.dcpControl)
.setEventLoopGroup(eventLoopGroup, builder.eventLoopGroup == null)
Expand Down Expand Up @@ -719,6 +720,7 @@ public static class Builder {
private List<InetSocketAddress> clusterAt = Arrays.asList(InetSocketAddress.createUnresolved("127.0.0.1", 0));;
private EventLoopGroup eventLoopGroup;
private String bucket = "default";
private String username;
private String password = "";
private ConnectionNameGenerator connectionNameGenerator = DefaultConnectionNameGenerator.INSTANCE;
private DcpControl dcpControl = new DcpControl();
Expand Down Expand Up @@ -817,6 +819,11 @@ public Builder bucket(final String bucket) {
return this;
}

public Builder username(final String username){
this.username = username;
return this;
}

/**
* The password of the bucket to use.
*
Expand Down
Expand Up @@ -15,6 +15,11 @@
*/
package com.couchbase.client.dcp.config;

import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.couchbase.client.core.env.ConfigParserEnvironment;
import com.couchbase.client.core.env.CoreScheduler;
import com.couchbase.client.core.env.resources.NoOpShutdownHook;
Expand All @@ -33,6 +38,7 @@
import com.couchbase.client.deps.io.netty.channel.EventLoopGroup;
import com.couchbase.client.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.deps.io.netty.util.concurrent.GenericFutureListener;

import rx.Completable;
import rx.CompletableSubscriber;
import rx.Observable;
Expand All @@ -42,11 +48,6 @@
import rx.functions.Func1;
import rx.functions.Func2;

import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* The {@link ClientEnvironment} is responsible to carry various configuration and
* state information throughout the lifecycle.
Expand Down Expand Up @@ -84,7 +85,12 @@ public class ClientEnvironment implements SecureEnvironment, ConfigParserEnviron
private final String bucket;

/**
* The password of the bucket.
* The connecting username.
*/
private final String username;

/**
* The password of the username.
*/
private final String password;

Expand Down Expand Up @@ -180,6 +186,7 @@ public class ClientEnvironment implements SecureEnvironment, ConfigParserEnviron
private ClientEnvironment(final Builder builder) {
connectionNameGenerator = builder.connectionNameGenerator;
bucket = builder.bucket;
username = builder.username;
password = builder.password;
bootstrapTimeout = builder.bootstrapTimeout;
connectTimeout = builder.connectTimeout;
Expand Down Expand Up @@ -257,7 +264,14 @@ public String bucket() {
}

/**
* Password of the bucket used.
* Usrename used.
*/
public String username() {
return username;
}

/**
* Password of the user.
*/
public String password() {
return password;
Expand Down Expand Up @@ -422,6 +436,7 @@ public static class Builder {
private List<InetSocketAddress> clusterAt;
private ConnectionNameGenerator connectionNameGenerator;
private String bucket;
private String username;
private String password;
private long bootstrapTimeout = DEFAULT_BOOTSTRAP_TIMEOUT;
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
Expand Down Expand Up @@ -466,6 +481,11 @@ public Builder setBucket(String bucket) {
return this;
}

public Builder setUsername(String username){
this.username = username;
return this;
}

public Builder setPassword(String password) {
this.password = password;
return this;
Expand Down
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2017 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.couchbase.client.dcp.message;

import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;

public enum BucketSelectRequest {
;

public static void init(ByteBuf buffer, String bucket) {
MessageUtil.initRequest(MessageUtil.SELECT_BUCKET_OPCODE, buffer);
ByteBuf key = Unpooled.copiedBuffer(bucket, CharsetUtil.UTF_8);
MessageUtil.setKey(key, buffer);
key.release();
}
}
39 changes: 39 additions & 0 deletions src/main/java/com/couchbase/client/dcp/message/HelloRequest.java
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2017 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.couchbase.client.dcp.message;

import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;

public enum HelloRequest {
;

public static final short DATATYPE = 0x01;
public static final short TLS = 0x02;
public static final short TCPNODELAY = 0x03;
public static final short MUTATIONSEQ = 0x04;
public static final short TCPDELAY = 0x05;
public static final short XATTR = 0x06;
public static final short XERROR = 0x07;
public static final short SELECT = 0x08;
private static final ByteBuf VALUES = Unpooled.copyShort(XERROR, SELECT);

public static void init(ByteBuf buffer, ByteBuf connectionName) {
MessageUtil.initRequest(MessageUtil.HELLO_OPCODE, buffer);
MessageUtil.setKey(connectionName, buffer);
MessageUtil.setContent(VALUES, buffer);
}
}
Expand Up @@ -33,6 +33,8 @@ public enum MessageUtil {
public static final short OPAQUE_OFFSET = 12;
public static final short CAS_OFFSET = 16;

public static final byte VERSION_OPCODE = 0x0b;
public static final byte HELLO_OPCODE = 0x1f;
public static final byte SASL_LIST_MECHS_OPCODE = 0x20;
public static final byte SASL_AUTH_OPCODE = 0x21;
public static final byte SASL_STEP_OPCODE = 0x22;
Expand All @@ -52,6 +54,7 @@ public enum MessageUtil {
public static final byte DCP_NOOP_OPCODE = 0x5c;
public static final byte DCP_BUFFER_ACK_OPCODE = 0x5d;
public static final byte DCP_CONTROL_OPCODE = 0x5e;
public static final byte SELECT_BUCKET_OPCODE = (byte) 0x89;

public static final byte INTERNAL_ROLLBACK_OPCODE = 0x01;

Expand Down
26 changes: 26 additions & 0 deletions src/main/java/com/couchbase/client/dcp/message/VersionRequest.java
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2017 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.couchbase.client.dcp.message;

import com.couchbase.client.deps.io.netty.buffer.ByteBuf;

public enum VersionRequest {
;

public static void init(ByteBuf buffer) {
MessageUtil.initRequest(MessageUtil.VERSION_OPCODE, buffer);
}
}
Expand Up @@ -15,6 +15,8 @@
*/
package com.couchbase.client.dcp.transport.netty;

import java.net.InetSocketAddress;

import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
Expand All @@ -27,9 +29,8 @@
import com.couchbase.client.deps.io.netty.handler.logging.LogLevel;
import com.couchbase.client.deps.io.netty.handler.logging.LoggingHandler;
import com.couchbase.client.deps.io.netty.handler.ssl.SslHandler;
import rx.subjects.Subject;

import java.net.InetSocketAddress;
import rx.subjects.Subject;

/**
* Configures the pipeline for the HTTP config stream.
Expand All @@ -55,10 +56,15 @@ public class ConfigPipeline extends ChannelInitializer<Channel> {
private final InetSocketAddress hostname;

/**
* The name of the bucket (used for http auth).
* The name of the bucket
*/
private final String bucket;

/**
* The username (used for http auth).
*/
private final String username;

/**
* THe password of the bucket (used for http auth).
*/
Expand All @@ -80,6 +86,7 @@ public ConfigPipeline(final ClientEnvironment environment, final InetSocketAddre
final Subject<CouchbaseBucketConfig, CouchbaseBucketConfig> configStream) {
this.hostname = hostname;
this.bucket = environment.bucket();
this.username = environment.username();
this.password = environment.password();
this.configStream = configStream;
this.environment = environment;
Expand Down Expand Up @@ -110,7 +117,7 @@ protected void initChannel(final Channel ch) throws Exception {

pipeline
.addLast(new HttpClientCodec())
.addLast(new StartStreamHandler(bucket, password))
.addLast(new StartStreamHandler(bucket, username, password))
.addLast(new ConfigHandler(hostname, configStream, environment));
}

Expand Down

0 comments on commit aa91229

Please sign in to comment.