Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
import "dotenv/config";
import { handleUplinks } from "./logic/mqtt";
import { Request, Response } from "express";
import { app, m3ter, rollup } from "./logic/context";
import { app } from "./logic/context";
import setupDatabase, {
getAllMeterRecords,
saveMeter,
deleteMeterByPublicKey,
} from "./store/sqlite";
import { initializeVerifiersCache } from "./logic/sync";
import "./logic/streamr";

handleUplinks();
// Async initialization function
async function initializeApp() {
try {
console.log("[info] Starting application initialization...");

// Initialize database tables and jobs
setupDatabase();
console.log("[info] Database setup completed");

// Initialize database tables and jobs
setupDatabase();
// Initialize verifiers cache on startup
await initializeVerifiersCache();
console.log("[info] Verifiers cache initialized successfully");

// Start MQTT handling
handleUplinks();
console.log("[info] MQTT uplinks handler started");

console.log("[info] Application initialization completed successfully");
} catch (error) {
console.error("[fatal] Failed to initialize application:", error);
process.exit(1);
}
}

// Start initialization
initializeApp();

app.get("/", async (req: Request, res: Response) => {
const m3ters = getAllMeterRecords();
Expand Down
45 changes: 26 additions & 19 deletions src/logic/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ import { connect } from "mqtt";
import { enqueue } from "./grpc";
import { interact } from "./arweave";
import { encode } from "./encode";
import { m3ter as m3terContract, rollup as rollupContract } from "./context";
import { m3ter as m3terContract } from "./context";
import {
deleteMeterByPublicKey,
getAllMeterRecords,
getAllTransactionRecords,
getMeterByDevEui,
getMeterByPublicKey,
Expand All @@ -18,7 +17,7 @@ import {
import { State, TransactionRecord } from "../types";
import { getProverURL, sendPendingTransactionsToProver } from "./prover";
import { decodePayload } from "./decode";
import { getLocalIPv4, verifyPayloadSignature } from "../utils";
import { verifyPayloadSignature } from "../utils";
import {
getLatestTransactionNonce,
pruneAndSyncOnchain,
Expand All @@ -28,7 +27,7 @@ import {
import { createMeterLogger, MeterLogger } from "../utils/logger";
import { publishPendingTransactionsToStreamr } from "./streamr";

const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST || getLocalIPv4();
const CHIRPSTACK_HOST = process.env.CHIRPSTACK_HOST;
const SYNC_EPOCH = 100; // after 100 transactions, sync with blockchain
const deviceLocks = new Map<string, boolean>(); // Lock per devEUI to prevent concurrent message processing

Expand All @@ -50,6 +49,7 @@ export function handleUplinks() {
client.on("error", (err) => {
console.error("Connection error: ", err);
client.end();
process.exit(1);
});

client.on("reconnect", () => {
Expand Down Expand Up @@ -83,6 +83,8 @@ export async function handleMessage(blob: Buffer) {
return;
}

let is_on = true;

// Set lock for this specific device
deviceLocks.set(devEui, true);

Expand Down Expand Up @@ -159,8 +161,10 @@ export async function handleMessage(blob: Buffer) {
});

// update existing meter with devEui if not already set
logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`);
updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]);
if (!existingMeter.devEui || existingMeter.devEui !== message["deviceInfo"]["devEui"]) {
logger.info(`Updating meter with DevEui: ${message["deviceInfo"]["devEui"]}`);
updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]);
}

// fetch and update latest nonce from chain
const latestNonce = await getLatestTransactionNonce(existingMeter.tokenId);
Expand All @@ -186,9 +190,14 @@ export async function handleMessage(blob: Buffer) {
if (m3ter.latestNonce === 0 && decoded.nonce === 0) {
logger.info("Both latest nonce and received nonce are 0, enqueuing 0 immediately");

const is_on =
(await getCrossChainRevenue(m3ter.tokenId)) >=
(await getOwedFromPriceContext(m3ter.tokenId));
try {
is_on =
(await getCrossChainRevenue(m3ter.tokenId)) >=
(await getOwedFromPriceContext(m3ter.tokenId));
} catch (error) {
logger.error(`Error fetching cross chain revenue or owed amount: ${error}`);
}

const state = { nonce: 0, is_on };

logger.info(`Enqueuing state: ${JSON.stringify(state)}`);
Expand Down Expand Up @@ -227,7 +236,6 @@ export async function handleMessage(blob: Buffer) {
}

// if device nonce is correct

if (decoded.nonce === expectedNonce) {
logger.info(`Nonce is valid: ${decoded.nonce}`);

Expand All @@ -248,8 +256,6 @@ export async function handleMessage(blob: Buffer) {

updateMeterNonce(`0x${publicKey}`, expectedNonce);

logger.debug(`Current all meters: ${JSON.stringify(getAllMeterRecords())}`);

logger.info(`Updated meter nonce to: ${expectedNonce}`);

const pendingTransactions = getAllTransactionRecords();
Expand All @@ -262,11 +268,7 @@ export async function handleMessage(blob: Buffer) {
const response = await sendPendingTransactionsToProver(proverURL!, pendingTransactions);

logger.info("done sending to prover");
try {
logger.info(`Prover response: ${JSON.stringify(await response?.json())}`);
} catch (jsonError) {
logger.info(`Prover response (text): ${await response?.text()}`);
}
logger.info(`Prover response (text): ${await response?.text()}`);
} catch (error) {
logger.error(`Error sending pending transactions to prover: ${error}`);
}
Expand All @@ -280,8 +282,13 @@ export async function handleMessage(blob: Buffer) {
}
}

const is_on =
(await getCrossChainRevenue(m3ter.tokenId)) >= (await getOwedFromPriceContext(m3ter.tokenId));
try {
is_on =
(await getCrossChainRevenue(m3ter.tokenId)) >=
(await getOwedFromPriceContext(m3ter.tokenId));
} catch (error) {
logger.error(`Error fetching cross chain revenue or owed amount: ${error}`);
}
const state = decoded.nonce === expectedNonce ? { is_on } : { nonce: m3ter.latestNonce, is_on };

logger.info(`Enqueuing state: ${JSON.stringify(state)}`);
Expand Down
10 changes: 0 additions & 10 deletions src/logic/prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ export async function sendTransactionsToProver(
});

console.log("[info] received", response.status, "from the prover");
console.log("prover response", await response.text());

if (!response.ok) {
throw new Error(`Prover responded with status: ${response.status}`);
Expand Down Expand Up @@ -128,15 +127,6 @@ export async function getProverURL(): Promise<string | null> {
}

export async function sendPendingTransactionsToProver(proverURL: string, pendingTransactions: TransactionRecord[]) {
if (!proverURL) {
console.error("No active prover node available");
return;
}

if (pendingTransactions.length > 0) {
console.log("[info] Pending transactions:", pendingTransactions);
}

console.log("[info] Sending", pendingTransactions.length, "transactions to prover at", proverURL);

const requestPayload = buildBatchPayload(pendingTransactions);
Expand Down
118 changes: 95 additions & 23 deletions src/logic/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,93 @@ import {
ccipRevenueReader as ccipRevenueReaderContract,
priceContext as priceContextContract,
} from "./context";
import { JsonRpcProvider, Contract } from "ethers";
import { JsonRpcProvider, Contract, ZeroAddress } from "ethers";
import { retry } from "../utils";
import type { VerifierInfo } from "../types";

// Cache for verifiers - populated once on startup
let verifiersCache: VerifierInfo[] | null = null;
let isCacheInitialized = false;

/**
* Initialize verifiers cache on program startup
* Fetches all verifiers and resolves their ENS names once
* Throws error if any fetch/resolution fails
*/
export async function initializeVerifiersCache(): Promise<void> {
try {
console.log("[info] Initializing verifiers cache...");

// Get the number of verifiers
const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount()));
console.log(`[info] Found ${verifierCount} verifiers to cache`);

const verifiers: VerifierInfo[] = [];

// Fetch all verifiers and resolve their ENS names
for (let i = 0; i < verifierCount; i++) {
try {
// Get verifier info (ensName, targetContractAddress)
const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i));

console.log(`[info] Fetching verifier ${i}: ENS: ${ensName}, target: ${targetAddress}`);

// Resolve ENS name to get the verifier address
const verifierAddress = await retry(() => provider.resolveName(ensName));

if (!verifierAddress || verifierAddress === ZeroAddress) {
throw new Error(`Failed to resolve ENS name: ${ensName}`);
}

console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`);

verifiers.push({
ensName,
targetAddress,
verifierAddress,
});
} catch (error) {
console.error(`[error] Failed to initialize verifier ${i}:`, error);
throw error; // Fail fast as requested
}
}

// Cache the verifiers
verifiersCache = verifiers;
isCacheInitialized = true;

console.log(`[info] Successfully cached ${verifiers.length} verifiers`);
} catch (error) {
console.error("[error] Failed to initialize verifiers cache:", error);
isCacheInitialized = false;
verifiersCache = null;
throw error;
}
}

/**
* Get cached verifiers, throws error if cache is not initialized
*/
function getCachedVerifiers(): VerifierInfo[] {
if (!isCacheInitialized || !verifiersCache) {
throw new Error("Verifiers cache not initialized. Call initializeVerifiersCache() first.");
}
return verifiersCache;
}

/**
* Check if verifiers cache is initialized
*/
export function isVerifiersCacheInitialized(): boolean {
return isCacheInitialized && verifiersCache !== null;
}

/**
* Get the number of cached verifiers
*/
export function getCachedVerifiersCount(): number {
return verifiersCache?.length ?? 0;
}

export async function pruneAndSyncOnchain(meterIdentifier: number | string): Promise<number> {
const meter =
Expand Down Expand Up @@ -61,42 +146,29 @@ export async function getLatestTransactionNonce(meterIdentifier: number): Promis
// get revenue across suppored chains
export async function getCrossChainRevenue(tokenId: number): Promise<number> {
try {
// Get the number of verifiers
const verifierCount = Number(await retry(() => ccipRevenueReaderContract.verifierCount()));

// Use cached verifiers instead of fetching them each time
const verifiers = getCachedVerifiers();
let totalRevenue = 0;

// Iterate through all verifiers and get revenue from each chain
for (let i = 0; i < verifierCount; i++) {
// Iterate through all cached verifiers and get revenue from each chain
for (const verifier of verifiers) {
try {
// Get verifier info (ensName, targetContractAddress)
const [ensName, targetAddress] = await retry(() => ccipRevenueReaderContract.verifiers(i));

console.log(`[info] Getting revenue from ENS: ${ensName}, target: ${targetAddress}`);

// Resolve ENS name to get the verifier address
const verifierAddress = await retry(() => provider.resolveName(ensName));

if (!verifierAddress) {
console.error(`[error] Failed to resolve ENS name: ${ensName}`);
continue;
}

console.log(`[info] Resolved ${ensName} to verifier address: ${verifierAddress}`);
console.log(`[info] Getting revenue from ENS: ${verifier.ensName}, target: ${verifier.targetAddress}, verifier: ${verifier.verifierAddress}`);

// Get revenue from this specific chain using CCIP read
// Parameters: tokenId, target (L2 contract), verifier (resolved from ENS)
const revenue = await retry(() =>
ccipRevenueReaderContract.read(tokenId, targetAddress, verifierAddress, {
ccipRevenueReaderContract.read(tokenId, verifier.targetAddress, verifier.verifierAddress, {
enableCcipRead: true,
})
);
const revenueAmount = Number(revenue);

console.log(`[info] Revenue from ${ensName} (${verifierAddress}): ${revenueAmount}`);
console.log(`[info] Revenue from ${verifier.ensName} (${verifier.verifierAddress}): ${revenueAmount}`);
totalRevenue += revenueAmount;
} catch (error) {
console.error(`[error] Failed to get revenue from verifier ${i}:`, error);
console.error(`[error] Failed to get revenue from verifier ${verifier.ensName}:`, error);
// Continue with other verifiers even if one fails
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,9 @@ export interface DecodedPayload {
};
buf: Buffer;
}

export interface VerifierInfo {
ensName: string;
targetAddress: string;
verifierAddress: string;
}
15 changes: 0 additions & 15 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,3 @@ export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer):
return false;
}
}

export function getLocalIPv4() {
const nets = os.networkInterfaces();
console.log(nets);
for (const network of Object.values(nets)) {
if (network) {
for (const iface of network) {
if (iface.family === "IPv4" && !iface.internal) {
return iface.address;
}
}
}
}
return "127.0.0.1";
}
Loading