Skip to content

Commit

Permalink
[#] lambda_worker: Add twitter OAuth key retrieve job type
Browse files Browse the repository at this point in the history
  • Loading branch information
nykma committed Sep 16, 2023
1 parent c83e401 commit 3b0a266
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 80 deletions.
58 changes: 41 additions & 17 deletions cmd/lambda_worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,19 @@ func handler(ctx context.Context, sqs_event events.SQSEvent) (events.SQSEventRes
case types.QueueActions.ArweaveUpload:
arweaveMsgs[message.Persona] = raw_message.MessageId
case types.QueueActions.Revalidate:
if err := revalidate_single(ctx, &message); err != nil {
if err := revalidateSingle(ctx, &message); err != nil {
fmt.Printf("error revalidating proof record %d: %s\n", message.ProofID, err)
// Ignore failed revalidation job since failed job will still update DB.
// failures = append(failures, events.SQSBatchItemFailure{ItemIdentifier: raw_message.MessageId})
}
case types.QueueActions.TwitterOAuthTokenAcquire:
{
err := twitterRetrieveOAuthToken()
if err != nil {
// Ignore errors for now
fmt.Printf("Error when retrieving Twitter OAuth key: %s", err.Error())
}
}
default:
logrus.Warnf("unsupported queue action: %s", message.Action)
failures = append(failures, events.SQSBatchItemFailure{ItemIdentifier: raw_message.MessageId})
Expand Down Expand Up @@ -137,7 +145,7 @@ func arweave_upload_many(personas []string) error {
break
}

item, err := arweave_bundle_single(pc, previous)
item, err := arweaveBundleSingle(pc, previous)
if err != nil {
logrus.Errorf("error marshalling proof chain %s: %w", pc.Uuid, err)
break
Expand Down Expand Up @@ -169,7 +177,7 @@ func arweave_upload_many(personas []string) error {
return nil
}

func arweave_bundle_single(pc *model.ProofChain, previous *model.ProofChain) (*artypes.BundleItem, error) {
func arweaveBundleSingle(pc *model.ProofChain, previous *model.ProofChain) (*artypes.BundleItem, error) {
previousUuid := ""
previousArweaveID := ""
if previous != nil {
Expand Down Expand Up @@ -224,7 +232,7 @@ func arweave_bundle_single(pc *model.ProofChain, previous *model.ProofChain) (*a
return &item, nil
}

func revalidate_single(ctx context.Context, message *types.QueueMessage) error {
func revalidateSingle(ctx context.Context, message *types.QueueMessage) error {
proof := model.Proof{}
tx := model.DB.Preload("ProofChain").Preload("ProofChain.Previous").Where("id = ?", message.ProofID).First(&proof)
if tx.Error != nil {
Expand All @@ -233,15 +241,15 @@ func revalidate_single(ctx context.Context, message *types.QueueMessage) error {
return proof.Revalidate()
}

func init_db(cfg aws.Config) {
func initDB(cfg aws.Config) {
model.Init(false) // TODO: should read auto migrate from ENV
}

// func init_sqs(cfg aws.Config) {
// sqs.Init(cfg)
// }

func init_validators() {
func initValidators() {
twitter.Init()
ethereum.Init()
keybase.Init()
Expand All @@ -264,19 +272,19 @@ func init() {
logrus.Fatalf("Unable to load AWS config: %s", err)
}
common.CurrentRuntime = common.Runtimes.Lambda
init_config_from_aws_secret()
initConfigFromAWSSecret()
logrus.SetLevel(logrus.InfoLevel)

init_db(cfg)
initDB(cfg)
// init_sqs(cfg)
init_validators()
initValidators()
}

func init_config_from_aws_secret() {
func initConfigFromAWSSecret() {
if initialized {
return
}
secret_name := getE("SECRET_NAME", "")
secretName := getE("SECRET_NAME", "")
region := getE("SECRET_REGION", "")

// Create a Secrets Manager client
Expand All @@ -290,7 +298,7 @@ func init_config_from_aws_secret() {

client := secretsmanager.NewFromConfig(cfg)
input := secretsmanager.GetSecretValueInput{
SecretId: aws.String(secret_name),
SecretId: aws.String(secretName),
VersionStage: aws.String("AWSCURRENT"),
}
result, err := client.GetSecretValue(context.Background(), &input)
Expand Down Expand Up @@ -319,17 +327,33 @@ func init_config_from_aws_secret() {
initialized = true
}

func getE(env_key, default_value string) string {
result := os.Getenv(env_key)
func getE(envKey, defaultValue string) string {
result := os.Getenv(envKey)
if len(result) == 0 {
if len(default_value) > 0 {
return default_value
if len(defaultValue) > 0 {
return defaultValue
} else {
logrus.Fatalf("ENV %s must be given! Abort.", env_key)
logrus.Fatalf("ENV %s must be given! Abort.", envKey)
return ""
}

} else {
return result
}
}

func twitterRetrieveOAuthToken() (err error) {
type TokenList struct {
Tokens []twitter.Tokens `json:"tokens"`
}
// TODO: Retrieve existed token from a storage space (i.e., KV / S3)

tokens, err := twitter.GenerateOauthToken()
if err != nil {
return err
}
fmt.Printf("TWITTER OAUTH KEY REGISTERED: %+v", *tokens)

// TODO: save new token to a storage space (i.e., KV / S3)
return nil
}
10 changes: 6 additions & 4 deletions types/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package types
type QueueAction string

var QueueActions = struct {
Revalidate QueueAction
ArweaveUpload QueueAction
Revalidate QueueAction
ArweaveUpload QueueAction
TwitterOAuthTokenAcquire QueueAction
}{
Revalidate: "revalidate",
ArweaveUpload: "arweave_upload",
Revalidate: "revalidate",
ArweaveUpload: "arweave_upload",
TwitterOAuthTokenAcquire: "twitter_oauth_token_acquire",
}

// QueueMessage indicates structure of messages in Amazon SQS.
Expand Down
106 changes: 60 additions & 46 deletions validator/twitter/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/nextdotid/proof_server/util"
"github.com/samber/lo"
"golang.org/x/xerrors"
)
Expand All @@ -20,16 +21,18 @@ type APIResponse struct {
Text string `json:"text"`
}

const (
BASIC_AUTH_USERNAME = "3rJOl1ODzm9yZy63FACdg"
BASIC_AUTH_PASSWORD = "5jPoQ5kQvMJFDYRNE8bQ4rHuds4xJqhvgNJM4awaE8"
)
type Tokens struct {
AccessToken string `json:"access_token"`
GuestToken string `json:"guest_token"`
FlowToken string `json:"flow_token"`
OAuthKey string `json:"oauth_key"`
OAuthSecret string `json:"oauth_secret"`
CreatedAt string `json:"created_at"`
}

var (
// TODO: should save accessToken to somewhere else (shared by all Lambda instances)
accessToken string
guestToken string
flowToken string
const (
TWITTER_CONSUMER_KEY = "3rJOl1ODzm9yZy63FACdg"
TWITTER_CONSUMER_SECRET = "5jPoQ5kQvMJFDYRNE8bQ4rHuds4xJqhvgNJM4awaE8"
)

func fetchPostWithAPI(id string, maxRetries int) (tweet *APIResponse, err error) {
Expand All @@ -38,7 +41,7 @@ func fetchPostWithAPI(id string, maxRetries int) (tweet *APIResponse, err error)
return nil, nil
}

func setHeaders(req *http.Request, setAccessToken, setGuestToken bool) {
func setHeaders(req *http.Request, tokens *Tokens, setAccessToken, setGuestToken bool) {
req.Header.Set("User-Agent", "TwitterAndroid/9.95.0-release.0 (29950000-r-0) ONEPLUS+A3010/9 (OnePlus;ONEPLUS+A3010;OnePlus;OnePlus3;0;;1;2016)")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
Expand All @@ -49,20 +52,23 @@ func setHeaders(req *http.Request, setAccessToken, setGuestToken bool) {
req.Header.Set("System-User-Agent", "Dalvik/2.1.0 (Linux; U; Android 9; ONEPLUS A3010 Build/PKQ1.181203.001)")
req.Header.Set("X-Twitter-Active-User", "yes")
if setGuestToken {
req.Header.Set("X-Guest-Token", guestToken)
req.Header.Set("X-Guest-Token", tokens.GuestToken)
}
if setAccessToken {
req.Header.Set("Authorization", "Bearer "+accessToken)
req.Header.Set("Authorization", "Bearer "+tokens.AccessToken)
}
}

func GetOauthToken() (err error) {
if flowToken == "" {
if err := getFlowToken(); err != nil {
return err
}
// GenerateOauthToken generates a new Twitter OAuth guest token
// which can be used in calling Official APIs.
func GenerateOauthToken() (tokens *Tokens, err error) {
tokens = new(Tokens)
tokens.CreatedAt = util.TimeToTimestampString(time.Now())

if err := tokens.getFlowToken(); err != nil {
return nil, err
}
l.Infof("Access token: %s\nGuest token: %s\nFlow token: %s\n", accessToken, guestToken, flowToken)
l.Infof("Access token: %s\nGuest token: %s\nFlow token: %s\n", tokens.AccessToken, tokens.GuestToken, tokens.FlowToken)

requestBody := fmt.Sprintf(`{
"flow_token": "%s",
Expand Down Expand Up @@ -128,22 +134,22 @@ func GetOauthToken() (err error) {
"location_permission_prompt": 2,
"notifications_permission_prompt": 4
}
}`, flowToken)
}`, tokens.FlowToken)

req, err := http.NewRequest("POST", "https://api.twitter.com/1.1/onboarding/task.json", strings.NewReader(requestBody))
if err != nil {
return err
return nil, err
}
setHeaders(req, true, true)
setHeaders(req, tokens, true, true)

resp, err := new(http.Client).Do(req)
if err != nil {
return err
return nil, err
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return err
return nil, err
}
l.Infof("Response: \n%s\n", body)
type ResponseSubtask struct {
Expand All @@ -163,36 +169,37 @@ func GetOauthToken() (err error) {
// Should be "success"
Status string `json:"status"`
// A new flow token, usually ends with ":3"
FlowToken string `json:"flow_token"`
FlowToken string `json:"flow_token"`
Subtasks []ResponseSubtask `json:"subtasks"`
}
response := new(Response)
err = json.Unmarshal(body, response)
if err != nil {
return err
return nil, err
}
if response.Errors != nil {
return xerrors.Errorf("error when getting oauth token: %+v", *response.Errors)
return nil, xerrors.Errorf("error when getting oauth token: %+v", *response.Errors)
}
if response.Status != "success" {
return xerrors.Errorf("wrong API status: %s", response.Status)
return nil, xerrors.Errorf("wrong API status: %s", response.Status)
}

st, found := lo.Find(response.Subtasks, func(subtask ResponseSubtask) bool {
return (subtask.OpenAccount != nil)
})
if !found {
return xerrors.Errorf("oauth token not found in response")
return nil, xerrors.Errorf("oauth token not found in response")
}
flowToken = response.FlowToken
// Update new FlowToken
tokens.FlowToken = response.FlowToken
l.Infof("OAUTH TOKEN REGISTERED: %s --- %s", st.OpenAccount.OauthToken, st.OpenAccount.OauthTokenSecret)

return nil
return tokens, nil
}

func getFlowToken() (err error) {
if guestToken == "" {
if err := getGuestToken(); err != nil {
func (tokens *Tokens) getFlowToken() (err error) {
if tokens.GuestToken == "" {
if err := tokens.getGuestToken(); err != nil {
return err
}
}
Expand Down Expand Up @@ -269,7 +276,7 @@ func getFlowToken() (err error) {
if err != nil {
return err
}
setHeaders(req, true, true)
setHeaders(req, tokens, true, true)

resp, err := new(http.Client).Do(req)
if err != nil {
Expand All @@ -293,24 +300,24 @@ func getFlowToken() (err error) {
return xerrors.Errorf("empty FlowToken")
}

flowToken = response.FlowToken
tokens.FlowToken = response.FlowToken
return nil
}

func getGuestToken() (err error) {
if guestToken != "" {
func (tokens *Tokens) getGuestToken() (err error) {
if tokens.GuestToken != "" {
return nil
}
if accessToken == "" {
if err = getAccessToken(); err != nil {
if tokens.AccessToken == "" {
if err = tokens.getAccessToken(); err != nil {
return err
}
}
req, err := http.NewRequest("POST", "https://api.twitter.com/1.1/guest/activate.json", nil)
if err != nil {
return err
}
setHeaders(req, true, false)
setHeaders(req, tokens, true, false)
type Response struct {
GuestToken string `json:"guest_token"`
}
Expand All @@ -333,13 +340,13 @@ func getGuestToken() (err error) {
if response.GuestToken == "" {
return xerrors.Errorf("Wrong guest token: %s", response.GuestToken)
}
guestToken = response.GuestToken
tokens.GuestToken = response.GuestToken

return nil
}

func getAccessToken() (err error) {
if accessToken != "" {
func (tokens *Tokens) getAccessToken() (err error) {
if tokens.AccessToken != "" {
return nil
}

Expand All @@ -351,8 +358,8 @@ func getAccessToken() (err error) {
if err != nil {
return err
}
setHeaders(req, false, false)
req.SetBasicAuth(BASIC_AUTH_USERNAME, BASIC_AUTH_PASSWORD)
setHeaders(req, tokens, false, false)
req.SetBasicAuth(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
resp, err := new(http.Client).Do(req)
if err != nil {
return err
Expand All @@ -373,6 +380,13 @@ func getAccessToken() (err error) {
return xerrors.Errorf("Wrong bearer token: %s %s", response.TokenType, response.AccessToken)
}

accessToken = response.AccessToken
tokens.AccessToken = response.AccessToken
return nil
}

func (tokens *Tokens) IsExpired() bool {
const EXPIRED_AT = "24h"
expiredAt, _ := time.ParseDuration(EXPIRED_AT)
createdAt, _ := util.TimestampStringToTime(tokens.CreatedAt)
return createdAt.Add(expiredAt).Before(time.Now())
}
Loading

0 comments on commit 3b0a266

Please sign in to comment.