Skip to content
Permalink
Browse files

Modify query and mutation to upsert (#21)

  • Loading branch information...
animesh2049 committed Aug 28, 2019
1 parent 024f970 commit b77b366c66c2805eabaf7828bd08ece68705a7c5
Showing with 46 additions and 138 deletions.
  1. +46 −138 main.go
184 main.go
@@ -44,30 +44,6 @@ import (
const (
cTimeFormat = "Mon Jan 02 15:04:05 -0700 2006"
cDgraphTimeFormat = "2006-01-02T15:04:05.999999999+10:00"

cDgraphTweetQuery = `
query all($tweetID: string) {
all(func: eq(id_str, $tweetID)) {
uid
}
}
`

cDgraphUserQuery = `
query all($userID: string) {
all(func: eq(user_id, $userID)) {
uid
user_id
user_name
screen_name
description
friends_count
verified
profile_banner_url
profile_image_url
}
}
`
)

var (
@@ -128,6 +104,41 @@ type twitterTweet struct {
Retweet bool `json:"retweet"`
}

func buildQuery(tweet *twitterTweet) string {
tweetQuery := `t as var(func: eq(id_str, "%s"))`
userQuery := `%s as var(func: eq(user_id, "%s"))`

query := make([]string, len(tweet.Mention)+2)

query[0] = fmt.Sprintf(tweetQuery, tweet.IDStr)
tweet.UID = "uid(t)"

query[1] = fmt.Sprintf(userQuery, "u", tweet.Author.UserID)
tweet.Author.UID = "uid(u)"

usersMap := make(map[string]string)
usersMap[tweet.Author.UserID] = "u"

// We will query only once for every user. We are storing all the users in the map who
// we have already queried. If a user_id is repeated, we will just use uid that we got
// in the previous query.
for i, user := range tweet.Mention {
var varName string
if name, ok := usersMap[user.UserID]; ok {
varName = name
} else {
varName = fmt.Sprintf("m%d", i+1)
query[i+2] = fmt.Sprintf("%s as var(func: eq(user_id, %s))", varName, user.UserID)
usersMap[user.UserID] = varName
}

tweet.Mention[i].UID = fmt.Sprintf("uid(%s)", varName)
}

finalQuery := fmt.Sprintf("query {%s}", strings.Join(query, "\n"))
return finalQuery
}

func runInserter(alphas []api.DgraphClient, c *y.Closer, tweets <-chan interface{}) {
defer c.Done()

@@ -159,10 +170,7 @@ func runInserter(alphas []api.DgraphClient, c *y.Closer, tweets <-chan interface
// txn is not being discarded deliberately
// defer txn.Discard()

if errTweet := updateFilteredTweet(ft, txn); errTweet != nil {
atomic.AddUint32(&stats.ErrorsDgraph, 1)
continue
}
queryStr := buildQuery(ft)

tweet, err := json.Marshal(ft)
if err != nil {
@@ -178,8 +186,16 @@ func runInserter(alphas []api.DgraphClient, c *y.Closer, tweets <-chan interface
// only ONE retry attempt is made
retry := true
RETRY:
apiMutation := &api.Mutation{SetJson: tweet, CommitNow: commitNow}
_, err = txn.Mutate(context.Background(), apiMutation)
apiUpsert := &api.Request{
Mutations: []*api.Mutation{
&api.Mutation{
SetJson: tweet,
},
},
CommitNow: commitNow,
Query: queryStr,
}
_, err = txn.Do(context.Background(), apiUpsert)
switch {
case err == nil:
if commitNow {
@@ -262,114 +278,6 @@ func filterTweet(jsn interface{}) (*twitterTweet, error) {
}, nil
}

func updateFilteredTweet(ft *twitterTweet, txn *dgo.Txn) error {
// first ensure that tweet doesn't exists
resp, err := txn.QueryWithVars(context.Background(), cDgraphTweetQuery,
map[string]string{"$tweetID": ft.IDStr})
if err != nil {
return err
}
var r struct {
All []struct {
UID string `json:"uid"`
} `json:"all"`
}
if err := json.Unmarshal(resp.Json, &r); err != nil {
return err
}

// possible duplicate, shouldn't happen
if len(r.All) > 0 {
log.Println("found duplicate tweet with id:", ft.IDStr)
return errShouldNotReach
}

// map to check for duplicates
users := make(map[string]string)

userID := ft.Author.UserID
if u, err := queryUser(txn, &ft.Author); err != nil {
return err
} else if u != nil {
ft.Author = *u
}
users[userID] = ft.Author.UID

userMentions := make([]twitterUser, 0)
for i, m := range ft.Mention {
if dup, ok := users[m.UserID]; ok && dup != "" {
userMentions = append(userMentions, twitterUser{UID: dup})
continue
} else if ok && dup == "" {
// TODO: find a way to not ignore this mention
continue
}

userID := m.UserID
if u, err := queryUser(txn, &m); err != nil {
return err
} else if u != nil {
ft.Mention[i] = *u
}
userMentions = append(userMentions, ft.Mention[i])
users[userID] = m.UID
}
ft.Mention = userMentions

return nil
}

func equalsUser(src, dst *twitterUser) bool {
return src.UserID == dst.UserID &&
src.UserName == dst.UserName &&
src.ScreenName == dst.ScreenName &&
src.Description == dst.Description &&
src.FriendsCount == dst.FriendsCount &&
src.Verified == dst.Verified &&
src.ProfileBannerURL == dst.ProfileBannerURL &&
src.ProfileImageURL == dst.ProfileImageURL
}

func queryUser(txn *dgo.Txn, src *twitterUser) (*twitterUser, error) {
resp, err := txn.QueryWithVars(context.Background(), cDgraphUserQuery,
map[string]string{"$userID": src.UserID})
if err != nil {
return nil, err
}

var r struct {
All []twitterUser `json:"all"`
}
if err := json.Unmarshal(resp.Json, &r); err != nil {
return nil, err
}

if len(r.All) > 1 {
log.Println("found duplicate users in Dgraph with id:", r.All[0].UserID)
return nil, errShouldNotReach
} else if len(r.All) == 0 {
return nil, nil
} else if len(r.All) == 1 && !equalsUser(src, &r.All[0]) {
return &r.All[0], nil
} else {
return &twitterUser{UID: r.All[0].UID}, nil
}
}

func getTrends(id int64, api *anaconda.TwitterApi) ([]string, error) {
resp, err := api.GetTrendsByPlace(id, nil)
if err != nil {
return nil, err
}

trends := make([]string, len(resp.Trends))
for i, t := range resp.Trends {
trends[i] = t.Name
}

return trends, nil
}

func readCredentials(path string) twitterCreds {
jsn, err := ioutil.ReadFile(path)
checkFatal(err, "Unable to open twitter credentials file '%s'", path)

0 comments on commit b77b366

Please sign in to comment.
You can’t perform that action at this time.