Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/backend/scripts/clickhouse-migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export async function runClickhouseMigrations() {
await client.exec({ query: TOKEN_REFRESH_EVENT_ROW_FORMAT_MUTATION_SQL });
await client.exec({ query: BACKFILL_REFRESH_TOKEN_ID_COLUMN_SQL });
await client.exec({ query: SIGN_UP_RULE_TRIGGER_EVENT_ROW_FORMAT_MUTATION_SQL });
// Recreate the events view so SELECT * picks up columns added by EVENTS_ADD_REPLAY_COLUMNS_SQL
await client.exec({ query: EVENTS_VIEW_SQL });
const queries = [
"REVOKE ALL PRIVILEGES ON *.* FROM limited_user;",
"REVOKE ALL FROM limited_user;",
Expand Down
98 changes: 98 additions & 0 deletions apps/backend/src/app/api/latest/analytics/events/batch/route.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { getClickhouseAdminClient } from "@/lib/clickhouse";
import { findRecentSessionReplay } from "@/lib/session-replays";
import { getPrismaClientForTenancy } from "@/prisma-client";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { KnownErrors } from "@stackframe/stack-shared";
import { adaptSchema, clientOrHigherAuthTypeSchema, yupArray, yupMixed, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";

const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[089ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

const MAX_EVENTS = 500;

export const POST = createSmartRouteHandler({
metadata: {
summary: "Upload analytics event batch",
description: "Uploads a batch of auto-captured analytics events ($page-view, $click).",
tags: ["Analytics Events"],
hidden: true,
},
request: yupObject({
auth: yupObject({
type: clientOrHigherAuthTypeSchema,
tenancy: adaptSchema,
user: adaptSchema,
refreshTokenId: adaptSchema,
}).defined(),
body: yupObject({
session_replay_segment_id: yupString().defined().matches(UUID_RE, "Invalid session_replay_segment_id"),
batch_id: yupString().defined().matches(UUID_RE, "Invalid batch_id"),
sent_at_ms: yupNumber().defined().integer().min(0),
events: yupArray(
yupObject({
event_type: yupString().defined().oneOf(["$page-view", "$click"]),
event_at_ms: yupNumber().defined().integer().min(0),
data: yupMixed().defined(),
}).defined(),
).defined().min(1).max(MAX_EVENTS),
}).defined(),
Comment thread
BilalG1 marked this conversation as resolved.
}),
response: yupObject({
statusCode: yupNumber().oneOf([200]).defined(),
bodyType: yupString().oneOf(["json"]).defined(),
body: yupObject({
inserted: yupNumber().defined(),
}).defined(),
}),
async handler({ auth, body }) {
if (!auth.tenancy.config.apps.installed["analytics"]?.enabled) {
throw new KnownErrors.AnalyticsNotEnabled();
}
if (!auth.user) {
throw new KnownErrors.UserAuthenticationRequired();
}
if (!auth.refreshTokenId) {
throw new StatusError(StatusError.BadRequest, "A refresh token is required for analytics events");
}
Comment thread
BilalG1 marked this conversation as resolved.

const projectId = auth.tenancy.project.id;
const branchId = auth.tenancy.branchId;
const userId = auth.user.id;
const refreshTokenId = auth.refreshTokenId;
const tenancyId = auth.tenancy.id;

const prisma = await getPrismaClientForTenancy(auth.tenancy);
const recentSession = await findRecentSessionReplay(prisma, { tenancyId, refreshTokenId });

const clickhouseClient = getClickhouseAdminClient();

const rows = body.events.map((event) => ({
event_type: event.event_type,
event_at: new Date(event.event_at_ms),
data: event.data,
project_id: projectId,
branch_id: branchId,
user_id: userId,
team_id: null,
refresh_token_id: refreshTokenId,
session_replay_id: recentSession?.id ?? null,
session_replay_segment_id: body.session_replay_segment_id,
}));
Comment thread
BilalG1 marked this conversation as resolved.

await clickhouseClient.insert({
table: "analytics_internal.events",
values: rows,
format: "JSONEachRow",
clickhouse_settings: {
date_time_input_format: "best_effort",
async_insert: 1,
},
});

return {
statusCode: 200,
bodyType: "json",
body: { inserted: body.events.length },
};
},
});
31 changes: 3 additions & 28 deletions apps/backend/src/app/api/latest/session-replays/batch/route.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { getPrismaClientForTenancy } from "@/prisma-client";
import { uploadBytes } from "@/s3";
import { createSmartRouteHandler } from "@/route-handlers/smart-route-handler";
import { Prisma } from "@/generated/prisma/client";
import { findRecentSessionReplay } from "@/lib/session-replays";
import { KnownErrors } from "@stackframe/stack-shared";
import { adaptSchema, clientOrHigherAuthTypeSchema, yupArray, yupMixed, yupNumber, yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
import { StatusError } from "@stackframe/stack-shared/dist/utils/errors";
Expand All @@ -15,8 +16,6 @@ const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[089ab][0-9a-f]{3}-[0

const MAX_BODY_BYTES = 5_000_000;
const MAX_EVENTS = 5_000;
const SESSION_IDLE_TIMEOUT_MS = 3 * 60 * 1000;
const MAX_SESSION_DURATION_MS = 12 * 60 * 60 * 1000;

function extractEventTimesMs(events: unknown[], fallbackMs: number) {
let minTs = Infinity;
Expand Down Expand Up @@ -72,16 +71,7 @@ export const POST = createSmartRouteHandler({
}),
async handler({ auth, body }, fullReq) {
if (!auth.tenancy.config.apps.installed["analytics"]?.enabled) {
return {
statusCode: 200,
bodyType: "json",
body: {
session_replay_id: "",
batch_id: body.batch_id,
s3_key: "",
deduped: false,
},
};
throw new KnownErrors.AnalyticsNotEnabled();
}
if (!auth.user) {
throw new KnownErrors.UserAuthenticationRequired();
Expand Down Expand Up @@ -114,22 +104,7 @@ export const POST = createSmartRouteHandler({
const { firstMs, lastMs } = extractEventTimesMs(body.events, body.sent_at_ms);

const prisma = await getPrismaClientForTenancy(auth.tenancy);

// Find a recent session replay for this refresh token (temporal grouping).
// If the last batch arrived within SESSION_IDLE_TIMEOUT_MS, reuse that replay.
// Also enforce a max session duration so replays don't grow indefinitely.
const cutoff = new Date(Date.now() - SESSION_IDLE_TIMEOUT_MS);
const maxDurationCutoff = new Date(Date.now() - MAX_SESSION_DURATION_MS);
const recentSession = await prisma.sessionReplay.findFirst({
where: {
tenancyId,
refreshTokenId,
updatedAt: { gte: cutoff },
startedAt: { gte: maxDurationCutoff },
},
orderBy: { updatedAt: "desc" },
select: { id: true, startedAt: true, lastEventAt: true },
});
const recentSession = await findRecentSessionReplay(prisma, { tenancyId, refreshTokenId });

const replayId = recentSession?.id ?? randomUUID();
const s3Key = `session-replays/${projectId}/${branchId}/${replayId}/${batchId}.json.gz`;
Expand Down
23 changes: 23 additions & 0 deletions apps/backend/src/lib/session-replays.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { PrismaClient } from "@/generated/prisma/client";
import { PrismaClientWithReplica } from "@/prisma-client";

export const SESSION_IDLE_TIMEOUT_MS = 3 * 60 * 1000;
export const MAX_SESSION_DURATION_MS = 12 * 60 * 60 * 1000;

export async function findRecentSessionReplay(prisma: PrismaClientWithReplica<PrismaClient>, options: {
tenancyId: string,
refreshTokenId: string,
}) {
const cutoff = new Date(Date.now() - SESSION_IDLE_TIMEOUT_MS);
const maxDurationCutoff = new Date(Date.now() - MAX_SESSION_DURATION_MS);
return await prisma.sessionReplay.findFirst({
where: {
tenancyId: options.tenancyId,
refreshTokenId: options.refreshTokenId,
updatedAt: { gte: cutoff },
startedAt: { gte: maxDurationCutoff },
},
orderBy: { updatedAt: "desc" },
select: { id: true, startedAt: true, lastEventAt: true },
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ export default function PageClient() {
return (
<AppEnabledGuard appId="analytics">
<PageLayout title="Session Replays" fillWidth>
<PanelGroup direction="horizontal" className="h-[calc(100vh-180px)] min-h-[520px] rounded-xl border border-border/40 overflow-hidden bg-background">
<PanelGroup direction="horizontal" className="!h-[calc(100vh-180px)] min-h-[520px] rounded-xl border border-border/40 overflow-hidden bg-background">
<Panel defaultSize={25} minSize={16}>
<div className="h-full flex flex-col">
<div className="shrink-0 px-3 py-2 border-b border-border/30 flex items-center h-10">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,16 +557,18 @@ describe("session-replay-machine", () => {

describe("TOGGLE_PLAY_PAUSE", () => {
it("pauses from playing", () => {
const state = twoTabReadyState({ playbackMode: "playing" });
const state = twoTabReadyState({ playbackMode: "playing", currentGlobalTimeMsForUi: 2500 });
const { state: s, effects } = dispatch(state, { type: "TOGGLE_PLAY_PAUSE", nowMs: 1000 });
expect(s.playbackMode).toBe("paused");
expect(s.pausedAtGlobalMs).toBe(2500);
expect(hasEffect(effects, "pause_all")).toBe(true);
});

it("pauses from gap_fast_forward", () => {
const state = twoTabReadyState({ playbackMode: "gap_fast_forward" });
const state = twoTabReadyState({ playbackMode: "gap_fast_forward", currentGlobalTimeMsForUi: 3000 });
const { state: s } = dispatch(state, { type: "TOGGLE_PLAY_PAUSE", nowMs: 1000 });
expect(s.playbackMode).toBe("paused");
expect(s.pausedAtGlobalMs).toBe(3000);
expect(s.gapFastForward).toBeNull();
});

Expand All @@ -575,9 +577,11 @@ describe("session-replay-machine", () => {
playbackMode: "buffering",
bufferingAtGlobalMs: 1000,
autoResumeAfterBuffering: true,
currentGlobalTimeMsForUi: 1500,
});
const { state: s } = dispatch(state, { type: "TOGGLE_PLAY_PAUSE", nowMs: 1000 });
expect(s.playbackMode).toBe("paused");
expect(s.pausedAtGlobalMs).toBe(1500);
expect(s.bufferingAtGlobalMs).toBeNull();
expect(s.autoResumeAfterBuffering).toBe(false);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ export function replayReducer(state: ReplayState, action: ReplayAction): Reducer
state: {
...state,
playbackMode: "paused",
pausedAtGlobalMs: state.currentGlobalTimeMsForUi,
gapFastForward: null,
bufferingAtGlobalMs: null,
autoResumeAfterBuffering: false,
Expand Down
Loading
Loading