In [None]:
import kagglehub

# Download the latest version of the web-server access-log dataset via
# kagglehub and keep the returned location in the notebook-global `path`.
path = kagglehub.dataset_download("eliasdabbas/web-server-access-logs")

# Show where the dataset files were placed.
print("Path to dataset files:", path)

In [None]:
# Install dependencies (notebook shell escape; only pandas is installed here)
!pip install pandas

In [None]:
#!/usr/bin/env python3
import os
import re
import csv
import pandas as pd
from collections import defaultdict
from google.colab import files
import zipfile
import time
from tqdm import tqdm
import kagglehub

class LogSecurityAnalyzer:
    def __init__(self):
        self.output_dir = "hasil_analisis"
        self.log_file = None
        self.results = defaultdict(list)
        self.total_lines = 0

        # Whitelist untuk bot legitimate
        self.legitimate_bots = [
            'googlebot', 'bingbot', 'slurp', 'duckduckbot', 'baiduspider',
            'yandexbot', 'facebookexternalhit', 'twitterbot', 'linkedinbot',
            'ahrefsbot', 'semrushbot', 'mj12bot', 'dotbot', 'applebot',
            'crawlbot', 'screaming frog', 'sitemap', 'spider'
        ]

        # Pattern untuk deteksi serangan (diperbaiki)
        self.patterns = {
            'sql_injection': [
                # Union-based SQL injection
                r"\bunion\s+(all\s+)?select\b",
                # Boolean-based blind SQL injection
                r"(\'\s*or\s*\'\s*=\s*\'|\'\s*or\s*1\s*=\s*1|\'\s*or\s*true\s*--)",
                r"(\bor\s+1\s*=\s*1|\band\s+1\s*=\s*1|\bor\s+true)",
                # Time-based blind SQL injection
                r"(sleep\s*\(|waitfor\s+delay|benchmark\s*\()",
                # Error-based SQL injection
                r"(extractvalue\s*\(|updatexml\s*\(|exp\s*\(|floor\s*\(rand)",
                # SQL commands
                r"(\bdrop\s+table\b|\bdelete\s+from\b|\binsert\s+into\b|\bupdate\s+set\b)",
                r"(\bcreate\s+table\b|\balter\s+table\b|\btruncate\s+table\b)",
                # SQL comments and terminators
                r"(--|\/\*.*?\*\/|\#.*?(\n|\r|$))",
                # SQL functions commonly used in attacks
                r"(\bhex\s*\(|\bchar\s*\(|\bascii\s*\(|\bsubstring\s*\(|\bconcat\s*\()",
                r"(\bload_file\s*\(|\binto\s+outfile\b|\binto\s+dumpfile\b)",
                # Classic SQL injection patterns
                r"(\'\s*;\s*drop|\'\s*;\s*delete|\'\s*;\s*insert)",
                # Database-specific injections
                r"(xp_cmdshell|sp_executesql|exec\s*\(|execute\s*\()"
            ],
            'directory_traversal': [
                # Path traversal patterns
                r"(\.\./|\.\.\%2[fF]|\.\.\%5[cC]|\.\.\%252[fF]){2,}",
                r"(\.\.\\\|\.\.\%5[cC]|\.\.\%255[cC]){2,}",
                # Unix system files
                r"(/etc/passwd|/etc/shadow|/etc/hosts|/proc/self/environ|/proc/version)",
                r"(/etc/group|/etc/issue|/etc/motd|/proc/self/cmdline)",
                # Windows system files
                r"(\\windows\\|\\winnt\\|c:\\windows|c:\\winnt)",
                r"(\\system32\\|\\syswow64\\|boot\.ini|win\.ini)",
                # Double encoding
                r"(\%2e\%2e\%2f|\%2e\%2e\%5c|\%252e\%252e\%252f)",
                # Null byte injection
                r"(\%00|\\0|\\x00)",
                # PHP wrappers
                r"(php://filter|php://input|data://|expect://|zip://)"
            ],
            'xss': [
                # Script tags
                r"<script[^>]*>.*?</script>",
                r"\%3[cC]script[^>]*\%3[eE].*?\%3[cC]/script\%3[eE]",
                # JavaScript protocols
                r"(javascript\s*:|data\s*:\s*text/html|vbscript\s*:)",
                # Event handlers
                r"(onload\s*=|onerror\s*=|onmouseover\s*=|onclick\s*=|onfocus\s*=)",
                r"(onkeydown\s*=|onkeyup\s*=|onsubmit\s*=|onchange\s*=)",
                # JavaScript functions
                r"(alert\s*\(|confirm\s*\(|prompt\s*\(|eval\s*\()",
                # DOM manipulation
                r"(document\.cookie|document\.write|window\.location|innerHTML)",
                # HTML tags with event handlers
                r"(\%3[cC]img[^>]*onerror|\%3[cC]svg[^>]*onload)",
                r"(<img[^>]*onerror|<svg[^>]*onload|<iframe[^>]*onload)",
                # CSS expressions
                r"(expression\s*\(|javascript\s*:|@import)",
                # Base64 encoded scripts
                r"(data:text/html;base64,)",
                # XSS with HTML entities
                r"(&lt;script|&gt;alert|&#x3c;script)"
            ],
            'akses_sensitif': [
                # Admin panels
                r"/(admin|administrator|wp-admin|wp-login\.php|login\.php|admin\.php)",
                r"/(panel|controlpanel|cpanel|webadmin|sysadmin|root)",
                # Configuration files
                r"/(config|configuration|settings|\.env|\.config)",
                r"/(database|db|backup|bak|\.sql|\.dump)",
                # Hidden files and directories
                r"/\.(git|svn|htaccess|htpasswd|ssh|aws|docker)",
                # Sensitive directories
                r"/(phpinfo|info\.php|test\.php|debug|logs|log)",
                # API endpoints
                r"/(api/v[0-9]+/admin|api/admin|admin/api)",
                # Common CMS admin paths
                r"/(drupal/admin|joomla/administrator|magento/admin)"
            ]
        }

    def is_legitimate_bot(self, user_agent):
        """Cek apakah user agent adalah bot legitimate"""
        if not user_agent:
            return False

        user_agent_lower = user_agent.lower()
        return any(bot in user_agent_lower for bot in self.legitimate_bots)

    def extract_user_agent(self, log_line):
        """Extract the User-Agent field from an access-log line.

        The primary pattern anchors on the ``status size "referer" "agent"``
        sequence that follows the quoted request, so the agent is found
        whether or not additional quoted fields trail it. (Taking the first
        of the *last two* quoted fields, as was done before, yields the
        referer for plain combined-format lines.)

        Args:
            log_line: one raw line of the access log.

        Returns:
            The user-agent string, or ``""`` when no candidate is found.
        """
        # A quoted field that tolerates backslash-escaped characters inside.
        quoted = r'"((?:[^"\\]|\\.)*)"'
        combined = re.search(
            r'"\s+\d{3}\s+\S+\s+' + quoted + r'\s+' + quoted,
            log_line,
        )
        if combined:
            return combined.group(2)  # group 1 is the referer

        # Fallback for other layouts: second of two adjacent quoted fields.
        fallback = re.search(r'"[^"]*"\s+"([^"]*)"', log_line)
        if fallback:
            return fallback.group(1)

        return ""

    def is_likely_false_positive(self, log_line, attack_type):
        """Cek apakah kemungkinan false positive"""

        # Extract user agent
        user_agent = self.extract_user_agent(log_line)

        # Skip jika dari bot legitimate
        if self.is_legitimate_bot(user_agent):
            return True

        # Additional checks untuk SQL injection
        if attack_type == 'sql_injection':
            # Skip jika hanya URL encoding normal tanpa SQL syntax
            if re.search(r'%[0-9A-F]{2}', log_line) and not re.search(r'(union|select|or\s*=|drop|delete)', log_line, re.IGNORECASE):
                return True

            # Skip jika hanya pipe character dalam filter/search normal
            if '/filter/' in log_line and '|' in log_line and not re.search(r'(union|select|or\s*1\s*=)', log_line, re.IGNORECASE):
                return True

        # Additional checks untuk XSS
        if attack_type == 'xss':
            # Skip jika response code menunjukkan error (kemungkinan blocked)
            if re.search(r'" (40[0-9]|50[0-9]) ', log_line):
                return True

        # Additional checks untuk akses sensitif
        if attack_type == 'akses_sensitif':
            # Skip jika berhasil diakses (200) - mungkin legitimate access
            if ' 200 ' in log_line:
                return True

        return False

    def download_kaggle_dataset(self):
        """Download the Kaggle access-log dataset and pick a log file from it.

        Walks the downloaded directory, collects every file ending in
        ``.log``/``.txt`` or containing ``access`` in its name, stores the
        first one found in ``self.log_file`` and counts its lines into
        ``self.total_lines``.

        Returns:
            True when a log file was selected; False when none was found or
            any exception occurred (the error is printed, not raised).
        """
        print("📥 Downloading dataset dari Kaggle...")
        try:
            dataset_dir = kagglehub.dataset_download("eliasdabbas/web-server-access-logs")
            print(f"✅ Dataset berhasil didownload ke: {dataset_dir}")

            # Collect candidate log files anywhere below the dataset directory
            candidates = [
                os.path.join(folder, name)
                for folder, _subdirs, names in os.walk(dataset_dir)
                for name in names
                if name.endswith(('.log', '.txt')) or 'access' in name.lower()
            ]

            if not candidates:
                print("❌ File log tidak ditemukan dalam dataset!")
                return False

            # Use the first candidate encountered
            self.log_file = candidates[0]
            print(f"📁 File log ditemukan: {self.log_file}")

            # Count the lines up front (used for progress reporting)
            print("📊 Menghitung jumlah baris...")
            with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as handle:
                self.total_lines = sum(1 for _ in handle)
            print(f"📈 Total baris: {self.total_lines:,}")
            return True

        except Exception as e:
            print(f"❌ Error downloading dataset: {e}")
            return False

    def upload_log_file(self):
        """Ask the user to upload a log file and register it for analysis.

        The first key of the mapping returned by ``files.upload()`` is used
        as the file name (presumably the uploaded file's name - confirm
        against the Colab API). The file's line count is stored in
        ``self.total_lines``.

        Returns:
            True when a file was uploaded, False otherwise.
        """
        print("📁 Silakan upload file log access Anda...")
        uploaded = files.upload()

        if not uploaded:
            print("❌ Tidak ada file yang diupload!")
            return False

        filename = next(iter(uploaded.keys()))
        self.log_file = filename

        # Count the lines of the uploaded file
        with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as handle:
            self.total_lines = sum(1 for _ in handle)

        print(f"✅ File '{filename}' berhasil diupload!")
        print(f"📈 Total baris: {self.total_lines:,}")
        return True

    def create_sample_log(self):
        """Membuat sample log untuk testing dengan serangan nyata dan false positive"""
        sample_log = """127.0.0.1 - - [10/Oct/2023:13:55:36 +0700] "GET /index.php?id=1' OR '1'='1 HTTP/1.1" 200 2326 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
192.168.1.100 - - [10/Oct/2023:13:56:15 +0700] "GET /admin/login.php HTTP/1.1" 403 1234 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
10.0.0.5 - - [10/Oct/2023:14:02:22 +0700] "GET /test.php?file=../../../etc/passwd HTTP/1.1" 404 162 "-" "curl/7.68.0"
172.16.0.1 - - [10/Oct/2023:14:15:33 +0700] "GET /search.php?q=<script>alert('XSS')</script> HTTP/1.1" 200 5432 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
203.0.113.1 - - [10/Oct/2023:14:20:45 +0700] "POST /wp-login.php HTTP/1.1" 401 891 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
127.0.0.1 - - [10/Oct/2023:14:25:12 +0700] "GET /index.php?union=select * from users HTTP/1.1" 200 3421 "-" "sqlmap/1.0"
192.168.1.50 - - [10/Oct/2023:14:30:00 +0700] "GET /page.php?img=..%2f..%2fconfig.php HTTP/1.1" 500 245 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
210.10.1.5 - - [10/Oct/2023:14:35:30 +0700] "GET /admin/dashboard.php HTTP/1.1" 404 156 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
88.99.77.66 - - [10/Oct/2023:14:40:15 +0700] "GET /backup/database.sql HTTP/1.1" 403 234 "-" "wget/1.20.3"
54.36.149.41 - - [22/Jan/2019:03:56:14 +0330] "GET /filter/27|13%20%D9%85%DA%AF%D8%A7%D9%BE%DB%8C%DA%A9%D8%B3%D9%84 HTTP/1.1" 200 30577 "-" "Mozilla/5.0 (compatible; AhrefsBot/6.1; +http://ahrefs.com/robot/)"
66.249.66.194 - - [22/Jan/2019:03:56:18 +0330] "GET /filter/b41,b665,c150%7C%D8%A8%D8%AE%D8%A7%D8%B1%D9%BE%D8%B2,p56 HTTP/1.1" 200 34277 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
91.99.72.15 - - [10/Oct/2023:14:50:17 +0700] "GET /product.php?id=1' UNION SELECT username,password FROM admin-- HTTP/1.1" 200 5432 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
40.77.167.129 - - [22/Jan/2019:03:56:17 +0330] "GET /image/14925/productModel/100x100 HTTP/1.1" 200 1696 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"
192.168.1.200 - - [10/Oct/2023:15:00:00 +0700] "GET /login.php?user=admin'-- HTTP/1.1" 403 256 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
10.0.0.10 - - [10/Oct/2023:15:05:30 +0700] "GET /page.php?file=javascript:alert(document.cookie) HTTP/1.1" 200 1234 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36\""""

        with open('sample_access.log', 'w') as f:
            f.write(sample_log)

        self.log_file = 'sample_access.log'
        self.total_lines = len(sample_log.split('\n'))
        print("📝 Sample log file telah dibuat: 'sample_access.log'")
        print(f"📈 Total baris: {self.total_lines}")
        print("📋 Sample ini berisi serangan nyata dan legitimate bot traffic")
        return True

    def analyze_log_batch(self, batch_size=50000):
        """Analisis file log dalam batch untuk menangani file besar"""
        if not self.log_file or not os.path.exists(self.log_file):
            print("❌ File log tidak ditemukan!")
            return False

        os.makedirs(self.output_dir, exist_ok=True)

        print(f"🔍 Menganalisis file: {self.log_file}")
        print(f"📊 Processing {self.total_lines:,} baris dalam batch {batch_size:,}")

        start_time = time.time()

        # Progress bar
        with tqdm(total=self.total_lines, desc="Analyzing logs") as pbar:
            with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
                batch_lines = []

                for line_num, line in enumerate(f, 1):
                    batch_lines.append((line_num, line.strip()))

                    if len(batch_lines) >= batch_size:
                        self._process_batch(batch_lines)
                        pbar.update(len(batch_lines))
                        batch_lines = []

                # Process remaining lines
                if batch_lines:
                    self._process_batch(batch_lines)
                    pbar.update(len(batch_lines))

        elapsed_time = time.time() - start_time
        print(f"⏱️ Analisis selesai dalam {elapsed_time:.2f} detik")

        return True

    def _process_batch(self, batch_lines):
        """Process satu batch dari log lines dengan filter false positive"""
        for line_num, line in batch_lines:
            for attack_type, patterns in self.patterns.items():
                for pattern in patterns:
                    if re.search(pattern, line, re.IGNORECASE):
                        # Cek false positive sebelum menambah ke hasil
                        if not self.is_likely_false_positive(line, attack_type):
                            self.results[attack_type].append({
                                'line': line_num,
                                'log': line,
                                'attack_type': attack_type
                            })
                        break

    def create_csv_files(self):
        """Membuat file CSV dari hasil analisis"""
        print("[+] Mengubah hasil ke format CSV...")

        all_results = []

        for attack_type, detections in self.results.items():
            if not detections:
                # Buat file CSV kosong jika tidak ada hasil
                csv_file = f"{self.output_dir}/{attack_type}.csv"
                with open(csv_file, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.writer(f)
                    writer.writerow(['Line', 'Log', 'Serangan'])
                continue

            # Buat CSV individual
            csv_file = f"{self.output_dir}/{attack_type}.csv"
            with open(csv_file, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow(['Line', 'Log', 'Serangan'])

                for detection in detections:
                    writer.writerow([
                        detection['line'],
                        detection['log'][:1000],  # Truncate long logs
                        attack_type
                    ])
                    all_results.append(detection)

        # Gabungkan semua hasil
        final_csv = f"{self.output_dir}/hasil_akhir.csv"
        with open(final_csv, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(['Line', 'Log', 'Serangan'])

            if all_results:
                # Sort by line number
                all_results.sort(key=lambda x: x['line'])

                for result in all_results:
                    writer.writerow([
                        result['line'],
                        result['log'][:1000],  # Truncate long logs
                        result['attack_type']
                    ])

        print(f"✅ File CSV tersimpan di: {final_csv}")
        return final_csv

    def show_summary(self):
        """Tampilkan ringkasan hasil analisis"""
        print("\n" + "="*60)
        print("📈 RINGKASAN HASIL ANALISIS KEAMANAN")
        print("="*60)

        total_attacks = 0
        attack_names = {
            'sql_injection': 'SQL Injection',
            'directory_traversal': 'Directory Traversal',
            'xss': 'Cross-Site Scripting (XSS)',
            'akses_sensitif': 'Akses Halaman Sensitif'
        }

        for attack_type in ['sql_injection', 'directory_traversal', 'xss', 'akses_sensitif']:
            detections = self.results.get(attack_type, [])
            count = len(detections)
            total_attacks += count
            attack_name = attack_names[attack_type]
            print(f"{attack_name:30}: {count:6,} deteksi")

        print("-" * 60)
        print(f"{'Total Serangan':30}: {total_attacks:6,} deteksi")
        if self.total_lines > 0:
            print(f"{'Persentase Serangan':30}: {(total_attacks/self.total_lines*100):6.2f}%")
        print("="*60)

        return total_attacks

    def show_detailed_results(self, limit=5):
        """Tampilkan hasil detail per jenis serangan"""
        attack_names = {
            'sql_injection': 'SQL INJECTION',
            'directory_traversal': 'DIRECTORY TRAVERSAL',
            'xss': 'CROSS-SITE SCRIPTING (XSS)',
            'akses_sensitif': 'AKSES HALAMAN SENSITIF'
        }

        for attack_type in ['sql_injection', 'directory_traversal', 'xss', 'akses_sensitif']:
            detections = self.results.get(attack_type, [])
            if detections:
                attack_name = attack_names[attack_type]
                print(f"\n🚨 {attack_name} TERDETEKSI:")
                print("-" * 80)

                # Show top attacks
                for i, detection in enumerate(detections[:limit], 1):
                    log_preview = detection['log'][:100] + "..." if len(detection['log']) > 100 else detection['log']
                    print(f"{i}. Baris {detection['line']:,}: {log_preview}")

                if len(detections) > limit:
                    print(f"   ... dan {len(detections) - limit:,} deteksi lainnya")

    def create_security_report(self):
        """Buat laporan keamanan dalam format teks"""
        report_file = f"{self.output_dir}/security_report.txt"

        with open(report_file, 'w', encoding='utf-8') as f:
            f.write("="*80 + "\n")
            f.write("LAPORAN ANALISIS KEAMANAN WEB SERVER LOG\n")
            f.write("="*80 + "\n\n")

            f.write(f"File Log: {self.log_file}\n")
            f.write(f"Total Baris: {self.total_lines:,}\n")
            f.write(f"Tanggal Analisis: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            total_attacks = sum(len(detections) for detections in self.results.values())
            f.write(f"RINGKASAN:\n")
            f.write(f"- Total Serangan Terdeteksi: {total_attacks:,}\n")
            if self.total_lines > 0:
                f.write(f"- Persentase Serangan: {(total_attacks/self.total_lines*100):.2f}%\n\n")

            f.write("PERBAIKAN YANG DILAKUKAN:\n")
            f.write("- Pattern SQL injection diperbaiki untuk mengurangi false positive\n")
            f.write("- Whitelist bot legitimate ditambahkan\n")
            f.write("- Filter false positive berdasarkan user agent dan context\n\n")

            attack_names = {
                'sql_injection': 'SQL INJECTION',
                'directory_traversal': 'DIRECTORY TRAVERSAL',
                'xss': 'CROSS-SITE SCRIPTING (XSS)',
                'akses_sensitif': 'AKSES HALAMAN SENSITIF'
            }

            f.write("DETAIL PER JENIS SERANGAN:\n")
            f.write("-" * 50 + "\n")

            for attack_type in ['sql_injection', 'directory_traversal', 'xss', 'akses_sensitif']:
                detections = self.results.get(attack_type, [])
                if detections:
                    attack_name = attack_names[attack_type]
                    f.write(f"\n{attack_name}:\n")
                    f.write(f"Total: {len(detections):,} deteksi\n")

                    # Top 5 IP addresses for this attack
                    ips = defaultdict(int)
                    for detection in detections:
                        ip_match = re.match(r'^(\d+\.\d+\.\d+\.\d+)', detection['log'])
                        if ip_match:
                            ips[ip_match.group(1)] += 1

                    if ips:
                        f.write("Top IP Addresses:\n")
                        for ip, count in sorted(ips.items(), key=lambda x: x[1], reverse=True)[:5]:
                            f.write(f"  {ip}: {count} kali\n")

        print(f"📄 Laporan keamanan tersimpan di: {report_file}")

    def download_results(self):
        """Zip every file under the output directory and offer it for download.

        Files are stored in the archive under their bare file name (no
        directory component), then the archive is passed to
        ``files.download``.
        """
        print("\n📥 Mengunduh hasil analisis...")

        zip_filename = "hasil_analisis_keamanan.zip"

        with zipfile.ZipFile(zip_filename, 'w') as archive:
            for folder, _subdirs, names in os.walk(self.output_dir):
                for name in names:
                    source_path = os.path.join(folder, name)
                    archive.write(source_path, name)

        files.download(zip_filename)
        print(f"✅ File {zip_filename} siap diunduh!")

    def run_analysis(self, data_source="kaggle"):
        """Jalankan analisis lengkap"""
        print("🔐 LOG SECURITY ANALYZER - IMPROVED VERSION")
        print("=" * 50)
        print("✨ Perbaikan:")
        print("- Pattern SQL injection diperbaiki")
        print("- Whitelist bot legitimate ditambahkan")
        print("- Filter false positive ditingkatkan")
        print("=" * 50)

        # Load data berdasarkan sumber
        if data_source == "kaggle":
            if not self.download_kaggle_dataset():
                return
        elif data_source == "upload":
            if not self.upload_log_file():
                return
        elif data_source == "sample":
            self.create_sample_log()

        # Jalankan analisis
        if self.analyze_log_batch():
            self.create_csv_files()
            self.create_security_report()
            total = self.show_summary()

            if total > 0:
                self.show_detailed_results()

                # Preview hasil dalam DataFrame
                df = pd.read_csv(f"{self.output_dir}/hasil_akhir.csv")
                if not df.empty:
                    print(f"\n📊 PREVIEW HASIL (Top 10):")
                    display(df.head(10))

                    print(f"\n📊 STATISTIK SERANGAN PER JENIS:")
                    attack_stats = df['Serangan'].value_counts()
                    display(attack_stats)
            else:
                print("✅ Tidak ada serangan terdeteksi dalam log file!")

            # Download option
            print(f"\n📁 File hasil tersedia di folder: {self.output_dir}")
            download = input("Ingin download hasil analisis? (y/n): ")
            if download.lower() == 'y':
                self.download_results()

def main():
    """Ask which data source to use and start the matching analysis run."""
    analyzer = LogSecurityAnalyzer()

    for menu_line in (
        "Pilih sumber data:",
        "1. Dataset Kaggle (10M+ baris)",
        "2. Upload file log sendiri",
        "3. Gunakan sample log untuk testing (termasuk false positive)",
    ):
        print(menu_line)

    choice = input("Masukkan pilihan (1/2/3): ")

    # Menu number -> data_source argument of run_analysis
    sources = {"1": "kaggle", "2": "upload", "3": "sample"}
    selected = sources.get(choice)
    if selected is None:
        print("Pilihan tidak valid!")
        return

    analyzer.run_analysis(data_source=selected)

# Run the program when this code is executed as the main module
# (calls main(), which prompts for a data source).
if __name__ == "__main__":
    main()