Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

add autogenerated files

  • Loading branch information...
commit 379066bbec0f3c14f2f68c7069351a27287432b5 1 parent e5c3e5c
@acme authored
View
5,060 lib/Net/Cassandra/Backend/Cassandra.pm
5,060 additions, 0 deletions not shown
View
13 lib/Net/Cassandra/Backend/Constants.pm
@@ -0,0 +1,13 @@
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+package Net::Cassandra::Backend::Constants;
+require 5.6.0;
+use strict;
+use warnings;
+use Net::Cassandra::Backend::Thrift;
+
+
+1;
View
177 lib/Net/Cassandra/Backend/Thrift.pm
@@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+our $VERSION = '0.1';
+
+require 5.6.0;
+use strict;
+use warnings;
+
+#
+# Data types that can be sent via Thrift
+#
+package Net::Cassandra::Backend::TType;
+use constant STOP => 0;
+use constant VOID => 1;
+use constant BOOL => 2;
+use constant BYTE => 3;
+use constant I08 => 3;
+use constant DOUBLE => 4;
+use constant I16 => 6;
+use constant I32 => 8;
+use constant I64 => 10;
+use constant STRING => 11;
+use constant UTF7 => 11;
+use constant STRUCT => 12;
+use constant MAP => 13;
+use constant SET => 14;
+use constant LIST => 15;
+use constant UTF8 => 16;
+use constant UTF16 => 17;
+1;
+
+#
+# Message types for RPC
+#
+package Net::Cassandra::Backend::TMessageType;
+use constant CALL => 1;
+use constant REPLY => 2;
+use constant EXCEPTION => 3;
+use constant ONEWAY => 4;
+1;
+
+package Net::Cassandra::Backend::Thrift::TException;
+
+sub new {
+ my $classname = shift;
+ my $self = {message => shift, code => shift || 0};
+
+ return bless($self,$classname);
+}
+1;
+
+package Net::Cassandra::Backend::TApplicationException;
+use base('Net::Cassandra::Backend::Thrift::TException');
+
+use constant UNKNOWN => 0;
+use constant UNKNOWN_METHOD => 1;
+use constant INVALID_MESSAGE_TYPE => 2;
+use constant WRONG_METHOD_NAME => 3;
+use constant BAD_SEQUENCE_ID => 4;
+use constant MISSING_RESULT => 5;
+
+sub new {
+ my $classname = shift;
+
+ my $self = $classname->SUPER::new();
+
+ return bless($self,$classname);
+}
+
+sub read {
+ my $self = shift;
+ my $input = shift;
+
+ my $xfer = 0;
+ my $fname = undef;
+ my $ftype = 0;
+ my $fid = 0;
+
+ $xfer += $input->readStructBegin($fname);
+
+ while (1)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == Net::Cassandra::Backend::TType::STOP) {
+ last; next;
+ }
+
+ SWITCH: for($fid)
+ {
+ /1/ && do{
+
+ if ($ftype == Net::Cassandra::Backend::TType::STRING) {
+ $xfer += $input->readString($self->{message});
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+
+ last;
+ };
+
+ /2/ && do{
+ if ($ftype == Net::Cassandra::Backend::TType::I32) {
+ $xfer += $input->readI32($self->{code});
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ last;
+ };
+
+ $xfer += $input->skip($ftype);
+ }
+
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+
+ return $xfer;
+}
+
+sub write {
+ my $self = shift;
+ my $output = shift;
+
+ my $xfer = 0;
+
+ $xfer += $output->writeStructBegin('Net::Cassandra::Backend::TApplicationException');
+
+ if ($self->getMessage()) {
+ $xfer += $output->writeFieldBegin('message', Net::Cassandra::Backend::TType::STRING, 1);
+ $xfer += $output->writeString($self->getMessage());
+ $xfer += $output->writeFieldEnd();
+ }
+
+ if ($self->getCode()) {
+ $xfer += $output->writeFieldBegin('type', Net::Cassandra::Backend::TType::I32, 2);
+ $xfer += $output->writeI32($self->getCode());
+ $xfer += $output->writeFieldEnd();
+ }
+
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+
+ return $xfer;
+}
+
+sub getMessage
+{
+ my $self = shift;
+
+ return $self->{message};
+}
+
+sub getCode
+{
+ my $self = shift;
+
+ return $self->{code};
+}
+
+1;
View
498 lib/Net/Cassandra/Backend/Thrift/BinaryProtocol.pm
@@ -0,0 +1,498 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 5.6.0;
+
+use strict;
+use warnings;
+
+use utf8;
+use Encode;
+
+use Net::Cassandra::Backend::Thrift;
+use Net::Cassandra::Backend::Thrift::Protocol;
+
+use Bit::Vector;
+
+#
+# Binary implementation of the Thrift protocol.
+#
+package Net::Cassandra::Backend::Thrift::BinaryProtocol;
+use base('Net::Cassandra::Backend::Thrift::Protocol');
+
+use constant VERSION_MASK => 0xffff0000;
+use constant VERSION_1 => 0x80010000;
+
+sub new
+{
+ my $classname = shift;
+ my $trans = shift;
+ my $self = $classname->SUPER::new($trans);
+
+ return bless($self,$classname);
+}
+
+sub writeMessageBegin
+{
+ my $self = shift;
+ my ($name, $type, $seqid) = @_;
+
+ return
+ $self->writeI32(VERSION_1 | $type) +
+ $self->writeString($name) +
+ $self->writeI32($seqid);
+}
+
+sub writeMessageEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub writeStructBegin{
+ my $self = shift;
+ my $name = shift;
+ return 0;
+}
+
+sub writeStructEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub writeFieldBegin
+{
+ my $self = shift;
+ my ($fieldName, $fieldType, $fieldId) = @_;
+
+ return
+ $self->writeByte($fieldType) +
+ $self->writeI16($fieldId);
+}
+
+sub writeFieldEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub writeFieldStop
+{
+ my $self = shift;
+ return $self->writeByte(Net::Cassandra::Backend::TType::STOP);
+}
+
+sub writeMapBegin
+{
+ my $self = shift;
+ my ($keyType, $valType, $size) = @_;
+
+ return
+ $self->writeByte($keyType) +
+ $self->writeByte($valType) +
+ $self->writeI32($size);
+}
+
+sub writeMapEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub writeListBegin
+{
+ my $self = shift;
+ my ($elemType, $size) = @_;
+
+ return
+ $self->writeByte($elemType) +
+ $self->writeI32($size);
+}
+
+sub writeListEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub writeSetBegin
+{
+ my $self = shift;
+ my ($elemType, $size) = @_;
+
+ return
+ $self->writeByte($elemType) +
+ $self->writeI32($size);
+}
+
+sub writeSetEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub writeBool
+{
+ my $self = shift;
+ my $value = shift;
+
+ my $data = pack('c', $value ? 1 : 0);
+ $self->{trans}->write($data, 1);
+ return 1;
+}
+
+sub writeByte
+{
+ my $self = shift;
+ my $value= shift;
+
+ my $data = pack('c', $value);
+ $self->{trans}->write($data, 1);
+ return 1;
+}
+
+sub writeI16
+{
+ my $self = shift;
+ my $value= shift;
+
+ my $data = pack('n', $value);
+ $self->{trans}->write($data, 2);
+ return 2;
+}
+
+sub writeI32
+{
+ my $self = shift;
+ my $value= shift;
+
+ my $data = pack('N', $value);
+ $self->{trans}->write($data, 4);
+ return 4;
+}
+
+sub writeI64
+{
+ my $self = shift;
+ my $value= shift;
+ my $data;
+
+ my $vec;
+ #stop annoying error
+ $vec = Bit::Vector->new_Dec(64, $value);
+ $data = pack 'NN', $vec->Chunk_Read(32, 32), $vec->Chunk_Read(32, 0);
+
+ $self->{trans}->write($data, 8);
+
+ return 8;
+}
+
+
+sub writeDouble
+{
+ my $self = shift;
+ my $value= shift;
+
+ my $data = pack('d', $value);
+ $self->{trans}->write(scalar reverse($data), 8);
+ return 8;
+}
+
+sub writeString{
+ my $self = shift;
+ my $value= shift;
+
+ if( utf8::is_utf8($value) ){
+ $value = Encode::encode_utf8($value);
+ }
+
+ my $len = length($value);
+
+ my $result = $self->writeI32($len);
+
+ if ($len) {
+ $self->{trans}->write($value,$len);
+ }
+ return $result + $len;
+ }
+
+
+#
+#All references
+#
+sub readMessageBegin
+{
+ my $self = shift;
+ my ($name, $type, $seqid) = @_;
+
+ my $version = 0;
+ my $result = $self->readI32(\$version);
+ if (($version & VERSION_MASK) > 0) {
+ if (($version & VERSION_MASK) != VERSION_1) {
+ die new Net::Cassandra::Backend::Thrift::TException('Missing version identifier')
+ }
+ $$type = $version & 0x000000ff;
+ return
+ $result +
+ $self->readString($name) +
+ $self->readI32($seqid);
+ } else { # old client support code
+ return
+ $result +
+ $self->readStringBody($name, $version) + # version here holds the size of the string
+ $self->readByte($type) +
+ $self->readI32($seqid);
+ }
+}
+
+sub readMessageEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub readStructBegin
+{
+ my $self = shift;
+ my $name = shift;
+
+ $$name = '';
+
+ return 0;
+}
+
+sub readStructEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub readFieldBegin
+{
+ my $self = shift;
+ my ($name, $fieldType, $fieldId) = @_;
+
+ my $result = $self->readByte($fieldType);
+
+ if ($$fieldType == Net::Cassandra::Backend::TType::STOP) {
+ $$fieldId = 0;
+ return $result;
+ }
+
+ $result += $self->readI16($fieldId);
+
+ return $result;
+}
+
+sub readFieldEnd() {
+ my $self = shift;
+ return 0;
+}
+
+sub readMapBegin
+{
+ my $self = shift;
+ my ($keyType, $valType, $size) = @_;
+
+ return
+ $self->readByte($keyType) +
+ $self->readByte($valType) +
+ $self->readI32($size);
+}
+
+sub readMapEnd()
+{
+ my $self = shift;
+ return 0;
+}
+
+sub readListBegin
+{
+ my $self = shift;
+ my ($elemType, $size) = @_;
+
+ return
+ $self->readByte($elemType) +
+ $self->readI32($size);
+}
+
+sub readListEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub readSetBegin
+{
+ my $self = shift;
+ my ($elemType, $size) = @_;
+
+ return
+ $self->readByte($elemType) +
+ $self->readI32($size);
+}
+
+sub readSetEnd
+{
+ my $self = shift;
+ return 0;
+}
+
+sub readBool
+{
+ my $self = shift;
+ my $value = shift;
+
+ my $data = $self->{trans}->readAll(1);
+ my @arr = unpack('c', $data);
+ $$value = $arr[0] == 1;
+ return 1;
+}
+
+sub readByte
+{
+ my $self = shift;
+ my $value = shift;
+
+ my $data = $self->{trans}->readAll(1);
+ my @arr = unpack('c', $data);
+ $$value = $arr[0];
+ return 1;
+}
+
+sub readI16
+{
+ my $self = shift;
+ my $value = shift;
+
+ my $data = $self->{trans}->readAll(2);
+
+ my @arr = unpack('n', $data);
+
+ $$value = $arr[0];
+
+ if ($$value > 0x7fff) {
+ $$value = 0 - (($$value - 1) ^ 0xffff);
+ }
+
+ return 2;
+}
+
+sub readI32
+{
+ my $self = shift;
+ my $value= shift;
+
+ my $data = $self->{trans}->readAll(4);
+ my @arr = unpack('N', $data);
+
+ $$value = $arr[0];
+ if ($$value > 0x7fffffff) {
+ $$value = 0 - (($$value - 1) ^ 0xffffffff);
+ }
+ return 4;
+}
+
+sub readI64
+{
+ my $self = shift;
+ my $value = shift;
+
+ my $data = $self->{trans}->readAll(8);
+
+ my ($hi,$lo)=unpack('NN',$data);
+
+ my $vec = new Bit::Vector(64);
+
+ $vec->Chunk_Store(32,32,$hi);
+ $vec->Chunk_Store(32,0,$lo);
+
+ $$value = $vec->to_Dec();
+
+ return 8;
+}
+
+sub readDouble
+{
+ my $self = shift;
+ my $value = shift;
+
+ my $data = scalar reverse($self->{trans}->readAll(8));
+ my @arr = unpack('d', $data);
+
+ $$value = $arr[0];
+
+ return 8;
+}
+
+sub readString
+{
+ my $self = shift;
+ my $value = shift;
+
+ my $len;
+ my $result = $self->readI32(\$len);
+
+ if ($len) {
+ $$value = $self->{trans}->readAll($len);
+ } else {
+ $$value = '';
+ }
+
+ return $result + $len;
+}
+
+sub readStringBody
+{
+ my $self = shift;
+ my $value = shift;
+ my $len = shift;
+
+ if ($len) {
+ $$value = $self->{trans}->readAll($len);
+ } else {
+ $$value = '';
+ }
+
+ return $len;
+}
+
+#
+# Binary Protocol Factory
+#
+package Net::Cassandra::Backend::TBinaryProtocolFactory;
+use base('Net::Cassandra::Backend::TProtocolFactory');
+
+sub new
+{
+ my $classname = shift;
+ my $self = $classname->SUPER::new();
+
+ return bless($self,$classname);
+}
+
+sub getProtocol{
+ my $self = shift;
+ my $trans = shift;
+
+ return new TBinaryProtocol($trans);
+}
+
+1;
View
109 lib/Net/Cassandra/Backend/Thrift/BufferedTransport.pm
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 5.6.0;
+use strict;
+use warnings;
+
+use Net::Cassandra::Backend::Thrift;
+use Net::Cassandra::Backend::Thrift::Transport;
+
+package Net::Cassandra::Backend::Thrift::BufferedTransport;
+use base('Net::Cassandra::Backend::Thrift::Transport');
+
+sub new
+{
+ my $classname = shift;
+ my $transport = shift;
+ my $rBufSize = shift || 512;
+ my $wBufSize = shift || 512;
+
+ my $self = {
+ transport => $transport,
+ rBufSize => $rBufSize,
+ wBufSize => $wBufSize,
+ wBuf => '',
+ rBuf => '',
+ };
+
+ return bless($self,$classname);
+}
+
+sub isOpen
+{
+ my $self = shift;
+
+ return $self->{transport}->isOpen();
+}
+
+sub open
+{
+ my $self = shift;
+ $self->{transport}->open();
+}
+
+sub close()
+{
+ my $self = shift;
+ $self->{transport}->close();
+}
+
+sub readAll
+{
+ my $self = shift;
+ my $len = shift;
+
+ return $self->{transport}->readAll($len);
+}
+
+sub read
+{
+ my $self = shift;
+ my $len = shift;
+ my $ret;
+
+ # Methinks Perl is already buffering these for us
+ return $self->{transport}->read($len);
+}
+
+sub write
+{
+ my $self = shift;
+ my $buf = shift;
+
+ $self->{wBuf} .= $buf;
+ if (length($self->{wBuf}) >= $self->{wBufSize}) {
+ $self->{transport}->write($self->{wBuf});
+ $self->{wBuf} = '';
+ }
+}
+
+sub flush
+{
+ my $self = shift;
+
+ if (length($self->{wBuf}) > 0) {
+ $self->{transport}->write($self->{wBuf});
+ $self->{wBuf} = '';
+ }
+ $self->{transport}->flush();
+}
+
+
+1;
View
164 lib/Net/Cassandra/Backend/Thrift/FramedTransport.pm
@@ -0,0 +1,164 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+use strict;
+use warnings;
+
+use Net::Cassandra::Backend::Thrift;
+use Net::Cassandra::Backend::Thrift::Transport;
+
+#
+# Framed transport. Writes and reads data in chunks that are stamped with
+# their length.
+#
+# @package thrift.transport
+#
+package Net::Cassandra::Backend::Thrift::FramedTransport;
+
+use base('Net::Cassandra::Backend::Thrift::Transport');
+
+sub new
+{
+ my $classname = shift;
+ my $transport = shift;
+ my $read = shift || 1;
+ my $write = shift || 1;
+
+ my $self = {
+ transport => $transport,
+ read => $read,
+ write => $write,
+ wBuf => '',
+ rBuf => '',
+ };
+
+ return bless($self,$classname);
+}
+
+sub isOpen
+{
+ my $self = shift;
+ return $self->{transport}->isOpen();
+}
+
+sub open
+{
+ my $self = shift;
+
+ $self->{transport}->open();
+}
+
+sub close
+{
+ my $self = shift;
+
+ $self->{transport}->close();
+}
+
+#
+# Reads from the buffer. When more data is required reads another entire
+# chunk and serves future reads out of that.
+#
+# @param int $len How much data
+#
+sub read
+{
+
+ my $self = shift;
+ my $len = shift;
+
+ if (!$self->{read}) {
+ return $self->{transport}->read($len);
+ }
+
+ if (length($self->{rBuf}) == 0) {
+ $self->_readFrame();
+ }
+
+
+ # Just return full buff
+ if ($len > length($self->{rBuf})) {
+ my $out = $self->{rBuf};
+ $self->{rBuf} = '';
+ return $out;
+ }
+
+ # Return substr
+ my $out = substr($self->{rBuf}, 0, $len);
+ $self->{rBuf} = substr($self->{rBuf}, $len);
+ return $out;
+}
+
+#
+# Reads a chunk of data into the internal read buffer.
+# (private)
+sub _readFrame
+{
+ my $self = shift;
+ my $buf = $self->{transport}->readAll(4);
+ my @val = unpack('N', $buf);
+ my $sz = $val[0];
+
+ $self->{rBuf} = $self->{transport}->readAll($sz);
+}
+
+#
+# Writes some data to the pending output buffer.
+#
+# @param string $buf The data
+# @param int $len Limit of bytes to write
+#
+sub write
+{
+ my $self = shift;
+ my $buf = shift;
+ my $len = shift;
+
+ unless($self->{write}) {
+ return $self->{transport}->write($buf, $len);
+ }
+
+ if ( defined $len && $len < length($buf)) {
+ $buf = substr($buf, 0, $len);
+ }
+
+ $self->{wBuf} .= $buf;
+ }
+
+#
+# Writes the output buffer to the stream in the format of a 4-byte length
+# followed by the actual data.
+#
+sub flush
+{
+ my $self = shift;
+
+ unless ($self->{write}) {
+ return $self->{transport}->flush();
+ }
+
+ my $out = pack('N', length($self->{wBuf}));
+ $out .= $self->{wBuf};
+ $self->{transport}->write($out);
+ $self->{transport}->flush();
+ $self->{wBuf} = '';
+
+}
+
+1;
View
200 lib/Net/Cassandra/Backend/Thrift/HttpClient.pm
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 5.6.0;
+use strict;
+use warnings;
+
+use Net::Cassandra::Backend::Thrift;
+use Net::Cassandra::Backend::Thrift::Transport;
+
+use HTTP::Request;
+use LWP::UserAgent;
+use IO::String;
+
+package Net::Cassandra::Backend::Thrift::HttpClient;
+
+use base('Net::Cassandra::Backend::Thrift::Transport');
+
+sub new
+{
+ my $classname = shift;
+ my $url = shift || 'http://localhost:9090';
+ my $debugHandler = shift;
+
+ my $out = IO::String->new;
+ binmode($out);
+
+ my $self = {
+ url => $url,
+ out => $out,
+ debugHandler => $debugHandler,
+ debug => 0,
+ sendTimeout => 100,
+ recvTimeout => 750,
+ handle => undef,
+ };
+
+ return bless($self,$classname);
+}
+
+sub setSendTimeout
+{
+ my $self = shift;
+ my $timeout = shift;
+
+ $self->{sendTimeout} = $timeout;
+}
+
+sub setRecvTimeout
+{
+ my $self = shift;
+ my $timeout = shift;
+
+ $self->{recvTimeout} = $timeout;
+}
+
+
+#
+#Sets debugging output on or off
+#
+# @param bool $debug
+#
+sub setDebug
+{
+ my $self = shift;
+ my $debug = shift;
+
+ $self->{debug} = $debug;
+}
+
+#
+# Tests whether this is open
+#
+# @return bool true if the socket is open
+#
+sub isOpen
+{
+ return 1;
+}
+
+sub open {}
+
+#
+# Cleans up the buffer.
+#
+sub close
+{
+ my $self = shift;
+ if (defined($self->{io})) {
+ close($self->{io});
+ $self->{io} = undef;
+ }
+}
+
+#
+# Guarantees that the full amount of data is read.
+#
+# @return string The data, of exact length
+# @throws TTransportException if cannot read data
+#
+sub readAll
+{
+ my $self = shift;
+ my $len = shift;
+
+ my $buf = $self->read($len);
+
+ if (!defined($buf)) {
+ die new Net::Cassandra::Backend::Thrift::TException('TSocket: Could not read '.$len.' bytes from input buffer');
+ }
+ return $buf;
+}
+
+#
+# Read and return string
+#
+sub read
+{
+ my $self = shift;
+ my $len = shift;
+
+ my $buf;
+
+ my $in = $self->{in};
+
+ if (!defined($in)) {
+ die new Net::Cassandra::Backend::Thrift::TException("Response buffer is empty, no request.");
+ }
+ eval {
+ my $ret = sysread($in, $buf, $len);
+ if (! defined($ret)) {
+ die new Net::Cassandra::Backend::Thrift::TException("No more data available.");
+ }
+ }; if($@){
+ die new Net::Cassandra::Backend::Thrift::TException($@);
+ }
+
+ return $buf;
+}
+
+#
+# Write string
+#
+sub write
+{
+ my $self = shift;
+ my $buf = shift;
+ $self->{out}->print($buf);
+}
+
+#
+# Flush output (do the actual HTTP/HTTPS request)
+#
+sub flush
+{
+ my $self = shift;
+
+ my $ua = LWP::UserAgent->new('timeout' => ($self->{sendTimeout} / 1000),
+ 'agent' => 'Perl/THttpClient'
+ );
+ $ua->default_header('Accept' => 'application/x-thrift');
+ $ua->default_header('Content-Type' => 'application/x-thrift');
+ $ua->cookie_jar({}); # hash to remember cookies between redirects
+
+ my $out = $self->{out};
+ $out->setpos(0); # rewind
+ my $buf = join('', <$out>);
+
+ my $request = new HTTP::Request(POST => $self->{url}, undef, $buf);
+ my $response = $ua->request($request);
+ my $content_ref = $response->content_ref;
+
+ my $in = IO::String->new($content_ref);
+ binmode($in);
+ $self->{in} = $in;
+ $in->setpos(0); # rewind
+
+ # reset write buffer
+ $out = IO::String->new;
+ binmode($out);
+ $self->{out} = $out;
+}
+
+1;
View
126 lib/Net/Cassandra/Backend/Thrift/MemoryBuffer.pm
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 5.6.0;
+use strict;
+use warnings;
+
+use Net::Cassandra::Backend::Thrift;
+use Net::Cassandra::Backend::Thrift::Transport;
+
+package Net::Cassandra::Backend::Thrift::MemoryBuffer;
+use base('Net::Cassandra::Backend::Thrift::Transport');
+
+sub new
+{
+ my $classname = shift;
+
+ my $bufferSize= shift || 1024;
+
+ my $self = {
+ buffer => '',
+ bufferSize=> $bufferSize,
+ wPos => 0,
+ rPos => 0,
+ };
+
+ return bless($self,$classname);
+}
+
+sub isOpen
+{
+ return 1;
+}
+
+sub open
+{
+
+}
+
+sub close
+{
+
+}
+
+sub peek
+{
+ my $self = shift;
+ return($self->{rPos} < $self->{wPos});
+}
+
+
+sub getBuffer
+{
+ my $self = shift;
+ return $self->{buffer};
+}
+
+sub resetBuffer
+{
+ my $self = shift;
+
+ my $new_buffer = shift || '';
+
+ $self->{buffer} = $new_buffer;
+ $self->{bufferSize} = length($new_buffer);
+ $self->{wPos} = length($new_buffer);
+ $self->{rPos} = 0;
+}
+
+sub available
+{
+ my $self = shift;
+ return ($self->{wPos} - $self->{rPos});
+}
+
+sub read
+{
+ my $self = shift;
+ my $len = shift;
+ my $ret;
+
+ my $avail = ($self->{wPos} - $self->{rPos});
+ return '' if $avail == 0;
+
+ #how much to give
+ my $give = $len;
+ $give = $avail if $avail < $len;
+
+ $ret = substr($self->{buffer},$self->{rPos},$give);
+
+ $self->{rPos} += $give;
+
+ return $ret;
+}
+
+sub write
+{
+ my $self = shift;
+ my $buf = shift;
+
+ $self->{buffer} .= $buf;
+ $self->{wPos} += length($buf);
+}
+
+sub flush
+{
+
+}
+
+1;
View
543 lib/Net/Cassandra/Backend/Thrift/Protocol.pm
@@ -0,0 +1,543 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 5.6.0;
+use strict;
+use warnings;
+
+use Net::Cassandra::Backend::Thrift;
+
+#
+# Protocol exceptions
+#
+package Net::Cassandra::Backend::TProtocolException;
+use base('Net::Cassandra::Backend::Thrift::TException');
+
+use constant UNKNOWN => 0;
+use constant INVALID_DATA => 1;
+use constant NEGATIVE_SIZE => 2;
+use constant SIZE_LIMIT => 3;
+use constant BAD_VERSION => 4;
+
+sub new {
+ my $classname = shift;
+
+ my $self = $classname->SUPER::new();
+
+ return bless($self,$classname);
+}
+
+#
+# Protocol base class module.
+#
+package Net::Cassandra::Backend::Thrift::Protocol;
+
+sub new {
+ my $classname = shift;
+ my $self = {};
+
+ my $trans = shift;
+ $self->{trans}= $trans;
+
+ return bless($self,$classname);
+}
+
+sub getTransport
+{
+ my $self = shift;
+
+ return $self->{trans};
+}
+
+#
+# Writes the message header
+#
+# @param string $name Function name
+# @param int $type message type TMessageType::CALL or TMessageType::REPLY
+# @param int $seqid The sequence id of this message
+#
+sub writeMessageBegin
+{
+ my ($name, $type, $seqid);
+ die "abstract";
+}
+
+#
+# Close the message
+#
+sub writeMessageEnd {
+ die "abstract";
+}
+
+#
+# Writes a struct header.
+#
+# @param string $name Struct name
+# @throws TException on write error
+# @return int How many bytes written
+#
+sub writeStructBegin {
+ my ($name);
+
+ die "abstract";
+}
+
+#
+# Close a struct.
+#
+# @throws TException on write error
+# @return int How many bytes written
+#
+sub writeStructEnd {
+ die "abstract";
+}
+
+#
+# Starts a field.
+#
+# @param string $name Field name
+# @param int $type Field type
+# @param int $fid Field id
+# @throws TException on write error
+# @return int How many bytes written
+#
+sub writeFieldBegin {
+ my ($fieldName, $fieldType, $fieldId);
+
+ die "abstract";
+}
+
+sub writeFieldEnd {
+ die "abstract";
+}
+
+sub writeFieldStop {
+ die "abstract";
+}
+
+sub writeMapBegin {
+ my ($keyType, $valType, $size);
+
+ die "abstract";
+}
+
+sub writeMapEnd {
+ die "abstract";
+}
+
+sub writeListBegin {
+ my ($elemType, $size);
+ die "abstract";
+}
+
+sub writeListEnd {
+ die "abstract";
+}
+
+sub writeSetBegin {
+ my ($elemType, $size);
+ die "abstract";
+}
+
+sub writeSetEnd {
+ die "abstract";
+}
+
+sub writeBool {
+ my ($bool);
+ die "abstract";
+}
+
+sub writeByte {
+ my ($byte);
+ die "abstract";
+}
+
+sub writeI16 {
+ my ($i16);
+ die "abstract";
+}
+
+sub writeI32 {
+ my ($i32);
+ die "abstract";
+}
+
+sub writeI64 {
+ my ($i64);
+ die "abstract";
+}
+
+sub writeDouble {
+ my ($dub);
+ die "abstract";
+}
+
+sub writeString
+{
+ my ($str);
+ die "abstract";
+}
+
+#
+# Reads the message header
+#
+# @param string $name Function name
+# @param int $type message type TMessageType::CALL or TMessageType::REPLY
+# @parem int $seqid The sequence id of this message
+#
+sub readMessageBegin
+{
+ my ($name, $type, $seqid);
+ die "abstract";
+}
+
+#
+# Read the close of message
+#
+sub readMessageEnd
+{
+ die "abstract";
+}
+
+sub readStructBegin
+{
+ my($name);
+
+ die "abstract";
+}
+
+sub readStructEnd
+{
+ die "abstract";
+}
+
+sub readFieldBegin
+{
+ my ($name, $fieldType, $fieldId);
+ die "abstract";
+}
+
+sub readFieldEnd
+{
+ die "abstract";
+}
+
+sub readMapBegin
+{
+ my ($keyType, $valType, $size);
+ die "abstract";
+}
+
+sub readMapEnd
+{
+ die "abstract";
+}
+
+sub readListBegin
+{
+ my ($elemType, $size);
+ die "abstract";
+}
+
+sub readListEnd
+{
+ die "abstract";
+}
+
+sub readSetBegin
+{
+ my ($elemType, $size);
+ die "abstract";
+}
+
+sub readSetEnd
+{
+ die "abstract";
+}
+
+sub readBool
+{
+ my ($bool);
+ die "abstract";
+}
+
+sub readByte
+{
+ my ($byte);
+ die "abstract";
+}
+
+sub readI16
+{
+ my ($i16);
+ die "abstract";
+}
+
+sub readI32
+{
+ my ($i32);
+ die "abstract";
+}
+
+sub readI64
+{
+ my ($i64);
+ die "abstract";
+}
+
+sub readDouble
+{
+ my ($dub);
+ die "abstract";
+}
+
+sub readString
+{
+ my ($str);
+ die "abstract";
+}
+
+#
+# The skip function is a utility to parse over unrecognized data without
+# causing corruption.
+#
+# @param TType $type What type is it
+#
+sub skip
+{
+ my $self = shift;
+ my $type = shift;
+
+ my $ref;
+ my $result;
+ my $i;
+
+ if($type == Net::Cassandra::Backend::TType::BOOL)
+ {
+ return $self->readBool(\$ref);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::BYTE){
+ return $self->readByte(\$ref);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::I16){
+ return $self->readI16(\$ref);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::I32){
+ return $self->readI32(\$ref);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::I64){
+ return $self->readI64(\$ref);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::DOUBLE){
+ return $self->readDouble(\$ref);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::STRING)
+ {
+ return $self->readString(\$ref);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::STRUCT)
+ {
+ $result = $self->readStructBegin(\$ref);
+ while (1) {
+ my ($ftype,$fid);
+ $result += $self->readFieldBegin(\$ref, \$ftype, \$fid);
+ if ($ftype == Net::Cassandra::Backend::TType::STOP) {
+ last;
+ }
+ $result += $self->skip($ftype);
+ $result += $self->readFieldEnd();
+ }
+ $result += $self->readStructEnd();
+ return $result;
+ }
+ elsif($type == Net::Cassandra::Backend::TType::MAP)
+ {
+ my($keyType,$valType,$size);
+ $result = $self->readMapBegin(\$keyType, \$valType, \$size);
+ for ($i = 0; $i < $size; $i++) {
+ $result += $self->skip($keyType);
+ $result += $self->skip($valType);
+ }
+ $result += $self->readMapEnd();
+ return $result;
+ }
+ elsif($type == Net::Cassandra::Backend::TType::SET)
+ {
+ my ($elemType,$size);
+ $result = $self->readSetBegin(\$elemType, \$size);
+ for ($i = 0; $i < $size; $i++) {
+ $result += $self->skip($elemType);
+ }
+ $result += $self->readSetEnd();
+ return $result;
+ }
+ elsif($type == Net::Cassandra::Backend::TType::LIST)
+ {
+ my ($elemType,$size);
+ $result = $self->readListBegin(\$elemType, \$size);
+ for ($i = 0; $i < $size; $i++) {
+ $result += $self->skip($elemType);
+ }
+ $result += $self->readListEnd();
+ return $result;
+ }
+
+
+ return 0;
+
+ }
+
+#
+# Utility for skipping binary data
+#
+# @param TTransport $itrans TTransport object
+# @param int $type Field type
+#
+sub skipBinary
+{
+ my $self = shift;
+ my $itrans = shift;
+ my $type = shift;
+
+ if($type == Net::Cassandra::Backend::TType::BOOL)
+ {
+ return $itrans->readAll(1);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::BYTE)
+ {
+ return $itrans->readAll(1);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::I16)
+ {
+ return $itrans->readAll(2);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::I32)
+ {
+ return $itrans->readAll(4);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::I64)
+ {
+ return $itrans->readAll(8);
+ }
+ elsif($type == Net::Cassandra::Backend::TType::DOUBLE)
+ {
+ return $itrans->readAll(8);
+ }
+ elsif( $type == Net::Cassandra::Backend::TType::STRING )
+ {
+ my @len = unpack('N', $itrans->readAll(4));
+ my $len = $len[0];
+ if ($len > 0x7fffffff) {
+ $len = 0 - (($len - 1) ^ 0xffffffff);
+ }
+ return 4 + $itrans->readAll($len);
+ }
+ elsif( $type == Net::Cassandra::Backend::TType::STRUCT )
+ {
+ my $result = 0;
+ while (1) {
+ my $ftype = 0;
+ my $fid = 0;
+ my $data = $itrans->readAll(1);
+ my @arr = unpack('c', $data);
+ $ftype = $arr[0];
+ if ($ftype == Net::Cassandra::Backend::TType::STOP) {
+ last;
+ }
+ # I16 field id
+ $result += $itrans->readAll(2);
+ $result += $self->skipBinary($itrans, $ftype);
+ }
+ return $result;
+ }
+ elsif($type == Net::Cassandra::Backend::TType::MAP)
+ {
+ # Ktype
+ my $data = $itrans->readAll(1);
+ my @arr = unpack('c', $data);
+ my $ktype = $arr[0];
+ # Vtype
+ $data = $itrans->readAll(1);
+ @arr = unpack('c', $data);
+ my $vtype = $arr[0];
+ # Size
+ $data = $itrans->readAll(4);
+ @arr = unpack('N', $data);
+ my $size = $arr[0];
+ if ($size > 0x7fffffff) {
+ $size = 0 - (($size - 1) ^ 0xffffffff);
+ }
+ my $result = 6;
+ for (my $i = 0; $i < $size; $i++) {
+ $result += $self->skipBinary($itrans, $ktype);
+ $result += $self->skipBinary($itrans, $vtype);
+ }
+ return $result;
+ }
+ elsif($type == Net::Cassandra::Backend::TType::SET || $type == Net::Cassandra::Backend::TType::LIST)
+ {
+ # Vtype
+ my $data = $itrans->readAll(1);
+ my @arr = unpack('c', $data);
+ my $vtype = $arr[0];
+ # Size
+ $data = $itrans->readAll(4);
+ @arr = unpack('N', $data);
+ my $size = $arr[0];
+ if ($size > 0x7fffffff) {
+ $size = 0 - (($size - 1) ^ 0xffffffff);
+ }
+ my $result = 5;
+ for (my $i = 0; $i < $size; $i++) {
+ $result += $self->skipBinary($itrans, $vtype);
+ }
+ return $result;
+ }
+
+ return 0;
+
+}
+
+#
+# Protocol factory creates protocol objects from transports
+#
+package Net::Cassandra::Backend::TProtocolFactory;
+
+
+sub new {
+ my $classname = shift;
+ my $self = {};
+
+ return bless($self,$classname);
+}
+
+#
+# Build a protocol from the base transport
+#
+# @return TProtcol protocol
+#
+sub getProtocol
+{
+ my ($trans);
+ die "interface";
+}
+
+
+1;
View
271 lib/Net/Cassandra/Backend/Thrift/Socket.pm
@@ -0,0 +1,271 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 5.6.0;
+use strict;
+use warnings;
+
+use Net::Cassandra::Backend::Thrift;
+use Net::Cassandra::Backend::Thrift::Transport;
+
+use IO::Socket::INET;
+use IO::Select;
+
+package Net::Cassandra::Backend::Thrift::Socket;
+
+use base('Net::Cassandra::Backend::Thrift::Transport');
+
+sub new
+{
+ my $classname = shift;
+ my $host = shift || "localhost";
+ my $port = shift || 9090;
+ my $debugHandler = shift;
+
+ my $self = {
+ host => $host,
+ port => $port,
+ debugHandler => $debugHandler,
+ debug => 0,
+ sendTimeout => 100,
+ recvTimeout => 750,
+ handle => undef,
+ };
+
+ return bless($self,$classname);
+}
+
+
+sub setSendTimeout
+{
+ my $self = shift;
+ my $timeout = shift;
+
+ $self->{sendTimeout} = $timeout;
+}
+
+sub setRecvTimeout
+{
+ my $self = shift;
+ my $timeout = shift;
+
+ $self->{recvTimeout} = $timeout;
+}
+
+
+#
+#Sets debugging output on or off
+#
+# @param bool $debug
+#
+sub setDebug
+{
+ my $self = shift;
+ my $debug = shift;
+
+ $self->{debug} = $debug;
+}
+
+#
+# Tests whether this is open
+#
+# @return bool true if the socket is open
+#
+sub isOpen
+{
+ my $self = shift;
+
+ if( defined $self->{handle} ){
+ return ($self->{handle}->handles())[0]->connected;
+ }
+
+ return 0;
+}
+
+#
+# Connects the socket.
+#
+sub open
+{
+ my $self = shift;
+
+ my $sock = IO::Socket::INET->new(PeerAddr => $self->{host},
+ PeerPort => $self->{port},
+ Proto => 'tcp',
+ Timeout => $self->{sendTimeout}/1000)
+ || do {
+ my $error = 'TSocket: Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';
+
+ if ($self->{debug}) {
+ $self->{debugHandler}->($error);
+ }
+
+ die new Net::Cassandra::Backend::Thrift::TException($error);
+
+ };
+
+
+ $self->{handle} = new IO::Select( $sock );
+}
+
+#
+# Closes the socket.
+#
+sub close
+{
+ my $self = shift;
+
+ if( defined $self->{handle} ){
+ close( ($self->{handle}->handles())[0] );
+ }
+}
+
+#
+# Uses stream get contents to do the reading
+#
+# @param int $len How many bytes
+# @return string Binary data
+#
+sub readAll
+{
+ my $self = shift;
+ my $len = shift;
+
+
+ return unless defined $self->{handle};
+
+ my $pre = "";
+ while (1) {
+
+ #check for timeout
+ my @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );
+
+ if(@sockets == 0){
+ die new Net::Cassandra::Backend::Thrift::TException('TSocket: timed out reading '.$len.' bytes from '.
+ $self->{host}.':'.$self->{port});
+ }
+
+ my $sock = $sockets[0];
+
+ my ($buf,$sz);
+ $sock->recv($buf, $len);
+
+ if (!defined $buf || $buf eq '') {
+
+ die new Net::Cassandra::Backend::Thrift::TException('TSocket: Could not read '.$len.' bytes from '.
+ $self->{host}.':'.$self->{port});
+
+ } elsif (($sz = length($buf)) < $len) {
+
+ $pre .= $buf;
+ $len -= $sz;
+
+ } else {
+ return $pre.$buf;
+ }
+ }
+}
+
+#
+# Read from the socket
+#
+# @param int $len How many bytes
+# @return string Binary data
+#
+sub read
+{
+ my $self = shift;
+ my $len = shift;
+
+ return unless defined $self->{handle};
+
+ #check for timeout
+ my @sockets = $self->{handle}->can_read( $self->{sendTimeout} / 1000 );
+
+ if(@sockets == 0){
+ die new Net::Cassandra::Backend::Thrift::TException('TSocket: timed out reading '.$len.' bytes from '.
+ $self->{host}.':'.$self->{port});
+ }
+
+ my $sock = $sockets[0];
+
+ my ($buf,$sz);
+ $sock->recv($buf, $len);
+
+ if (!defined $buf || $buf eq '') {
+
+ die new TException('TSocket: Could not read '.$len.' bytes from '.
+ $self->{host}.':'.$self->{port});
+
+ }
+
+ return $buf;
+}
+
+
+#
+# Write to the socket.
+#
+# @param string $buf The data to write
+#
+sub write
+{
+ my $self = shift;
+ my $buf = shift;
+
+
+ return unless defined $self->{handle};
+
+ while (length($buf) > 0) {
+
+
+ #check for timeout
+ my @sockets = $self->{handle}->can_write( $self->{recvTimeout} / 1000 );
+
+ if(@sockets == 0){
+ die new Net::Cassandra::Backend::Thrift::TException('TSocket: timed out writing to bytes from '.
+ $self->{host}.':'.$self->{port});
+ }
+
+ my $sock = $sockets[0];
+
+ my $got = $sock->send($buf);
+
+ if (!defined $got || $got == 0 ) {
+ die new Net::Cassandra::Backend::Thrift::TException('TSocket: Could not write '.length($buf).' bytes '.
+ $self->{host}.':'.$self->{host});
+ }
+
+ $buf = substr($buf, $got);
+ }
+}
+
+#
+# Flush output to the socket.
+#
+sub flush
+{
+ my $self = shift;
+
+ return unless defined $self->{handle};
+
+ my $ret = ($self->{handle}->handles())[0]->flush;
+}
+
+1;
View
129 lib/Net/Cassandra/Backend/Thrift/Transport.pm
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+require 5.6.0;
+use strict;
+use warnings;
+
+use Net::Cassandra::Backend::Thrift;
+
+#
+# Transport exceptions
+#
+package Net::Cassandra::Backend::TTransportException;
+use base('Net::Cassandra::Backend::Thrift::TException');
+
+use constant UNKNOWN => 0;
+use constant NOT_OPEN => 1;
+use constant ALREADY_OPEN => 2;
+use constant TIMED_OUT => 3;
+use constant END_OF_FILE => 4;
+
+sub new{
+ my $classname = shift;
+ my $self = $classname->SUPER::new(@_);
+
+ return bless($self,$classname);
+}
+
+package Net::Cassandra::Backend::Thrift::Transport;
+
+#
+# Whether this transport is open.
+#
+# @return boolean true if open
+#
+sub isOpen
+{
+ die "abstract";
+}
+
+#
+# Open the transport for reading/writing
+#
+# @throws TTransportException if cannot open
+#
+sub open
+{
+ die "abstract";
+}
+
+#
+# Close the transport.
+#
+sub close
+{
+ die "abstract";
+}
+
+#
+# Read some data into the array.
+#
+# @param int $len How much to read
+# @return string The data that has been read
+# @throws TTransportException if cannot read any more data
+#
+sub read
+{
+ my ($len);
+ die("abstract");
+}
+
+#
+# Guarantees that the full amount of data is read.
+#
+# @return string The data, of exact length
+# @throws TTransportException if cannot read data
+#
+sub readAll
+{
+ my $self = shift;
+ my $len = shift;
+
+ my $data = '';
+ my $got = 0;
+
+ while (($got = length($data)) < $len) {
+ $data .= $self->read($len - $got);
+ }
+
+ return $data;
+}
+
+#
+# Writes the given data out.
+#
+# @param string $buf The data to write
+# @throws TTransportException if writing fails
+#
+sub write
+{
+ my ($buf);
+ die "abstract";
+}
+
+#
+# Flushes any pending data out of a buffer
+#
+# @throws TTransportException if a writing error occurs
+#
+sub flush {}
+
+1;
+
View
803 lib/Net/Cassandra/Backend/Types.pm
@@ -0,0 +1,803 @@
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+require 5.6.0;
+use strict;
+use warnings;
+use Net::Cassandra::Backend::Thrift;
+
+package Net::Cassandra::Backend::column_t;
+use Class::Accessor;
+use base('Class::Accessor');
+Net::Cassandra::Backend::column_t->mk_accessors( qw( columnName value timestamp ) );
+sub new {
+my $classname = shift;
+my $self = {};
+my $vals = shift || {};
+$self->{columnName} = undef;
+$self->{value} = undef;
+$self->{timestamp} = undef;
+ if (UNIVERSAL::isa($vals,'HASH')) {
+ if (defined $vals->{columnName}) {
+ $self->{columnName} = $vals->{columnName};
+ }
+ if (defined $vals->{value}) {
+ $self->{value} = $vals->{value};
+ }
+ if (defined $vals->{timestamp}) {
+ $self->{timestamp} = $vals->{timestamp};
+ }
+ }
+return bless($self,$classname);
+}
+
+sub getName {
+ return 'Net::Cassandra::Backend::column_t';
+}
+
+sub read {
+ my $self = shift;
+ my $input = shift;
+ my $xfer = 0;
+ my $fname;
+ my $ftype = 0;
+ my $fid = 0;
+ $xfer += $input->readStructBegin(\$fname);
+ while (1)
+ {
+ $xfer += $input->readFieldBegin(\$fname, \$ftype, \$fid);
+ if ($ftype == Net::Cassandra::Backend::TType::STOP) {
+ last;
+ }
+ SWITCH: for($fid)
+ {
+ /^1$/ && do{ if ($ftype == Net::Cassandra::Backend::TType::STRING) {
+ $xfer += $input->readString(\$self->{columnName});
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ last; };
+ /^2$/ && do{ if ($ftype == Net::Cassandra::Backend::TType::STRING) {
+ $xfer += $input->readString(\$self->{value});
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ last; };
+ /^3$/ && do{ if ($ftype == Net::Cassandra::Backend::TType::I64) {
+ $xfer += $input->readI64(\$self->{timestamp});
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ last; };
+ $xfer += $input->skip($ftype);
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+}
+
+sub write {
+ my $self = shift;
+ my $output = shift;
+ my $xfer = 0;
+ $xfer += $output->writeStructBegin('Net::Cassandra::Backend::column_t');
+ if (defined $self->{columnName}) {
+ $xfer += $output->writeFieldBegin('columnName', Net::Cassandra::Backend::TType::STRING, 1);
+ $xfer += $output->writeString($self->{columnName});
+ $xfer += $output->writeFieldEnd();
+ }
+ if (defined $self->{value}) {
+ $xfer += $output->writeFieldBegin('value', Net::Cassandra::Backend::TType::STRING, 2);
+ $xfer += $output->writeString($self->{value});
+ $xfer += $output->writeFieldEnd();
+ }
+ if (defined $self->{timestamp}) {
+ $xfer += $output->writeFieldBegin('timestamp', Net::Cassandra::Backend::TType::I64, 3);
+ $xfer += $output->writeI64($self->{timestamp});
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+}
+
+package Net::Cassandra::Backend::batch_mutation_t;
+use Class::Accessor;
+use base('Class::Accessor');
+Net::Cassandra::Backend::batch_mutation_t->mk_accessors( qw( table key cfmap ) );
+sub new {
+my $classname = shift;
+my $self = {};
+my $vals = shift || {};
+$self->{table} = undef;
+$self->{key} = undef;
+$self->{cfmap} = undef;
+ if (UNIVERSAL::isa($vals,'HASH')) {
+ if (defined $vals->{table}) {
+ $self->{table} = $vals->{table};
+ }
+ if (defined $vals->{key}) {
+ $self->{key} = $vals->{key};
+ }
+ if (defined $vals->{cfmap}) {
+ $self->{cfmap} = $vals->{cfmap};
+ }
+ }
+return bless($self,$classname);
+}
+
+sub getName {
+ return 'Net::Cassandra::Backend::batch_mutation_t';
+}
+
+sub read {
+ my $self = shift;
+ my $input = shift;
+ my $xfer = 0;
+ my $fname;
+ my $ftype = 0;
+ my $fid = 0;
+ $xfer += $input->readStructBegin(\$fname);
+ while (1)
+ {
+ $xfer += $input->readFieldBegin(\$fname, \$ftype, \$fid);
+ if ($ftype == Net::Cassandra::Backend::TType::STOP) {
+ last;
+ }
+ SWITCH: for($fid)
+ {
+ /^1$/ && do{ if ($ftype == Net::Cassandra::Backend::TType::STRING) {
+ $xfer += $input->readString(\$self->{table});
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ last; };
+ /^2$/ && do{ if ($ftype == Net::Cassandra::Backend::TType::STRING) {
+ $xfer += $input->readString(\$self->{key});
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ last; };
+ /^3$/ && do{ if ($ftype == Net::Cassandra::Backend::TType::MAP) {
+ {
+ my $_size0 = 0;
+ $self->{cfmap} = {};
+ my $_ktype1 = 0;
+ my $_vtype2 = 0;
+ $xfer += $input->readMapBegin(\$_ktype1, \$_vtype2, \$_size0);
+ for (my $_i4 = 0; $_i4 < $_size0; ++$_i4)
+ {
+ my $key5 = '';
+ my $val6 = [];
+ $xfer += $input->readString(\$key5);
+ {
+ my $_size7 = 0;
+ $val6 = [];
+ my $_etype10 = 0;
+ $xfer += $input->readListBegin(\$_etype10, \$_size7);
+ for (my $_i11 = 0; $_i11 < $_size7; ++$_i11)
+ {
+ my $elem12 = undef;
+ $elem12 = new Net::Cassandra::Backend::column_t();
+ $xfer += $elem12->read($input);
+ push(@{$val6},$elem12);
+ }
+ $xfer += $input->readListEnd();
+ }
+ $self->{cfmap}->{$key5} = $val6;
+ }
+ $xfer += $input->readMapEnd();
+ }
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ last; };
+ $xfer += $input->skip($ftype);
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+}
+
+sub write {
+ my $self = shift;
+ my $output = shift;
+ my $xfer = 0;
+ $xfer += $output->writeStructBegin('Net::Cassandra::Backend::batch_mutation_t');
+ if (defined $self->{table}) {
+ $xfer += $output->writeFieldBegin('table', Net::Cassandra::Backend::TType::STRING, 1);
+ $xfer += $output->writeString($self->{table});
+ $xfer += $output->writeFieldEnd();
+ }
+ if (defined $self->{key}) {
+ $xfer += $output->writeFieldBegin('key', Net::Cassandra::Backend::TType::STRING, 2);
+ $xfer += $output->writeString($self->{key});
+ $xfer += $output->writeFieldEnd();
+ }
+ if (defined $self->{cfmap}) {
+ $xfer += $output->writeFieldBegin('cfmap', Net::Cassandra::Backend::TType::MAP, 3);
+ {
+ $output->writeMapBegin(Net::Cassandra::Backend::TType::STRING, Net::Cassandra::Backend::TType::LIST, scalar(keys %{$self->{cfmap}}));
+ {
+ while( my ($kiter13,$viter14) = each %{$self->{cfmap}})
+ {
+ $xfer += $output->writeString($kiter13);
+ {
+ $output->writeListBegin(Net::Cassandra::Backend::TType::STRUCT, scalar(@{${viter14}}));
+ {
+ foreach my $iter15 (@{${viter14}})
+ {
+ $xfer += ${iter15}->write($output);
+ }
+ }
+ $output->writeListEnd();
+ }
+ }
+ }
+ $output->writeMapEnd();
+ }
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+}
+
+package Net::Cassandra::Backend::superColumn_t;
+use Class::Accessor;
+use base('Class::Accessor');
+Net::Cassandra::Backend::superColumn_t->mk_accessors( qw( name columns ) );
+sub new {
+my $classname = shift;
+my $self = {};
+my $vals = shift || {};
+$self->{name} = undef;
+$self->{columns} = undef;
+ if (UNIVERSAL::isa($vals,'HASH')) {
+ if (defined $vals->{name}) {
+ $self->{name} = $vals->{name};
+ }
+ if (defined $vals->{columns}) {
+ $self->{columns} = $vals->{columns};
+ }
+ }
+return bless($self,$classname);
+}
+
+sub getName {
+ return 'Net::Cassandra::Backend::superColumn_t';
+}
+
+sub read {
+ my $self = shift;
+ my $input = shift;
+ my $xfer = 0;
+ my $fname;
+ my $ftype = 0;
+ my $fid = 0;
+ $xfer += $input->readStructBegin(\$fname);