/
cmd_load.go
400 lines (361 loc) · 12 KB
/
cmd_load.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
package main
import (
"flag"
"io"
path "path/filepath"
"strings"
"github.com/kr/text"
"github.com/BurntSushi/ty/fun"
"github.com/BurntSushi/goim/imdb"
)
var (
	// flagLoadDownload is the value of the '-download' flag: when
	// non-empty, fetched lists are saved to this directory and goim quits.
	flagLoadDownload = ""
	// flagLoadUrls is the value of the '-urls' flag: when true, the URL
	// for each requested list is printed to stdout and goim quits.
	flagLoadUrls = false
	// flagLoadLists is the value of the '-lists' flag: a comma separated
	// set of list names to load. 'all' and 'attr' are special values.
	flagLoadLists = "movies"
	// flagWarnings is the value of the '-warn' flag: when true, warning
	// messages about inconsistent IMDb data are shown.
	flagWarnings = false
)
// loadLists is the set of all list names that may be passed on the command
// line to be updated. Note that this list also specifies the *order* in
// which lists are updated, which is respected regardless of the order given
// on the command line. (This is important because tables like 'movies' should
// always be updated before their corresponding attribute tables.)
//
// Note that 'actors' implicitly covers the 'actresses' list as well; both
// files are fetched when 'actors' is requested.
var loadLists = []string{
	"movies", "actors",
	"release-dates", "running-times", "aka-titles",
	"alternate-versions", "color-info", "mpaa-ratings-reasons", "sound-mix",
	"genres", "taglines", "trivia", "goofs", "language", "literature",
	"locations", "movie-links", "quotes", "plot", "ratings",
}
// listHandler is the signature shared by every "simple" list loader: it is
// given an open database connection, a read-only atomizer for resolving IMDb
// identifiers to surrogate keys, and the (decompressed) list contents, and
// stores the list in the database.
type listHandler func(*imdb.DB, *atomizer, io.ReadCloser) error

// simpleLoaders maps each list name to the handler that loads it. These
// loaders only *reference* existing atoms, so they can run in parallel once
// movies and actors have been loaded.
var simpleLoaders = map[string]listHandler{
	"release-dates":        listReleaseDates,
	"running-times":        listRunningTimes,
	"aka-titles":           listAkaTitles,
	"alternate-versions":   listAlternateVersions,
	"color-info":           listColorInfo,
	"mpaa-ratings-reasons": listMPAARatings,
	"sound-mix":            listSoundMixes,
	"genres":               listGenres,
	"taglines":             listTaglines,
	"trivia":               listTrivia,
	"goofs":                listGoofs,
	"language":             listLanguages,
	"literature":           listLiterature,
	"locations":            listLocations,
	"movie-links":          listMovieLinks,
	"quotes":               listQuotes,
	"plot":                 listPlots,
	"ratings":              listRatings,
	// Functions for loading movies and actors are excluded from this list
	// since they require some special attention.
}
// cmdLoad describes the 'load' command: its usage, help text, flags and
// entry point. The actual work happens in cmd_load.
var cmdLoad = &command{
	name: "load",
	positionalUsage: "[ berlin | digital | funet | uiuc | " +
		"ftp://... | http://... | dir ]",
	shortHelp: "creates/updates database with IMDb data",
	help: `
This command loads the current database with the contents of the IMDb
database given. It may be a named FTP location, an FTP url, an HTTP url or
a directory on the local file system. Regardless of how the location is
specified, it must point to a directory (whether remote or local) containing
IMDb gzipped list files.

By default, the 'berlin' public FTP site is used and only the 'movies' table
is updated. To update more tables, use the '-lists' flag. It is better to
specify as many lists as possible, since they can be updated in parallel.

This command can create a database from scratch or it can update an existing
one. The update procedure is pretty brutish; in most cases, it truncates the
table it's updating and rebuilds it. The only tables that are immune to this
sort of treatment are 'atom' and 'name'. Therefore, the surrogate primary keys
are preserved across updating *if and only if* the primary keys provided by
IMDb don't change. Unfortunately, IMDb primary keys can change (for example,
by adding a title to an episode). This results in stale rows in the 'atom' and
'name' tables (but will be hidden from search results).
`,
	flags: flag.NewFlagSet("load", flag.ExitOnError),
	run:   cmd_load,
	// addFlags registers the load-specific flags on the command's FlagSet;
	// it is invoked once during command setup.
	addFlags: func(c *command) {
		c.flags.StringVar(&flagLoadDownload, "download", flagLoadDownload,
			"When set, the data retrieved will be stored in the directory\n"+
				"specified. Then goim will quit.")
		c.flags.BoolVar(&flagLoadUrls, "urls", flagLoadUrls,
			"When set, the URLs for downloading the lists specified will\n"+
				"be printed to stdout, each on their own line. Then goim\n"+
				"will quit.")
		// Wrap the full list of list names so the flag help stays readable.
		lists := text.Wrap(strings.Join(loadLists, ", "), 80)
		c.flags.StringVar(&flagLoadLists, "lists", flagLoadLists,
			"Set to a comma separated list of IMDB movie lists to load, with\n"+
				"no whitespace. Only lists named here will be loaded. If not\n"+
				"specified, then only the 'movie' list is loaded.\n"+
				"Use 'all' to load all lists or 'attr' to load all attribute\n"+
				"lists (e.g., quotes, running times, etc.).\n"+
				"Available lists: "+lists)
		c.flags.BoolVar(&flagWarnings, "warn", flagWarnings,
			"When set, warnings messages about the data will be shown.\n"+
				"When enabled, this can produce a lot of output saying that\n"+
				"an identifier could not be found for some entries. This is\n"+
				"(likely) a result of inconsistent data in IMDb's text files.")
	},
}
// cmd_load is the entry point for the 'load' command. It validates the
// requested list names, then either prints their URLs ('-urls'), downloads
// them to disk ('-download'), or loads them into the database: movies and
// actors first (they create atoms), then the remaining attribute lists in
// parallel. Reports success by returning true; errors are printed and
// result in false.
func cmd_load(c *command) bool {
	driver, dsn := c.dbinfo()
	db := openDb(driver, dsn)
	defer closeDb(db)

	// With SQLite, we can get some performance benefit by disabling
	// synchronous writes.
	// It is still safe from application crashes (e.g., bugs in Goim), but
	// not safe from power failures or operating system crashes.
	// I think we're OK with that, right?
	if db.Driver == "sqlite3" {
		_, err := db.Exec("PRAGMA synchronous = OFF")
		if err != nil {
			pef("Could not disable SQLite synchronous mode: %s", err)
			return false
		}
	}

	// Figure out which lists we're loading and make sure each list name is
	// valid before proceeding.
	var userLoadLists []string
	if flagLoadLists == "all" {
		// Copy loadLists rather than aliasing it: userLoadLists is trimmed
		// in place below when the 'movies' and 'actors' entries are removed,
		// and an alias would corrupt the package-level loadLists slice.
		userLoadLists = append([]string(nil), loadLists...)
	} else if flagLoadLists == "attr" {
		for _, name := range loadLists {
			if name == "movies" || name == "actors" {
				continue
			}
			userLoadLists = append(userLoadLists, name)
		}
	} else {
		for _, name := range strings.Split(flagLoadLists, ",") {
			name = strings.ToLower(strings.TrimSpace(name))
			if !fun.In(name, loadLists) {
				pef("%s is not a valid list name. See 'goim help load'.", name)
				return false
			}
			userLoadLists = append(userLoadLists, name)
		}
	}

	// Build the "fetcher" to retrieve lists (whether it be from the file
	// system, HTTP or FTP).
	getFrom := c.flags.Arg(0)
	if len(getFrom) == 0 {
		getFrom = "berlin"
	}

	// Just print the URLs to download.
	if flagLoadUrls {
		fetch := newFetcher(getFrom)
		if fetch == nil {
			return false
		}
		for _, list := range userLoadLists {
			pf("%s\n", fetch.location(list))
			// 'actors' covers both the actors and actresses files.
			if list == "actors" {
				pf("%s\n", fetch.location("actresses"))
			}
		}
		return true
	}

	// If we're downloading, then just do that and quit.
	if len(flagLoadDownload) > 0 {
		// We're just saving to disk, so no need to decompress. Get a plain
		// fetcher.
		fetch := newFetcher(getFrom)
		if fetch == nil {
			return false
		}
		// download returns struct{} only to satisfy fun.ParMapN's
		// function shape; errors are reported but don't abort other
		// downloads.
		download := func(name string) struct{} {
			if err := downloadList(fetch, name); err != nil {
				pef("%s", err)
			}
			if name == "actors" {
				if err := downloadList(fetch, "actresses"); err != nil {
					pef("%s", err)
				}
			}
			return struct{}{}
		}
		// Don't open more FTP connections than the server allows.
		conns := maxFtpConns
		if flagCpu < conns {
			conns = flagCpu
		}
		fun.ParMapN(download, userLoadLists, conns)
		return true
	}

	// We'll be reading, so get a gzip fetcher.
	fetch := newGzipFetcher(getFrom)
	if fetch == nil {
		return false
	}

	// Get the tables with indices corresponding to the lists we're updating.
	tables, err := tablesFromLists(db, userLoadLists)
	if err != nil {
		pef("%s", err)
		return false
	}
	// Drop indices up front so bulk inserts don't pay index-maintenance
	// costs; they are rebuilt at the end.
	logf("Dropping indices for: %s", strings.Join(tables, ", "))
	if err := db.DropIndices(tables...); err != nil {
		pef("Could not drop indices: %s", err)
		return false
	}

	// Before launching into loading---which can be done in parallel---we need
	// to load movies and actors first since they insert data that most of the
	// other lists depend on. Also, they cannot be loaded in parallel since
	// they are the only loaders that *add* atoms to the database.
	if in := loaderIndex("movies", userLoadLists); in > -1 {
		if err := loadMovies(driver, dsn, fetch); err != nil {
			pef("%s", err)
			return false
		}
		userLoadLists = append(userLoadLists[:in], userLoadLists[in+1:]...)
	}
	if in := loaderIndex("actors", userLoadLists); in > -1 {
		if err := loadActors(driver, dsn, fetch); err != nil {
			pef("%s", err)
			return false
		}
		userLoadLists = append(userLoadLists[:in], userLoadLists[in+1:]...)
	}

	// This must be done after movies/actors are loaded so that we get all
	// of their atoms.
	if len(userLoadLists) > 0 {
		logf("Reading atom identifiers from database...")
		atoms, err := newAtomizer(db, nil) // read-only
		if err != nil {
			pef("%s", err)
			return false
		}
		// simpleLoad fetches and stores one attribute list using its own
		// database connection (so loads can run in parallel). Returns
		// false on failure.
		simpleLoad := func(name string) bool {
			loader := simpleLoaders[name]
			if loader == nil {
				// This is a bug since we should have verified all list names.
				logf("BUG: %s does not have a simpler loader.", name)
				return true
			}

			db := openDb(driver, dsn)
			defer closeDb(db)

			list, err := fetch.list(name)
			if err != nil {
				pef("%s", err)
				return false
			}
			defer list.Close()

			if err := loader(db, atoms, list); err != nil {
				pef("Could not store %s list: %s", name, err)
				return false
			}
			return true
		}

		// SQLite doesn't handle concurrent writes very well, so force it
		// to be single-threaded. Also, we've got to limit connections if
		// we're fetching from FTP too.
		maxConcurrent := flagCpu
		if db.Driver == "sqlite3" {
			maxConcurrent = 1
		} else {
			switch fetch.(gzipFetcher).fetcher.(type) {
			case ftpFetcher:
				maxConcurrent = maxFtpConns
			}
		}
		fun.ParMapN(simpleLoad, userLoadLists, maxConcurrent)
	}

	logf("Creating indices for: %s", strings.Join(tables, ", "))
	if err := db.CreateIndices(tables...); err != nil {
		pef("Could not create indices: %s", err)
		return false
	}
	return true
}
// downloadList fetches the named list (still gzipped) and saves it to the
// directory given by the '-download' flag as '<name>.list.gz'. Returns an
// error if the list cannot be fetched or written.
func downloadList(fetch fetcher, name string) error {
	list, err := fetch.list(name)
	if err != nil {
		return err
	}
	defer list.Close()

	saveto := path.Join(flagLoadDownload, sf("%s.list.gz", name))
	logf("Downloading %s to %s...", name, saveto)
	f := createFile(saveto)
	// The original never closed f, leaking the handle for every list
	// downloaded. Close it on all return paths; a failed copy is still
	// reported below. (NOTE(review): assumes createFile returns an
	// *os.File or similar io.Closer — confirm against its definition.)
	defer f.Close()
	if _, err := io.Copy(f, list); err != nil {
		return ef("Could not save '%s' to disk: %s", name, err)
	}
	return nil
}
// loadMovies fetches the 'movies' list and stores it using a fresh database
// connection. It must run before any attribute lists are loaded, since it
// creates the atoms those lists reference.
func loadMovies(driver, dsn string, fetch fetcher) error {
	movies, err := fetch.list("movies")
	if err != nil {
		return err
	}
	defer movies.Close()

	conn := openDb(driver, dsn)
	defer closeDb(conn)
	if err = listMovies(conn, movies); err != nil {
		return ef("Could not store movies list: %s", err)
	}
	return nil
}
// loadActors fetches both the 'actors' and 'actresses' lists and stores
// them together using a fresh database connection. Like loadMovies, this
// must run before the attribute lists, since it adds atoms.
func loadActors(driver, dsn string, fetch fetcher) error {
	actors, err := fetch.list("actors")
	if err != nil {
		return err
	}
	defer actors.Close()

	actresses, err := fetch.list("actresses")
	if err != nil {
		return err
	}
	defer actresses.Close()

	conn := openDb(driver, dsn)
	defer closeDb(conn)
	if err = listActors(conn, actors, actresses); err != nil {
		return ef("Could not store actors/actresses list: %s", err)
	}
	return nil
}
// loaderIndex returns the index of name in userList, comparing
// case-insensitively and ignoring surrounding whitespace in userList's
// entries. Returns -1 when name is not present.
func loaderIndex(name string, userList []string) int {
	want := strings.ToLower(name)
	for i, entry := range userList {
		if want == strings.ToLower(strings.TrimSpace(entry)) {
			return i
		}
	}
	return -1
}
// Returns a list of tables that have indices which are modified by updating
// the lists given.
// Each table name will only appear once.
//
// If the name/atom table has more than 0 rows, then it will not be included
// in the list returned. This prevents rebuilding the indices on each update,
// which usually contains nominal additions to the name/atom table.
func tablesFromLists(db *imdb.DB, lists []string) (tables []string, err error) {
	// Collect every table touched by any requested list.
	var pre []string
	for _, name := range lists {
		tablesForList, ok := listTables[name]
		if !ok {
			return nil, ef("BUG: Could not find tables for list %s", name)
		}
		pre = append(pre, tablesForList...)
	}
	// Deduplicate via a set round-trip. Note the resulting order is
	// unspecified (map iteration), which is fine for index drop/create.
	pre = fun.Keys(fun.Set(pre)).([]string)

	// updatingEmpty reports whether the given table is currently empty
	// AND is among the tables being updated by this run.
	updatingEmpty := func(table string) bool {
		return rowCount(db, table) == 0 && fun.In(table, pre)
	}
	for _, table := range pre {
		switch table {
		case "atom", "name":
			// This is a little complex. Basically, we want to avoid rebuilding
			// indices for incremental updates. So we only let it happen when
			// we're updating the actor or movie lists from scratch.
			// (In general, this should apply to any table that is updated
			// concurrently with name/atom. We exclude tvshow and episode since
			// they are only updated when movie is updated.)
			if updatingEmpty("actor") || updatingEmpty("movie") {
				tables = append(tables, table)
			}
		default:
			tables = append(tables, table)
		}
	}
	return
}