forked from scionproto/scion
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reader.go
119 lines (110 loc) · 3.42 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright 2018 ETH Zurich
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package reader implements a reader object that reads from tun, routes with
// support from egress/router to determine the correct egressDispatcher, and
// puts data on the ring buffer of the egressDispatcher.
package reader
import (
"io"
"net"
"os"
"github.com/scionproto/scion/go/lib/common"
"github.com/scionproto/scion/go/lib/log"
"github.com/scionproto/scion/go/lib/ringbuf"
"github.com/scionproto/scion/go/sig/egress"
"github.com/scionproto/scion/go/sig/egress/router"
)
// IP header layout constants used when extracting the destination address
// from a raw packet.
const (
	// Values of the version nibble (upper four bits of the first header byte).
	ip4Ver = 4
	ip6Ver = 6

	// Byte offsets of the destination address within the IP header.
	ip4DstOff = 16
	ip6DstOff = 24
)
// Compile-time check that *Reader satisfies the egress.Runner interface.
var _ egress.Runner = (*Reader)(nil)
// Reader reads packets from a TUN device handle and, in Run, forwards each
// packet to the egress ring selected for its destination IP address.
type Reader struct {
// log is the logger used for all EgressReader messages.
log log.Logger
// tunIO is the TUN device handle that Run reads packets from.
tunIO io.ReadWriteCloser
}
// NewReader builds a Reader that reads packets from tunIO and logs through a
// newly created logger.
func NewReader(tunIO io.ReadWriteCloser) *Reader {
	reader := new(Reader)
	reader.log = log.New()
	reader.tunIO = tunIO
	return reader
}
// Run reads packets from the TUN device and forwards each one to the egress
// ring selected by the packet's destination IP address.
//
// Buffers are taken in batches from egress.EgressFreePkts. A buffer that is
// not handed over to a destination ring (read error, unparsable packet, no
// route, or full destination ring) is written back to egress.EgressFreePkts
// so that the pool does not drain over time.
//
// Run returns when the free-buffer ring reports a negative count or when the
// TUN device is closed (io.EOF or an *os.PathError wrapping os.ErrClosed).
func (r *Reader) Run() {
	defer log.LogPanicAndExit()
	r.log.Info("EgressReader: starting")
	// release hands a packet buffer back to the free buffer pool.
	release := func(buf common.RawBytes) {
		egress.EgressFreePkts.Write(ringbuf.EntryList{buf}, true)
	}
	bufs := make(ringbuf.EntryList, egress.EgressBufPkts)
BatchLoop:
	for {
		// Blocking read of a batch of free buffers. A negative count ends the
		// loop; presumably it signals that the ring was closed.
		n, _ := egress.EgressFreePkts.Read(bufs, true)
		if n < 0 {
			break
		}
		for i := 0; i < n; i++ {
			buf := bufs[i].(common.RawBytes)
			// Drop the batch slot's reference; the buffer is now owned by buf.
			bufs[i] = nil
			// Restore the full capacity so the read can use the whole buffer.
			buf = buf[:cap(buf)]
			length, err := r.tunIO.Read(buf)
			if err != nil {
				if err == io.EOF {
					// TUN is closed, shut down reader.
					break BatchLoop
				}
				// Sometimes we don't receive a clean EOF, so we check if the
				// tunnel device is closed.
				if pErr, ok := err.(*os.PathError); ok && pErr.Err == os.ErrClosed {
					break BatchLoop
				}
				// Transient read error: return the unused buffer to the pool,
				// otherwise every failed read would permanently leak one buffer.
				release(buf)
				r.log.Error("EgressReader: error reading from TUN device", "err", err)
				continue
			}
			buf = buf[:length]
			dstIP, err := r.getDestIP(buf)
			if err != nil {
				// Release buffer back to free buffer pool
				release(buf)
				// FIXME(kormat): replace with metric.
				r.log.Error("EgressReader: unable to get dest IP", "err", err)
				continue
			}
			dstIA, dstRing := router.NetMap.Lookup(dstIP)
			if dstRing == nil {
				// Release buffer back to free buffer pool
				release(buf)
				// FIXME(kormat): replace with metric.
				r.log.Error("EgressReader: unable to find dest IA", "ip", dstIP)
				continue
			}
			// Non-blocking write: if the destination ring did not accept the
			// packet, drop it and recycle the buffer.
			if written, _ := dstRing.Write(ringbuf.EntryList{buf}, false); written != 1 {
				// Release buffer back to free buffer pool
				release(buf)
				// FIXME(kormat): replace with metric.
				r.log.Error("EgressReader: no space in egress worker queue", "ia", dstIA)
			}
		}
	}
	r.log.Info("EgressReader: stopping")
}
// getDestIP extracts the destination IP address from the raw IP packet b.
//
// The IP version is taken from the upper four bits of the first byte. For
// IPv4 the address is the 4 bytes at offset ip4DstOff, for IPv6 the 16 bytes
// at offset ip6DstOff. The returned net.IP aliases b; it is not a copy.
//
// An error is returned if b is empty, if b is too short to contain the
// destination address for its version, or if the version is neither 4 nor 6.
func (r *Reader) getDestIP(b common.RawBytes) (net.IP, error) {
	// Guard against empty reads: indexing b[0] would panic otherwise.
	if len(b) == 0 {
		return nil, common.NewBasicError("Empty egress packet", nil, "len", len(b))
	}
	ver := b[0] >> 4
	switch ver {
	case ip4Ver:
		if len(b) < ip4DstOff+net.IPv4len {
			return nil, common.NewBasicError("Truncated IPv4 header in egress packet", nil,
				"len", len(b))
		}
		return net.IP(b[ip4DstOff : ip4DstOff+net.IPv4len]), nil
	case ip6Ver:
		if len(b) < ip6DstOff+net.IPv6len {
			return nil, common.NewBasicError("Truncated IPv6 header in egress packet", nil,
				"len", len(b))
		}
		return net.IP(b[ip6DstOff : ip6DstOff+net.IPv6len]), nil
	default:
		return nil, common.NewBasicError("Unsupported IP protocol version in egress packet", nil,
			"type", ver)
	}
}