Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions .github/workflows/docker-build.yml
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
name: Docker Build and Push

on:
workflow_dispatch:
push:
# branches: [ "main" ]
paths:
- 'apps/api/**'
- 'apps/worker/**'
- 'apps/public/**'
- 'packages/**'
- '!packages/sdks/**'
- '**Dockerfile'
- '.github/workflows/**'
- "apps/api/**"
- "apps/worker/**"
- "apps/public/**"
- "packages/**"
- "!packages/sdks/**"
- "**Dockerfile"
- ".github/workflows/**"

env:
repo_owner: 'openpanel-dev'
repo_owner: "openpanel-dev"

jobs:
changes:
Expand All @@ -27,7 +28,7 @@ jobs:
- uses: dorny/paths-filter@v2
id: filter
with:
base: 'main'
base: "main"
filters: |
api:
- 'apps/api/**'
Expand All @@ -46,17 +47,27 @@ jobs:
needs: changes
if: ${{ needs.changes.outputs.api == 'true' || needs.changes.outputs.worker == 'true' || needs.changes.outputs.public == 'true' }}
runs-on: ubuntu-latest
services:
redis:
image: redis:7-alpine
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping || exit 1"
--health-interval 5s
--health-timeout 3s
--health-retries 20
steps:
- uses: actions/checkout@v4

- name: Setup Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
node-version: "20"

- name: Install pnpm
uses: pnpm/action-setup@v4

- name: Get pnpm store directory
shell: bash
run: |
Expand All @@ -69,29 +80,29 @@ jobs:
key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-pnpm-store-

- name: Install dependencies
run: pnpm install

- name: Codegen
run: pnpm codegen

# - name: Run Biome
# run: pnpm lint

- name: Run TypeScript checks
run: pnpm typecheck
# - name: Run tests
# run: pnpm test

- name: Run tests
run: pnpm test

build-and-push-api:
permissions:
packages: write
needs: [changes, lint-and-test]
if: ${{ needs.changes.outputs.api == 'true' }}
runs-on: ubuntu-latest
steps:
steps:
- name: Checkout repository
uses: actions/checkout@v4

Expand All @@ -118,14 +129,14 @@ jobs:
ghcr.io/${{ env.repo_owner }}/api:${{ github.sha }}
build-args: |
DATABASE_URL=postgresql://dummy:dummy@localhost:5432/dummy

build-and-push-worker:
permissions:
packages: write
needs: [changes, lint-and-test]
if: ${{ needs.changes.outputs.worker == 'true' }}
runs-on: ubuntu-latest
steps:
steps:
- name: Checkout repository
uses: actions/checkout@v4

Expand All @@ -151,4 +162,4 @@ jobs:
ghcr.io/${{ env.repo_owner }}/worker:latest
ghcr.io/${{ env.repo_owner }}/worker:${{ github.sha }}
build-args: |
DATABASE_URL=postgresql://dummy:dummy@localhost:5432/dummy
DATABASE_URL=postgresql://dummy:dummy@localhost:5432/dummy
1 change: 1 addition & 0 deletions apps/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"fastify": "^5.2.1",
"fastify-metrics": "^12.1.0",
"fastify-raw-body": "^5.0.0",
"groupmq": "1.0.0-next.13",
"ico-to-png": "^0.2.2",
"jsonwebtoken": "^9.0.2",
"ramda": "^0.29.1",
Expand Down
104 changes: 87 additions & 17 deletions apps/api/scripts/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as faker from '@faker-js/faker';
import { generateId } from '@openpanel/common';
import { hashPassword } from '@openpanel/common/server';
import { ClientType, db } from '@openpanel/db';
import { getRedisCache } from '@openpanel/redis';
import { v4 as uuidv4 } from 'uuid';

const DOMAIN_COUNT = 5;
Expand Down Expand Up @@ -260,6 +261,8 @@ function insertFakeEvents(events: Event[]) {
}

async function simultaneousRequests() {
await getRedisCache().flushdb();
await new Promise((resolve) => setTimeout(resolve, 1000));
Comment on lines +264 to +265
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don't flush the shared Redis by default.

flushdb() wipes every key in the selected Redis database. Running this helper script against a shared dev/staging instance would erase caches, queues, rate limits—everything—before the mock traffic even starts. Please gate the flush behind an explicit opt-in (env flag or CLI switch) so we cannot accidentally destroy state.

-  await getRedisCache().flushdb();
+  if (process.env.MOCK_FLUSH_REDIS === 'true') {
+    await getRedisCache().flushdb();
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await getRedisCache().flushdb();
await new Promise((resolve) => setTimeout(resolve, 1000));
if (process.env.MOCK_FLUSH_REDIS === 'true') {
await getRedisCache().flushdb();
}
await new Promise((resolve) => setTimeout(resolve, 1000));
🤖 Prompt for AI Agents
In apps/api/scripts/mock.ts around lines 264-265, the code unconditionally calls
getRedisCache().flushdb() which will wipe a shared Redis DB; change this to
require an explicit opt-in before flushing: read an env var (e.g.
MOCK_FLUSH_REDIS==='true') or a CLI flag, log a warning that a destructive flush
will be performed, and only call flushdb() when the flag is set; if not set,
skip the flush and keep the 1s delay (or remove it) so the script is safe to run
against shared dev/staging instances.

const sessions: {
ip: string;
referrer: string;
Expand All @@ -272,9 +275,11 @@ async function simultaneousRequests() {
userAgent:
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
track: [
{ name: 'screen_view', path: '/home' },
{ name: 'button_click', element: 'signup' },
{ name: 'screen_view', path: '/pricing' },
{ name: 'screen_view', path: '/home', parallel: '1' },
{ name: 'button_click', element: 'signup', parallel: '1' },
{ name: 'article_viewed', articleId: '123', parallel: '1' },
{ name: 'screen_view', path: '/pricing', parallel: '1' },
{ name: 'screen_view', path: '/blog', parallel: '1' },
],
},
{
Expand Down Expand Up @@ -361,8 +366,9 @@ async function simultaneousRequests() {
{ name: 'screen_view', path: '/landing' },
{ name: 'screen_view', path: '/pricing' },
{ name: 'screen_view', path: '/blog' },
{ name: 'screen_view', path: '/blog/post-1' },
{ name: 'screen_view', path: '/blog/post-2' },
{ name: 'screen_view', path: '/blog/post-1', parallel: '1' },
{ name: 'screen_view', path: '/blog/post-2', parallel: '1' },
{ name: 'button_click', element: 'learn_more', parallel: '1' },
{ name: 'screen_view', path: '/blog/post-3' },
{ name: 'screen_view', path: '/blog/post-4' },
],
Expand Down Expand Up @@ -396,21 +402,85 @@ async function simultaneousRequests() {
};

for (const session of sessions) {
// Group tracks by parallel flag
const trackGroups: { parallel?: string; tracks: any[] }[] = [];
let currentGroup: { parallel?: string; tracks: any[] } = { tracks: [] };

for (const track of session.track) {
const { name, ...properties } = track;
screenView.track.payload.name = name ?? '';
screenView.track.payload.properties.__referrer = session.referrer ?? '';
if (name === 'screen_view') {
screenView.track.payload.properties.__path =
(screenView.headers.origin ?? '') + (properties.path ?? '');
if (track.parallel) {
// If this track has a parallel flag
if (currentGroup.parallel === track.parallel) {
// Same parallel group, add to current group
currentGroup.tracks.push(track);
} else {
// Different parallel group, finish current group and start new one
if (currentGroup.tracks.length > 0) {
trackGroups.push(currentGroup);
}
currentGroup = { parallel: track.parallel, tracks: [track] };
}
} else {
screenView.track.payload.name = track.name ?? '';
screenView.track.payload.properties = properties;
// No parallel flag, finish any parallel group and start individual track
if (currentGroup.tracks.length > 0) {
trackGroups.push(currentGroup);
}
currentGroup = { tracks: [track] };
}
screenView.headers['x-client-ip'] = session.ip;
screenView.headers['user-agent'] = session.userAgent;
await trackit(screenView);
await new Promise((resolve) => setTimeout(resolve, Math.random() * 5000));
}

// Add the last group
if (currentGroup.tracks.length > 0) {
trackGroups.push(currentGroup);
}

// Process each group
for (const group of trackGroups) {
if (group.parallel && group.tracks.length > 1) {
// Parallel execution for same-flagged tracks
console.log(
`Firing ${group.tracks.length} parallel requests with flag '${group.parallel}'`,
);
const promises = group.tracks.map(async (track) => {
const { name, parallel, ...properties } = track;
const event = JSON.parse(JSON.stringify(screenView));
event.track.payload.name = name ?? '';
event.track.payload.properties.__referrer = session.referrer ?? '';
if (name === 'screen_view') {
event.track.payload.properties.__path =
(event.headers.origin ?? '') + (properties.path ?? '');
} else {
event.track.payload.name = track.name ?? '';
event.track.payload.properties = properties;
}
event.headers['x-client-ip'] = session.ip;
event.headers['user-agent'] = session.userAgent;
return trackit(event);
});

await Promise.all(promises);
console.log(`Completed ${group.tracks.length} parallel requests`);
} else {
// Sequential execution for individual tracks
for (const track of group.tracks) {
const { name, parallel, ...properties } = track;
screenView.track.payload.name = name ?? '';
screenView.track.payload.properties.__referrer =
session.referrer ?? '';
if (name === 'screen_view') {
screenView.track.payload.properties.__path =
(screenView.headers.origin ?? '') + (properties.path ?? '');
} else {
screenView.track.payload.name = track.name ?? '';
screenView.track.payload.properties = properties;
}
screenView.headers['x-client-ip'] = session.ip;
screenView.headers['user-agent'] = session.userAgent;
await trackit(screenView);
}
Comment on lines +463 to +479
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reset event payload per sequential track.

The sequential path keeps mutating the shared screenView template. After a non-screen event runs, its custom properties (e.g. element, articleId) remain on the object, so the next screen_view inherits and sends those stray keys. Clone the template per iteration—like the parallel branch already does—to isolate payloads.

-        for (const track of group.tracks) {
-          const { name, parallel, ...properties } = track;
-          screenView.track.payload.name = name ?? '';
-          screenView.track.payload.properties.__referrer =
-            session.referrer ?? '';
-          if (name === 'screen_view') {
-            screenView.track.payload.properties.__path =
-              (screenView.headers.origin ?? '') + (properties.path ?? '');
-          } else {
-            screenView.track.payload.name = track.name ?? '';
-            screenView.track.payload.properties = properties;
-          }
-          screenView.headers['x-client-ip'] = session.ip;
-          screenView.headers['user-agent'] = session.userAgent;
-          await trackit(screenView);
-        }
+        for (const track of group.tracks) {
+          const { name, parallel: _parallel, ...properties } = track;
+          const event = JSON.parse(JSON.stringify(screenView));
+          event.track.payload.name = name ?? '';
+          event.track.payload.properties.__referrer = session.referrer ?? '';
+          if (name === 'screen_view') {
+            event.track.payload.properties.__path =
+              (event.headers.origin ?? '') + (properties.path ?? '');
+          } else {
+            event.track.payload.properties = properties;
+          }
+          event.headers['x-client-ip'] = session.ip;
+          event.headers['user-agent'] = session.userAgent;
+          await trackit(event);
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Sequential execution for individual tracks
for (const track of group.tracks) {
const { name, parallel, ...properties } = track;
screenView.track.payload.name = name ?? '';
screenView.track.payload.properties.__referrer =
session.referrer ?? '';
if (name === 'screen_view') {
screenView.track.payload.properties.__path =
(screenView.headers.origin ?? '') + (properties.path ?? '');
} else {
screenView.track.payload.name = track.name ?? '';
screenView.track.payload.properties = properties;
}
screenView.headers['x-client-ip'] = session.ip;
screenView.headers['user-agent'] = session.userAgent;
await trackit(screenView);
}
// Sequential execution for individual tracks
for (const track of group.tracks) {
const { name, parallel: _parallel, ...properties } = track;
const event = JSON.parse(JSON.stringify(screenView));
event.track.payload.name = name ?? '';
event.track.payload.properties.__referrer = session.referrer ?? '';
if (name === 'screen_view') {
event.track.payload.properties.__path =
(event.headers.origin ?? '') + (properties.path ?? '');
} else {
event.track.payload.properties = properties;
}
event.headers['x-client-ip'] = session.ip;
event.headers['user-agent'] = session.userAgent;
await trackit(event);
}
🤖 Prompt for AI Agents
In apps/api/scripts/mock.ts around lines 463 to 479 the sequential loop mutates
the shared screenView template so non-screen_event properties persist into
subsequent iterations; fix by creating a fresh clone of the screenView template
at the top of each loop iteration (use a deep clone method consistent with the
project, e.g. structuredClone or JSON.parse(JSON.stringify(...)) if supported),
then set name, properties.__referrer, headers (x-client-ip and user-agent) and
for screen_view set properties.__path on that cloned object before calling await
trackit(clonedScreenView); this ensures each track uses an isolated payload and
prevents stray keys from leaking between iterations.

}

// Add delay between groups (not within parallel groups)
// await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
}
}
}
Expand Down
65 changes: 46 additions & 19 deletions apps/api/src/controllers/event.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import type { FastifyReply, FastifyRequest } from 'fastify';

import { generateDeviceId } from '@openpanel/common/server';
import { getSalts } from '@openpanel/db';
import { eventsQueue } from '@openpanel/queue';
import { getLock } from '@openpanel/redis';
import { eventsGroupQueue, eventsQueue } from '@openpanel/queue';
import { getLock, getRedisCache } from '@openpanel/redis';
import type { PostEventPayload } from '@openpanel/sdk';

import { checkDuplicatedEvent } from '@/utils/deduplicate';
Expand All @@ -17,10 +17,14 @@ export async function postEvent(
}>,
reply: FastifyReply,
) {
const timestamp = getTimestamp(request.timestamp, request.body);
const { timestamp, isTimestampFromThePast } = getTimestamp(
request.timestamp,
request.body,
);
const ip = getClientIp(request)!;
const ua = request.headers['user-agent']!;
const projectId = request.client?.projectId;
const headers = getStringHeaders(request.headers);

if (!projectId) {
reply.status(400).send('missing origin');
Expand Down Expand Up @@ -56,31 +60,54 @@ export async function postEvent(
return;
}

await eventsQueue.add(
'event',
{
type: 'incomingEvent',
payload: {
const isGroupQueue = await getRedisCache().exists('group_queue');
if (isGroupQueue) {
const groupId = request.body?.profileId
? `${projectId}:${request.body?.profileId}`
: currentDeviceId;
await eventsGroupQueue.add({
orderMs: new Date(timestamp).getTime(),
data: {
projectId,
headers: getStringHeaders(request.headers),
headers,
event: {
...request.body,
timestamp: timestamp.timestamp,
isTimestampFromThePast: timestamp.isTimestampFromThePast,
timestamp,
isTimestampFromThePast,
},
geo,
currentDeviceId,
previousDeviceId,
},
},
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 200,
groupId,
});
} else {
await eventsQueue.add(
'event',
{
type: 'incomingEvent',
payload: {
projectId,
headers,
event: {
...request.body,
timestamp,
isTimestampFromThePast,
},
geo,
currentDeviceId,
previousDeviceId,
},
},
},
);
{
attempts: 3,
backoff: {
type: 'exponential',
delay: 200,
},
},
);
}

reply.status(202).send('ok');
}
Loading