forked from eclipse-uprotocol/up-transport-android-rust
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutransport.rs
More file actions
415 lines (366 loc) · 14.8 KB
/
utransport.rs
File metadata and controls
415 lines (366 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
use crate::{UPClientAndroid, UPCLIENTANDROID_TAG};
use async_std::task;
use async_trait::async_trait;
use jni::objects::{JByteArray, JClass, JObject, JValue, JValueOwned};
use jni::sys::{jbyteArray, jlong, jstring};
use jni::JNIEnv;
use lazy_static::lazy_static;
use log::{error, trace};
use protobuf::{EnumOrUnknown, Message};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};
use up_rust::{ComparableListener, UCode, UListener, UMessage, UStatus, UTransport, UUri};
/// Function-level tag interpolated (after `UPCLIENTANDROID_TAG`) into the log
/// lines emitted by `register_listener`.
const UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG: &str = "register_listener:";
/// JNI object type descriptor for the Java `org.eclipse.uprotocol.v1.UUri` class,
/// used when building JNI method signature strings.
const CLASS_UURI: &str = "Lorg/eclipse/uprotocol/v1/UUri;";
/// JNI object type descriptor for the Java `org.eclipse.uprotocol.transport.UListener`
/// class, used when building JNI method signature strings.
const CLASS_ULISTENER: &str = "Lorg/eclipse/uprotocol/transport/UListener;";
/// JNI object type descriptor for the Java `org.eclipse.uprotocol.v1.UStatus` class,
/// used when building JNI method signature strings.
const CLASS_USTATUS: &str = "Lorg/eclipse/uprotocol/v1/UStatus;";
/// Builds the JNI method signature string for `registerListener`: a method
/// taking a `UUri` and a `UListener` and returning a `UStatus`.
fn register_listener_signature() -> String {
    format!("({CLASS_UURI}{CLASS_ULISTENER}){CLASS_USTATUS}")
}
/// Builds the JNI method signature string for a method that takes a Java
/// `byte[]` and returns a `UUri`.
fn deserialize_uuri_signature() -> String {
    ["([B)", CLASS_UURI].concat()
}
lazy_static! {
/// Process-wide set of every `(topic, listener)` pair recorded by `store_listener`.
static ref LISTENERS: Mutex<HashSet<(UUri, ComparableListener)>> = Mutex::new(HashSet::new());
/// Process-wide map from the 64-bit hash of a `(topic, listener)` pair to the
/// listener itself. Written by `store_listener`; read by `get_listener` and by
/// the JNI `onReceiveNative` callback to find the listener for an incoming message.
static ref HASH_TO_LISTENER: Mutex<HashMap<u64, Arc<dyn UListener>>> =
Mutex::new(HashMap::new());
}
/// Records a `(topic, listener)` pair in the global registries and returns the
/// 64-bit hash that identifies it.
///
/// The hash is computed with `DefaultHasher` over the `(UUri, ComparableListener)`
/// tuple. If that hash is already present in `HASH_TO_LISTENER`, nothing is
/// inserted and the existing hash is returned, so registering the same pair
/// twice is a no-op.
///
/// NOTE(review): two *different* pairs that collide on the same 64-bit hash are
/// not distinguished; the second pair is silently not stored and the returned
/// hash resolves to the first listener.
///
/// # Panics
///
/// Panics if either global registry mutex is poisoned.
fn store_listener(uuri: UUri, listener: Arc<dyn UListener>) -> u64 {
    // Build the key and hash it before taking any lock so the critical section
    // only covers the map/set updates. `uuri` is already owned, so it is moved
    // rather than cloned.
    let hash_tuple = (uuri, ComparableListener::new(Arc::clone(&listener)));
    let mut hasher = DefaultHasher::new();
    hash_tuple.hash(&mut hasher);
    let hash_value = hasher.finish();

    // Lock order (LISTENERS, then HASH_TO_LISTENER) is kept as-is.
    let mut listeners = LISTENERS.lock().unwrap();
    let mut hash_to_listener = HASH_TO_LISTENER.lock().unwrap();

    // Single lookup via the entry API; only a vacant slot triggers insertion.
    if let std::collections::hash_map::Entry::Vacant(slot) = hash_to_listener.entry(hash_value) {
        listeners.insert(hash_tuple);
        slot.insert(listener);
    }
    hash_value
}
/// Looks up the listener registered under `hash`.
///
/// Returns a new `Arc` handle to the listener, or `None` when no listener has
/// been stored for that hash. Panics if the registry mutex is poisoned.
fn get_listener(hash: u64) -> Option<Arc<dyn UListener>> {
    let registry = HASH_TO_LISTENER.lock().unwrap();
    registry.get(&hash).map(Arc::clone)
}
#[no_mangle]
pub extern "system" fn Java_org_eclipse_uprotocol_streamer_service_UListenerNativeBridge_onReceiveNative<
'local,
>(
mut env: JNIEnv<'local>,
// This is the class that owns our static method. It's not going to be used,
// but still must be present to match the expected signature of a static
// native method.
class: JClass<'local>,
listener_id: u64,
message_bytes: JByteArray,
// what tyep should the Java UMessage message be here in Rust?
) {
let message_bytes_vec = env.convert_byte_array(message_bytes).unwrap();
let message = UMessage::parse_from_bytes(&message_bytes_vec);
let Ok(message) = message else {
error!("Unable to convert to UMessage!");
return;
};
let hash_to_listener = HASH_TO_LISTENER.lock().unwrap();
let listener = hash_to_listener.get(&listener_id);
if let Some(listener) = listener {
// TODO: Consider if we want to block here or send over a channel to an existing task or
// something a little more performant than this
task::block_on(listener.on_receive(message));
}
}
#[async_trait]
impl UTransport for UPClientAndroid {
async fn send(&self, message: UMessage) -> Result<(), UStatus> {
todo!()
}
async fn receive(&self, topic: UUri) -> Result<UMessage, UStatus> {
unimplemented!("UPClientAndroid listens, no need to call receive()")
}
async fn register_listener(
&self,
topic: UUri,
listener: Arc<dyn UListener>,
) -> Result<(), UStatus> {
trace!(
"{}:{} Entered register_listener",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
// Get JNIEnv for the current thread
let mut env = self
.vm
.attach_current_thread()
.expect("Failed to attach current thread");
trace!(
"{}:{} Got JNIEnv",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
// TODO: Add logic for making sure we only add listener once
let hash = store_listener(topic.clone(), listener);
let Ok(listener_class) =
env.find_class("org/eclipse/uprotocol/streamer/service/UListenerNativeBridge")
else {
error!("Failed to find UListenerNativeBridge class");
env.exception_describe().unwrap();
env.exception_clear().unwrap();
return Err(UStatus::fail_with_code(UCode::INTERNAL, "Class not found"));
};
trace!(
"{}:{} Got UListenerNativeBridge class",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
// Check if an exception occurred
if env.exception_check().unwrap() {
trace!(
"{}:{} Hit exception constructing trying to find UListenerNativeBridge class",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
// Optionally log or describe the exception
env.exception_describe().unwrap(); // This will print the exception details to the console
env.exception_clear().unwrap(); // Clears the exception so that JNI calls can continue
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Exception was thrown",
)); // Replace UStatus::Error with appropriate error handling
}
trace!(
"{}:{} Got UListenerNativeBridge class",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let Ok(listener_obj) =
env.new_object(listener_class, "(J)V", &[JValue::Long(hash as jlong)])
else {
error!("Failed to create a new instance of UListenerBridge class");
env.exception_describe().unwrap();
env.exception_clear().unwrap();
return Err(UStatus::fail_with_code(UCode::INTERNAL, "Class not found"));
};
trace!(
"{}:{} Constructed UListenerNativeBridge object",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
// Check if an exception occurred
if env.exception_check().unwrap() {
trace!(
"{}:{} Hit exception constructing UListenerNativeBridge object",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
// Optionally log or describe the exception
env.exception_describe().unwrap(); // This will print the exception details to the console
env.exception_clear().unwrap(); // Clears the exception so that JNI calls can continue
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Exception was thrown",
)); // Replace UStatus::Error with appropriate error handling
}
let up_client_ref = self.up_client.as_obj(); // Convert GlobalRef to JObject
trace!(
"{}:{} Converted GlobalRef to JObject",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let Ok(uuri_bytes) = topic.write_to_bytes() else {
error!("Failed to serialize UUri to bytes");
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Failed to obtain UUri bytes",
)); // Replace UStatus::Error with appropriate error handling
};
trace!(
"{}:{} Turned UUri into byte vec",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let byte_array = env
.byte_array_from_slice(&uuri_bytes)
.expect("Couldn't create jbyteArray from Rust Vec<u8>");
trace!(
"{}:{} Turned byte vec into JByteArray",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let native_bridge_class = Arc::new(
env.find_class("org/eclipse/uprotocol/streamer/service/NativeBridge")
.expect("Couldn't find the Helper class"),
);
trace!(
"{}:{} Found NativeBridge class",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let jvalue_byte_array = JValue::Object(&*byte_array);
let Ok(uuri_obj) = env.call_static_method(
&*native_bridge_class,
"deserializeToUUri",
format!("([B){CLASS_UURI}"),
&[jvalue_byte_array],
) else {
trace!(
"{}:{} Failed when calling deserializeToUUri",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
env.exception_describe().unwrap();
env.exception_clear().unwrap();
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Failed when calling deserializeToUUri",
)); // Replace UStatus::Error with appropriate error handling
};
let Ok(uuri_obj) = uuri_obj.l() else {
trace!(
"{}:{} Failed when converting uuri_obj to a JObject",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
env.exception_describe().unwrap();
env.exception_clear().unwrap();
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Failed when converting uuri_obj to a JObject",
)); // Replace UStatus::Error with appropriate error handling
};
trace!(
"{}:{} Turned serialized JByteArray into UUri JObject",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
// Check if an exception occurred
if env.exception_check().unwrap() {
trace!(
"{}:{} Exception occured while turning JByteArray into UUri JObject",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
// Optionally log or describe the exception
env.exception_describe().unwrap(); // This will print the exception details to the console
env.exception_clear().unwrap(); // Clears the exception so that JNI calls can continue
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Exception was thrown",
)); // Replace UStatus::Error with appropriate error handling
}
let args = [JValue::Object(&uuri_obj), JValue::Object(&listener_obj)];
trace!(
"{}:{} Formed arguments to the Java UPClient's registerListener",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let Ok(ustatus) = env.call_method(
up_client_ref,
"registerListener",
register_listener_signature(),
&args,
) else {
trace!(
"{}:{} Unable to call UPClient.registerListener()",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
env.exception_describe().unwrap();
env.exception_clear().unwrap();
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Failed when calling UPClient.registerListener()",
)); // Replace UStatus::Error with appropriate error handling
};
trace!(
"{}:{} Called registerListener on the Java UPClient",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
trace!(
"{}:{} returned UStatus as a JValueOwned: {ustatus:?}",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let Ok(ustatus_bytes) = env.call_static_method(
&*native_bridge_class,
"serializeFromUStatus",
format!("({CLASS_USTATUS})[B"),
&[(&ustatus).into()],
) else {
trace!(
"{}:{} Failed when calling serializeFromUStatus",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
env.exception_describe().unwrap();
env.exception_clear().unwrap();
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Failed when calling serializeFromUStatus",
)); // Replace UStatus::Error with appropriate error handling
};
trace!(
"{}:{} Called serializeFromUStatus",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let foo = match ustatus_bytes {
JValueOwned::Object(obj) => {
let jbytearray = JByteArray::from(obj);
Ok(env.convert_byte_array(jbytearray).expect("oop"))
}
_ => Err("Oops"),
};
// let foo = env.convert_byte_array(*ustatus_bytes).expect("oop");
let Ok(ustatus_bytes) = foo else {
trace!(
"{}:{} Unable to convert from JValueOwned to Vec<u8>",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
env.exception_describe().unwrap();
env.exception_clear().unwrap();
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Unable to convert from JValueOwned to Vec<u8>",
)); // Replace UStatus::Error with appropriate error handling
};
let ustatus = UStatus::parse_from_bytes(&*ustatus_bytes);
let Ok(ustatus) = ustatus else {
trace!(
"{}:{} Unable to from ustatus_bytes to a Rust UStatus",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
return Err(UStatus::fail_with_code(
UCode::INTERNAL,
"Unable to from ustatus_bytes to a Rust UStatus",
)); // Replace UStatus::Error with appropriate error handling
};
trace!(
"{}:{} Rust UStatus: {ustatus:?}",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
trace!(
"{}:{} Reached bottom of function",
UPCLIENTANDROID_TAG,
UPANDROIDCLIENT_FN_REGISTER_LISTENER_TAG
);
let code = ustatus.code.enum_value_or(UCode::UNKNOWN);
match code {
UCode::OK => Ok(()),
_ => Err(ustatus),
}
}
async fn unregister_listener(
&self,
topic: UUri,
listener: Arc<dyn UListener>,
) -> Result<(), UStatus> {
todo!()
}
}