diff --git a/www/config.json b/www/config.json index 0b7e33ba..58c62b99 100644 --- a/www/config.json +++ b/www/config.json @@ -18,10 +18,7 @@ "client_name": "mmt-operator-kafka-client" }, "master_node":{ - "host":"localhost", - "port":"22", - "password":"12345", - "username":"frank", + "command": "kubectl exec amf-45-ipds-0 -n ath-cmm-45 -- bash -c \"nft insert rule ip filter INPUT ip daddr IP_ATT drop\"" }, diff --git a/www/reportReader/busReader.js b/www/reportReader/busReader.js index 2438b8c4..67b52719 100644 --- a/www/reportReader/busReader.js +++ b/www/reportReader/busReader.js @@ -6,7 +6,7 @@ const constant = require('../libs/constant.js'); const ProcessMessage = require("./ProcessMessage"); const DataBase = require("./DataBase.js"); const DBInserter = require("./DBInserter.js"); -const { MongoClient } = require('mongodb'); +const { MongoClient }= require('mongodb'); const database = new DataBase(); const processMessage = new ProcessMessage( database ); @@ -54,11 +54,11 @@ async function queryIpMongo( attackId ) { // Perform the aggregation query var result = await db.collection('security').aggregate(pipeline, {cursor : { }} ).toArray(); - if (result.length > 0) { + console.log("Mongo ip result "+result ) ; + if ( result.length > 0 ) { ipAttacker = result[0].ipSrcValue; } - } catch (err) { console.error('Error:', err); @@ -145,6 +145,7 @@ report_client_miugio.on('message', async function ( channel,message) { catch (error) { // Handle the exception here console.error('An error occcurred:', error); + } diff --git a/www/routes/sancus/remediation.js b/www/routes/sancus/remediation.js index 06c31230..4557bc49 100644 --- a/www/routes/sancus/remediation.js +++ b/www/routes/sancus/remediation.js @@ -1,52 +1,114 @@ var express = require('express'); var router = express.Router(); const config = require("../../libs/config"); -const SSHClient = require("../../libs/client_ssh.js"); +pub_sub = require("../../libs/kafka"); +//Todo:produce message on specific topic +//Pass the message through the route +const { Kafka } = require('kafkajs'); -const connSettings = { - host: config.master_node.host,//ip master node - port: config.master_node.port, // Default SSH port - username: config.master_node.username, - password: config.master_node.password // Or you can use key-based authentication -}; +async function produceMessage(msg) { + // Create a new Kafka instance + const kafka = new Kafka({ + clientId: 'my-app', + brokers: [config.kafka_input.host+':'+config.kafka_input.port] // Replace with your Kafka broker(s) address + }); + // Create a producer + const producer = kafka.producer({ createPartitioner: Kafka.DefaultPartitioner }); + + // Connect the producer to the Kafka cluster + await producer.connect();//In JavaScript, the await keyword is used to pause the execution of an asynchronous function until a promise is resolved or rejected. It can only be used inside an async function. + let result=false; + try { + // Create a message object with the desired payload and topic + console.log("subscribed to "+config.kafka_input.orchestrator_topic); + + // Publish the message to the Kafka topic + await producer.send({ + topic: config.kafka_input.orchestrator_topic,//Orchestrator-Topic + messages: [{ + value: msg + }] + }); + console.log('Message published successfully to orchestrator'); + result=true; + return result; + } catch (error) { + console.error('Error publishing message to orchestrator:', error); + result=false; + + } finally { + // Disconnect the producer from the Kafka cluster + await producer.disconnect(); + console.log("result in function "+result) + return result; + } +} + + +router.get("",async function(req, res) { + try{ + console.log("Received "+req.query.cid); + //produceMessage(); + //_publishMessage( "testTopic", "ciao" ) + var scriptCode = config.master_node.command; + const command_ip = scriptCode.replace(/IP_ATT/g, req.query.IP); + console.log("Command "+command_ip); + var result=await produceMessage( command_ip ); + + // publisher.publish( "testTopic", "Hello Kafkabus"); + console.log("Remediation.js server"); + console.log("Result "+result) + // res.sendFile('index.html', { root: __dirname + "../views/" } ) + if(result==true){ + res.status(200).end()//204: The server has successfully fulfilled the request and that there is no additional content to send in the response payload body. + + // res.status(202).setHeader("Content-Type", "application/json"); + //res.sendFile('index.html', { root: __dirname + "../views/" } ) + + // res.send({message: "Message correctly published on kafkabus "}); + } + else + res.status(500).send( "Error:Message not published on KafkaBus" ); + + // res.status(400).end()//204: The server has not fulfilled the request and that there is no additional content to send in the response payload body. + }catch(error ){ + console.error('Error publishing message to orchestrator:', error); + + res.status(500).send( "Error:Message not published on KafkaBus" ); + } +}); router.post("",async function(req, res) { console.log("Received "+req.query.CID+" "+ req.query.IP ); - // var scriptCode = `kubectl exec amf-45-ipds-0 -n ath-cmm-45 -- bash -c "nft insert rule ip filter INPUT ip daddr `+ req.query.IP +` drop"`; - var scriptCode = config.master_node.command; - const command_ip = scriptCode.replace(/IP_ATT/g, req.query.IP); - console.log("Command "+command_ip ); //produceMessage(); //_publishMessage( "testTopic", "ciao" ) - //var result=await produceMessage(req.query.CID); - const ssh = new SSHClient(); - try{ - ssh.connect(connSettings).then( () =>{ - ssh.executeCommand ( command_ip ) .then(() => { - console.log('Command kubectl executed successfully'); - res.status(204).end()//204: The server has successfully fulfilled the request and that there is no additional content to send in the response payload body. - ssh.disconnect(); - - } ) - .catch((err) => { + var scriptCode = config.master_node.command; + const command_ip = scriptCode.replace(/IP_ATT/g, req.query.IP); + console.log("Command "+command_ip); + var result=await produceMessage( command_ip ); + // publisher.publish( "testTopic", "Hello Kafkabus"); + console.log("Remediation.js server"); + console.log("Result "+result) + // res.sendFile('index.html', { root: __dirname + "../views/" } ) + if(result==true){ + res.status(204).end()//204: The server has successfully fulfilled the request and that there is no additional content to send in the response payload body. - console.error('Command execution failed'); - res.status(500).send( "Error: Script not execute on Nokia firewall" ); - ssh.disconnect(); + // res.status(202).setHeader("Content-Type", "application/json"); + //res.sendFile('index.html', { root: __dirname + "../views/" } ) - }); - }); + //res.send({message: "Message correctly published on kafkabus "}); + ///MMTDrop.alert.success("your message here", 10*1000); + } + else + res.status(500).send( "Error:Message not published on KafkaBus" ); - }catch(err){ - console.error(err); + // res.status(400).end()//204: The server has not fulfilled the request and that there is no additional content to send in the response payload body. - } - }); @@ -61,4 +123,3 @@ module.exports = router; -