This repository has been archived by the owner on May 11, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
131 lines (97 loc) · 3.85 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import Stream from 'user-stream';
import Rx from 'rx';
import {readConfig} from './conf';
const fromEvent = Rx.Observable.fromEvent;
const config = readConfig();
import {Intelligence} from './intelligence';
import {Memory} from './memory';
import {TagFollowers} from './follow';
const memory = new Memory(config.redisUri, 'metagu');
const tagFollowers = new TagFollowers(memory);
const intelligence = new Intelligence(tagFollowers);
const stream = new Stream({
consumer_key: config.twitterConsumerKey,
consumer_secret: config.twitterConsumerSecret,
access_token_key: config.twitterAccessTokenKey,
access_token_secret: config.twitterAccessTokenSecret
});
stream.stream({with: 'user'});
const twitterHandle = config.twitterHandle;
const mentionPrefix = new RegExp(`^\.?@${twitterHandle} +`, 'i');
const data = fromEvent(stream, 'data');
const error = fromEvent(stream, 'error').flatMap(Rx.Observable.throw);
const end = fromEvent(stream, 'end');
const obs = data.merge(error).takeUntil(end);
const tweets = obs.filter(tweet => !! tweet.id_str);
const mentions = tweets.filter(tweet => tweet.text.match(mentionPrefix));
// TODO: retry after delay on 503 { type: 'response', data: { code: 503 } }
mentions.
subscribe(tweet => {
const author = tweet.user.screen_name;
const tweetContent = tweet.text.replace(mentionPrefix, '');
const tweetId = tweet.id_str;
console.log(`Received from @${author}: ${tweetContent}`);
intelligence.respond(tweetContent, author).
flatMap(reply => {
console.log(`Respond to @${author}: ${reply}`);
return post(author, reply, tweetId);
}).
// TODO: flatmap the whole stream
subscribe();
}, error => {
console.error("ERROR", error);
});
import twitterClient from 'twitter-node-client';
const {Twitter} = twitterClient;
const twitter = new Twitter({
consumerKey: config.twitterConsumerKey,
consumerSecret: config.twitterConsumerSecret,
accessToken: config.twitterAccessTokenKey,
accessTokenSecret: config.twitterAccessTokenSecret
});
function post(recipient, message, replyTweetId) {
return Rx.Observable.create(observer => {
const tweet = {
status: `@${recipient} ${message}`,
in_reply_to_status_id: replyTweetId
};
twitter.postTweet(
tweet,
error => { observer.onError(error); observer.onCompleted(); },
resp => { observer.onNext(resp); observer.onCompleted(); }
);
});
}
// TODO: Use CAPI stream
// const capiEvents$ = ...
// const publishingEvents$ = capiEvents$.filter(event => event.xxxx)
// const publishedContent$ = publishingEvents$.map(event => event.content)
import {getPublishedContent$} from './capi-feed';
const publishedContent$ = getPublishedContent$();
const publishedContentAndFollower$ = publishedContent$.flatMap(content => {
console.log(`Content published: ${content.webTitle} - ${content.webUrl}`);
const tags = content.tags;
return Rx.Observable.from(tags).
flatMap(tag => tagFollowers.get$(tag)).
distinct().
map(nickname => ({content, nickname}));
});
publishedContentAndFollower$.subscribe(({content, nickname}) => {
// TODO: give context, 'new XY'?
const message = `${content.webTitle} ${content.webUrl}`;
// TODO: flatmap the whole stream
post(nickname, message).
subscribe();
}, error => {
console.error("FOLLOW ERROR", error);
});
// Dummy HTTP server to keep alive on heroku
import http from 'http';
const PORT = process.env.PORT || 5000;
var server = http.createServer(function(request, response) {
response.writeHead(200, {"Content-Type": "text/html"});
response.write('hello there');
response.end();
});
server.listen(PORT);
console.log(`HTTP server listening on ${PORT}`);