From 92b923980b513af623777f869dffc836eb17c7d8 Mon Sep 17 00:00:00 2001 From: Alison Hawk Date: Thu, 18 Sep 2025 11:17:42 -0600 Subject: [PATCH 1/4] feat: Implement Telegram and Twitter account rotation with Redis integration - Added TelegramAccountManager and TwitterAccountManager for managing account credentials and usage tracking. - Refactored fetchTelegramMessages and fetchHomeTimeline to utilize account rotation logic. - Introduced batch API key usage tracking for multiple accounts. - Created demo scripts for simulating account rotation for both Telegram and Twitter. - Updated moveEnvToRedis script to handle new environment variable mappings. - Enhanced error handling and logging for better debugging and monitoring. --- src/fetchTelegramMessages.ts | 24 ++-- src/index.ts | 82 ++++++++------ src/services/telegramAccountManager.ts | 145 ++++++++++++++++++++++++ src/services/twitterAccountManager.ts | 145 ++++++++++++++++++++++++ src/tests/rotationDemo.ts | 96 ++++++++++++++++ src/tests/telegramRotationDemo.ts | 99 +++++++++++++++++ src/tests/testRotation.ts | 145 ++++++++++++++++++++++++ src/tests/testTelegramRotation.ts | 147 +++++++++++++++++++++++++ src/twitterApi.ts | 76 ++++++------- src/utils/moveEnvToRedis.ts | 14 ++- src/utils/redisUtils.ts | 43 ++++++++ 11 files changed, 929 insertions(+), 87 deletions(-) create mode 100644 src/services/telegramAccountManager.ts create mode 100644 src/services/twitterAccountManager.ts create mode 100644 src/tests/rotationDemo.ts create mode 100644 src/tests/telegramRotationDemo.ts create mode 100644 src/tests/testRotation.ts create mode 100644 src/tests/testTelegramRotation.ts diff --git a/src/fetchTelegramMessages.ts b/src/fetchTelegramMessages.ts index 9e563d2..efd2d73 100644 --- a/src/fetchTelegramMessages.ts +++ b/src/fetchTelegramMessages.ts @@ -1,16 +1,19 @@ import { Api, TelegramClient } from "telegram"; import { trackApiKeyUsage } from './utils/redisUtils'; +import { TelegramAccount } from './services/telegramAccountManager'; export type TelegramMessages = { id: string; content: string; channelId: string }; export async function fetchTelegramMessages( client: TelegramClient, - channel: string + account: TelegramAccount ): Promise { + const channel = account.credentials.TELEGRAM_TG_CHANNEL; if (!channel) { - throw new Error("TELEGRAM_TG_CHANNEL environment variable is not set."); + throw new Error("TELEGRAM_TG_CHANNEL is not set in account credentials."); } - const apiId = process.env.TELEGRAM_API_ID; + + console.log(`Using Telegram account: ${account.accountId} for channel: ${channel}`); // Fetch channel entity to get the actual channel ID let entity: Api.Channel; @@ -53,20 +56,7 @@ export async function fetchTelegramMessages( } // Track API usage after successful fetch - if (apiId) { - let accountId: string; - try { - const me = await client.getMe(); - if (me && me.id) { - accountId = String(me.id); - } else { - throw new Error('Unable to determine Telegram account ID'); - } - } catch (e) { - throw new Error('Unable to determine Telegram account ID'); - } - await trackApiKeyUsage({ accountId, platform: 'telegram' }); - } + await trackApiKeyUsage({ accountId: account.accountId, platform: 'telegram' }); return out; } diff --git a/src/index.ts b/src/index.ts index da2448e..ef812ce 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,21 +4,30 @@ import cron from 'node-cron'; import { TelegramClient } from "telegram"; import { StringSession } from "telegram/sessions"; import { fetchTelegramMessages } from './fetchTelegramMessages'; -import { getApiKeyUsage } from './utils/redisUtils'; +import { telegramAccountManager, TelegramAccount } from './services/telegramAccountManager'; -// Replace these with your values -const apiId = Number(process.env.TELEGRAM_API_ID); -const apiHash = process.env.TELEGRAM_API_HASH ?? ""; -if (!Number.isFinite(apiId)) { - throw new Error("API_ID environment variable is missing or not a valid number."); -} -if (!apiHash) { - throw new Error("API_HASH environment variable is not set."); -} -const stringSession = new StringSession(process.env.TG_SESSION ?? ""); // use existing session if available +// Create a map to store clients for each account +const clientMap = new Map(); -async function startTelegramCron() { - console.log("Starting Telegram client..."); +async function createTelegramClient(account: TelegramAccount): Promise { + // Check if we already have a client for this account + if (clientMap.has(account.accountId)) { + return clientMap.get(account.accountId)!; + } + + console.log(`Creating Telegram client for account: ${account.accountId}`); + + const apiId = Number(account.credentials.TELEGRAM_API_ID); + const apiHash = account.credentials.TELEGRAM_API_HASH; + + if (!Number.isFinite(apiId)) { + throw new Error(`Invalid API_ID for account ${account.accountId}`); + } + if (!apiHash) { + throw new Error(`API_HASH not set for account ${account.accountId}`); + } + + const stringSession = new StringSession(process.env.TG_SESSION ?? ""); // use existing session if available const client = new TelegramClient(stringSession, apiId, apiHash, { connectionRetries: 5, }); @@ -30,37 +39,48 @@ async function startTelegramCron() { onError: (err) => console.log(err), }); - console.log("Logged in successfully!"); + console.log(`Logged in successfully for account: ${account.accountId}`); if (process.env.PRINT_TG_SESSION === "1") { console.log("Your session string:", client.session.save()); - } else { - console.log( - "Session created. Set PRINT_TG_SESSION=1 to print it explicitly." - ); } + // Store the client for reuse + clientMap.set(account.accountId, client); + + return client; +} + +async function startTelegramCron() { + console.log("Starting Telegram account rotation system..."); + // Run once at startup try { - await fetchTelegramMessages(client, process.env.TELEGRAM_TG_CHANNEL!); - // Print Telegram API usage by accountId (not API_ID) - const me = await client.getMe(); - const accountId = String(me.id); - const usage = await getApiKeyUsage({ accountId, platform: 'telegram' }); - console.log('Telegram API usage:', { - total_requests: usage.total_requests, - last_request: usage.last_request, - account_id: usage.account_id + const account = await telegramAccountManager.getEarliestUsedAccount(); + const client = await createTelegramClient(account); + + await fetchTelegramMessages(client, account); + + // Show usage statistics for all accounts + const allAccounts = await telegramAccountManager.getAllAccountsUsage(); + console.log('All Telegram accounts usage:'); + allAccounts.forEach((acc, index) => { + console.log(` Account ${index + 1} (${acc.accountId}):`); + console.log(` Total requests: ${acc.totalRequests}`); + console.log(` Last used: ${acc.lastUsed || 'Never'}`); }); } catch (err) { console.error("Startup Telegram fetch failed:", err); } - // Schedule to run every 5 minutes (no overlap guard) + // Schedule to run every 5 minutes with account rotation cron.schedule('*/5 * * * *', async () => { - console.log('Refetching Telegram messages...'); + console.log('Refetching Telegram messages with account rotation...'); try { - await fetchTelegramMessages(client, process.env.TELEGRAM_TG_CHANNEL!); - // No duplicate print of Telegram API usage + const account = await telegramAccountManager.getEarliestUsedAccount(); + const client = await createTelegramClient(account); + + await fetchTelegramMessages(client, account); + console.log(`Fetched messages using account: ${account.accountId}`); } catch (err) { console.error('Scheduled Telegram fetch failed:', err); } diff --git a/src/services/telegramAccountManager.ts b/src/services/telegramAccountManager.ts new file mode 100644 index 0000000..c84557c --- /dev/null +++ b/src/services/telegramAccountManager.ts @@ -0,0 +1,145 @@ +import { createClient } from 'redis'; +import { decrypt } from '../lib/encryption'; +import { getApiKeyUsage, trackApiKeyUsage } from '../utils/redisUtils'; + +export interface TelegramAccount { + accountId: string; + credentials: { + TELEGRAM_API_ID: string; + TELEGRAM_API_HASH: string; + TELEGRAM_TG_CHANNEL: string; + }; + lastUsed?: string; + totalRequests?: number; +} + +export class TelegramAccountManager { + private redisClient: ReturnType; + private isConnected = false; + + constructor(redisUrl?: string) { + this.redisClient = createClient({ + url: redisUrl || process.env.REDIS_URL || 'redis://localhost:6379' + }); + + this.redisClient.on('error', (err) => { + console.error('Redis Client Error in TelegramAccountManager:', err); + }); + } + + private async ensureConnected(): Promise { + if (!this.isConnected) { + await this.redisClient.connect(); + this.isConnected = true; + } + } + + /** + * Fetch all Telegram accounts from Redis and decrypt their credentials + */ + private async fetchAllAccounts(): Promise { + await this.ensureConnected(); + + const raw = await this.redisClient.get('telegram-accounts'); + if (!raw) { + throw new Error('No Telegram accounts found in Redis'); + } + + let encryptedAccounts: Record[]; + try { + encryptedAccounts = JSON.parse(raw); + } catch (e) { + throw new Error('Failed to parse Telegram accounts from Redis'); + } + + const accounts: TelegramAccount[] = []; + + for (let i = 0; i < encryptedAccounts.length; i++) { + const encryptedAccount = encryptedAccounts[i]; + + try { + // Decrypt credentials + const credentials = { + TELEGRAM_API_ID: decrypt(encryptedAccount.TELEGRAM_API_ID), + TELEGRAM_API_HASH: decrypt(encryptedAccount.TELEGRAM_API_HASH), + TELEGRAM_TG_CHANNEL: decrypt(encryptedAccount.TELEGRAM_TG_CHANNEL), + }; + + // Generate account ID from API ID (for uniqueness) + const accountId = `telegram_${credentials.TELEGRAM_API_ID}`; + + // Get usage statistics from Redis + const usage = await getApiKeyUsage({ accountId, platform: 'telegram' }); + + accounts.push({ + accountId, + credentials, + lastUsed: usage.last_request || undefined, + totalRequests: usage.total_requests + }); + } catch (e) { + console.warn(`Failed to decrypt Telegram account ${i + 1}:`, e); + continue; + } + } + + if (accounts.length === 0) { + throw new Error('No valid Telegram accounts could be decrypted'); + } + + return accounts; + } + + /** + * Get the Telegram account that was used earliest (least recently used) + */ + async getEarliestUsedAccount(): Promise { + const accounts = await this.fetchAllAccounts(); + + // Sort accounts by last_request timestamp (earliest first) + // Accounts with no last_request (never used) come first + accounts.sort((a, b) => { + if (!a.lastUsed && !b.lastUsed) return 0; + if (!a.lastUsed) return -1; // a comes first (never used) + if (!b.lastUsed) return 1; // b comes first (never used) + + // Both have lastUsed dates, compare them + return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime(); + }); + + const selectedAccount = accounts[0]; + + console.log(`Selected Telegram account: ${selectedAccount.accountId}`); + console.log(`Last used: ${selectedAccount.lastUsed || 'Never'}`); + console.log(`Total requests: ${selectedAccount.totalRequests || 0}`); + + return selectedAccount; + } + + /** + * Mark an account as used (updates the tracking in Redis) + */ + async markAccountAsUsed(accountId: string): Promise { + await trackApiKeyUsage({ accountId, platform: 'telegram' }); + } + + /** + * Get usage statistics for all accounts + */ + async getAllAccountsUsage(): Promise { + return await this.fetchAllAccounts(); + } + + /** + * Close the Redis connection + */ + async disconnect(): Promise { + if (this.isConnected) { + await this.redisClient.quit(); + this.isConnected = false; + } + } +} + +// Export singleton instance +export const telegramAccountManager = new TelegramAccountManager(); \ No newline at end of file diff --git a/src/services/twitterAccountManager.ts b/src/services/twitterAccountManager.ts new file mode 100644 index 0000000..9627bb9 --- /dev/null +++ b/src/services/twitterAccountManager.ts @@ -0,0 +1,145 @@ +import { createClient } from 'redis'; +import { decrypt } from '../lib/encryption'; +import { getApiKeyUsage, trackApiKeyUsage } from '../utils/redisUtils'; + +export interface TwitterAccount { + accountId: string; + credentials: { + TWITTER_AUTH_TOKEN: string; + TWITTER_BEARER: string; + TWITTER_CSRF_TOKEN: string; + }; + lastUsed?: string; + totalRequests?: number; +} + +export class TwitterAccountManager { + private redisClient: ReturnType; + private isConnected = false; + + constructor(redisUrl?: string) { + this.redisClient = createClient({ + url: redisUrl || process.env.REDIS_URL + }); + + this.redisClient.on('error', (err) => { + console.error('Redis Client Error in TwitterAccountManager:', err); + }); + } + + private async ensureConnected(): Promise { + if (!this.isConnected) { + await this.redisClient.connect(); + this.isConnected = true; + } + } + + /** + * Fetch all Twitter accounts from Redis and decrypt their credentials + */ + private async fetchAllAccounts(): Promise { + await this.ensureConnected(); + + const raw = await this.redisClient.get('twitter-accounts'); + if (!raw) { + throw new Error('No Twitter accounts found in Redis'); + } + + let encryptedAccounts: Record[]; + try { + encryptedAccounts = JSON.parse(raw); + } catch (e) { + throw new Error('Failed to parse Twitter accounts from Redis'); + } + + const accounts: TwitterAccount[] = []; + + for (let i = 0; i < encryptedAccounts.length; i++) { + const encryptedAccount = encryptedAccounts[i]; + + try { + // Decrypt credentials + const credentials = { + TWITTER_AUTH_TOKEN: decrypt(encryptedAccount.TWITTER_AUTH_TOKEN), + TWITTER_BEARER: decrypt(encryptedAccount.TWITTER_BEARER), + TWITTER_CSRF_TOKEN: decrypt(encryptedAccount.TWITTER_CSRF_TOKEN), + }; + + // Generate account ID from auth token (first 8 chars for uniqueness) + const accountId = `twitter_${credentials.TWITTER_AUTH_TOKEN.substring(0, 8)}`; + + // Get usage statistics from Redis + const usage = await getApiKeyUsage({ accountId, platform: 'twitter' }); + + accounts.push({ + accountId, + credentials, + lastUsed: usage.last_request || undefined, + totalRequests: usage.total_requests + }); + } catch (e) { + console.warn(`Failed to decrypt account ${i + 1}:`, e); + continue; + } + } + + if (accounts.length === 0) { + throw new Error('No valid Twitter accounts could be decrypted'); + } + + return accounts; + } + + /** + * Get the Twitter account that was used earliest (least recently used) + */ + async getEarliestUsedAccount(): Promise { + const accounts = await this.fetchAllAccounts(); + + // Sort accounts by last_request timestamp (earliest first) + // Accounts with no last_request (never used) come first + accounts.sort((a, b) => { + if (!a.lastUsed && !b.lastUsed) return 0; + if (!a.lastUsed) return -1; // a comes first (never used) + if (!b.lastUsed) return 1; // b comes first (never used) + + // Both have lastUsed dates, compare them + return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime(); + }); + + const selectedAccount = accounts[0]; + + console.log(`Selected Twitter account: ${selectedAccount.accountId}`); + console.log(`Last used: ${selectedAccount.lastUsed || 'Never'}`); + console.log(`Total requests: ${selectedAccount.totalRequests || 0}`); + + return selectedAccount; + } + + /** + * Mark an account as used (updates the tracking in Redis) + */ + async markAccountAsUsed(accountId: string): Promise { + await trackApiKeyUsage({ accountId, platform: 'twitter' }); + } + + /** + * Get usage statistics for all accounts + */ + async getAllAccountsUsage(): Promise { + return await this.fetchAllAccounts(); + } + + /** + * Close the Redis connection + */ + async disconnect(): Promise { + if (this.isConnected) { + await this.redisClient.quit(); + this.isConnected = false; + } + } +} + +// Export singleton instance +export const twitterAccountManager = new TwitterAccountManager(); \ No newline at end of file diff --git a/src/tests/rotationDemo.ts b/src/tests/rotationDemo.ts new file mode 100644 index 0000000..4d56564 --- /dev/null +++ b/src/tests/rotationDemo.ts @@ -0,0 +1,96 @@ +#!/usr/bin/env node +/** + * Demo script showing Twitter account rotation in action + * + * This simulates the 5-minute rotation behavior by: + * 1. Getting the earliest used account + * 2. Using it for a "fetch operation" (simulated) + * 3. Marking it as used + * 4. Repeating to show rotation + */ + +import { twitterAccountManager } from '../services/twitterAccountManager'; +import 'dotenv/config'; + +async function simulateTwitterFetch() { + console.log('🐦 Twitter Account Rotation Demo'); + console.log('================================\n'); + + try { + // Show initial state + console.log('šŸ“Š Initial account usage state:'); + const initialAccounts = await twitterAccountManager.getAllAccountsUsage(); + initialAccounts.forEach((account, index) => { + console.log(` ${index + 1}. ${account.accountId.slice(0, 20)}...`); + console.log(` Last used: ${account.lastUsed || 'Never'}`); + console.log(` Total requests: ${account.totalRequests || 0}`); + }); + + console.log('\nšŸ”„ Simulating 5-minute rotation cycles...\n'); + + // Simulate 5 fetch cycles (representing 5-minute intervals) + for (let cycle = 1; cycle <= 5; cycle++) { + console.log(`--- Cycle ${cycle} (${cycle * 5} minutes) ---`); + + // Get the account that should be used (earliest used) + const selectedAccount = await twitterAccountManager.getEarliestUsedAccount(); + + console.log(`šŸŽÆ Selected: ${selectedAccount.accountId.slice(0, 20)}...`); + console.log(` Last used: ${selectedAccount.lastUsed || 'Never'}`); + console.log(` Total requests: ${selectedAccount.totalRequests || 0}`); + + // Simulate using the account for Twitter API calls + console.log(' šŸ“” Simulating Twitter API fetch...'); + + // Mark the account as used (this updates the timestamp) + await twitterAccountManager.markAccountAsUsed(selectedAccount.accountId); + + console.log(' āœ… Account marked as used\n'); + + // Wait a moment to ensure timestamp differences + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + console.log('šŸ“ˆ Final account usage state:'); + const finalAccounts = await twitterAccountManager.getAllAccountsUsage(); + + // Sort by last used to show the rotation order + finalAccounts.sort((a, b) => { + if (!a.lastUsed && !b.lastUsed) return 0; + if (!a.lastUsed) return -1; + if (!b.lastUsed) return 1; + return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime(); + }); + + finalAccounts.forEach((account, index) => { + const isNext = index === 0; + const prefix = isNext ? 'šŸ‘‰' : ' '; + + console.log(`${prefix} ${account.accountId.slice(0, 20)}...`); + console.log(` Last used: ${account.lastUsed || 'Never'}`); + console.log(` Total requests: ${account.totalRequests || 0}`); + + if (isNext) { + console.log(` ā­ļø Will be used next`); + } + }); + + console.log('\n✨ Demo completed! The system will automatically rotate accounts every 5 minutes.'); + console.log(' The account with the earliest "last_request" timestamp gets selected next.'); + + } catch (error) { + console.error('āŒ Demo failed:', error); + + if (error instanceof Error && error.message.includes('No Twitter accounts found')) { + console.log('\nšŸ’” To set up accounts:'); + console.log(' 1. Add Twitter credentials to your .env file'); + console.log(' 2. Run: npm run move-env-to-redis'); + console.log(' 3. Run this demo again'); + } + } finally { + await twitterAccountManager.disconnect(); + } +} + +// Run the demo +simulateTwitterFetch().catch(console.error); \ No newline at end of file diff --git a/src/tests/telegramRotationDemo.ts b/src/tests/telegramRotationDemo.ts new file mode 100644 index 0000000..b79f819 --- /dev/null +++ b/src/tests/telegramRotationDemo.ts @@ -0,0 +1,99 @@ +#!/usr/bin/env node +/** + * Demo script showing Telegram account rotation in action + * + * This simulates the 5-minute rotation behavior by: + * 1. Getting the earliest used Telegram account + * 2. Using it for a "fetch operation" (simulated) + * 3. Marking it as used + * 4. Repeating to show rotation + */ + +import 'dotenv/config'; +import { telegramAccountManager } from '../services/telegramAccountManager'; + +async function simulateTelegramFetch() { + console.log('šŸ“± Telegram Account Rotation Demo'); + console.log('=================================\n'); + + try { + // Show initial state + console.log('šŸ“Š Initial account usage state:'); + const initialAccounts = await telegramAccountManager.getAllAccountsUsage(); + initialAccounts.forEach((account, index) => { + console.log(` ${index + 1}. ${account.accountId}`); + console.log(` Last used: ${account.lastUsed || 'Never'}`); + console.log(` Total requests: ${account.totalRequests || 0}`); + console.log(` Channel: ${account.credentials.TELEGRAM_TG_CHANNEL}`); + }); + + console.log('\nšŸ”„ Simulating 5-minute rotation cycles...\n'); + + // Simulate 5 fetch cycles (representing 5-minute intervals) + for (let cycle = 1; cycle <= 5; cycle++) { + console.log(`--- Cycle ${cycle} (${cycle * 5} minutes) ---`); + + // Get the account that should be used (earliest used) + const selectedAccount = await telegramAccountManager.getEarliestUsedAccount(); + + console.log(`šŸŽÆ Selected: ${selectedAccount.accountId}`); + console.log(` Last used: ${selectedAccount.lastUsed || 'Never'}`); + console.log(` Total requests: ${selectedAccount.totalRequests || 0}`); + console.log(` Channel: ${selectedAccount.credentials.TELEGRAM_TG_CHANNEL}`); + + // Simulate using the account for Telegram API calls + console.log(' šŸ“” Simulating Telegram API fetch...'); + + // Mark the account as used (this updates the timestamp) + await telegramAccountManager.markAccountAsUsed(selectedAccount.accountId); + + console.log(' āœ… Account marked as used\n'); + + // Wait a moment to ensure timestamp differences + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + console.log('šŸ“ˆ Final account usage state:'); + const finalAccounts = await telegramAccountManager.getAllAccountsUsage(); + + // Sort by last used to show the rotation order + finalAccounts.sort((a, b) => { + if (!a.lastUsed && !b.lastUsed) return 0; + if (!a.lastUsed) return -1; + if (!b.lastUsed) return 1; + return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime(); + }); + + finalAccounts.forEach((account, index) => { + const isNext = index === 0; + const prefix = isNext ? 'šŸ‘‰' : ' '; + + console.log(`${prefix} ${account.accountId}`); + console.log(` Last used: ${account.lastUsed || 'Never'}`); + console.log(` Total requests: ${account.totalRequests || 0}`); + console.log(` Channel: ${account.credentials.TELEGRAM_TG_CHANNEL}`); + + if (isNext) { + console.log(` ā­ļø Will be used next`); + } + }); + + console.log('\n✨ Demo completed! The system will automatically rotate Telegram accounts every 5 minutes.'); + console.log(' The account with the earliest "last_request" timestamp gets selected next.'); + + } catch (error) { + console.error('āŒ Demo failed:', error); + + if (error instanceof Error && error.message.includes('No Telegram accounts found')) { + console.log('\nšŸ’” To set up Telegram accounts:'); + console.log(' 1. Add Telegram credentials to your .env file'); + console.log(' 2. Run: npm run move-env-to-redis'); + console.log(' 3. Run this demo again'); + } + } finally { + await telegramAccountManager.disconnect(); + } +} + +// Run the demo +simulateTelegramFetch().catch(console.error); \ No newline at end of file diff --git a/src/tests/testRotation.ts b/src/tests/testRotation.ts new file mode 100644 index 0000000..b054a58 --- /dev/null +++ b/src/tests/testRotation.ts @@ -0,0 +1,145 @@ +#!/usr/bin/env node +/** + * Test script for Twitter account rotation system + * + * This script tests: + * 1. Fetching accounts from Redis + * 2. Account rotation logic (earliest used first) + * 3. Usage tracking + * 4. Error handling + */ + +import { twitterAccountManager, TwitterAccount } from '../services/twitterAccountManager'; +import { createClient } from 'redis'; +import 'dotenv/config'; + +async function testAccountRotation() { + console.log('šŸ” Testing Twitter Account Rotation System\n'); + + try { + // Test 1: Get all accounts and show their usage + console.log('šŸ“Š Test 1: Fetching all Twitter accounts...'); + const allAccounts = await twitterAccountManager.getAllAccountsUsage(); + + if (allAccounts.length === 0) { + console.log('āŒ No Twitter accounts found in Redis'); + console.log(' Make sure you have run the moveEnvToRedis script first'); + return; + } + + console.log(`āœ… Found ${allAccounts.length} Twitter accounts:`); + allAccounts.forEach((account, index) => { + console.log(` Account ${index + 1}: ${account.accountId}`); + console.log(` Last used: ${account.lastUsed || 'Never'}`); + console.log(` Total requests: ${account.totalRequests || 0}`); + }); + + console.log('\n'); + + // Test 2: Get earliest used account multiple times + console.log('šŸ”„ Test 2: Testing account rotation logic...'); + + for (let i = 1; i <= 3; i++) { + console.log(`\nIteration ${i}:`); + + const earliestAccount = await twitterAccountManager.getEarliestUsedAccount(); + console.log(` Selected account: ${earliestAccount.accountId}`); + console.log(` Last used: ${earliestAccount.lastUsed || 'Never'}`); + + // Mark the account as used + console.log(` Marking account as used...`); + await twitterAccountManager.markAccountAsUsed(earliestAccount.accountId); + + // Wait a moment for the timestamp to change + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + console.log('\n'); + + // Test 3: Show final state + console.log('šŸ“ˆ Test 3: Final usage state after rotation...'); + const finalAccounts = await twitterAccountManager.getAllAccountsUsage(); + finalAccounts.forEach((account, index) => { + console.log(` Account ${index + 1}: ${account.accountId}`); + console.log(` Last used: ${account.lastUsed || 'Never'}`); + console.log(` Total requests: ${account.totalRequests || 0}`); + }); + + console.log('\n'); + + // Test 4: Verify rotation order + console.log('šŸŽÆ Test 4: Verifying rotation order...'); + const nextAccount = await twitterAccountManager.getEarliestUsedAccount(); + console.log(` Next account to be used: ${nextAccount.accountId}`); + console.log(` Last used: ${nextAccount.lastUsed || 'Never'}`); + + console.log('\nāœ… All tests completed successfully!'); + + } catch (error) { + console.error('āŒ Test failed:', error); + } finally { + await twitterAccountManager.disconnect(); + } +} + +async function testRedisConnection() { + console.log('šŸ”— Testing Redis connection...'); + + const redisClient = createClient({ + url: process.env.REDIS_URL || 'redis://localhost:6379' + }); + + try { + await redisClient.connect(); + + // Check if twitter-accounts key exists + const twitterAccounts = await redisClient.get('twitter-accounts'); + if (!twitterAccounts) { + console.log('āš ļø No twitter-accounts found in Redis'); + console.log(' Run this first: npm run move-env-to-redis'); + return false; + } + + // Try to parse the accounts + const accounts = JSON.parse(twitterAccounts); + console.log(`āœ… Found ${accounts.length} encrypted Twitter accounts in Redis`); + + return true; + } catch (error) { + console.error('āŒ Redis connection failed:', error); + return false; + } finally { + await redisClient.quit(); + } +} + +async function main() { + console.log('Twitter Account Rotation Test Suite'); + console.log('====================================\n'); + + // Check Redis connection and data first + const redisOk = await testRedisConnection(); + if (!redisOk) { + console.log('\nāŒ Please fix Redis issues before continuing'); + process.exit(1); + } + + // Check encryption key + if (!process.env.ENCRYPTION_KEY) { + console.log('āŒ ENCRYPTION_KEY environment variable not set'); + process.exit(1); + } + + console.log('\n'); + + // Run rotation tests + await testAccountRotation(); +} + +// Export for use in other scripts +export { testAccountRotation, testRedisConnection }; + +// Run if executed directly +if (require.main === module) { + main().catch(console.error); +} \ No newline at end of file diff --git a/src/tests/testTelegramRotation.ts b/src/tests/testTelegramRotation.ts new file mode 100644 index 0000000..6fc77e0 --- /dev/null +++ b/src/tests/testTelegramRotation.ts @@ -0,0 +1,147 @@ +#!/usr/bin/env node +/** + * Test script for Telegram account rotation system + * + * This script tests: + * 1. Fetching Telegram accounts from Redis + * 2. Account rotation logic (earliest used first) + * 3. Usage tracking + * 4. Error handling + */ + +import 'dotenv/config'; +import { telegramAccountManager, TelegramAccount } from '../services/telegramAccountManager'; +import { createClient } from 'redis'; + +async function testTelegramAccountRotation() { + console.log('šŸ“± Testing Telegram Account Rotation System\n'); + + try { + // Test 1: Get all accounts and show their usage + console.log('šŸ“Š Test 1: Fetching all Telegram accounts...'); + const allAccounts = await telegramAccountManager.getAllAccountsUsage(); + + if (allAccounts.length === 0) { + console.log('āŒ No Telegram accounts found in Redis'); + console.log(' Make sure you have run the moveEnvToRedis script first'); + return; + } + + console.log(`āœ… Found ${allAccounts.length} Telegram accounts:`); + allAccounts.forEach((account, index) => { + console.log(` Account ${index + 1}: ${account.accountId}`); + console.log(` Last used: ${account.lastUsed || 'Never'}`); + console.log(` Total requests: ${account.totalRequests || 0}`); + console.log(` Channel: ${account.credentials.TELEGRAM_TG_CHANNEL}`); + }); + + console.log('\n'); + + // Test 2: Get earliest used account multiple times + console.log('šŸ”„ Test 2: Testing account rotation logic...'); + + for (let i = 1; i <= 3; i++) { + console.log(`\nIteration ${i}:`); + + const earliestAccount = await telegramAccountManager.getEarliestUsedAccount(); + console.log(` Selected account: ${earliestAccount.accountId}`); + console.log(` Last used: ${earliestAccount.lastUsed || 'Never'}`); + console.log(` Channel: ${earliestAccount.credentials.TELEGRAM_TG_CHANNEL}`); + + // Mark the account as used + console.log(` Marking account as used...`); + await telegramAccountManager.markAccountAsUsed(earliestAccount.accountId); + + // Wait a moment for the timestamp to change + await new Promise(resolve => setTimeout(resolve, 1000)); + } + + console.log('\n'); + + // Test 3: Show final state + console.log('šŸ“ˆ Test 3: Final usage state after rotation...'); + const finalAccounts = await telegramAccountManager.getAllAccountsUsage(); + finalAccounts.forEach((account, index) => { + console.log(` Account ${index + 1}: ${account.accountId}`); + console.log(` Last used: ${account.lastUsed || 'Never'}`); + console.log(` Total requests: ${account.totalRequests || 0}`); + }); + + console.log('\n'); + + // Test 4: Verify rotation order + console.log('šŸŽÆ Test 4: Verifying rotation order...'); + const nextAccount = await telegramAccountManager.getEarliestUsedAccount(); + console.log(` Next account to be used: ${nextAccount.accountId}`); + console.log(` Last used: ${nextAccount.lastUsed || 'Never'}`); + + console.log('\nāœ… All Telegram tests completed successfully!'); + + } catch (error) { + console.error('āŒ Test failed:', error); + } finally { + await telegramAccountManager.disconnect(); + } +} + +async function testTelegramRedisConnection() { + console.log('šŸ”— Testing Redis connection for Telegram accounts...'); + + const redisClient = createClient({ + url: process.env.REDIS_URL || 'redis://localhost:6379' + }); + + try { + await redisClient.connect(); + + // Check if telegram-accounts key exists + const telegramAccounts = await redisClient.get('telegram-accounts'); + if (!telegramAccounts) { + console.log('āš ļø No telegram-accounts found in Redis'); + console.log(' Run this first: npm run move-env-to-redis'); + return false; + } + + // Try to parse the accounts + const accounts = JSON.parse(telegramAccounts); + console.log(`āœ… Found ${accounts.length} encrypted Telegram accounts in Redis`); + + return true; + } catch (error) { + console.error('āŒ Redis connection failed:', error); + return false; + } finally { + await redisClient.quit(); + } +} + +async function main() { + console.log('Telegram Account Rotation Test Suite'); + console.log('=====================================\n'); + + // Check Redis connection and data first + const redisOk = await testTelegramRedisConnection(); + if (!redisOk) { + console.log('\nāŒ Please fix Redis issues before continuing'); + process.exit(1); + } + + // Check encryption key + if (!process.env.ENCRYPTION_KEY) { + console.log('āŒ ENCRYPTION_KEY environment variable not set'); + process.exit(1); + } + + console.log('\n'); + + // Run rotation tests + await testTelegramAccountRotation(); +} + +// Export for use in other scripts +export { testTelegramAccountRotation, testTelegramRedisConnection }; + +// Run if executed directly +if (require.main === module) { + main().catch(console.error); +} \ No newline at end of file diff --git a/src/twitterApi.ts b/src/twitterApi.ts index 3537ffe..a8bd177 100644 --- a/src/twitterApi.ts +++ b/src/twitterApi.ts @@ -1,17 +1,16 @@ import dotenv from 'dotenv'; import cron from 'node-cron'; -import { getApiKeyUsage } from './utils/redisUtils'; -import { trackApiKeyUsage } from './utils/redisUtils'; -const AUTH_TOKEN = process.env.TWITTER_AUTH_TOKEN; +import { twitterAccountManager, TwitterAccount } from './services/twitterAccountManager'; + dotenv.config(); -async function fetchViewerAccount(): Promise<{ screenName: string; userId: string } | null> { +async function fetchViewerAccount(account: TwitterAccount): Promise<{ screenName: string; userId: string } | null> { const url = "https://x.com/i/api/graphql/jMaTSZ5dqXctUg5f97R6xw/Viewer"; const headers = { - "authorization": `Bearer ${process.env.TWITTER_BEARER}`, - "x-csrf-token": process.env.TWITTER_CSRF_TOKEN as string, - "cookie": `auth_token=${process.env.TWITTER_AUTH_TOKEN}; ct0=${process.env.TWITTER_CSRF_TOKEN}`, + "authorization": `Bearer ${account.credentials.TWITTER_BEARER}`, + "x-csrf-token": account.credentials.TWITTER_CSRF_TOKEN, + "cookie": `auth_token=${account.credentials.TWITTER_AUTH_TOKEN}; ct0=${account.credentials.TWITTER_CSRF_TOKEN}`, }; const res = await fetch(url, { method: "GET", headers }); @@ -31,25 +30,24 @@ async function fetchViewerAccount(): Promise<{ screenName: string; userId: strin } export async function fetchHomeTimeline( - seenTweetIds: string[] = [] + seenTweetIds: string[] = [], + providedAccount?: TwitterAccount ): Promise> { const queryId = "wEpbv0WrfwV6y2Wlf0fxBQ"; const url = `https://x.com/i/api/graphql/${queryId}/HomeTimeline`; - // Check required environment variables - const requiredTokens = ['TWITTER_BEARER', 'TWITTER_CSRF_TOKEN', 'TWITTER_AUTH_TOKEN']; - const missing = requiredTokens.filter(k => !process.env[k]); - if (missing.length) { - throw new Error(`Missing required tokens: ${missing.join(', ')}`); - } + // Get the account to use (either provided or fetch the earliest used one) + const account = providedAccount || await twitterAccountManager.getEarliestUsedAccount(); - // Setup headers with cookies - const cookie = `auth_token=${process.env.TWITTER_AUTH_TOKEN};ct0=${process.env.TWITTER_CSRF_TOKEN}`; + console.log(`Using Twitter account: ${account.accountId} for timeline fetch`); + + // Setup headers with the account's credentials + const cookie = `auth_token=${account.credentials.TWITTER_AUTH_TOKEN};ct0=${account.credentials.TWITTER_CSRF_TOKEN}`; const headers = { - "authorization": `Bearer ${process.env.TWITTER_BEARER}`, + "authorization": `Bearer ${account.credentials.TWITTER_BEARER}`, "content-type": "application/json", - "x-csrf-token": `${process.env.TWITTER_CSRF_TOKEN}`, + "x-csrf-token": account.credentials.TWITTER_CSRF_TOKEN, "cookie": cookie, }; @@ -154,29 +152,33 @@ export async function fetchHomeTimeline( } // Track API usage after successful fetch - if (process.env.TWITTER_ID) { - const accountId = process.env.TWITTER_ID; - console.log("Authenticated account:", accountId); - await trackApiKeyUsage({ accountId, platform: 'twitter' }); - } + await twitterAccountManager.markAccountAsUsed(account.accountId); return tweets; } // Test runner - when file is executed directly async function main() { - console.log('Starting fetchHomeTimeline...'); + console.log('Starting fetchHomeTimeline with account rotation...'); try { - const data = await fetchHomeTimeline(); - - const viewer = await fetchViewerAccount(); - const accountId = viewer?.userId ?? process.env.TWITTER_ACCOUNT_ID; - if (!accountId) throw new Error('Missing TWITTER_ACCOUNT_ID and Viewer lookup failed.'); - const usage = await getApiKeyUsage({ accountId, platform: 'twitter' }); - console.log('Twitter API usage:', { - total_requests: usage.total_requests, - last_request: usage.last_request, - account_id: usage.account_id + // Get the earliest used account + const account = await twitterAccountManager.getEarliestUsedAccount(); + + // Fetch timeline data + const data = await fetchHomeTimeline([], account); + console.log(`Fetched ${data.length} tweets using account: ${account.accountId}`); + + // Get viewer info for the account + const viewer = await fetchViewerAccount(account); + console.log('Account info:', viewer); + + // Show usage statistics for all accounts + const allAccounts = await twitterAccountManager.getAllAccountsUsage(); + console.log('All Twitter accounts usage:'); + allAccounts.forEach((acc, index) => { + console.log(` Account ${index + 1} (${acc.accountId}):`); + console.log(` Total requests: ${acc.totalRequests}`); + console.log(` Last used: ${acc.lastUsed || 'Never'}`); }); } catch (err) { console.error('fetchHomeTimeline failed:', err instanceof Error ? err.message : err); @@ -187,12 +189,12 @@ async function main() { // Run once at startup main(); -// Schedule to run every 5 minutes +// Schedule to run every 5 minutes - automatically rotates to earliest used account cron.schedule('*/5 * * * *', async () => { - console.log('Refetching Twitter timeline...'); + console.log('Refetching Twitter timeline with account rotation...'); try { const timeline = await fetchHomeTimeline(); - console.log('Fetched timeline:', timeline); + console.log(`Fetched ${timeline.length} tweets`); } catch (err) { console.error('Scheduled Twitter timeline fetch failed:', err); } diff --git a/src/utils/moveEnvToRedis.ts b/src/utils/moveEnvToRedis.ts index c396972..dd2187d 100644 --- a/src/utils/moveEnvToRedis.ts +++ b/src/utils/moveEnvToRedis.ts @@ -2,6 +2,7 @@ import { encrypt } from '../lib/encryption'; import { createClient } from 'redis'; import fs from 'fs'; import path from 'path'; +import 'dotenv/config'; const redisClient = createClient({ url: process.env.REDIS_URL }); @@ -21,7 +22,14 @@ async function moveEnvToRedis() { await ensureRedisConnected(); // Define which keys belong to which service const twitterKeys = ['TWITTER_AUTH_TOKEN', 'TWITTER_BEARER', 'TWITTER_CSRF_TOKEN']; - const telegramKeys = ['TELEGRAM_API_ID', 'TELEGRAM_API_HASH', 'TELEGRAM_TG_CHANNEL']; + const telegramKeys = ['API_ID', 'API_HASH', 'TG_CHANNEL']; // Map to original env names + + // Map original env names to expected names + const telegramKeyMap = { + 'API_ID': 'TELEGRAM_API_ID', + 'API_HASH': 'TELEGRAM_API_HASH', + 'TG_CHANNEL': 'TELEGRAM_TG_CHANNEL' + }; // Encrypt each value individually and store as an object const twitterAccount: Record = {}; @@ -32,7 +40,9 @@ async function moveEnvToRedis() { if (twitterKeys.includes(key)) { twitterAccount[key] = encrypt(value); } else if (telegramKeys.includes(key)) { - telegramAccount[key] = encrypt(value); + // Map the original key to the expected key name + const mappedKey = telegramKeyMap[key as keyof typeof telegramKeyMap]; + telegramAccount[mappedKey] = encrypt(value); } else { otherVars[key] = encrypt(value); } diff --git a/src/utils/redisUtils.ts b/src/utils/redisUtils.ts index fd38d19..5b51f37 100644 --- a/src/utils/redisUtils.ts +++ b/src/utils/redisUtils.ts @@ -87,4 +87,47 @@ export async function getApiKeyUsage(data: dataType): Promise<{ total_requests: return result; } +/** + * Get API usage stats for multiple accounts in batch + * @param accountIds Array of account IDs to query + * @param platform The platform ('telegram' or 'twitter') + * @returns Array of usage objects with account IDs + */ +export async function getBatchApiKeyUsage( + accountIds: string[], + platform: 'telegram' | 'twitter' +): Promise> { + if (platform !== 'twitter' && platform !== 'telegram') { + throw new Error('getBatchApiKeyUsage: platform must be "twitter" or "telegram"'); + } + + const results: Array<{ accountId: string; total_requests: number; last_request: string | null }> = []; + + try { + await ensureRedisConnected(); + + for (const accountId of accountIds) { + if (!accountId?.trim()) continue; + + let key: string; + if (platform === 'twitter') { + key = `twitter_accounts:${accountId}`; + } else { + key = `telegram_accounts:${accountId}`; + } + + const data = await redisClient.hGetAll(key); + results.push({ + accountId, + total_requests: data.total_requests ? parseInt(data.total_requests) : 0, + last_request: data.last_request ? data.last_request : null, + }); + } + } catch (err) { + console.error('getBatchApiKeyUsage: Redis operation failed:', err); + } + + return results; +} + From 9e855596ebe321c30fedf56b0925f5f025effc8d Mon Sep 17 00:00:00 2001 From: Alison Hawk Date: Thu, 18 Sep 2025 11:49:01 -0600 Subject: [PATCH 2/4] feat: Enhance Telegram and Twitter account management with improved session handling and usage tracking --- src/fetchTelegramMessages.ts | 6 +-- src/index.ts | 16 ++++++-- src/services/telegramAccountManager.ts | 23 +++++++++-- src/services/twitterAccountManager.ts | 57 +++++++++++++++++++------- src/tests/telegramRotationDemo.ts | 4 +- src/tests/testTelegramRotation.ts | 4 +- 6 files changed, 83 insertions(+), 27 deletions(-) diff --git a/src/fetchTelegramMessages.ts b/src/fetchTelegramMessages.ts index efd2d73..ab11bdb 100644 --- a/src/fetchTelegramMessages.ts +++ b/src/fetchTelegramMessages.ts @@ -13,7 +13,9 @@ export async function fetchTelegramMessages( throw new Error("TELEGRAM_TG_CHANNEL is not set in account credentials."); } - console.log(`Using Telegram account: ${account.accountId} for channel: ${channel}`); + if (process.env.DEBUG_TELEGRAM === '1') { + console.log(`Using Telegram account: ${account.accountId} for channel: ${channel}`); + } // Fetch channel entity to get the actual channel ID let entity: Api.Channel; @@ -55,8 +57,6 @@ export async function fetchTelegramMessages( console.log("No messages property found in response:", messages); } - // Track API usage after successful fetch - await trackApiKeyUsage({ accountId: account.accountId, platform: 'telegram' }); return out; } diff --git a/src/index.ts b/src/index.ts index ef812ce..241f48d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -27,7 +27,15 @@ async function createTelegramClient(account: TelegramAccount): Promise { + await this.trackApiKeyUsageLocal(accountId); + // Best-effort unlock; lock has TTL as a safety net + try { await this.redisClient.del(`lock:telegram:${accountId}`); } catch { } + } + + /** + * Local usage tracking for Telegram accounts + */ + private async trackApiKeyUsageLocal(accountId: string): Promise { await trackApiKeyUsage({ accountId, platform: 'telegram' }); } /** - * Get usage statistics for all accounts + * Get usage statistics for all accounts (no credentials) + */ + async getAllAccountsUsage(): Promise> { + const accounts = await this.fetchAllAccounts(); + return accounts.map(({ accountId, lastUsed, totalRequests }) => ({ accountId, lastUsed, totalRequests })); + } + + /** + * Get all accounts with credentials (full info) */ - async getAllAccountsUsage(): Promise { + async getAllAccountsWithCredentials(): Promise { return await this.fetchAllAccounts(); } diff --git a/src/services/twitterAccountManager.ts b/src/services/twitterAccountManager.ts index 9627bb9..d994105 100644 --- a/src/services/twitterAccountManager.ts +++ b/src/services/twitterAccountManager.ts @@ -1,6 +1,6 @@ import { createClient } from 'redis'; import { decrypt } from '../lib/encryption'; -import { getApiKeyUsage, trackApiKeyUsage } from '../utils/redisUtils'; +import { createHash } from 'crypto'; export interface TwitterAccount { accountId: string; @@ -65,11 +65,12 @@ export class TwitterAccountManager { TWITTER_CSRF_TOKEN: decrypt(encryptedAccount.TWITTER_CSRF_TOKEN), }; - // Generate account ID from auth token (first 8 chars for uniqueness) - const accountId = `twitter_${credentials.TWITTER_AUTH_TOKEN.substring(0, 8)}`; + // Generate stable, non-reversible account ID (SHA-256, 12 hex chars) + const token = credentials.TWITTER_AUTH_TOKEN; + const accountId = `twitter_${createHash('sha256').update(token).digest('hex').slice(0, 12)}`; - // Get usage statistics from Redis - const usage = await getApiKeyUsage({ accountId, platform: 'twitter' }); + // Get usage statistics from Redis (same client) + const usage = await this.getApiKeyUsageLocal(accountId); accounts.push({ accountId, @@ -107,20 +108,48 @@ export class TwitterAccountManager { return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime(); }); - const selectedAccount = accounts[0]; - - console.log(`Selected Twitter account: ${selectedAccount.accountId}`); - console.log(`Last used: ${selectedAccount.lastUsed || 'Never'}`); - console.log(`Total requests: ${selectedAccount.totalRequests || 0}`); - - return selectedAccount; + for (const acc of accounts) { + const lockKey = `lock:twitter:${acc.accountId}`; + const ok = await this.redisClient.set(lockKey, '1', { NX: true, PX: 15000 }); + if (ok === 'OK') { + console.debug(`[TwitterAccountManager] Selected account=${acc.accountId} lastUsed=${acc.lastUsed ?? 'Never'} totalRequests=${acc.totalRequests ?? 0}`); + return acc; + } + } + throw new Error('No available Twitter accounts to claim (all locked).'); } /** - * Mark an account as used (updates the tracking in Redis) + * Mark an account as used (updates the tracking in Redis and releases the selection lock) */ async markAccountAsUsed(accountId: string): Promise { - await trackApiKeyUsage({ accountId, platform: 'twitter' }); + await this.trackApiKeyUsageLocal(accountId); + // Best-effort unlock; lock has TTL as a safety net + try { await this.redisClient.del(`lock:twitter:${accountId}`); } catch { } + } + + /** + * Local usage tracking for Twitter accounts + */ + private async getApiKeyUsageLocal(accountId: string): Promise<{ total_requests: number; last_request: string | null }> { + await this.ensureConnected(); + const key = `twitter_accounts:${accountId}`; + const data = await this.redisClient.hGetAll(key); + return { + total_requests: data?.total_requests ? parseInt(data.total_requests, 10) : 0, + last_request: data?.last_request ?? null, + }; + } + + private async trackApiKeyUsageLocal(accountId: string): Promise { + await this.ensureConnected(); + const key = `twitter_accounts:${accountId}`; + const now = new Date().toISOString(); + await this.redisClient + .multi() + .hIncrBy(key, 'total_requests', 1) + .hSet(key, { last_request: now, account_id: accountId }) + .exec(); } /** diff --git a/src/tests/telegramRotationDemo.ts b/src/tests/telegramRotationDemo.ts index b79f819..2bc79ef 100644 --- a/src/tests/telegramRotationDemo.ts +++ b/src/tests/telegramRotationDemo.ts @@ -19,7 +19,7 @@ async function simulateTelegramFetch() { try { // Show initial state console.log('šŸ“Š Initial account usage state:'); - const initialAccounts = await telegramAccountManager.getAllAccountsUsage(); + const initialAccounts = await telegramAccountManager.getAllAccountsWithCredentials(); initialAccounts.forEach((account, index) => { console.log(` ${index + 1}. ${account.accountId}`); console.log(` Last used: ${account.lastUsed || 'Never'}`); @@ -54,7 +54,7 @@ async function simulateTelegramFetch() { } console.log('šŸ“ˆ Final account usage state:'); - const finalAccounts = await telegramAccountManager.getAllAccountsUsage(); + const finalAccounts = await telegramAccountManager.getAllAccountsWithCredentials(); // Sort by last used to show the rotation order finalAccounts.sort((a, b) => { diff --git a/src/tests/testTelegramRotation.ts b/src/tests/testTelegramRotation.ts index 6fc77e0..af8d4e6 100644 --- a/src/tests/testTelegramRotation.ts +++ b/src/tests/testTelegramRotation.ts @@ -19,7 +19,7 @@ async function testTelegramAccountRotation() { try { // Test 1: Get all accounts and show their usage console.log('šŸ“Š Test 1: Fetching all Telegram accounts...'); - const allAccounts = await telegramAccountManager.getAllAccountsUsage(); + const allAccounts = await telegramAccountManager.getAllAccountsWithCredentials(); if (allAccounts.length === 0) { console.log('āŒ No Telegram accounts found in Redis'); @@ -60,7 +60,7 @@ async function testTelegramAccountRotation() { // Test 3: Show final state console.log('šŸ“ˆ Test 3: Final usage state after rotation...'); - const finalAccounts = await telegramAccountManager.getAllAccountsUsage(); + const finalAccounts = await telegramAccountManager.getAllAccountsWithCredentials(); finalAccounts.forEach((account, index) => { console.log(` Account ${index + 1}: ${account.accountId}`); console.log(` Last used: ${account.lastUsed || 'Never'}`); From 5d212e0e0c4b23e48c04dc67d95c15acbf0850be Mon Sep 17 00:00:00 2001 From: Alison Hawk Date: Thu, 18 Sep 2025 12:29:06 -0600 Subject: [PATCH 3/4] feat: Refactor Telegram client session handling to use Redis for session storage and retrieval --- src/index.ts | 14 ++++--- src/services/BaseAccountManager.ts | 67 ++++++++++++++++++++++++++++++ src/utils/moveEnvToRedis.ts | 13 +----- 3 files changed, 77 insertions(+), 17 deletions(-) create mode 100644 src/services/BaseAccountManager.ts diff --git a/src/index.ts b/src/index.ts index 241f48d..2a018a0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -27,14 +27,16 @@ async function createTelegramClient(account: TelegramAccount): Promise; + lastUsed?: string; + totalRequests?: number; +} + +export abstract class BaseAccountManager { + protected redisClient: RedisClientType; + protected isConnected = false; + protected abstract platform: string; + protected abstract accountKey: string; + protected abstract usageKeyPrefix: string; + + constructor(redisUrl?: string) { + this.redisClient = createClient({ + url: redisUrl || process.env.REDIS_URL, + }); + this.redisClient.on('error', (err) => { + console.error(`Redis Client Error in ${this.platform}AccountManager:`, err); + }); + } + + protected async ensureConnected(): Promise { + if (!this.isConnected) { + await this.redisClient.connect(); + this.isConnected = true; + } + } + + protected abstract fetchAllAccounts(): Promise; + + async getEarliestUsedAccount(): Promise { + const accounts = await this.fetchAllAccounts(); + accounts.sort((a, b) => { + if (!a.lastUsed && !b.lastUsed) return 0; + if (!a.lastUsed) return -1; + if (!b.lastUsed) return 1; + return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime(); + }); + for (const acc of accounts) { + const lockKey = `lock:${this.platform}:${acc.accountId}`; + const ok = await this.redisClient.set(lockKey, '1', { NX: true, PX: 15000 }); + if (ok === 'OK') { + console.debug(`[${this.platform}AccountManager] Selected account=${acc.accountId} lastUsed=${acc.lastUsed ?? 'Never'} totalRequests=${acc.totalRequests ?? 0}`); + return acc; + } + } + throw new Error(`No available ${this.platform} accounts to claim (all locked).`); + } + + async markAccountAsUsed(accountId: string): Promise { + await this.trackApiKeyUsageLocal(accountId); + try { await this.redisClient.del(`lock:${this.platform}:${accountId}`); } catch { } + } + + protected abstract trackApiKeyUsageLocal(accountId: string): Promise; + + async disconnect(): Promise { + if (this.isConnected) { + await this.redisClient.quit(); + this.isConnected = false; + } + } +} diff --git a/src/utils/moveEnvToRedis.ts b/src/utils/moveEnvToRedis.ts index dd2187d..c838b74 100644 --- a/src/utils/moveEnvToRedis.ts +++ b/src/utils/moveEnvToRedis.ts @@ -22,14 +22,7 @@ async function moveEnvToRedis() { await ensureRedisConnected(); // Define which keys belong to which service const twitterKeys = ['TWITTER_AUTH_TOKEN', 'TWITTER_BEARER', 'TWITTER_CSRF_TOKEN']; - const telegramKeys = ['API_ID', 'API_HASH', 'TG_CHANNEL']; // Map to original env names - - // Map original env names to expected names - const telegramKeyMap = { - 'API_ID': 'TELEGRAM_API_ID', - 'API_HASH': 'TELEGRAM_API_HASH', - 'TG_CHANNEL': 'TELEGRAM_TG_CHANNEL' - }; + const telegramKeys = ['TELEGRAM_API_ID', 'TELEGRAM_API_HASH', 'TELEGRAM_TG_CHANNEL']; // Encrypt each value individually and store as an object const twitterAccount: Record = {}; @@ -40,9 +33,7 @@ async function moveEnvToRedis() { if (twitterKeys.includes(key)) { twitterAccount[key] = encrypt(value); } else if (telegramKeys.includes(key)) { - // Map the original key to the expected key name - const mappedKey = telegramKeyMap[key as keyof typeof telegramKeyMap]; - telegramAccount[mappedKey] = encrypt(value); + telegramAccount[key] = encrypt(value); } else { otherVars[key] = encrypt(value); } From 2f2f23c8243c439497e4fcef68dd3545d9c1a4e8 Mon Sep 17 00:00:00 2001 From: Alison Hawk Date: Fri, 19 Sep 2025 03:26:41 -0600 Subject: [PATCH 4/4] feat: Refactor Telegram and Twitter account managers to extend BaseAccountManager for improved Redis integration and connection handling --- src/services/BaseAccountManager.ts | 2 + src/services/telegramAccountManager.ts | 76 ++++-------------------- src/services/twitterAccountManager.ts | 80 ++++---------------------- 3 files changed, 23 insertions(+), 135 deletions(-) diff --git a/src/services/BaseAccountManager.ts b/src/services/BaseAccountManager.ts index 67b5d46..6705cdd 100644 --- a/src/services/BaseAccountManager.ts +++ b/src/services/BaseAccountManager.ts @@ -33,6 +33,7 @@ export abstract class BaseAccountManager { protected abstract fetchAllAccounts(): Promise; async getEarliestUsedAccount(): Promise { + await this.ensureConnected(); const accounts = await this.fetchAllAccounts(); accounts.sort((a, b) => { if (!a.lastUsed && !b.lastUsed) return 0; @@ -52,6 +53,7 @@ export abstract class BaseAccountManager { } async markAccountAsUsed(accountId: string): Promise { + await this.ensureConnected(); await this.trackApiKeyUsageLocal(accountId); try { await this.redisClient.del(`lock:${this.platform}:${accountId}`); } catch { } } diff --git a/src/services/telegramAccountManager.ts b/src/services/telegramAccountManager.ts index 6fb084a..2e8567c 100644 --- a/src/services/telegramAccountManager.ts +++ b/src/services/telegramAccountManager.ts @@ -1,8 +1,8 @@ -import { createClient } from 'redis'; +import { BaseAccountManager, BaseAccount } from './BaseAccountManager'; import { decrypt } from '../lib/encryption'; import { getApiKeyUsage, trackApiKeyUsage } from '../utils/redisUtils'; -export interface TelegramAccount { +export interface TelegramAccount extends BaseAccount { accountId: string; credentials: { TELEGRAM_API_ID: string; @@ -13,34 +13,22 @@ export interface TelegramAccount { totalRequests?: number; } -export class TelegramAccountManager { - private redisClient: ReturnType; - private isConnected = false; +export class TelegramAccountManager extends BaseAccountManager { + protected platform = 'telegram'; + protected accountKey = 'telegram-accounts'; + protected usageKeyPrefix = 'telegram_accounts'; constructor(redisUrl?: string) { - this.redisClient = createClient({ - url: redisUrl || process.env.REDIS_URL || 'redis://localhost:6379' - }); - - this.redisClient.on('error', (err) => { - console.error('Redis Client Error in TelegramAccountManager:', err); - }); - } - - private async ensureConnected(): Promise { - if (!this.isConnected) { - await this.redisClient.connect(); - this.isConnected = true; - } + super(redisUrl); } /** * Fetch all Telegram accounts from Redis and decrypt their credentials */ - private async fetchAllAccounts(): Promise { + protected async fetchAllAccounts(): Promise { await this.ensureConnected(); - const raw = await this.redisClient.get('telegram-accounts'); + const raw = await this.redisClient.get(this.accountKey); if (!raw) { throw new Error('No Telegram accounts found in Redis'); } @@ -90,45 +78,11 @@ export class TelegramAccountManager { return accounts; } - /** - * Get the Telegram account that was used earliest (least recently used) - */ - async getEarliestUsedAccount(): Promise { - const accounts = await this.fetchAllAccounts(); - - // Sort accounts by last_request timestamp (earliest first) - // Accounts with no last_request (never used) come first - accounts.sort((a, b) => { - if (!a.lastUsed && !b.lastUsed) return 0; - if (!a.lastUsed) return -1; // a comes first (never used) - if (!b.lastUsed) return 1; // b comes first (never used) - - // Both have lastUsed dates, compare them - return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime(); - }); - - const selectedAccount = accounts[0]; - - console.log(`Selected Telegram account: ${selectedAccount.accountId}`); - console.log(`Last used: ${selectedAccount.lastUsed || 'Never'}`); - console.log(`Total requests: ${selectedAccount.totalRequests || 0}`); - - return selectedAccount; - } - - /** - * Mark an account as used (updates the tracking in Redis and releases the selection lock) - */ - async markAccountAsUsed(accountId: string): Promise { - await this.trackApiKeyUsageLocal(accountId); - // Best-effort unlock; lock has TTL as a safety net - try { await this.redisClient.del(`lock:telegram:${accountId}`); } catch { } - } /** * Local usage tracking for Telegram accounts */ - private async trackApiKeyUsageLocal(accountId: string): Promise { + protected async trackApiKeyUsageLocal(accountId: string): Promise { await trackApiKeyUsage({ accountId, platform: 'telegram' }); } @@ -146,16 +100,6 @@ export class TelegramAccountManager { async getAllAccountsWithCredentials(): Promise { return await this.fetchAllAccounts(); } - - /** - * Close the Redis connection - */ - async disconnect(): Promise { - if (this.isConnected) { - await this.redisClient.quit(); - this.isConnected = false; - } - } } // Export singleton instance diff --git a/src/services/twitterAccountManager.ts b/src/services/twitterAccountManager.ts index d994105..980e7db 100644 --- a/src/services/twitterAccountManager.ts +++ b/src/services/twitterAccountManager.ts @@ -1,8 +1,8 @@ -import { createClient } from 'redis'; +import { BaseAccountManager, BaseAccount } from './BaseAccountManager'; import { decrypt } from '../lib/encryption'; import { createHash } from 'crypto'; -export interface TwitterAccount { +export interface TwitterAccount extends BaseAccount { accountId: string; credentials: { TWITTER_AUTH_TOKEN: string; @@ -13,34 +13,22 @@ export interface TwitterAccount { totalRequests?: number; } -export class TwitterAccountManager { - private redisClient: ReturnType; - private isConnected = false; +export class TwitterAccountManager extends BaseAccountManager { + protected platform = 'twitter'; + protected accountKey = 'twitter-accounts'; + protected usageKeyPrefix = 'twitter_accounts'; constructor(redisUrl?: string) { - this.redisClient = createClient({ - url: redisUrl || process.env.REDIS_URL - }); - - this.redisClient.on('error', (err) => { - console.error('Redis Client Error in TwitterAccountManager:', err); - }); - } - - private async ensureConnected(): Promise { - if (!this.isConnected) { - await this.redisClient.connect(); - this.isConnected = true; - } + super(redisUrl); } /** * Fetch all Twitter accounts from Redis and decrypt their credentials */ - private async fetchAllAccounts(): Promise { + protected async fetchAllAccounts(): Promise { await this.ensureConnected(); - const raw = await this.redisClient.get('twitter-accounts'); + const raw = await this.redisClient.get(this.accountKey); if (!raw) { throw new Error('No Twitter accounts found in Redis'); } @@ -91,45 +79,9 @@ export class TwitterAccountManager { return accounts; } - /** - * Get the Twitter account that was used earliest (least recently used) - */ - async getEarliestUsedAccount(): Promise { - const accounts = await this.fetchAllAccounts(); - - // Sort accounts by last_request timestamp (earliest first) - // Accounts with no last_request (never used) come first - accounts.sort((a, b) => { - if (!a.lastUsed && !b.lastUsed) return 0; - if (!a.lastUsed) return -1; // a comes first (never used) - if (!b.lastUsed) return 1; // b comes first (never used) - - // Both have lastUsed dates, compare them - return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime(); - }); - - for (const acc of accounts) { - const lockKey = `lock:twitter:${acc.accountId}`; - const ok = await this.redisClient.set(lockKey, '1', { NX: true, PX: 15000 }); - if (ok === 'OK') { - console.debug(`[TwitterAccountManager] Selected account=${acc.accountId} lastUsed=${acc.lastUsed ?? 'Never'} totalRequests=${acc.totalRequests ?? 0}`); - return acc; - } - } - throw new Error('No available Twitter accounts to claim (all locked).'); - } - - /** - * Mark an account as used (updates the tracking in Redis and releases the selection lock) - */ - async markAccountAsUsed(accountId: string): Promise { - await this.trackApiKeyUsageLocal(accountId); - // Best-effort unlock; lock has TTL as a safety net - try { await this.redisClient.del(`lock:twitter:${accountId}`); } catch { } - } /** - * Local usage tracking for Twitter accounts + * Local usage read for Twitter accounts (using the same Redis client) */ private async getApiKeyUsageLocal(accountId: string): Promise<{ total_requests: number; last_request: string | null }> { await this.ensureConnected(); @@ -141,7 +93,7 @@ export class TwitterAccountManager { }; } - private async trackApiKeyUsageLocal(accountId: string): Promise { + protected async trackApiKeyUsageLocal(accountId: string): Promise { await this.ensureConnected(); const key = `twitter_accounts:${accountId}`; const now = new Date().toISOString(); @@ -158,16 +110,6 @@ export class TwitterAccountManager { async getAllAccountsUsage(): Promise { return await this.fetchAllAccounts(); } - - /** - * Close the Redis connection - */ - async disconnect(): Promise { - if (this.isConnected) { - await this.redisClient.quit(); - this.isConnected = false; - } - } } // Export singleton instance