Skip to content

Commit

Permalink
Command exchanged on so-topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Frankccv committed Oct 2, 2023
1 parent 00fb33a commit e6db377
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 39 deletions.
5 changes: 1 addition & 4 deletions www/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
"client_name": "mmt-operator-kafka-client"
},
"master_node":{
"host":"localhost",
"port":"22",
"password":"12345",
"username":"frank",

"command": "kubectl exec amf-45-ipds-0 -n ath-cmm-45 -- bash -c \"nft insert rule ip filter INPUT ip daddr IP_ATT drop\""

},
Expand Down
7 changes: 4 additions & 3 deletions www/reportReader/busReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const constant = require('../libs/constant.js');
const ProcessMessage = require("./ProcessMessage");
const DataBase = require("./DataBase.js");
const DBInserter = require("./DBInserter.js");
const { MongoClient } = require('mongodb');
const { MongoClient }= require('mongodb');
const database = new DataBase();
const processMessage = new ProcessMessage( database );

Expand Down Expand Up @@ -54,11 +54,11 @@ async function queryIpMongo( attackId ) {

// Perform the aggregation query
var result = await db.collection('security').aggregate(pipeline, {cursor : { }} ).toArray();
if (result.length > 0) {
console.log("Mongo ip result "+result ) ;
if ( result.length > 0 ) {
ipAttacker = result[0].ipSrcValue;

}


} catch (err) {
console.error('Error:', err);
Expand Down Expand Up @@ -145,6 +145,7 @@ report_client_miugio.on('message', async function ( channel,message) {
catch (error) {
// Handle the exception here
console.error('An error occcurred:', error);

}


Expand Down
125 changes: 93 additions & 32 deletions www/routes/sancus/remediation.js
Original file line number Diff line number Diff line change
@@ -1,52 +1,114 @@
var express = require('express');
var router = express.Router();
const config = require("../../libs/config");
const SSHClient = require("../../libs/client_ssh.js");

pub_sub = require("../../libs/kafka");


//Todo:produce message on specific topic
//Pass the message through the route
const { Kafka } = require('kafkajs');

const connSettings = {
host: config.master_node.host,//ip master node
port: config.master_node.port, // Default SSH port
username: config.master_node.username,
password: config.master_node.password // Or you can use key-based authentication
};
async function produceMessage(msg) {
// Create a new Kafka instance
const kafka = new Kafka({
clientId: 'my-app',
brokers: [config.kafka_input.host+':'+config.kafka_input.port] // Replace with your Kafka broker(s) address
});

// Create a producer
const producer = kafka.producer({ createPartitioner: Kafka.DefaultPartitioner });

// Connect the producer to the Kafka cluster
await producer.connect();//In JavaScript, the await keyword is used to pause the execution of an asynchronous function until a promise is resolved or rejected. It can only be used inside an async function.
let result=false;
try {
// Create a message object with the desired payload and topic
console.log("subscribed to "+config.kafka_input.orchestrator_topic);

// Publish the message to the Kafka topic
await producer.send({
topic: config.kafka_input.orchestrator_topic,//Orchestrator-Topic
messages: [{
value: msg
}]
});
console.log('Message published successfully to orchestrator');
result=true;
return result;
} catch (error) {
console.error('Error publishing message to orchestrator:', error);
result=false;

} finally {
// Disconnect the producer from the Kafka cluster
await producer.disconnect();
console.log("result in function "+result)
return result;
}
}


router.get("",async function(req, res) {
try{
console.log("Received "+req.query.cid);
//produceMessage();
//_publishMessage( "testTopic", "ciao" )
var scriptCode = config.master_node.command;
const command_ip = scriptCode.replace(/IP_ATT/g, req.query.IP);
console.log("Command "+command_ip);
var result=await produceMessage( command_ip );

// publisher.publish( "testTopic", "Hello Kafkabus");
console.log("Remediation.js server");
console.log("Result "+result)
// res.sendFile('index.html', { root: __dirname + "../views/" } )
if(result==true){
res.status(200).end()//204: The server has successfully fulfilled the request and that there is no additional content to send in the response payload body.

// res.status(202).setHeader("Content-Type", "application/json");
//res.sendFile('index.html', { root: __dirname + "../views/" } )

// res.send({message: "Message correctly published on kafkabus "});
}
else
res.status(500).send( "Error:Message not published on KafkaBus" );

// res.status(400).end()//204: The server has not fulfilled the request and that there is no additional content to send in the response payload body.
}catch(error ){
console.error('Error publishing message to orchestrator:', error);

res.status(500).send( "Error:Message not published on KafkaBus" );

}
});

router.post("",async function(req, res) {
console.log("Received "+req.query.CID+" "+ req.query.IP );
// var scriptCode = `kubectl exec amf-45-ipds-0 -n ath-cmm-45 -- bash -c "nft insert rule ip filter INPUT ip daddr `+ req.query.IP +` drop"`;
var scriptCode = config.master_node.command;
const command_ip = scriptCode.replace(/IP_ATT/g, req.query.IP);
console.log("Command "+command_ip );
//produceMessage();
//_publishMessage( "testTopic", "ciao" )
//var result=await produceMessage(req.query.CID);
const ssh = new SSHClient();
try{
ssh.connect(connSettings).then( () =>{
ssh.executeCommand ( command_ip ) .then(() => {
console.log('Command kubectl executed successfully');
res.status(204).end()//204: The server has successfully fulfilled the request and that there is no additional content to send in the response payload body.
ssh.disconnect();

} )
.catch((err) => {
var scriptCode = config.master_node.command;
const command_ip = scriptCode.replace(/IP_ATT/g, req.query.IP);
console.log("Command "+command_ip);
var result=await produceMessage( command_ip );
// publisher.publish( "testTopic", "Hello Kafkabus");
console.log("Remediation.js server");
console.log("Result "+result)
// res.sendFile('index.html', { root: __dirname + "../views/" } )
if(result==true){
res.status(204).end()//204: The server has successfully fulfilled the request and that there is no additional content to send in the response payload body.

console.error('Command execution failed');
res.status(500).send( "Error: Script not execute on Nokia firewall" );
ssh.disconnect();
// res.status(202).setHeader("Content-Type", "application/json");
//res.sendFile('index.html', { root: __dirname + "../views/" } )

});
});
//res.send({message: "Message correctly published on kafkabus "});
///MMTDrop.alert.success("your message here", 10*1000);
}
else
res.status(500).send( "Error:Message not published on KafkaBus" );

}catch(err){
console.error(err);
// res.status(400).end()//204: The server has not fulfilled the request and that there is no additional content to send in the response payload body.

}

});


Expand All @@ -61,4 +123,3 @@ module.exports = router;




0 comments on commit e6db377

Please sign in to comment.