From dbbeaeb8e53ecf9c28363b12bbcf201f42e8d962 Mon Sep 17 00:00:00 2001 From: Greg Shear Date: Fri, 22 May 2026 10:49:47 -0400 Subject: [PATCH] control plane: Add service accounts, api keys, gql operations for both (and refresh tokens). also a new token exchange rest api --- .gitignore | 1 + ...7d6beb7bb1771d44e7d4afc541aa3aa2c7cda.json | 14 + ...6124a797ee7d3e80cc193da05be099747b933.json | 22 + ...ff1302480d5745c4586d20b1c877544549841.json | 65 + ...e7ed4e8dbc9d03d44fa76ae34a037d05883a3.json | 60 + ...6df1cbefa4c0749bc4f8076308cdc2316719f.json | 14 + ...0d094c8dd24b2833dca89f0af22398419b65f.json | 58 + ...2961ddc283f0ca9cc17f827220f14d03aba75.json | 14 + ...4eb0fe0c4c01983c16eb9612465189cf3a0ff.json | 14 + ...8d4a1c1971309631e02c33c6575195eca54e0.json | 14 + ...0240638f92d8914e3d891e400f0ab96df93d7.json | 15 + ...1fbe471070810a12f453b3e26b4f1cd70fcc.json} | 4 +- ...ee23b5e0ba04d94375f96cf129614d0824b16.json | 14 + ...d4539ac0d705ab87a2bbc5ee2fde359e9c9ad.json | 111 ++ ...8ddbeeb9ed24bce7ad27f354730b7d486fcc9.json | 29 + ...9d00c3215285f0259adf061fdebbc36bd637f.json | 31 + ...f7b967c364bfdb01f4d7d30ad7a62a1b42131.json | 22 + ...09d20fc68a40bbe860fe46df0ff9ed60fae67.json | 73 + ...2f6e61bc80ef244afcf09d5c5e871866942c3.json | 31 + ...cf3e2d32a6ee387d69af18ea7b06047335376.json | 16 + ...be71088668139743ab8f0e4cc4f3fe7885537.json | 14 + crates/billing-integrations/src/publish.rs | 5 + .../src/server/public/graphql/access.rs | 1420 +++++++++++++++++ .../src/server/public/graphql/mod.rs | 3 + .../src/server/public/mod.rs | 5 + .../src/server/public/token_exchange.rs | 206 +++ crates/flow-client/control-plane-api.graphql | 302 ++++ .../20260528120000_service_accounts.sql | 59 + 28 files changed, 2634 insertions(+), 2 deletions(-) create mode 100644 .sqlx/query-1197b10e75effb22aafe3e2487d7d6beb7bb1771d44e7d4afc541aa3aa2c7cda.json create mode 100644 .sqlx/query-18fb7e773efb6c6ffb4edaa5b836124a797ee7d3e80cc193da05be099747b933.json create mode 100644 .sqlx/query-2d205236bdd8f9618fd54d57c6fff1302480d5745c4586d20b1c877544549841.json create mode 100644 .sqlx/query-3046665e01e7ceca9e5a0b8d14de7ed4e8dbc9d03d44fa76ae34a037d05883a3.json create mode 100644 .sqlx/query-374c4e69a73d24f6bd7dad8da1d6df1cbefa4c0749bc4f8076308cdc2316719f.json create mode 100644 .sqlx/query-437e3f8398b2b33708c1e2b7dc00d094c8dd24b2833dca89f0af22398419b65f.json create mode 100644 .sqlx/query-4cdd6e65a0bca3fbf170d8a4fbe2961ddc283f0ca9cc17f827220f14d03aba75.json create mode 100644 .sqlx/query-5391cdf049766cb28889dbd26164eb0fe0c4c01983c16eb9612465189cf3a0ff.json create mode 100644 .sqlx/query-62a51c9b125fd613cc1716daeeb8d4a1c1971309631e02c33c6575195eca54e0.json create mode 100644 .sqlx/query-75ac1bae73299ed0eee07b64b230240638f92d8914e3d891e400f0ab96df93d7.json rename .sqlx/{query-3284a4b8b8368d36bb286e8aaad6a12642ebfe7815281a62705bd3d7a32e2eb0.json => query-7903341c9acaf216eafd315680b31fbe471070810a12f453b3e26b4f1cd70fcc.json} (51%) create mode 100644 .sqlx/query-99c1b19721958884eb08dab6f0cee23b5e0ba04d94375f96cf129614d0824b16.json create mode 100644 .sqlx/query-9ba006c5050dda138c5336def66d4539ac0d705ab87a2bbc5ee2fde359e9c9ad.json create mode 100644 .sqlx/query-bc2e27f7d6247b463c97893082c8ddbeeb9ed24bce7ad27f354730b7d486fcc9.json create mode 100644 .sqlx/query-c5a6b956c3e5d8b239df41e36f89d00c3215285f0259adf061fdebbc36bd637f.json create mode 100644 .sqlx/query-cb2678905898cd566802b5f77cff7b967c364bfdb01f4d7d30ad7a62a1b42131.json create mode 100644 .sqlx/query-eafefa5f25e28fd4a189bf83bf509d20fc68a40bbe860fe46df0ff9ed60fae67.json create mode 100644 .sqlx/query-ec33ef8d7561b0a224cc8f71ba82f6e61bc80ef244afcf09d5c5e871866942c3.json create mode 100644 .sqlx/query-f7562d1965e61fe0ad91b6daf0ccf3e2d32a6ee387d69af18ea7b06047335376.json create mode 100644 .sqlx/query-fc8161a9908be0a92913daabb38be71088668139743ab8f0e4cc4f3fe7885537.json create mode 100644 crates/control-plane-api/src/server/public/graphql/access.rs create mode 100644 crates/control-plane-api/src/server/public/token_exchange.rs create mode 100644 supabase/migrations/20260528120000_service_accounts.sql diff --git a/.gitignore b/.gitignore index 0c846847fb6..ab2f0e14f7c 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ __pycache__ .claude/* !.claude/skills/ +mise.local.toml diff --git a/.sqlx/query-1197b10e75effb22aafe3e2487d7d6beb7bb1771d44e7d4afc541aa3aa2c7cda.json b/.sqlx/query-1197b10e75effb22aafe3e2487d7d6beb7bb1771d44e7d4afc541aa3aa2c7cda.json new file mode 100644 index 00000000000..24692eeeccf --- /dev/null +++ b/.sqlx/query-1197b10e75effb22aafe3e2487d7d6beb7bb1771d44e7d4afc541aa3aa2c7cda.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM public.user_grants WHERE user_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "1197b10e75effb22aafe3e2487d7d6beb7bb1771d44e7d4afc541aa3aa2c7cda" +} diff --git a/.sqlx/query-18fb7e773efb6c6ffb4edaa5b836124a797ee7d3e80cc193da05be099747b933.json b/.sqlx/query-18fb7e773efb6c6ffb4edaa5b836124a797ee7d3e80cc193da05be099747b933.json new file mode 100644 index 00000000000..2d6e826f582 --- /dev/null +++ b/.sqlx/query-18fb7e773efb6c6ffb4edaa5b836124a797ee7d3e80cc193da05be099747b933.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT ak.service_account_id\n FROM internal.api_keys ak\n WHERE ak.id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "service_account_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Macaddr8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "18fb7e773efb6c6ffb4edaa5b836124a797ee7d3e80cc193da05be099747b933" +} diff --git a/.sqlx/query-2d205236bdd8f9618fd54d57c6fff1302480d5745c4586d20b1c877544549841.json b/.sqlx/query-2d205236bdd8f9618fd54d57c6fff1302480d5745c4586d20b1c877544549841.json new file mode 100644 index 00000000000..79270cd854b --- /dev/null +++ b/.sqlx/query-2d205236bdd8f9618fd54d57c6fff1302480d5745c4586d20b1c877544549841.json @@ -0,0 +1,65 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO internal.service_accounts (user_id, prefix, capability, display_name, created_by)\n VALUES ($1, $2::text::catalog_prefix, $3, $4, $5)\n RETURNING created_at AS \"created_at!: chrono::DateTime\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "created_at!: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + { + "Custom": { + "name": "grant_capability", + "kind": { + "Enum": [ + "none", + "x_01", + "x_02", + "x_03", + "x_04", + "x_05", + "x_06", + "x_07", + "x_08", + "x_09", + "read", + "x_11", + "x_12", + "x_13", + "x_14", + "x_15", + "x_16", + "x_17", + "x_18", + "x_19", + "write", + "x_21", + "x_22", + "x_23", + "x_24", + "x_25", + "x_26", + "x_27", + "x_28", + "x_29", + "admin" + ] + } + } + }, + "Text", + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "2d205236bdd8f9618fd54d57c6fff1302480d5745c4586d20b1c877544549841" +} diff --git a/.sqlx/query-3046665e01e7ceca9e5a0b8d14de7ed4e8dbc9d03d44fa76ae34a037d05883a3.json b/.sqlx/query-3046665e01e7ceca9e5a0b8d14de7ed4e8dbc9d03d44fa76ae34a037d05883a3.json new file mode 100644 index 00000000000..748cdc295eb --- /dev/null +++ b/.sqlx/query-3046665e01e7ceca9e5a0b8d14de7ed4e8dbc9d03d44fa76ae34a037d05883a3.json @@ -0,0 +1,60 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n id AS \"id!: models::Id\",\n detail,\n created_at AS \"created_at!: chrono::DateTime\",\n updated_at AS \"updated_at!: chrono::DateTime\",\n multi_use AS \"multi_use!: bool\",\n valid_for::text AS \"valid_for!: String\",\n uses AS \"uses!: i32\"\n FROM refresh_tokens\n WHERE user_id = $1\n AND ($2::timestamptz IS NULL OR created_at < $2)\n ORDER BY created_at DESC\n LIMIT $3 + 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!: models::Id", + "type_info": "Macaddr8" + }, + { + "ordinal": 1, + "name": "detail", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "created_at!: chrono::DateTime", + "type_info": "Timestamptz" + }, + { + "ordinal": 3, + "name": "updated_at!: chrono::DateTime", + "type_info": "Timestamptz" + }, + { + "ordinal": 4, + "name": "multi_use!: bool", + "type_info": "Bool" + }, + { + "ordinal": 5, + "name": "valid_for!: String", + "type_info": "Text" + }, + { + "ordinal": 6, + "name": "uses!: i32", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz", + "Int4" + ] + }, + "nullable": [ + false, + true, + false, + false, + true, + null, + true + ] + }, + "hash": "3046665e01e7ceca9e5a0b8d14de7ed4e8dbc9d03d44fa76ae34a037d05883a3" +} diff --git a/.sqlx/query-374c4e69a73d24f6bd7dad8da1d6df1cbefa4c0749bc4f8076308cdc2316719f.json b/.sqlx/query-374c4e69a73d24f6bd7dad8da1d6df1cbefa4c0749bc4f8076308cdc2316719f.json new file mode 100644 index 00000000000..33096fc2153 --- /dev/null +++ b/.sqlx/query-374c4e69a73d24f6bd7dad8da1d6df1cbefa4c0749bc4f8076308cdc2316719f.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM internal.api_keys WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Macaddr8" + ] + }, + "nullable": [] + }, + "hash": "374c4e69a73d24f6bd7dad8da1d6df1cbefa4c0749bc4f8076308cdc2316719f" +} diff --git a/.sqlx/query-437e3f8398b2b33708c1e2b7dc00d094c8dd24b2833dca89f0af22398419b65f.json b/.sqlx/query-437e3f8398b2b33708c1e2b7dc00d094c8dd24b2833dca89f0af22398419b65f.json new file mode 100644 index 00000000000..5437cf9a08f --- /dev/null +++ b/.sqlx/query-437e3f8398b2b33708c1e2b7dc00d094c8dd24b2833dca89f0af22398419b65f.json @@ -0,0 +1,58 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n ak.id AS \"id!: models::Id\",\n ak.service_account_id,\n ak.label,\n ak.created_by,\n ak.created_at AS \"created_at!: chrono::DateTime\",\n ak.expires_at AS \"expires_at!: chrono::DateTime\",\n ak.last_used_at AS \"last_used_at: chrono::DateTime\"\n FROM internal.api_keys ak\n WHERE ak.service_account_id = ANY($1)\n ORDER BY ak.created_at DESC\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!: models::Id", + "type_info": "Macaddr8" + }, + { + "ordinal": 1, + "name": "service_account_id", + "type_info": "Uuid" + }, + { + "ordinal": 2, + "name": "label", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "created_by", + "type_info": "Uuid" + }, + { + "ordinal": 4, + "name": "created_at!: chrono::DateTime", + "type_info": "Timestamptz" + }, + { + "ordinal": 5, + "name": "expires_at!: chrono::DateTime", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "last_used_at: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "UuidArray" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + true + ] + }, + "hash": "437e3f8398b2b33708c1e2b7dc00d094c8dd24b2833dca89f0af22398419b65f" +} diff --git a/.sqlx/query-4cdd6e65a0bca3fbf170d8a4fbe2961ddc283f0ca9cc17f827220f14d03aba75.json b/.sqlx/query-4cdd6e65a0bca3fbf170d8a4fbe2961ddc283f0ca9cc17f827220f14d03aba75.json new file mode 100644 index 00000000000..dd8218b3df2 --- /dev/null +++ b/.sqlx/query-4cdd6e65a0bca3fbf170d8a4fbe2961ddc283f0ca9cc17f827220f14d03aba75.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE internal.api_keys SET last_used_at = now() WHERE id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Macaddr8" + ] + }, + "nullable": [] + }, + "hash": "4cdd6e65a0bca3fbf170d8a4fbe2961ddc283f0ca9cc17f827220f14d03aba75" +} diff --git a/.sqlx/query-5391cdf049766cb28889dbd26164eb0fe0c4c01983c16eb9612465189cf3a0ff.json b/.sqlx/query-5391cdf049766cb28889dbd26164eb0fe0c4c01983c16eb9612465189cf3a0ff.json new file mode 100644 index 00000000000..2e39bb22d2c --- /dev/null +++ b/.sqlx/query-5391cdf049766cb28889dbd26164eb0fe0c4c01983c16eb9612465189cf3a0ff.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM internal.api_keys WHERE service_account_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "5391cdf049766cb28889dbd26164eb0fe0c4c01983c16eb9612465189cf3a0ff" +} diff --git a/.sqlx/query-62a51c9b125fd613cc1716daeeb8d4a1c1971309631e02c33c6575195eca54e0.json b/.sqlx/query-62a51c9b125fd613cc1716daeeb8d4a1c1971309631e02c33c6575195eca54e0.json new file mode 100644 index 00000000000..2cdd3409efb --- /dev/null +++ b/.sqlx/query-62a51c9b125fd613cc1716daeeb8d4a1c1971309631e02c33c6575195eca54e0.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE internal.service_accounts SET disabled_at = now(), updated_at = now() WHERE user_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "62a51c9b125fd613cc1716daeeb8d4a1c1971309631e02c33c6575195eca54e0" +} diff --git a/.sqlx/query-75ac1bae73299ed0eee07b64b230240638f92d8914e3d891e400f0ab96df93d7.json b/.sqlx/query-75ac1bae73299ed0eee07b64b230240638f92d8914e3d891e400f0ab96df93d7.json new file mode 100644 index 00000000000..5ddaca91202 --- /dev/null +++ b/.sqlx/query-75ac1bae73299ed0eee07b64b230240638f92d8914e3d891e400f0ab96df93d7.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM refresh_tokens WHERE id = $1 AND user_id = $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Macaddr8", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "75ac1bae73299ed0eee07b64b230240638f92d8914e3d891e400f0ab96df93d7" +} diff --git a/.sqlx/query-3284a4b8b8368d36bb286e8aaad6a12642ebfe7815281a62705bd3d7a32e2eb0.json b/.sqlx/query-7903341c9acaf216eafd315680b31fbe471070810a12f453b3e26b4f1cd70fcc.json similarity index 51% rename from .sqlx/query-3284a4b8b8368d36bb286e8aaad6a12642ebfe7815281a62705bd3d7a32e2eb0.json rename to .sqlx/query-7903341c9acaf216eafd315680b31fbe471070810a12f453b3e26b4f1cd70fcc.json index dbaada791a7..3a12243cf30 100644 --- a/.sqlx/query-3284a4b8b8368d36bb286e8aaad6a12642ebfe7815281a62705bd3d7a32e2eb0.json +++ b/.sqlx/query-7903341c9acaf216eafd315680b31fbe471070810a12f453b3e26b4f1cd70fcc.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n select users.email as email\n from user_grants\n join auth.users as users on user_grants.user_id = users.id\n where users.email is not null and user_grants.object_role = $1\n and user_grants.capability = 'admin'\n order by users.created_at asc\n ", + "query": "\n select users.email as email\n from user_grants\n join auth.users as users on user_grants.user_id = users.id\n where users.email is not null and user_grants.object_role = $1\n and user_grants.capability = 'admin'\n -- Exclude service accounts: their synthetic sa+ addresses\n -- must never be chosen as a tenant's Stripe billing contact.\n and not exists (\n select 1 from internal.service_accounts sa where sa.user_id = users.id\n )\n order by users.created_at asc\n ", "describe": { "columns": [ { @@ -18,5 +18,5 @@ true ] }, - "hash": "3284a4b8b8368d36bb286e8aaad6a12642ebfe7815281a62705bd3d7a32e2eb0" + "hash": "7903341c9acaf216eafd315680b31fbe471070810a12f453b3e26b4f1cd70fcc" } diff --git a/.sqlx/query-99c1b19721958884eb08dab6f0cee23b5e0ba04d94375f96cf129614d0824b16.json b/.sqlx/query-99c1b19721958884eb08dab6f0cee23b5e0ba04d94375f96cf129614d0824b16.json new file mode 100644 index 00000000000..b4425df0789 --- /dev/null +++ b/.sqlx/query-99c1b19721958884eb08dab6f0cee23b5e0ba04d94375f96cf129614d0824b16.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE internal.service_accounts SET last_used_at = now() WHERE user_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "99c1b19721958884eb08dab6f0cee23b5e0ba04d94375f96cf129614d0824b16" +} diff --git a/.sqlx/query-9ba006c5050dda138c5336def66d4539ac0d705ab87a2bbc5ee2fde359e9c9ad.json b/.sqlx/query-9ba006c5050dda138c5336def66d4539ac0d705ab87a2bbc5ee2fde359e9c9ad.json new file mode 100644 index 00000000000..934902752f2 --- /dev/null +++ b/.sqlx/query-9ba006c5050dda138c5336def66d4539ac0d705ab87a2bbc5ee2fde359e9c9ad.json @@ -0,0 +1,111 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n sa.user_id,\n sa.display_name,\n sa.prefix AS \"prefix!: String\",\n sa.capability AS \"capability!: models::Capability\",\n sa.created_by,\n sa.created_at AS \"created_at!: chrono::DateTime\",\n sa.updated_at AS \"updated_at!: chrono::DateTime\",\n sa.last_used_at AS \"last_used_at: chrono::DateTime\",\n sa.disabled_at AS \"disabled_at: chrono::DateTime\"\n FROM internal.service_accounts sa\n WHERE sa.prefix::text ^@ ANY($1)\n AND ($2::timestamptz IS NULL OR sa.created_at < $2)\n ORDER BY sa.created_at DESC\n LIMIT $3 + 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "display_name", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "prefix!: String", + "type_info": "Text" + }, + { + "ordinal": 3, + "name": "capability!: models::Capability", + "type_info": { + "Custom": { + "name": "grant_capability", + "kind": { + "Enum": [ + "none", + "x_01", + "x_02", + "x_03", + "x_04", + "x_05", + "x_06", + "x_07", + "x_08", + "x_09", + "read", + "x_11", + "x_12", + "x_13", + "x_14", + "x_15", + "x_16", + "x_17", + "x_18", + "x_19", + "write", + "x_21", + "x_22", + "x_23", + "x_24", + "x_25", + "x_26", + "x_27", + "x_28", + "x_29", + "admin" + ] + } + } + } + }, + { + "ordinal": 4, + "name": "created_by", + "type_info": "Uuid" + }, + { + "ordinal": 5, + "name": "created_at!: chrono::DateTime", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "updated_at!: chrono::DateTime", + "type_info": "Timestamptz" + }, + { + "ordinal": 7, + "name": "last_used_at: chrono::DateTime", + "type_info": "Timestamptz" + }, + { + "ordinal": 8, + "name": "disabled_at: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "TextArray", + "Timestamptz", + "Int4" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + true, + true + ] + }, + "hash": "9ba006c5050dda138c5336def66d4539ac0d705ab87a2bbc5ee2fde359e9c9ad" +} diff --git a/.sqlx/query-bc2e27f7d6247b463c97893082c8ddbeeb9ed24bce7ad27f354730b7d486fcc9.json b/.sqlx/query-bc2e27f7d6247b463c97893082c8ddbeeb9ed24bce7ad27f354730b7d486fcc9.json new file mode 100644 index 00000000000..bce19db1707 --- /dev/null +++ b/.sqlx/query-bc2e27f7d6247b463c97893082c8ddbeeb9ed24bce7ad27f354730b7d486fcc9.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n ak.service_account_id,\n sa.disabled_at\n FROM internal.api_keys ak\n JOIN internal.service_accounts sa ON sa.user_id = ak.service_account_id\n WHERE ak.id = $1\n AND ak.secret_hash = crypt($2, ak.secret_hash)\n AND ak.expires_at > now()\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "service_account_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "disabled_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Macaddr8", + "Text" + ] + }, + "nullable": [ + false, + true + ] + }, + "hash": "bc2e27f7d6247b463c97893082c8ddbeeb9ed24bce7ad27f354730b7d486fcc9" +} diff --git a/.sqlx/query-c5a6b956c3e5d8b239df41e36f89d00c3215285f0259adf061fdebbc36bd637f.json b/.sqlx/query-c5a6b956c3e5d8b239df41e36f89d00c3215285f0259adf061fdebbc36bd637f.json new file mode 100644 index 00000000000..7b7621d3958 --- /dev/null +++ b/.sqlx/query-c5a6b956c3e5d8b239df41e36f89d00c3215285f0259adf061fdebbc36bd637f.json @@ -0,0 +1,31 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH new_token AS (\n SELECT gen_random_uuid()::text AS secret\n )\n INSERT INTO refresh_tokens (user_id, multi_use, valid_for, hash, detail)\n SELECT\n $1,\n $2,\n $3::text::interval,\n crypt(nt.secret, gen_salt('bf')),\n $4\n FROM new_token nt\n RETURNING\n id AS \"id!: models::Id\",\n (SELECT secret FROM new_token) AS \"secret!: String\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!: models::Id", + "type_info": "Macaddr8" + }, + { + "ordinal": 1, + "name": "secret!: String", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Bool", + "Text", + "Text" + ] + }, + "nullable": [ + false, + null + ] + }, + "hash": "c5a6b956c3e5d8b239df41e36f89d00c3215285f0259adf061fdebbc36bd637f" +} diff --git a/.sqlx/query-cb2678905898cd566802b5f77cff7b967c364bfdb01f4d7d30ad7a62a1b42131.json b/.sqlx/query-cb2678905898cd566802b5f77cff7b967c364bfdb01f4d7d30ad7a62a1b42131.json new file mode 100644 index 00000000000..8bb93fbf4a9 --- /dev/null +++ b/.sqlx/query-cb2678905898cd566802b5f77cff7b967c364bfdb01f4d7d30ad7a62a1b42131.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT $1::text::interval > interval '0'\n AND $1::text::interval <= interval '1 year' AS \"ok!: bool\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "ok!: bool", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "cb2678905898cd566802b5f77cff7b967c364bfdb01f4d7d30ad7a62a1b42131" +} diff --git a/.sqlx/query-eafefa5f25e28fd4a189bf83bf509d20fc68a40bbe860fe46df0ff9ed60fae67.json b/.sqlx/query-eafefa5f25e28fd4a189bf83bf509d20fc68a40bbe860fe46df0ff9ed60fae67.json new file mode 100644 index 00000000000..e8e88fea744 --- /dev/null +++ b/.sqlx/query-eafefa5f25e28fd4a189bf83bf509d20fc68a40bbe860fe46df0ff9ed60fae67.json @@ -0,0 +1,73 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n prefix AS \"prefix!: String\",\n capability AS \"capability!: models::Capability\",\n disabled_at AS \"disabled_at: chrono::DateTime\"\n FROM internal.service_accounts\n WHERE user_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "prefix!: String", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "capability!: models::Capability", + "type_info": { + "Custom": { + "name": "grant_capability", + "kind": { + "Enum": [ + "none", + "x_01", + "x_02", + "x_03", + "x_04", + "x_05", + "x_06", + "x_07", + "x_08", + "x_09", + "read", + "x_11", + "x_12", + "x_13", + "x_14", + "x_15", + "x_16", + "x_17", + "x_18", + "x_19", + "write", + "x_21", + "x_22", + "x_23", + "x_24", + "x_25", + "x_26", + "x_27", + "x_28", + "x_29", + "admin" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "disabled_at: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + true + ] + }, + "hash": "eafefa5f25e28fd4a189bf83bf509d20fc68a40bbe860fe46df0ff9ed60fae67" +} diff --git a/.sqlx/query-ec33ef8d7561b0a224cc8f71ba82f6e61bc80ef244afcf09d5c5e871866942c3.json b/.sqlx/query-ec33ef8d7561b0a224cc8f71ba82f6e61bc80ef244afcf09d5c5e871866942c3.json new file mode 100644 index 00000000000..58dd96db4b5 --- /dev/null +++ b/.sqlx/query-ec33ef8d7561b0a224cc8f71ba82f6e61bc80ef244afcf09d5c5e871866942c3.json @@ -0,0 +1,31 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH new_key AS (\n SELECT\n internal.id_generator() AS id,\n gen_random_uuid()::text AS secret\n )\n INSERT INTO internal.api_keys (id, service_account_id, secret_hash, label, expires_at, created_by)\n SELECT\n nk.id,\n $1,\n crypt(nk.secret, gen_salt('bf')),\n $2,\n now() + $3::text::interval,\n $4\n FROM new_key nk\n RETURNING\n id AS \"id!: models::Id\",\n (SELECT secret FROM new_key) AS \"secret!: String\"\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id!: models::Id", + "type_info": "Macaddr8" + }, + { + "ordinal": 1, + "name": "secret!: String", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text", + "Uuid" + ] + }, + "nullable": [ + false, + null + ] + }, + "hash": "ec33ef8d7561b0a224cc8f71ba82f6e61bc80ef244afcf09d5c5e871866942c3" +} diff --git a/.sqlx/query-f7562d1965e61fe0ad91b6daf0ccf3e2d32a6ee387d69af18ea7b06047335376.json b/.sqlx/query-f7562d1965e61fe0ad91b6daf0ccf3e2d32a6ee387d69af18ea7b06047335376.json new file mode 100644 index 00000000000..d59830a69f5 --- /dev/null +++ b/.sqlx/query-f7562d1965e61fe0ad91b6daf0ccf3e2d32a6ee387d69af18ea7b06047335376.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO auth.users (id, email, raw_user_meta_data)\n VALUES ($1, $2, $3)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Varchar", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "f7562d1965e61fe0ad91b6daf0ccf3e2d32a6ee387d69af18ea7b06047335376" +} diff --git a/.sqlx/query-fc8161a9908be0a92913daabb38be71088668139743ab8f0e4cc4f3fe7885537.json b/.sqlx/query-fc8161a9908be0a92913daabb38be71088668139743ab8f0e4cc4f3fe7885537.json new file mode 100644 index 00000000000..9fbc9b4a427 --- /dev/null +++ b/.sqlx/query-fc8161a9908be0a92913daabb38be71088668139743ab8f0e4cc4f3fe7885537.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE internal.service_accounts SET disabled_at = NULL, updated_at = now() WHERE user_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "fc8161a9908be0a92913daabb38be71088668139743ab8f0e4cc4f3fe7885537" +} diff --git a/crates/billing-integrations/src/publish.rs b/crates/billing-integrations/src/publish.rs index ebe31955887..ddba8d31959 100644 --- a/crates/billing-integrations/src/publish.rs +++ b/crates/billing-integrations/src/publish.rs @@ -825,6 +825,11 @@ async fn get_or_create_customer_for_tenant( join auth.users as users on user_grants.user_id = users.id where users.email is not null and user_grants.object_role = $1 and user_grants.capability = 'admin' + -- Exclude service accounts: their synthetic sa+ addresses + -- must never be chosen as a tenant's Stripe billing contact. + and not exists ( + select 1 from internal.service_accounts sa where sa.user_id = users.id + ) order by users.created_at asc "#, tenant diff --git a/crates/control-plane-api/src/server/public/graphql/access.rs b/crates/control-plane-api/src/server/public/graphql/access.rs new file mode 100644 index 00000000000..9dcc954dd35 --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/access.rs @@ -0,0 +1,1420 @@ +use super::{TimestampCursor, filters}; +use async_graphql::{Context, types::connection}; + +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct ServiceAccount { + // Exposed as `id`: a service account's identifier happens to be its + // backing auth.users id, but that's an implementation detail we don't + // surface in the public schema. + #[graphql(name = "id")] + pub user_id: uuid::Uuid, + pub display_name: String, + pub prefix: models::Prefix, + pub capability: models::Capability, + pub created_by: uuid::Uuid, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, + pub last_used_at: Option>, + pub disabled_at: Option>, + pub api_keys: Vec, +} + +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct ApiKeyInfo { + #[graphql(name = "id")] + pub key_id: models::Id, + pub label: String, + pub created_by: uuid::Uuid, + pub created_at: chrono::DateTime, + pub expires_at: chrono::DateTime, + pub last_used_at: Option>, +} + +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct CreateApiKeyResult { + #[graphql(name = "id")] + pub key_id: models::Id, + pub secret: String, +} + +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct RefreshTokenResult { + pub id: models::Id, + pub secret: String, +} + +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct RefreshTokenInfo { + pub id: models::Id, + pub detail: Option, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, + pub multi_use: bool, + pub valid_for: String, + pub uses: i32, +} + +pub type PaginatedServiceAccounts = connection::Connection< + TimestampCursor, + ServiceAccount, + connection::EmptyFields, + connection::EmptyFields, + connection::DefaultConnectionName, + connection::DefaultEdgeName, + connection::DisableNodesField, +>; + +pub type PaginatedRefreshTokens = connection::Connection< + TimestampCursor, + RefreshTokenInfo, + connection::EmptyFields, + connection::EmptyFields, + connection::DefaultConnectionName, + connection::DefaultEdgeName, + connection::DisableNodesField, +>; + +#[derive(Debug, Clone, Default, async_graphql::InputObject)] +pub struct ServiceAccountsFilter { + pub catalog_prefix: Option, +} + +#[derive(Debug, Default)] +pub struct AccessQuery; + +const DEFAULT_PAGE_SIZE: usize = 25; +const MAX_PREFIXES: usize = 20; + +#[async_graphql::Object] +impl AccessQuery { + async fn service_accounts( + &self, + ctx: &Context<'_>, + filter: Option, + after: Option, + first: Option, + ) -> async_graphql::Result { + let env = ctx.data::()?; + + let prefix_starts_with = filter + .and_then(|f| f.catalog_prefix) + .and_then(|f| f.starts_with); + + let admin_prefixes = super::authorized_prefixes::authorized_prefixes( + &env.snapshot().role_grants, + &env.snapshot().user_grants, + env.claims()?.sub, + models::Capability::Admin, + prefix_starts_with.as_deref(), + ); + + if admin_prefixes.is_empty() { + return Ok(PaginatedServiceAccounts::new(false, false)); + } + if admin_prefixes.len() > MAX_PREFIXES { + return Err(async_graphql::Error::new( + "Too many admin prefixes; narrow results with a prefix filter", + )); + } + + connection::query_with::( + after, + None, + first, + None, + |after, _, first, _| async move { + let after_created_at = after.map(|c| c.0); + let limit = first.unwrap_or(DEFAULT_PAGE_SIZE); + + let sa_rows = sqlx::query!( + r#" + SELECT + sa.user_id, + sa.display_name, + sa.prefix AS "prefix!: String", + sa.capability AS "capability!: models::Capability", + sa.created_by, + sa.created_at AS "created_at!: chrono::DateTime", + sa.updated_at AS "updated_at!: chrono::DateTime", + sa.last_used_at AS "last_used_at: chrono::DateTime", + sa.disabled_at AS "disabled_at: chrono::DateTime" + FROM internal.service_accounts sa + WHERE sa.prefix::text ^@ ANY($1) + AND ($2::timestamptz IS NULL OR sa.created_at < $2) + ORDER BY sa.created_at DESC + LIMIT $3 + 1 + "#, + &admin_prefixes, + after_created_at, + limit as i64, + ) + .fetch_all(&env.pg_pool) + .await?; + + let has_next = sa_rows.len() > limit; + + let user_ids: Vec = + sa_rows.iter().take(limit).map(|r| r.user_id).collect(); + + // Keys are batch-loaded for the whole page in one query (no + // N+1). The tradeoff is that this runs even when the caller + // didn't select `apiKeys`. That's fine for a low-frequency admin + // listing against an indexed column. Revisit with a `DataLoader` + // (a `#[ComplexObject]` api_keys resolver backed by a batching + // loader keyed on service_account_id) if either changes: callers + // commonly list service accounts WITHOUT `apiKeys` (making this + // fetch mostly wasted), or more lazily-resolved per-account child + // collections get added — at which point one batching mechanism + // beats several conditional eager fetches. + let key_rows = if user_ids.is_empty() { + vec![] + } else { + sqlx::query!( + r#" + SELECT + ak.id AS "id!: models::Id", + ak.service_account_id, + ak.label, + ak.created_by, + ak.created_at AS "created_at!: chrono::DateTime", + ak.expires_at AS "expires_at!: chrono::DateTime", + ak.last_used_at AS "last_used_at: chrono::DateTime" + FROM internal.api_keys ak + WHERE ak.service_account_id = ANY($1) + ORDER BY ak.created_at DESC + "#, + &user_ids, + ) + .fetch_all(&env.pg_pool) + .await? + }; + + let mut keys_by_sa: std::collections::HashMap> = + std::collections::HashMap::new(); + for kr in key_rows { + keys_by_sa + .entry(kr.service_account_id) + .or_default() + .push(ApiKeyInfo { + key_id: kr.id, + label: kr.label, + created_by: kr.created_by, + created_at: kr.created_at, + expires_at: kr.expires_at, + last_used_at: kr.last_used_at, + }); + } + + let edges: Vec<_> = sa_rows + .into_iter() + .take(limit) + .map(|r| { + let api_keys = keys_by_sa.remove(&r.user_id).unwrap_or_default(); + connection::Edge::new( + TimestampCursor(r.created_at), + ServiceAccount { + user_id: r.user_id, + display_name: r.display_name, + prefix: models::Prefix::new(&r.prefix), + capability: r.capability, + created_by: r.created_by, + created_at: r.created_at, + updated_at: r.updated_at, + last_used_at: r.last_used_at, + disabled_at: r.disabled_at, + api_keys, + }, + ) + }) + .collect(); + + let mut conn = connection::Connection::new(after_created_at.is_some(), has_next); + conn.edges = edges; + Ok(conn) + }, + ) + .await + } + + /// List refresh tokens owned by the authenticated user. + async fn refresh_tokens( + &self, + ctx: &Context<'_>, + after: Option, + first: Option, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + connection::query_with::( + after, + None, + first, + None, + |after, _, first, _| async move { + let after_created_at = after.map(|c| c.0); + let limit = first.unwrap_or(DEFAULT_PAGE_SIZE); + + let rows = sqlx::query!( + r#" + SELECT + id AS "id!: models::Id", + detail, + created_at AS "created_at!: chrono::DateTime", + updated_at AS "updated_at!: chrono::DateTime", + multi_use AS "multi_use!: bool", + valid_for::text AS "valid_for!: String", + uses AS "uses!: i32" + FROM refresh_tokens + WHERE user_id = $1 + AND ($2::timestamptz IS NULL OR created_at < $2) + ORDER BY created_at DESC + LIMIT $3 + 1 + "#, + claims.sub, + after_created_at, + limit as i64, + ) + .fetch_all(&env.pg_pool) + .await?; + + let has_next = rows.len() > limit; + + let edges: Vec<_> = rows + .into_iter() + .take(limit) + .map(|r| { + connection::Edge::new( + TimestampCursor(r.created_at), + RefreshTokenInfo { + id: r.id, + detail: r.detail, + created_at: r.created_at, + updated_at: r.updated_at, + multi_use: r.multi_use, + valid_for: r.valid_for, + uses: r.uses, + }, + ) + }) + .collect(); + + let mut conn = connection::Connection::new(after_created_at.is_some(), has_next); + conn.edges = edges; + Ok(conn) + }, + ) + .await + } +} + +#[derive(Debug, Default)] +pub struct AccessMutation; + +#[async_graphql::Object] +impl AccessMutation { + /// Create a service account with a grant to the specified prefix. + /// + /// The caller must have admin capability on the prefix. + /// Creates an auth.users row, an internal.service_accounts row, + /// and a user_grants row for the service account. + async fn create_service_account( + &self, + ctx: &Context<'_>, + prefix: models::Prefix, + capability: models::Capability, + display_name: String, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + if let Err(err) = validator::Validate::validate(&prefix) { + return Err(async_graphql::Error::new(format!( + "invalid catalog prefix: {err}" + ))); + } + + // `none` is permitted by the table's check constraint (reserved for the + // future bundles-only path) but confers no access until bundles are + // wired, so reject it here rather than mint a no-op grant. + if capability == models::Capability::None { + return Err(async_graphql::Error::new( + "capability must be one of: read, write, admin", + )); + } + + super::verify_authorization(env, prefix.as_str(), models::Capability::Admin).await?; + + let mut txn = env.pg_pool.begin().await?; + + let sa_user_id = uuid::Uuid::new_v4(); + + sqlx::query!( + r#" + INSERT INTO auth.users (id, email, raw_user_meta_data) + VALUES ($1, $2, $3) + "#, + sa_user_id, + format!("sa+{}@service.estuary.dev", sa_user_id), + serde_json::json!({ + "full_name": display_name, + }), + ) + .execute(&mut *txn) + .await?; + + let now = sqlx::query_scalar!( + r#" + INSERT INTO internal.service_accounts (user_id, prefix, capability, display_name, created_by) + VALUES ($1, $2::text::catalog_prefix, $3, $4, $5) + RETURNING created_at AS "created_at!: chrono::DateTime" + "#, + sa_user_id, + prefix.as_str(), + capability as models::Capability, + display_name, + claims.sub, + ) + .fetch_one(&mut *txn) + .await?; + + crate::grants::upsert_user_grant( + sa_user_id, + prefix.as_str(), + capability, + Some("service account grant".to_string()), + &mut txn, + ) + .await?; + + txn.commit().await?; + + tracing::info!( + %prefix, + ?capability, + %claims.sub, + %sa_user_id, + "created service account" + ); + + Ok(ServiceAccount { + user_id: sa_user_id, + display_name, + prefix, + capability, + created_by: claims.sub, + created_at: now, + updated_at: now, + last_used_at: None, + disabled_at: None, + api_keys: vec![], + }) + } + + /// Disable a service account, revoking all API keys and grants. + /// + /// The caller must have admin capability on the service account's prefix. + /// The auth.users row is preserved for audit trail / FK integrity. + /// + /// Unlike revoking a single key, disabling removes the service account's + /// grants, so access tokens already issued from its keys resolve to zero + /// capability on their next authorization check (bounded by snapshot-refresh + /// lag, not the token's full ~1h lifetime). Use this to cut off an + /// active service account, not just stop new tokens. + async fn disable_service_account( + &self, + ctx: &Context<'_>, + #[graphql(name = "id")] user_id: uuid::Uuid, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let sa = lookup_service_account(&env.pg_pool, user_id).await?; + super::verify_authorization(env, &sa.prefix, models::Capability::Admin).await?; + + if sa.disabled_at.is_some() { + return Err(async_graphql::Error::new( + "service account is already disabled", + )); + } + + let mut txn = env.pg_pool.begin().await?; + + sqlx::query!( + "UPDATE internal.service_accounts SET disabled_at = now(), updated_at = now() WHERE user_id = $1", + user_id, + ) + .execute(&mut *txn) + .await?; + + sqlx::query!( + "DELETE FROM internal.api_keys WHERE service_account_id = $1", + user_id + ) + .execute(&mut *txn) + .await?; + + sqlx::query!("DELETE FROM public.user_grants WHERE user_id = $1", user_id) + .execute(&mut *txn) + .await?; + + txn.commit().await?; + + tracing::info!( + %user_id, + prefix = %sa.prefix, + %claims.sub, + "disabled service account" + ); + + Ok(true) + } + + /// Re-enable a disabled service account, restoring its user_grants row. + /// + /// Does NOT restore previously revoked API keys — new ones must be minted. + async fn enable_service_account( + &self, + ctx: &Context<'_>, + #[graphql(name = "id")] user_id: uuid::Uuid, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let sa = lookup_service_account(&env.pg_pool, user_id).await?; + super::verify_authorization(env, &sa.prefix, models::Capability::Admin).await?; + + if sa.disabled_at.is_none() { + return Err(async_graphql::Error::new("service account is not disabled")); + } + + let mut txn = env.pg_pool.begin().await?; + + sqlx::query!( + "UPDATE internal.service_accounts SET disabled_at = NULL, updated_at = now() WHERE user_id = $1", + user_id, + ) + .execute(&mut *txn) + .await?; + + crate::grants::upsert_user_grant( + user_id, + &sa.prefix, + sa.capability, + Some("service account grant".to_string()), + &mut txn, + ) + .await?; + + txn.commit().await?; + + tracing::info!( + %user_id, + prefix = %sa.prefix, + %claims.sub, + "enabled service account" + ); + + Ok(true) + } + + /// Create an API key for a service account. + /// + /// Returns the key_id and the plaintext secret (flow_sa_...). + /// The secret is returned exactly once and cannot be retrieved again. + async fn create_api_key( + &self, + ctx: &Context<'_>, + #[graphql(name = "serviceAccountId")] user_id: uuid::Uuid, + label: String, + #[graphql(desc = "ISO 8601 duration for key validity (e.g. P90D, P1Y)")] valid_for: String, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let sa = lookup_service_account(&env.pg_pool, user_id).await?; + super::verify_authorization(env, &sa.prefix, models::Capability::Admin).await?; + + if sa.disabled_at.is_some() { + return Err(async_graphql::Error::new( + "cannot create API key for a disabled service account", + )); + } + + // Validate and bound the requested lifetime. Postgres parses the + // ISO 8601 duration; we cap it at one year so a key can't become an + // effectively-permanent credential, and require it to be positive. + let within_bounds = sqlx::query_scalar!( + r#" + SELECT $1::text::interval > interval '0' + AND $1::text::interval <= interval '1 year' AS "ok!: bool" + "#, + valid_for, + ) + .fetch_one(&env.pg_pool) + .await; + + let within_bounds = match within_bounds { + Ok(ok) => ok, + // A malformed duration fails the `::interval` cast (SQLSTATE 22007/ + // 22008); surface that as a client error rather than a 500. + Err(sqlx::Error::Database(db)) + if matches!(db.code().as_deref(), Some("22007") | Some("22008")) => + { + return Err(async_graphql::Error::new( + "invalid valid_for: expected an ISO 8601 duration (e.g. P90D, P1Y)", + )); + } + Err(err) => return Err(err.into()), + }; + + if !within_bounds { + return Err(async_graphql::Error::new( + "valid_for must be a positive duration no greater than 1 year", + )); + } + + let row = sqlx::query!( + r#" + WITH new_key AS ( + SELECT + internal.id_generator() AS id, + gen_random_uuid()::text AS secret + ) + INSERT INTO internal.api_keys (id, service_account_id, secret_hash, label, expires_at, created_by) + SELECT + nk.id, + $1, + crypt(nk.secret, gen_salt('bf')), + $2, + now() + $3::text::interval, + $4 + FROM new_key nk + RETURNING + id AS "id!: models::Id", + (SELECT secret FROM new_key) AS "secret!: String" + "#, + user_id, + label, + valid_for, + claims.sub, + ) + .fetch_one(&env.pg_pool) + .await?; + + use base64::Engine; + let payload = format!("{}:{}", row.id, row.secret); + let encoded = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(payload.as_bytes()); + let full_secret = format!("flow_sa_{encoded}"); + + tracing::info!( + key_id = %row.id, + %user_id, + %label, + %claims.sub, + "created api key for service account" + ); + + Ok(CreateApiKeyResult { + key_id: row.id, + secret: full_secret, + }) + } + + /// Revoke (delete) an API key. + /// + /// The caller must have admin capability on the owning service account's prefix. + /// + /// Revocation stops *new* access tokens from being minted with this key, but + /// does not invalidate access tokens already issued from it — those remain + /// valid until they expire (up to ~1h), since the service account's grants + /// are untouched. To cut off active sessions immediately, disable the + /// service account (which removes its grants). + async fn revoke_api_key( + &self, + ctx: &Context<'_>, + #[graphql(name = "id")] key_id: models::Id, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let key_owner = sqlx::query!( + r#" + SELECT ak.service_account_id + FROM internal.api_keys ak + WHERE ak.id = $1 + "#, + key_id as models::Id, + ) + .fetch_optional(&env.pg_pool) + .await?; + + let key_owner = match key_owner { + Some(row) => row.service_account_id, + None => return Err(async_graphql::Error::new("API key not found")), + }; + + let sa = lookup_service_account(&env.pg_pool, key_owner).await?; + super::verify_authorization(env, &sa.prefix, models::Capability::Admin).await?; + + sqlx::query!( + "DELETE FROM internal.api_keys WHERE id = $1", + key_id as models::Id + ) + .execute(&env.pg_pool) + .await?; + + tracing::info!( + %key_id, + service_account = %key_owner, + %claims.sub, + "revoked api key" + ); + + Ok(true) + } + + /// Create a refresh token for the authenticated user. + async fn create_refresh_token( + &self, + ctx: &Context<'_>, + #[graphql( + desc = "ISO 8601 duration for token validity (e.g. P90D)", + default_with = "String::from(\"P90D\")" + )] + valid_for: String, + #[graphql(default = true)] multi_use: bool, + #[graphql(default)] detail: Option, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + // Service accounts authenticate exclusively via API keys, which are + // expiring, revocable, and respect the account's disabled state. A + // refresh token bypasses all three, so deny issuance to SA principals. + if is_service_account(&env.pg_pool, claims.sub).await? { + return Err(async_graphql::Error::new( + "service accounts cannot create refresh tokens; authenticate with an API key instead", + )); + } + + let row = sqlx::query!( + r#" + WITH new_token AS ( + SELECT gen_random_uuid()::text AS secret + ) + INSERT INTO refresh_tokens (user_id, multi_use, valid_for, hash, detail) + SELECT + $1, + $2, + $3::text::interval, + crypt(nt.secret, gen_salt('bf')), + $4 + FROM new_token nt + RETURNING + id AS "id!: models::Id", + (SELECT secret FROM new_token) AS "secret!: String" + "#, + claims.sub, + multi_use, + valid_for, + detail.as_deref(), + ) + .fetch_one(&env.pg_pool) + .await?; + + tracing::info!( + refresh_token_id = %row.id, + %claims.sub, + "created refresh token" + ); + + Ok(RefreshTokenResult { + id: row.id, + secret: row.secret, + }) + } + + /// Delete a refresh token owned by the authenticated user. + async fn delete_refresh_token( + &self, + ctx: &Context<'_>, + id: models::Id, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let result = sqlx::query!( + "DELETE FROM refresh_tokens WHERE id = $1 AND user_id = $2", + id as models::Id, + claims.sub, + ) + .execute(&env.pg_pool) + .await?; + + if result.rows_affected() == 0 { + return Err(async_graphql::Error::new("refresh token not found")); + } + + tracing::info!( + refresh_token_id = %id, + %claims.sub, + "deleted refresh token" + ); + + Ok(true) + } +} + +struct ServiceAccountRow { + prefix: String, + capability: models::Capability, + disabled_at: Option>, +} + +/// Returns whether `user_id` is backed by a service account. Used to deny +/// SA principals operations reserved for human users (e.g. refresh tokens). +async fn is_service_account( + pg_pool: &sqlx::PgPool, + user_id: uuid::Uuid, +) -> async_graphql::Result { + let exists = sqlx::query_scalar!( + r#"SELECT EXISTS(SELECT 1 FROM internal.service_accounts WHERE user_id = $1) AS "exists!""#, + user_id, + ) + .fetch_one(pg_pool) + .await?; + + Ok(exists) +} + +async fn lookup_service_account( + pg_pool: &sqlx::PgPool, + user_id: uuid::Uuid, +) -> async_graphql::Result { + let row = sqlx::query!( + r#" + SELECT + prefix AS "prefix!: String", + capability AS "capability!: models::Capability", + disabled_at AS "disabled_at: chrono::DateTime" + FROM internal.service_accounts + WHERE user_id = $1 + "#, + user_id, + ) + .fetch_optional(pg_pool) + .await?; + + match row { + Some(r) => Ok(ServiceAccountRow { + prefix: r.prefix, + capability: r.capability, + disabled_at: r.disabled_at, + }), + None => Err(async_graphql::Error::new("service account not found")), + } +} + +#[cfg(test)] +mod test { + use crate::test_server; + + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures(path = "../../../fixtures", scripts("data_planes", "alice")) + )] + async fn test_service_account_lifecycle(pool: sqlx::PgPool) { + let _guard = test_server::init(); + + let server = test_server::TestServer::start( + pool.clone(), + test_server::snapshot(pool.clone(), true).await, + ) + .await; + + let alice_token = server.make_access_token( + uuid::Uuid::from_bytes([0x11; 16]), + Some("alice@example.test"), + ); + + // Create a bob user who does NOT have admin on aliceCo/. + sqlx::query("INSERT INTO auth.users (id, email) VALUES ('22222222-2222-2222-2222-222222222222', 'bob@example.test')") + .execute(&pool) + .await + .unwrap(); + + let bob_token = + server.make_access_token(uuid::Uuid::from_bytes([0x22; 16]), Some("bob@example.test")); + + // === Create a service account === + let create_response: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($prefix: Prefix!, $capability: Capability!, $name: String!) { + createServiceAccount( + prefix: $prefix + capability: $capability + displayName: $name + ) { + id + displayName + prefix + capability + disabledAt + apiKeys { id } + } + }"#, + "variables": { + "prefix": "aliceCo/", + "capability": "admin", + "name": "CI Deploy Bot" + } + }), + Some(&alice_token), + ) + .await; + + assert!( + create_response["errors"].is_null(), + "create should succeed: {create_response}" + ); + let sa = &create_response["data"]["createServiceAccount"]; + let sa_user_id = sa["id"].as_str().expect("should have id"); + assert_eq!(sa["displayName"], "CI Deploy Bot"); + assert_eq!(sa["prefix"], "aliceCo/"); + assert_eq!(sa["capability"], "admin"); + assert!(sa["disabledAt"].is_null()); + assert_eq!(sa["apiKeys"].as_array().unwrap().len(), 0); + + // === Bob cannot create a service account for aliceCo/ === + let unauthorized: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation { + createServiceAccount( + prefix: "aliceCo/" + capability: read + displayName: "hacker bot" + ) { id } + }"# + }), + Some(&bob_token), + ) + .await; + + assert!(unauthorized["errors"].is_array()); + + // === Create an API key === + let create_key: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!, $label: String!, $validFor: String!) { + createApiKey( + serviceAccountId: $userId + label: $label + validFor: $validFor + ) { + id + secret + } + }"#, + "variables": { + "userId": sa_user_id, + "label": "GitHub Actions", + "validFor": "P90D" + } + }), + Some(&alice_token), + ) + .await; + + assert!( + create_key["errors"].is_null(), + "create key should succeed: {create_key}" + ); + let key_data = &create_key["data"]["createApiKey"]; + let key_id = key_data["id"].as_str().expect("should have id"); + let secret = key_data["secret"].as_str().expect("should have secret"); + assert!(secret.starts_with("flow_sa_")); + + // === Exchange the API key for an access token === + let exchange_result: serde_json::Value = server + .rest_client() + .post( + "/api/v1/auth/token", + &serde_json::json!({ + "grant_type": "api_key", + "api_key": secret, + }), + None, + ) + .send() + .await + .unwrap() + .error_for_status() + .unwrap() + .json() + .await + .unwrap(); + assert!(exchange_result["access_token"].is_string()); + + // === List service accounts === + let list: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + query { + serviceAccounts(filter: { catalogPrefix: { startsWith: "aliceCo/" } }) { + edges { + node { + id + displayName + prefix + capability + apiKeys { id label } + } + } + } + }"# + }), + Some(&alice_token), + ) + .await; + + let edges = list["data"]["serviceAccounts"]["edges"] + .as_array() + .expect("should have edges"); + assert_eq!(edges.len(), 1); + assert_eq!(edges[0]["node"]["displayName"], "CI Deploy Bot"); + assert_eq!(edges[0]["node"]["apiKeys"].as_array().unwrap().len(), 1); + + // Bob sees no service accounts. + let bob_list: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + query { + serviceAccounts { edges { node { id } } } + }"# + }), + Some(&bob_token), + ) + .await; + + let bob_edges = bob_list["data"]["serviceAccounts"]["edges"] + .as_array() + .expect("should have edges"); + assert_eq!(bob_edges.len(), 0); + + // === Revoke the API key === + let revoke: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($keyId: Id!) { + revokeApiKey(id: $keyId) + }"#, + "variables": { "keyId": key_id } + }), + Some(&alice_token), + ) + .await; + + assert!( + revoke["errors"].is_null(), + "revoke should succeed: {revoke}" + ); + + // Exchanging the revoked key fails. + let exchange_fail = server + .rest_client() + .post( + "/api/v1/auth/token", + &serde_json::json!({ + "grant_type": "api_key", + "api_key": secret, + }), + None, + ) + .send() + .await + .unwrap(); + assert!(!exchange_fail.status().is_success()); + + // === Create a new key and then disable the service account === + let create_key2: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!, $label: String!, $validFor: String!) { + createApiKey(serviceAccountId: $userId, label: $label, validFor: $validFor) { + id + secret + } + }"#, + "variables": { + "userId": sa_user_id, + "label": "temp key", + "validFor": "P30D" + } + }), + Some(&alice_token), + ) + .await; + + let secret2 = create_key2["data"]["createApiKey"]["secret"] + .as_str() + .unwrap(); + + let disable: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!) { + disableServiceAccount(id: $userId) + }"#, + "variables": { "userId": sa_user_id } + }), + Some(&alice_token), + ) + .await; + + assert!( + disable["errors"].is_null(), + "disable should succeed: {disable}" + ); + + // API key from disabled account fails. + let exchange_disabled = server + .rest_client() + .post( + "/api/v1/auth/token", + &serde_json::json!({ + "grant_type": "api_key", + "api_key": secret2, + }), + None, + ) + .send() + .await + .unwrap(); + assert!(!exchange_disabled.status().is_success()); + + // Cannot create key for disabled account. + let key_while_disabled: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!, $label: String!, $validFor: String!) { + createApiKey(serviceAccountId: $userId, label: $label, validFor: $validFor) { + id + secret + } + }"#, + "variables": { + "userId": sa_user_id, + "label": "should fail", + "validFor": "P30D" + } + }), + Some(&alice_token), + ) + .await; + + assert!(key_while_disabled["errors"].is_array()); + + // Disabling again fails. + let disable_again: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!) { + disableServiceAccount(id: $userId) + }"#, + "variables": { "userId": sa_user_id } + }), + Some(&alice_token), + ) + .await; + + assert!(disable_again["errors"].is_array()); + + // === Re-enable the service account === + let enable: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!) { + enableServiceAccount(id: $userId) + }"#, + "variables": { "userId": sa_user_id } + }), + Some(&alice_token), + ) + .await; + + assert!( + enable["errors"].is_null(), + "enable should succeed: {enable}" + ); + + // Re-enabled account can have new keys created. + let key_after_enable: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!, $label: String!, $validFor: String!) { + createApiKey(serviceAccountId: $userId, label: $label, validFor: $validFor) { + id + secret + } + }"#, + "variables": { + "userId": sa_user_id, + "label": "post-enable key", + "validFor": "P90D" + } + }), + Some(&alice_token), + ) + .await; + + assert!( + key_after_enable["errors"].is_null(), + "create key after enable should succeed: {key_after_enable}" + ); + + let secret3 = key_after_enable["data"]["createApiKey"]["secret"] + .as_str() + .unwrap(); + + // Exchange works again. + let exchange_reenabled = server + .rest_client() + .post( + "/api/v1/auth/token", + &serde_json::json!({ + "grant_type": "api_key", + "api_key": secret3, + }), + None, + ) + .send() + .await + .unwrap(); + assert!(exchange_reenabled.status().is_success()); + + // Enabling an already enabled account fails. + let enable_again: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!) { + enableServiceAccount(id: $userId) + }"#, + "variables": { "userId": sa_user_id } + }), + Some(&alice_token), + ) + .await; + + assert!(enable_again["errors"].is_array()); + } + + /// Covers the refresh-token GraphQL surface (create → list → delete, plus + /// the not-found idempotency guard), the `/api/v1/auth/token` + /// refresh-token dispatch, and the guard denying refresh tokens to + /// service-account principals. + /// + /// The happy-path *exchange* — `generate_access_token` actually signing a + /// JWT — is intentionally not exercised here: it reads `app.jwt_secret` from + /// `vault.decrypted_secrets` and calls pgjwt's `sign()`, neither of which + /// exists in the sqlx::test DB (only `auth`/`stripe` are polyfilled). That + /// signing path is covered by the pgTAP `test_generate_access_token`. We + /// instead assert the endpoint routes the `refresh_token` grant and rejects + /// a bad secret — which fails in `generate_access_token` *before* signing, + /// so it's deterministic without the vault/pgjwt setup. + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures(path = "../../../fixtures", scripts("data_planes", "alice")) + )] + async fn test_refresh_token_management(pool: sqlx::PgPool) { + let _guard = test_server::init(); + + let server = test_server::TestServer::start( + pool.clone(), + test_server::snapshot(pool.clone(), true).await, + ) + .await; + + let alice_token = server.make_access_token( + uuid::Uuid::from_bytes([0x11; 16]), + Some("alice@example.test"), + ); + + // === Create a refresh token === + let create: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation { + createRefreshToken(validFor: "P30D", detail: "test token") { + id + secret + } + }"# + }), + Some(&alice_token), + ) + .await; + + assert!( + create["errors"].is_null(), + "create should succeed: {create}" + ); + let created = &create["data"]["createRefreshToken"]; + let token_id = created["id"].as_str().expect("should have id").to_string(); + assert!( + created["secret"].as_str().is_some(), + "should return a secret" + ); + + // === List refresh tokens (scoped to the authenticated user) === + let list: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + query { + refreshTokens { + edges { node { id detail multiUse uses } } + } + }"# + }), + Some(&alice_token), + ) + .await; + + let edges = list["data"]["refreshTokens"]["edges"] + .as_array() + .expect("should have edges"); + assert_eq!(edges.len(), 1); + assert_eq!(edges[0]["node"]["id"], token_id); + assert_eq!(edges[0]["node"]["detail"], "test token"); + assert_eq!(edges[0]["node"]["multiUse"], true); + assert_eq!(edges[0]["node"]["uses"], 0); + + // === The endpoint dispatches the refresh_token grant === + // A wrong secret is rejected inside generate_access_token (before it + // reaches signing), so this exercises routing + error-shaping for the + // refresh-token branch without depending on the vault/pgjwt signing path. + let bad_secret = server + .rest_client() + .post( + "/api/v1/auth/token", + &serde_json::json!({ + "grant_type": "refresh_token", + "refresh_token_id": token_id, + "secret": "not-the-real-secret", + }), + None, + ) + .send() + .await + .unwrap(); + assert!(!bad_secret.status().is_success()); + + // === Delete the refresh token === + let delete: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#"mutation($id: Id!) { deleteRefreshToken(id: $id) }"#, + "variables": { "id": token_id } + }), + Some(&alice_token), + ) + .await; + assert!( + delete["errors"].is_null(), + "delete should succeed: {delete}" + ); + assert_eq!(delete["data"]["deleteRefreshToken"], true); + + // It's gone from the list. + let list_after: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#"query { refreshTokens { edges { node { id } } } }"# + }), + Some(&alice_token), + ) + .await; + assert_eq!( + list_after["data"]["refreshTokens"]["edges"] + .as_array() + .unwrap() + .len(), + 0 + ); + + // Deleting again fails (not-found guard). + let delete_again: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#"mutation($id: Id!) { deleteRefreshToken(id: $id) }"#, + "variables": { "id": token_id } + }), + Some(&alice_token), + ) + .await; + assert!(delete_again["errors"].is_array()); + + // === Service accounts cannot create refresh tokens === + // They authenticate via API keys, which are expiring, revocable, and + // respect the account's disabled state; a refresh token would bypass + // all three, so issuance to an SA principal must be denied. + let create_sa: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation { + createServiceAccount( + prefix: "aliceCo/" + capability: admin + displayName: "refresh-token bot" + ) { id } + }"# + }), + Some(&alice_token), + ) + .await; + assert!( + create_sa["errors"].is_null(), + "create SA should succeed: {create_sa}" + ); + let sa_user_id = create_sa["data"]["createServiceAccount"]["id"] + .as_str() + .expect("should have id"); + + // Mint an access token whose `sub` is the service account, mirroring + // what `exchange_api_key` issues (no email for an SA principal). + let sa_token = + server.make_access_token(uuid::Uuid::parse_str(sa_user_id).unwrap(), None); + + let sa_create_rt: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#"mutation { createRefreshToken(validFor: "P30D") { id secret } }"# + }), + Some(&sa_token), + ) + .await; + assert!( + sa_create_rt["errors"].is_array(), + "service account should be denied a refresh token: {sa_create_rt}" + ); + } +} diff --git a/crates/control-plane-api/src/server/public/graphql/mod.rs b/crates/control-plane-api/src/server/public/graphql/mod.rs index 189799e46e0..5a02ef3a859 100644 --- a/crates/control-plane-api/src/server/public/graphql/mod.rs +++ b/crates/control-plane-api/src/server/public/graphql/mod.rs @@ -30,6 +30,7 @@ mod billing; mod data_planes; mod filters; pub(crate) use data_planes::parse_data_plane_name; +mod access; mod connectors; pub mod id; mod invite_links; @@ -83,6 +84,7 @@ pub struct QueryRoot( invite_links::InviteLinksQuery, connectors::ConnectorsQuery, tenant::TenantQuery, + access::AccessQuery, ); // Represents the portion of the GraphQL schema that deals with mutations. @@ -93,6 +95,7 @@ pub struct MutationRoot( alert_configs::AlertConfigsMutation, alert_subscriptions::AlertSubscriptionsMutation, invite_links::InviteLinksMutation, + access::AccessMutation, ); pub fn create_schema(alert_config_defaults: models::AlertConfig) -> GraphQLSchema { diff --git a/crates/control-plane-api/src/server/public/mod.rs b/crates/control-plane-api/src/server/public/mod.rs index b9e9378e0af..94bbc545423 100644 --- a/crates/control-plane-api/src/server/public/mod.rs +++ b/crates/control-plane-api/src/server/public/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; pub mod graphql; mod open_metrics; pub mod status; +pub mod token_exchange; /// Creates a router for the public API that can be merged into an existing router. /// All endpoints registered here are documented in an OpenAPI spec. For adding new @@ -69,6 +70,10 @@ pub(crate) fn api_v1_router( // The openapi json is itself documented as an API route .api_route("/api/v1/openapi.json", aide::axum::routing::get(serve_docs)) // The docs UI is not documented as an API route + .api_route( + "/api/v1/auth/token", + aide::axum::routing::post(token_exchange::handle_post_token), + ) .route( "/api/v1/docs", axum::routing::get( diff --git a/crates/control-plane-api/src/server/public/token_exchange.rs b/crates/control-plane-api/src/server/public/token_exchange.rs new file mode 100644 index 00000000000..42a3b131a32 --- /dev/null +++ b/crates/control-plane-api/src/server/public/token_exchange.rs @@ -0,0 +1,206 @@ +use std::sync::Arc; + +#[derive(Debug, serde::Deserialize, schemars::JsonSchema)] +#[serde(tag = "grant_type")] +pub enum TokenRequest { + #[serde(rename = "api_key")] + ApiKey { api_key: String }, + #[serde(rename = "refresh_token")] + RefreshToken { + refresh_token_id: models::Id, + secret: String, + }, +} + +#[derive(Debug, serde::Serialize, schemars::JsonSchema)] +pub struct TokenResponse { + pub access_token: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub refresh_token: Option, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +pub struct RefreshTokenResponse { + pub id: models::Id, + pub secret: String, +} + +pub async fn handle_post_token( + axum::extract::State(app): axum::extract::State>, + axum::Json(req): axum::Json, +) -> Result, crate::ApiError> { + match req { + TokenRequest::ApiKey { api_key } => exchange_api_key(&app, &api_key).await, + TokenRequest::RefreshToken { + refresh_token_id, + secret, + } => exchange_refresh_token(&app, refresh_token_id, &secret).await, + } +} + +async fn exchange_api_key( + app: &crate::App, + api_key: &str, +) -> Result, crate::ApiError> { + let raw = api_key + .strip_prefix("flow_sa_") + .ok_or_else(|| bad_request("api_key must start with flow_sa_"))?; + + use base64::Engine; + let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(raw) + .map_err(|_| bad_request("malformed api_key: invalid base64"))?; + + let decoded_str = + String::from_utf8(decoded).map_err(|_| bad_request("malformed api_key: invalid UTF-8"))?; + + let (id_str, plaintext_secret) = decoded_str + .split_once(':') + .ok_or_else(|| bad_request("malformed api_key payload"))?; + + let key_id: models::Id = id_str + .parse() + .map_err(|_| bad_request("malformed api_key: invalid key id"))?; + + // Verify the key against bcrypt hash and check expiry in one query. + // disabled_at is returned separately so we can give a distinct error. + let row = sqlx::query!( + r#" + SELECT + ak.service_account_id, + sa.disabled_at + FROM internal.api_keys ak + JOIN internal.service_accounts sa ON sa.user_id = ak.service_account_id + WHERE ak.id = $1 + AND ak.secret_hash = crypt($2, ak.secret_hash) + AND ak.expires_at > now() + "#, + key_id as models::Id, + plaintext_secret, + ) + .fetch_optional(&app.pg_pool) + .await + .map_err(|err| { + crate::ApiError::Status(tonic::Status::internal(format!( + "failed to verify api key: {err}" + ))) + })? + .ok_or_else(|| unauthenticated("invalid or expired api key"))?; + + if row.disabled_at.is_some() { + return Err(unauthenticated("service account is disabled")); + } + + // Update last_used_at on the key and the service account. + sqlx::query!( + "UPDATE internal.api_keys SET last_used_at = now() WHERE id = $1", + key_id as models::Id, + ) + .execute(&app.pg_pool) + .await + .map_err(|err| { + crate::ApiError::Status(tonic::Status::internal(format!( + "failed to update api key last_used_at: {err}" + ))) + })?; + + sqlx::query!( + "UPDATE internal.service_accounts SET last_used_at = now() WHERE user_id = $1", + row.service_account_id, + ) + .execute(&app.pg_pool) + .await + .map_err(|err| { + crate::ApiError::Status(tonic::Status::internal(format!( + "failed to update service account last_used_at: {err}" + ))) + })?; + + // Mint the access token directly in the application layer. This is the + // canonical token-minting path: the plan is to retire the SQL + // `generate_access_token` function and have all access tokens minted here. + // Until then, the refresh-token branch below still delegates to SQL for + // existing PostgREST callers, so any change to the access-token claim + // shape must be made here (not in the SQL function, which is frozen). + let now = tokens::now(); + let claims = models::authorizations::ControlClaims { + iat: now.timestamp() as u64, + exp: (now + chrono::Duration::hours(1)).timestamp() as u64, + sub: row.service_account_id, + role: "authenticated".to_string(), + aud: "authenticated".to_string(), + email: None, + }; + + let access_token = + tokens::jwt::sign(&claims, &app.control_plane_jwt_encode_key).map_err(|err| { + crate::ApiError::Status(tonic::Status::internal(format!( + "failed to sign access token: {err}" + ))) + })?; + + tracing::info!( + %key_id, + service_account_id = %row.service_account_id, + "exchanged api key for access token" + ); + + Ok(axum::Json(TokenResponse { + access_token, + refresh_token: None, + })) +} + +// Exchange a refresh token for an access token. +// +// This delegates to the SQL `generate_access_token` function transitionally: +// existing clients (flowctl via flow-client) still authenticate against the +// PostgREST `/rpc/generate_access_token` surface, so the function must keep +// working unchanged. The plan is to migrate those callers onto this endpoint +// and then retire the SQL function, folding refresh-token minting into the +// application-layer path used by `exchange_api_key`. New clients should target +// this endpoint rather than PostgREST. +async fn exchange_refresh_token( + app: &crate::App, + refresh_token_id: models::Id, + secret: &str, +) -> Result, crate::ApiError> { + #[derive(Debug, serde::Deserialize)] + struct SqlResponse { + access_token: String, + refresh_token: Option, + } + + let response = sqlx::query!( + "select generate_access_token($1, $2) as token", + refresh_token_id as models::Id, + secret, + ) + .fetch_one(&app.pg_pool) + .await + .map_err(|err| { + crate::ApiError::Status(tonic::Status::unauthenticated(format!( + "failed to exchange refresh token: {err}" + ))) + })?; + + let parsed: SqlResponse = + serde_json::from_value(response.token.unwrap_or_default()).map_err(|err| { + crate::ApiError::Status(tonic::Status::internal(format!( + "invalid token response: {err}" + ))) + })?; + + Ok(axum::Json(TokenResponse { + access_token: parsed.access_token, + refresh_token: parsed.refresh_token, + })) +} + +fn bad_request(msg: &str) -> crate::ApiError { + crate::ApiError::Status(tonic::Status::invalid_argument(msg)) +} + +fn unauthenticated(msg: &str) -> crate::ApiError { + crate::ApiError::Status(tonic::Status::unauthenticated(msg)) +} diff --git a/crates/flow-client/control-plane-api.graphql b/crates/flow-client/control-plane-api.graphql index e8523045889..357894f8fe4 100644 --- a/crates/flow-client/control-plane-api.graphql +++ b/crates/flow-client/control-plane-api.graphql @@ -242,6 +242,15 @@ input AlertsBy { active: Boolean } +type ApiKeyInfo { + id: Id! + label: String! + createdBy: UUID! + createdAt: DateTime! + expiresAt: DateTime! + lastUsedAt: DateTime +} + type AutoDiscoverFailure { """ The number of consecutive failures that have been observed. @@ -312,6 +321,11 @@ type AutoDiscoverStatus { failure: AutoDiscoverFailure } +type BillingPaymentMethodPayload { + paymentMethods: [PaymentMethod!]! + primaryPaymentMethod: PaymentMethod +} + input BoolFilter { eq: Boolean } @@ -341,6 +355,13 @@ enum CapabilityBit { Assume } +type CardPaymentMethodDetails { + brand: String + last4: String + expMonth: Int! + expYear: Int! +} + enum CatalogType { capture collection @@ -348,6 +369,12 @@ enum CatalogType { test } +enum ChargeStatus { + FAILED + PENDING + SUCCEEDED +} + scalar Collection """ @@ -585,6 +612,15 @@ type Controller { updatedAt: DateTime! } +type CreateApiKeyResult { + id: Id! + secret: String! +} + +type CreateBillingSetupIntentPayload { + clientSecret: String! +} + """ Result of creating a storage mapping. """ @@ -689,6 +725,11 @@ type DataPlaneEdge { cursor: String! } +input DateFilter { + gt: NaiveDate + lt: NaiveDate +} + """ Implement the DateTime scalar @@ -833,6 +874,72 @@ input InviteLinksFilter { catalogPrefix: PrefixFilter } +type Invoice { + dateStart: String! + dateEnd: String! + invoiceType: InvoiceType! + subtotal: Int! + lineItems: JSON! + extra: JSON! + amountDue: Int + status: String + invoicePdf: String + hostedInvoiceUrl: String + paymentDetails: InvoicePaymentDetails +} + +type InvoiceConnection { + """ + Information to aid in pagination. + """ + pageInfo: PageInfo! + """ + A list of edges. + """ + edges: [InvoiceEdge!]! + """ + A list of nodes. + """ + nodes: [Invoice!]! +} + +""" +An edge in a connection. +""" +type InvoiceEdge { + """ + The item at the end of the edge + """ + node: Invoice! + """ + A cursor for use in pagination + """ + cursor: String! +} + +input InvoiceFilter { + dateStart: DateFilter + dateEnd: DateFilter + invoiceType: InvoiceTypeFilter +} + +type InvoicePaymentDetails { + status: ChargeStatus! + receiptUrl: String + card: CardPaymentMethodDetails + usBankAccount: UsBankAccountPaymentMethodDetails +} + +enum InvoiceType { + FINAL + PREVIEW + MANUAL +} + +input InvoiceTypeFilter { + eq: InvoiceType +} + """ A scalar that can represent any JSON value. """ @@ -1010,6 +1117,9 @@ type LockFailure { } type MutationRoot { + createBillingSetupIntent(tenant: String!): CreateBillingSetupIntentPayload! + setBillingPaymentMethod(tenant: String!, paymentMethodId: String!): BillingPaymentMethodPayload! + deleteBillingPaymentMethod(tenant: String!, paymentMethodId: String!): BillingPaymentMethodPayload! """ Create a storage mapping for the given catalog prefix. @@ -1090,8 +1200,83 @@ type MutationRoot { The caller must have admin capability on the invite link's catalog prefix. """ deleteInviteLink(token: UUID!): Boolean! + """ + Create a service account with a grant to the specified prefix. + + The caller must have admin capability on the prefix. + Creates an auth.users row, an internal.service_accounts row, + and a user_grants row for the service account. + """ + createServiceAccount(prefix: Prefix!, capability: Capability!, displayName: String!): ServiceAccount! + """ + Disable a service account, revoking all API keys and grants. + + The caller must have admin capability on the service account's prefix. + The auth.users row is preserved for audit trail / FK integrity. + + Unlike revoking a single key, disabling removes the service account's + grants, so access tokens already issued from its keys resolve to zero + capability on their next authorization check (bounded by snapshot-refresh + lag, not the token's full ~1h lifetime). Use this to cut off an + active service account, not just stop new tokens. + """ + disableServiceAccount(id: UUID!): Boolean! + """ + Re-enable a disabled service account, restoring its user_grants row. + + Does NOT restore previously revoked API keys — new ones must be minted. + """ + enableServiceAccount(id: UUID!): Boolean! + """ + Create an API key for a service account. + + Returns the key_id and the plaintext secret (flow_sa_...). + The secret is returned exactly once and cannot be retrieved again. + """ + createApiKey( serviceAccountId: UUID!, label: String!, + """ + ISO 8601 duration for key validity (e.g. P90D, P1Y) + """ + validFor: String! + ): CreateApiKeyResult! + """ + Revoke (delete) an API key. + + The caller must have admin capability on the owning service account's prefix. + + Revocation stops *new* access tokens from being minted with this key, but + does not invalidate access tokens already issued from it — those remain + valid until they expire (up to ~1h), since the service account's grants + are untouched. To cut off active sessions immediately, disable the + service account (which removes its grants). + """ + revokeApiKey(id: Id!): Boolean! + """ + Create a refresh token for the authenticated user. + """ + createRefreshToken( + """ + ISO 8601 duration for token validity (e.g. P90D) + """ + validFor: String! = "P90D", multiUse: Boolean! = true, detail: String = null + ): RefreshTokenResult! + """ + Delete a refresh token owned by the authenticated user. + """ + deleteRefreshToken(id: Id!): Boolean! } +""" +ISO 8601 calendar date without timezone. +Format: %Y-%m-%d + +# Examples + +* `1994-11-13` +* `2000-02-24` +""" +scalar NaiveDate + scalar Name """ @@ -1116,6 +1301,18 @@ type PageInfo { endCursor: String } +type PaymentMethod { + id: String! + type: String! + billingDetails: PaymentMethodBillingDetails! + card: CardPaymentMethodDetails + usBankAccount: UsBankAccountPaymentMethodDetails +} + +type PaymentMethodBillingDetails { + name: String +} + """ Information on the config updates performed by the controller. This does not include any information on user-initiated config updates. @@ -1361,6 +1558,12 @@ type QueryRoot { Returns a paginated list of connectors, optionally filtered by protocol. """ connectors(filter: ConnectorsFilter, after: String, before: String, first: Int, last: Int): ConnectorConnection! + tenant(name: String!): Tenant + serviceAccounts(filter: ServiceAccountsFilter, after: String, first: Int): ServiceAccountConnection! + """ + List refresh tokens owned by the authenticated user. + """ + refreshTokens(after: String, first: Int): RefreshTokenInfoConnection! } """ @@ -1377,6 +1580,46 @@ type RedeemInviteLinkResult { capability: Capability! } +type RefreshTokenInfo { + id: Id! + detail: String + createdAt: DateTime! + updatedAt: DateTime! + multiUse: Boolean! + validFor: String! + uses: Int! +} + +type RefreshTokenInfoConnection { + """ + Information to aid in pagination. + """ + pageInfo: PageInfo! + """ + A list of edges. + """ + edges: [RefreshTokenInfoEdge!]! +} + +""" +An edge in a connection. +""" +type RefreshTokenInfoEdge { + """ + The item at the end of the edge + """ + node: RefreshTokenInfo! + """ + A cursor for use in pagination + """ + cursor: String! +} + +type RefreshTokenResult { + id: Id! + secret: String! +} + type RepublishRequested { """ Informational only, timestamp of when the controller observed the `Republish` request. @@ -1395,6 +1638,48 @@ type RepublishRequested { lastBuildId: Id! } +type ServiceAccount { + id: UUID! + displayName: String! + prefix: Prefix! + capability: Capability! + createdBy: UUID! + createdAt: DateTime! + updatedAt: DateTime! + lastUsedAt: DateTime + disabledAt: DateTime + apiKeys: [ApiKeyInfo!]! +} + +type ServiceAccountConnection { + """ + Information to aid in pagination. + """ + pageInfo: PageInfo! + """ + A list of edges. + """ + edges: [ServiceAccountEdge!]! +} + +""" +An edge in a connection. +""" +type ServiceAccountEdge { + """ + The item at the end of the edge + """ + node: ServiceAccount! + """ + A cursor for use in pagination + """ + cursor: String! +} + +input ServiceAccountsFilter { + catalogPrefix: PrefixFilter +} + """ The shape of a connector status, which matches that of an ops::Log. """ @@ -1737,6 +2022,17 @@ input StorageMappingsBy { underPrefix: Prefix } +type Tenant { + name: String! + billing: TenantBilling! +} + +type TenantBilling { + paymentMethods: [PaymentMethod!]! + primaryPaymentMethod: PaymentMethod + invoices(filter: InvoiceFilter, after: String, before: String, first: Int, last: Int): InvoiceConnection! +} + """ A UUID is a unique 128-bit number, stored as 16 octets. UUIDs are parsed as Strings within GraphQL. UUIDs are used to assign unique identifiers to @@ -1777,6 +2073,12 @@ URL is a String implementing the [URL Standard](http://url.spec.whatwg.org/) """ scalar Url +type UsBankAccountPaymentMethodDetails { + bankName: String + last4: String + accountHolderType: String +} + """ Marks an element of a GraphQL schema as no longer supported. """ diff --git a/supabase/migrations/20260528120000_service_accounts.sql b/supabase/migrations/20260528120000_service_accounts.sql new file mode 100644 index 00000000000..64457dc0d7d --- /dev/null +++ b/supabase/migrations/20260528120000_service_accounts.sql @@ -0,0 +1,59 @@ +begin; + +-- Service accounts: non-login identities that authenticate via API keys. +-- Used for programmatic access (CI/CD, automation) and scoped, time-limited +-- access grants. + +create table internal.service_accounts ( + user_id uuid primary key references auth.users (id), + prefix public.catalog_prefix not null, + -- `capability` mirrors the legacy column on user_grants/role_grants and is + -- slated for the same eventual retirement in favor of `bundles`. `'none'` is + -- permitted so a service account can be authorized entirely by its bundles. + capability public.grant_capability not null + constraint valid_capability check ( + capability = any (array[ + 'none'::public.grant_capability, + 'read'::public.grant_capability, + 'write'::public.grant_capability, + 'admin'::public.grant_capability + ]) + ), + bundles public.capability_bundle[] not null default '{}', + display_name text not null, + created_by uuid not null references auth.users (id), + last_used_at timestamptz, + disabled_at timestamptz, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +comment on table internal.service_accounts is + 'Non-login identities that authenticate via API keys and are authorized through user_grants.'; + +-- The serviceAccounts query scopes results to a caller's admin prefixes with +-- `prefix::text ^@ ANY($1)`. SP-GiST natively supports the `^@` (starts-with) +-- operator; a btree index would not be used by it. +create index service_accounts_prefix_spgist on internal.service_accounts + using spgist ((prefix::text)); + +-- API keys: long-lived credentials for service accounts, exchanged for short-lived JWTs +-- via the /api/v1/auth/token REST endpoint. + +create table internal.api_keys ( + id public.flowid primary key not null default internal.id_generator(), + service_account_id uuid not null references internal.service_accounts (user_id), + secret_hash text not null, + label text not null, + expires_at timestamptz not null, + created_by uuid not null references auth.users (id), + last_used_at timestamptz, + created_at timestamptz not null default now() +); + +create index api_keys_service_account_id on internal.api_keys (service_account_id); + +comment on table internal.api_keys is + 'Long-lived credentials for service accounts, exchanged for short-lived JWTs via the token exchange endpoint.'; + +commit;