From 5f6124731fa279d27f90b12ac861a488928399f9 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Wed, 12 Nov 2025 19:17:55 +0100 Subject: [PATCH 1/2] feat: implement verifiers cache initialization and retrieval; add tests for cache functionality --- src/index.ts | 32 ++++++++-- src/logic/mqtt.ts | 22 +++---- src/logic/prover.ts | 10 ---- src/logic/sync.ts | 118 +++++++++++++++++++++++++++++-------- src/types.ts | 6 ++ src/utils.ts | 15 ----- tests/logic/sync.test.ts | 122 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 259 insertions(+), 66 deletions(-) create mode 100644 tests/logic/sync.test.ts diff --git a/src/index.ts b/src/index.ts index b58b6a1..1f411a0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,18 +1,40 @@ import "dotenv/config"; import { handleUplinks } from "./logic/mqtt"; import { Request, Response } from "express"; -import { app, m3ter, rollup } from "./logic/context"; +import { app } from "./logic/context"; import setupDatabase, { getAllMeterRecords, - saveMeter, deleteMeterByPublicKey, } from "./store/sqlite"; +import { initializeVerifiersCache } from "./logic/sync"; import "./logic/streamr"; -handleUplinks(); +// Async initialization function +async function initializeApp() { + try { + console.log("[info] Starting application initialization..."); + + // Initialize database tables and jobs + setupDatabase(); + console.log("[info] Database setup completed"); -// Initialize database tables and jobs -setupDatabase(); + // Initialize verifiers cache on startup + await initializeVerifiersCache(); + console.log("[info] Verifiers cache initialized successfully"); + + // Start MQTT handling + handleUplinks(); + console.log("[info] MQTT uplinks handler started"); + + console.log("[info] Application initialization completed successfully"); + } catch (error) { + console.error("[fatal] Failed to initialize application:", error); + process.exit(1); + } +} + +// Start initialization +initializeApp(); app.get("/", async (req: Request, res: Response) => { const m3ters = getAllMeterRecords(); diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts index a2f5b12..79468ac 100644 --- a/src/logic/mqtt.ts +++ b/src/logic/mqtt.ts @@ -2,7 +2,7 @@ import { connect } from "mqtt"; import { enqueue } from "./grpc"; import { interact } from "./arweave"; import { encode } from "./encode"; -import { m3ter as m3terContract, rollup as rollupContract } from "./context"; +import { m3ter as m3terContract } from "./context"; import { deleteMeterByPublicKey, getAllMeterRecords, @@ -18,7 +18,7 @@ import { import { State, TransactionRecord } from "../types"; import { getProverURL, sendPendingTransactionsToProver } from "./prover"; import { decodePayload } from "./decode"; -import { getLocalIPv4, verifyPayloadSignature } from "../utils"; +import { verifyPayloadSignature } from "../utils"; import { getLatestTransactionNonce, pruneAndSyncOnchain, @@ -28,7 +28,7 @@ import { import { createMeterLogger, MeterLogger } from "../utils/logger"; import { publishPendingTransactionsToStreamr } from "./streamr"; -const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST || getLocalIPv4(); +const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST; const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain const deviceLocks = new Map(); // Lock per devEUI to prevent concurrent message processing @@ -50,6 +50,7 @@ export function handleUplinks() { client.on("error", (err) => { console.error("Connection error: ", err); client.end(); + process.exit(1); }); client.on("reconnect", () => { @@ -159,8 +160,10 @@ export async function handleMessage(blob: Buffer) { }); // update existing meter with devEui if not already set - logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`); - updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]); + if (!existingMeter.devEui || existingMeter.devEui !== message["deviceInfo"]["devEui"]) { + logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`); + updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]); + } // fetch and update latest nonce from chain const latestNonce = await getLatestTransactionNonce(existingMeter.tokenId); @@ -227,7 +230,6 @@ export async function handleMessage(blob: Buffer) { } // if device nonce is correct - if (decoded.nonce === expectedNonce) { logger.info(`Nonce is valid: ${decoded.nonce}`); @@ -248,8 +250,6 @@ export async function handleMessage(blob: Buffer) { updateMeterNonce(`0x${publicKey}`, expectedNonce); - logger.debug(`Current all meters: ${JSON.stringify(getAllMeterRecords())}`); - logger.info(`Updated meter nonce to: ${expectedNonce}`); const pendingTransactions = getAllTransactionRecords(); @@ -262,11 +262,7 @@ export async function handleMessage(blob: Buffer) { const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions); logger.info("done sending to prover"); - try { - logger.info(`Prover response: ${JSON.stringify(await response?.json())}`); - } catch (jsonError) { - logger.info(`Prover response (text): ${await response?.text()}`); - } + logger.info(`Prover response (text): ${await response?.text()}`); } catch (error) { logger.error(`Error sending pending transactions to prover: ${error}`); } diff --git a/src/logic/prover.ts b/src/logic/prover.ts index 9b42312..c4f828a 100644 --- a/src/logic/prover.ts +++ b/src/logic/prover.ts @@ -73,7 +73,6 @@ export async function sendTransactionsToProver( }); console.log("[info] received", response.status, "from the prover"); - console.log("prover response", await response.text()); if (!response.ok) { throw new Error(`Prover responded with status: ${response.status}`); @@ -128,15 +127,6 @@ export async function getProverURL(): Promise { } export async function sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) { - if (!proverURL) { - console.error("No active prover node available"); - return; - } - - if (pendingTransactions.length > 0) { - console.log("[info] Pending transactions:", pendingTransactions); - } - console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL); const requestPayload = buildBatchPayload(pendingTransactions); diff --git a/src/logic/sync.ts b/src/logic/sync.ts index 4cb4469..909e397 100644 --- a/src/logic/sync.ts +++ b/src/logic/sync.ts @@ -12,8 +12,93 @@ import { ccipRevenueReader as ccipRevenueReaderContract, priceContext as priceContextContract, } from "./context"; -import { JsonRpcProvider, Contract } from "ethers"; +import { JsonRpcProvider, Contract, ZeroAddress } from "ethers"; import { retry } from "../utils"; +import type { VerifierInfo } from "../types"; + +// Cache for verifiers - populated once on startup +let verifiersCache: VerifierInfo[] | null = null; +let isCacheInitialized = false; + +/** + * Initialize verifiers cache on program startup + * Fetches all verifiers and resolves their ENS names once + * Throws error if any fetch/resolution fails + */ +export async function initializeVerifiersCache(): Promise { + try { + console.log("[info] Initializing verifiers cache..."); + + // Get the number of verifiers + const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount())); + console.log(`[info] Found ${verifierCount} verifiers to cache`); + + const verifiers: VerifierInfo[] = []; + + // Fetch all verifiers and resolve their ENS names + for (let i = 0; i < verifierCount; i++) { + try { + // Get verifier info (ensName, targetContractAddress) + const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i)); + + console.log(`[info] Fetching verifier ${i}: ENS: ${ensName}, target: ${targetAddress}`); + + // Resolve ENS name to get the verifier address + const verifierAddress = await retry(() => provider.resolveName(ensName)); + + if (!verifierAddress || verifierAddress === ZeroAddress) { + throw new Error(`Failed to resolve ENS name: ${ensName}`); + } + + console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`); + + verifiers.push({ + ensName, + targetAddress, + verifierAddress, + }); + } catch (error) { + console.error(`[error] Failed to initialize verifier ${i}:`, error); + throw error; // Fail fast as requested + } + } + + // Cache the verifiers + verifiersCache = verifiers; + isCacheInitialized = true; + + console.log(`[info] Successfully cached ${verifiers.length} verifiers`); + } catch (error) { + console.error("[error] Failed to initialize verifiers cache:", error); + isCacheInitialized = false; + verifiersCache = null; + throw error; + } +} + +/** + * Get cached verifiers, throws error if cache is not initialized + */ +function getCachedVerifiers(): VerifierInfo[] { + if (!isCacheInitialized || !verifiersCache) { + throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first."); + } + return verifiersCache; +} + +/** + * Check if verifiers cache is initialized + */ +export function isVerifiersCacheInitialized(): boolean { + return isCacheInitialized && verifiersCache !== null; +} + +/** + * Get the number of cached verifiers + */ +export function getCachedVerifiersCount(): number { + return verifiersCache?.length ?? 0; +} export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise { const meter = @@ -61,42 +146,29 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis // get revenue across suppored chains export async function getCrossChainRevenue(tokenId: number): Promise { try { - // Get the number of verifiers - const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount())); - + // Use cached verifiers instead of fetching them each time + const verifiers = getCachedVerifiers(); + let totalRevenue = 0; - // Iterate through all verifiers and get revenue from each chain - for (let i = 0; i < verifierCount; i++) { + // Iterate through all cached verifiers and get revenue from each chain + for (const verifier of verifiers) { try { - // Get verifier info (ensName, targetContractAddress) - const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i)); - - console.log(`[info] Getting revenue from ENS: ${ensName}, target: ${targetAddress}`); - - // Resolve ENS name to get the verifier address - const verifierAddress = await retry(() => provider.resolveName(ensName)); - - if (!verifierAddress) { - console.error(`[error] Failed to resolve ENS name: ${ensName}`); - continue; - } - - console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`); + console.log(`[info] Getting revenue from ENS: ${verifier.ensName}, target: ${verifier.targetAddress}, verifier: ${verifier.verifierAddress}`); // Get revenue from this specific chain using CCIP read // Parameters: tokenId, target (L2 contract), verifier (resolved from ENS) const revenue = await retry(() => - ccipRevenueReaderContract.read(tokenId, targetAddress, verifierAddress, { + ccipRevenueReaderContract.read(tokenId, verifier.targetAddress, verifier.verifierAddress, { enableCcipRead: true, }) ); const revenueAmount = Number(revenue); - console.log(`[info] Revenue from ${ensName} (${verifierAddress}): ${revenueAmount}`); + console.log(`[info] Revenue from ${verifier.ensName} (${verifier.verifierAddress}): ${revenueAmount}`); totalRevenue += revenueAmount; } catch (error) { - console.error(`[error] Failed to get revenue from verifier ${i}:`, error); + console.error(`[error] Failed to get revenue from verifier ${verifier.ensName}:`, error); // Continue with other verifiers even if one fails } } diff --git a/src/types.ts b/src/types.ts index 0439b23..8082331 100644 --- a/src/types.ts +++ b/src/types.ts @@ -45,3 +45,9 @@ export interface DecodedPayload { }; buf: Buffer; } + +export interface VerifierInfo { + ensName: string; + targetAddress: string; + verifierAddress: string; +} \ No newline at end of file diff --git a/src/utils.ts b/src/utils.ts index 90b8c23..0fbacdd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -67,18 +67,3 @@ export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): return false; } } - -export function getLocalIPv4() { - const nets = os.networkInterfaces(); - console.log(nets); - for (const network of Object.values(nets)) { - if (network) { - for (const iface of network) { - if (iface.family === "IPv4" && !iface.internal) { - return iface.address; - } - } - } - } - return "127.0.0.1"; -} diff --git a/tests/logic/sync.test.ts b/tests/logic/sync.test.ts new file mode 100644 index 0000000..4582f68 --- /dev/null +++ b/tests/logic/sync.test.ts @@ -0,0 +1,122 @@ +import { ZeroAddress } from "ethers"; +import { + initializeVerifiersCache, + isVerifiersCacheInitialized, + getCachedVerifiersCount, + getCrossChainRevenue, +} from "../../src/logic/sync"; + +// Mock the context module +jest.mock("../../src/logic/context", () => ({ + provider: { + resolveName: jest.fn(), + }, + ccipRevenueReader: { + verifierCount: jest.fn(), + verifiers: jest.fn(), + read: jest.fn(), + }, +})); + +// Mock the retry utility +jest.mock("../../src/utils", () => ({ + retry: jest.fn((fn) => fn()), +})); + +describe("Verifiers Cache", () => { + beforeEach(() => { + // Reset mocks before each test + jest.clearAllMocks(); + }); + + it("should initialize cache successfully", async () => { + const { provider, ccipRevenueReader } = require("../../src/logic/context"); + + // Mock successful responses + ccipRevenueReader.verifierCount.mockResolvedValue(2n); + ccipRevenueReader.verifiers + .mockResolvedValueOnce(["test1.eth", "0x1234567890123456789012345678901234567890"]) + .mockResolvedValueOnce(["test2.eth", "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdef"]); + + provider.resolveName + .mockResolvedValueOnce("0x1111111111111111111111111111111111111111") + .mockResolvedValueOnce("0x2222222222222222222222222222222222222222"); + + expect(isVerifiersCacheInitialized()).toBe(false); + expect(getCachedVerifiersCount()).toBe(0); + + await initializeVerifiersCache(); + + expect(isVerifiersCacheInitialized()).toBe(true); + expect(getCachedVerifiersCount()).toBe(2); + expect(ccipRevenueReader.verifierCount).toHaveBeenCalledTimes(1); + expect(ccipRevenueReader.verifiers).toHaveBeenCalledTimes(2); + expect(provider.resolveName).toHaveBeenCalledTimes(2); + }); + + it("should throw error if ENS resolution fails (returns null)", async () => { + const { provider, ccipRevenueReader } = require("../../src/logic/context"); + + // Mock responses with ENS resolution failure + ccipRevenueReader.verifierCount.mockResolvedValue(1n); + ccipRevenueReader.verifiers.mockResolvedValue(["invalid.eth", "0x1234567890123456789012345678901234567890"]); + provider.resolveName.mockResolvedValue(null); // ENS resolution fails + + await expect(initializeVerifiersCache()).rejects.toThrow("Failed to resolve ENS name: invalid.eth"); + + expect(isVerifiersCacheInitialized()).toBe(false); + expect(getCachedVerifiersCount()).toBe(0); + }); + + it("should throw error if ENS resolution fails (returns zero address)", async () => { + const { provider, ccipRevenueReader } = require("../../src/logic/context"); + + // Mock responses with ENS resolution failure + ccipRevenueReader.verifierCount.mockResolvedValue(1n); + ccipRevenueReader.verifiers.mockResolvedValue(["invalid.eth", "0x1234567890123456789012345678901234567890"]); + provider.resolveName.mockResolvedValue(ZeroAddress); // ENS resolution fails + + await expect(initializeVerifiersCache()).rejects.toThrow("Failed to resolve ENS name: invalid.eth"); + + expect(isVerifiersCacheInitialized()).toBe(false); + expect(getCachedVerifiersCount()).toBe(0); + }); + + it("should throw error if cache is not initialized when calling getCrossChainRevenue", async () => { + // Ensure cache is not initialized + expect(isVerifiersCacheInitialized()).toBe(false); + + await expect(getCrossChainRevenue(123)).rejects.toThrow( + "Verifiers cache not initialized. Call initializeVerifiersCache() first." + ); + }); + + it("should use cached verifiers for getCrossChainRevenue", async () => { + const { provider, ccipRevenueReader } = require("../../src/logic/context"); + + // Mock successful initialization + ccipRevenueReader.verifierCount.mockResolvedValue(1n); + ccipRevenueReader.verifiers.mockResolvedValue(["test.eth", "0x1234567890123456789012345678901234567890"]); + provider.resolveName.mockResolvedValue("0x1111111111111111111111111111111111111111"); + + // Initialize cache + await initializeVerifiersCache(); + + // Mock revenue reading + ccipRevenueReader.read.mockResolvedValue(1000n); + + const result = await getCrossChainRevenue(123); + + expect(result).toBe(1000); + expect(ccipRevenueReader.read).toHaveBeenCalledWith( + 123, + "0x1234567890123456789012345678901234567890", + "0x1111111111111111111111111111111111111111", + { enableCcipRead: true } + ); + + // Verify that verifierCount and verifiers are NOT called again + expect(ccipRevenueReader.verifierCount).toHaveBeenCalledTimes(1); + expect(ccipRevenueReader.verifiers).toHaveBeenCalledTimes(1); + }); +}); \ No newline at end of file From 0396c28a2e8cdf86fb1d08c2de829fb674d6f411 Mon Sep 17 00:00:00 2001 From: Emmo00 Date: Wed, 12 Nov 2025 19:33:25 +0100 Subject: [PATCH 2/2] fix: handle potential errors when fetching cross-chain revenue and owed amounts; remove unused function --- src/logic/mqtt.ts | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts index 79468ac..949092b 100644 --- a/src/logic/mqtt.ts +++ b/src/logic/mqtt.ts @@ -5,7 +5,6 @@ import { encode } from "./encode"; import { m3ter as m3terContract } from "./context"; import { deleteMeterByPublicKey, - getAllMeterRecords, getAllTransactionRecords, getMeterByDevEui, getMeterByPublicKey, @@ -84,6 +83,8 @@ export async function handleMessage(blob: Buffer) { return; } + let is_on = true; + // Set lock for this specific device deviceLocks.set(devEui, true); @@ -189,9 +190,14 @@ export async function handleMessage(blob: Buffer) { if (m3ter.latestNonce === 0 && decoded.nonce === 0) { logger.info("Both latest nonce and received nonce are 0, enqueuing 0 immediately"); - const is_on = - (await getCrossChainRevenue(m3ter.tokenId)) >= - (await getOwedFromPriceContext(m3ter.tokenId)); + try { + is_on = + (await getCrossChainRevenue(m3ter.tokenId)) >= + (await getOwedFromPriceContext(m3ter.tokenId)); + } catch (error) { + logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); + } + const state = { nonce: 0, is_on }; logger.info(`Enqueuing state: ${JSON.stringify(state)}`); @@ -276,8 +282,13 @@ export async function handleMessage(blob: Buffer) { } } - const is_on = - (await getCrossChainRevenue(m3ter.tokenId)) >= (await getOwedFromPriceContext(m3ter.tokenId)); + try { + is_on = + (await getCrossChainRevenue(m3ter.tokenId)) >= + (await getOwedFromPriceContext(m3ter.tokenId)); + } catch (error) { + logger.error(`Error fetching cross chain revenue or owed amount: ${error}`); + } const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on }; logger.info(`Enqueuing state: ${JSON.stringify(state)}`);