Skip to content
Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
emulate.ts
test-database.ts
script.*.ts
script.*.js

# Database files
*.db
Expand Down
18 changes: 16 additions & 2 deletions src/logic/arweave.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ export async function interact(m3terId: number, decoded: DecodedPayload) {

const contractLabel = process.env.CONTRACT_LABEL || "M3ters";

const byteLength = transactionHex.length;
const byteLength = Buffer.byteLength(transactionHex.toString("hex"), "utf8");

return await turbo.uploadFile({
fileStreamFactory: () => Readable.from([transactionHex.toString("hex")], { encoding: "utf8" }),
fileStreamFactory: () => Readable.from(transactionHex.toString("hex"), { encoding: "utf8" }),
fileSizeFactory: () => byteLength,
dataItemOpts: {
paidBy: await arweave.wallets.jwkToAddress(key),
Expand All @@ -42,5 +42,19 @@ export async function interact(m3terId: number, decoded: DecodedPayload) {
{ name: "Latitude", value: decoded.extensions?.latitude?.toString() ?? "" },
],
},
events: {
onUploadProgress: (progress) => {
console.log("[arweave] Upload progress:", progress);
},
onError: (error) => {
console.error("[arweave] Upload error:", error);
},
onSuccess(event) {
console.log("[arweave] Upload successful! Transaction ID:", event);
},
onUploadSuccess(event) {
console.log("[arweave] Upload completed! Transaction ID:", event);
},
},
});
}
2 changes: 1 addition & 1 deletion src/logic/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ app.listen(port, () => {
const provider = new JsonRpcProvider(process.env.MAINNET_RPC);

export const m3ter = new Contract(
"0x7c6FEF064603B91bE9d739fE981c28Fd82a6D62b", // "0x40a36C0eF29A49D1B1c1fA45fab63762f8FC423F",
"0x7c6FEF064603B91bE9d739fE981c28Fd82a6D62b", // "0x40a36C0eF29A49D1B1c1fA45fab63762f8FC423F"
[
"function publicKey(uint256) view returns (bytes32)",
"function tokenID(bytes32) view returns (uint256)",
Expand Down
52 changes: 36 additions & 16 deletions src/logic/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
import { State, TransactionRecord } from "../types";
import { getProverURL, sendPendingTransactionsToProver } from "./verify";
import { decodePayload } from "./decode";
import { verifyPayloadSignature } from "../utils";

export function handleUplinks() {
const client = connect({
Expand Down Expand Up @@ -57,10 +58,31 @@ export async function handleMessage(blob: Buffer) {
const transactionHex = payload;
const decoded = decodePayload(transactionHex);
let publicKey = decoded.extensions.deviceId;
let payloadHadPublicKey = !!publicKey;

console.log("[info] Decoded payload:", decoded);

if (publicKey) {
if (!publicKey) {
// try to find public key by DevEui
const devEui = message["deviceInfo"]["devEui"];
const meterByDevEui = getMeterByDevEui(devEui);

if (!meterByDevEui) {
throw new Error("Device EUI not associated with any meter: " + devEui);
}

publicKey = meterByDevEui.publicKey.replace("0x", "");
}

// verify transaction signature
const isValid = verifyPayloadSignature(transactionHex, Buffer.from(publicKey!, "hex"));
if (!isValid) {
throw new Error("Invalid transaction signature for meter with public key: " + publicKey);
}

console.log("[info] Verified signature");

if (payloadHadPublicKey) {
// save public key with device EUI mapping if not already saved
const existingMeter = getMeterByPublicKey(`0x${publicKey}`);

Expand All @@ -86,16 +108,6 @@ export async function handleMessage(blob: Buffer) {
updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]);
console.log("[info] Updated meter with DevEui:", existingMeter.tokenId);
}
} else {
// try to find meter by DevEui
const devEui = message["deviceInfo"]["devEui"];
const meterByDevEui = getMeterByDevEui(devEui);

if (!meterByDevEui) {
throw new Error("Device EUI not associated with any meter: " + devEui);
}

publicKey = meterByDevEui.publicKey.replace("0x", "");
}

const m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null;
Expand All @@ -118,6 +130,7 @@ export async function handleMessage(blob: Buffer) {

if (decoded.nonce === expectedNonce) {
console.log("[info] Nonce is valid:", decoded.nonce);

// Upload to arweave
await interact(m3ter.tokenId, decoded);

Expand All @@ -126,8 +139,7 @@ export async function handleMessage(blob: Buffer) {
// save transaction to local store
const transactionRecord = {
nonce: decoded.nonce,
verified: false,
identifier: m3ter.tokenId.toString(),
identifier: m3ter.tokenId,
receivedAt: Date.now(),
raw: transactionHex.toString("hex"),
} as TransactionRecord;
Expand All @@ -140,7 +152,7 @@ export async function handleMessage(blob: Buffer) {
console.error("Error inserting transaction:", error);
}

updateMeterNonce(publicKey, expectedNonce);
updateMeterNonce(`0x${publicKey}`, expectedNonce);

console.log("[info] Updated meter nonce to:", expectedNonce);

Expand All @@ -158,17 +170,25 @@ export async function handleMessage(blob: Buffer) {
}

const state =
decoded.nonce === m3ter.latestNonce + 1 || decoded.nonce === 0
decoded.nonce === m3ter.latestNonce + 1 || (decoded.nonce === 0 && m3ter.latestNonce === 0)
? { is_on: true }
: { nonce: m3ter.latestNonce, is_on: true };

// TODO: remove the following block after testing
// if transaction nonce is 0 and the latest nonce is 0
// update the latest nonce to 1, respond with 1
if (decoded.nonce === 0 && m3ter.latestNonce === 0) {
updateMeterNonce(`0x${publicKey}`, 1);
state.nonce = 1;
}

console.log("[info] Enqueuing state:", state);

enqueue(
message["deviceInfo"]["devEui"],
encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0)
);
} catch (error) {
console.log(error);
console.error("❌ Error handling MQTT message:", error);
}
}
21 changes: 21 additions & 0 deletions src/logic/sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { getAllMeterRecords, pruneTransactionsBefore, updateMeterNonce } from "../store/sqlite";
import { rollup as rollupContract } from "./context";

export async function pruneAndSyncWithBlockchain() {
// Get all meter records from the local database
const meters = getAllMeterRecords();

for (const meter of meters) {
const { publicKey, latestNonce } = meter;

// Check the latest nonce on the blockchain
const blockchainNonce = Number(await rollupContract.nonce(meter.tokenId));

if (blockchainNonce > latestNonce) {
// If the blockchain nonce is greater, update the local record
updateMeterNonce(publicKey, blockchainNonce);
// prune transactions with nonce less than or equal to blockchainNonce
pruneTransactionsBefore(meter.tokenId, blockchainNonce);
}
}
}
7 changes: 5 additions & 2 deletions src/logic/verify.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { BatchTransactionPayload, TransactionRecord } from "../types";
import { rollup } from "./context";
import { getUnverifiedTransactionRecords } from "../store/sqlite";
import { buildBatchPayload } from "../utils";
import { getAllTransactionRecords } from "../store/sqlite";

const PREFERRED_PROVER_NODE = process.env.PREFERRED_PROVER_NODE || "https://prover.m3ter.ing";

Expand Down Expand Up @@ -130,7 +130,7 @@ export async function getProverURL(): Promise<string | null> {
}

export async function sendPendingTransactionsToProver(proverURL: string) {
const pendingTransactions = getUnverifiedTransactionRecords();
const pendingTransactions = getAllTransactionRecords();

if (!proverURL) {
console.error("No active prover node available");
Expand All @@ -139,5 +139,8 @@ export async function sendPendingTransactionsToProver(proverURL: string) {

const requestPayload = buildBatchPayload(pendingTransactions);

console.log("[info] Sending", requestPayload.length, "transactions to prover at", proverURL);
console.log("[info] Request payload:", requestPayload);

return await sendTransactionsToProver(proverURL, requestPayload);
}
70 changes: 14 additions & 56 deletions src/store/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ let updateMeterDevEuiQuery: DatabaseStatementType;
// transaction queries
let createTransactionQuery: DatabaseStatementType;
let getTransactionByNonceQuery: DatabaseStatementType;
let getUnverifiedTransactionRecordsQuery: DatabaseStatementType;
let markTransactionAsVerifiedQuery: DatabaseStatementType;
let deleteVerifiedTransactionRecordsQuery: DatabaseStatementType;

/**
* setup database
Expand Down Expand Up @@ -53,9 +50,8 @@ function initializeTransactionsTable() {
return db.exec(`
CREATE TABLE IF NOT EXISTS transactions (
nonce INTEGER,
identifier TEXT,
identifier INTEGER, -- Meter token ID
receivedAt INTEGER,
verified BOOLEAN DEFAULT FALSE,
raw TEXT,

UNIQUE(nonce, identifier)
Expand Down Expand Up @@ -117,25 +113,13 @@ function prepareQueries() {

// transaction queries
createTransactionQuery = db.prepare(`
INSERT INTO transactions (nonce, identifier, verified, receivedAt, raw)
VALUES (@nonce, @identifier, @verified, @receivedAt, @raw)
INSERT INTO transactions (nonce, identifier, receivedAt, raw)
VALUES (@nonce, @identifier, @receivedAt, @raw)
`);

getTransactionByNonceQuery = db.prepare(`
SELECT * FROM transactions WHERE nonce = ? AND identifier = ?
`);

getUnverifiedTransactionRecordsQuery = db.prepare(`
SELECT * FROM transactions WHERE verified = FALSE
`);

markTransactionAsVerifiedQuery = db.prepare(`
UPDATE transactions SET verified = TRUE WHERE nonce = ?
`);

deleteVerifiedTransactionRecordsQuery = db.prepare(`
DELETE FROM transactions WHERE verified = TRUE
`);
}

// Meter management functions
Expand Down Expand Up @@ -249,58 +233,32 @@ export function insertTransaction(transactionData: TransactionRecord): void {
throw new Error(`Transaction with nonce ${transactionData.nonce} already exists`);
}

transactionData.verified = +Boolean(transactionData.verified) as 0 | 1; // Ensure verified is set
createTransactionQuery.run(transactionData);
} catch (err: any) {
console.error("Failed to insert transaction:", err);
throw err;
}
}

export function getTransactionByNonce(nonce: number): TransactionRecord | null {
export function getAllTransactionRecords(): TransactionRecord[] {
try {
const result = getTransactionByNonceQuery.get(nonce) as TransactionRecord | undefined;
return result || null;
} catch (err: any) {
console.error("Failed to get transaction by nonce:", err);
return null;
}
}

// Transaction verification functions
export function getUnverifiedTransactionRecords(): TransactionRecord[] {
try {
const results = getUnverifiedTransactionRecordsQuery.all() as TransactionRecord[];
const results = db.prepare(`SELECT * FROM transactions`).all() as TransactionRecord[];
return results;
} catch (err: any) {
console.error("Failed to get unverified transactions:", err);
console.error("Failed to get all transactions:", err);
return [];
}
}

export function markTransactionAsVerified(nonce: number): boolean {
try {
const result = markTransactionAsVerifiedQuery.run(nonce);
const updated = result.changes > 0;
if (!updated) {
console.log("Transaction not found for verification:", {
nonce,
});
}
return updated;
} catch (err: any) {
console.error("Failed to mark transaction as verified:", err);
return false;
}
}

export function deleteVerifiedTransactionRecords(): number {
export function pruneTransactionsBefore(nonce: number, meterNumber: number) {
try {
const result = deleteVerifiedTransactionRecordsQuery.run();
const deletedCount = result.changes;
return deletedCount;
const result = db
.prepare(`DELETE FROM transactions WHERE identifier = ? AND nonce < ?`)
.run(meterNumber, nonce);
console.log(
`Pruned ${result.changes} transactions for meter ${meterNumber} with nonce < ${nonce}`
);
} catch (err: any) {
console.error("Failed to delete verified transactions:", err);
return 0;
console.error("Failed to prune transactions:", err);
}
}
8 changes: 1 addition & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ export interface MeterRecord {
// transaction database record
export interface TransactionRecord {
nonce: number;
identifier: string;
verified: boolean | 0 | 1; // Optional field to indicate if the transaction is verified
identifier: number; // Meter token ID
receivedAt: number;
raw: string; // Raw transaction data in hex format
}
Expand All @@ -34,11 +33,6 @@ export interface State {
token_id: number;
}

export interface M3terPayload {
0: string; // encoded format: nonce | energy | signature | voltage | device_id | longitude | latitude
1: string;
}

export interface DecodedPayload {
nonce: number;
energy: number;
Expand Down
26 changes: 26 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,34 @@
import { TransactionRecord, BatchTransactionPayload } from "./types";
import { createPublicKey, verify } from "crypto";

export function buildBatchPayload(transactions: TransactionRecord[]): BatchTransactionPayload[] {
return transactions.map((transaction) => ({
m3ter_id: Number(transaction.identifier),
message: transaction.raw,
}));
}

export function verifyPayloadSignature(transaction: Buffer, rawPubKey: Buffer): boolean {
try {
const message = transaction.subarray(0, 8);
const signature = transaction.subarray(8, 72);

// Wrap raw key in SPKI DER
const spkiPrefix = Buffer.from("302a300506032b6570032100", "hex");
const derKey = Buffer.concat([spkiPrefix, rawPubKey]);

const publicKey = createPublicKey({
key: derKey,
format: "der",
type: "spki",
});

// Verify
const ok = verify(null, message, publicKey, signature);

return ok;
} catch (error) {
console.error("Error verifying signature:", error);
return false;
}
}
Loading