/
shacl.go
185 lines (149 loc) · 4.77 KB
/
shacl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package shapes
import (
	"bytes"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/gleanerio/gleaner/internal/common"
	configTypes "github.com/gleanerio/gleaner/internal/config"
	"github.com/gleanerio/gleaner/internal/millers/graph"
	"github.com/knakk/rdf"
	minio "github.com/minio/minio-go/v7"
	"github.com/spf13/viper"
)
// ShapeRef is one entry of the "shapefiles" config list; loadShapeFiles
// decodes the list into []ShapeRef with viper's UnmarshalKey.
type ShapeRef struct {
	// Ref locates a SHACL shape file. A value that parses as a URL with both a
	// scheme and a host (such as an http:// URI) is fetched over HTTP; any
	// other value is treated as a local file reference and read from disk.
	Ref string
}
// SHACLMillObjects validates the objects that common.GetMillObjects returns
// for bucketname against the SHACL shape graphs.
//
// It first uploads the shape files listed in the config (loadShapeFiles).
// Then it checks every object against every shape graph concurrently and
// stores the combined report (multiCall). A failure to load the configured
// shape files is logged, and validation still proceeds.
func SHACLMillObjects(mc *minio.Client, bucketname string, v1 *viper.Viper) {
	if err := loadShapeFiles(mc, v1); err != nil {
		log.Printf("loading SHACL shape files: %s", err)
	}

	entries := common.GetMillObjects(mc, bucketname)
	multiCall(entries, bucketname, mc, v1)
}
// loadShapeFiles uploads every SHACL shape file listed under the "shapefiles"
// config key to "shapes/<file name>" in the configured Gleaner bucket.
//
// Each entry's Ref is resolved by readShapeRef as follows:
//   - an absolute URL with a host is fetched over HTTP;
//   - a file:// URI is read from its path;
//   - anything else is read as a local file path.
//
// Problems with an individual file are logged and that file is skipped, so one
// bad entry does not block the rest. An error is returned only when the
// configuration itself (bucket name or shapefiles list) cannot be read.
func loadShapeFiles(mc *minio.Client, v1 *viper.Viper) error {
	bucketName, err := configTypes.GetBucketName(v1)
	if err != nil {
		return fmt.Errorf("reading bucket name from config: %w", err)
	}

	var shapes []ShapeRef
	if err := v1.UnmarshalKey("shapefiles", &shapes); err != nil {
		return fmt.Errorf("reading shapefiles list from config: %w", err)
	}

	for _, shape := range shapes {
		log.Printf("Loading SHACL file: %s\n", shape.Ref)

		data, err := readShapeRef(shape.Ref)
		if err != nil {
			log.Printf("Error loading SHACL file %s: %s\n", shape.Ref, err)
			continue
		}
		// An empty body is never a usable shape graph. Uploading it would only
		// shadow a good copy already in the bucket.
		if len(data) == 0 {
			log.Printf("SHACL file %s is empty, skipping\n", shape.Ref)
			continue
		}

		// TODO caution.. we need to note the RDF encoding and perhaps pass it along or verify it
		// is what we should be using
		if _, err := graph.LoadToMinio(string(data), bucketName, shapeObjectKey(shape.Ref), mc); err != nil {
			log.Printf("Error storing SHACL file %s: %s\n", shape.Ref, err)
			continue
		}
		log.Printf("Loaded SHACL file: %s \n", shape.Ref)
	}
	return nil
}

// readShapeRef returns the raw bytes of the shape file that ref points to.
func readShapeRef(ref string) ([]byte, error) {
	if isURL(ref) {
		return getBody(ref)
	}
	filePath := ref
	// file:///abs/path has no host, so isURL rejects it; read its path instead
	// of treating the whole URI string as a file name.
	if u, err := url.Parse(ref); err == nil && u.Scheme == "file" {
		filePath = u.Path
	}
	return ioutil.ReadFile(filePath)
}

// shapeObjectKey returns "shapes/" followed by the text after the last "/" in
// ref (the whole ref when it contains no "/").
func shapeObjectKey(ref string) string {
	return "shapes/" + ref[strings.LastIndex(ref, "/")+1:]
}
// isURL reports whether str parses as an absolute URL that names a host, such
// as "https://example.org/shape.ttl". Relative or absolute file paths, and
// host-less URIs such as "file:///tmp/shape.ttl", report false.
func isURL(str string) bool {
	parsed, err := url.Parse(str)
	if err != nil {
		return false
	}
	hasScheme := parsed.Scheme != ""
	hasHost := parsed.Host != ""
	return hasScheme && hasHost
}
// multiCall checks every entry in e against every SHACL shape graph that
// common.GetShapeGraphs returns, running up to maxConcurrent shaclTest calls
// at once. It then uploads the accumulated output to
// "<runid>/<bucketname>_shacl.nt" in the "gleaner-milled" bucket, where runid
// comes from the "gleaner" config map. An upload failure is logged.
func multiCall(e []common.Entry, bucketname string, mc *minio.Client, v1 *viper.Viper) {
	const maxConcurrent = 20 // 1 == single thread

	mcfg := v1.GetStringMapString("gleaner")

	// loadShapeFiles uploads shapes into the configured bucket, so read them
	// back from the same place rather than from a hard-coded name.
	shapeBucket, err := configTypes.GetBucketName(v1)
	if err != nil {
		log.Printf("reading bucket name from config, falling back to %q: %s", "gleaner", err)
		shapeBucket = "gleaner"
	}
	m := common.GetShapeGraphs(mc, shapeBucket)

	sem := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup
	// NOTE(review): every goroutine writes into gb through &gb; this assumes
	// common.Buffer is safe for concurrent writes. Confirm in internal/common.
	var gb common.Buffer

	for j := range m {
		log.Printf("Checking data graphs against shape graph: %s\n", m[j])
		for k := range e {
			// Acquire before spawning so at most maxConcurrent goroutines exist,
			// instead of launching len(m)*len(e) goroutines that all block.
			sem <- struct{}{}
			wg.Add(1)
			go func(j, k int) {
				defer wg.Done()
				defer func() { <-sem }()
				status := shaclTest(e[k].Urlval, e[k].Jld, m[j].Key, m[j].Jld, &gb)
				log.Printf("#%d #%s wrote %d bytes", j, e[k].Urlval, status) // why print the status??
			}(j, k)
		}
	}
	wg.Wait()

	// TODO gb is type turtle here.. need to convert to ntriples to store
	// (rdf2rdf in this file does that conversion).
	_, err = graph.LoadToMinio(gb.String(), "gleaner-milled", fmt.Sprintf("%s/%s_shacl.nt", mcfg["runid"], bucketname), mc)
	if err != nil {
		log.Println(err)
	}
}
// rdf2rdf converts a Turtle document to N-Triples.
//
// It returns the first error from decoding, encoding, or flushing the encoder.
// On error the returned string is empty; it never returns a partial
// conversion with a nil error.
func rdf2rdf(r string) (string, error) {
	triples, err := rdf.NewTripleDecoder(strings.NewReader(r), rdf.Turtle).DecodeAll()
	if err != nil {
		return "", fmt.Errorf("decoding turtle: %w", err)
	}

	var buf bytes.Buffer
	enc := rdf.NewTripleEncoder(&buf, rdf.NTriples)
	if err := enc.EncodeAll(triples); err != nil {
		return "", fmt.Errorf("encoding n-triples: %w", err)
	}
	// Close flushes the encoder's buffered output into buf, so it must
	// succeed before buf is read.
	if err := enc.Close(); err != nil {
		return "", fmt.Errorf("flushing n-triples encoder: %w", err)
	}
	return buf.String(), nil
}
// getBody fetches rawURL with an HTTP GET (User-Agent "EarthCube_DataBot/1.0")
// and returns the response body.
//
// Any status other than 200 OK is reported as an error rather than as an
// empty body, so callers never mistake a missing resource for an empty
// document. Errors are returned with context and not logged here; the caller
// decides how to report them.
//
// NOTE: this same function is in pkg/summoner; resolve the duplication here
// and potentially elsewhere.
func getBody(rawURL string) ([]byte, error) {
	req, err := http.NewRequest(http.MethodGet, rawURL, nil)
	if err != nil {
		return nil, fmt.Errorf("building request for %s: %w", rawURL, err)
	}
	req.Header.Set("User-Agent", "EarthCube_DataBot/1.0")

	// A timeout keeps an unresponsive shape host from hanging the run forever.
	client := &http.Client{Timeout: 60 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("fetching %s: %w", rawURL, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("fetching %s: unexpected status %s", rawURL, resp.Status)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading body of %s: %w", rawURL, err)
	}
	return body, nil
}