This repository has been archived by the owner on May 28, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
hook.go
103 lines (89 loc) · 2.87 KB
/
hook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package findy
import (
"time"
"github.com/findy-network/findy-agent-vault/agency/model"
"github.com/findy-network/findy-agent-vault/utils"
agency "github.com/findy-network/findy-common-go/grpc/agency/v1"
ops "github.com/findy-network/findy-common-go/grpc/ops/v1"
"github.com/golang/glog"
"github.com/lainio/err2"
)
const waitTime = 5
func (f *Agency) archive(info *model.ArchiveInfo, status *agency.ProtocolStatus) {
switch status.State.ProtocolID.TypeID {
case agency.Protocol_DIDEXCHANGE:
connection := statusToConnection(status)
f.archiver.ArchiveConnection(info, connection)
case agency.Protocol_ISSUE_CREDENTIAL:
credential := statusToCredential(status)
f.archiver.ArchiveCredential(info, credential)
case agency.Protocol_PRESENT_PROOF:
proof := statusToProof(status)
f.archiver.ArchiveProof(info, proof)
case agency.Protocol_BASIC_MESSAGE:
message := statusToMessage(status)
f.archiver.ArchiveMessage(info, message)
default:
utils.LogHigh().Infof(
"Received unknown protocol type %s",
status.State.ProtocolID.TypeID.String(),
)
}
}
func (f *Agency) startHookOrWait() {
for {
err := f.listenAdminHook()
if err == nil {
break
}
glog.Warningf("listenAdminHook: cannot connect server, reconnecting after %d secs...", waitTime)
time.Sleep(waitTime * time.Second)
}
}
func (f *Agency) adminStatusLoop(ch chan *ops.AgencyStatus) {
defer err2.Catch(func(err error) {
glog.Errorf("Recovered error in psm hook routine: %s", err.Error())
go f.adminStatusLoop(ch)
})
for {
status, ok := <-ch
if !ok {
glog.Warningln("listenAdminHook: server lost, try reconnecting...")
time.Sleep(waitTime * time.Second)
f.startHookOrWait()
break
}
utils.LogMed().Infoln("received psm hook data for:", status.GetDID())
protocolStatus := status.GetProtocolStatus()
jobID := protocolStatus.State.ProtocolID.ID
// TODO: pass also timestamps: when protocol was started/approved/sent/issued/verified etc.
// revise this when we have "a real client" for the archive
info := &model.ArchiveInfo{
AgentID: status.GetDID(),
ConnectionID: status.GetConnectionID(),
JobID: jobID,
InitiatedByUs: protocolStatus.State.ProtocolID.Role == agency.Protocol_INITIATOR,
}
// archive currently only successful protocol results
if protocolStatus.State.State == agency.ProtocolState_OK {
f.archive(info, protocolStatus)
} else {
utils.LogLow().Infof(
"Skipping archiving for protocol run %s in state %s",
protocolStatus.State.ProtocolID.TypeID,
protocolStatus.State.State,
)
}
}
}
func (f *Agency) listenAdminHook() (err error) {
defer err2.Return(&err)
glog.Info("Start listening to PSM events.")
cmd := f.adminClient()
// Error in registration is not notified here, instead all relevant info comes
// in stream callback from now on
ch, err := cmd.psmHook()
err2.Check(err)
go f.adminStatusLoop(ch)
return nil
}