Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions src/logic/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { State, TransactionRecord } from "../types";
import { getProverURL, sendPendingTransactionsToProver } from "./verify";
import { decodePayload } from "./decode";
import { verifyPayloadSignature } from "../utils";
import { pruneAndSyncOnchain } from "./sync";
import { getLatestTransactionNonce, pruneAndSyncOnchain } from "./sync";

const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain

Expand Down Expand Up @@ -95,9 +95,13 @@ export async function handleMessage(blob: Buffer) {
// throw new Error("Token ID not found for public key: " + publicKey);
// }

const latestNonce = Number(await rollupContract.nonce(tokenId));
const latestNonce = await getLatestTransactionNonce(tokenId);

console.log("[info] Fetched tokenId and latestNonce from chain:", tokenId, latestNonce);
console.log(
"[info] Fetched tokenId and latestNonce from chain and local state:",
tokenId,
latestNonce
);

// save new meter with devEui
const newMeter = {
Expand All @@ -114,9 +118,9 @@ export async function handleMessage(blob: Buffer) {
updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]);

// fetch and update latest nonce from chain
const latestNonce = Number(await rollupContract.nonce(existingMeter.tokenId));
const latestNonce = await getLatestTransactionNonce(existingMeter.tokenId);

console.log("[info] Fetched latestNonce from chain:", latestNonce);
console.log("[info] Fetched latestNonce from chain and local state:", latestNonce);

updateMeterNonce(`0x${publicKey}`, latestNonce);
}
Expand Down Expand Up @@ -178,13 +182,7 @@ export async function handleMessage(blob: Buffer) {
raw: transactionHex.toString("hex"),
} as TransactionRecord;

try {
insertTransaction(transactionRecord);

console.log("[info] Inserted transaction record:", transactionRecord);
} catch (error) {
console.error("Error inserting transaction:", error);
}
insertTransaction(transactionRecord);

updateMeterNonce(`0x${publicKey}`, expectedNonce);

Expand Down
32 changes: 30 additions & 2 deletions src/logic/sync.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import { getMeterByPublicKey, getMeterByTokenId, pruneTransactionsBefore, updateMeterNonce } from "../store/sqlite";
import {
getMeterByPublicKey,
getMeterByTokenId,
getTransactionByNonce,
pruneTransactionsAfter,
pruneTransactionsBefore,
updateMeterNonce,
} from "../store/sqlite";
import { rollup as rollupContract } from "./context";

export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise<number> {
const meter = typeof meterIdentifier === "number" ? getMeterByTokenId(meterIdentifier) : getMeterByPublicKey(meterIdentifier);
const meter =
typeof meterIdentifier === "number"
? getMeterByTokenId(meterIdentifier)
: getMeterByPublicKey(meterIdentifier);

if (!meter) {
throw new Error(`Meter with identifier ${meterIdentifier} not found`);
Expand All @@ -22,3 +32,21 @@ export async function pruneAndSyncOnchain(meterIdentifier: number | string): Pro

return onchainNonce;
}

export async function getLatestTransactionNonce(meterIdentifier: number): Promise<number> {
// get latest nonce from chain
let latestNonce = Number(await rollupContract.nonce(meterIdentifier));

// check local state for the highest nonce we have
while (true) {
const existingTransaction = getTransactionByNonce(latestNonce + 1, meterIdentifier);
if (existingTransaction) {
latestNonce += 1;
} else {
pruneTransactionsAfter(latestNonce, meterIdentifier);
break;
}
}

return latestNonce;
}
30 changes: 28 additions & 2 deletions src/store/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fs from "fs";
import Database from "better-sqlite3";
import type { Database as DatabaseType, Statement as DatabaseStatementType } from "better-sqlite3";
import { MeterRecord, TransactionRecord } from "../types";
import { get } from "http";

// meter queries
let db: DatabaseType;
Expand Down Expand Up @@ -219,13 +220,25 @@ export function updateMeterDevEui(publicKey: string, devEui: string): boolean {
}
}

export function getTransactionByNonce(nonce: number, identifier: number): TransactionRecord | null {
try {
const result = getTransactionByNonceQuery.get(nonce, identifier) as
| TransactionRecord
| undefined;
return result || null;
} catch (err: any) {
console.error("Failed to get transaction by nonce:", err);
return null;
}
}

// Transaction insertion function
export function insertTransaction(transactionData: TransactionRecord): void {
try {
const existingTransaction = getTransactionByNonceQuery.get(
const existingTransaction = getTransactionByNonce(
transactionData.nonce,
transactionData.identifier
) as TransactionRecord | undefined;
);

if (existingTransaction) {
throw new Error(`Transaction with nonce ${transactionData.nonce} already exists`);
Expand Down Expand Up @@ -260,3 +273,16 @@ export function pruneTransactionsBefore(nonce: number, meterNumber: number) {
console.error("Failed to prune transactions:", err);
}
}

export function pruneTransactionsAfter(nonce: number, meterNumber: number) {
try {
const result = db
.prepare(`DELETE FROM transactions WHERE identifier = ? AND nonce > ?`)
.run(meterNumber, nonce);
console.log(
`Pruned ${result.changes} transactions for meter ${meterNumber} with nonce > ${nonce}`
);
} catch (err: any) {
console.error("Failed to prune transactions:", err);
}
}