Skip to content

Commit

Permalink
add a sample of FLUX code
Browse files Browse the repository at this point in the history
These samples were taken from the paper "Flux: A Language for
Programming High-Performance Servers", by Burns et al and Flux V0.02
which can be found here:

https://plasma.cs.umass.edu/emery/flux.1.html
  • Loading branch information
kusma committed Feb 18, 2016
1 parent 8de50ed commit 41713d7
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 0 deletions.
54 changes: 54 additions & 0 deletions samples/FLUX/gameserver.fx
@@ -0,0 +1,54 @@
typedef engine isEngineMessage;
typedef turn isTurnMessage;
typedef connect isConnectMessage;
typedef disconnect isDisconnectMessage;

ClientMessage(char* data) => ();
ParseMessage(char* data) => (int type, int client, char* data);
ReadMessage(int type, int client, char* data) => ();

ParseEngine(int type, int client, char* data) => (int client, int direction);
DoEngine(int client, int direction) => ();

ParseTurn(int type, int client, char* data) => (int client, int direction);
DoTurn(int client, int direction) => ();

ParseConnect(int type, int client, char* data)
=> (int client, char* host, int port);
DoConnect(int client, char* host, int port) => ();

ParseDisconnect(int type, int client, char* data) => (int client);
DoDisconnect(int client) => ();

UpdateBoard(ClientList clients) => (ClientList clients);
SendData(ClientList clients) => ();

DoUpdate(ClientList clients) => ();

DataTimer() => (ClientList clients);

GetClients() => (ClientList clients);

Wait(ClientList clients) => (ClientList clients);

Listen () => (char* data);

source Listen => ClientMessage;
source DataTimer => DoUpdate;

DataTimer = GetClients -> Wait;

DoUpdate = UpdateBoard -> SendData;

ClientMessage=ParseMessage -> ReadMessage;

ReadMessage:[engine, _, _] = ParseEngine -> DoEngine;
ReadMessage:[turn, _, _] = ParseTurn -> DoTurn;
ReadMessage:[connect, _, _] = ParseConnect -> DoConnect;
ReadMessage:[disconnect, _, _] = ParseDisconnect -> DoDisconnect;

atomic GetClients:{client_lock};
atomic DoConnect:{client_lock};
atomic DoDisconnect:{client_lock};


44 changes: 44 additions & 0 deletions samples/FLUX/imageserver.fx
@@ -0,0 +1,44 @@
typedef xml TestXML;
typedef html TestHTML;

typedef inCache TestInCache;

Page (int socket) => ();

ReadRequest (int socket) => (int socket, bool close, image_tag *request);

CheckCache (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);

Handler (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);

Complete (int socket, bool close, image_tag *request) => ();

ReadInFromDisk (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request, __u8 *rgb_data);

Write (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);

Compress(int socket, bool close, image_tag *request, __u8 *rgb_data)
=> (int socket, bool close, image_tag *request);

StoreInCache(int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);

Listen ()
=> (int socket);

source Listen => Page;

Handler:[_, _, inCache]=;
Handler:[_, _, _]=ReadInFromDisk -> Compress -> StoreInCache;

Page = ReadRequest -> CheckCache-> Handler -> Write -> Complete;

atomic CheckCache:{cache};
atomic StoreInCache:{cache};
atomic Complete:{cache};

handle error ReadInFromDisk => FourOhFor;
151 changes: 151 additions & 0 deletions samples/FLUX/mbittorrent.fx
@@ -0,0 +1,151 @@
typedef choke TestChoke;
typedef unchoke TestUnchoke;
typedef interested TestInterested;
typedef uninterested TestUninterested;
typedef request TestRequest;
typedef cancel TestCancel;
typedef piece TestPiece;
typedef bitfield TestBitfield;
typedef have TestHave;
typedef piececomplete TestPieceComplete;

CheckinWithTracker (torrent_data_t *tdata)
=> ();

SendRequestToTracker (torrent_data_t *tdata)
=> (torrent_data_t *tdata, int socket);

GetTrackerResponse (torrent_data_t *tdata, int socket)
=> ();

UpdateChokeList (torrent_data_t *tdata)
=> ();

PickChoked (torrent_data_t *tdata)
=> (torrent_data_t *tdata, chokelist_t clist);

SendChokeUnchoke (torrent_data_t *tdata, chokelist_t clist)
=> ();

SetupConnection (torrent_data_t *tdata, int socket)
=> ();

Handshake (torrent_data_t *tdata, int socket)
=> (torrent_data_t *tdata, client_data_t *client);

SendBitfield (torrent_data_t *tdata, client_data_t *client)
=> ();

Message (torrent_data_t *tdata, client_data_t *client)
=> ();

ReadMessage (torrent_data_t *tdata, client_data_t *client)
=> (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload);

HandleMessage (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);

MessageDone (client_data_t *client)
=> ();

CompletePiece (torrent_data_t *tdata, client_data_t *client, int piece)
=> (torrent_data_t *tdata, client_data_t *client);

VerifyPiece (torrent_data_t *tdata, client_data_t *client, int piece)
=> (torrent_data_t *tdata, client_data_t *client, int piece);

SendHave (torrent_data_t *tdata, client_data_t *client, int piece)
=> (torrent_data_t *tdata, client_data_t *client);

SendUninterested (torrent_data_t *tdata, client_data_t *client)
=> (torrent_data_t *tdata, client_data_t *client);

Choke (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);

Cancel (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);

Interested (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);

Uninterested (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);

Bitfield (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);

Unchoke (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (torrent_data_t *tdata, client_data_t *client);

SendRequest (torrent_data_t *tdata, client_data_t *client)
=> (client_data_t *client);

Have (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (torrent_data_t *tdata, client_data_t *client);

Piece (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (torrent_data_t *tdata, client_data_t *client, int piece);

Request (torrent_data_t *tdata, client_data_t *client, int type, int length, char *payload)
=> (client_data_t *client);

SendKeepAlives (torrent_data_t *tdata)
=> ();

GetClients ()
=> (int maxfd, fd_set *fds);

SelectSockets (int maxfd, fd_set *fds)
=> (fd_set *fds);

CheckSockets (fd_set *fds)
=> (torrent_data_t *tdata, client_data_t *client);

TrackerTimer ()
=> (torrent_data_t *tdata);

ChokeTimer ()
=> (torrent_data_t *tdata);

Connect ()
=> (torrent_data_t *tdata, int socket);

KeepAliveTimer ()
=> (torrent_data_t *tdata);

Listen ()
=> (torrent_data_t *tdata, client_data_t *client);

source TrackerTimer => CheckinWithTracker;
source ChokeTimer => UpdateChokeList;
source Connect => SetupConnection;
source Listen => Message;
source KeepAliveTimer => SendKeepAlives;

Listen = GetClients -> SelectSockets -> CheckSockets;
CheckinWithTracker = SendRequestToTracker -> GetTrackerResponse;
UpdateChokeList = PickChoked -> SendChokeUnchoke;
SetupConnection = Handshake -> SendBitfield;
Message = ReadMessage -> HandleMessage -> MessageDone;

CompletePiece:[_, _, piececomplete] = VerifyPiece -> SendHave -> SendUninterested;

HandleMessage:[_, _, choke, _, _] = Choke;
HandleMessage:[_, _, unchoke, _, _] = Unchoke -> SendRequest;
HandleMessage:[_, _, interested, _, _] = Interested;

HandleMessage:[_, _, uninterested, _, _] = Uninterested;
HandleMessage:[_, _, request, _, _] = Request;
HandleMessage:[_, _, cancel, _, _] = Cancel;
HandleMessage:[_, _, piece, _, _] = Piece -> CompletePiece -> SendRequest;
HandleMessage:[_, _, bitfield, _, _] = Bitfield;
HandleMessage:[_, _, have, _, _] = Have -> SendRequest;

atomic GetClients:{BigLock};
atomic CheckSockets:{BigLock};
atomic Message:{BigLock};
atomic CheckinWithTracker:{BigLock};
atomic UpdateChokeList:{BigLock};
atomic SetupConnection:{BigLock};
atomic SendKeepAlives:{BigLock};
38 changes: 38 additions & 0 deletions samples/FLUX/test.fx
@@ -0,0 +1,38 @@
// concrete node signatures
Listen ()
=> (int socket);

ReadRequest (int socket)
=> (int socket, bool close, image_tag *request);

CheckCache (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);

// omitted for space:
// ReadInFromDisk, StoreInCache
Compress (int socket, bool close, image_tag *request, __u8 *rgb_data)
=> (int socket, bool close, image_tag *request);
Write (int socket, bool close, image_tag *request)
=> (int socket, bool close, image_tag *request);
Complete (int socket, bool close, image_tag *request) => ();

// source node
source Listen => Image;

// abstract node
Image = ReadRequest -> CheckCache -> Handler -> Write -> Complete;

// predicate type & dispatch
typedef hit TestInCache;
Handler:[_, _, hit] = ;
Handler:[_, _, _] =
ReadInFromDisk -> Compress -> StoreInCache;

// error handler
handle error ReadInFromDisk => FourOhFor;

// atomicity constraints
atomic CheckCache:{cache};
atomic StoreInCache:{cache};
atomic Complete:{cache};

0 comments on commit 41713d7

Please sign in to comment.