forked from mozilla-services/FindMyDevice
/
encoders.go
94 lines (85 loc) · 2.89 KB
/
encoders.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Rob Miller (rmiller@mozilla.com)
# Mike Trinkala (trink@mozilla.com)
#
# ***** END LICENSE BLOCK *****/
package client
import (
"code.google.com/p/goprotobuf/proto"
"crypto/hmac"
"crypto/md5"
"crypto/sha1"
"fmt"
"github.com/mozilla-services/heka/message"
"hash"
)
// Encoder converts a message.Message into bytes, either as a bare encoded
// payload or as a framed stream record written into a caller-supplied buffer.
type Encoder interface {
// EncodeMessage returns msg in encoded form, or an error if encoding fails.
EncodeMessage(msg *message.Message) ([]byte, error)
// EncodeMessageStream encodes msg and stores the result in *outBytes. The
// ProtobufEncoder implementation in this file wraps the encoded message in
// stream framing (separator bytes plus a protobuf header).
EncodeMessageStream(msg *message.Message, outBytes *[]byte) error
}
// ProtobufEncoder is an Encoder that serializes messages with protobuf.
type ProtobufEncoder struct {
// signer is handed to createStream by EncodeMessageStream; when it is
// non-nil the stream header carries an HMAC of the encoded message.
signer *message.MessageSigningConfig
}
// NewProtobufEncoder returns a ProtobufEncoder that uses signer when framing
// stream output. signer may be nil, in which case framed records are emitted
// without an HMAC.
func NewProtobufEncoder(signer *message.MessageSigningConfig) *ProtobufEncoder {
	enc := &ProtobufEncoder{signer: signer}
	return enc
}
// EncodeMessage marshals msg to its protobuf wire form and returns the
// resulting bytes together with any error reported by proto.Marshal.
func (p *ProtobufEncoder) EncodeMessage(msg *message.Message) ([]byte, error) {
	encoded, marshalErr := proto.Marshal(msg)
	return encoded, marshalErr
}
// EncodeMessageStream marshals msg and then frames it into *outBytes via
// createStream, passing along the encoder's signing configuration. A marshal
// error is returned as-is and *outBytes is left untouched in that case.
func (p *ProtobufEncoder) EncodeMessageStream(msg *message.Message, outBytes *[]byte) (err error) {
	// TODO if we compute the size of the header first this can be marshaled
	// directly to outBytes
	payload, encodeErr := p.EncodeMessage(msg)
	if encodeErr != nil {
		return encodeErr
	}
	return createStream(payload, outBytes, p.signer)
}
// createStream frames msgBytes into *outBytes as a single stream record laid
// out as:
//
//	RECORD_SEPARATOR | header length (1 byte) | protobuf Header | UNIT_SEPARATOR | msgBytes
//
// The Header always records len(msgBytes). When msc is non-nil the Header
// also carries the signer name, key version and an HMAC of msgBytes keyed
// with msc.Key: HMAC-SHA1 when msc.Hash is "sha1", HMAC-MD5 for any other
// value.
//
// *outBytes is reused when its capacity is sufficient and replaced with a
// freshly allocated slice otherwise; on success its length is exactly the
// record size.
//
// An error is returned, and *outBytes is left unmodified, when the marshaled
// header does not fit the one-byte length field or when the whole record
// would exceed message.MAX_RECORD_SIZE. An error from marshaling the header
// is returned as-is; by then *outBytes has already been resized.
func createStream(msgBytes []byte, outBytes *[]byte, msc *message.MessageSigningConfig) error {
	// The framing stores the header length in a single byte, so a larger
	// header cannot be represented.
	const maxHeaderSize = 1<<8 - 1

	h := &message.Header{}
	h.SetMessageLength(uint32(len(msgBytes)))
	if msc != nil {
		h.SetHmacSigner(msc.Name)
		h.SetHmacKeyVersion(msc.Version)
		var hm hash.Hash
		switch msc.Hash {
		case "sha1":
			hm = hmac.New(sha1.New, []byte(msc.Key))
			h.SetHmacHashFunction(message.Header_SHA1)
		default:
			// Fallback is HMAC-MD5; the header's hash function field is
			// left at whatever the Header's default is.
			hm = hmac.New(md5.New, []byte(msc.Key))
		}
		// hash.Hash documents that Write never returns an error.
		hm.Write(msgBytes)
		h.SetHmac(hm.Sum(nil))
	}

	headerSize := proto.Size(h)
	if headerSize > maxHeaderSize {
		// Without this check the uint8 conversion below would silently
		// truncate the length and produce a corrupt frame.
		return fmt.Errorf("Message header too big, requires %d (max header size = %d)",
			headerSize, maxHeaderSize)
	}
	requiredSize := message.HEADER_FRAMING_SIZE + headerSize + len(msgBytes)
	if requiredSize > message.MAX_RECORD_SIZE {
		return fmt.Errorf("Message too big, requires %d (MAX_RECORD_SIZE = %d)",
			requiredSize, message.MAX_RECORD_SIZE)
	}

	if cap(*outBytes) < requiredSize {
		*outBytes = make([]byte, requiredSize)
	} else {
		*outBytes = (*outBytes)[:requiredSize]
	}
	(*outBytes)[0] = message.RECORD_SEPARATOR
	(*outBytes)[1] = uint8(headerSize)
	// This looks odd but is correct; it effectively "seeks" the initial write
	// position for the protobuf output to be at the
	// `(*outBytes)[message.HEADER_DELIMITER_SIZE]` position.
	pbuf := proto.NewBuffer((*outBytes)[message.HEADER_DELIMITER_SIZE:message.HEADER_DELIMITER_SIZE])
	if err := pbuf.Marshal(h); err != nil {
		return err
	}
	(*outBytes)[headerSize+message.HEADER_DELIMITER_SIZE] = message.UNIT_SEPARATOR
	copy((*outBytes)[message.HEADER_FRAMING_SIZE+headerSize:], msgBytes)
	return nil
}