forked from gravitational/teleport
-
Notifications
You must be signed in to change notification settings - Fork 0
/
memlistener.go
97 lines (83 loc) · 2.07 KB
/
memlistener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/*
Copyright 2015 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// package auth implements certificate signing authority and access control server
// Authority server is composed of several parts:
//
// * Authority server itself that implements signing and acl logic
// * HTTP server wrapper for authority server
// * HTTP client wrapper
//
package utils
import (
"github.com/gravitational/trace"
"io"
"net"
"sync"
"sync/atomic"
)
// Memory listener implements net.Listener using net.Conn
type MemoryListener struct {
connections chan net.Conn
state chan int
closed uint32
}
func NewMemoryListener() *MemoryListener {
ml := &MemoryListener{}
ml.connections = make(chan net.Conn)
ml.state = make(chan int)
return ml
}
func (ml *MemoryListener) Accept() (net.Conn, error) {
select {
case newConnection := <-ml.connections:
return newConnection, nil
case <-ml.state:
return nil, io.EOF
}
}
func (ml *MemoryListener) Close() error {
if atomic.CompareAndSwapUint32(&ml.closed, 0, 1) {
close(ml.state)
}
return nil
}
func (ml *MemoryListener) Handle(conn net.Conn) error {
select {
case <-ml.state:
return trace.Errorf("MemoryListener is closed")
default:
}
local, remote := net.Pipe()
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
defer local.Close()
io.Copy(local, conn)
}()
go func() {
defer wg.Done()
defer conn.Close()
io.Copy(conn, local)
}()
ml.connections <- remote
wg.Wait()
return nil
}
func (ml *MemoryListener) Addr() net.Addr {
addr := NetAddr{
AddrNetwork: "tcp",
Addr: "memoryListener",
}
return &addr
}