This repository has been archived by the owner on Mar 1, 2021. It is now read-only.
/
inserter.go
94 lines (70 loc) · 2.04 KB
/
inserter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package inserter
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/jinzhu/gorm"
// necessary for gorm :pointup:
_ "github.com/jinzhu/gorm/dialects/postgres"
dbUtils "github.com/codeuniversity/smag-mvp/db"
"github.com/codeuniversity/smag-mvp/twitter/models"
"github.com/codeuniversity/smag-mvp/utils"
"github.com/codeuniversity/smag-mvp/worker"
"github.com/segmentio/kafka-go"
)
// Inserter represents the scraper containing all clients it uses
type Inserter struct {
*worker.Worker
qReader *kafka.Reader
db *gorm.DB
}
// New returns an initilized inserter
func New(postgresHost, postgresPassword, dbName string, qReader *kafka.Reader) *Inserter {
i := &Inserter{}
i.qReader = qReader
connectionString := fmt.Sprintf("host=%s user=postgres dbname=%s sslmode=disable", postgresHost, dbName)
if postgresPassword != "" {
connectionString += " " + "password=" + postgresPassword
}
db, err := gorm.Open("postgres", connectionString)
utils.PanicIfNotNil(err)
i.db = db // use db.Debug() here to get detailed gorm logs
db.AutoMigrate(&models.TwitterUser{})
b := worker.Builder{}.WithName("twitter_inserter_users").
WithWorkStep(i.runStep).
WithStopTimeout(10*time.Second).
AddShutdownHook("qReader", qReader.Close).
AddShutdownHook("postgres_client", db.Close)
i.Worker = b.MustBuild()
return i
}
func (i *Inserter) runStep() error {
m, err := i.qReader.FetchMessage(context.Background())
if err != nil {
return err
}
rawUser := &models.TwitterUserRaw{}
err = json.Unmarshal(m.Value, rawUser)
if err != nil {
return err
}
user := models.ConvertTwitterUser(rawUser)
log.Println("inserting user: ", user.Username)
err = i.insertUser(user)
if err != nil {
return err
}
return i.qReader.CommitMessages(context.Background(), m)
}
func (i *Inserter) insertUser(user *models.TwitterUser) error {
var err error
baseUser := &models.TwitterUser{}
filter := &models.TwitterUser{Username: user.Username}
err = dbUtils.CreateOrUpdate(i.db, baseUser, filter, user)
if err != nil {
return err
}
return nil
}