Skip to content

Antivirus #5011

@kerlytatianaalava-netizen

Description

Prerequisites

  • Write a descriptive title.

Description of the new feature/enhancement

#!/usr/bin/env python3
"""
antivirus_corporativo.py
Sistema antivirus corporativo con detección avanzada, múltiples motores, API REST y dashboard web.
Para empresas grandes e intermedias.
"""

from logging import config
import os
import sys
import json
import hashlib
import shutil
import argparse
import logging
import configparser
import sqlite3
import re
import math
import threading
import time
import zipfile
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import smtplib
from email.mime.nonmultipart import MIMENonMultipart
from email.mime.multipart import MIMEMultipart
from http.server import HTTPServer, BaseHTTPREquestHandler
import yara # type: ignore # pip install yara-python

Configuración corporativa

CONFIG_FILE = "/etc/antivirus_corporativo/antivirus.conf"
SIGNATURES_DIR = "/etc/antivirus_corporativo/signatures/"
QUARANTINE_DIR = "/var/lib/antivirus_corporativo/quarantine/"
DATABASE_FILE = "/var/lib/antivirus_corporativo/antivirus.db"
LOG_FILE = "/var/log/antivirus_corporativo.log"
MAX_READ_BYTES = 500 * 1024 * 1024 # 500 MB

Inicializar logging

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - [%(threadName)s] - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(name)

class CorporateConfig:
"""Gestor de configuración corporativa"""

def __init__(self):
    self.config = configparser.ConfigParser()
    self.load_config()

def load_config(self):
    """Carga la configuración corporativa"""
    if not os.path.exists(CONFIG_FILE):
        self.create_default_config()
    
    self.config.read(CONFIG_FILE)
    
    self.settings = {
        # Escaneo
        "scan_directories": self.config.get("scan", "directories", fallback="/home,/var/www,/opt,/usr/local").split(","),
        "exclude_directories": self.config.get("scan", "exclude", fallback="/proc,/sys,/dev,/tmp,/var/tmp").split(","),
        "max_file_size": self.config.getint("scan", "max_file_size", fallback=500),
        "file_extensions": self.config.get("scan", "extensions", 
            fallback="exe,bat,cmd,ps1,py,js,vbs,doc,docx,xls,xlsx,pdf,zip,rar,7z,msi,dll").split(","),
        "max_threads": self.config.getint("scan", "max_threads", fallback=10),
        
        # Cuarentena
        "quarantine_retention": self.config.getint("quarantine", "retention_days", fallback=90),
        "auto_cleanup": self.config.getboolean("quarantine", "auto_cleanup", fallback=True),
        
        # Detección
        "entropy_threshold": self.config.getfloat("detection", "entropy_threshold", fallback=7.2),
        "threat_score_threshold": self.config.getint("detection", "threat_score_threshold", fallback=35),
        "enable_yara": self.config.getboolean("detection", "enable_yara", fallback=True),
        "enable_virustotal": self.config.getboolean("detection", "enable_virustotal", fallback=False),
        
        # Alertas
        "alert_emails": self.config.get("alerts", "recipients", 
            fallback="security@empresa.com,soc@empresa.com,admin@empresa.com").split(","),
        "smtp_server": self.config.get("alerts", "smtp_server", fallback="smtp.empresa.com"),
        "smtp_port": self.config.getint("alerts", "smtp_port", fallback=587),
        "smtp_user": self.config.get("alerts", "smtp_user", fallback=""),
        "smtp_password": self.config.get("alerts", "smtp_password", fallback=""),
        
        # API
        "api_host": self.config.get("api", "host", fallback="0.0.0.0"),
        "api_port": self.config.getint("api", "port", fallback=8080),
        "api_secret": self.config.get("api", "secret", fallback="corporate_antivirus_secret_2024"),
        
        # VirusTotal
        "virustotal_api_key": self.config.get("virustotal", "api_key", fallback=""),
        "virustotal_enabled": self.config.getboolean("virustotal", "enabled", fallback=False)
    }

def create_default_config(self):
    """Crea configuración por defecto para entornos corporativos"""
    os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True)
    
    self.config['scan'] = {
        'directories': '/home,/var/www,/opt,/usr/local',
        'exclude': '/proc,/sys,/dev,/tmp,/var/tmp',
        'max_file_size': '500',
        'extensions': 'exe,bat,cmd,ps1,py,js,vbs,doc,docx,xls,xlsx,pdf,zip,rar,7z,msi,dll',
        'max_threads': '10'
    }
    self.config['schedule'] = {
        'daily_scan': '02:00',
        'weekly_full_scan': 'sunday 04:00',
        'real_time_monitoring': 'true'
    }
    self.config['quarantine'] = {
        'retention_days': '90',
        'auto_cleanup': 'true',
        'max_quarantine_size_gb': '50'
    }
    self.config['detection'] = {
        'entropy_threshold': '7.2',
        'threat_score_threshold': '35',
        'enable_yara': 'true',
        'enable_virustotal': 'false'
    }
    self.config['alerts'] = {
        'enabled': 'true',
        'recipients': 'security@empresa.com,soc@empresa.com,admin@empresa.com',
        'smtp_server': 'smtp.empresa.com',
        'smtp_port': '587',
        'smtp_user': 'antivirus@empresa.com',
        'smtp_password': ''
    }
    self.config['api'] = {
        'enabled': 'true',
        'host': '0.0.0.0',
        'port': '8080',
        'secret': 'corporate_antivirus_secret_2024'
    }
    self.config['virustotal'] = {
        'enabled': 'false',
        'api_key': '',
        'max_requests_per_minute': '4'
    }
    
    with open(CONFIG_FILE, 'w') as f:
        self.config.write(f)
    
    logger.info(f"Configuración corporativa creada en: {CONFIG_FILE}")

class SignatureManager:
"""Gestor avanzado de firmas con múltiples fuentes"""

def __init__(self, signatures_dir=SIGNATURES_DIR):
    self.signatures_dir = signatures_dir
    self.yara_rules = None
    self.load_all_signatures()

def load_all_signatures(self):
    """Carga todas las firmas disponibles"""
    os.makedirs(self.signatures_dir, exist_ok=True)
    
    self.signatures = {
        "hashes": set(),
        "strings": set(),
        "yara_rules": None
    }
    
    # Cargar firmas JSON
    json_files = [f for f in os.listdir(self.signatures_dir) if f.endswith('.json')]
    for json_file in json_files:
        self.load_json_signatures(os.path.join(self.signatures_dir, json_file))
    
    # Cargar reglas YARA
    if config.settings["enable_yara"]:
        self.load_yara_rules()
    
    logger.info(f"Firmas cargadas: {len(self.signatures['hashes'])} hashes, "
               f"{len(self.signatures['strings'])} strings, YARA: {self.signatures['yara_rules'] is not None}")

def load_json_signatures(self, file_path):
    """Carga firmas desde archivo JSON"""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
            self.signatures["hashes"].update(data.get("hashes", []))
            self.signatures["strings"].update(data.get("strings", []))
    except Exception as e:
        logger.error(f"Error cargando firmas JSON {file_path}: {e}")

def load_yara_rules(self):
    """Compila y carga reglas YARA"""
    try:
        yara_files = [f for f in os.listdir(self.signatures_dir) if f.endswith('.yar')]
        if yara_files:
            rules_content = ""
            for yara_file in yara_files:
                with open(os.path.join(self.signatures_dir, yara_file), 'r') as f:
                    rules_content += f.read() + "\n"
            
            self.signatures["yara_rules"] = yara.compile(source=rules_content)
            logger.info(f"Reglas YARA cargadas: {len(yara_files)} archivos")
    except Exception as e:
        logger.error(f"Error cargando reglas YARA: {e}")

def update_signatures(self):
    """Actualiza firmas desde repositorios corporativos"""
    # Aquí se integraría con repositorios internos de firmas
    logger.info("Actualizando firmas desde repositorios corporativos...")
    self.load_all_signatures()

class AdvancedDetectionEngine:
"""Motor de detección avanzado con múltiples técnicas"""

def __init__(self, signature_manager, config):
    self.sig_manager = signature_manager
    self.config = config
    self.virustotal_cache = {}

def calculate_entropy(self, data):
    """Calcula entropía para detectar ofuscación"""
    if len(data) == 0:
        return 0
    counter = Counter(data)
    entropy = 0
    for count in counter.values():
        p_x = count / len(data)
        entropy += -p_x * math.log2(p_x)
    return entropy

def analyze_file_headers(self, file_path, data):
    """Análisis avanzado de encabezados"""
    file_extension = Path(file_path).suffix.lower()
    
    magic_numbers = {
        b'MZ': 'exe', b'\x7fELF': 'elf', b'PK\x03\x04': 'zip',
        b'%PDF': 'pdf', b'\x89PNG': 'png', b'GIF8': 'gif',
        b'\xff\xd8\xff': 'jpg', b'BM': 'bmp', b'RIFF': 'avi',
        b'\x1f\x8b\x08': 'gz', b'Rar!\x1a\x07': 'rar'
    }
    
    detected_type = None
    for magic, ext in magic_numbers.items():
        if data.startswith(magic):
            detected_type = ext
            break
    
    anomalies = []
    if detected_type and detected_type != file_extension[1:]:
        anomalies.append(f"filetype_mismatch:{file_extension}_should_be:{detected_type}")
    
    return anomalies

def heuristic_analysis(self, file_path, content, binary_data):
    """Análisis heurístico corporativo"""
    flags = []
    filename = Path(file_path).name.lower()
    
    # 1. Análisis de nombre de archivo
    suspicious_names = ['invoice', 'payment', 'urgent', 'document', 'scan', 
                       'factura', 'pago', 'transferencia', 'fedex', 'dhl']
    if any(name in filename for name in suspicious_names):
        flags.append("suspicious_filename")
    
    # 2. Doble extensión
    if re.search(r'\.(exe|bat|cmd|ps1|vbs|js|scr|pif)\.(txt|jpg|pdf|doc|png|zip)$', filename):
        flags.append("double_extension")
    
    # 3. Patrones de contenido sospechoso
    suspicious_patterns = [
        (r'powershell.*-encodedcommand', "powershell_encoded"),
        (r'certutil.*-decode', "certutil_decode"),
        (r'wscript\.shell', "wscript_shell"),
        (r'activexobject', "activex_object"),
        (r'eval\(.*\)', "eval_function"),
        (r'fromCharCode\(.*\)', "fromcharcode"),
        (r'base64_decode', "base64_decode"),
        (r'cmd\.exe.*/c', "cmd_execute"),
        (r'microsoft\.wscript', "microsoft_wscript"),
        (r'regsvr32.*/s', "regsvr32_silent")
    ]
    
    for pattern, description in suspicious_patterns:
        if re.search(pattern, content, re.IGNORECASE):
            flags.append(f"suspicious_pattern:{description}")
    
    # 4. Análisis de estructura PE (para Windows)
    if binary_data.startswith(b'MZ'):
        flags.extend(self.analyze_pe_structure(binary_data))
    
    return flags

def analyze_pe_structure(self, data):
    """Análisis básico de estructura PE"""
    flags = []
    try:
        # Verificar características sospechosas en PE
        if b'.aspack' in data or b'.upx' in data:
            flags.append("pe_packed")
        
        # Buscar secciones sospechosas
        suspicious_sections = [b'.pack', b'.upx', b'.themida', b'.vmp']
        if any(section in data for section in suspicious_sections):
            flags.append("suspicious_sections")
            
    except Exception:
        pass
    
    return flags

def yara_scan(self, file_path, data):
    """Escaneo con reglas YARA"""
    matches = []
    if self.sig_manager.signatures["yara_rules"]:
        try:
            yara_matches = self.sig_manager.signatures["yara_rules"].match(data=data)
            for match in yara_matches:
                matches.append(f"yara:{match.rule}")
        except Exception as e:
            logger.warning(f"Error en escaneo YARA {file_path}: {e}")
    
    return matches

def virustotal_check(self, file_hash):
    """Consulta VirusTotal (si está configurado)"""
    if not self.config.settings["virustotal_enabled"]:
        return None
    
    # Lógica de VirusTotal aquí (implementación básica)
    # En producción, usaría la API real con rate limiting
    return None

def comprehensive_scan(self, file_path):
    """Escaneo comprehensivo corporativo"""
    result = {
        "path": file_path,
        "infected": False,
        "threat_score": 0,
        "reasons": [],
        "entropy": 0,
        "file_size_mb": 0,
        "scan_time": 0,
        "heuristic_flags": [],
        "yara_matches": [],
        "timestamp": datetime.now().isoformat()
    }
    
    start_time = time.time()
    
    try:
        # Verificar tamaño
        file_size = os.path.getsize(file_path) / (1024 * 1024)
        result["file_size_mb"] = round(file_size, 2)
        
        if file_size > self.config.settings["max_file_size"]:
            result["reasons"].append(f"file_too_large:{file_size:.1f}MB")
            return result
        
        # Calcular hash
        file_hash = self.sha256_of_file(file_path)
        if not file_hash:
            result["reasons"].append("cannot_read_file")
            return result
        
        # 1. Detección por hash
        if file_hash in self.sig_manager.signatures["hashes"]:
            result["infected"] = True
            result["threat_score"] += 100
            result["reasons"].append(f"hash_match:{file_hash}")
        
        # Leer datos para análisis
        with open(file_path, "rb") as f:
            data = f.read(min(MAX_READ_BYTES, self.config.settings["max_file_size"] * 1024 * 1024))
            
            # 2. Análisis de texto
            try:
                text = data.decode("utf-8", errors="ignore").lower()
            except:
                text = ""
            
            # Búsqueda de cadenas
            for s in self.sig_manager.signatures["strings"]:
                if s.lower() in text:
                    result["threat_score"] += 10
                    result["reasons"].append(f"string_match:{s}")
            
            # 3. Análisis de entropía
            entropy = self.calculate_entropy(data[:8192])
            result["entropy"] = round(entropy, 2)
            if entropy > self.config.settings["entropy_threshold"]:
                result["threat_score"] += 30
                result["reasons"].append(f"high_entropy:{entropy:.2f}")
            
            # 4. Análisis de encabezados
            header_issues = self.analyze_file_headers(file_path, data[:100])
            for issue in header_issues:
                result["threat_score"] += 20
                result["reasons"].append(issue)
            
            # 5. Análisis heurístico
            heuristic_flags = self.heuristic_analysis(file_path, text, data)
            result["heuristic_flags"] = heuristic_flags
            for flag in heuristic_flags:
                result["threat_score"] += 15
                result["reasons"].append(f"heuristic:{flag}")
            
            # 6. Escaneo YARA
            if self.config.settings["enable_yara"]:
                yara_matches = self.yara_scan(file_path, data)
                result["yara_matches"] = yara_matches
                for match in yara_matches:
                    result["threat_score"] += 25
                    result["reasons"].append(match)
        
        # 7. Consulta VirusTotal
        vt_result = self.virustotal_check(file_hash)
        if vt_result:
            result["threat_score"] += vt_result.get("positives", 0) * 2
            result["reasons"].append(f"virustotal:{vt_result.get('positives', 0)}/{vt_result.get('total', 0)}")
        
        # Determinar infección
        if not result["infected"] and result["threat_score"] >= self.config.settings["threat_score_threshold"]:
            result["infected"] = True
            result["reasons"].append(f"threshold_exceeded:{result['threat_score']}")
            
    except Exception as e:
        result["reasons"].append(f"scan_error:{str(e)}")
        logger.error(f"Error escaneando {file_path}: {e}")
    
    result["scan_time"] = round(time.time() - start_time, 3)
    return result

def sha256_of_file(self, path, max_bytes=MAX_READ_BYTES):
    """Calcula SHA256 optimizado para corporativo"""
    h = hashlib.sha256()
    try:
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                h.update(chunk)
        return h.hexdigest()
    except Exception as e:
        logger.warning(f"No se pudo calcular hash de {path}: {e}")
        return None

Continuará en la siguiente parte debido a límite de caracteres...

Proposed technical implementation details (optional)

me sale un error en el codigo y no se que esta mal y quiero q veas el error y cual seria la solucion

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions