Skip to content

Commit

Permalink
Added Partially support for topicPublished in MQTT when messages reta…
Browse files Browse the repository at this point in the history
…ined. Fixed thread memory overflow error.
  • Loading branch information
irinil committed Jul 6, 2020
1 parent 650df00 commit 8d73074
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 45 deletions.
11 changes: 5 additions & 6 deletions src/main/java/de/tudarmstadt/informatik/hostage/Hostage.java
Expand Up @@ -71,7 +71,7 @@ public class Hostage extends Service {

private HashMap<String, Boolean> mProtocolActiveAttacks;
MultiStageAlarm alarm = new MultiStageAlarm();
DaoSession dbSession;
private DaoSession dbSession;
public static int prefix;

public class LocalBinder extends Binder {
Expand Down Expand Up @@ -306,7 +306,7 @@ public void onCreate() {
implementedProtocols = getImplementedProtocols();
connectionInfo = getSharedPreferences(getString(R.string.connection_info), Context.MODE_PRIVATE);
connectionInfoEditor = connectionInfo.edit();
connectionInfoEditor.commit();
connectionInfoEditor.apply();

mProtocolActiveAttacks = new HashMap<String, Boolean>();

Expand Down Expand Up @@ -552,17 +552,16 @@ private synchronized void createNotification() {
boolean activeHandlers = false;
boolean bssidSeen = false;
boolean listening = false;
Iterator<Listener> iterator = listeners.iterator();
while(iterator.hasNext()) {
Listener listener = iterator.next();
for (Listener listener : listeners) {
if (listener.isRunning())
listening = true;
if (listener.getHandlerCount() > 0) {
activeHandlers = true;
}
if (attackRecordDAO.bssidSeen(listener.getProtocolName(), getBSSID(getApplicationContext()))) {
bssidSeen = true;
} }
}
}


PendingIntent resultPendingIntent = intentNotificationGenerator();
Expand Down
Expand Up @@ -246,6 +246,7 @@ private void fullHandler() throws IOException {
Socket client = server.accept();
Thread socketsThread = socketsThread(client);
threadPool.submit(socketsThread);
threadPool.shutdown();
}
}

Expand Down
Expand Up @@ -3,12 +3,16 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import de.tudarmstadt.informatik.hostage.protocol.MQTT;
import de.tudarmstadt.informatik.hostage.protocol.Protocol;
import de.tudarmstadt.informatik.hostage.protocol.mqttUtils.MQTTHandler;

import static de.tudarmstadt.informatik.hostage.protocol.mqttUtils.MQTTHandler.isTopicPublished;

public class MQTTListener extends Listener {
private ArrayList<Handler> handlers = new ArrayList<Handler>();
private Thread thread;
Expand Down Expand Up @@ -114,8 +118,11 @@ public boolean stopMqttBroker(){

private void fullHandler() throws IOException {
if (conReg.isConnectionFree()) {
ExecutorService threadPool = Executors.newFixedThreadPool(1);

Thread brokerThread = brokerThread();
brokerThread.start();
threadPool.submit(brokerThread);
threadPool.shutdown();

}
}
Expand All @@ -138,9 +145,9 @@ public void run() {
if (ConnectionGuard.portscanInProgress())
return;

//isTopicPublished(); //The record wont get updated.
if(MQTTHandler.isAnAttackOngoing()) {
startHandler();

conReg.newOpenConnection();
}
} catch (Exception e) {
Expand Down
Expand Up @@ -25,6 +25,17 @@ public DAOHelper(DaoSession daoSession, Context context){

}

public DAOHelper(DaoSession daoSession){
this.daoSession= daoSession;
this.messageRecordDAO = new MessageRecordDAO(daoSession);
this.networkRecordDAO = new NetworkRecordDAO(daoSession);
this.profileDAO = new ProfileDAO(daoSession);
this.syncInfoRecordDAO = new SyncInfoRecordDAO(daoSession);
this.attackRecordDAO = new AttackRecordDAO(daoSession);
this.syncDeviceDAO = new SyncDeviceDAO(daoSession);

}

public DaoSession getDaoSession() {
return daoSession;
}
Expand Down
Expand Up @@ -46,6 +46,30 @@ public void insertMessageRecords(List<MessageRecord> records){

}

/**
* Returns Last Inserted Record.
* @return
*/

public MessageRecord getLastedInsertedRecord(){
MessageRecordDao recordDao = this.daoSession.getMessageRecordDao();
MessageRecord record = new MessageRecord();

List<MessageRecord> messageRecords = recordDao.queryBuilder()
.orderAsc(MessageRecordDao.Properties.Id)
.limit(1)
.list();
if(!messageRecords.isEmpty()){
return messageRecords.get(0);
}
return record;
}

public void updateRecord(MessageRecord record){
MessageRecordDao recordDao = this.daoSession.getMessageRecordDao();

updateElement(recordDao,record);
}

public ArrayList<MessageRecord> getAllMessageRecords(){
MessageRecordDao recordDao = this.daoSession.getMessageRecordDao();
Expand Down
Expand Up @@ -113,9 +113,7 @@ private List<Packet> processRequest(byte[] request,int requestType) {
switch (requestType){

case MODBUS_SERVICE:
// responsePackets.add(new Packet(request,getDeviceInfo()));
responsePackets.add(new Packet(getDeviceInfo()+"\r\n","EE:FF:66:88:GH:JI:DJ"));
System.out.println(responsePackets);
break;

case READ_INPUT_REGISTERS:
Expand Down
Expand Up @@ -4,14 +4,18 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.UUID;

import de.tudarmstadt.informatik.hostage.Hostage;
import de.tudarmstadt.informatik.hostage.HostageApplication;
import de.tudarmstadt.informatik.hostage.commons.HelperUtils;
import de.tudarmstadt.informatik.hostage.commons.SubnetUtils;
import de.tudarmstadt.informatik.hostage.logging.AttackRecord;
import de.tudarmstadt.informatik.hostage.logging.DaoSession;
import de.tudarmstadt.informatik.hostage.logging.MessageRecord;
import de.tudarmstadt.informatik.hostage.logging.SyncDevice;
import de.tudarmstadt.informatik.hostage.persistence.DAO.DAOHelper;
import de.tudarmstadt.informatik.hostage.protocol.MQTT;
import de.tudarmstadt.informatik.hostage.protocol.Protocol;
import io.moquette.broker.ClientDescriptor;
Expand Down Expand Up @@ -65,7 +69,7 @@ public Class<?>[] getInterceptedMessageTypes() {

@Override
public void onConnect(InterceptConnectMessage interceptConnectMessage) {
//interceptConnectMessages.add(interceptConnectMessage);
interceptConnectMessages.add(interceptConnectMessage);
currentConnectedMessages.add(interceptConnectMessage);

}
Expand Down Expand Up @@ -154,9 +158,34 @@ public static ArrayList<InterceptSubscribeMessage> getCurrentSubscribeMessages()
*/

public static boolean isAnAttackOngoing(){
return isAnAttackerConnected(getCurrentConnectedMessages());
}

/**
* Checks if a topic is published from an Attacker and updates the record.
*/

public static void isTopicPublished(){
boolean isAnAttackerConnected = isAnAttackerConnected(currentPublishMessages);
if(isAnAttackerConnected){
if(!currentPublishMessages.isEmpty()) {
DaoSession dbSession = HostageApplication.getInstances().getDaoSession();

DAOHelper daoHelper = new DAOHelper(dbSession);
MessageRecord record = daoHelper.getMessageRecordDAO().getLastedInsertedRecord();
record.setPacket(getPublishedTopics());
daoHelper.getMessageRecordDAO().updateRecord(record);

currentPublishMessages.clear();
}
}

}

private static boolean isAnAttackerConnected(ArrayList<?> connectedMessages){
Collection<ClientDescriptor> clients = MQTT.listConnectedClients();

if(!clients.isEmpty() && !getCurrentConnectedMessages().isEmpty()) {
if(!clients.isEmpty() && !connectedMessages.isEmpty()) {
for (ClientDescriptor item : clients) {
if (item != null) {
if(item.getClientID().equals(SensorProfile.getClientID()) && clients.size()==1)
Expand All @@ -166,6 +195,8 @@ public static boolean isAnAttackOngoing(){
return true;
}
return false;


}

/**
Expand All @@ -189,21 +220,21 @@ private static String getIPCurrentClient() {
}

private static String getPublishedTopics(){
StringBuilder packet = new StringBuilder();
if(!publishMessages.isEmpty()){
for(InterceptPublishMessage message:publishMessages){
String packet = "";

if(!currentPublishMessages.isEmpty()){
InterceptPublishMessage message = currentPublishMessages.get(0);
if(message!=null) {
if (message.getClientID().equals(currentConnectedMessages.get(currentConnectedMessages.size() - 1).getClientID())) {
packet.append("TopicName: ").append(message.getTopicName())
.append(" ").append("Message: ").append(message.getPayload())
.append(" ").append(System.getProperty("line.separator"));
if (message.getClientID().equals(interceptConnectMessages.get(interceptConnectMessages.size()-1).getClientID())) {

packet+="TopicName: "+message.getTopicName()+" "+
"Message Clientid: "+message.getClientID()+
"/n";
}
}

}

}
return packet.toString();
return packet;
}

/**
Expand Down Expand Up @@ -254,7 +285,7 @@ public static AttackRecord createAttackRecord(Long attack_id, String externalIP,
record.setAttack_id(attack_id);
record.setSync_id(attack_id);
if(SyncDevice.currentDevice()!=null)
record.setDevice(SyncDevice.currentDevice().getDeviceID());
record.setDevice(Objects.requireNonNull(SyncDevice.currentDevice()).getDeviceID());
else
record.setDevice(UUID.randomUUID().toString());
record.setProtocol("MQTT");
Expand Down
39 changes: 17 additions & 22 deletions src/main/java/de/tudarmstadt/informatik/hostage/system/Device.java
Expand Up @@ -47,33 +47,28 @@ public static void checkCapabilities() {
porthack = false;
break;
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}

// TODO: test with various devices, cannot run programm su permission denied
if (Build.VERSION.SDK_INT >= 18) { // iptables isn't fully implemented on older versions
final String ipTablesList = "iptables -L -n -t nat"; // list all rules in NAT table
try {
Process p = new ProcessBuilder("su", "-c", ipTablesList).start();
switch (p.waitFor()) {
case 0: // everything is fine
iptables = true; // iptables available and working
break;

case 3: // no such table
case 127: // command not found
default: // unexpected return code
// while testing code 3 has been returned when table NAT is not available
iptables = false;
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
// iptables isn't fully implemented on older versions
final String ipTablesList = "iptables -L -n -t nat"; // list all rules in NAT table
try {
Process p = new ProcessBuilder("su", "-c", ipTablesList).start();
switch (p.waitFor()) {
case 0: // everything is fine
iptables = true; // iptables available and working
break;

case 3: // no such table
case 127: // command not found
default: // unexpected return code
// while testing code 3 has been returned when table NAT is not available
iptables = false;
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}

initialized = true;
Expand Down

0 comments on commit 8d73074

Please sign in to comment.