# Firehose

Basic implementation via websocket

Here's @skyware/jetstream version

In [None]:
//| export

import { Jetstream } from "@skyware/jetstream";
import { ids } from "tinychat/api/lexicons.ts";
import { z } from "zod";

const atURI = (did: string, collection: string, rkey: string) =>
  `at://${did}/${collection}/${rkey}`;

const baseCommitSchema = z.object({
  rev: z.string(),
  operation: z.string(),
  collection: z.string(),
  rkey: z.string(),
  cid: z.string(),
});

// Change to object schema that can be extended
const makeBaseSchema = <T extends z.ZodTypeAny>(recordSchema: T) =>
  z
    .object({
      did: z.string(),
      commit: baseCommitSchema.extend({
        record: recordSchema,
      }),
    })
    .transform((d) => {
      if (!d.commit) {
        throw new Error(`Invalid record: ${JSON.stringify(d, null, 2)}`);
      }
      return Object.assign({}, d, {
        uri: atURI(d.did, d.commit.collection, d.commit.rkey),
      });
    });

const newServerRecordSchema = makeBaseSchema(
  z.object({
    $type: z.literal(ids.ChatTinychatServer),
    name: z.string(),
  }),
);

const newChannelRecordSchema = makeBaseSchema(
  z.object({
    $type: z.literal(ids.ChatTinychatChannel),
    name: z.string(),
    server: z.string(),
  }),
);

const newMembershipRecordSchema = makeBaseSchema(
  z.object({
    $type: z.literal(ids.ChatTinychatGraphMembership),
    createdAt: z.string(),
    server: z.string(),
  }),
);

const newMessageRecordSchema = makeBaseSchema(
  z.object({
    $type: z.literal(ids.ChatTinychatMessage),
    channel: z.string(),
    createdAt: z.string(),
    server: z.string(),
    text: z.string(),
  }),
);

export type NewServerRecord = z.infer<typeof newServerRecordSchema>;
export type NewChannelRecord = z.infer<typeof newChannelRecordSchema>;
export type NewMembershipRecord = z.infer<typeof newMembershipRecordSchema>;
export type NewMessageRecord = z.infer<typeof newMessageRecordSchema>;

const jetstream = new Jetstream({
  wantedCollections: Deno.env.get("JETSTREAM_WANTED_COLLECTIONS")!.split(","),
  endpoint: Deno.env.get("JETSTREAM_URL")!,
  // cursor: Number(cursorFile),
});

type JetstreamCleanup = () => void;

type JetstreamConfig = {
  onNewServer: (m: NewServerRecord) => Promise<void>;
  onNewChannel: (m: NewChannelRecord) => void;
  onNewMembership: (m: NewMembershipRecord) => void;
  onNewMessage: (m: NewMessageRecord) => void;
};

export function startJetstream(
  { onNewServer, onNewMembership, onNewChannel, onNewMessage }: JetstreamConfig,
): JetstreamCleanup {
  console.log("Starting jetstream");

  // let intervalID: NodeJS.Timeout;
  // const cursorFile = fs.readFileSync("cursor.txt", "utf8");
  // if (cursorFile) ctx.logger.info(`Initiate jetstream at cursor ${cursorFile}`);

  jetstream.on("error", (err) => console.error(err));
  // jetstream.on("close", () => clearInterval(intervalID));

  jetstream.on("open", () => {
    // intervalID = setInterval(() => {
    //   if (jetstream.cursor) {
    //     fs.writeFile("cursor.txt", jetstream.cursor.toString(), (err) => {
    //       if (err) console.log(err);
    //     });
    //   }
    // }, 60000);
  });

  // handle server updates
  jetstream.on(ids.ChatTinychatServer, async (event) => {
    // we only do creates for now
    if (event.commit.operation !== "create") {
      return;
    }
    await onNewServer(newServerRecordSchema.parse(event));
  });

  // handle membership updates
  jetstream.on(ids.ChatTinychatGraphMembership, async (event) => {
    // we only do creates for now
    if (event.commit.operation !== "create") {
      return;
    }
    await onNewMembership(newMembershipRecordSchema.parse(event));
  });

  // handle channel updates
  jetstream.on(ids.ChatTinychatChannel, async (event) => {
    // we only do creates for now
    if (event.commit.operation !== "create") {
      return;
    }
    await onNewChannel(newChannelRecordSchema.parse(event));
  });

  // handle new message
  jetstream.on(ids.ChatTinychatMessage, async (event) => {
    // we only do creates for now
    if (event.commit.operation !== "create") {
      return;
    }
    await onNewMessage(newMessageRecordSchema.parse(event));
  });

  jetstream.start();
  console.log("Jetstream started");

  return () => {
    console.log("Stopping jetstream");
    jetstream.removeAllListeners();
    jetstream.close();
    console.log("Jetstream stopped");
  };
}

In [None]:
import { assert } from "asserts";
import { TID } from "@atproto/common";
import { TinychatAgent } from "tinychat/agent.ts";
import { sleep } from "tinychat/utils.ts";

Deno.test("jetstream", async (t) => {
  const servers: NewServerRecord[] = [];
  const memberships: NewMembershipRecord[] = [];
  const channels: NewChannelRecord[] = [];
  const messages: NewMessageRecord[] = [];
  const cleanup = startJetstream({
    onNewServer: async (m: NewServerRecord) => {
      servers.push(m);
      await Promise.resolve();
    },
    onNewChannel: (m: NewChannelRecord) => {
      channels.push(m);
    },
    onNewMembership: (m: NewMembershipRecord) => {
      memberships.push(m);
    },
    onNewMessage: (m: NewMessageRecord) => {
      messages.push(m);
    },
  });
  const serverName = `test-${TID.nextStr()}`;
  const agent = await TinychatAgent.create();
  const repo = await agent.agent.assertDid;

  // let's create a new chat server and watch it propagate through the system

  const chatServer = await agent.chat.tinychat.server.create({ repo }, {
    name: serverName,
  });
  await agent.chat.tinychat.graph.membership.create({ repo }, {
    server: chatServer.uri,
    createdAt: new Date().toISOString(),
  });
  const chatChannel = await agent.chat.tinychat.channel.create(
    { repo },
    {
      name: "test channel",
      server: chatServer.uri,
    },
  );
  await agent.chat.tinychat.message.create({
    repo,
  }, {
    channel: chatChannel.uri,
    server: chatServer.uri,
    text: "Hello world",
    createdAt: new Date().toISOString(),
  });

  await sleep(2000);

  await t.step("spot check jetstream updates", () => {
    assert(servers.length > 0);
    assert(servers.some((s) => s.commit.record.name === serverName));
    assert(memberships.length > 0);
    assert(memberships.some((m) => m.commit.record.server === chatServer.uri));
    assert(channels.length > 0);
    assert(
      channels.some((c) =>
        c.commit.record.name === "test channel" &&
        c.commit.record.server === chatServer.uri
      ),
    );
    assert(messages.length > 0);
    assert(
      messages.some((m) =>
        m.commit.record.text === "Hello world" &&
        m.commit.record.channel === chatChannel.uri
      ),
    );
  });

  cleanup();
  await sleep(2000);
});

In [None]:
// import { j } from "@jurassic/jurassic";
// j.initialize("firehose.ipynb");

// await j`i need sqlite sql to create tables to store server, channel, membership, and message records`;
