diff --git a/.gitignore b/.gitignore index 69b4b6b..d03d680 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ #test scripts emulate.ts test-database.ts +script.*.ts # Database files *.db diff --git a/src/index.ts b/src/index.ts index fe21b09..d581250 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,7 @@ import "dotenv/config"; import { handleUplinks } from "./logic/mqtt"; import { Request, Response } from "express"; -import { app, m3ter } from "./logic/context"; +import { app, m3ter, rollup } from "./logic/context"; import setupDatabase, { getAllMeterRecords, saveMeter, @@ -13,8 +13,6 @@ handleUplinks(); // Initialize database tables and jobs setupDatabase(); -console.log("[server]: Server is starting...", process.env); // todo: remove - app.get("/", async (req: Request, res: Response) => { const m3ters = getAllMeterRecords(); res.render("index", { m3ters }); @@ -25,10 +23,12 @@ app.post("/", async (req: Request, res: Response) => { try { const tokenId = (await req.body).tokenId; const publicKey = await m3ter.publicKey(tokenId); + const latestNonce = await rollup.nonce(tokenId); saveMeter({ publicKey, tokenId, - latestNonce: 0, // Initialize latestNonce to 0 + latestNonce: Number(latestNonce), + devEui: (await req.body).devEui ?? null, }); } catch (err) { console.error(err); diff --git a/src/logic/context.ts b/src/logic/context.ts index aa38f6e..407287e 100644 --- a/src/logic/context.ts +++ b/src/logic/context.ts @@ -32,8 +32,11 @@ app.listen(port, () => { const provider = new JsonRpcProvider(process.env.MAINNET_RPC); export const m3ter = new Contract( - "0x40a36C0eF29A49D1B1c1fA45fab63762f8FC423F", - ["function publicKey(uint256) view returns (bytes32)"], + "0x7c6FEF064603B91bE9d739fE981c28Fd82a6D62b", // "0x40a36C0eF29A49D1B1c1fA45fab63762f8FC423F", + [ + "function publicKey(uint256) view returns (bytes32)", + "function tokenID(bytes32) view returns (uint256)", + ], provider ); diff --git a/src/logic/mqtt.ts b/src/logic/mqtt.ts index a8833bc..139dbab 100644 --- a/src/logic/mqtt.ts +++ b/src/logic/mqtt.ts @@ -2,7 +2,16 @@ import { connect } from "mqtt"; import { enqueue } from "./grpc"; import { interact } from "./arweave"; import { encode } from "./encode"; -import { getMeterByPublicKey, insertTransaction, updateMeterNonce } from "../store/sqlite"; +import { m3ter as m3terContract, rollup as rollupContract } from "./context"; +import { + getAllMeterRecords, + getMeterByDevEui, + getMeterByPublicKey, + insertTransaction, + saveMeter, + updateMeterDevEui, + updateMeterNonce, +} from "../store/sqlite"; import { State, TransactionRecord } from "../types"; import { getProverURL, sendPendingTransactionsToProver } from "./verify"; import { decodePayload } from "./decode"; @@ -36,30 +45,63 @@ export function handleUplinks() { }); } -async function handleMessage(blob: Buffer) { +export async function handleMessage(blob: Buffer) { try { const message = JSON.parse(blob.toString()); console.log("[info] Received uplink from device:", JSON.stringify(message)); - const payload = Buffer.from(message["data"], "hex"); + const payload = Buffer.from(message["data"], "base64"); // encode transaction into standard format (payload is hex string) // format: nonce | energy | signature | voltage | device_id | longitude | latitude const transactionHex = payload; const decoded = decodePayload(transactionHex); - const publicKey = decoded.extensions.deviceId; + let publicKey = decoded.extensions.deviceId; console.log("[info] Decoded payload:", decoded); - if (!publicKey) { - throw new Error("Invalid Public Key"); + if (publicKey) { + // save public key with device EUI mapping if not already saved + const existingMeter = getMeterByPublicKey(`0x${publicKey}`); + + if (!existingMeter) { + const tokenId = Number(await m3terContract.tokenID(`0x${publicKey}`)); + if (tokenId === 0) { + throw new Error("Token ID not found for public key: " + publicKey); + } + + const latestNonce = Number(await rollupContract.nonce(tokenId)); + + // save new meter with devEui + const newMeter = { + publicKey: `0x${publicKey}`, + devEui: message["deviceInfo"]["devEui"], + tokenId, + latestNonce, + }; + saveMeter(newMeter); + console.log("[info] Saved new meter:", newMeter); + } else if (existingMeter && !existingMeter.devEui) { + // update existing meter with devEui if not already set + updateMeterDevEui(`0x${publicKey}`, message["deviceInfo"]["devEui"]); + console.log("[info] Updated meter with DevEui:", existingMeter.tokenId); + } + } else { + // try to find meter by DevEui + const devEui = message["deviceInfo"]["devEui"]; + const meterByDevEui = getMeterByDevEui(devEui); + + if (!meterByDevEui) { + throw new Error("Device EUI not associated with any meter: " + devEui); + } + + publicKey = meterByDevEui.publicKey.replace("0x", ""); } - const m3ter = getMeterByPublicKey(publicKey ?? ""); + const m3ter = getMeterByPublicKey(`0x${publicKey}`) ?? null; if (!m3ter) { - console.error("Meter not found for public key:", publicKey); - return; + throw new Error("Meter not found for public key: " + publicKey); } console.log( @@ -74,10 +116,7 @@ async function handleMessage(blob: Buffer) { // if device nonce is correct const expectedNonce = m3ter.latestNonce + 1; - let state; if (decoded.nonce === expectedNonce) { - state = { is_on: true }; - console.log("[info] Nonce is valid:", decoded.nonce); // Upload to arweave await interact(m3ter.tokenId, decoded); @@ -118,13 +157,16 @@ async function handleMessage(blob: Buffer) { } } + const state = + decoded.nonce === m3ter.latestNonce + 1 || decoded.nonce === 0 + ? { is_on: true } + : { nonce: m3ter.latestNonce, is_on: true }; + + console.log("[info] Enqueuing state:", state); + enqueue( message["deviceInfo"]["devEui"], - encode( - (state ? state : { nonce: expectedNonce, is_on: true }) as State, - decoded.extensions.latitude ?? 0, - decoded.extensions.longitude ?? 0 - ) + encode(state as State, decoded.extensions.latitude ?? 0, decoded.extensions.longitude ?? 0) ); } catch (error) { console.log(error); diff --git a/src/store/sqlite.ts b/src/store/sqlite.ts index 3cbe41d..5a0ad33 100644 --- a/src/store/sqlite.ts +++ b/src/store/sqlite.ts @@ -7,10 +7,12 @@ import { MeterRecord, TransactionRecord } from "../types"; let db: DatabaseType; let insertMeterQuery: DatabaseStatementType; let getMeterByPublicKeyQuery: DatabaseStatementType; +let getMeterByDevEuiQuery: DatabaseStatementType; let getMeterByTokenIdQuery: DatabaseStatementType; let getAllMetersQuery: DatabaseStatementType; let deleteMeterByPublicKeyQuery: DatabaseStatementType; let updateMeterNonceQuery: DatabaseStatementType; +let updateMeterDevEuiQuery: DatabaseStatementType; // transaction queries let createTransactionQuery: DatabaseStatementType; let getTransactionByNonceQuery: DatabaseStatementType; @@ -68,8 +70,11 @@ function initializeMetersTable() { return db.exec(` CREATE TABLE IF NOT EXISTS meters ( publicKey TEXT, + devEui TEXT, tokenId INTEGER, - latestNonce INTEGER DEFAULT -1 + latestNonce INTEGER DEFAULT -1, + + UNIQUE(publicKey, tokenId) ) `); } @@ -78,20 +83,24 @@ function initializeMetersTable() { function prepareQueries() { // meter queries insertMeterQuery = db.prepare(` - INSERT OR REPLACE INTO meters (publicKey, tokenId, latestNonce) - VALUES (@publicKey, @tokenId, @latestNonce) + INSERT OR REPLACE INTO meters (publicKey, devEui, tokenId, latestNonce) + VALUES (@publicKey, @devEui, @tokenId, @latestNonce) `); getMeterByPublicKeyQuery = db.prepare(` - SELECT publicKey, tokenId, latestNonce FROM meters WHERE publicKey = ? + SELECT publicKey, devEui, tokenId, latestNonce FROM meters WHERE publicKey = ? + `); + + getMeterByDevEuiQuery = db.prepare(` + SELECT publicKey, devEui, tokenId, latestNonce FROM meters WHERE devEui = ? `); getMeterByTokenIdQuery = db.prepare(` - SELECT publicKey, tokenId, latestNonce FROM meters WHERE tokenId = ? + SELECT publicKey, devEui, tokenId, latestNonce FROM meters WHERE tokenId = ? `); getAllMetersQuery = db.prepare(` - SELECT publicKey, tokenId, latestNonce FROM meters + SELECT publicKey, devEui, tokenId, latestNonce FROM meters `); deleteMeterByPublicKeyQuery = db.prepare(` @@ -102,6 +111,10 @@ function prepareQueries() { UPDATE meters SET latestNonce = ? WHERE publicKey = ? `); + updateMeterDevEuiQuery = db.prepare(` + UPDATE meters SET devEui = ? WHERE publicKey = ? + `); + // transaction queries createTransactionQuery = db.prepare(` INSERT INTO transactions (nonce, identifier, verified, receivedAt, raw) @@ -134,6 +147,7 @@ export function saveMeter(meterData: MeterRecord): void { publicKey: meterData.publicKey, tokenId: meterData.tokenId, latestNonce: meterData.latestNonce, + devEui: meterData.devEui ?? null, }); } catch (err: any) { console.error("Failed to save meter:", err); @@ -151,6 +165,16 @@ export function getMeterByPublicKey(publicKey: string): MeterRecord | null { } } +export function getMeterByDevEui(devEui: string): MeterRecord | null { + try { + const result = getMeterByDevEuiQuery.get(devEui) as MeterRecord | undefined; + return result || null; + } catch (err: any) { + console.error("Failed to get meter by DevEui:", err); + return null; + } +} + export function getMeterByTokenId(tokenId: string): MeterRecord | null { try { const result = getMeterByTokenIdQuery.get(tokenId) as MeterRecord | undefined; @@ -199,15 +223,32 @@ export function updateMeterNonce(publicKey: string, nonce: number): boolean { } } +export function updateMeterDevEui(publicKey: string, devEui: string): boolean { + try { + const result = updateMeterDevEuiQuery.run(devEui, publicKey); + const updated = result.changes > 0; + if (!updated) { + console.log("Meter not found for DevEui update:", { publicKey }); + } + return updated; + } catch (err: any) { + console.error("Failed to update meter DevEui:", err); + return false; + } +} + // Transaction insertion function export function insertTransaction(transactionData: TransactionRecord): void { try { - const existingTransaction = getTransactionByNonceQuery.get(transactionData.nonce, transactionData.identifier) as TransactionRecord | undefined; + const existingTransaction = getTransactionByNonceQuery.get( + transactionData.nonce, + transactionData.identifier + ) as TransactionRecord | undefined; if (existingTransaction) { throw new Error(`Transaction with nonce ${transactionData.nonce} already exists`); } - + transactionData.verified = +Boolean(transactionData.verified) as 0 | 1; // Ensure verified is set createTransactionQuery.run(transactionData); } catch (err: any) { diff --git a/src/types.ts b/src/types.ts index 0662823..2023423 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,6 +1,7 @@ // Meter interface for database operations export interface MeterRecord { publicKey: string; + devEui: string | null; // Optional field for device EUI tokenId: number; latestNonce: number; // Optional field for tracking latest nonce } diff --git a/tests/store/sqlite.test.ts b/tests/store/sqlite.test.ts index 53f29ce..e2e0d13 100644 --- a/tests/store/sqlite.test.ts +++ b/tests/store/sqlite.test.ts @@ -10,6 +10,8 @@ import setupDatabase, { deleteVerifiedTransactionRecords, insertTransaction, deleteDatabase, + getMeterByDevEui, + updateMeterDevEui, } from "../../src/store/sqlite"; beforeEach(() => { @@ -28,11 +30,12 @@ it("should have no meters and transactions", () => { expect(transactions).toHaveLength(0); }); -it("should insert meter", () => { +it("should insert meter without devEui", () => { const meterData = { publicKey: "test_public_key", tokenId: 1, latestNonce: 0, + devEui: null, }; saveMeter(meterData); @@ -40,11 +43,26 @@ it("should insert meter", () => { expect(meters).toHaveLength(1); }); +it("should insert meter with devEui", () => { + const meterData = { + publicKey: "test_public_key", + tokenId: 1, + latestNonce: 0, + devEui: "test_dev_eui", + }; + saveMeter(meterData); + + const meters = getAllMeterRecords(); + expect(meters).toHaveLength(1); + expect(meters[0].devEui).toBe("test_dev_eui"); +}); + it("should get meter by public key", () => { const meterData = { publicKey: "test_public_key", tokenId: 1, latestNonce: 0, + devEui: "test_dev_eui", }; saveMeter(meterData); @@ -52,11 +70,26 @@ it("should get meter by public key", () => { expect(retrievedMeter).toEqual(meterData); }); +it("should get meter by device EUI", () => { + const meterData = { + publicKey: "test_public_key", + tokenId: 1, + latestNonce: 0, + devEui: "test_dev_eui", + }; + saveMeter(meterData); + + const retrievedMeter = getMeterByDevEui(meterData.devEui); + expect(retrievedMeter).toEqual(meterData); +}); + + it("should delete meter", () => { const meterData = { publicKey: "test_public_key", tokenId: 1, latestNonce: 0, + devEui: "test_dev_eui", }; saveMeter(meterData); const deleted = deleteMeterByPublicKey(meterData.publicKey); @@ -70,6 +103,7 @@ it("should update meter nonce", () => { publicKey: "test_public_key", tokenId: 1, latestNonce: 0, + devEui: "test_dev_eui", }; saveMeter(meterData); @@ -80,6 +114,22 @@ it("should update meter nonce", () => { expect(updatedMeter?.latestNonce).toBe(5); }); +it("should update meter devEui", () => { + const meterData = { + publicKey: "test_public_key", + tokenId: 1, + latestNonce: 0, + devEui: "test_dev_eui", + }; + saveMeter(meterData); + + const updated = updateMeterDevEui(meterData.publicKey, "new_dev_eui"); + expect(updated).toBe(true); + + const updatedMeter = getMeterByPublicKey(meterData.publicKey); + expect(updatedMeter?.devEui).toBe("new_dev_eui"); +}); + it("should insert transaction", () => { const transactionData = { nonce: 1,