-
Notifications
You must be signed in to change notification settings - Fork 1
/
UserStream.cs
174 lines (147 loc) · 6.1 KB
/
UserStream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Windows;
using System.ComponentModel;
using System.Threading;
using System.Windows.Forms;
using Hammock;
using Hammock.Web;
using Hammock.Streaming;
using Hammock.Authentication.OAuth;
using Twitter.Json;
using Twitter.API.Basic;
namespace Twitter.API.Streaming
{
public class UserStream
{
public enum ReceiveType
{
FriendsList = 0,
Tweet = 1,
Reply = 2,
DirectMessage = 3,
Delete = 4
}
public const string C_BASE_URL = "https://userstream.twitter.com";
public const string C_USER_STREAM_URL = "user.json";
public const string C_VERSION_PATH = "2";
public delegate void ReceiveHandler(object sender, JsonDocument jdData, ReceiveType rtRecvType);
public event ReceiveHandler Receive;
private OAuthCredentials m_oaCredentials;
private RestClient m_rcClient;
private IAsyncResult m_iaConnectionAsync = null;
private BasicAPI m_bscAPI;
public UserStream(OAuthCredentials oaCredentials)
{
m_oaCredentials = oaCredentials;
m_bscAPI = new BasicAPI(m_oaCredentials);
m_rcClient = new RestClient
{
Authority = C_BASE_URL,
VersionPath = C_VERSION_PATH,
Credentials = m_oaCredentials,
Method = WebMethod.Get,
StreamOptions = new StreamOptions
{
ResultsPerCallback = 1
}
};
}
public void Connect()
{
Thread thdConnect = new Thread(new ThreadStart(HomeTimeline));
thdConnect.Start();
}
private void HomeTimeline()
{
//first get initial timeline from the basic API
m_bscAPI.GetHomeTimeline(HomeTimelineCallback, null, 20, 1, -1, -1, false, true);
}
private void HomeTimelineCallback(APICallbackArgs acArgs)
{
UserTimeline utInitial = (UserTimeline)acArgs.ResponseObject;
for (int i = utInitial.Statuses.Count - 1; i >= 0; i--)
APIReturn.SynchronizeInvoke(Receive, new object[] { this, new JsonDocument(utInitial.Statuses[i].Object), ReceiveType.Tweet });
m_bscAPI.GetMentions(MentionsCallback, null);
}
private void MentionsCallback(APICallbackArgs acArgs)
{
UserTimeline utInitial = (UserTimeline)acArgs.ResponseObject;
for (int i = utInitial.Statuses.Count - 1; i >= 0; i--)
APIReturn.SynchronizeInvoke(Receive, new object[] { this, new JsonDocument(utInitial.Statuses[i].Object), ReceiveType.Reply });
EstablishStream();
}
private void EstablishStream()
{
m_oaCredentials.Verifier = null;
m_oaCredentials.Type = OAuthType.ProtectedResource;
//construct and open streaming request
RestRequest rrqRequest = new RestRequest
{
Path = C_USER_STREAM_URL,
};
m_rcClient.AddHeader("User-Agent", "Twitter/1.0");
//@TODO: uncomment these for production
m_iaConnectionAsync = m_rcClient.BeginRequest(rrqRequest, RequestCallback);
m_rcClient.CancelStreaming(); //don't know why this is necessary - maybe it isn't?
}
private void RequestCallback(RestRequest rrqRequest, RestResponse rrsResponse, object objUserState)
{
try
{
StreamReader srReader = new StreamReader(rrsResponse.ContentStream, Encoding.UTF8);
string sCurLine;
do
{
sCurLine = srReader.ReadLine().Trim();
} while ((sCurLine == "") && (!srReader.EndOfStream));
JsonDocument jdFinal = JsonParser.GetParser().ParseString(sCurLine);
if (jdFinal != null)
{
if (jdFinal.Root.IsNode())
{
if (jdFinal.Root.ToNode().ContainsKey("friends"))
{
//this is the friends list that's sent at the beginning of each userstream connection
APIReturn.SynchronizeInvoke(Receive, this, jdFinal, ReceiveType.FriendsList);
}
else if (jdFinal.Root.ToNode().ContainsKey("retweeted"))
{
Status stNewStatus = new Status(jdFinal.Root.ToNode());
if (stNewStatus.IsReply && stNewStatus.ReplyNames.Contains(m_oaCredentials.ClientUsername))
APIReturn.SynchronizeInvoke(Receive, this, jdFinal, ReceiveType.Reply);
else
APIReturn.SynchronizeInvoke(Receive, this, jdFinal, ReceiveType.Tweet);
}
else if (jdFinal.Root.ToNode().ContainsKey("recipient_id") && jdFinal.Root.ToNode().ContainsKey("sender_id"))
{
DirectMessage dmNewMessage = new DirectMessage(jdFinal.Root.ToNode());
APIReturn.SynchronizeInvoke(Receive, this, jdFinal, ReceiveType.DirectMessage);
}
//also need to add OnDelete for when a tweet gets deleted
}
}
}
catch(Exception e)
{
MessageBox.Show("An unknown Twitter API error has occurred (user streams). " + e.Message, "API Error", MessageBoxButtons.OK, MessageBoxIcon.Exclamation);
}
}
public void Disconnect()
{
if (m_iaConnectionAsync != null)
m_rcClient.EndRequest(m_iaConnectionAsync, new TimeSpan(0, 0, 6));
}
public bool Connected
{
get { return (m_iaConnectionAsync != null); }
}
~UserStream()
{
Disconnect();
}
}
}