This repository has been archived by the owner on Aug 17, 2022. It is now read-only.
/
database.go
executable file
·153 lines (139 loc) · 4.4 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"log"
"os"
)
// Package-level function variables that alias the concrete database helpers.
// dbNewVersion calls lastVersionNum rather than dbLastVersionNum directly.
// NOTE(review): presumably these exist so tests can swap in stubs — confirm
// against the test files.
var (
	setupDatabase  = dbSetupWithCtx
	lastVersionNum = dbLastVersionNum
)
// dbSetupWithCtx opens the MySQL database described by the RDS_* environment
// variables, stores the handle in ctx.db, verifies the connection with a ping
// and makes sure the entries table exists.
//
// Environment variables read: RDS_HOSTNAME, RDS_PORT, RDS_DB_NAME,
// RDS_USERNAME and RDS_PASSWORD.
//
// It returns the data source name that was used (this value contains the
// password, so callers must not log it) together with any error produced
// while opening or pinging the database. Table creation is delegated to
// dbCreateTable.
func dbSetupWithCtx(ctx *context) (string, error) {
	var err error

	// Setup RDS db from env variables
	rdsHostname := os.Getenv("RDS_HOSTNAME")
	rdsPort := os.Getenv("RDS_PORT")
	rdsDbName := os.Getenv("RDS_DB_NAME")
	rdsUsername := os.Getenv("RDS_USERNAME")
	rdsPassword := os.Getenv("RDS_PASSWORD")

	const dsnFormat = "%s:%s@tcp(%s:%s)/%s"
	sourceName := fmt.Sprintf(dsnFormat,
		rdsUsername, rdsPassword, rdsHostname, rdsPort, rdsDbName)

	// Never write the real password to the log: log the same DSN shape with
	// the password masked instead.
	log.Print("DB connection string: " + fmt.Sprintf(dsnFormat,
		rdsUsername, "****", rdsHostname, rdsPort, rdsDbName))

	if ctx.db, err = sql.Open("mysql", sourceName); err != nil {
		return sourceName, handle("Failed to set up database opener", err)
	}

	// sql.Open does not necessarily establish a connection; Ping forces one
	// so that bad credentials or an unreachable host are reported here.
	if err = ctx.db.Ping(); err != nil {
		return sourceName, handle("Failed to ping database", err)
	}

	dbCreateTable(ctx)
	log.Print("Successfully checked database.")
	return sourceName, nil
}
// dbCreateTable ensures the entries table exists, issuing a
// CREATE TABLE IF NOT EXISTS statement through ctx.db. If the statement
// fails, the error is logged and the process is terminated via log.Fatal.
func dbCreateTable(ctx *context) {
	const createStmt = "CREATE TABLE IF NOT EXISTS entries (" +
		"PathName VARCHAR(500) NOT NULL, " +
		"VersionNum INT NOT NULL, " +
		"DateModified DATETIME, " +
		"ArchiveKey VARCHAR(50), " +
		"PRIMARY KEY (PathName, VersionNum));"

	_, execErr := ctx.db.Exec(createStmt)
	if execErr == nil {
		return
	}

	log.Print(execErr)
	log.Fatal("Failed to find or create table.")
}
// dbArchiveFile records an archive key on an existing entry: the row matching
// (file, num) gets its ArchiveKey column set to key. Returns nil on success,
// or the result of handle() when the update statement fails.
func dbArchiveFile(ctx *context, file string, key string, num int) error {
	// Human-readable rendering used for logging only; the statement that is
	// actually executed below binds its values through placeholders.
	rendered := fmt.Sprintf(
		"update entries set ArchiveKey='%s' where "+
			"PathName='%s' and VersionNum=%d;", key, file, num)
	log.Print("db query: " + rendered)

	const updateStmt = "update entries set ArchiveKey=? where " +
		"PathName=? and VersionNum=?;"
	if _, execErr := ctx.db.Exec(updateStmt, key, file, num); execErr != nil {
		return handle("Error in updating db entry.", execErr)
	}
	return nil
}
// dbGetModTime looks up the DateModified value of the highest-numbered
// version of file that has a non-null DateModified. When no such row exists
// it logs that fact and returns an empty string with a nil error; any other
// query failure is reported through handle().
func dbGetModTime(ctx *context, file string) (string, error) {
	const selectStmt = "select DateModified from entries " +
		"where PathName=? and DateModified is not null order by VersionNum desc"

	var modified string
	row := ctx.db.QueryRow(selectStmt, file)
	scanErr := row.Scan(&modified)

	// A missing row is not treated as a failure.
	if scanErr == sql.ErrNoRows {
		log.Print("No entries found for: " + file)
		return "", nil
	}
	if scanErr != nil {
		return "", handle("Error in querying database.", scanErr)
	}
	return modified, nil
}
// dbNewVersion registers a new version of pathName in the entries table.
// The new version number is one more than the latest recorded version
// (archived entries included), or 1 when no version is recorded. The modified
// time is taken from the directory listing cache via getModTime — per the
// original author's note, a workaround for original modification times not
// being available after syncing to S3. When no modified time is available the
// DateModified column is left out of the insert.
func dbNewVersion(ctx *context, pathName string,
	cache map[string]map[string]string) error {
	log.Print("Handling new version of: " + pathName)

	// Work out the next version number.
	nextNum := 1
	if latest := lastVersionNum(ctx, pathName, true); latest >= 0 {
		nextNum = latest + 1
	}

	// Modified time from the directory listing cache.
	stamp := getModTime(pathName, cache)

	// Insert, omitting DateModified when there is nothing to store.
	var insertErr error
	switch stamp {
	case "":
		_, insertErr = ctx.db.Exec("insert into entries(PathName, "+
			"VersionNum) values(?, ?)", pathName, nextNum)
	default:
		_, insertErr = ctx.db.Exec("insert into entries(PathName, "+
			"VersionNum, DateModified) values(?, ?, ?)", pathName,
			nextNum, stamp)
	}

	if insertErr != nil {
		return handle("Error in new version insertion query", insertErr)
	}
	return nil
}
// dbLastVersionNum returns the highest VersionNum recorded for file in the
// entries table. When inclArchive is false, rows that already have an
// ArchiveKey are ignored.
//
// It returns -1 when no matching row exists or when the query, the row scan
// or the row iteration fails; failures are reported through errOut.
func dbLastVersionNum(ctx *context, file string, inclArchive bool) int {
	var (
		rows *sql.Rows
		err  error
	)

	// Query
	if inclArchive {
		rows, err = ctx.db.Query("select VersionNum from entries "+
			"where PathName=? order by VersionNum desc", file)
	} else {
		// Specify not to include archived entries
		rows, err = ctx.db.Query("select VersionNum from entries "+
			"where PathName=? and ArchiveKey is null order by VersionNum desc",
			file)
	}
	if err != nil {
		errOut("Error in getting VersionNum.", err)
		return -1
	}
	defer func() {
		// Use a separate variable so a close error never overwrites err.
		if closeErr := rows.Close(); closeErr != nil {
			errOut("Error in closing rows", closeErr)
		}
	}()

	// Rows are ordered by VersionNum descending, so the first one is the
	// latest version.
	if rows.Next() {
		num := -1
		if err = rows.Scan(&num); err != nil {
			errOut("Error scanning row.", err)
			return -1
		}
		return num
	}

	// Next returning false means either "no rows" or "iteration failed";
	// rows.Err distinguishes the two.
	if err = rows.Err(); err != nil {
		errOut("Error iterating rows.", err)
	}
	return -1
}