Skip to content

Commit

Permalink
Revert "Revert "Merge pull request lf-lang#99 from lf-lang/serializat…
Browse files Browse the repository at this point in the history
…ion""

This reverts commit e055dca.
  • Loading branch information
goekberk committed May 11, 2022
1 parent c22e23e commit aa1a6ee
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 38 deletions.
2 changes: 1 addition & 1 deletion lingua-franca-ref.txt
@@ -1 +1 @@
774ca3c3bfcfadd490cf878a2b389b6fec7ff1cb
dd9669f7b3ee0d3f1ae906d14c29fe7a71bace78
15 changes: 7 additions & 8 deletions src/core/action.ts
Expand Up @@ -78,7 +78,6 @@ export abstract class SchedulableAction<T extends Present> implements Sched<T> {
tag = tag.getMicroStepLater();
}
}

if (this.action instanceof FederatePortAction) {
if (intendedTag === undefined) {
throw new Error("FederatedPortAction must have an intended tag from RTI.");
Expand All @@ -95,10 +94,10 @@ export abstract class SchedulableAction<T extends Present> implements Sched<T> {
Log.debug(this, () => "Using intended tag from RTI, similar to schedule_at_tag(tag) with an intended tag: " +
intendedTag);
tag = intendedTag;
} else if (this.action.origin == Origin.logical && !(this.action instanceof Startup)) {
} else {
tag = tag.getMicroStepLater();
}

}
Log.debug(this, () => "Scheduling " + this.action.origin +
" action " + this.action._getFullyQualifiedName() + " with tag: " + tag);

Expand Down Expand Up @@ -140,8 +139,8 @@ export class Shutdown extends Action<Present> {
}
}

export class FederatePortAction extends Action<Buffer> {
constructor(__parent__: Reactor) {
super(__parent__, Origin.logical)
export class FederatePortAction<T extends Present> extends Action<T> {
constructor(__parent__: Reactor, origin: Origin) {
super(__parent__, origin)
}
}
}
55 changes: 34 additions & 21 deletions src/core/federation.ts
Expand Up @@ -3,7 +3,7 @@ import {Socket, createConnection, SocketConnectOpts} from 'net'
import {EventEmitter} from 'events';
import {
Log, Tag, TimeValue, Origin, getCurrentPhysicalTime, Alarm,
Present, App, Action, FederatePortAction, TaggedEvent
Present, App, Action, TaggedEvent
} from './internal';

//---------------------------------------------------------------------//
Expand Down Expand Up @@ -241,15 +241,20 @@ class RTIClient extends EventEmitter {

// The mapping between a federate port ID and the federate port action
// scheduled upon reception of a message designated for that federate port.
private federatePortActionByID: Map<number, FederatePortAction> = new Map<number, FederatePortAction>();

/**
* A mapping from port IDs to FederatePortAction instances. Unfortunately, the data type of the action has to be `any`,
* meaning that the type checker cannot check whether uses of the action are type safe.
* In an alternative design, type information might be preserved. TODO(marten): Look into this.
*/
private federatePortActionByID: Map<number, Action<any>> = new Map<number, Action<any>>();

/**
* Establish the mapping between a federate port's action and its ID.
* @param federatePortID The federate port's ID.
* @param federatePort The federate port's action.
*/
public registerFederatePortAction<T extends Present>(federatePortID: number, federatePortAction: Action<Buffer>) {
Object.setPrototypeOf(federatePortAction, FederatePortAction.prototype);
public registerFederatePortAction<T extends Present>(federatePortID: number, federatePortAction: Action<T>) {
this.federatePortActionByID.set(federatePortID, federatePortAction);
}

Expand Down Expand Up @@ -420,13 +425,15 @@ class RTIClient extends EventEmitter {
* @param destPortID The port ID for the port on the destination
* federate to which this message should be sent.
*/
public sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) {
let msg = Buffer.alloc(data.length + 9);
public sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
const value = Buffer.from(JSON.stringify(data), "utf-8");

let msg = Buffer.alloc(value.length + 9);
msg.writeUInt8(RTIMessageTypes.MSG_TYPE_MESSAGE, 0);
msg.writeUInt16LE(destPortID, 1);
msg.writeUInt16LE(destFederateID, 3);
msg.writeUInt32LE(data.length, 5);
data.copy(msg, 9); // Copy data into the message
msg.writeUInt32LE(value.length, 5);
value.copy(msg, 9); // Copy data into the message
try {
Log.debug(this, () => {return `Sending RTI (untimed) message to `
+ `federate ID: ${destFederateID} and port ID: ${destPortID}.`});
Expand All @@ -447,15 +454,17 @@ class RTIClient extends EventEmitter {
* @param time The time of the message encoded as a 64 bit little endian
* unsigned integer in a Buffer.
*/
public sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number, time: Buffer) {
let msg = Buffer.alloc(data.length + 21);
public sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number, time: Buffer) {
const value = Buffer.from(JSON.stringify(data), "utf-8");

let msg = Buffer.alloc(value.length + 21);
msg.writeUInt8(RTIMessageTypes.MSG_TYPE_TAGGED_MESSAGE, 0);
msg.writeUInt16LE(destPortID, 1);
msg.writeUInt16LE(destFederateID, 3);
msg.writeUInt32LE(data.length, 5);
msg.writeUInt32LE(value.length, 5);
time.copy(msg, 9); // Copy the current time into the message
// FIXME: Add microstep properly.
data.copy(msg, 21); // Copy data into the message
value.copy(msg, 21); // Copy data into the message
try {
Log.debug(this, () => {return `Sending RTI (timed) message to `
+ `federate ID: ${destFederateID}, port ID: ${destPortID} `
Expand Down Expand Up @@ -897,7 +906,7 @@ export class FederatedApp extends App {
* unique among all port IDs on this federate and be a number between 0 and NUMBER_OF_PORTS - 1
* @param federatePort The federate port's action for registration.
*/
public registerFederatePortAction(federatePortID: number, federatePortAction: Action<Buffer>) {
public registerFederatePortAction<T extends Present>(federatePortID: number, federatePortAction: Action<T>) {
if (federatePortAction.origin === Origin.logical) {
this.rtiSynchronized = true;
}
Expand All @@ -911,7 +920,7 @@ export class FederatedApp extends App {
* @param destFederateID The ID of the federate intended to receive the message.
* @param destPortID The ID of the federate's port intended to receive the message.
*/
public sendRTIMessage(msg: Buffer, destFederateID: number, destPortID: number ) {
public sendRTIMessage<T extends Present>(msg: T, destFederateID: number, destPortID: number ) {
Log.debug(this, () => {return `Sending RTI message to federate ID: ${destFederateID}`
+ ` port ID: ${destPortID}`});
this.rtiClient.sendRTIMessage(msg, destFederateID, destPortID);
Expand All @@ -925,7 +934,7 @@ export class FederatedApp extends App {
* @param destFederateID The ID of the Federate intended to receive the message.
* @param destPortID The ID of the FederateInPort intended to receive the message.
*/
public sendRTITimedMessage(msg: Buffer, destFederateID: number, destPortID: number ) {
public sendRTITimedMessage<T extends Present>(msg: T, destFederateID: number, destPortID: number ) {
let time = this.util.getCurrentTag().toBinary();
Log.debug(this, () => {return `Sending RTI timed message to federate ID: ${destFederateID}`
+ ` port ID: ${destPortID} and time: ${time.toString('hex')}`});
Expand Down Expand Up @@ -1015,14 +1024,16 @@ export class FederatedApp extends App {
}
});

this.rtiClient.on('message', (destPortAction: FederatePortAction, messageBuffer: Buffer) => {
this.rtiClient.on('message', <T extends Present>(destPortAction: Action<T>, messageBuffer: Buffer) => {
// Schedule this federate port's action.
// This message is untimed, so schedule it immediately.
Log.debug(this, () => {return `(Untimed) Message received from RTI.`})
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer);
const value: T = JSON.parse(messageBuffer.toString());

destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value);
});

this.rtiClient.on('timedMessage', (destPortAction: FederatePortAction, messageBuffer: Buffer,
this.rtiClient.on('timedMessage', <T extends Present>(destPortAction: Action<T>, messageBuffer: Buffer,
tag: Tag) => {
// Schedule this federate port's action.

Expand All @@ -1047,13 +1058,15 @@ export class FederatedApp extends App {
// FIXME: implement decentralized control.

Log.debug(this, () => {return `Timed Message received from RTI with tag ${tag}.`})
const value: T = JSON.parse(messageBuffer.toString());

if (destPortAction.origin == Origin.logical) {
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer, tag);
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value, tag);

} else {
// The schedule function for physical actions implements
// Tr = max(r, R + A)
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, messageBuffer);
destPortAction.asSchedulable(this._getKey(destPortAction)).schedule(0, value);
}
});

Expand Down Expand Up @@ -1092,4 +1105,4 @@ export class FederatedApp extends App {
*/
export class RemoteFederatePort {
constructor(public federateID: number, public portID: number) {}
}
}
16 changes: 8 additions & 8 deletions src/core/reactor.ts
Expand Up @@ -1555,8 +1555,8 @@ interface UtilityFunctions {
getCurrentPhysicalTime(): TimeValue;
getElapsedLogicalTime(): TimeValue;
getElapsedPhysicalTime(): TimeValue;
sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number): void;
sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number): void;
sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number): void;
sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number): void;
}

export interface MutationSandbox extends ReactionSandbox {
Expand Down Expand Up @@ -1663,11 +1663,11 @@ export class App extends Reactor {
return getCurrentPhysicalTime().subtract(this.app._startOfExecution);
}

public sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) {
public sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
return this.app.sendRTIMessage(data, destFederateID, destPortID);
};

public sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number) {
public sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
return this.app.sendRTITimedMessage(data, destFederateID, destPortID);
};

Expand Down Expand Up @@ -1780,22 +1780,22 @@ export class App extends Reactor {
/**
* Send an (untimed) message to the designated federate port through the RTI.
* This function throws an error if it isn't called on a FederatedApp.
* @param data A Buffer containing the body of the message.
* @param data The data that contain the body of the message.
* @param destFederateID The federate ID that is the destination of this message.
* @param destPortID The port ID that is the destination of this message.
*/
protected sendRTIMessage(data: Buffer, destFederateID: number, destPortID: number) {
protected sendRTIMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
throw new Error("Cannot call sendRTIMessage from an App. sendRTIMessage may be called only from a FederatedApp");
}

/**
* Send a (timed) message to the designated federate port through the RTI.
* This function throws an error if it isn't called on a FederatedApp.
* @param data A Buffer containing the body of the message.
* @param data The data that contain the body of the message.
* @param destFederateID The federate ID that is the destination of this message.
* @param destPortID The port ID that is the destination of this message.
*/
protected sendRTITimedMessage(data: Buffer, destFederateID: number, destPortID: number) {
protected sendRTITimedMessage<T extends Present>(data: T, destFederateID: number, destPortID: number) {
throw new Error("Cannot call sendRTIMessage from an App. sendRTIMessage may be called only from a FederatedApp");
}

Expand Down

0 comments on commit aa1a6ee

Please sign in to comment.