Permalink
Browse files

Merge branch 'proof-of-concept'

  • Loading branch information...
2 parents 5d9a1fe + 698e5da commit 8b10a22a9b61ddcc6836df05f717f45e9459d74f @bbkr bbkr committed Sep 6, 2011
Showing with 320 additions and 0 deletions.
  1. +21 −0 README
  2. +8 −0 lib/MongoDB.pm
  3. +23 −0 lib/MongoDB/Collection.pm
  4. +32 −0 lib/MongoDB/Connection.pm
  5. +11 −0 lib/MongoDB/Cursor.pm
  6. +22 −0 lib/MongoDB/DataBase.pm
  7. +129 −0 lib/MongoDB/Wire.pm
  8. +9 −0 t/load.t
  9. +48 −0 t/mongodb.t
  10. +17 −0 t/wire.t
View
21 README
@@ -0,0 +1,21 @@
+MongoDB driver for Perl6
+
+This project is currently a hacked proof-of-concept.
+
+Roadmap:
+
+- Wire protocol response parsing
+- MongoDB::Cursor class
+- Wire OP_INSERT (almost ready), OP_QUERY, OP_UPDATE, OP_GETMORE, OP_DELETE support
+- flags support
+- error handling
+- tests
+
+- release of 0.1 version
+- put module on http://modules.perl6.org/
+(expected by the end of september)
+
+- Wire OP_KILL_CURSORS, OP_MSG support
+- authentication
+- more stuff from http://www.mongodb.org/display/DOCS/Mongo+Driver+Requirements
+
View
@@ -0,0 +1,8 @@
+class MongoDB;
+
+use MongoDB::Connection;
+use MongoDB::Wire;
+
+our $wire = MongoDB::Wire.new;
+
+method ^wire ( ::T ) { return $wire };
View
@@ -0,0 +1,23 @@
+class MongoDB::Collection;
+
+use MongoDB::Cursor;
+
+has MongoDB::DataBase $.database is rw;
+has Str $.name is rw;
+
+submethod BUILD ( MongoDB::DataBase $database, Str $name ) {
+
+ $.database = $database;
+
+ # TODO validate name
+ $.name = $name;
+}
+
+method insert ( %document ) {
+ MongoDB.wire.OP_INSERT( self, %document );
+}
+
+method query ( %query = { } ) {
+ MongoDB.wire.OP_QUERY( self, %query );
+}
+
View
@@ -0,0 +1,32 @@
+class MongoDB::Connection;
+
+use MongoDB::DataBase;
+
+has IO::Socket::INET $!sock;
+
+submethod BUILD ( Str $host = 'localhost', Int $port = 27017 ) {
+
+ $!sock = IO::Socket::INET.new( host => $host, port => $port );
+}
+
+method database ( Str $name ) {
+
+ return MongoDB::DataBase.new(
+ connection => self,
+ name => $name,
+ );
+}
+
+method send ( Buf $b, Bool $has_response ) {
+
+ $!sock.send( $b.unpack( 'A*' ) );
+
+ if $has_response {
+ # obtain int32 response length
+ my $l = $!sock.recv( 4 ).encode;
+ my $r = $!sock.recv( $l.unpack( 'V' ) - 4 ).encode;
+
+ # receive remaining response bytes from socket
+ return Buf.new( $l.contents.list, $r.contents.list );
+ }
+}
View
@@ -0,0 +1,11 @@
+class MongoDB::Cursor;
+
+# int64 (8 byte buffer)
+has Buf $!cid;
+
+submethod BUILD ( Buf $b ) {
+ $!cid = Buf.new( );
+
+ $!cid.contents.push( $b.contents.shift ) for ^ 8;
+}
+
View
@@ -0,0 +1,22 @@
+class MongoDB::DataBase;
+
+use MongoDB::Collection;
+
+has MongoDB::Connection $.connection is rw;
+has Str $.name is rw;
+
+submethod BUILD ( MongoDB::Connection $connection, Str $name ) {
+
+ $.connection = $connection;
+
+ # TODO validate name
+ $.name = $name;
+}
+
+method collection ( Str $name ) {
+
+ return MongoDB::Collection.new(
+ database => self,
+ name => $name,
+ );
+}
View
@@ -0,0 +1,129 @@
+use BSON;
+
+class MongoDB::Wire is BSON;
+
+# Implements Mongo Wire Protocol
+# http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol
+
+multi method _header ( Int $length, Str $op_code ) {
+ # http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-StandardMessageHeader
+
+ # struct MsgHeader {
+ # int32 messageLength; // total message size, including this
+ # int32 requestID; // identifier for this message
+ # int32 responseTo; // requestID from the original request
+ # // (used in reponses from db)
+ # int32 opCode; // request type - see table below
+ # }
+
+ # http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-RequestOpcodes
+ my %op_codes = {
+ 'OP_REPLY' => 1, # Reply to a client request. responseTo is set
+ 'OP_MSG' => 1000, # generic msg command followed by a string
+ 'OP_UPDATE' => 2001, # update document
+ 'OP_INSERT' => 2002, # insert new document
+ 'RESERVED' => 2003, # formerly used for OP_GET_BY_OID
+ 'OP_QUERY' => 2004, # query a collection
+ 'OP_GET_MORE' => 2005, # Get more data from a query. See Cursors
+ 'OP_DELETE' => 2006, # Delete documents
+ 'OP_KILL_CURSORS' => 2007, # Tell database client is done with a cursor
+ }
+
+ my $struct = self._int32( $length + 4 * 4 )
+ ~ self._int32( ( 1 .. 2147483647 ).pick )
+ ~ self._int32( 0 )
+ ~ self._int32( %op_codes{ $op_code } );
+
+ return $struct;
+}
+
+multi method _header ( Buf $b ) {
+
+ my %h = (
+ 'length' => self._int32( $b ),
+ 'request_id' => self._int32( $b ),
+ 'response_to' => self._int32( $b ),
+ 'op_code' => self._int32( $b ),
+ );
+
+ return %h;
+}
+
+method OP_INSERT ( MongoDB::Collection $collection, %document, Int $flags = 0 ) {
+ # http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPINSERT
+
+ # struct {
+ # MsgHeader header; // standard message header
+ # int32 flags; // bit vector - see below
+ # cstring fullCollectionName; // "dbname.collectionname"
+ # document* documents; // one or more documents to insert into the collection
+ # }
+
+ my $struct = self._int32( $flags )
+ ~ self._cstring( join '.', $collection.database.name, $collection.name )
+ ~ self._document( %document );
+
+ my $header = self._header( +$struct.contents, 'OP_INSERT' );
+
+ $collection.database.connection.send( $header ~ $struct, False );
+}
+
+method OP_QUERY ( MongoDB::Collection $collection, %query, Int $flags = 0 ) {
+ # http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPQUERY
+
+# struct OP_QUERY {
+# MsgHeader header; // standard message header
+# int32 flags; // bit vector of query options
+# cstring fullCollectionName; // "dbname.collectionname"
+# int32 numberToSkip; // number of documents to skip
+# int32 numberToReturn; // number of documents to return
+# // in the first OP_REPLY batch
+# document query; // query object. See below for details.
+# [ document returnFieldSelector; ] // Optional. Selector indicating the fields
+# // to return. See below for details.
+# }
+
+
+ my $struct = self._int32( 0 )
+ ~ self._cstring( join '.', $collection.database.name, $collection.name )
+ ~ self._int32( 0 )
+ ~ self._int32( 0 )
+ ~ self._document( %query );
+
+ my $header = self._header( +$struct.contents, 'OP_QUERY' );
+
+ my $reply = $collection.database.connection.send( $header ~ $struct, True );
+ $reply.contents.perl.say;
+ self.OP_REPLY( $reply );
+}
+
+method OP_REPLY ( Buf $b ) {
+ # struct {
+ # MsgHeader header; // standard message header
+ # int32 responseFlags; // bit vector - see details below
+ # int64 cursorID; // cursor id if client needs to do get more's
+ # int32 startingFrom; // where in the cursor this reply is starting
+ # int32 numberReturned; // number of documents in the reply
+ # document* documents; // documents
+ # }
+
+ my %r = (
+ 'header' => self._header( $b ),
+ my $f = self._int32( $b );
+ my $c = MongoDB::Cursor.new( b => $b );
+ my $s = self._int32( $b );
+ my $r = self._int32( $b );
+ my @d;
+ for ^$r {
+ my %d = self._document( $b );
+ @d.push( { %d } )
+ }
+ @d.perl.say;
+}
+
+# HACK to concatenate 2 Buf()s
+# workaround for https://rt.perl.org/rt3/Public/Bug/Display.html?id=96430
+multi sub infix:<~>(Buf $a, Buf $b) {
+
+ return Buf.new( $a.contents.list, $b.contents.list );
+}
View
@@ -0,0 +1,9 @@
+BEGIN { @*INC.unshift( 'lib' ); @*INC.unshift( '../bson/lib' ); }
+
+use Test;
+
+plan( 1 );
+
+lives_ok
+ { use MongoDB },
+ 'Load classes';
View
@@ -0,0 +1,48 @@
+BEGIN { @*INC.unshift( 'lib' ); @*INC.unshift( '../bson/lib' ); }
+
+use Test;
+use BSON;
+
+plan( 1 );
+
+
+#struct MsgHeader {
+# int32 messageLength; // total message size, including this
+# int32 requestID; // identifier for this message
+# int32 responseTo; // requestID from the original request
+# // (used in reponses from db)
+# int32 opCode; // request type - see table below
+#}
+
+my $b = BSON.new( );
+
+my $document = $b.encode( { "Hello" => "world!", "tab" => [ 1,2,3 ], "obj" => { "4" => 5 }, "zażółć" => "jaźń" } );
+
+my $requestID = $b._int32( 666 );
+
+my $responseTo = $b._int32( 0 );
+
+my $opCode = $b._int32( 2002 );
+
+# struct {
+# MsgHeader header; // standard message header
+# int32 flags; // bit vector - see below
+# cstring fullCollectionName; // "dbname.collectionname"
+# document* documents; // one or more documents to insert into the collection
+#}
+
+my $flags = $b._int32( 0 );
+my $fullCollectionName = $b._cstring( "test.perl" );
+
+my $length = $b._int32( 4 + +$requestID.contents + +$responseTo.contents + +$opCode.contents + +$document.contents + +$flags.contents + +$fullCollectionName.contents );
+
+my @full = $length.contents, $requestID.contents, $responseTo.contents, $opCode.contents, $flags.contents, $fullCollectionName.contents, $document.contents;
+
+@full.perl.say;
+
+my $full = Buf.new( @full );
+
+#say $full.unpack( 'A*');
+
+my $sock = IO::Socket::INET.new( host => '127.0.0.1', port => 27017 );
+$sock.send( $full.unpack( 'A*' ) );
View
@@ -0,0 +1,17 @@
+BEGIN { @*INC.unshift( 'lib' ); @*INC.unshift( '../bson/lib' ); }
+
+use Test;
+use MongoDB;
+
+plan( 1 );
+
+my $connection = MongoDB::Connection.new( );
+my $database = $connection.database( 'test' );
+my $collection = $database.collection( 'perl' );
+
+$collection.insert( {"ala" => "kot" } );
+say "inserted";
+$collection.insert( {"zażółć" => ["gęślą", "jaźń"] } );
+say "inserted";
+$collection.query( {} );
+say "queried";

0 comments on commit 8b10a22

Please sign in to comment.