/
tilelogs.go
289 lines (251 loc) · 8.11 KB
/
tilelogs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
// SPDX-FileCopyrightText: 2022 Sascha Brawer <sascha@brawer.ch>
// SPDX-License-Identifier: MIT
package main
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"time"
"github.com/andybalholm/brotli"
"github.com/lanrat/extsort"
"github.com/ulikunitz/xz"
"golang.org/x/sync/errgroup"
)
// GetAvailableWeeks returns the weeks for which the OpenStreetMap planet
// server offers tile logs for all seven days.
//
// The directory listing at planet.openstreetmap.org/tile_logs/ is fetched
// with client and scanned for links to daily log files. Weeks are returned
// in ISO 8601 format such as "2021-W07", sorted from least to most recent.
// An error is returned if the request fails, if the response status is not
// 200, or if the Content-Type is not text/html.
func GetAvailableWeeks(client *http.Client) ([]string, error) {
	url := "https://planet.openstreetmap.org/tile_logs/"
	r, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	// Release the connection on every return path, including the
	// early error returns below.
	defer r.Body.Close()

	// Only accept HTTP responses with status code 200 OK
	// and when the Content-Type header is HTML.
	contentType := r.Header.Get("Content-Type")
	if strings.ContainsRune(contentType, ';') { // text/html;charset=UTF-8
		contentType = strings.Split(contentType, ";")[0]
	}
	if r.StatusCode != 200 || contentType != "text/html" {
		return nil, fmt.Errorf("failed to fetch %s, StatusCode=%d Content-Type=%s", url, r.StatusCode, contentType)
	}

	body, err := io.ReadAll(r.Body)
	if err != nil {
		return nil, err
	}

	// Find out what weeks are available. For each week, we keep a bitmask
	// that tells for which days of that week the OSM Planet server
	// has log files available. Bit n is set when a file exists for the day
	// whose time.Weekday value is n (Sunday = 0 ... Saturday = 6). For
	// example, the entry 202107 → 5 (in binary: 0000101) means the server
	// has log files for Tuesday (0000100) and Sunday (0000001) of the 7th
	// week of 2021. That is, Tuesday, February 16, and Sunday, February 21.
	re := regexp.MustCompile(`<a href="tiles-(\d{4}-\d\d-\d\d)\.txt\.xz">`)
	available := make(map[int]int8) // (year*100+isoweek) → 7 bits
	for _, m := range re.FindAllSubmatch(body, -1) {
		// Links whose date does not parse are skipped.
		if t, err := time.Parse("2006-01-02", string(m[1])); err == nil {
			year, week := t.ISOWeek()
			available[year*100+week] |= 1 << int8(t.Weekday())
		}
	}

	// To our callers, we return weeks in ISO 8601 format, eg. "2021-W07".
	result := make([]string, 0, len(available))
	for week, days := range available {
		if days == 127 { // server has logs for all seven days of this week
			isoWeekString := fmt.Sprintf("%04d-W%02d", week/100, week%100)
			result = append(result, isoWeekString)
		}
	}

	// Map iteration order is random; the zero-padded format makes a plain
	// string sort chronological.
	sort.Strings(result)
	return result, nil
}
// tileLogRegexp matches a complete line made of three slash-separated
// decimal numbers, whitespace, and a fourth decimal number, for example
// "7/64/42 1234". The four numbers are captured as submatches 1 to 4.
// NOTE(review): presumably zoom/x/y followed by the view count, the same
// layout that GetTileLogs writes — confirm against the parser that uses it.
var tileLogRegexp = regexp.MustCompile(`^(\d+)/(\d+)/(\d+)\s+(\d+)$`)
// GetTileLogs returns an io.Reader for the sorted log records of a week.
//
// The week is given in ISO 8601 format, such as "2021-W07". The records are
// looked up in this order:
//
//  1. If storage is non-nil and already holds the weekly file, it is read
//     from storage.
//  2. If workdir already contains the weekly file, it is read from local disk.
//  3. Otherwise, the seven daily log files for the requested week are
//     fetched from the OpenStreetMap planet server, uncompressed, sorted
//     with TileCountLess, merged so that each key appears once with summed
//     counts, and stored as a brotli-compressed file in workdir. If storage
//     is non-nil, that file is then uploaded to storage, removed from
//     workdir, and read back from storage.
//
// The returned reader yields the uncompressed records, one per line in the
// form "zoom/x/y count". Because only an io.Reader is returned, callers have
// no way to close the underlying file or storage object.
func GetTileLogs(week string, client *http.Client, workdir string, storage Storage) (io.Reader, error) {
	ctx := context.Background()
	remotePath := fmt.Sprintf("internal/osmviews-builder/tilelogs-%s.br", week)

	// Prefer a finished file in object storage. Any failure here is
	// treated as a cache miss, so we fall through and try other sources.
	if storage != nil {
		if _, err := storage.Stat(ctx, "qrank", remotePath); err == nil {
			if r, err := storage.Get(ctx, "qrank", remotePath); err == nil {
				return brotli.NewReader(r), nil
			}
		}
	}

	// Next, try a finished file left in workdir by an earlier run.
	path := filepath.Join(workdir, fmt.Sprintf("tilelogs-%s.br", week))
	if f, err := os.Open(path); err == nil {
		return brotli.NewReader(f), nil
	}

	if logger != nil {
		logger.Printf("building %s", path)
	}

	if err := os.MkdirAll(workdir, os.ModePerm); err != nil {
		return nil, err
	}

	// One goroutine downloads and parses the daily logs into ch while
	// another feeds them to the external sorter.
	ch := make(chan extsort.SortType, 100000)
	g, subCtx := errgroup.WithContext(ctx)
	config := extsort.DefaultConfig()
	config.NumWorkers = runtime.NumCPU()
	sorter, outChan, errChan := extsort.New(ch, TileCountFromBytes, TileCountLess, config)
	g.Go(func() error {
		return fetchWeeklyTileLogs(week, client, ch, subCtx)
	})
	g.Go(func() error {
		sorter.Sort(ctx) // not subCtx, as per extsort docs
		return nil
	})
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// We write to a temporary file first, and rename it atomically
	// once it is finished in usable state. This prevents hiccups
	// in case the process crashes (or the machine dies) while the
	// output file is being written.
	tmppath := path + ".tmp"
	tmpfile, err := os.Create(tmppath)
	if err != nil {
		return nil, err
	}
	// The deferred closes are a safety net for the early error returns
	// below; on the success path both are closed explicitly first.
	defer tmpfile.Close()

	writer := brotli.NewWriterLevel(tmpfile, 9)
	defer writer.Close()

	// Records arrive in sorted order, so records for the same key are
	// adjacent. Sum their counts and emit one line per distinct key.
	var last TileCount
	for data := range outChan {
		cur := data.(TileCount)
		if cur.Key != last.Key {
			if last.Count > 0 {
				zoom, x, y := last.Key.ZoomXY()
				fmt.Fprintf(writer, "%d/%d/%d %d\n", zoom, x, y, last.Count)
			}
			last = cur
		} else {
			last.Count += cur.Count
		}
	}
	// Flush the final run, which the loop above never emits.
	if last.Count > 0 {
		zoom, x, y := last.Key.ZoomXY()
		fmt.Fprintf(writer, "%d/%d/%d %d\n", zoom, x, y, last.Count)
	}

	// Check for errors from the external sorting library.
	if err := <-errChan; err != nil {
		return nil, err
	}

	// Close writer/compressor, ask kernel to ensure temp file is on disk, and close it.
	if err := writer.Close(); err != nil {
		return nil, err
	}
	if err := tmpfile.Sync(); err != nil {
		return nil, err
	}
	if err := tmpfile.Close(); err != nil {
		return nil, err
	}

	// Now that we have the result on disk, rename it to final path.
	if err := os.Rename(tmppath, path); err != nil {
		return nil, err
	}

	// Upload the file to object storage and return a reader for it.
	if storage != nil {
		contentType := "application/x-brotli"
		if err := storage.PutFile(ctx, "qrank", remotePath, path, contentType); err != nil {
			return nil, err
		}
		if err := os.Remove(path); err != nil {
			return nil, err
		}
		// The local copy is gone at this point, so storage is the only
		// place left to read from. Report a failure directly rather than
		// falling through to os.Open on the file we just removed, which
		// would hide the real cause behind a "no such file" error.
		r, err := storage.Get(ctx, "qrank", remotePath)
		if err != nil {
			return nil, err
		}
		return brotli.NewReader(r), nil
	}

	// Open the file for reading and return a reader for it.
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	return brotli.NewReader(f), nil
}
// fetchWeeklyTileLogs sends the tile log records for all seven days of the
// given ISO 8601 week (such as "2021-W07") to ch, one day after the other,
// starting with the day returned by weekStart. The channel is closed when
// the function returns, whether it succeeded or not. The first error from
// parsing the week or from fetching a day stops the work and is returned.
func fetchWeeklyTileLogs(week string, client *http.Client, ch chan<- extsort.SortType, ctx context.Context) error {
	// Closing ch tells the consumer that no more records will arrive.
	defer close(ch)

	isoYear, isoWeek, err := ParseWeek(week)
	if err != nil {
		return err
	}

	// The days are fetched sequentially on purpose: fetching them in
	// parallel was tried, but planet.openstreetmap.org only seems to
	// accept 1-2 connections from the same IP address.
	start := weekStart(isoYear, isoWeek)
	for offset := 0; offset < 7; offset++ {
		if err := fetchTileLogs(start.AddDate(0, 0, offset), client, ch, ctx); err != nil {
			return err
		}
	}
	return nil
}
// fetchTileLogs downloads the xz-compressed tile log for one day from the
// OpenStreetMap planet server, decompresses it on the fly, parses it line
// by line with ParseTileCount, and sends every record with a positive
// count to ch.
//
// The HTTP request is bound to ctx, so canceling ctx aborts the download.
// An error is returned if the request fails, if the server does not answer
// with status 200, if the xz stream cannot be read, or if ctx is canceled.
// The channel is not closed by this function.
func fetchTileLogs(day time.Time, client *http.Client, ch chan<- extsort.SortType, ctx context.Context) error {
	url := fmt.Sprintf(
		"https://planet.openstreetmap.org/tile_logs/tiles-%04d-%02d-%02d.txt.xz",
		day.Year(), day.Month(), day.Day())
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	r, err := client.Do(req)
	if err != nil {
		return err
	}
	// Release the connection on every return path.
	defer r.Body.Close()

	// Without this check, an error page would be handed to the xz decoder
	// and surface as a confusing decompression error.
	if r.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to fetch %s, StatusCode=%d", url, r.StatusCode)
	}

	reader, err := xz.NewReader(r.Body)
	if err != nil {
		return err
	}

	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		// Check if our task has been canceled. Typically this can happen
		// because of an error in another goroutine in the same
		// x/sync/errgroup.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if tc := ParseTileCount(scanner.Text()); tc.Count > 0 {
			// Also watch for cancellation while sending, so that we do
			// not block forever on a full channel nobody is draining.
			select {
			case ch <- tc:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
	return scanner.Err()
}
// weekStart is the inverse of Go's time.Time.ISOWeek: given an ISO 8601
// year and week number, it returns midnight UTC on the Monday that starts
// that week.
func weekStart(year, week int) time.Time {
	// Start from July 1 of the given year and step back to the Monday on
	// or before it. Weekday() counts from Sunday = 0, so (wd+6)%7 is the
	// number of days elapsed since Monday.
	anchor := time.Date(year, time.July, 1, 0, 0, 0, 0, time.UTC)
	sinceMonday := (int(anchor.Weekday()) + 6) % 7
	monday := anchor.AddDate(0, 0, -sinceMonday)

	// Shift by whole weeks from the anchor's ISO week to the requested one.
	_, anchorWeek := monday.ISOWeek()
	return monday.AddDate(0, 0, 7*(week-anchorWeek))
}
// isoWeekRegexp finds an ISO 8601 week such as "2018-W34", capturing the
// four-digit year and the two-digit week. The pattern is not anchored, so
// it also matches inside longer strings.
var isoWeekRegexp = regexp.MustCompile(`(\d{4})-W(\d{2})`)

// ParseWeek gives the year and week for an ISO week string like "2018-W34".
//
// The whole string must consist of a four-digit year, "-W", and a two-digit
// week number between 01 and 53. Anything else yields an error, including
// leading or trailing characters around an otherwise valid week.
func ParseWeek(s string) (year int, week int, err error) {
	match := isoWeekRegexp.FindStringSubmatch(s)
	// isoWeekRegexp is unanchored, so require the match to cover all of s;
	// otherwise inputs like "x2018-W34" or "2018-W345" would be accepted.
	if len(match) != 3 || match[0] != s {
		return 0, 0, fmt.Errorf("week not in ISO 8601 format: %s", s)
	}

	year, err = strconv.Atoi(match[1])
	if err != nil {
		return 0, 0, fmt.Errorf("week not in ISO 8601 format: %s", s)
	}
	week, err = strconv.Atoi(match[2])
	if err != nil {
		return 0, 0, fmt.Errorf("week not in ISO 8601 format: %s", s)
	}

	// ISO 8601 weeks are numbered 01 through 53.
	if week < 1 || week > 53 {
		return 0, 0, fmt.Errorf("week not in ISO 8601 format: %s", s)
	}
	return year, week, nil
}