In [7]:
import os
import numpy as np
import random
import torch
import torch.nn as nn
from torch.autograd import Variable
import threading
class CharRNN(nn.Module):
    """Symbol-level recurrent language model (embedding -> GRU/LSTM -> linear).

    Args:
        input_size: Vocabulary size accepted by the embedding layer.
        hidden_size: Width of both the embedding and the recurrent state.
        output_size: Number of logits produced per step.
        model: ``"gru"`` or ``"lstm"`` (case-insensitive).
        n_layers: Number of stacked recurrent layers.

    Raises:
        ValueError: If ``model`` is neither ``"gru"`` nor ``"lstm"``.
    """

    def __init__(self, input_size, hidden_size, output_size, model="gru", n_layers=1):
        super(CharRNN, self).__init__()
        self.model = model.lower()
        # Fail at construction time instead of leaving ``self.rnn`` undefined
        # and crashing later with an AttributeError inside ``forward``.
        if self.model not in ("gru", "lstm"):
            raise ValueError(
                "model must be 'gru' or 'lstm', got %r" % (model,)
            )
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.encoder = nn.Embedding(input_size, hidden_size)
        if self.model == "gru":
            self.rnn = nn.GRU(hidden_size, hidden_size, n_layers)
        else:
            self.rnn = nn.LSTM(hidden_size, hidden_size, n_layers)
        self.decoder = nn.Linear(hidden_size, output_size)

    def forward(self, input, hidden):
        """Advance the network by one time step for a whole batch.

        Args:
            input: 1-D integer tensor with one symbol index per batch element
                (its first dimension is taken as the batch size).
            hidden: Recurrent state as returned by ``init_hidden`` or by a
                previous call.

        Returns:
            ``(output, hidden)`` where ``output`` has shape
            ``(batch_size, output_size)``.
        """
        batch_size = input.size(0)
        encoded = self.encoder(input)
        # The recurrent layer expects (seq_len, batch, features); seq_len is 1.
        output, hidden = self.rnn(encoded.view(1, batch_size, -1), hidden)
        output = self.decoder(output.view(batch_size, -1))
        return output, hidden

    def forward2(self, input, hidden):
        """Single-sample variant of ``forward`` (batch size fixed to 1).

        The embedded input is reshaped to ``(1, 1, -1)`` before it is fed to
        the recurrent layer, and the returned logits have shape ``(1, -1)``.
        """
        encoded = self.encoder(input.view(1, -1))
        output, hidden = self.rnn(encoded.view(1, 1, -1), hidden)
        output = self.decoder(output.view(1, -1))
        return output, hidden

    def init_hidden(self, batch_size):
        """Return an all-zero initial recurrent state for ``batch_size`` sequences.

        The state is created on the same device as the module's parameters, so
        it always matches the weights and does not depend on any module-level
        flag being defined before the call.

        Returns:
            A ``(hidden, cell)`` tuple for LSTM models, otherwise a single
            tensor; each tensor has shape
            ``(n_layers, batch_size, hidden_size)``.
        """
        device = next(self.parameters()).device
        hidden = torch.zeros(
            self.n_layers, batch_size, self.hidden_size, device=device
        )
        if self.model == "lstm":
            cell = torch.zeros(
                self.n_layers, batch_size, self.hidden_size, device=device
            )
            return (hidden, cell)
        return hidden

In [8]:
# Run configuration shared by the cells below.
n_epochs = 100000      # number of iterations of the loop in start_train()
hidden_size = 100      # presumably the embedding / recurrent width -- TODO confirm against the model construction
n_layers = 2           # presumably the number of stacked recurrent layers -- TODO confirm
learning_rate = 0.01   # presumably the optimizer learning rate -- TODO confirm
chunk_len = 200        # length of each input/target sequence requested from random_training_set()
batch_size = 10        # number of sequences per batch
use_cuda = False       # module-level CUDA switch
def read_and_preprocess(file_path):
    """Read a file and return its bytes as a 1-D ``torch.long`` tensor.

    Args:
        file_path: Path of a regular file, opened in binary mode.

    Returns:
        A tensor of dtype ``torch.long`` with one element per byte
        (values 0-255); an empty file yields an empty tensor.

    Raises:
        OSError: If the path cannot be opened for reading (for example when it
            names a directory or a missing file).
    """
    with open(file_path, 'rb') as f:
        raw = f.read()
    # Reinterpret the buffer in one shot instead of letting torch.tensor walk
    # the bytes one Python int at a time; astype() also yields a writable
    # int64 copy that torch can wrap without a read-only warning.
    values = np.frombuffer(raw, dtype=np.uint8).astype(np.int64)
    return torch.from_numpy(values)
def read_file(filename):
    """Load a file and expose its content both as an array and as raw bytes.

    Args:
        filename: Path of the file to read in binary mode.

    Returns:
        A 3-tuple ``(array, length, raw)``: a 1-D ``uint8`` NumPy view over the
        content, the number of bytes, and the original ``bytes`` object.
    """
    with open(filename, "rb") as handle:
        raw = handle.read()
    as_array = np.frombuffer(raw, dtype=np.uint8)
    return as_array, len(as_array), raw
def sys_sample(bytez):
    """Systematically sample a sequence, keeping every 10000th element.

    Args:
        bytez: Any indexable sequence with a length whose items convert to int.

    Returns:
        A list of Python ints taken at offsets 0, 10000, 20000, ...; empty
        when the input is empty.
    """
    stride = 10000
    return [int(bytez[offset]) for offset in range(0, len(bytez), stride)]
def check(input, model1):
    """Run ``model1`` on a byte buffer and store/return its softmax output.

    The buffer is turned into a ``(1, n)`` ``uint8`` tensor, passed to
    ``model1`` with gradients disabled, and the softmax over the last
    dimension of the result is stored in the module-level ``outputs1``.

    Args:
        input: Bytes-like object holding the raw data to evaluate.
        model1: Callable that maps the ``(1, n)`` tensor to a score tensor.

    Returns:
        The same tensor that is written to ``outputs1``. Returning it lets
        callers avoid reading a global; callers that ignore the return value
        (for example a ``threading.Thread`` target) behave as before.
    """
    # Imported here because the notebook's import cell never brings ``F`` into
    # scope, which made the original call fail with NameError.
    import torch.nn.functional as F

    global outputs1
    with torch.no_grad():
        # copy() gives torch a writable array even when ``input`` is an
        # immutable ``bytes`` object.
        as_array = np.frombuffer(input, dtype=np.uint8).copy()
        batch = torch.from_numpy(as_array[np.newaxis, :])
        scores = model1(batch)
        outputs1 = F.softmax(scores, dim=-1)
    return outputs1
def random_training_set(chunk_len, batch_size):
    """Draw a random batch of next-symbol prediction pairs from ``file``.

    For every batch row a window of ``chunk_len + 1`` consecutive symbols is
    cut from the module-level ``file``; the first ``chunk_len`` symbols become
    the input row and the same window shifted by one becomes the target row.

    NOTE(review): ``file``, ``file_len`` and ``char_tensor`` are module-level
    names that are not defined in the visible part of the notebook -- confirm
    they exist before this function is called.

    Args:
        chunk_len: Number of symbols per sequence.
        batch_size: Number of sequences in the batch.

    Returns:
        ``(input, target)``, two ``torch.long`` tensors of shape
        ``(batch_size, chunk_len)``.

    Raises:
        ValueError: If ``file`` holds fewer than ``chunk_len + 1`` symbols.
    """
    if file_len <= chunk_len:
        raise ValueError(
            "need at least chunk_len + 1 = %d symbols, but file_len is %d"
            % (chunk_len + 1, file_len)
        )
    input = torch.empty(batch_size, chunk_len, dtype=torch.long)
    target = torch.empty(batch_size, chunk_len, dtype=torch.long)
    for bi in range(batch_size):
        # random.randint includes its upper bound, and each window needs
        # chunk_len + 1 symbols, so the last valid start is
        # file_len - chunk_len - 1.
        start_index = random.randint(0, file_len - chunk_len - 1)
        end_index = start_index + chunk_len + 1
        chunk = file[start_index:end_index]
        input[bi] = char_tensor(chunk[:-1])
        target[bi] = char_tensor(chunk[1:])
    return input, target
# Directory entries of `malware_path`; train() pops names off this list.
# NOTE(review): `malware_path` is only assigned in a later cell (In [11]), so
# on a fresh kernel this line raises NameError when cells run in order -- confirm
# the intended cell order.
target_list = os.listdir(malware_path)
def train(input, target):
    """Run one generate / score / update iteration.

    Steps, in order: bump the per-file attempt counter, generate 35 symbols
    with ``generate_gan`` and append them to ``bytez2``, score the result via
    ``check``, optionally write the buffer to disk and advance to the next
    entry of ``target_list``, then take one optimizer step on the
    cross-entropy loss accumulated over the sequence.

    Args:
        input: Batch of input sequences.
            NOTE(review): the loss loop below reads ``inp`` rather than this
            parameter -- confirm which one is intended.
        target: Batch of target sequences, indexed as ``target[:, c]``.

    Returns:
        ``(loss.data / chunk_len, flag)`` where ``flag`` is 0 by default,
        1 when the threshold branch ran, and 2 when the ``counter >= 50``
        branch ran.

    NOTE(review): this function reads or writes many module-level names that
    are never assigned in the visible cells (``cur_file``, ``counter``,
    ``cur_num_remaining``, ``outputs2``, ``save_path``, ``args``, ``inp``);
    confirm where they are initialised.
    """
    global cur_file
    global file2
    global file_len2
    global cur_num_remaining
    global bytez2
    global counter
    global pool_sample
    # One more attempt on the current file; reset to 0 whenever the file changes.
    counter += 1
    hidden = decoder.init_hidden(batch_size)
    flag = 0
    decoder.zero_grad()
    loss = 0
    # output_list is returned by generate_gan but not used further here.
    predicted, output_list = generate_gan(decoder, prime_str=pool_sample, predict_len=35)
    # predicted[0] is the priming sample followed by the generated symbols.
    generated_bytes = bytez2+bytearray(predicted[0])
    fresult =  []
    # The thread is joined immediately, so this is effectively a synchronous
    # call; check() publishes its result through the global `outputs1`.
    thread = threading.Thread(target=check, args=(generated_bytes,malconv))
    thread.start()
    thread.join()
    fresult.append(outputs1.detach().numpy()[0,1])
    # NOTE(review): `outputs2` is not assigned anywhere in the visible code.
    fresult.append(outputs2.detach().numpy()[0,1])
    if((fresult[0]<=0.5) and(fresult[1]<=0.35)):
        print("get one!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        # NOTE(review): paths are built by plain string concatenation, so
        # `save_path` must already end with a separator -- confirm.
        with open(save_path+cur_file, 'wb') as f:
            f.write(bytearray(generated_bytes))
            print("saving: ",cur_file)
        try:
            cur_file = target_list.pop()
            file2, file_len2, bytez2 = read_file(malware_path+cur_file)
            pool_sample = sys_sample(file2)
        except:
            # NOTE(review): bare except hides every failure (empty list,
            # unreadable file); the bookkeeping below still runs afterwards.
            pass        
        cur_num_remaining -= 1
        counter = 0
        flag = 1
    # Give up on the current file after 50 attempts and move on to the next one.
    if(counter>=50):
        try:
            cur_file = target_list.pop()
            file2, file_len2, bytez2 = read_file(malware_path+cur_file)
            pool_sample = sys_sample(file2)
        except:
            # NOTE(review): same silent swallow as in the branch above.
            pass        
        cur_num_remaining -= 1 
        counter = 0
        flag = 2       
    print('>>>>>>>>>>>>>>>>>>>>>>>>>>> model score:',fresult)
    # NOTE(review): `args` and `inp` are not defined in the visible code; the
    # notebook defines a module-level `chunk_len` and this function's parameter
    # is named `input` -- confirm the intended names.
    for c in range(args.chunk_len):
        output, hidden = decoder(inp[:,c], hidden)
        loss += criterion(output.view(batch_size, -1), target[:,c])
    loss.backward()
    decoder_optimizer.step()
    return loss.data / chunk_len, flag
# Build the sequence model, its optimizer and the loss from the configuration
# constants defined at the top of this cell instead of repeating their values
# as literals, so a change to the config is actually picked up here.
decoder = CharRNN(
    256,            # input vocabulary size
    hidden_size,
    256,            # number of output logits per step
    model='lstm',
    n_layers=n_layers,
)
decoder_optimizer = torch.optim.Adam(decoder.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()
# Bare expression: displays the module summary as the cell output.
decoder

CharRNN(
  (encoder): Embedding(256, 100)
  (rnn): LSTM(100, 100, num_layers=2)
  (decoder): Linear(in_features=100, out_features=256, bias=True)
)

In [9]:
def generate_gan(model, prime_str, predict_len, temperature=0.8):
    """Prime ``model`` with a sequence, then sample ``predict_len`` more symbols.

    NOTE(review): relies on a module-level ``char_tensor`` helper that is not
    defined in the visible part of the notebook -- confirm it accepts both the
    priming sequence and a single sampled index.

    Args:
        model: Network exposing ``init_hidden(batch_size)`` and callable as
            ``model(input, hidden) -> (output, hidden)``.
        prime_str: Non-empty sequence of symbol indices used to warm up the
            hidden state.
        predict_len: Number of symbols to sample after priming.
        temperature: Divisor applied to the logits before exponentiation;
            lower values make sampling more conservative.

    Returns:
        ``([symbols], output_list)``: a one-element list holding the priming
        symbols followed by the sampled ones, and the list of raw model
        outputs, one per sampled step.
    """
    hidden = model.init_hidden(1)
    prime_input = Variable(char_tensor(prime_str).unsqueeze(0))
    prime_input = prime_input.type(torch.LongTensor)
    # Start from an ndarray so that ``.tolist()`` below also works when
    # ``predict_len`` is 0 and nothing gets appended.
    predicted = np.asarray(prime_str)

    # Feed every priming symbol except the last one to build up the state.
    for p in range(len(prime_str) - 1):
        _, hidden = model(prime_input[:, p], hidden)
    inp = prime_input[:, -1]

    output_list = []
    for _ in range(predict_len):
        output, hidden = model(inp, hidden)
        output_list.append(output)
        # Temperature-scaled, unnormalised weights; multinomial normalises them.
        output_dist = output.data.view(-1).div(temperature).exp()
        top_i = torch.multinomial(output_dist, 1)[0]
        predicted = np.append(predicted, top_i)
        # The sampled symbol becomes the next input.
        inp = Variable(char_tensor(top_i)).unsqueeze(0)

    return [predicted.tolist()], output_list

In [10]:
def start_train():
    """Run ``n_epochs`` iterations, printing the epoch number and loss each time.

    Every iteration draws a fresh batch with ``random_training_set`` using the
    module-level ``chunk_len`` and ``batch_size``, and passes it to ``train``.
    """
    for epoch in range(1, n_epochs + 1):
        input, target = random_training_set(chunk_len, batch_size)
        loss, _ = train(input, target)
        # The original passed the value as a second print argument, so the
        # format specifiers were printed literally ('#d', '%.4f').
        print('Epoch: %d' % epoch)
        print('loss is %.4f' % float(loss))

In [11]:
from MalConv import MalConv
# Hard-coded absolute input locations.
# NOTE(review): an earlier cell calls os.listdir(malware_path), i.e. treats
# this path as a directory, while the lines below pass it to open() -- the
# PermissionError recorded under this cell is consistent with opening a
# directory. Confirm whether a single file inside it was intended.
malware_path = 'C:/Users/hayat/malware'
benign_path = 'C:/Users/hayat/benign'
bytez2= read_and_preprocess(malware_path)
# NOTE(review): `file2` has not been assigned anywhere before this line in the
# visible cells.
pool_sample = sys_sample(file2)
bytez = read_and_preprocess(benign_path)
# NOTE(review): train() uses the module-level `decoder` / `decoder_optimizer`;
# `model` and `optimizer` created here are not referenced by the visible code.
model = CharRNN(256, 100, 256, model="lstm", n_layers=2)  # Adjust model parameters as needed
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
# NOTE(review): the scorer is built with fresh weights and switched to training
# mode; presumably pretrained weights and .eval() are wanted for scoring -- confirm.
malconv = MalConv(channels=256, window_size=512, embd_size=8).train()
start_train()

PermissionError: [Errno 13] Permission denied: 'C:/Users/hayat/malware'