-
Notifications
You must be signed in to change notification settings - Fork 0
/
crh-rpc.go
134 lines (118 loc) · 4.6 KB
/
crh-rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package middleware
import (
"reflect"
"github.com/gfads/midarch/pkg/gmidarch/development/messages"
"github.com/gfads/midarch/pkg/gmidarch/development/protocols"
"github.com/gfads/midarch/pkg/shared"
"github.com/gfads/midarch/pkg/shared/lib"
)
// CRHRPC is a field-less component type. Its I_Process method sends a
// marshalled request through an RPC protocol connection and stores the
// server's reply in the message it was given.
//
// NOTE(review): the two annotation lines below look like metadata read by
// framework tooling; keep them verbatim and confirm before editing.
// @Type: CRHRPC
// @Behaviour: Behaviour = InvP.e1 -> I_Process -> TerP.e1 -> Behaviour
type CRHRPC struct{}
// func (c CRHRPC) getLocalTcpAddr() *net.TCPAddr {
// lib.PrintlnDebug("----------------------------------------->", shared.GetFunction(), "CRHRPC Version Not adapted")
// //fmt.Println("github.com/gfads/midarch/src/shared.LocalAddr:", shared.LocalAddr)
// lib.PrintlnDebug("github.com/gfads/midarch/src/shared.LocalAddr:", shared.LocalAddr)
// var err error = nil
// var localTCPAddr *net.TCPAddr = nil
// //shared.LocalAddr = "127.0.0.1:37521"
// if shared.LocalAddr != "" {
// localTCPAddr, err = net.ResolveTCPAddr("tcp", shared.LocalAddr)
// if err != nil {
// shared.ErrorHandler(shared.GetFunction(), err.Error())
// }
// }
// return localTCPAddr
// }
// I_Process forwards the marshalled request carried by msg to a remote server
// over an RPC protocol connection and overwrites *msg with the reply.
//
// The target address is the invocation endpoint found in the payload; when
// either its host or its port is empty, the endpoint stored in the CRHInfo
// held by *info is used instead. Connections are cached in crhInfo.Protocols
// under "host:port". A cached entry that is not a pointer to a type named RPC
// is closed (when non-nil) and replaced by a freshly connected protocols.RPC.
//
// If sending or receiving fails, the error is logged, the connection is closed
// and removed from the cache, and *msg is set to a message with a nil payload.
// The id and reset parameters are not used by this implementation.
//
// The function panics if *info does not hold a messages.CRHInfo or if
// msg.Payload is not a messages.RequestorInfo.
func (c CRHRPC) I_Process(id string, msg *messages.SAMessage, info *interface{}, reset *bool) {
	lib.PrintlnDebug("----------------------------------------->", shared.GetFunction(), "CRHRPC Version Not adapted")

	crhInfo := (*info).(messages.CRHInfo)

	// Assert the payload type once instead of once per field.
	req := msg.Payload.(messages.RequestorInfo)
	msgToServer := req.MarshalledMessage

	// Prefer the endpoint carried by the invocation; fall back to the
	// configured endpoint when the invocation does not fully specify one.
	host, port := req.Inv.Endpoint.Host, req.Inv.Endpoint.Port
	if host == "" || port == "" {
		host, port = crhInfo.EndPoint.Host, crhInfo.EndPoint.Port
	}
	addr := host + ":" + port

	lib.PrintlnDebug("Vai conectar", crhInfo.Conns[addr])

	proto, ok := crhInfo.Protocols[addr]

	// Name of the type the cached protocol points to. It stays empty for a
	// missing entry, a nil entry or a non-pointer value, so those cases fall
	// through to reconnection instead of panicking inside reflect.
	elemName := ""
	if ok && proto != nil {
		if t := reflect.TypeOf(proto); t.Kind() == reflect.Ptr {
			elemName = t.Elem().Name()
		}
	}

	if elemName != "RPC" { // no RPC connection open yet for this address
		lib.PrintlnInfo("Try to connect", proto)
		if ok {
			// An entry exists but is not RPC: close it before replacing it.
			lib.PrintlnInfo("ElemName", elemName)
			if proto != nil {
				proto.CloseConnection()
			}
		}
		proto = &protocols.RPC{}
		crhInfo.Protocols[addr] = proto
		proto.ConnectToServer(host, port)
	}
	lib.PrintlnInfo("Connected", proto)

	// abort logs a transport failure, blanks the outgoing message and drops
	// the broken connection from the cache so the next call reconnects.
	abort := func(logPrefix string, err error) {
		lib.PrintlnError(logPrefix, err.Error())
		*msg = messages.SAMessage{Payload: nil} // TODO dcruzb: adjust message
		proto.CloseConnection()
		delete(crhInfo.Protocols, addr)
	}

	if err := proto.Send(msgToServer); err != nil {
		abort("Error trying to send message:", err)
		return
	}
	lib.PrintlnInfo("Sent message", proto)

	msgFromServer, err := proto.Receive()
	if err != nil {
		abort("Error trying to read message:", err)
		return
	}
	lib.PrintlnInfo("Received message", proto)

	VerifyProtocolAdaptation(msgFromServer, proto)
	lib.PrintlnInfo("Adaptation Verified", proto)

	*msg = messages.SAMessage{Payload: msgFromServer}
}
// func (c CRHRPC) send(sizeOfMsgSize []byte, msgToServer []byte, conn net.Conn) error {
// lib.PrintlnDebug("----------------------------------------->", shared.GetFunction(), "CRHRPC Version Not adapted")
// binary.LittleEndian.PutUint32(sizeOfMsgSize, uint32(len(msgToServer)))
// _, err := conn.Write(sizeOfMsgSize)
// if err != nil {
// //shared.ErrorHandler(shared.GetFunction(), err.Error())
// return err
// }
// // send message
// _, err = conn.Write(msgToServer)
// if err != nil {
// //shared.ErrorHandler(shared.GetFunction(), err.Error())
// return err
// }
// return nil
// }
// func (c CRHRPC) read(conn net.Conn, size []byte) ([]byte, error) {
// lib.PrintlnDebug("----------------------------------------->", shared.GetFunction(), "CRHRPC Version Not adapted")
// // receive reply's size
// _, err := conn.Read(size)
// if err != nil {
// lib.PrintlnError(shared.GetFunction(), err)
// //shared.ErrorHandler(shared.GetFunction(), err.Error())
// return nil, err
// }
// // receive reply
// msgFromServer := make([]byte, binary.LittleEndian.Uint32(size), shared.NUM_MAX_MESSAGE_BYTES)
// _, err = conn.Read(msgFromServer)
// if err != nil {
// lib.PrintlnError(shared.GetFunction(), err)
// //shared.ErrorHandler(shared.GetFunction(), err.Error())
// return nil, err
// }
// return msgFromServer, nil
// }