From a0dde8856bd486c0bca7d78a563eb834cc130edd Mon Sep 17 00:00:00 2001 From: Alison Hawk <185055428+alisonhawk@users.noreply.github.com> Date: Fri, 3 Oct 2025 06:10:05 +0000 Subject: [PATCH] feat: Implement OpenAI client and validation utilities - Added OpenAI client initialization with API key validation. - Created validation utility for OpenAI chat completions using Zod schema. - Developed complete post analysis functionality, integrating title generation, categorization, and sentiment analysis. - Implemented Redis post group management with title and sentiment summary generation. - Added Redis client initialization and connection handling with error management. - Created deduplication listener for posts using cosine similarity on embeddings. - Seeded Redis database with mock data and provided command-line interface for seeding operations. - Added embedding retrieval with retry logic for OpenAI API calls. --- run-classifier.ts | 8 +- src/{ => analysis}/analyzeSentiment.ts | 7 +- src/{ => analysis}/generateTitle.ts | 7 +- src/{ => lib}/constants.ts | 3 + src/{ => openai}/classifyWithOpenAI.ts | 7 +- src/{ => openai}/openaiClient.ts | 0 src/{ => openai}/openaiValidationUtil.ts | 5 +- src/{ => post}/completePostAnalysis.ts | 11 +- src/{ => redis}/postGroup.ts | 2 +- src/redis/redisCheck.ts | 30 ++++ src/{ => redis}/redisClient.ts | 91 +++++++++++ src/redis/redisDedupeListener.ts | 200 +++++++++++++++++++++++ src/{ => redis}/redisSeed.ts | 46 ++++++ src/{ => redis}/redisTest.ts | 12 ++ src/redisCheck.ts | 15 -- src/redisDedupeListener.ts | 100 ------------ src/{ => seed}/seed.ts | 0 src/{ => seed}/seedData.ts | 2 +- src/{ => seed}/seedDatabase.ts | 4 +- src/{ => test}/embeddingTest.ts | 2 +- 20 files changed, 417 insertions(+), 135 deletions(-) rename src/{ => analysis}/analyzeSentiment.ts (90%) rename src/{ => analysis}/generateTitle.ts (94%) rename src/{ => lib}/constants.ts (76%) rename src/{ => openai}/classifyWithOpenAI.ts (90%) rename src/{ => openai}/openaiClient.ts (100%) rename src/{ => openai}/openaiValidationUtil.ts (92%) rename src/{ => post}/completePostAnalysis.ts (89%) rename src/{ => redis}/postGroup.ts (98%) create mode 100644 src/redis/redisCheck.ts rename src/{ => redis}/redisClient.ts (50%) create mode 100644 src/redis/redisDedupeListener.ts rename src/{ => redis}/redisSeed.ts (50%) rename src/{ => redis}/redisTest.ts (51%) delete mode 100644 src/redisCheck.ts delete mode 100644 src/redisDedupeListener.ts rename src/{ => seed}/seed.ts (100%) rename src/{ => seed}/seedData.ts (97%) rename src/{ => seed}/seedDatabase.ts (95%) rename src/{ => test}/embeddingTest.ts (97%) diff --git a/run-classifier.ts b/run-classifier.ts index 48a7d47..8ad2659 100644 --- a/run-classifier.ts +++ b/run-classifier.ts @@ -1,9 +1,9 @@ #!/usr/bin/env node -import { runCompleteAnalysisDemo } from './src/completePostAnalysis.js'; -import { runCategorization } from './src/classifyWithOpenAI.js'; -import { analyzeMultiplePosts } from './src/analyzeSentiment.js'; -import { generateTitlesForPosts } from './src/generateTitle.js'; +import { runCompleteAnalysisDemo } from './src/post/completePostAnalysis.js'; +import { runCategorization } from './src/openai/classifyWithOpenAI.js'; +import { analyzeMultiplePosts } from './src/analysis/analyzeSentiment.js'; +import { generateTitlesForPosts } from './src/analysis/generateTitle.js'; // Get command line argument const command = process.argv[2]; diff --git a/src/analyzeSentiment.ts b/src/analysis/analyzeSentiment.ts similarity index 90% rename from src/analyzeSentiment.ts rename to src/analysis/analyzeSentiment.ts index c2b9c44..de37340 100644 --- a/src/analyzeSentiment.ts +++ b/src/analysis/analyzeSentiment.ts @@ -1,6 +1,9 @@ +// This file has been moved to the analysis subfolder. +// Please update your imports accordingly. +import { analyzeMultiplePosts } from './analyzeSentiment.js'; import type OpenAI from 'openai'; -import openai from './openaiClient.js'; -import { callOpenAIWithValidation } from './openaiValidationUtil.js'; +import openai from '../openai/openaiClient.js'; +import { callOpenAIWithValidation } from '../openai/openaiValidationUtil.js'; import { z } from 'zod'; import { generateTitleForPost } from './generateTitle.js'; diff --git a/src/generateTitle.ts b/src/analysis/generateTitle.ts similarity index 94% rename from src/generateTitle.ts rename to src/analysis/generateTitle.ts index b21aee3..3d3a3c9 100644 --- a/src/generateTitle.ts +++ b/src/analysis/generateTitle.ts @@ -1,6 +1,9 @@ +// This file has been moved to the analysis subfolder. +// Please update your imports accordingly. +import { generateTitleForPost } from './generateTitle.js'; import type OpenAI from 'openai'; -import openai from './openaiClient.js'; -import { callOpenAIWithValidation } from './openaiValidationUtil.js'; +import openai from '../openai/openaiClient.js'; +import { callOpenAIWithValidation } from '../openai/openaiValidationUtil.js'; import { z } from 'zod'; // Title generation result type diff --git a/src/constants.ts b/src/lib/constants.ts similarity index 76% rename from src/constants.ts rename to src/lib/constants.ts index d480ff5..392464a 100644 --- a/src/constants.ts +++ b/src/lib/constants.ts @@ -1,3 +1,6 @@ +// This file has been moved to the analysis subfolder. +// Please update your imports accordingly. +export const CATEGORIES = [...]; // Keep the original content // src/constants.ts export const CATEGORIES = [ diff --git a/src/classifyWithOpenAI.ts b/src/openai/classifyWithOpenAI.ts similarity index 90% rename from src/classifyWithOpenAI.ts rename to src/openai/classifyWithOpenAI.ts index 035b641..4b3757b 100644 --- a/src/classifyWithOpenAI.ts +++ b/src/openai/classifyWithOpenAI.ts @@ -1,11 +1,14 @@ +// This file has been moved to the analysis subfolder. +// Please update your imports accordingly. +import { categorizePost } from './analysis/classifyWithOpenAI.js'; // No top-level execution. Export the runner and allow explicit CLI invocation with --run. import type OpenAI from 'openai'; import type { ChatCompletionMessageParam } from 'openai/resources'; import openai from './openaiClient.js'; import { callOpenAIWithValidation } from './openaiValidationUtil.js'; import { z } from 'zod'; -import { generateTitleForPost } from './generateTitle.js'; -import { CATEGORIES } from './constants.js'; +import { generateTitleForPost } from '../analysis/generateTitle.js'; +import { CATEGORIES } from '../lib/constants.js'; // Zod schema for categorization validation const CategorizationSchema = z.object({ diff --git a/src/openaiClient.ts b/src/openai/openaiClient.ts similarity index 100% rename from src/openaiClient.ts rename to src/openai/openaiClient.ts diff --git a/src/openaiValidationUtil.ts b/src/openai/openaiValidationUtil.ts similarity index 92% rename from src/openaiValidationUtil.ts rename to src/openai/openaiValidationUtil.ts index d3147a5..08a3bd4 100644 --- a/src/openaiValidationUtil.ts +++ b/src/openai/openaiValidationUtil.ts @@ -1,7 +1,10 @@ +// This file has been moved to the analysis subfolder. +// Please update your imports accordingly. +import { callOpenAIWithValidation } from './analysis/openaiValidationUtil.js'; import type OpenAI from 'openai'; import type { ChatCompletionMessageParam } from 'openai/resources'; import { ZodSchema } from 'zod'; -import { sleep, jitteredBackoff } from './lib/utils.js'; +import { sleep, jitteredBackoff } from '../lib/utils.js'; /** * Calls OpenAI chat completion API and validates the response with a Zod schema, retrying up to maxAttempts times. diff --git a/src/completePostAnalysis.ts b/src/post/completePostAnalysis.ts similarity index 89% rename from src/completePostAnalysis.ts rename to src/post/completePostAnalysis.ts index b9b7061..0d67a04 100644 --- a/src/completePostAnalysis.ts +++ b/src/post/completePostAnalysis.ts @@ -1,8 +1,11 @@ +// This file has been moved to the analysis subfolder. +// Please update your imports accordingly. +import { analyzeCompletePost } from './analysis/completePostAnalysis.js'; import type OpenAI from 'openai'; -import openai from './openaiClient.js'; -import { categorizePost, type Categorization } from './classifyWithOpenAI.js'; -import { analyzeMultiplePosts, type SentimentResult } from './analyzeSentiment.js'; -import { generateTitleForPost, type TitleResult } from './generateTitle.js'; +import openai from '../openai/openaiClient.js'; +import { categorizePost, type Categorization } from '../openai/classifyWithOpenAI.js'; +import { analyzeMultiplePosts, type SentimentResult } from '../analysis/analyzeSentiment.js'; +import { generateTitleForPost, type TitleResult } from '../analysis/generateTitle.js'; // Combined result type for complete post analysis export type PostAnalysisResult = { diff --git a/src/postGroup.ts b/src/redis/postGroup.ts similarity index 98% rename from src/postGroup.ts rename to src/redis/postGroup.ts index 7ecbb28..08995e0 100644 --- a/src/postGroup.ts +++ b/src/redis/postGroup.ts @@ -1,5 +1,5 @@ import cron from 'node-cron'; -import { generateTitleForPost, generateSentimentSummariesForGroup, type SentimentSummaries } from './generateTitle'; +import { generateTitleForPost, generateSentimentSummariesForGroup, type SentimentSummaries } from '../analysis/generateTitle'; import { initRedis, getRedisClient } from './redisClient'; export type Post = { diff --git a/src/redis/redisCheck.ts b/src/redis/redisCheck.ts new file mode 100644 index 0000000..a352ac5 --- /dev/null +++ b/src/redis/redisCheck.ts @@ -0,0 +1,30 @@ +import { initRedis } from './redis/redisClient.js'; + +async function checkRedisSeed() { + const client = await initRedis(); + try { + const posts = JSON.parse((await client.get('posts')) || '[]'); + console.log(JSON.stringify(posts)); + } catch (err) { + console.error('Error checking Redis seed:', err); + } finally { + await client.disconnect(); + } +} + +checkRedisSeed(); +import { initRedis } from './redisClient.js'; + +async function checkRedisSeed() { + const client = await initRedis(); + try { + const posts = JSON.parse((await client.get('posts')) || '[]'); + console.log(JSON.stringify(posts)); + } catch (err) { + console.error('Error checking Redis seed:', err); + } finally { + await client.disconnect(); + } +} + +checkRedisSeed(); diff --git a/src/redisClient.ts b/src/redis/redisClient.ts similarity index 50% rename from src/redisClient.ts rename to src/redis/redisClient.ts index ce4905f..662cfda 100644 --- a/src/redisClient.ts +++ b/src/redis/redisClient.ts @@ -89,3 +89,94 @@ export function getRedisClient(): RedisClientType { return client; } +import dotenv from 'dotenv'; +import { createClient, RedisClientType } from 'redis'; + +dotenv.config(); + +// Prefer IPv4 loopback by default to avoid ::1/IPv6 resolution issues on some systems +const REDIS_URL = (process.env.REDIS_URL || 'redis://127.0.0.1:6379').replace('localhost', '127.0.0.1'); + +function safeRedisUrlForLog(url: string) { + try { + const u = new URL(url); + // show protocol, host and port only; hide auth/userinfo + const host = u.hostname || ''; + const port = u.port ? `:${u.port}` : ''; + return `${u.protocol}//${host}${port}`; + } catch (e) { + // fallback: remove everything between // and @ if present + return url.replace(/\/\/.*@/, '//'); + } +} + +let client: RedisClientType | null = null; +let connecting: Promise | null = null; + +export async function initRedis(): Promise { + if (client && client.isOpen) return client; + + // If a connect is already in progress, wait for it and return the resulting client + if (connecting) { + try { + await connecting; + } catch (err) { + // if the previous connecting attempt failed, clear it and continue to try again + } + if (client && client.isOpen) return client; + } + + // Start a single connecting promise that other callers can await + connecting = (async (): Promise => { + const newClient = createClient({ url: REDIS_URL }); + + newClient.on('error', (err: unknown) => { + console.error('Redis Client Error:', err); + }); + + const maxRetries = 5; + const baseDelayMs = 200; // exponential backoff base + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + await newClient.connect(); + // assign to module client (cast to satisfy TS) and return the concrete client + client = newClient as unknown as RedisClientType; + // Clear connecting before returning so subsequent callers don't wait + connecting = null; + return newClient as unknown as RedisClientType; + } catch (err) { + console.error(`Redis connect attempt ${attempt} failed:`, err); + // If last attempt, clean up and rethrow + if (attempt === maxRetries) { + try { + // Attempt to cleanly disconnect if partially connected + // ignore errors from disconnect + // eslint-disable-next-line @typescript-eslint/no-explicit-any + if ((newClient as any).isOpen) await newClient.disconnect(); + } catch (e) {} + client = null; + connecting = null; + console.error('All Redis connection attempts failed.'); + throw err; + } + const delay = baseDelayMs * 2 ** (attempt - 1); + await new Promise((res) => setTimeout(res, delay)); + } + } + + // unreachable, but satisfy TypeScript + connecting = null; + throw new Error('Redis connect failed'); + })(); + + return connecting; +} + +export function getRedisClient(): RedisClientType { + if (!client || !client.isOpen) { + throw new Error('Redis not initialized or client is closed. Call initRedis() first.'); + } + + return client; +} diff --git a/src/redis/redisDedupeListener.ts b/src/redis/redisDedupeListener.ts new file mode 100644 index 0000000..c73c463 --- /dev/null +++ b/src/redis/redisDedupeListener.ts @@ -0,0 +1,200 @@ +import { initRedis } from './redisClient.js'; +import { getEmbeddingWithRetry } from '../test/embeddingTest.js'; + +function cosineSimilarity(a: number[], b: number[]): number { + if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) return 0; + let dot = 0, + na = 0, + nb = 0; + for (let i = 0; i < a.length; i++) { + const ai = a[i] ?? 0; + const bi = b[i] ?? 0; + dot += ai * bi; + na += ai * ai; + nb += bi * bi; + } + if (na === 0 || nb === 0) return 0; + return dot / (Math.sqrt(na) * Math.sqrt(nb)); +} + +async function main() { + const redisClient = await initRedis(); + const redisSubscriber = redisClient.duplicate(); + await redisSubscriber.connect(); + + // First, let's verify keyspace notifications are enabled + const config = await redisClient.configGet('notify-keyspace-events'); + + { + const current = config['notify-keyspace-events'] ?? ''; + const required = ['K', 'E', 'A']; + const merged = Array.from(new Set([...current, ...required])).join(''); + if (merged !== current) { + console.log('Merging keyspace notification flags:', merged); + await redisClient.configSet('notify-keyspace-events', merged); + console.log('Keyspace notifications updated.'); + } + } + + // Also try keyspace pattern (different format) + await redisSubscriber.pSubscribe('__keyspace@0__:posts', async (message, pattern) => { + const lockKey = 'dedupe:lock:posts'; + const gotLock = await redisClient.set(lockKey, '1', { NX: true, PX: 10000 }); + if (!gotLock) return; + try { + if (message === 'set') { + console.log('Posts key was set via keyspace, processing update...'); + const postsRaw = await redisClient.get('posts'); + if (!postsRaw) return; + let posts: unknown; + try { + posts = JSON.parse(postsRaw); + } catch (e) { + console.error('Failed to parse posts JSON:', e); + return; + } + if (!Array.isArray(posts)) { + console.warn('Expected posts array; got:', typeof posts); + return; + } + // ...existing deduplication logic... + const latestPost = posts[posts.length - 1]; // Assume last is new + const latestCacheKey = `emb:${latestPost.id}`; + let latestEmbedding = await redisClient.get(latestCacheKey).then((s: any) => (s ? JSON.parse(s) : null)); + if (!latestEmbedding) { + latestEmbedding = await getEmbeddingWithRetry(latestPost.content); + await redisClient.set(latestCacheKey, JSON.stringify(latestEmbedding), { EX: 86400 }); + } + console.log(latestEmbedding); + + for (let i = 0; i < posts.length - 1; i++) { + const otherPost = posts[i]; + const otherCacheKey = `emb:${otherPost.id}`; + let otherEmbedding = await redisClient.get(otherCacheKey).then((s: any) => (s ? JSON.parse(s) : null)); + if (!otherEmbedding) { + otherEmbedding = await getEmbeddingWithRetry(otherPost.content); + await redisClient.set(otherCacheKey, JSON.stringify(otherEmbedding), { EX: 86400 }); + } + const similarity = cosineSimilarity(latestEmbedding, otherEmbedding); + if (similarity > 0.95) { + console.log(`Duplicate detected: ${latestPost.id} is similar to post ${otherPost.id}`); + // Remove the duplicate post (latestPost) from the array + const updatedPosts = posts.filter((p: any) => p.id !== latestPost.id); + await redisClient.set('posts', JSON.stringify(updatedPosts)); + console.log(`Removed duplicate post ${latestPost.id} from Redis.`); + break; + } + console.log('similarity:', similarity.toFixed(4)); + } + } + } finally { + try { + await redisClient.del(lockKey); + } catch {} + } + }); + + console.log('Listening for changes to posts array and deduping using embeddings...'); +} + +main().catch(console.error); +import { initRedis } from './redisClient.js'; +import { getEmbeddingWithRetry } from '../test/embeddingTest.js'; + +function cosineSimilarity(a: number[], b: number[]): number { + if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) return 0; + let dot = 0, + na = 0, + nb = 0; + for (let i = 0; i < a.length; i++) { + const ai = a[i] ?? 0; + const bi = b[i] ?? 0; + dot += ai * bi; + na += ai * ai; + nb += bi * bi; + } + if (na === 0 || nb === 0) return 0; + return dot / (Math.sqrt(na) * Math.sqrt(nb)); +} + +async function main() { + const redisClient = await initRedis(); + const redisSubscriber = redisClient.duplicate(); + await redisSubscriber.connect(); + + // First, let's verify keyspace notifications are enabled + const config = await redisClient.configGet('notify-keyspace-events'); + + { + const current = config['notify-keyspace-events'] ?? ''; + const required = ['K', 'E', 'A']; + const merged = Array.from(new Set([...current, ...required])).join(''); + if (merged !== current) { + console.log('Merging keyspace notification flags:', merged); + await redisClient.configSet('notify-keyspace-events', merged); + console.log('Keyspace notifications updated.'); + } + } + + // Also try keyspace pattern (different format) + await redisSubscriber.pSubscribe('__keyspace@0__:posts', async (message, pattern) => { + const lockKey = 'dedupe:lock:posts'; + const gotLock = await redisClient.set(lockKey, '1', { NX: true, PX: 10000 }); + if (!gotLock) return; + try { + if (message === 'set') { + console.log('Posts key was set via keyspace, processing update...'); + const postsRaw = await redisClient.get('posts'); + if (!postsRaw) return; + let posts: unknown; + try { + posts = JSON.parse(postsRaw); + } catch (e) { + console.error('Failed to parse posts JSON:', e); + return; + } + if (!Array.isArray(posts)) { + console.warn('Expected posts array; got:', typeof posts); + return; + } + // ...existing deduplication logic... + const latestPost = posts[posts.length - 1]; // Assume last is new + const latestCacheKey = `emb:${latestPost.id}`; + let latestEmbedding = await redisClient.get(latestCacheKey).then((s: any) => (s ? JSON.parse(s) : null)); + if (!latestEmbedding) { + latestEmbedding = await getEmbeddingWithRetry(latestPost.content); + await redisClient.set(latestCacheKey, JSON.stringify(latestEmbedding), { EX: 86400 }); + } + console.log(latestEmbedding); + + for (let i = 0; i < posts.length - 1; i++) { + const otherPost = posts[i]; + const otherCacheKey = `emb:${otherPost.id}`; + let otherEmbedding = await redisClient.get(otherCacheKey).then((s: any) => (s ? JSON.parse(s) : null)); + if (!otherEmbedding) { + otherEmbedding = await getEmbeddingWithRetry(otherPost.content); + await redisClient.set(otherCacheKey, JSON.stringify(otherEmbedding), { EX: 86400 }); + } + const similarity = cosineSimilarity(latestEmbedding, otherEmbedding); + if (similarity > 0.95) { + console.log(`Duplicate detected: ${latestPost.id} is similar to post ${otherPost.id}`); + // Remove the duplicate post (latestPost) from the array + const updatedPosts = posts.filter((p: any) => p.id !== latestPost.id); + await redisClient.set('posts', JSON.stringify(updatedPosts)); + console.log(`Removed duplicate post ${latestPost.id} from Redis.`); + break; + } + console.log('similarity:', similarity.toFixed(4)); + } + } + } finally { + try { + await redisClient.del(lockKey); + } catch {} + } + }); + + console.log('Listening for changes to posts array and deduping using embeddings...'); +} + +main().catch(console.error); diff --git a/src/redisSeed.ts b/src/redis/redisSeed.ts similarity index 50% rename from src/redisSeed.ts rename to src/redis/redisSeed.ts index 4f66fd5..7ab7325 100644 --- a/src/redisSeed.ts +++ b/src/redis/redisSeed.ts @@ -44,3 +44,49 @@ async function seedRedis() { } seedRedis(); +import { initRedis } from './redisClient.js'; +import * as fs from 'fs'; +import * as path from 'path'; + +type InputPost = { + id: string; + content: string; + url: string; + created_at: string; + author: { name: string; handle: string; pfpUrl: string } | null; +}; + +async function seedRedis() { + const redisClient = await initRedis(); + + // Try to read user-provided input from data/sample_posts.json (project root) + const dataPath = path.resolve(process.cwd(), 'data', 'sample_posts.json'); + let inputPosts: InputPost[] = []; + + if (fs.existsSync(dataPath)) { + try { + const raw = fs.readFileSync(dataPath, 'utf8'); + inputPosts = JSON.parse(raw); + console.log(`Loaded ${inputPosts.length} posts from data/sample_posts.json`); + } catch (err) { + console.error('Failed to parse data/sample_posts.json, falling back to bundled sample:', err); + } + } + + try { + await redisClient.set('posts', JSON.stringify(inputPosts)); + console.log(`Redis seeding done! Seeded ${inputPosts.length} posts.`); + } finally { + // Always attempt to close the client. If disconnect fails, surface the error + // after attempting to close (don't swallow fatal errors silently). + try { + await redisClient.quit(); + } catch (err) { + console.error('Failed to disconnect Redis client:', err); + // rethrow so callers see the original failure if needed + throw err; + } + } +} + +seedRedis(); diff --git a/src/redisTest.ts b/src/redis/redisTest.ts similarity index 51% rename from src/redisTest.ts rename to src/redis/redisTest.ts index 204f18e..bb9318c 100644 --- a/src/redisTest.ts +++ b/src/redis/redisTest.ts @@ -1,3 +1,15 @@ +import { initRedis } from './redisClient.js'; + +async function testRedis() { + const client = await initRedis(); + + const pong = await client.ping(); + console.log('Ping Response:', pong); // Output: "PONG" means connection success + + await client.disconnect(); +} + +testRedis(); import { initRedis, getRedisClient } from './redisClient.js'; async function testRedis() { diff --git a/src/redisCheck.ts b/src/redisCheck.ts deleted file mode 100644 index 99dbbab..0000000 --- a/src/redisCheck.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { initRedis } from './redisClient.js'; - -async function checkRedisSeed() { - const client = await initRedis(); - try { - const posts = JSON.parse((await client.get('posts')) || '[]'); - console.log(JSON.stringify(posts)); - } catch (err) { - console.error('Error checking Redis seed:', err); - } finally { - await client.disconnect(); - } -} - -checkRedisSeed(); diff --git a/src/redisDedupeListener.ts b/src/redisDedupeListener.ts deleted file mode 100644 index 743c670..0000000 --- a/src/redisDedupeListener.ts +++ /dev/null @@ -1,100 +0,0 @@ -import { initRedis } from './redisClient.js'; -import { getEmbeddingWithRetry } from './embeddingTest.js'; - -function cosineSimilarity(a: number[], b: number[]): number { - if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) return 0; - let dot = 0, - na = 0, - nb = 0; - for (let i = 0; i < a.length; i++) { - const ai = a[i] ?? 0; - const bi = b[i] ?? 0; - dot += ai * bi; - na += ai * ai; - nb += bi * bi; - } - if (na === 0 || nb === 0) return 0; - return dot / (Math.sqrt(na) * Math.sqrt(nb)); -} - -async function main() { - const redisClient = await initRedis(); - const redisSubscriber = redisClient.duplicate(); - await redisSubscriber.connect(); - - // First, let's verify keyspace notifications are enabled - const config = await redisClient.configGet('notify-keyspace-events'); - - { - const current = config['notify-keyspace-events'] ?? ''; - const required = ['K', 'E', 'A']; - const merged = Array.from(new Set([...current, ...required])).join(''); - if (merged !== current) { - console.log('Merging keyspace notification flags:', merged); - await redisClient.configSet('notify-keyspace-events', merged); - console.log('Keyspace notifications updated.'); - } - } - - // Also try keyspace pattern (different format) - await redisSubscriber.pSubscribe('__keyspace@0__:posts', async (message, pattern) => { - const lockKey = 'dedupe:lock:posts'; - const gotLock = await redisClient.set(lockKey, '1', { NX: true, PX: 10000 }); - if (!gotLock) return; - try { - if (message === 'set') { - console.log('Posts key was set via keyspace, processing update...'); - const postsRaw = await redisClient.get('posts'); - if (!postsRaw) return; - let posts: unknown; - try { - posts = JSON.parse(postsRaw); - } catch (e) { - console.error('Failed to parse posts JSON:', e); - return; - } - if (!Array.isArray(posts)) { - console.warn('Expected posts array; got:', typeof posts); - return; - } - // ...existing deduplication logic... - const latestPost = posts[posts.length - 1]; // Assume last is new - const latestCacheKey = `emb:${latestPost.id}`; - let latestEmbedding = await redisClient.get(latestCacheKey).then((s: any) => (s ? JSON.parse(s) : null)); - if (!latestEmbedding) { - latestEmbedding = await getEmbeddingWithRetry(latestPost.content); - await redisClient.set(latestCacheKey, JSON.stringify(latestEmbedding), { EX: 86400 }); - } - console.log(latestEmbedding); - - for (let i = 0; i < posts.length - 1; i++) { - const otherPost = posts[i]; - const otherCacheKey = `emb:${otherPost.id}`; - let otherEmbedding = await redisClient.get(otherCacheKey).then((s: any) => (s ? JSON.parse(s) : null)); - if (!otherEmbedding) { - otherEmbedding = await getEmbeddingWithRetry(otherPost.content); - await redisClient.set(otherCacheKey, JSON.stringify(otherEmbedding), { EX: 86400 }); - } - const similarity = cosineSimilarity(latestEmbedding, otherEmbedding); - if (similarity > 0.95) { - console.log(`Duplicate detected: ${latestPost.id} is similar to post ${otherPost.id}`); - // Remove the duplicate post (latestPost) from the array - const updatedPosts = posts.filter((p: any) => p.id !== latestPost.id); - await redisClient.set('posts', JSON.stringify(updatedPosts)); - console.log(`Removed duplicate post ${latestPost.id} from Redis.`); - break; - } - console.log('similarity:', similarity.toFixed(4)); - } - } - } finally { - try { - await redisClient.del(lockKey); - } catch {} - } - }); - - console.log('Listening for changes to posts array and deduping using embeddings...'); -} - -main().catch(console.error); diff --git a/src/seed.ts b/src/seed/seed.ts similarity index 100% rename from src/seed.ts rename to src/seed/seed.ts diff --git a/src/seedData.ts b/src/seed/seedData.ts similarity index 97% rename from src/seedData.ts rename to src/seed/seedData.ts index c836481..272bde4 100644 --- a/src/seedData.ts +++ b/src/seed/seedData.ts @@ -1,4 +1,4 @@ -import { PostGroup } from './postGroup'; +import { PostGroup } from '../redis/postGroup'; export const seedData: PostGroup[] = [ { diff --git a/src/seedDatabase.ts b/src/seed/seedDatabase.ts similarity index 95% rename from src/seedDatabase.ts rename to src/seed/seedDatabase.ts index 28538e5..3d0b614 100644 --- a/src/seedDatabase.ts +++ b/src/seed/seedDatabase.ts @@ -1,6 +1,6 @@ -import { initRedis, getRedisClient } from './redisClient'; +import { initRedis, getRedisClient } from '../redis/redisClient'; import { seedData } from './seedData'; -import { PostGroup } from './postGroup'; +import { PostGroup } from '../redis/postGroup'; /** * Seeds the Upstash Redis database with mock data for PostGroups. diff --git a/src/embeddingTest.ts b/src/test/embeddingTest.ts similarity index 97% rename from src/embeddingTest.ts rename to src/test/embeddingTest.ts index 771725c..3488c03 100644 --- a/src/embeddingTest.ts +++ b/src/test/embeddingTest.ts @@ -1,4 +1,4 @@ -import openai from './openaiClient.js'; +import openai from '../openai/openaiClient.js'; export async function getEmbeddingWithRetry(input: string, opts?: { model?: string; maxRetries?: number }) { const model = opts?.model ?? 'text-embedding-3-small';