From b91d38fe81711973eda90b00a046097d05f7d197 Mon Sep 17 00:00:00 2001 From: HTGAzureX1212 <39023054+HTG-YT@users.noreply.github.com> Date: Thu, 9 Feb 2023 21:01:29 +0800 Subject: [PATCH] remove kafka component --- discord-frontend/Cargo.lock | 2 - .../hartex-discord-commands/Cargo.toml | 3 -- .../src/general/uptime.rs | 34 --------------- .../hartex-discord-leader/src/main.rs | 13 ------ .../hartex-discord-leader/src/outbound.rs | 41 ------------------- 5 files changed, 93 deletions(-) delete mode 100644 discord-frontend/hartex-discord-leader/src/outbound.rs diff --git a/discord-frontend/Cargo.lock b/discord-frontend/Cargo.lock index ee2e1a768..7aa383ae2 100644 --- a/discord-frontend/Cargo.lock +++ b/discord-frontend/Cargo.lock @@ -621,10 +621,8 @@ dependencies = [ "hartex_discord_core", "hartex_discord_utils", "hartex_eyre", - "hartex_kafka_utils", "hartex_localization_core", "hartex_localization_macros", - "rdkafka", ] [[package]] diff --git a/discord-frontend/hartex-discord-commands/Cargo.toml b/discord-frontend/hartex-discord-commands/Cargo.toml index 2b0905852..894b64c41 100644 --- a/discord-frontend/hartex-discord-commands/Cargo.toml +++ b/discord-frontend/hartex-discord-commands/Cargo.toml @@ -19,8 +19,5 @@ hartex_localization_core = { path = "../../localization/hartex-localization-core hartex_localization_macros = { path = "../../localization/hartex-localization-macros" } hartex_discord_utils = { path = "../../rust-utilities/hartex-discord-utils" } -hartex_kafka_utils = { path = "../../rust-utilities/hartex-kafka-utils" } - -rdkafka = { version = "0.29.0", default-features = false, features = ["cmake-build", "external-lz4", "tokio"] } [features] diff --git a/discord-frontend/hartex-discord-commands/src/general/uptime.rs b/discord-frontend/hartex-discord-commands/src/general/uptime.rs index 3b33a0353..2a5d199f8 100644 --- a/discord-frontend/hartex-discord-commands/src/general/uptime.rs +++ b/discord-frontend/hartex-discord-commands/src/general/uptime.rs @@ -20,9 +20,6 @@ * with HarTex. If not, see . */ -use std::env; -use std::time::Duration; - use hartex_discord_commands_core::traits::Command; use hartex_discord_commands_core::CommandMetadata; use hartex_discord_core::discord::model::application::interaction::Interaction; @@ -30,13 +27,6 @@ use hartex_discord_core::discord::model::http::interaction::InteractionResponse; use hartex_discord_core::discord::model::http::interaction::InteractionResponseType; use hartex_discord_core::discord::util::builder::InteractionResponseDataBuilder; use hartex_discord_utils::CLIENT; -use hartex_eyre::eyre::Report; -use hartex_kafka_utils::traits::ClientConfigUtils; -use hartex_kafka_utils::types::CompressionType; -use rdkafka::producer::FutureProducer; -use rdkafka::producer::FutureRecord; -use rdkafka::util::Timeout; -use rdkafka::ClientConfig; #[derive(CommandMetadata)] #[metadata(command_type = 1)] @@ -47,30 +37,6 @@ pub struct Uptime; impl Command for Uptime { async fn execute(&self, interaction: Interaction) -> hartex_eyre::Result<()> { let interaction_client = CLIENT.interaction(interaction.application_id); - let bootstrap_servers = env::var("KAFKA_BOOTSTRAP_SERVERS")? - .split(';') - .map(String::from) - .collect::>(); - let producer = ClientConfig::new() - .bootstrap_servers(bootstrap_servers.into_iter()) - .compression_type(CompressionType::Lz4) - .delivery_timeout_ms(30000) - .create::()?; - let topic = env::var("KAFKA_TOPIC_OUTBOUND_COMMUNICATION")?; - - let bytes = b"uptime"; - - if let Err((error, _)) = producer - .send( - FutureRecord::to(&topic) - .key("OUTBOUND_COMMUNICATION") - .payload(bytes), - Timeout::After(Duration::from_secs(0)), - ) - .await - { - return Err(Report::new(error)); - } interaction_client .create_response( diff --git a/discord-frontend/hartex-discord-leader/src/main.rs b/discord-frontend/hartex-discord-leader/src/main.rs index 801f5f29c..02513b71b 100644 --- a/discord-frontend/hartex-discord-leader/src/main.rs +++ b/discord-frontend/hartex-discord-leader/src/main.rs @@ -38,13 +38,10 @@ use hartex_discord_core::tokio::sync::watch; use hartex_discord_core::tokio::time; use hartex_kafka_utils::traits::ClientConfigUtils; use hartex_kafka_utils::types::CompressionType; -use rdkafka::consumer::Consumer; -use rdkafka::consumer::StreamConsumer; use rdkafka::producer::FutureProducer; use rdkafka::ClientConfig; mod inbound; -mod outbound; mod queue; mod shards; @@ -60,15 +57,6 @@ pub async fn main() -> hartex_eyre::Result<()> { .split(';') .map(String::from) .collect::>(); - let topic = env::var("KAFKA_TOPIC_OUTBOUND_COMMUNICATION")?; - - let consumer = ClientConfig::new() - .bootstrap_servers(bootstrap_servers.clone().into_iter()) - .compression_type(CompressionType::Lz4) - .group_id("com.github.teamhartex.hartex.outbound.communication.consumer") - .create::()?; - - consumer.subscribe(&[&topic])?; let producer = ClientConfig::new() .bootstrap_servers(bootstrap_servers.into_iter()) @@ -89,7 +77,6 @@ pub async fn main() -> hartex_eyre::Result<()> { tokio::spawn(async move { tokio::select! { _ = inbound::handle(shards.iter_mut(), producer) => {}, - _ = outbound::consume(consumer) => {}, _ = rx.changed() => { future::join_all(shards.iter_mut().map(|shard: &mut Shard| async move { shard.close(CloseFrame::RESUME).await diff --git a/discord-frontend/hartex-discord-leader/src/outbound.rs b/discord-frontend/hartex-discord-leader/src/outbound.rs deleted file mode 100644 index 54f27776f..000000000 --- a/discord-frontend/hartex-discord-leader/src/outbound.rs +++ /dev/null @@ -1,41 +0,0 @@ -/* - * SPDX-License-Identifier: AGPL-3.0-only - * - * This file is part of HarTex. - * - * HarTex - * Copyright (c) 2021-2023 HarTex Project Developers - * - * HarTex is free software; you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * HarTex is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License along - * with HarTex. If not, see . - */ - -use futures_util::StreamExt; -use hartex_eyre::eyre::Report; -use rdkafka::consumer::StreamConsumer; -use rdkafka::Message; - -pub async fn consume(consumer: StreamConsumer) -> hartex_eyre::Result<()> { - while let Some(result) = consumer.stream().next().await { - let Ok(message) = result else { - let error = result.unwrap_err(); - println!("{:?}", Report::new(error)); - - continue; - }; - - let _ = message.payload().unwrap(); - } - - Ok(()) -}