In [2]:
import PyPDF2
import re
import os
from collections import defaultdict
from typing import Dict, Union
from pathlib import Path

In [3]:
try:
    from Crypto.Cipher import AES
except ImportError:
    print("Note: PyCryptodome not installed. Some encrypted PDFs may not be processed.")

class PDFFeatureExtractor:
    """A class to extract various features from PDF files for analysis"""
    
    def __init__(self, pdf_path):
        self.pdf_path = pdf_path
        self.raw_content = ""
        self.features = {}
        
    def get_object_content(self, pdf_reader):
        """Extract raw content from PDF objects"""
        content = ""
        try:
            # Get trailer dictionary
            if hasattr(pdf_reader, 'trailer'):
                content += str(pdf_reader.trailer)
            
            # Get resolved objects
            for i in range(len(pdf_reader.pages)):
                page = pdf_reader.pages[i]
                content += str(page.get_object())
                
                # Extract content from page's /Contents
                if '/Contents' in page:
                    contents = page['/Contents']
                    if isinstance(contents, list):
                        for obj in contents:
                            content += str(obj)
                    else:
                        content += str(contents)
            
            return content
            
        except Exception as e:
            print(f"Warning: Error extracting object content: {str(e)}")
            return content
    
    def extract_all_features(self):
        """Extract all available features from the PDF"""
        try:
            with open(self.pdf_path, 'rb') as file:
                # Read the entire file content
                file_content = file.read()
                self.raw_content = file_content.decode('latin-1')  # Use latin-1 to handle binary data
                
                # Create PDF reader object
                pdf_reader = PyPDF2.PdfReader(file)
                print(pdf_reader)

                # Check if PDF is encrypted
                self.features['is_encrypted'] = pdf_reader.is_encrypted
                
                # Handle encrypted PDFs
                if pdf_reader.is_encrypted:
                    try:
                        pdf_reader.decrypt('')
                        self.features['encryption_status'] = 'Successfully decrypted'
                    except:
                        print("Warning: PDF is encrypted and couldn't be automatically decrypted")
                        self.features['encryption_status'] = 'Failed to decrypt'
                        return self.extract_basic_features(pdf_reader)
                
                # Add object content to raw_content
                self.raw_content += self.get_object_content(pdf_reader)
                
                # Extract features
                self.extract_basic_features(pdf_reader)
                self.count_keywords()
                self.extract_structural_features()
                self.extract_javascript_features()
                self.extract_encoding_features()
                
                return self.features
                
        except Exception as e:
            print(f"Error processing PDF: {str(e)}")
            return None
    
    def extract_basic_features(self, pdf_reader):
        """Extract basic features that don't require decryption"""
        try:
            self.features['file_size'] = os.path.getsize(self.pdf_path)
            self.features['num_pages'] = len(pdf_reader.pages)
            self.features['version'] = pdf_reader.pdf_header
            
            # Try to extract metadata if available
            try:
                if pdf_reader.metadata:
                    self.features['metadata'] = dict(pdf_reader.metadata)
            except:
                self.features['metadata'] = 'Unable to extract metadata'
            
            return self.features
            
        except Exception as e:
            print(f"Error extracting basic features: {str(e)}")
            return None
    
    def count_keywords(self):
        """Count various important PDF keywords"""
        keywords = {
            'procset': r'/ProcSet\b',
            'js': r'/JS\b',
            'javascript': r'/JavaScript\b',
            'action': r'/Action\b',
            'acroform': r'/AcroForm\b',
            'jbig2decode': r'/JBIG2Decode\b',
            'launch': r'/Launch\b',
            'embedfile': r'/EmbeddFile\b',
            'xfa': r'/XFA\b',
            'objstm': r'/ObjStm\b',
            'richmedia': r'/RichMedia\b',
            'openaction': r'/OpenAction\b',
            'submitform': r'/SubmitForm\b',
            'uri': r'/URI\b'
        }
        
        for key, pattern in keywords.items():
            self.features[f'{key}_count'] = len(re.findall(pattern, self.raw_content))
    
    def extract_structural_features(self):
        """Extract structural features from the PDF"""
        # Count various structural elements
        structural_patterns = {
            'stream_count': r'stream\b',
            'indirect_obj_count': r'\d+\s+\d+\s+obj\b',
            'xref_count': r'xref\b',
            'trailer_count': r'trailer\b',
        }
        
        for key, pattern in structural_patterns.items():
            self.features[key] = len(re.findall(pattern, self.raw_content))
        
        # Check for incremental updates
        self.features['updates'] = len(re.findall(r'%%EOF', self.raw_content)) - 1
        
        # Count objects with endobj
        self.features['num_objects'] = len(re.findall(r'endobj', self.raw_content))
    
    def extract_javascript_features(self):
        """Extract JavaScript-related features"""
        js_patterns = {
            'eval': r'eval\s*\(',
            'unescape': r'unescape\s*\(',
            'function': r'function\s*\w+\s*\(',
            'document_write': r'document\.write\s*\(',
            'window_location': r'window\.location',
            'hidden_iframe': r'iframe.*?style.*?hidden',
            'shell_code': r'shellcode|shell_code|shellcode_|sc_',
            'escape': r'escape\s*\(',
            'fromcharcode': r'fromcharcode|String\.fromCharCode'
        }
        
        for key, pattern in js_patterns.items():
            self.features[f'js_{key}_count'] = len(re.findall(pattern, self.raw_content, re.IGNORECASE))
    
    def extract_encoding_features(self):
        """Extract features related to encoding"""
        encodings = {
            'ascii_hex': r'/ASCIIHexDecode\b',
            'ascii_85': r'/ASCII85Decode\b',
            'lzw': r'/LZWDecode\b',
            'flate': r'/FlateDecode\b',
            'run_length': r'/RunLengthDecode\b',
            'ccitt_fax': r'/CCITTFaxDecode\b',
            'dct': r'/DCTDecode\b'
        }
        
        for key, pattern in encodings.items():
            self.features[f'{key}_count'] = len(re.findall(pattern, self.raw_content))

def analyze_pdf(pdf_path):
    """
    Analyze a PDF file and print its features
    
    Args:
        pdf_path (str): Path to the PDF file
    """
    extractor = PDFFeatureExtractor(pdf_path)
    features = extractor.extract_all_features()
    
    if features:
        print("\nPDF Analysis Results:")
        print("-" * 50)
        
        # Print encryption status
        print("\nEncryption Status:")
        print(f"Is Encrypted: {features.get('is_encrypted', 'Unknown')}")
        if 'encryption_status' in features:
            print(f"Encryption Status: {features['encryption_status']}")
        
        # Print basic information
        print("\nBasic Information:")
        print(f"File Size: {features.get('file_size', 0)/1024:.2f} KB")
        print(f"Number of Pages: {features.get('num_pages', 'Unknown')}")
        print(f"PDF Version: {features.get('version', 'Unknown')}")
        
        # Print structural information
        print("\nStructural Features:")
        print(f"Number of Objects: {features.get('num_objects', 'Unknown')}")
        print(f"Number of Streams: {features.get('stream_count', 0)}")
        print(f"Number of Indirect Objects: {features.get('indirect_obj_count', 0)}")
        print(f"Number of Updates: {features.get('updates', 0)}")
        
        # Print keyword counts
        print("\nKeyword Counts:")
        for key, value in features.items():
            if key.endswith('_count') and not key.startswith('js_'):
                print(f"{key.replace('_count', '').title()}: {value}")
        
        # Print JavaScript features
        print("\nJavaScript Indicators:")
        for key, value in features.items():
            if key.startswith('js_'):
                print(f"{key.replace('js_', '').replace('_count', '').title()}: {value}")
        
        # Print metadata if available
        if 'metadata' in features and features['metadata'] != 'Unable to extract metadata':
            print("\nMetadata:")
            for key, value in features['metadata'].items():
                print(f"{key}: {value}")
    
    return features

# Example usage
if __name__ == "__main__":
    pdf_path = "./TestFolder/Git.pdf"
    analyze_pdf(pdf_path)

Note: PyCryptodome not installed. Some encrypted PDFs may not be processed.
<PyPDF2._reader.PdfReader object at 0x000001BA81C022A0>

PDF Analysis Results:
--------------------------------------------------

Encryption Status:
Is Encrypted: False

Basic Information:
File Size: 368.91 KB
Number of Pages: 2
PDF Version: %PDF-1.3

Structural Features:
Number of Objects: 405
Number of Streams: 256
Number of Indirect Objects: 405
Number of Updates: 0

Keyword Counts:
Procset: 114
Javascript: 0
Action: 0
Acroform: 0
Jbig2Decode: 0
Launch: 0
Embedfile: 0
Xfa: 0
Objstm: 0
Richmedia: 0
Openaction: 0
Submitform: 0
Uri: 0
Stream: 256
Indirect_Obj: 405
Xref: 2
Trailer: 1
Ascii_Hex: 0
Ascii_85: 0
Lzw: 0
Flate: 130
Run_Length: 0
Ccitt_Fax: 0
Dct: 0

JavaScript Indicators:
Count: 0
Eval: 0
Unescape: 0
Function: 0
Document_Write: 0
Window_Location: 0
Hidden_Iframe: 0
Shell_Code: 0
Escape: 0
Fromcharcode: 0

Metadata:
/Title: IndirectObject(404, 0, 1900552397472)
/Producer: IndirectObject(405, 0, 190055

In [4]:
import os
import re
from typing import Dict, Any
import PyPDF2
from collections import Counter

class PDFFeatureExtractor:
    def __init__(self, pdf_path: str):
        """Initialize the feature extractor with path to PDF file."""
        self.pdf_path = pdf_path
        self.features = {}
        
    def extract_all_features(self) -> Dict[str, Any]:
        """Extract all features from the PDF file."""
        try:
            with open(self.pdf_path, 'rb') as file:
                pdf_content = file.read()
                pdf_reader = PyPDF2.PdfReader(file)
                
                self.features = {
                    'headerlength': self._get_header_length(),
                    'headercorrupt': self._check_header_corrupt(pdf_content),
                    'small_content': self._check_small_content(pdf_reader),
                    'content_corrupt': self._check_content_corrupt(pdf_content),
                    'stream_corrupt': self._check_stream_corrupt(pdf_content),
                    'malicecontent': self._check_malice_content(pdf_content),
                    'hidden_file': self._check_hidden_file(pdf_reader)
                }
                
                return self.features
        except Exception as e:
            raise Exception(f"Error processing PDF: {str(e)}")
    
    def _get_header_length(self) -> int:
        """Get the length of the PDF filename."""
        return len(os.path.basename(self.pdf_path))
    
    def _check_header_corrupt(self, content: bytes) -> int:
        """Check if PDF header version is valid."""
        header_match = re.search(b'%PDF-1.[0-7]', content[:10])
        return 0 if header_match else 1
    
    def _check_small_content(self, pdf_reader: PyPDF2.PdfReader) -> int:
        """Check if PDF has 14 or fewer objects."""
        # Count number of objects
        num_objects = len(pdf_reader.pages)  # This is a simplification
        return 1 if num_objects <= 14 else 0
    
    def _check_content_corrupt(self, content: bytes) -> int:
        """Check if number of objects matches number of endobjects."""
        objects = len(re.findall(b'obj', content))
        endobjects = len(re.findall(b'endobj', content))
        return 1 if objects != endobjects else 0
    
    def _check_stream_corrupt(self, content: bytes) -> int:
        """Check if number of streams matches number of endstreams."""
        streams = len(re.findall(b'stream', content))
        endstreams = len(re.findall(b'endstream', content))
        return 1 if streams != endstreams else 0
    
    def _check_malice_content(self, content: bytes) -> int:
        """Check for presence of potentially malicious elements."""
        suspicious_elements = [
            b'/JS',
            b'/JavaScript',
            b'/AA',
            b'/Launch',
            b'/OpenAction'
        ]
        
        found_elements = sum(1 for element in suspicious_elements 
                           if element in content)
        return 1 if found_elements >= 2 else 0
    
    def _check_hidden_file(self, pdf_reader: PyPDF2.PdfReader) -> int:
        """Check for hidden files in PDF."""
        try:
            # Simple check for embedded files
            for page in pdf_reader.pages:
                if '/EmbeddedFiles' in page:
                    return 1
            return 0
        except:
            return 0

def batch_process_pdfs(pdf_directory: str) -> Dict[str, Dict[str, Any]]:
    """Process multiple PDFs in a directory and return their features."""
    results = {}
    for filename in os.listdir(pdf_directory):
        if filename.lower().endswith('.pdf'):
            pdf_path = os.path.join(pdf_directory, filename)
            try:
                extractor = PDFFeatureExtractor(pdf_path)
                results[filename] = extractor.extract_all_features()
            except Exception as e:
                print(f"Error processing {filename}: {str(e)}")
                results[filename] = None
    return results

# Example usage
if __name__ == "__main__":
    # Single PDF analysis
    pdf_path = "./TestFolder/Git.pdf"
    extractor = PDFFeatureExtractor(pdf_path)
    features = extractor.extract_all_features()
    # print(features)
    print(f"Features for {pdf_path}:")
    for feature, value in features.items():
        print(f"{feature}: {value}")
    
    # Batch processing
    # pdf_directory = "pdfs_folder"
    # results = batch_process_pdfs(pdf_directory)
    # print("\nBatch processing results:")
    # for pdf_name, features in results.items():
    #     if features:
    #         print(f"\n{pdf_name}:")
    #         for feature, value in features.items():
    #             print(f"{feature}: {value}")

Features for ./TestFolder/Git.pdf:
headerlength: 7
headercorrupt: 0
small_content: 1
content_corrupt: 1
stream_corrupt: 1
malicecontent: 0
hidden_file: 0


In [5]:
class PDFFeatureExtractor:
    """
    A class to extract features from PDF files for malware detection.
    """
    
    def __init__(self, pdf_path: str):
        """Initialize with path to PDF file."""
        self.pdf_path = pdf_path
        self.raw_content = None
        self.pdf_size_kb = None
        self._load_pdf()

    def _load_pdf(self) -> None:
        """Load the PDF file and store its raw content."""
        try:
            # Get file size in KB
            self.pdf_size_kb = os.path.getsize(self.pdf_path) / 1024

            # Read raw content
            with open(self.pdf_path, 'rb') as file:
                self.raw_content = file.read()
                
        except Exception as e:
            raise Exception(f"Error loading PDF: {str(e)}")

    def _count_keyword(self, keyword: str) -> int:
        """Count occurrences of a keyword in the PDF content."""
        return len(re.findall(bytes(keyword, 'utf-8'), self.raw_content))

    def _get_pdf_version(self) -> str:
        """Extract PDF version from header."""
        header = self.raw_content[:8].decode('utf-8', errors='ignore')
        version_match = re.search(r'%PDF-(\d+\.\d+)', header)
        return version_match.group(1) if version_match else "Unknown"

    def extract_features(self) -> Dict[str, Union[int, str, bool]]:
        """Extract all relevant features from the PDF."""
        features = {}
        
        # Basic structure keywords
        structure_keywords = [
            'obj', 'endobj', 'stream', 'endstream', 'xref', 'trailer',
            'startxref', '/Page', '/Encrypt', '/Size', '%EOF', '/Producer',
            '/ProcSet', '/ID', '/S', '/CreationDate'
        ]
        
        for keyword in structure_keywords:
            features[keyword] = self._count_keyword(keyword)

        # JavaScript and action-related features
        js_keywords = [
            '/JS', '/JavaScript', '/AA', '/OpenAction', '/AcroForm',
            '/Launch', '/Action'
        ]
        
        for keyword in js_keywords:
            features[keyword] = self._count_keyword(keyword)

        # Media and embedded content features
        media_keywords = [
            '/JBIG2Decode', '/RichMedia', '/EmbeddedFIle', '/XFA',
            '/Font', '/XObject', '/Image'
        ]
        
        for keyword in media_keywords:
            features[keyword] = self._count_keyword(keyword)

        # Additional structural elements
        additional_keywords = [
            '/Widget', '/FontDescriptor', '/Rect', '/Length',
            '/ModDate', '/Info', '/XML'
        ]
        
        for keyword in additional_keywords:
            features[keyword] = self._count_keyword(keyword)

        # Special characters
        features['dict_start'] = self._count_keyword('<<')
        features['dict_end'] = self._count_keyword('>>')
        features['comments'] = len(re.findall(b'%[^\n]*\n', self.raw_content))

        # File metadata
        try:
            with open(self.pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                
                features['page_count'] = len(pdf_reader.pages)
                features['has_metadata'] = pdf_reader.metadata is not None
                features['file_size_kb'] = self.pdf_size_kb
                features['pdf_version'] = self._get_pdf_version()
                
                # Check for forms
                features['has_acroform'] = hasattr(pdf_reader, 'acroform')
                
                # Check for XFA forms
                if features['has_acroform'] and pdf_reader.acroform:
                    features['form_type'] = 'XFA' if '/XFA' in str(pdf_reader.acroform) else 'AcroForm'
                else:
                    features['form_type'] = 'none'
                
                # Get first page size
                if len(pdf_reader.pages) > 0:
                    page = pdf_reader.pages[0]
                    if '/MediaBox' in page:
                        mediabox = page['/MediaBox']
                        width, height = float(mediabox[2]), float(mediabox[3])
                        features['page_size'] = f"{width}x{height}"
                    else:
                        features['page_size'] = "unknown"
                    
                    # Page rotation
                    features['page_rotation'] = page.get('/Rotate', 0)
                
        except Exception as e:
            print(f"Warning: Error extracting PyPDF2 features: {str(e)}")
            features.update({
                'page_count': -1,
                'has_metadata': False,
                'form_type': 'unknown',
                'page_size': 'unknown',
                'page_rotation': 0
            })

        return features

def analyze_pdf(pdf_path: str) -> Dict[str, Union[int, str, bool]]:
    """
    Analyze a PDF file and extract all relevant features.
    
    Args:
        pdf_path: Path to the PDF file
        
    Returns:
        Dictionary containing all extracted features
    """
    extractor = PDFFeatureExtractor(pdf_path)
    return extractor.extract_features()

# Example usage
if __name__ == "__main__":
    import json
    
    def analyze_pdf_file(filepath: str) -> None:
        try:
            features = analyze_pdf(filepath)
            print(json.dumps(features, indent=2))
        except Exception as e:
            print(f"Error analyzing PDF {filepath}: {str(e)}")
    
    # Example usage with a PDF file
    pdf_path = "./TestFolder/Git.pdf"  # Replace with actual PDF path
    analyze_pdf_file(pdf_path)

{
  "obj": 810,
  "endobj": 405,
  "stream": 256,
  "endstream": 128,
  "xref": 2,
  "trailer": 1,
  "startxref": 1,
  "/Page": 4,
  "/Encrypt": 0,
  "/Size": 2,
  "%EOF": 1,
  "/Producer": 1,
  "/ProcSet": 114,
  "/ID": 1,
  "/S": 249,
  "/CreationDate": 1,
  "/JS": 0,
  "/JavaScript": 0,
  "/AA": 0,
  "/OpenAction": 0,
  "/AcroForm": 0,
  "/Launch": 0,
  "/Action": 0,
  "/JBIG2Decode": 0,
  "/RichMedia": 0,
  "/EmbeddedFIle": 0,
  "/XFA": 0,
  "/Font": 35,
  "/XObject": 114,
  "/Image": 0,
  "/Widget": 0,
  "/FontDescriptor": 10,
  "/Rect": 0,
  "/Length": 130,
  "/ModDate": 1,
  "/Info": 1,
  "/XML": 1,
  "dict_start": 624,
  "dict_end": 625,
  "comments": 621,
  "page_count": 2,
  "has_metadata": true,
  "file_size_kb": 368.9130859375,
  "pdf_version": "1.3",
  "has_acroform": false,
  "form_type": "none",
  "page_size": "612.0x792.0",
  "page_rotation": 0
}
