Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions src/fetchTelegramMessages.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import { Api, TelegramClient } from "telegram";
import { trackApiKeyUsage } from './utils/redisUtils';
import { TelegramAccount } from './services/telegramAccountManager';

export type TelegramMessages = { id: string; content: string; channelId: string };

export async function fetchTelegramMessages(
client: TelegramClient,
channel: string
account: TelegramAccount
): Promise<TelegramMessages[]> {
const channel = account.credentials.TELEGRAM_TG_CHANNEL;
if (!channel) {
throw new Error("TELEGRAM_TG_CHANNEL environment variable is not set.");
throw new Error("TELEGRAM_TG_CHANNEL is not set in account credentials.");
}

if (process.env.DEBUG_TELEGRAM === '1') {
console.log(`Using Telegram account: ${account.accountId} for channel: ${channel}`);
}
const apiId = process.env.TELEGRAM_API_ID;

// Fetch channel entity to get the actual channel ID
let entity: Api.Channel;
Expand Down Expand Up @@ -52,21 +57,6 @@ export async function fetchTelegramMessages(
console.log("No messages property found in response:", messages);
}

// Track API usage after successful fetch
if (apiId) {
let accountId: string;
try {
const me = await client.getMe();
if (me && me.id) {
accountId = String(me.id);
} else {
throw new Error('Unable to determine Telegram account ID');
}
} catch (e) {
throw new Error('Unable to determine Telegram account ID');
}
await trackApiKeyUsage({ accountId, platform: 'telegram' });
}

return out;
}
98 changes: 65 additions & 33 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,40 @@ import cron from 'node-cron';
import { TelegramClient } from "telegram";
import { StringSession } from "telegram/sessions";
import { fetchTelegramMessages } from './fetchTelegramMessages';
import { getApiKeyUsage } from './utils/redisUtils';
import { telegramAccountManager, TelegramAccount } from './services/telegramAccountManager';

// Replace these with your values
const apiId = Number(process.env.TELEGRAM_API_ID);
const apiHash = process.env.TELEGRAM_API_HASH ?? "";
if (!Number.isFinite(apiId)) {
throw new Error("API_ID environment variable is missing or not a valid number.");
}
if (!apiHash) {
throw new Error("API_HASH environment variable is not set.");
}
const stringSession = new StringSession(process.env.TG_SESSION ?? ""); // use existing session if available
// Create a map to store clients for each account
const clientMap = new Map<string, TelegramClient>();

async function startTelegramCron() {
console.log("Starting Telegram client...");
async function createTelegramClient(account: TelegramAccount): Promise<TelegramClient> {
// Check if we already have a client for this account
if (clientMap.has(account.accountId)) {
return clientMap.get(account.accountId)!;
}
Comment on lines +14 to +16
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Reuse only live clients; evict stale ones

If a client was disconnected, you’ll return a dead client. Check connectivity before reuse.

-  if (clientMap.has(account.accountId)) {
-    return clientMap.get(account.accountId)!;
-  }
+  const cached = clientMap.get(account.accountId);
+  if (cached && (cached as any).connected !== false) {
+    return cached;
+  }
+  if (cached && (cached as any).connected === false) {
+    clientMap.delete(account.accountId);
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (clientMap.has(account.accountId)) {
return clientMap.get(account.accountId)!;
}
const cached = clientMap.get(account.accountId);
if (cached && (cached as any).connected !== false) {
return cached;
}
if (cached && (cached as any).connected === false) {
clientMap.delete(account.accountId);
}
🤖 Prompt for AI Agents
In src/index.ts around lines 14 to 16, the code returns a cached client without
verifying its connectivity; change the logic to check the client's live status
before reusing it (e.g., call the client's connection/ready/ping method or
inspect a connected/readyState property), and if the client is not connected
remove it from clientMap and proceed to create or fetch a new live client;
ensure the connectivity check is synchronous/awaited as needed and that evicted
clients are properly cleaned up (closed) before replacement.


console.log(`Creating Telegram client for account: ${account.accountId}`);

const apiId = Number(account.credentials.TELEGRAM_API_ID);
const apiHash = account.credentials.TELEGRAM_API_HASH;

if (!Number.isFinite(apiId)) {
throw new Error(`Invalid API_ID for account ${account.accountId}`);
}
if (!apiHash) {
throw new Error(`API_HASH not set for account ${account.accountId}`);
}

// Fetch per-account session string from Redis (not env)
const redisClient = (telegramAccountManager as any).redisClient;
await redisClient.connect?.();
const sessionKey = `telegram_session:${account.accountId}`;
let sessionStr = await redisClient.get(sessionKey);
const isInteractive = Boolean(process.stdin.isTTY);
if (!sessionStr && !isInteractive) {
throw new Error(`Missing session in Redis for ${account.accountId} (key: ${sessionKey}). Generate and store a session string before running cron.`);
}
sessionStr = sessionStr || "";
const stringSession = new StringSession(sessionStr);
const client = new TelegramClient(stringSession, apiId, apiHash, {
Comment on lines +36 to 41
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Non‑interactive runs may hang on prompts; prefer connect+health‑check

When isInteractive is false and a bad/expired session is loaded, start() will try to prompt and hang. Use connect() and a light auth check; only fall back to start() if interactive.

-  await client.start({
-    phoneNumber: async () => await input.text("Enter your phone number: "),
-    password: async () => await input.text("Enter 2FA password (if enabled): "),
-    phoneCode: async () => await input.text("Enter code you received: "),
-    onError: (err) => console.log(err),
-  });
+  if (sessionStr) {
+    await client.connect();
+    try {
+      await client.getMe(); // basic auth/health check
+    } catch (e) {
+      if (!isInteractive) {
+        throw new Error(`Session for ${account.accountId} is invalid/expired; re-auth interactively to refresh.`);
+      }
+      await client.start({
+        phoneNumber: async () => await input.text("Enter your phone number: "),
+        password: async () => await input.text("Enter 2FA password (if enabled): "),
+        phoneCode: async () => await input.text("Enter code you received: "),
+        onError: (err) => console.log(err),
+      });
+    }
+  } else {
+    await client.start({
+      phoneNumber: async () => await input.text("Enter your phone number: "),
+      password: async () => await input.text("Enter 2FA password (if enabled): "),
+      phoneCode: async () => await input.text("Enter code you received: "),
+      onError: (err) => console.log(err),
+    });
+  }

Also applies to: 45-50

connectionRetries: 5,
});
Expand All @@ -30,37 +49,50 @@ async function startTelegramCron() {
onError: (err) => console.log(err),
});

console.log("Logged in successfully!");
if (process.env.PRINT_TG_SESSION === "1") {
console.log("Your session string:", client.session.save());
} else {
console.log(
"Session created. Set PRINT_TG_SESSION=1 to print it explicitly."
);
console.log(`Logged in successfully for account: ${account.accountId}`);
const saved = client.session.save();
if (process.env.PRINT_TG_SESSION === "1" && isInteractive) {
// Emit an export-ready line deliberately, instead of dumping secrets in logs
console.log(`export ${sessionKey}="${saved}"`);
}

// Store the client for reuse
clientMap.set(account.accountId, client);

return client;
}

async function startTelegramCron() {
console.log("Starting Telegram account rotation system...");

// Run once at startup
try {
await fetchTelegramMessages(client, process.env.TELEGRAM_TG_CHANNEL!);
// Print Telegram API usage by accountId (not API_ID)
const me = await client.getMe();
const accountId = String(me.id);
const usage = await getApiKeyUsage({ accountId, platform: 'telegram' });
console.log('Telegram API usage:', {
total_requests: usage.total_requests,
last_request: usage.last_request,
account_id: usage.account_id
const account = await telegramAccountManager.getEarliestUsedAccount();
const client = await createTelegramClient(account);

await fetchTelegramMessages(client, account);

// Show usage statistics for all accounts
const allAccounts = await telegramAccountManager.getAllAccountsUsage();
console.log('All Telegram accounts usage:');
allAccounts.forEach((acc, index) => {
console.log(` Account ${index + 1} (${acc.accountId}):`);
console.log(` Total requests: ${acc.totalRequests}`);
console.log(` Last used: ${acc.lastUsed || 'Never'}`);
Comment on lines +73 to +81
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Rotation does not update usage — account will never advance

You never call markAccountAsUsed; LRU will keep choosing the same account.

   await fetchTelegramMessages(client, account);
+  await telegramAccountManager.markAccountAsUsed(account.accountId);
@@
   await fetchTelegramMessages(client, account);
-  console.log(`Fetched messages using account: ${account.accountId}`);
+  await telegramAccountManager.markAccountAsUsed(account.accountId);
+  console.log(`Fetched messages using account: ${account.accountId}`);

Also applies to: 92-94

🤖 Prompt for AI Agents
In src/index.ts around lines 71 to 79 (and similarly around 92 to 94), the code
prints per-account usage but never updates each account's usage state, so the
LRU rotation won't advance; call
telegramAccountManager.markAccountAsUsed(account.accountId) (or the appropriate
method) after selecting or displaying an account to update its
lastUsed/totalRequests fields and persist the change; ensure you await the
markAccountAsUsed call where necessary and handle any errors so the usage store
reflects the update immediately.

});
} catch (err) {
console.error("Startup Telegram fetch failed:", err);
}

// Schedule to run every 5 minutes (no overlap guard)
// Schedule to run every 5 minutes with account rotation
cron.schedule('*/5 * * * *', async () => {
console.log('Refetching Telegram messages...');
console.log('Refetching Telegram messages with account rotation...');
try {
await fetchTelegramMessages(client, process.env.TELEGRAM_TG_CHANNEL!);
// No duplicate print of Telegram API usage
const account = await telegramAccountManager.getEarliestUsedAccount();
const client = await createTelegramClient(account);

await fetchTelegramMessages(client, account);
console.log(`Fetched messages using account: ${account.accountId}`);
} catch (err) {
console.error('Scheduled Telegram fetch failed:', err);
}
Expand Down
69 changes: 69 additions & 0 deletions src/services/BaseAccountManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { createClient, RedisClientType } from 'redis';

export interface BaseAccount {
accountId: string;
credentials: Record<string, string>;
lastUsed?: string;
totalRequests?: number;
}

export abstract class BaseAccountManager<T extends BaseAccount> {
protected redisClient: RedisClientType;
protected isConnected = false;
protected abstract platform: string;
protected abstract accountKey: string;
protected abstract usageKeyPrefix: string;

constructor(redisUrl?: string) {
this.redisClient = createClient({
url: redisUrl || process.env.REDIS_URL,
});
this.redisClient.on('error', (err) => {
console.error(`Redis Client Error in ${this.platform}AccountManager:`, err);
});
}

protected async ensureConnected(): Promise<void> {
if (!this.isConnected) {
await this.redisClient.connect();
this.isConnected = true;
}
}

protected abstract fetchAllAccounts(): Promise<T[]>;

async getEarliestUsedAccount(): Promise<T> {
await this.ensureConnected();
const accounts = await this.fetchAllAccounts();
accounts.sort((a, b) => {
if (!a.lastUsed && !b.lastUsed) return 0;
if (!a.lastUsed) return -1;
if (!b.lastUsed) return 1;
return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime();
});
for (const acc of accounts) {
const lockKey = `lock:${this.platform}:${acc.accountId}`;
const ok = await this.redisClient.set(lockKey, '1', { NX: true, PX: 15000 });
if (ok === 'OK') {
console.debug(`[${this.platform}AccountManager] Selected account=${acc.accountId} lastUsed=${acc.lastUsed ?? 'Never'} totalRequests=${acc.totalRequests ?? 0}`);
Comment on lines +46 to +48
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Unsafe lock release; use token‑based delete (Redlock semantics)
Plain DEL can remove another process’s lock if TTL expired and was reacquired. Store a random token as the lock value and conditionally delete if the token matches.

+    private lockTokens = new Map<string, string>();
@@
-            const ok = await this.redisClient.set(lockKey, '1', { NX: true, PX: 15000 });
+            const token = crypto.randomUUID();
+            const ok = await this.redisClient.set(lockKey, token, { NX: true, PX: 15000 });
             if (ok === 'OK') {
+                this.lockTokens.set(acc.accountId, token);
                 console.debug(`[${this.platform}AccountManager] Selected account=${acc.accountId} lastUsed=${acc.lastUsed ?? 'Never'} totalRequests=${acc.totalRequests ?? 0}`);
                 return acc;
             }
@@
-        await this.trackApiKeyUsageLocal(accountId);
-        try { await this.redisClient.del(`lock:${this.platform}:${accountId}`); } catch { }
+        await this.trackApiKeyUsageLocal(accountId);
+        const lockKey = `lock:${this.platform}:${accountId}`;
+        const token = this.lockTokens.get(accountId) ?? '';
+        // Delete only if value matches our token
+        try {
+            await this.redisClient.eval(
+                'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end',
+                { keys: [lockKey], arguments: [token] }
+            );
+        } catch {}
+        this.lockTokens.delete(accountId);

Alternatively, use the redlock package for robustness.

npm i redlock

Also applies to: 54-57

🤖 Prompt for AI Agents
In src/services/BaseAccountManager.ts around lines 45-47 (and similarly at
54-57), the code uses a plain DEL to release a lock which can remove another
process’s lock after TTL expiry; change the lock to store a
cryptographically-random token as the value when acquiring (SET key token NX PX
...), and when releasing perform an atomic check-and-delete (compare current
value to token and delete only if equal) — implement this using a small Redis
EVAL Lua script or replace the custom locking with the redlock package (npm i
redlock) to get correct token-based release semantics.

return acc;
}
}
throw new Error(`No available ${this.platform} accounts to claim (all locked).`);
}

async markAccountAsUsed(accountId: string): Promise<void> {
await this.ensureConnected();
await this.trackApiKeyUsageLocal(accountId);
try { await this.redisClient.del(`lock:${this.platform}:${accountId}`); } catch { }
}

protected abstract trackApiKeyUsageLocal(accountId: string): Promise<void>;

async disconnect(): Promise<void> {
if (this.isConnected) {
await this.redisClient.quit();
this.isConnected = false;
}
}
}
106 changes: 106 additions & 0 deletions src/services/telegramAccountManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { BaseAccountManager, BaseAccount } from './BaseAccountManager';
import { decrypt } from '../lib/encryption';
import { getApiKeyUsage, trackApiKeyUsage } from '../utils/redisUtils';

export interface TelegramAccount extends BaseAccount {
accountId: string;
credentials: {
TELEGRAM_API_ID: string;
TELEGRAM_API_HASH: string;
TELEGRAM_TG_CHANNEL: string;
};
lastUsed?: string;
totalRequests?: number;
}

export class TelegramAccountManager extends BaseAccountManager<TelegramAccount> {
protected platform = 'telegram';
protected accountKey = 'telegram-accounts';
protected usageKeyPrefix = 'telegram_accounts';

constructor(redisUrl?: string) {
super(redisUrl);
}

/**
* Fetch all Telegram accounts from Redis and decrypt their credentials
*/
protected async fetchAllAccounts(): Promise<TelegramAccount[]> {
await this.ensureConnected();

const raw = await this.redisClient.get(this.accountKey);
if (!raw) {
throw new Error('No Telegram accounts found in Redis');
}

let encryptedAccounts: Record<string, string>[];
try {
encryptedAccounts = JSON.parse(raw);
} catch (e) {
throw new Error('Failed to parse Telegram accounts from Redis');
}

const accounts: TelegramAccount[] = [];

for (let i = 0; i < encryptedAccounts.length; i++) {
const encryptedAccount = encryptedAccounts[i];

try {
// Decrypt credentials
const credentials = {
TELEGRAM_API_ID: decrypt(encryptedAccount.TELEGRAM_API_ID),
TELEGRAM_API_HASH: decrypt(encryptedAccount.TELEGRAM_API_HASH),
TELEGRAM_TG_CHANNEL: decrypt(encryptedAccount.TELEGRAM_TG_CHANNEL),
};

// Generate account ID from API ID (for uniqueness)
const accountId = `telegram_${credentials.TELEGRAM_API_ID}`;

// Get usage statistics from Redis
const usage = await getApiKeyUsage({ accountId, platform: 'telegram' });

accounts.push({
accountId,
credentials,
lastUsed: usage.last_request || undefined,
totalRequests: usage.total_requests
});
} catch (e) {
console.warn(`Failed to decrypt Telegram account ${i + 1}:`, e);
continue;
}
}

if (accounts.length === 0) {
throw new Error('No valid Telegram accounts could be decrypted');
}

return accounts;
}


/**
* Local usage tracking for Telegram accounts
*/
protected async trackApiKeyUsageLocal(accountId: string): Promise<void> {
await trackApiKeyUsage({ accountId, platform: 'telegram' });
}

/**
* Get usage statistics for all accounts (no credentials)
*/
async getAllAccountsUsage(): Promise<Array<{ accountId: string; lastUsed?: string; totalRequests?: number }>> {
const accounts = await this.fetchAllAccounts();
return accounts.map(({ accountId, lastUsed, totalRequests }) => ({ accountId, lastUsed, totalRequests }));
}

/**
* Get all accounts with credentials (full info)
*/
async getAllAccountsWithCredentials(): Promise<TelegramAccount[]> {
return await this.fetchAllAccounts();
}
}

// Export singleton instance
export const telegramAccountManager = new TelegramAccountManager();
Loading