-
-
Notifications
You must be signed in to change notification settings - Fork 427
/
OutboxSender.java
293 lines (264 loc) · 10.6 KB
/
OutboxSender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
package io.sentry;
import static io.sentry.SentryLevel.ERROR;
import static io.sentry.cache.EnvelopeCache.PREFIX_CURRENT_SESSION_FILE;
import io.sentry.cache.EnvelopeCache;
import io.sentry.hints.Flushable;
import io.sentry.hints.Resettable;
import io.sentry.hints.Retryable;
import io.sentry.hints.SubmissionResult;
import io.sentry.protocol.SentryId;
import io.sentry.protocol.SentryTransaction;
import io.sentry.util.CollectionUtils;
import io.sentry.util.HintUtils;
import io.sentry.util.LogUtils;
import io.sentry.util.Objects;
import io.sentry.util.SampleRateUtils;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ApiStatus.Internal
public final class OutboxSender extends DirectoryProcessor implements IEnvelopeSender {
@SuppressWarnings("CharsetObjectCanBeUsed")
private static final Charset UTF_8 = Charset.forName("UTF-8");
private final @NotNull IHub hub;
private final @NotNull IEnvelopeReader envelopeReader;
private final @NotNull ISerializer serializer;
private final @NotNull ILogger logger;
public OutboxSender(
final @NotNull IHub hub,
final @NotNull IEnvelopeReader envelopeReader,
final @NotNull ISerializer serializer,
final @NotNull ILogger logger,
final long flushTimeoutMillis) {
super(logger, flushTimeoutMillis);
this.hub = Objects.requireNonNull(hub, "Hub is required.");
this.envelopeReader = Objects.requireNonNull(envelopeReader, "Envelope reader is required.");
this.serializer = Objects.requireNonNull(serializer, "Serializer is required.");
this.logger = Objects.requireNonNull(logger, "Logger is required.");
}
@Override
protected void processFile(final @NotNull File file, @NotNull Hint hint) {
Objects.requireNonNull(file, "File is required.");
if (!isRelevantFileName(file.getName())) {
logger.log(SentryLevel.DEBUG, "File '%s' should be ignored.", file.getAbsolutePath());
return;
}
try (final InputStream stream = new BufferedInputStream(new FileInputStream(file))) {
final SentryEnvelope envelope = envelopeReader.read(stream);
if (envelope == null) {
logger.log(
SentryLevel.ERROR,
"Stream from path %s resulted in a null envelope.",
file.getAbsolutePath());
} else {
processEnvelope(envelope, hint);
logger.log(SentryLevel.DEBUG, "File '%s' is done.", file.getAbsolutePath());
}
} catch (IOException e) {
logger.log(SentryLevel.ERROR, "Error processing envelope.", e);
} finally {
HintUtils.runIfHasTypeLogIfNot(
hint,
Retryable.class,
logger,
(retryable) -> {
if (!retryable.isRetry()) {
try {
if (!file.delete()) {
logger.log(SentryLevel.ERROR, "Failed to delete: %s", file.getAbsolutePath());
}
} catch (RuntimeException e) {
logger.log(SentryLevel.ERROR, e, "Failed to delete: %s", file.getAbsolutePath());
}
}
});
}
}
@Override
protected boolean isRelevantFileName(final @Nullable String fileName) {
// ignore current.envelope
return fileName != null
&& !fileName.startsWith(PREFIX_CURRENT_SESSION_FILE)
&& !fileName.startsWith(EnvelopeCache.STARTUP_CRASH_MARKER_FILE);
// TODO: Use an extension to filter out relevant files
}
@Override
public void processEnvelopeFile(@NotNull String path, @NotNull Hint hint) {
Objects.requireNonNull(path, "Path is required.");
processFile(new File(path), hint);
}
private void processEnvelope(final @NotNull SentryEnvelope envelope, final @NotNull Hint hint)
throws IOException {
logger.log(
SentryLevel.DEBUG,
"Processing Envelope with %d item(s)",
CollectionUtils.size(envelope.getItems()));
int currentItem = 0;
for (final SentryEnvelopeItem item : envelope.getItems()) {
currentItem++;
if (item.getHeader() == null) {
logger.log(SentryLevel.ERROR, "Item %d has no header", currentItem);
continue;
}
if (SentryItemType.Event.equals(item.getHeader().getType())) {
try (final Reader eventReader =
new BufferedReader(
new InputStreamReader(new ByteArrayInputStream(item.getData()), UTF_8))) {
SentryEvent event = serializer.deserialize(eventReader, SentryEvent.class);
if (event == null) {
logEnvelopeItemNull(item, currentItem);
} else {
if (event.getSdk() != null) {
HintUtils.setIsFromHybridSdk(hint, event.getSdk().getName());
}
if (envelope.getHeader().getEventId() != null
&& !envelope.getHeader().getEventId().equals(event.getEventId())) {
logUnexpectedEventId(envelope, event.getEventId(), currentItem);
continue;
}
hub.captureEvent(event, hint);
logItemCaptured(currentItem);
if (!waitFlush(hint)) {
logTimeout(event.getEventId());
break;
}
}
} catch (Throwable e) {
logger.log(ERROR, "Item failed to process.", e);
}
} else if (SentryItemType.Transaction.equals(item.getHeader().getType())) {
try (final Reader reader =
new BufferedReader(
new InputStreamReader(new ByteArrayInputStream(item.getData()), UTF_8))) {
final SentryTransaction transaction =
serializer.deserialize(reader, SentryTransaction.class);
if (transaction == null) {
logEnvelopeItemNull(item, currentItem);
} else {
if (envelope.getHeader().getEventId() != null
&& !envelope.getHeader().getEventId().equals(transaction.getEventId())) {
logUnexpectedEventId(envelope, transaction.getEventId(), currentItem);
continue;
}
// if there is no trace context header we also won't send it to Sentry
final @Nullable TraceContext traceContext = envelope.getHeader().getTraceContext();
if (transaction.getContexts().getTrace() != null) {
// Hint: Set sampling decision in order for the transaction not to be dropped, as this
// is a transient property.
transaction
.getContexts()
.getTrace()
.setSamplingDecision(extractSamplingDecision(traceContext));
}
hub.captureTransaction(transaction, traceContext, hint);
logItemCaptured(currentItem);
if (!waitFlush(hint)) {
logTimeout(transaction.getEventId());
break;
}
}
} catch (Throwable e) {
logger.log(ERROR, "Item failed to process.", e);
}
} else {
// send unknown item types over the wire
final SentryEnvelope newEnvelope =
new SentryEnvelope(
envelope.getHeader().getEventId(), envelope.getHeader().getSdkVersion(), item);
hub.captureEnvelope(newEnvelope, hint);
logger.log(
SentryLevel.DEBUG,
"%s item %d is being captured.",
item.getHeader().getType().getItemType(),
currentItem);
if (!waitFlush(hint)) {
logger.log(
SentryLevel.WARNING,
"Timed out waiting for item type submission: %s",
item.getHeader().getType().getItemType());
break;
}
}
final Object sentrySdkHint = HintUtils.getSentrySdkHint(hint);
if (sentrySdkHint instanceof SubmissionResult) {
if (!((SubmissionResult) sentrySdkHint).isSuccess()) {
// Failed to send an item of the envelope: Stop attempting to send the rest (an attachment
// without the event that created it isn't useful)
logger.log(
SentryLevel.WARNING,
"Envelope had a failed capture at item %d. No more items will be sent.",
currentItem);
break;
}
}
// reset the Hint to its initial state as we use it multiple times.
HintUtils.runIfHasType(hint, Resettable.class, (resettable) -> resettable.reset());
}
}
private @NotNull TracesSamplingDecision extractSamplingDecision(
final @Nullable TraceContext traceContext) {
if (traceContext != null) {
final @Nullable String sampleRateString = traceContext.getSampleRate();
if (sampleRateString != null) {
try {
final Double sampleRate = Double.parseDouble(sampleRateString);
if (!SampleRateUtils.isValidTracesSampleRate(sampleRate, false)) {
logger.log(
SentryLevel.ERROR,
"Invalid sample rate parsed from TraceContext: %s",
sampleRateString);
} else {
return new TracesSamplingDecision(true, sampleRate);
}
} catch (Exception e) {
logger.log(
SentryLevel.ERROR,
"Unable to parse sample rate from TraceContext: %s",
sampleRateString);
}
}
}
return new TracesSamplingDecision(true);
}
private void logEnvelopeItemNull(final @NotNull SentryEnvelopeItem item, int itemIndex) {
logger.log(
SentryLevel.ERROR,
"Item %d of type %s returned null by the parser.",
itemIndex,
item.getHeader().getType());
}
private void logUnexpectedEventId(
final @NotNull SentryEnvelope envelope, final @Nullable SentryId eventId, int itemIndex) {
logger.log(
SentryLevel.ERROR,
"Item %d of has a different event id (%s) to the envelope header (%s)",
itemIndex,
envelope.getHeader().getEventId(),
eventId);
}
private void logItemCaptured(int itemIndex) {
logger.log(SentryLevel.DEBUG, "Item %d is being captured.", itemIndex);
}
private void logTimeout(final @Nullable SentryId eventId) {
logger.log(SentryLevel.WARNING, "Timed out waiting for event id submission: %s", eventId);
}
private boolean waitFlush(final @NotNull Hint hint) {
@Nullable Object sentrySdkHint = HintUtils.getSentrySdkHint(hint);
if (sentrySdkHint instanceof Flushable) {
return ((Flushable) sentrySdkHint).waitFlush();
} else {
LogUtils.logNotInstanceOf(Flushable.class, sentrySdkHint, logger);
}
return true;
}
}