// Copyright (c) 2021 - 2024, Ludvig Lundgren and the autobrr contributors.
// SPDX-License-Identifier: GPL-2.0-or-later

package feed

import (
	"context"
	"encoding/xml"
	"net/url"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/autobrr/autobrr/internal/domain"
	"github.com/autobrr/autobrr/internal/release"
	"github.com/autobrr/autobrr/pkg/errors"

	"github.com/mmcdole/gofeed"
	"github.com/rs/zerolog"
)
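
// Pre-compiled patterns used while parsing feed items:
// rxpSize matches human-readable sizes such as "1.5 GB" or "700 MiB",
// rxpFreeleech matches the word "freeleech" on its own, and
// rxpHTML strips HTML tags from descriptions before size parsing.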
var (
	rxpSize      = regexp.MustCompile(`(?mi)(([0-9.]+)\s*(b|kb|kib|kilobyte|mb|mib|megabyte|gb|gib|gigabyte|tb|tib|terabyte))`)
	rxpFreeleech = regexp.MustCompile(`(?mi)(\bfreeleech\b)`)
	rxpHTML      = regexp.MustCompile(`(?mi)<.*?>`)
)
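
// RSSJob periodically fetches a single RSS/torrent feed, caches items it has
// already seen, and hands new releases to the release service for processing.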
type RSSJob struct {
	Feed       *domain.Feed
	Name       string
	Log        zerolog.Logger
	URL        string
	Repo       domain.FeedRepo
	CacheRepo  domain.FeedCacheRepo
	ReleaseSvc release.Service
	Timeout    time.Duration

	attempts int
	errors   []error

	JobID int
}
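
// NewRSSJob returns a FeedJob that processes the given RSS feed URL with the
// supplied repositories, release service and timeout.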
func NewRSSJob(feed *domain.Feed, name string, log zerolog.Logger, url string, repo domain.FeedRepo, cacheRepo domain.FeedCacheRepo, releaseSvc release.Service, timeout time.Duration) FeedJob {
	return &RSSJob{
		Feed:       feed,
		Name:       name,
		Log:        log,
		URL:        url,
		Repo:       repo,
		CacheRepo:  cacheRepo,
		ReleaseSvc: releaseSvc,
		Timeout:    timeout,
	}
}
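
// Run processes the feed once, logging any error before the attempt and
// error state is reset.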
func (j *RSSJob) Run() {
	ctx := context.Background()

	if err := j.RunE(ctx); err != nil {
		j.Log.Err(err).Int("attempts", j.attempts).Msg("rss feed process error")

		j.errors = append(j.errors, err)
	}

	j.attempts = 0
	j.errors = j.errors[:0]
}
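
// RunE runs the feed process once and returns any error encountered.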
func (j *RSSJob) RunE(ctx context.Context) error {
	if err := j.process(ctx); err != nil {
		j.Log.Err(err).Msg("rss feed process error")
		return err
	}

	return nil
}
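
// process fetches new feed items, converts them to releases and sends them
// off for filter processing in the background.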
func (j *RSSJob) process(ctx context.Context) error {
	items, err := j.getFeed(ctx)
	if err != nil {
		j.Log.Error().Err(err).Msgf("error fetching rss feed items")

		return errors.Wrap(err, "error getting rss feed items")
	}

	j.Log.Debug().Msgf("found (%d) new items to process", len(items))

	if len(items) == 0 {
		return nil
	}

	releases := make([]*domain.Release, 0)

	for _, item := range items {
		item := item
		j.Log.Debug().Msgf("item: %v", item.Title)

		rls := j.processItem(item)
		if rls != nil {
			releases = append(releases, rls)
		}
	}

	// process all new releases
	go j.ReleaseSvc.ProcessMultiple(releases)

	return nil
}
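
// processItem maps a single feed item to a domain.Release, applying the max
// age filter, magnet/enclosure handling, size, category, uploader, freeleech
// and cookie details. It returns nil when the item should be skipped.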
func (j *RSSJob) processItem(item *gofeed.Item) *domain.Release {
	now := time.Now()

	if j.Feed.MaxAge > 0 {
		if item.PublishedParsed != nil && item.PublishedParsed.After(time.Date(1970, time.April, 1, 0, 0, 0, 0, time.UTC)) {
			if !isNewerThanMaxAge(j.Feed.MaxAge, *item.PublishedParsed, now) {
				return nil
			}
		}
	}

	rls := domain.NewRelease(domain.IndexerMinimal{ID: j.Feed.Indexer.ID, Name: j.Feed.Indexer.Name, Identifier: j.Feed.Indexer.Identifier, IdentifierExternal: j.Feed.Indexer.IdentifierExternal})
	rls.Implementation = domain.ReleaseImplementationRSS

	rls.ParseString(item.Title)

	if j.Feed.Settings != nil && j.Feed.Settings.DownloadType == domain.FeedDownloadTypeMagnet {
		rls.MagnetURI = item.Link
		rls.DownloadURL = ""
	}

	if len(item.Enclosures) > 0 {
		e := item.Enclosures[0]
		if e.Type == "application/x-bittorrent" && e.URL != "" {
			rls.DownloadURL = e.URL
		}
		// skip placeholder enclosure lengths that some feeds hard-code
		if e.Length != "" && e.Length != "1" && e.Length != "39399" {
			rls.ParseSizeBytesString(e.Length)
		}

		if j.Feed.Settings != nil && j.Feed.Settings.DownloadType == domain.FeedDownloadTypeMagnet {
			if !strings.HasPrefix(rls.MagnetURI, "magnet:?") && strings.HasPrefix(e.URL, "magnet:?") {
				rls.MagnetURI = e.URL
				rls.DownloadURL = ""
			}
		}
	}

	if rls.DownloadURL == "" && item.Link != "" {
		rls.DownloadURL = item.Link
	}

	if rls.DownloadURL != "" {
		// handle no baseurl with only relative url
		// grab url from feed url and create full url
		if parsedURL, _ := url.Parse(rls.DownloadURL); parsedURL != nil && len(parsedURL.Hostname()) == 0 {
			if parentURL, _ := url.Parse(j.URL); parentURL != nil {
				parentURL.Path, parentURL.RawPath = "", ""

				// unescape the query params for max compatibility
				escapedUrl, _ := url.QueryUnescape(parentURL.JoinPath(rls.DownloadURL).String())
				rls.DownloadURL = escapedUrl
			}
		}
	}

	// append categories once, then build the comma-separated Category string
	rls.Categories = append(rls.Categories, item.Categories...)

	for _, v := range item.Categories {
		if len(rls.Category) != 0 {
			rls.Category += ", "
		}
		rls.Category += v
	}

	for _, v := range item.Authors {
		if len(rls.Uploader) != 0 {
			rls.Uploader += ", "
		}
		rls.Uploader += v.Name
	}

	// when the custom size and the enclosure size differ, ParseSizeBytesString will keep the largest one
	if size, ok := item.Custom["size"]; ok {
		rls.ParseSizeBytesString(size)
	}

	// additional size parsing
	// some feeds have a fixed size for the enclosure, so let's check for custom elements
	// and parse the size from there if it differs
	if customTorrent, ok := item.Custom["torrent"]; ok {
		var element itemCustomElement
		if err := xml.Unmarshal([]byte("<torrent>"+customTorrent+"</torrent>"), &element); err != nil {
			j.Log.Error().Err(err).Msg("could not unmarshal item.Custom.Torrent")
		}

		if element.ContentLength > 0 {
			if uint64(element.ContentLength) > rls.Size {
				rls.Size = uint64(element.ContentLength)
			}
		}

		if rls.TorrentHash == "" && element.InfoHash != "" {
			rls.TorrentHash = element.InfoHash
		}
	}

	// basic freeleech parsing
	if isFreeleech([]string{item.Title, item.Description}) {
		rls.Freeleech = true
		rls.Bonus = []string{"Freeleech"}
	}

	if item.Description != "" {
		rls.Description = item.Description

		readSizeFromDescription(item.Description, rls)

		j.Log.Trace().Msgf("Set new size %d from description", rls.Size)
	}

	// add cookie to release for download if needed
	if j.Feed.Cookie != "" {
		rls.RawCookie = j.Feed.Cookie
	}

	return rls
}
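
// getFeed fetches and parses the feed, stores the raw feed data as the last
// run result, and returns only items that are not already in the feed cache.
// New items are queued for caching in the background.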
func (j *RSSJob) getFeed(ctx context.Context) (items []*gofeed.Item, err error) {
	ctx, cancel := context.WithTimeout(ctx, j.Timeout)
	defer cancel()

	feed, err := NewFeedParser(j.Timeout, j.Feed.Cookie).ParseURLWithContext(ctx, j.URL)
	if err != nil {
		return nil, errors.Wrap(err, "error fetching rss feed items")
	}

	// get feed as JSON string
	feedData := feed.String()

	if err := j.Repo.UpdateLastRunWithData(ctx, j.Feed.ID, feedData); err != nil {
		j.Log.Error().Err(err).Msgf("error updating last run for feed id: %v", j.Feed.ID)
	}

	j.Log.Debug().Msgf("refreshing rss feed: %v, found (%d) items", j.Name, len(feed.Items))

	if len(feed.Items) == 0 {
		return
	}

	//sort.Sort(feed)

	toCache := make([]domain.FeedCacheItem, 0)

	// set ttl to 1 month
	ttl := time.Now().AddDate(0, 1, 0)

	for _, i := range feed.Items {
		item := i

		key := item.GUID
		if len(key) == 0 {
			key = item.Link
			if len(key) == 0 {
				key = item.Title
			}
		}

		exists, err := j.CacheRepo.Exists(j.Feed.ID, key)
		if err != nil {
			j.Log.Error().Err(err).Msg("could not check if item exists")
			continue
		}

		if exists {
			j.Log.Trace().Msgf("cache item exists, skipping release: %s", item.Title)
			continue
		}

		j.Log.Debug().Msgf("found new release: %s", i.Title)

		toCache = append(toCache, domain.FeedCacheItem{
			FeedId: strconv.Itoa(j.Feed.ID),
			Key:    key,
			Value:  []byte(i.Title),
			TTL:    ttl,
		})

		// queue the item for caching and include it in the returned batch
		items = append(items, item)
	}

	if len(toCache) > 0 {
		go func(items []domain.FeedCacheItem) {
			ctx := context.Background()
			if err := j.CacheRepo.PutMany(ctx, items); err != nil {
				j.Log.Error().Err(err).Msg("cache.PutMany: error storing items in cache")
			}
		}(toCache)
	}

	// send to filters
	return
}
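
// isNewerThanMaxAge reports whether the item's publish time falls within the
// last maxAge seconds relative to now.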
func isNewerThanMaxAge(maxAge int, item, now time.Time) bool {
	// now minus max age
	nowMaxAge := now.Add(time.Duration(-maxAge) * time.Second)

	return item.After(nowMaxAge)
}

// isFreeleech does basic freeleech detection on the given strings
func isFreeleech(str []string) bool {
	for _, s := range str {
		match := rxpFreeleech.FindAllString(s, -1)

		if len(match) > 0 {
			return true
		}
	}

	return false
}

// readSizeFromDescription parses any size strings found in the description
// and applies them to the release
func readSizeFromDescription(str string, r *domain.Release) {
	clean := rxpHTML.ReplaceAllString(str, " ")

	for _, sz := range rxpSize.FindAllString(clean, -1) {
		r.ParseSizeBytesString(sz)
	}
}

// itemCustomElement holds extra torrent metadata used by some feeds, such as
// the Aviztas network
type itemCustomElement struct {
	ContentLength int64  `xml:"contentLength"`
	InfoHash      string `xml:"infoHash"`
}