Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"printWidth": 120,
"trailingComma": "none",
"singleQuote": true,
"semi": true,
"useTabs": false,
"tabWidth": 2,
"arrowParens": "always",
"jsxSingleQuote": true
}
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# ingestion-engine

Data ingestion service for fetching real-time content from social media APIs.
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
"redis": "^5.8.2",
"telegram": "^2.26.22"
},
"scripts": {
"format": "prettier --write ."
},
"devDependencies": {
"@types/node": "^24.3.0",
"prettier": "^3.6.2",
"esbuild": "^0.25.9",
"tsx": "^4.20.5",
"typescript": "^5.9.2"
Expand Down
11 changes: 5 additions & 6 deletions src/fetchTelegramMessages.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Api, TelegramClient } from "telegram";
import { Api, TelegramClient } from 'telegram';
import { trackApiKeyUsage } from './utils/redisUtils';
import { TelegramAccount } from './services/telegramAccountManager';

Expand All @@ -10,7 +10,7 @@ export async function fetchTelegramMessages(
): Promise<TelegramMessages[]> {
const channel = account.credentials.TELEGRAM_TG_CHANNEL;
if (!channel) {
throw new Error("TELEGRAM_TG_CHANNEL is not set in account credentials.");
throw new Error('TELEGRAM_TG_CHANNEL is not set in account credentials.');
}

if (process.env.DEBUG_TELEGRAM === '1') {
Expand Down Expand Up @@ -38,13 +38,13 @@ export async function fetchTelegramMessages(
const messages = await client.invoke(
new Api.messages.GetHistory({
peer: entity,
limit: 10,
limit: 10
})
);

const out: TelegramMessages[] = [];

if ("messages" in messages) {
if ('messages' in messages) {
for (const msg of messages.messages as any[]) {
const id = typeof msg?.id === 'number' || typeof msg?.id === 'string' ? String(msg.id) : null;
const content = typeof msg?.message === 'string' ? msg.message : '';
Expand All @@ -54,9 +54,8 @@ export async function fetchTelegramMessages(
console.log(formatted);
}
} else {
console.log("No messages property found in response:", messages);
console.log('No messages property found in response:', messages);
}


return out;
}
34 changes: 16 additions & 18 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import 'dotenv/config';
import input from "input"; // interactive input for login
import input from 'input'; // interactive input for login
import cron from 'node-cron';
import { TelegramClient } from "telegram";
import { StringSession } from "telegram/sessions";
import { TelegramClient } from 'telegram';
import { StringSession } from 'telegram/sessions';
import { fetchTelegramMessages } from './fetchTelegramMessages';
import { telegramAccountManager, TelegramAccount } from './services/telegramAccountManager';

Expand Down Expand Up @@ -34,24 +34,26 @@ async function createTelegramClient(account: TelegramAccount): Promise<TelegramC
let sessionStr = await redisClient.get(sessionKey);
const isInteractive = Boolean(process.stdin.isTTY);
if (!sessionStr && !isInteractive) {
throw new Error(`Missing session in Redis for ${account.accountId} (key: ${sessionKey}). Generate and store a session string before running cron.`);
throw new Error(
`Missing session in Redis for ${account.accountId} (key: ${sessionKey}). Generate and store a session string before running cron.`
);
}
sessionStr = sessionStr || "";
sessionStr = sessionStr || '';
const stringSession = new StringSession(sessionStr);
const client = new TelegramClient(stringSession, apiId, apiHash, {
connectionRetries: 5,
connectionRetries: 5
});

await client.start({
phoneNumber: async () => await input.text("Enter your phone number: "),
password: async () => await input.text("Enter 2FA password (if enabled): "),
phoneCode: async () => await input.text("Enter code you received: "),
onError: (err) => console.log(err),
phoneNumber: async () => await input.text('Enter your phone number: '),
password: async () => await input.text('Enter 2FA password (if enabled): '),
phoneCode: async () => await input.text('Enter code you received: '),
onError: (err) => console.log(err)
});

console.log(`Logged in successfully for account: ${account.accountId}`);
const saved = client.session.save();
if (process.env.PRINT_TG_SESSION === "1" && isInteractive) {
if (process.env.PRINT_TG_SESSION === '1' && isInteractive) {
// Emit an export-ready line deliberately, instead of dumping secrets in logs
console.log(`export ${sessionKey}="${saved}"`);
}
Expand All @@ -63,7 +65,7 @@ async function createTelegramClient(account: TelegramAccount): Promise<TelegramC
}

async function startTelegramCron() {
console.log("Starting Telegram account rotation system...");
console.log('Starting Telegram account rotation system...');

// Run once at startup
try {
Expand All @@ -81,7 +83,7 @@ async function startTelegramCron() {
console.log(` Last used: ${acc.lastUsed || 'Never'}`);
});
} catch (err) {
console.error("Startup Telegram fetch failed:", err);
console.error('Startup Telegram fetch failed:', err);
}

// Schedule to run every 5 minutes with account rotation
Expand All @@ -102,9 +104,5 @@ async function startTelegramCron() {
}

startTelegramCron().catch((err) => {
console.error("Failed to start Telegram cron:", err);
console.error('Failed to start Telegram cron:', err);
});




48 changes: 23 additions & 25 deletions src/lib/encryption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,33 @@ const IV_LENGTH = 12; // per NIST recommendation for GCM
// Use a strong key in production, ideally from a secure source
let KEY: Buffer | null = null;
function getKey(): Buffer {
const k = process.env.ENCRYPTION_KEY;
if (!k) throw new Error('ENCRYPTION_KEY environment variable must be set');
if (!/^[0-9a-fA-F]{64}$/.test(k)) {
throw new Error('ENCRYPTION_KEY must be a 64-hex-char (256-bit) value');
}
if (!KEY) KEY = Buffer.from(k, 'hex');
return KEY;
const k = process.env.ENCRYPTION_KEY;
if (!k) throw new Error('ENCRYPTION_KEY environment variable must be set');
if (!/^[0-9a-fA-F]{64}$/.test(k)) {
throw new Error('ENCRYPTION_KEY must be a 64-hex-char (256-bit) value');
}
if (!KEY) KEY = Buffer.from(k, 'hex');
return KEY;
}


export function encrypt(text: string): string {
const iv = crypto.randomBytes(IV_LENGTH);
const cipher = crypto.createCipheriv(ALGORITHM, getKey(), iv);
const ciphertext = Buffer.concat([cipher.update(text, 'utf8'), cipher.final()]);
const tag = cipher.getAuthTag();
return `${SCHEME}:${iv.toString('hex')}:${tag.toString('hex')}:${ciphertext.toString('hex')}`;
const iv = crypto.randomBytes(IV_LENGTH);
const cipher = crypto.createCipheriv(ALGORITHM, getKey(), iv);
const ciphertext = Buffer.concat([cipher.update(text, 'utf8'), cipher.final()]);
const tag = cipher.getAuthTag();
return `${SCHEME}:${iv.toString('hex')}:${tag.toString('hex')}:${ciphertext.toString('hex')}`;
}


export function decrypt(text: string): string {
const parts = text.split(':');
if (parts.length !== 4) throw new Error('Invalid payload format');
const [scheme, ivHex, tagHex, dataHex] = parts;
if (scheme !== SCHEME) throw new Error(`Unsupported scheme: ${scheme}`);
const iv = Buffer.from(ivHex, 'hex');
const tag = Buffer.from(tagHex, 'hex');
const data = Buffer.from(dataHex, 'hex');
const decipher = crypto.createDecipheriv(ALGORITHM, getKey(), iv);
decipher.setAuthTag(tag);
const plaintext = Buffer.concat([decipher.update(data), decipher.final()]);
return plaintext.toString('utf8');
const parts = text.split(':');
if (parts.length !== 4) throw new Error('Invalid payload format');
const [scheme, ivHex, tagHex, dataHex] = parts;
if (scheme !== SCHEME) throw new Error(`Unsupported scheme: ${scheme}`);
const iv = Buffer.from(ivHex, 'hex');
const tag = Buffer.from(tagHex, 'hex');
const data = Buffer.from(dataHex, 'hex');
const decipher = crypto.createDecipheriv(ALGORITHM, getKey(), iv);
decipher.setAuthTag(tag);
const plaintext = Buffer.concat([decipher.update(data), decipher.final()]);
return plaintext.toString('utf8');
}
4 changes: 2 additions & 2 deletions src/lib/utils/string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
* Example: abcd1234efgh5678 -> abcd…5678
*/
export function mask(v: string): string {
if (!v) return '';
return v.length <= 8 ? '********' : `${v.slice(0, 4)}…${v.slice(-4)}`;
if (!v) return '';
return v.length <= 8 ? '********' : `${v.slice(0, 4)}…${v.slice(-4)}`;
}
108 changes: 56 additions & 52 deletions src/services/BaseAccountManager.ts
Original file line number Diff line number Diff line change
@@ -1,69 +1,73 @@
import { createClient, RedisClientType } from 'redis';

export interface BaseAccount {
accountId: string;
credentials: Record<string, string>;
lastUsed?: string;
totalRequests?: number;
accountId: string;
credentials: Record<string, string>;
lastUsed?: string;
totalRequests?: number;
}

export abstract class BaseAccountManager<T extends BaseAccount> {
protected redisClient: RedisClientType;
protected isConnected = false;
protected abstract platform: string;
protected abstract accountKey: string;
protected abstract usageKeyPrefix: string;
protected redisClient: RedisClientType;
protected isConnected = false;
protected abstract platform: string;
protected abstract accountKey: string;
protected abstract usageKeyPrefix: string;

constructor(redisUrl?: string) {
this.redisClient = createClient({
url: redisUrl || process.env.REDIS_URL,
});
this.redisClient.on('error', (err) => {
console.error(`Redis Client Error in ${this.platform}AccountManager:`, err);
});
}
constructor(redisUrl?: string) {
this.redisClient = createClient({
url: redisUrl || process.env.REDIS_URL
});
this.redisClient.on('error', (err) => {
console.error(`Redis Client Error in ${this.platform}AccountManager:`, err);
});
}

protected async ensureConnected(): Promise<void> {
if (!this.isConnected) {
await this.redisClient.connect();
this.isConnected = true;
}
protected async ensureConnected(): Promise<void> {
if (!this.isConnected) {
await this.redisClient.connect();
this.isConnected = true;
}
}

protected abstract fetchAllAccounts(): Promise<T[]>;
protected abstract fetchAllAccounts(): Promise<T[]>;

async getEarliestUsedAccount(): Promise<T> {
await this.ensureConnected();
const accounts = await this.fetchAllAccounts();
accounts.sort((a, b) => {
if (!a.lastUsed && !b.lastUsed) return 0;
if (!a.lastUsed) return -1;
if (!b.lastUsed) return 1;
return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime();
});
for (const acc of accounts) {
const lockKey = `lock:${this.platform}:${acc.accountId}`;
const ok = await this.redisClient.set(lockKey, '1', { NX: true, PX: 15000 });
if (ok === 'OK') {
console.debug(`[${this.platform}AccountManager] Selected account=${acc.accountId} lastUsed=${acc.lastUsed ?? 'Never'} totalRequests=${acc.totalRequests ?? 0}`);
return acc;
}
}
throw new Error(`No available ${this.platform} accounts to claim (all locked).`);
async getEarliestUsedAccount(): Promise<T> {
await this.ensureConnected();
const accounts = await this.fetchAllAccounts();
accounts.sort((a, b) => {
if (!a.lastUsed && !b.lastUsed) return 0;
if (!a.lastUsed) return -1;
if (!b.lastUsed) return 1;
return new Date(a.lastUsed).getTime() - new Date(b.lastUsed).getTime();
});
for (const acc of accounts) {
const lockKey = `lock:${this.platform}:${acc.accountId}`;
const ok = await this.redisClient.set(lockKey, '1', { NX: true, PX: 15000 });
if (ok === 'OK') {
console.debug(
`[${this.platform}AccountManager] Selected account=${acc.accountId} lastUsed=${acc.lastUsed ?? 'Never'} totalRequests=${acc.totalRequests ?? 0}`
);
return acc;
}
}
throw new Error(`No available ${this.platform} accounts to claim (all locked).`);
}

async markAccountAsUsed(accountId: string): Promise<void> {
await this.ensureConnected();
await this.trackApiKeyUsageLocal(accountId);
try { await this.redisClient.del(`lock:${this.platform}:${accountId}`); } catch { }
}
async markAccountAsUsed(accountId: string): Promise<void> {
await this.ensureConnected();
await this.trackApiKeyUsageLocal(accountId);
try {
await this.redisClient.del(`lock:${this.platform}:${accountId}`);
} catch {}
}

protected abstract trackApiKeyUsageLocal(accountId: string): Promise<void>;
protected abstract trackApiKeyUsageLocal(accountId: string): Promise<void>;

async disconnect(): Promise<void> {
if (this.isConnected) {
await this.redisClient.quit();
this.isConnected = false;
}
async disconnect(): Promise<void> {
if (this.isConnected) {
await this.redisClient.quit();
this.isConnected = false;
}
}
}
Loading