In [1]:
from transformers import pipeline
import spacy
from spacy import displacy
from spacy.tokens import Doc
from spacy.language import Language
import re
from typing import List, Dict, Tuple
from collections import defaultdict

In [2]:
with open('pii.log', 'r') as file:
    log_list = file.readlines()

## NER Model with Displacy without Pattern Matching

In [None]:

class HFNERComponent:
    def __init__(self, nlp: Language, model_name: str = "dslim/bert-base-NER"):
        self.ner = pipeline("ner", model=model_name, aggregation_strategy="none")
        
    def __call__(self, doc: Doc) -> Doc:
        text = doc.text
        try:
            ner_results = self.ner(text)
            
            # Convert HuggingFace BIO NER results to spaCy spans
            current_ent = {"start": None, "end": None, "label": None}
            spans = []
            
            for token in ner_results:
                tag = token["entity"]
                start = token["start"]
                end = token["end"]
                
                if tag == "O":
                    if current_ent["start"] is not None:
                        span = doc.char_span(
                            current_ent["start"],
                            current_ent["end"],
                            label=current_ent["label"],
                            alignment_mode="contract"
                        )
                        if span is not None:
                            spans.append(span)
                        current_ent = {"start": None, "end": None, "label": None}
                else:
                    bio_tag, entity_type = tag.split("-")
                    
                    if bio_tag == "B":
                        if current_ent["start"] is not None:
                            span = doc.char_span(
                                current_ent["start"],
                                current_ent["end"],
                                label=current_ent["label"],
                                alignment_mode="contract"
                            )
                            if span is not None:
                                spans.append(span)
                        current_ent = {"start": start, "end": end, "label": entity_type}
                    
                    elif bio_tag == "I":
                        if current_ent["start"] is not None and current_ent["label"] == entity_type:
                            current_ent["end"] = end
                        else:
                            current_ent = {"start": start, "end": end, "label": entity_type}
            
            if current_ent["start"] is not None:
                span = doc.char_span(
                    current_ent["start"],
                    current_ent["end"],
                    label=current_ent["label"],
                    alignment_mode="contract"
                )
                if span is not None:
                    spans.append(span)
            
            doc.ents = spans
            return doc
            
        except Exception as e:
            print(f"Error in NER processing: {str(e)}")
            return doc

@Language.factory("pii_ner")
def create_hf_ner_component(nlp: Language, name: str):
    return HFNERComponent(nlp)

class LogNERVisualizer:
    def __init__(self):
        self.nlp = spacy.blank("en")
        self.nlp.add_pipe("pii_ner", last=True)
        
        self.options = {
            "ents": ["PER", "ORG", "LOC", "MISC"],
            "colors": {
                "PER": "#fca3a3",   # Light red for persons
                "ORG": "#7aecec",   # Light blue for organizations
                "LOC": "#95ef95",   # Light green for locations
                "MISC": "#f0d0ff"   # Light purple for miscellaneous
            }
        }
    
    def process_logs(self, logs: List[str], output_path: str = "ner_visualization.html") -> Dict:
        """
        Process a list of log texts and create visualizations
        
        Args:
            logs: List of log texts to process
            output_path: Path to save the HTML visualization
            
        Returns:
            Dictionary with statistics about processed entities
        """
        stats = {
            "total_logs": len(logs),
            "total_entities": 0,
            "entity_types": {}
        }
        
        # Process each log and collect visualizations
        visualizations = []
        for i, log in enumerate(logs, 1):
            try:
                # Process the text
                doc = self.nlp(log)
                
                # Update statistics
                stats["total_entities"] += len(doc.ents)
                for ent in doc.ents:
                    stats["entity_types"][ent.label_] = stats["entity_types"].get(ent.label_, 0) + 1
                
                # Generate visualization
                if doc.ents:
                    html_content = displacy.render(doc, style="ent", options=self.options)
                    visualizations.append(f"<div class='log-entry'><h3>Log #{i}</h3>{html_content}</div>")
            
            except Exception as e:
                print(f"Error processing log #{i}: {str(e)}")
                continue
        
        # Create complete HTML document
        if visualizations:
            complete_html = f"""
            <!DOCTYPE html>
            <html>
            <head>
                <title>NER Visualization for Logs</title>
                <style>
                    body {{ font-family: Arial, sans-serif; margin: 20px; }}
                    .log-entry {{ 
                        margin-bottom: 30px;
                        padding: 15px;
                        border: 1px solid #ddd;
                        border-radius: 5px;
                    }}
                    .log-entry h3 {{ 
                        margin-top: 0;
                        color: #333;
                    }}
                    .statistics {{
                        background: #f5f5f5;
                        padding: 15px;
                        margin-bottom: 20px;
                        border-radius: 5px;
                    }}
                </style>
            </head>
            <body>
                <div class="statistics">
                    <h2>Processing Statistics</h2>
                    <p>Total Logs Processed: {stats['total_logs']}</p>
                    <p>Total Entities Found: {stats['total_entities']}</p>
                    <h3>Entity Types Distribution:</h3>
                    <ul>
                        {' '.join(f"<li>{k}: {v}</li>" for k, v in stats['entity_types'].items())}
                    </ul>
                </div>
                {' '.join(visualizations)}
            </body>
            </html>
            """
            
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(complete_html)
            
            print(f"\nVisualization saved to {output_path}")
        
        return stats

# Example usage
if __name__ == "__main__":
    # Create visualizer and process logs
    visualizer = LogNERVisualizer()
    stats = visualizer.process_logs(log_list, "log_ner_visualization.html")
    
    # Print statistics
    print("\nProcessing Statistics:")
    print(f"Total Logs Processed: {stats['total_logs']}")
    print(f"Total Entities Found: {stats['total_entities']}")
    print("\nEntity Types Distribution:")
    for entity_type, count in stats['entity_types'].items():
        print(f"{entity_type}: {count}")

## NER Model with Displacy with Pattern Matching

In [5]:
class PatternMatcher:
    def __init__(self):
        self.patterns = {
            'EMAIL': r'(?:[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,63})',
            # Updated credit card pattern
            'CREDIT_CARD': r'''(?x)
                (?:
                    (?:4[0-9]{3}[-\s]?[0-9]{4}[-\s]?[0-9]{4}[-\s]?[0-9]{4})    # Visa
                    |
                    (?:5[1-5][0-9]{2}[-\s]?[0-9]{4}[-\s]?[0-9]{4}[-\s]?[0-9]{4}) # Mastercard
                    |
                    (?:3[47][0-9]{2}[-\s]?[0-9]{6}[-\s]?[0-9]{5})              # American Express
                    |
                    (?:6(?:011|5[0-9]{2})[-\s]?[0-9]{4}[-\s]?[0-9]{4}[-\s]?[0-9]{4}) # Discover
                    |
                    (?:[0-9]{4}[-\s]?[0-9]{4}[-\s]?[0-9]{4}[-\s]?[0-9]{4})    # Random
                )
                (?:\s*\(?.*?\)?)?  # Optional text in parentheses
            ''',
            'IPV4': r'(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})\.){3}(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})',
            'SSN': r'(?!000|666|9\d{2})\d{3}-(?!00)\d{2}-(?!0000)\d{4}',
            'PHONE': r'(?:\+?1[-.]?)?\s*(?:\([0-9]{3}\)[-.]?|[0-9]{3}[-.])[0-9]{3}[-. ][0-9]{4}'
        }
        self.compiled_patterns = {
            label: re.compile(pattern) for label, pattern in self.patterns.items()
        }

    def find_matches(self, text: str) -> List[Tuple[str, int, int, str]]:
        matches = []
        for label, pattern in self.compiled_patterns.items():
            for match in pattern.finditer(text):
                matches.append((match.group(), match.start(), match.end(), label))
        return matches

class HFNERComponent:
    def __init__(self, nlp: Language, model_name: str = "dslim/bert-base-NER"):
        self.ner = pipeline("ner", model=model_name, aggregation_strategy="none")
        self.pattern_matcher = PatternMatcher()
        
    def __call__(self, doc: Doc) -> Doc:
        text = doc.text
        spans = []
        
        try:
            # Get NER results
            ner_results = self.ner(text)
            current_ent = {"start": None, "end": None, "label": None}
            
            # Process NER results
            for token in ner_results:
                tag = token["entity"]
                start = token["start"]
                end = token["end"]
                
                if tag == "O":
                    if current_ent["start"] is not None:
                        span = doc.char_span(
                            current_ent["start"],
                            current_ent["end"],
                            label=current_ent["label"],
                            alignment_mode="contract"
                        )
                        if span is not None:
                            spans.append(span)
                        current_ent = {"start": None, "end": None, "label": None}
                else:
                    bio_tag, entity_type = tag.split("-")
                    
                    if bio_tag == "B":
                        if current_ent["start"] is not None:
                            span = doc.char_span(
                                current_ent["start"],
                                current_ent["end"],
                                label=current_ent["label"],
                                alignment_mode="contract"
                            )
                            if span is not None:
                                spans.append(span)
                        current_ent = {"start": start, "end": end, "label": entity_type}
                    
                    elif bio_tag == "I":
                        if current_ent["start"] is not None and current_ent["label"] == entity_type:
                            current_ent["end"] = end
                        else:
                            current_ent = {"start": start, "end": end, "label": entity_type}
            
            if current_ent["start"] is not None:
                span = doc.char_span(
                    current_ent["start"],
                    current_ent["end"],
                    label=current_ent["label"],
                    alignment_mode="contract"
                )
                if span is not None:
                    spans.append(span)
            
            # Get regex pattern matches
            pattern_matches = self.pattern_matcher.find_matches(text)
            for match_text, start, end, label in pattern_matches:
                span = doc.char_span(start, end, label=label, alignment_mode="contract")
                if span is not None:
                    spans.append(span)
            
            # Sort spans by start position
            spans = sorted(spans, key=lambda x: x.start)
            
            # Remove overlapping spans (prioritize NER over regex matches)
            filtered_spans = []
            last_end = -1
            for span in spans:
                if span.start >= last_end:
                    filtered_spans.append(span)
                    last_end = span.end
            
            doc.ents = filtered_spans
            return doc
            
        except Exception as e:
            print(f"Error in processing: {str(e)}")
            return doc

@Language.factory("combined_ner")
def create_combined_ner(nlp: Language, name: str):
    return HFNERComponent(nlp)

class LogAnalyzer:
    def __init__(self):
        self.nlp = spacy.blank("en")
        self.nlp.add_pipe("combined_ner", last=True)
        
        self.options = {
            "ents": [
                # NER entities
                "PER", "ORG", "LOC", "MISC",
                # Regex patterns
                "EMAIL", "CREDIT_CARD", "IPV4", "SSN", "PHONE"
            ],
            "colors": {
                # NER entities
                "PER": "#fca3a3",    # Light red
                "ORG": "#7aecec",    # Light blue
                "LOC": "#95ef95",    # Light green
                "MISC": "#f0d0ff",   # Light purple
                # Regex patterns
                "EMAIL": "#ffeb99",       # Light yellow
                "CREDIT_CARD": "#FFB6C1", # Light pink
                "IPV4": "#FFA07A",        # Light salmon
                "SSN": "#FFA500",         # Light orange
                "PHONE": "#40E0D0"        # Turquiouse
            }
        }
    
    def process_logs(self, logs: List[str], output_path: str = "log_analysis.html") -> Dict:
        """Process a list of logs and create a combined visualization."""
        stats = {
            "total_logs": len(logs),
            "total_entities": 0,
            "entity_types": defaultdict(int),
            "pattern_types": defaultdict(int)
        }
        
        # Process each log and collect visualizations
        visualizations = []
        for i, log in enumerate(logs, 1):
            try:
                # Process the text
                doc = self.nlp(log)
                
                # Update statistics
                stats["total_entities"] += len(doc.ents)
                for ent in doc.ents:
                    if ent.label_ in self.options["colors"]:
                        if ent.label_ in ["EMAIL", "CREDIT_CARD", "IPV4", "SSN", "PHONE"]:
                            stats["pattern_types"][ent.label_] += 1
                        else:
                            stats["entity_types"][ent.label_] += 1
                
                # Generate visualization
                if doc.ents:
                    html_content = displacy.render(doc, style="ent", options=self.options)
                    visualizations.append(f"""
                        <div class='log-entry'>
                            <h3>Log #{i}</h3>
                            <div class='log-text'>{html_content}</div>
                        </div>
                    """)
            
            except Exception as e:
                print(f"Error processing log #{i}: {str(e)}")
                continue
        
        # Create complete HTML document
        if visualizations:
            complete_html = f"""
            <!DOCTYPE html>
            <html>
            <head>
                <title>Log Analysis Visualization</title>
                <style>
                    body {{ font-family: Arial, sans-serif; margin: 20px; line-height: 1.6; }}
                    .log-entry {{ 
                        margin-bottom: 30px;
                        padding: 15px;
                        border: 1px solid #ddd;
                        border-radius: 5px;
                    }}
                    .log-entry h3 {{ 
                        margin-top: 0;
                        color: #333;
                    }}
                    .statistics {{
                        background: #f5f5f5;
                        padding: 15px;
                        margin-bottom: 20px;
                        border-radius: 5px;
                    }}
                    .legend {{
                        display: grid;
                        grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
                        gap: 10px;
                        margin-top: 10px;
                    }}
                    .legend-item {{
                        display: flex;
                        align-items: center;
                        padding: 5px;
                    }}
                    .color-box {{
                        width: 20px;
                        height: 20px;
                        margin-right: 8px;
                        border-radius: 3px;
                    }}
                </style>
            </head>
            <body>
                <div class="statistics">
                    <h2>Analysis Summary</h2>
                    <p>Total Logs Processed: {stats['total_logs']}</p>
                    <p>Total Entities Found: {stats['total_entities']}</p>
                    
                    <h3>Named Entities Found:</h3>
                    <div class="legend">
                        {self._generate_entity_stats(stats['entity_types'])}
                    </div>
                    
                    <h3>Sensitive Data Patterns Found:</h3>
                    <div class="legend">
                        {self._generate_entity_stats(stats['pattern_types'])}
                    </div>
                </div>
                
                <h2>Processed Logs</h2>
                {' '.join(visualizations)}
            </body>
            </html>
            """
            
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(complete_html)
        
        return stats
    
    def _generate_entity_stats(self, stats: Dict[str, int]) -> str:
        """Generate HTML for entity statistics."""
        html_parts = []
        for entity_type, count in stats.items():
            color = self.options["colors"].get(entity_type, "#ddd")
            html_parts.append(f"""
                <div class="legend-item">
                    <div class="color-box" style="background-color: {color}"></div>
                    <span>{entity_type}: {count}</span>
                </div>
            """)
        return "".join(html_parts)

# Example usage
if __name__ == "__main__":
    
    # Create analyzer and process logs
    analyzer = LogAnalyzer()
    stats = analyzer.process_logs(log_list, "log_analysis.html")
    
    # Print statistics
    print("\nProcessing Statistics:")
    print(f"Total Logs Processed: {stats['total_logs']}")
    print(f"Total Entities Found: {stats['total_entities']}")
    print("\nNamed Entities Found:")
    for entity_type, count in stats['entity_types'].items():
        print(f"{entity_type}: {count}")
    print("\nSensitive Data Patterns Found:")
    for pattern_type, count in stats['pattern_types'].items():
        print(f"{pattern_type}: {count}")

Some weights of the model checkpoint at dslim/bert-base-NER were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Hardware accelerator e.g. GPU is available in the environment, but no `device` argument is passed to the `Pipeline` object. Model will be on CPU.



Processing Statistics:
Total Logs Processed: 10000
Total Entities Found: 15347

Named Entities Found:
PER: 3354
LOC: 2422
ORG: 1794
MISC: 16

Sensitive Data Patterns Found:
EMAIL: 3187
PHONE: 895
IPV4: 2161
CREDIT_CARD: 1518
