-
Notifications
You must be signed in to change notification settings - Fork 244
/
fd.go
130 lines (112 loc) · 2.42 KB
/
fd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package monitor
import (
"context"
"encoding/json"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/gptscript-ai/gptscript/pkg/runner"
"github.com/gptscript-ai/gptscript/pkg/types"
)
// Event is the record that the file monitor marshals to JSON and writes out
// for every runner event, plus the run start and run finish markers.
type Event struct {
// Embedded runner event. encoding/json flattens the fields of an embedded
// struct into the enclosing JSON object.
runner.Event `json:",inline"`
// Program is set on the "runStart" event only; omitted from JSON when nil.
Program *types.Program `json:"program,omitempty"`
// Input is the run input; set on forwarded runner events and on "runFinish".
Input string `json:"input,omitempty"`
// Output is the run output; set on "runFinish" only.
Output string `json:"output,omitempty"`
// Err is the error text of a failed run; set on "runFinish" when the run
// returned a non-nil error.
Err string `json:"err,omitempty"`
}
// fileFactory is the monitor factory returned by NewFileFactory. It holds the
// already opened destination and hands that same handle to each monitor it
// starts.
type fileFactory struct {
// file is the open destination that events are written to.
file *os.File
}
// NewFileFactory creates a new monitor factory that writes events to the location specified.
// The location can be one of three things:
//  1. a file descriptor/handle in the form "fd://2"
//  2. a file name
//  3. a named pipe in the form "\\.\pipe\my-pipe"
//
// For the "fd://" form the text after the prefix is parsed with strconv.Atoi
// and wrapped with os.NewFile; an error is returned when the text is not an
// integer or when os.NewFile rejects the descriptor. Any other location is
// opened write-only and is created if it does not exist yet.
func NewFileFactory(loc string) (runner.MonitorFactory, error) {
	var file *os.File

	if strings.HasPrefix(loc, "fd://") {
		fd, err := strconv.Atoi(strings.TrimPrefix(loc, "fd://"))
		if err != nil {
			return nil, err
		}

		// os.NewFile returns nil for a descriptor it considers invalid (for
		// example a negative number). Fail here rather than handing out a
		// factory whose every write would fail later.
		file = os.NewFile(uintptr(fd), "events")
		if file == nil {
			return nil, &os.PathError{Op: "open", Path: loc, Err: os.ErrInvalid}
		}
	} else {
		var err error

		// A permission mode of 0 would create the file without any permission
		// bits, so a later run could not reopen it for writing. Use the
		// conventional owner read/write, others read-only mode instead.
		file, err = os.OpenFile(loc, os.O_WRONLY|os.O_CREATE, 0644)
		if err != nil {
			return nil, err
		}
	}

	return &fileFactory{
		file: file,
	}, nil
}
// Start creates the monitor for one run. The monitor keeps the program,
// environment and input it was given and writes to the factory's file. Before
// returning, it emits a "runStart" event that carries the program. The
// context argument is ignored and the returned error is always nil.
func (s fileFactory) Start(_ context.Context, prg *types.Program, env []string, input string) (runner.Monitor, error) {
	mon := &fd{
		file:  s.file,
		input: input,
		env:   env,
		prj:   prg,
	}

	// Announce the run; only the start event includes the program itself.
	started := Event{Program: prg}
	started.Event = runner.Event{
		Time: time.Now(),
		Type: "runStart",
	}
	mon.event(started)

	return mon, nil
}
// fd is the monitor for a single run. It serializes events as JSON and writes
// them to file.
type fd struct {
// prj, env and input are the values passed to the factory's Start. Only
// input is read by the methods visible in this file.
prj *types.Program
env []string
input string
// file is the destination handle copied from the factory.
file *os.File
// runLock is held while an event is marshaled and written, and is also
// taken by Pause to hold events back.
runLock sync.Mutex
}
// Event records a runner event: it wraps the event together with the run's
// input and passes it on to be written.
func (f *fd) Event(event runner.Event) {
	wrapped := Event{Input: f.input}
	wrapped.Event = event
	f.event(wrapped)
}
// event marshals the given event to JSON and writes it to the file followed
// by two newline characters. The run lock is held for the whole operation, so
// writes do not interleave and are held back while the monitor is paused.
// Marshal and write failures are logged, not returned.
func (f *fd) event(event Event) {
	f.runLock.Lock()
	defer f.runLock.Unlock()

	payload, marshalErr := json.Marshal(event)
	if marshalErr != nil {
		log.Errorf("Failed to marshal event: %v", marshalErr)
		return
	}

	// Each record is terminated by a blank line.
	payload = append(payload, '\n', '\n')
	if _, writeErr := f.file.Write(payload); writeErr != nil {
		log.Errorf("Failed to write event to file: %v", writeErr)
	}
}
// Stop emits the final "runFinish" event, carrying the run's input, its output
// and, when err is non-nil, the error text, and then closes the file. A
// failure to close is logged, not returned.
//
// NOTE(review): the file handle is the one copied from the factory in Start,
// so closing it here also closes it for any other monitor started from the
// same factory - confirm the factory is only used for a single run.
func (f *fd) Stop(output string, err error) {
	finished := Event{
		Input:  f.input,
		Output: output,
	}
	finished.Event = runner.Event{
		Time: time.Now(),
		Type: "runFinish",
	}
	if err != nil {
		finished.Err = err.Error()
	}
	f.event(finished)

	if closeErr := f.file.Close(); closeErr != nil {
		log.Errorf("Failed to close file: %v", closeErr)
	}
}
// Pause takes the run lock and returns the function that releases it. Because
// event writing needs the same lock, events emitted while paused block until
// the returned function is called.
func (f *fd) Pause() func() {
	f.runLock.Lock()
	// The bound Unlock method is the resume function.
	return f.runLock.Unlock
}