diff --git a/extensions/kafka/CHANGELOG.md b/extensions/kafka/CHANGELOG.md
new file mode 100644
index 0000000000000..9a85ee1e43f24
--- /dev/null
+++ b/extensions/kafka/CHANGELOG.md
@@ -0,0 +1,12 @@
+# Kafka Changelog
+
+## [Added kafka] - 2023-03-01
+
+### Kafka command
+
+- List and consult topic configurations
+- List consumers with state, members and overall lag
+
+### Kafka menu bar command
+
+Show Kafka in the menu bar, with a background refresh every 5 minutes
diff --git a/extensions/kafka/README.md b/extensions/kafka/README.md
new file mode 100644
index 0000000000000..450cdc47cf15a
--- /dev/null
+++ b/extensions/kafka/README.md
@@ -0,0 +1,93 @@
+# Kafka extension
+
+> Extension to quickly consult topics and consumers of a Kafka broker
+
+## Configuration
+
+| Name | Required | Default | Example | Description |
+|-----------------------------|----------|-------------|------------------------|-------------------------------------------------------------------|
+| configDirectory | Yes | `~/.kafka/` | `/kafka-config/` | Configuration directory containing [env files](#environment-file) |
+| extractRegex | No | - | `topic_(.*)_(.*)_(.*)` | Regex to extract information from topic name |
+| extractTitleGroup | No | - | 1 | Group to get from regex to display title |
+| extractSubTitleGroup | No | - | 2 | Group to get from regex to display subtitle |
+| extractMetadataNameAndGroup | No | - | Application=3 | Extract metadata from regex |
+
+Note: in this example, with a topic named `topic_NAME_OWNER_APP`, the title
+would be **NAME**, the subtitle **OWNER**, and in the side panel we would have
+the following metadata: **Application=APP**. If you don't set `extractRegex`,
+the full topic name will be displayed.
+
+### Environment file
+
+Environment files must be JSON files in the configuration directory with the
+following fields:
+
+````typescript
+export interface KafkaEnv {
+ // env name to display on raycast dropdown
+ name: string;
+ // to filter topics by keyword(s)
+ filterTopics?: string[];
+ // to filter consumers by keyword(s)
+ filterConsumers?: string[];
+ // kafka js configuration for broker, authentication, etc
+ kafkaJs: KafkaConfig;
+}
+````
+
+The following [kafkaJS](https://kafka.js.org/) configuration is used by default:
+
+````typescript
+const defaultConfiguration = {
+ connectionTimeout: 10000,
+ requestTimeout: 30000,
+ ssl: {
+ rejectUnauthorized: false
+ },
+ logLevel: logLevel.ERROR
+};
+````
+
+More info on configuring
+[kafkaJS](https://kafka.js.org/) is available [here](https://kafka.js.org/docs/configuration).
+
+_Example_
+
+````json
+{
+ "name": "Dev",
+ "filterTopics": [
+ "my-prefix-for-dev"
+ ],
+ "filterConsumers": [
+ "dev",
+ "def"
+ ],
+ "kafkaJs": {
+ "brokers": [
+ "kafka-host:kafka-port"
+ ],
+ "sasl": {
+ "mechanism": "plain",
+ "username": "user",
+ "password": "password"
+ }
+ }
+}
+
+````
+
+## Kafka command
+
+- List and consult topic configurations
+- List consumers with state, members and overall lag
+
+## Kafka menu bar command
+
+Show Kafka in the menu bar, with a background refresh every 5 minutes
+
+### Configuration
+
+| Name | Required | Default | Description |
+|----------------|----------|---------|------------------------------------------------|
+| hideWithoutLag | No | false | Hide consumers without lag to avoid long lists |
diff --git a/extensions/kafka/assets/kafka-dark.png b/extensions/kafka/assets/kafka-dark.png
new file mode 100644
index 0000000000000..cd0a8a86cd638
Binary files /dev/null and b/extensions/kafka/assets/kafka-dark.png differ
diff --git a/extensions/kafka/assets/kafka.png b/extensions/kafka/assets/kafka.png
new file mode 100644
index 0000000000000..6977c1200f097
Binary files /dev/null and b/extensions/kafka/assets/kafka.png differ
diff --git a/extensions/kafka/assets/kafka.svg b/extensions/kafka/assets/kafka.svg
new file mode 100644
index 0000000000000..e373aefc36e1b
--- /dev/null
+++ b/extensions/kafka/assets/kafka.svg
@@ -0,0 +1,16 @@
+
+
diff --git a/extensions/kafka/metadata/kafka-1.png b/extensions/kafka/metadata/kafka-1.png
new file mode 100644
index 0000000000000..61a8697670c32
Binary files /dev/null and b/extensions/kafka/metadata/kafka-1.png differ
diff --git a/extensions/kafka/metadata/kafka-2.png b/extensions/kafka/metadata/kafka-2.png
new file mode 100644
index 0000000000000..bdaa602932404
Binary files /dev/null and b/extensions/kafka/metadata/kafka-2.png differ
diff --git a/extensions/kafka/metadata/kafka-3.png b/extensions/kafka/metadata/kafka-3.png
new file mode 100644
index 0000000000000..1f5700f44d0e3
Binary files /dev/null and b/extensions/kafka/metadata/kafka-3.png differ
diff --git a/extensions/kafka/metadata/kafka-4.png b/extensions/kafka/metadata/kafka-4.png
new file mode 100644
index 0000000000000..614f178566450
Binary files /dev/null and b/extensions/kafka/metadata/kafka-4.png differ
diff --git a/extensions/kafka/package-lock.json b/extensions/kafka/package-lock.json
new file mode 100644
index 0000000000000..0967ef424bce6
--- /dev/null
+++ b/extensions/kafka/package-lock.json
@@ -0,0 +1 @@
+{}
diff --git a/extensions/kafka/package.json b/extensions/kafka/package.json
new file mode 100644
index 0000000000000..4c38e272a81aa
--- /dev/null
+++ b/extensions/kafka/package.json
@@ -0,0 +1,103 @@
+{
+ "$schema": "https://www.raycast.com/schemas/extension.json",
+ "name": "kafka",
+ "title": "Kafka",
+ "description": "Kafka extension for Raycast",
+ "icon": "kafka.png",
+ "author": "fonimus",
+ "categories": [
+ "Productivity",
+ "Developer Tools"
+ ],
+ "license": "MIT",
+ "commands": [
+ {
+ "name": "kafka",
+ "title": "Kafka",
+ "subtitle": "Kafka - topics and consumers",
+ "description": "Overview of kafka topics and consumers by environment",
+ "mode": "view"
+ },
+ {
+ "name": "kafka-menubar",
+ "title": "Kafka Menubar",
+ "subtitle": "Kafka menu bar - consumers",
+ "description": "Overview of kafka consumers lag by environment with refresh",
+ "mode": "menu-bar",
+ "interval": "5m",
+ "preferences": [
+ {
+ "name": "hideWithoutLag",
+ "title": "Hide without lag",
+ "label": "Hide consumers without lag",
+ "description": "Hide consumers without lag to avoid long lists",
+ "type": "checkbox",
+ "required": false
+ }
+ ]
+ }
+ ],
+ "preferences": [
+ {
+ "name": "configDirectory",
+ "title": "Config directory for env files",
+ "description": "Configuration directory containing env files",
+ "type": "directory",
+ "default": "~/.kafka/",
+ "required": true
+ },
+ {
+ "name": "extractRegex",
+ "title": "Extract regex",
+ "description": "Regex to extract information from topic name",
+ "type": "textfield",
+ "required": false
+ },
+ {
+ "name": "extractTitleGroup",
+ "title": "Extract title group",
+ "description": "Group to get from regex to display title",
+ "type": "textfield",
+ "required": false
+ },
+ {
+ "name": "extractSubTitleGroup",
+ "title": "Extract sub title group",
+ "description": "Group to get from regex to display sub title",
+ "type": "textfield",
+ "required": false
+ },
+ {
+ "name": "extractMetadataNameAndGroup",
+ "title": "Extract metadata name and group",
+ "description": "Extract metadata from regex. Ex: (Owner=2,Application=4)",
+ "type": "textfield",
+ "required": false
+ }
+ ],
+ "dependencies": {
+ "@raycast/api": "1.48.5",
+ "@raycast/utils": "1.5.0",
+ "kafkajs": "2.2.3",
+ "moment": "2.29.4"
+ },
+ "devDependencies": {
+ "@types/kafkajs": "1.9.0",
+ "@types/node": "18.14.2",
+ "@types/react": "18.0.28",
+ "@typescript-eslint/eslint-plugin": "5.53.0",
+ "@typescript-eslint/parser": "5.54.0",
+ "eslint": "8.35.0",
+ "eslint-config-prettier": "8.6.0",
+ "eslint-plugin-react-hooks": "4.6.0",
+ "prettier": "2.8.4",
+ "typescript": "4.9.5"
+ },
+ "scripts": {
+ "build": "ray build -e dist",
+ "dev": "ray develop",
+ "fix-lint": "ray lint --fix",
+ "lint": "ray lint",
+ "publish": "ray publish"
+ }
+}
diff --git a/extensions/kafka/src/components/consumers.tsx b/extensions/kafka/src/components/consumers.tsx
new file mode 100644
index 0000000000000..5ac7de5ba46e4
--- /dev/null
+++ b/extensions/kafka/src/components/consumers.tsx
@@ -0,0 +1,276 @@
+import { Action, ActionPanel, Alert, Color, confirmAlert, Icon, List, showToast, Toast } from "@raycast/api";
+import { useCallback, useEffect, useState } from "react";
+import { ConsumerGroupState, GroupDescription, GroupOverview } from "kafkajs";
+import { buildAdmin, getConfig, getConsumerInfo, getEnvs } from "../utils";
+import { useCachedState } from "@raycast/utils";
+import moment from "moment/moment";
+
+interface ConsumerInfo {
+ groupId: string;
+ topicName?: string;
+ state?: ConsumerGroupState;
+ members?: number;
+ overall?: number;
+}
+
+function getConsumerStateColor(state: ConsumerGroupState): Color.ColorLike {
+ switch (state) {
+ case "PreparingRebalance":
+ case "CompletingRebalance":
+ return Color.Yellow;
+ case "Stable":
+ return Color.Green;
+ case "Dead":
+ return Color.Red;
+ case "Empty":
+ case "Unknown":
+ return Color.Brown;
+ }
+}
+
+function getAccessories(consumer: ConsumerInfo) {
+ const result = [];
+ if (consumer.overall !== undefined) {
+ result.push({ text: consumer.overall.toLocaleString() });
+ }
+ if (consumer.state) {
+ result.push({
+ tag: {
+ color: getConsumerStateColor(consumer.state),
+ value: consumer.state,
+ },
+ });
+ }
+ if (consumer.members !== undefined) {
+ result.push({ text: consumer.members + " members" });
+ }
+ return result;
+}
+
+export default function KafkaConsumers() {
+ const [isLoading, setIsLoading] = useState(false);
+ const [env, setEnv] = useCachedState("env", getEnvs()[0]);
+ const [consumers, setConsumers] = useState([]);
+
+ const load = useCallback(async (env: string) => {
+ const start = moment.now();
+ setConsumers([]);
+ console.info("[load] get kafka consumers for env:", env);
+ setIsLoading(true);
+ try {
+ const admin = await buildAdmin(env);
+ const filterConsumers = getConfig(env).filterConsumers;
+
+ const groupIds = (await admin.listGroups()).groups
+ .map((group: GroupOverview) => group.groupId)
+ .filter((groupId) => {
+ if (!filterConsumers) {
+ return true;
+ }
+ return filterConsumers.some((filterConsumer) => groupId.includes(filterConsumer));
+ });
+ setConsumers(groupIds.map((group) => ({ groupId: group })));
+
+ const groups = new Map(
+ (await admin.describeGroups(groupIds)).groups.map((group) => [group.groupId, group])
+ );
+
+ setConsumers(
+ Array.from(groups.values()).map((group) => {
+ return {
+ groupId: group.groupId,
+ state: group.state,
+ members: group.members.length,
+ };
+ })
+ );
+
+ // compute lag
+ for (const groupId of groupIds) {
+ const lagInfo = await getConsumerInfo(admin, groupId);
+ setConsumers((existingItems) => {
+ return existingItems
+ .map((value) => {
+ const newValue = JSON.parse(JSON.stringify(value));
+ if (newValue.groupId === groupId) {
+ newValue.topicName = lagInfo.topicName;
+ newValue.overall = lagInfo.overallLag;
+ }
+ return newValue;
+ })
+ .sort((a, b) => {
+ if (b.overall === a.overall) {
+ return a.groupId.localeCompare(b.groupId);
+ } else if (a.overall !== undefined && b.overall !== undefined) {
+ return b.overall - a.overall;
+ } else if (a.overall !== undefined) {
+ return -1;
+ } else if (b.overall !== undefined) {
+ return 1;
+ }
+ return 0;
+ });
+ });
+ }
+ } catch (e) {
+ console.error("Unable to get kafka consumers", e);
+ } finally {
+ setIsLoading(false);
+ console.info(`[load] done in ${moment.utc(moment.now() - start).format("mm:ss.SSS")}`);
+ }
+ }, []);
+
+ useEffect(() => {
+ load(env).finally(() => console.debug("Init done"));
+ }, [env, load]);
+
+ const setOffset = useCallback(
+ async (consumer: ConsumerInfo, from: Date | null | "latest" | "earliest" | number) => {
+ setIsLoading(true);
+ const toast = await showToast({
+ style: Toast.Style.Animated,
+ title: "Setting offset",
+ });
+
+ try {
+ if (from === null) {
+ toast.style = Toast.Style.Failure;
+ toast.message = "Please select a date !";
+ return;
+ }
+ if (!consumer.topicName) {
+ toast.style = Toast.Style.Failure;
+ toast.message = "Could not find topic name !";
+ return;
+ }
+ if (
+ await confirmAlert({
+ title: `Set offset of '${consumer.groupId}' from ${
+ typeof from === "number" ? "latest -" + from : from instanceof Date ? from.toISOString() : from
+ } ?`,
+ primaryAction: {
+ title: "Reset",
+ style: Alert.ActionStyle.Destructive,
+ },
+ icon: Icon.Gear,
+ dismissAction: {
+ title: "Cancel",
+ style: Alert.ActionStyle.Cancel,
+ },
+ })
+ ) {
+ const admin = await buildAdmin(env);
+ const baseOptions = {
+ groupId: consumer.groupId,
+ topic: consumer.topicName,
+ };
+ console.info("Set offset at", baseOptions, from);
+ if (from instanceof Date) {
+ const offsets = await admin.fetchTopicOffsetsByTimestamp(consumer.topicName, from.getTime());
+ await admin.setOffsets({ ...baseOptions, partitions: offsets });
+ } else if (from === "earliest") {
+ const offsets = await admin.fetchTopicOffsets(consumer.topicName);
+ await admin.setOffsets({
+ ...baseOptions,
+ partitions: offsets.map((offset) => ({
+ partition: offset.partition,
+ offset: offset.low,
+ })),
+ });
+ } else if (from === "latest") {
+ const offsets = await admin.fetchTopicOffsets(consumer.topicName);
+ await admin.setOffsets({
+ ...baseOptions,
+ partitions: offsets.map((offset) => ({
+ partition: offset.partition,
+ offset: offset.offset,
+ })),
+ });
+ } else {
+ // from is number
+ const offsets = await admin.fetchTopicOffsets(consumer.topicName);
+ await admin.setOffsets({
+ ...baseOptions,
+ partitions: offsets.map((offset) => ({
+ partition: offset.partition,
+ offset: (Number(offset.offset) - 10).toString(),
+ })),
+ });
+ }
+
+ toast.style = Toast.Style.Success;
+ toast.message = "Offset reset with success, reloading";
+
+ await load(env);
+ setIsLoading(false);
+ } else {
+ await toast.hide();
+ }
+ } catch (error) {
+ toast.style = Toast.Style.Failure;
+ toast.message = "Failed to reset offset" + "\n" + String(error);
+ } finally {
+ setIsLoading(false);
+ }
+ },
+ [env, load]
+ );
+
+ return (
+ setEnv(newValue)}>
+
+ {getEnvs().map((env) => (
+
+ ))}
+
+
+ }
+ >
+
+ {consumers.map((consumer) => (
+
+
+ await load(env)}
+ />
+ setOffset(consumer, "latest")}
+ />
+ setOffset(consumer, "earliest")}
+ />
+ setOffset(consumer, date)}
+ />
+ setOffset(consumer, 10)}
+ />
+
+ }
+ />
+ ))}
+
+
+ );
+}
diff --git a/extensions/kafka/src/components/topics.tsx b/extensions/kafka/src/components/topics.tsx
new file mode 100644
index 0000000000000..250d992166d9f
--- /dev/null
+++ b/extensions/kafka/src/components/topics.tsx
@@ -0,0 +1,262 @@
+import { Action, ActionPanel, Color, Icon, List } from "@raycast/api";
+import { useCallback, useEffect, useState } from "react";
+import { ConfigEntries, ConfigResourceTypes } from "kafkajs";
+import { buildAdmin, getConfig, getEnvs, getExtractConfig } from "../utils";
+import { useCachedState } from "@raycast/utils";
+
+interface ConfigEntry {
+ name: string;
+ value: string;
+}
+
+enum Compacted {
+ loading = "loading",
+ unauthorized = "unauthorized",
+ compacted = "compacted",
+ not_compacted = "not_compacted",
+}
+
+interface TopicInfo {
+ name: string;
+ title?: string;
+ subtitle?: string;
+ metadata: Record;
+ compacted: Compacted;
+ config: ConfigEntry[];
+ partitions?: number;
+}
+
+function getAccessories(topic: TopicInfo) {
+ const result = [];
+ if (topic.title) {
+ result.push({
+ text: topic.partitions?.toString(),
+ tooltip: "Number of partitions",
+ });
+ for (const metadataKey in topic.metadata) {
+ result.push({ text: topic.metadata[metadataKey] });
+ }
+ }
+ switch (topic.compacted) {
+ case Compacted.loading:
+ result.push({ tag: { color: Color.Brown, value: topic.compacted } });
+ break;
+ case Compacted.unauthorized:
+ result.push({ tag: { color: Color.Yellow, value: topic.compacted } });
+ break;
+ case Compacted.compacted:
+ result.push({ tag: { color: Color.Green, value: topic.compacted } });
+ break;
+ case Compacted.not_compacted:
+ result.push({ tag: { color: Color.Red, value: topic.compacted } });
+ break;
+ }
+ return result;
+}
+
+function buildNamesAndMetadata(topic: string) {
+ const config = getExtractConfig();
+ const base = {
+ name: topic,
+ metadata: {} as Record,
+ };
+ if (config === null) {
+ return base;
+ }
+
+ const matches = topic.match(config.regex);
+ if (!matches) {
+ return base;
+ }
+
+ const title = matches.length - 1 >= config.extractTitleGroup ? matches[config.extractTitleGroup] : undefined;
+ const subtitle = matches.length - 1 >= config.extractSubTitleGroup ? matches[config.extractSubTitleGroup] : undefined;
+
+ if (config.extractMetadataNameAndGroup) {
+ for (const element of config.extractMetadataNameAndGroup) {
+ if (matches.length - 1 >= element.group) {
+ base.metadata[element.metadataName] = matches[element.group];
+ }
+ }
+ }
+
+ return {
+ ...base,
+ ...(title && { title }),
+ ...(subtitle && { subtitle }),
+ };
+}
+
+export default function KafkaTopics() {
+ const [isLoading, setIsLoading] = useState(false);
+ const [withDetails, setWithDetails] = useState(false);
+ const [env, setEnv] = useCachedState("env", getEnvs()[0]);
+ const [topics, setTopics] = useState([]);
+
+ const load = useCallback(async (env: string) => {
+ console.info("Get kafka topics for env:", env);
+ setIsLoading(true);
+ try {
+ const admin = await buildAdmin(env);
+ const filterTopics = getConfig(env).filterTopics;
+ const topics = (await admin.listTopics())
+ .filter((topic) => {
+ if (!filterTopics) {
+ return true;
+ }
+ return filterTopics.some((filterTopic) => topic.includes(filterTopic));
+ })
+ .sort((a, b) => a.localeCompare(b, "en", { sensitivity: "base" }));
+ setTopics(
+ topics.map((topic) => ({
+ ...buildNamesAndMetadata(topic),
+ compacted: Compacted.loading,
+ config: [],
+ }))
+ );
+
+ const metadata = new Map(
+ (
+ await admin.fetchTopicMetadata({
+ topics: topics,
+ })
+ ).topics.map((topic) => [topic.name, topic.partitions.length])
+ );
+ setTopics(
+ topics.map((topic) => ({
+ ...buildNamesAndMetadata(topic),
+ compacted: Compacted.loading,
+ config: [],
+ partitions: metadata.get(topic),
+ }))
+ );
+
+ for (const topic of topics) {
+ let compacted: Compacted;
+ let config: ConfigEntry[] = [];
+
+ try {
+ const kafkaConfig = await admin.describeConfigs({
+ resources: [{ type: ConfigResourceTypes.TOPIC, name: topic }],
+ includeSynonyms: false,
+ });
+
+ config = kafkaConfig.resources[0].configEntries
+ .map((configEntry: ConfigEntries) => {
+ const entry = {
+ name: configEntry.configName,
+ value: configEntry.configValue,
+ };
+ if (configEntry.configName === "cleanup.policy") {
+ compacted = configEntry.configValue === "compact" ? Compacted.compacted : Compacted.not_compacted;
+ }
+ return entry;
+ })
+ .sort((a, b) => a.name.localeCompare(b.name));
+ } catch (e) {
+ compacted = Compacted.unauthorized;
+ }
+
+ setTopics((existingItems) => {
+ return existingItems.map((value) => {
+ const newValue = JSON.parse(JSON.stringify(value));
+ if (newValue.name === topic) {
+ newValue.compacted = compacted;
+ newValue.config = config;
+ }
+ return newValue;
+ });
+ });
+ }
+ } catch (e) {
+ console.error("Unable to get kafka topics", e);
+ } finally {
+ setIsLoading(false);
+ }
+ }, []);
+
+ useEffect(() => {
+ load(env).finally(() => console.debug("Init done"));
+ }, [env, load]);
+
+ return (
+ setEnv(newValue)}>
+
+ {getEnvs().map((env) => (
+
+ ))}
+
+
+ }
+ >
+
+ {topics.map((topic) => (
+
+
+
+
+
+ {Object.keys(topic.metadata).length > 0 && (
+ <>
+
+ {topic.title && }
+ {topic.subtitle && }
+ {Object.keys(topic.metadata).map((key) => (
+
+ ))}
+
+ >
+ )}
+ {topic.config.length > 0 && (
+ <>
+
+ {topic.config.map((entry) => (
+
+ ))}
+ >
+ )}
+
+ }
+ />
+ }
+ accessories={getAccessories(topic)}
+ actions={
+
+
+ await load(env)}
+ />
+ setWithDetails((withDetails) => !withDetails)}
+ />
+
+ }
+ />
+ ))}
+
+
+ );
+}
diff --git a/extensions/kafka/src/kafka-menubar.tsx b/extensions/kafka/src/kafka-menubar.tsx
new file mode 100644
index 0000000000000..0e4da1fbe4ede
--- /dev/null
+++ b/extensions/kafka/src/kafka-menubar.tsx
@@ -0,0 +1,188 @@
+import {
+ Cache,
+ Clipboard,
+ environment,
+ getPreferenceValues,
+ Icon,
+ LaunchType,
+ MenuBarExtra,
+ openCommandPreferences,
+} from "@raycast/api";
+import { ConsumerGroupState, GroupDescription, GroupOverview } from "kafkajs";
+import { useCallback, useEffect, useState } from "react";
+import { useCachedState } from "@raycast/utils";
+import { buildAdmin, getConfig, getConsumerInfo, getEnvs } from "./utils";
+import moment from "moment";
+
+type State = "Loaded" | "NotLoaded" | "Loading";
+
+interface ConsumerInfo {
+ groupId: string;
+ state: ConsumerGroupState;
+ members: number;
+ overall: number;
+}
+
+interface KafkaMenuBarPreferences {
+ hideWithoutLag: boolean;
+}
+
+const preferences = getPreferenceValues();
+
+const cacheNamespace = "kafka-menubar";
+const cacheKeyLag = "kafkaLagConsumers";
+const cacheKeyLastUpdate = "kafkaLagLastUpdate";
+const cache = new Cache({ namespace: cacheNamespace });
+
+export function MenuConsumer(props: { consumer: ConsumerInfo }) {
+ return (
+
+ await Clipboard.copy(event.type === "left-click" ? props.consumer.groupId : props.consumer.overall)
+ }
+ />
+ );
+}
+
+export default function KafkaLag() {
+ const [consumers, setConsumers] = useState([]);
+ const [isLoading, setIsLoading] = useState(true);
+ const [env, setEnv] = useCachedState("env", getEnvs()[0]);
+ const [state, setState] = useCachedState("state", "NotLoaded");
+
+ const load = useCallback(
+ async (launchType: LaunchType, env: string) => {
+ const start = moment.now();
+ if (launchType === LaunchType.UserInitiated) {
+ const cached = cache.get(cacheKeyLag);
+ if (cached) {
+ const fromCache = JSON.parse(cached);
+ setConsumers(fromCache);
+ }
+ return;
+ }
+ console.info("[background] get kafka consumers for env:", env);
+ setIsLoading(true);
+ setState("Loading");
+ try {
+ const admin = await buildAdmin(env);
+ const filterConsumers = getConfig(env).filterConsumers;
+
+ const groupIds = (await admin.listGroups()).groups
+ .map((group: GroupOverview) => group.groupId)
+ .filter((groupId) => {
+ if (!filterConsumers) {
+ return true;
+ }
+ return filterConsumers.some((filterConsumer) => groupId.includes(filterConsumer));
+ });
+
+ const groups = new Map(
+ (await admin.describeGroups(groupIds)).groups.map((group) => [group.groupId, group])
+ );
+
+ let consumers: ConsumerInfo[] = [];
+ // compute lag
+ for (const groupId of groupIds) {
+ const lagInfo = await getConsumerInfo(admin, groupId);
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const description = groups.get(groupId)!;
+ consumers.push({
+ groupId,
+ state: description.state,
+ members: description.members.length,
+ overall: lagInfo.overallLag,
+ });
+ }
+
+ consumers = consumers.sort((a, b) => {
+ if (b.overall === a.overall) {
+ return a.groupId.localeCompare(b.groupId);
+ }
+ return b.overall - a.overall;
+ });
+
+ cache.set(cacheKeyLag, JSON.stringify(consumers));
+ cache.set(cacheKeyLastUpdate, moment(new Date()).format("HH:mm:ss"));
+ setState("Loaded");
+ setConsumers(consumers);
+ } catch (e) {
+ console.error("Unable to get kafka consumers", e);
+ } finally {
+ setIsLoading(false);
+ console.info(`[background] done in ${moment.utc(moment.now() - start).format("mm:ss.SSS")}`);
+ }
+ },
+ [setState]
+ );
+
+ useEffect(() => {
+ load(environment.launchType, env).finally(() => setIsLoading(false));
+ }, [env, load, setIsLoading]);
+
+ return (
+
+
+
+ {getEnvs().map((e) => (
+ {
+ setConsumers([]);
+ cache.remove(cacheKeyLag);
+ setState("NotLoaded");
+ setEnv(e);
+ }}
+ />
+ ))}
+
+
+ {state !== "Loading" && (
+ {
+ setIsLoading(true);
+ await load(LaunchType.Background, env).finally(() => setIsLoading(false));
+ }}
+ />
+ )}
+ {state === "Loading" && }
+ {consumers.length > 0 && (
+ <>
+
+ {consumers.filter((consumer) => consumer.overall > 0).length === 0 && (
+ {
+ console.info("No consumers with lag");
+ }}
+ />
+ )}
+ {consumers
+ .filter((consumer) => consumer.overall > 0)
+ .map((consumer) => (
+
+ ))}
+ {!preferences.hideWithoutLag && (
+ <>
+
+ {consumers
+ .filter((consumer) => consumer.overall === 0)
+ .map((consumer) => (
+
+ ))}
+ >
+ )}
+ >
+ )}
+
+ );
+}
diff --git a/extensions/kafka/src/kafka.tsx b/extensions/kafka/src/kafka.tsx
new file mode 100644
index 0000000000000..e229bb75aa829
--- /dev/null
+++ b/extensions/kafka/src/kafka.tsx
@@ -0,0 +1,25 @@
+import { Action, ActionPanel, Icon, List } from "@raycast/api";
+import KafkaTopics from "./components/topics";
+import KafkaConsumers from "./components/consumers";
+
+export default function Kafka() {
+ return (
+
+ {[
+ { type: "Topics", icon: Icon.List, to: },
+ { type: "Consumers", icon: Icon.PersonLines, to: },
+ ].map(({ type, icon, to }) => (
+
+
+
+ }
+ />
+ ))}
+
+ );
+}
diff --git a/extensions/kafka/src/utils.ts b/extensions/kafka/src/utils.ts
new file mode 100644
index 0000000000000..025be531dff2a
--- /dev/null
+++ b/extensions/kafka/src/utils.ts
@@ -0,0 +1,111 @@
+import { getPreferenceValues } from "@raycast/api";
+import { Admin, Kafka, KafkaConfig, logLevel } from "kafkajs";
+import * as fs from "fs";
+
+export interface KafkaEnv {
+ name: string;
+ filterTopics?: string[];
+ filterConsumers?: string[];
+ kafkaJs: KafkaConfig;
+}
+
+export interface KafkaPreferences {
+ configDirectory: string;
+ extractRegex: string;
+ extractTitleGroup: string;
+ extractSubTitleGroup: string;
+ extractMetadataNameAndGroup: string;
+}
+
+export interface LagInfo {
+ topicName: string;
+ overallLag: number;
+}
+
+const preferences = getPreferenceValues();
+const envs = new Map();
+const files = fs.readdirSync(preferences.configDirectory, "utf-8");
+for (const file of files) {
+ const env = JSON.parse(fs.readFileSync(preferences.configDirectory + "/" + file).toString());
+ envs.set(env.name, env);
+}
+
+const admins: Record = {};
+
+export function getExtractConfig() {
+ if (preferences.extractRegex) {
+ return {
+ regex: new RegExp(preferences.extractRegex),
+ extractTitleGroup: Number(preferences.extractTitleGroup),
+ extractSubTitleGroup: Number(preferences.extractSubTitleGroup),
+ extractMetadataNameAndGroup: preferences.extractMetadataNameAndGroup.split(",").map((split) => ({
+ metadataName: split.split("=")[0],
+ group: Number(split.split("=")[1]),
+ })),
+ };
+ }
+ return null;
+}
+
+export function getEnvs(): string[] {
+ return Array.from(envs.keys());
+}
+
+export function getConfig(env: string): KafkaEnv {
+ const conf = envs.get(env);
+ if (!conf) {
+ throw new Error("Unknown env : " + env);
+ }
+ if (!conf.kafkaJs.connectionTimeout) {
+ conf.kafkaJs.connectionTimeout = 10000;
+ }
+ if (!conf.kafkaJs.requestTimeout) {
+ conf.kafkaJs.requestTimeout = 30000;
+ }
+ if (!conf.kafkaJs.ssl) {
+ conf.kafkaJs.ssl = {
+ rejectUnauthorized: false,
+ };
+ }
+ conf.kafkaJs.logLevel = logLevel.ERROR;
+ return conf;
+}
+
+export async function buildAdmin(env: string): Promise {
+ if (admins[env]) {
+ return admins[env];
+ }
+ const conf = getConfig(env);
+ const kafka = new Kafka(conf.kafkaJs);
+ const admin = kafka.admin();
+ await admin.connect();
+ admins[env] = admin;
+ return admin;
+}
+
+export async function getConsumerInfo(admin: Admin, groupId: string): Promise {
+ const consumerOffsets = await admin.fetchOffsets({
+ groupId: groupId,
+ });
+
+ let overallLag = 0;
+ let topicName = "";
+
+ for (const consumerOffset of consumerOffsets) {
+ topicName = consumerOffset.topic;
+ const topicOffset = await admin.fetchTopicOffsets(topicName);
+
+ let topicLag = 0;
+
+ for (const partition of consumerOffset.partitions) {
+ const current = +partition.offset;
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const end = +topicOffset.find((value) => value.partition === partition.partition)!.offset;
+ const partitionLag = end - current;
+ topicLag += partitionLag;
+ }
+
+ overallLag += topicLag;
+ }
+ return { topicName, overallLag };
+}
diff --git a/extensions/kafka/tsconfig.json b/extensions/kafka/tsconfig.json
new file mode 100644
index 0000000000000..43325b172114d
--- /dev/null
+++ b/extensions/kafka/tsconfig.json
@@ -0,0 +1,6 @@
+{
+ "extends": "../../tsconfig.base.json",
+ "include": [
+ "./src/**/*"
+ ]
+}