/
EDC.pm6
204 lines (173 loc) · 5.81 KB
/
EDC.pm6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
use v6;
use BSON::D;
use BSON::EDC-Tools;
#-------------------------------------------------------------------------------
#
class X::BSON::Encodable is Exception {
has $.operation; # Operation encode, decode or other
has $.type; # Type to handle
has $.emsg; # Extra message
method message () {
return "\n$!operation\() on $!type error: $!emsg";
}
}
#-------------------------------------------------------------------------------
# This role implements BSON serialization functions. To provide full encoding
# of a type more information must be stored. This class must represent
# a document such as { key => SomeType.new(...) }. Therefore it needs to store
# the key name and the data representing the class.
# Furthermore it needs a code for the specific BSON type.
#
#
# Role to encode to and/or decode from a BSON representation.
#
package BSON {
class Encodable is BSON::Encodable-Tools {
constant $BSON-DOUBLE = 0x01;
constant $BSON-DOCUMENT = 0x03;
# Visible in all objects of this class
#
my Int $index = 0;
# has Int $thread-count = 0;
# has Array $!threads;
#---------------------------------------------------------------------------
#
method encode ( Hash $document --> Buf ) {
my Int $doc-length = 0;
my Buf $stream-part;
my Buf $stream = Buf.new();
# Process the document. The order in which the keys are selected
# is not important.
#
for $document.keys -> $var-name {
# Get the data of the given key and test for its type
#
my $data = $document{$var-name};
given $data {
# Embedded document
# element ::= "\x03" e_name document
# document ::= int32 e_list "\x00"
# e_list ::= element e_list | ""
#
when Hash {
$stream-part = [~] Buf.new($BSON-DOCUMENT),
self.enc_cstring($var-name),
self.encode($data);
$stream ~= $stream-part;
}
# Double precision, e_name is a cstring, double is 64-bit IEEE 754
# floating point number
# element ::= "\x01" e_name double.
#
when Num {
my $promoted-self = self.clone;
$promoted-self does BSON::Double;
$stream-part = [~] Buf.new($BSON-DOUBLE),
self.enc_cstring($var-name),
$promoted-self.encode_obj($data);
$stream ~= $stream-part;
}
}
}
# Build document
# document ::= int32 e_list "\x00"
#
return [~] self.enc_int32($stream.elems + 5), $stream, Buf.new(0x00);
}
#---------------------------------------------------------------------------
# This one is used to start decode process
#
multi method decode ( Buf $stream --> Hash ) {
$index = 0;
# $thread-count = 0;
# $!threads = [];
return self!decode_document($stream.list);
}
# This one is used to recursively decode sub documents
#
multi method decode ( Array $stream --> Hash ) {
return self!decode_document($stream);
}
method !decode_document ( Array $encoded-document --> Hash ) {
# Result document
#
my Hash $document;
# document ::= int32 e_list "\x00"
#
my Int $doc-length = self.dec_int32( $encoded-document, $index);
# element ::= bson_code e_name data
#
my Int $bson_code = $encoded-document[$index++];
while $bson_code {
# Get e_name
#
my Str $key_name = self.dec_cstring( $encoded-document, $index);
given $bson_code {
when $BSON-DOUBLE {
# Clone this object and then promote it to play the role of the
# matched type. In this case BSON::Double.
#
my $nbr-bytes-channel = Channel.new;
my $promoted-self = self.clone;
$promoted-self does BSON::Double;
# $!threads[$thread-count] = Thread.new(
# code => {
$*SCHEDULER.cue( {
#say "Code started";
my Num $keyval = $promoted-self.decode_obj(
$encoded-document,
$index,
$nbr-bytes-channel
);
cas(
$document,
-> $doc {
my Hash $new-doc = $doc;
$new-doc{$key_name} = $keyval;
$new-doc;
}
);
#say "Code stopped";
},
# name => 't ' ~ $thread-count
);
#say "Code scheduled, ";
# $!threads[$thread-count].run;
# $thread-count++;
$index += $nbr-bytes-channel.receive;
#say "New index $index";
$nbr-bytes-channel.close;
}
when $BSON-DOCUMENT {
my Hash $sub-doc = self.decode($encoded-document);
cas(
$document,
-> $doc {
my Hash $new-doc = $doc;
$new-doc{$key_name} = $sub-doc;
$new-doc;
}
);
}
default {
say "What?!: $bson_code";
}
}
# if $thread-count > 9 {
# self!clear-threads;
# $thread-count = 0;
# }
$bson_code = $encoded-document[$index++];
}
# self!clear-threads;
return $document;
}
# method !clear-threads ( ) {
# for $!threads.list -> $thread is rw {
#say "Finished $thread" if $thread.defined;
# $thread.finish if $thread.defined;
# undefine($thread);
# }
# }
}
}