Terraform AST Analysis for Mutation Testing
This notebook demonstrates advanced semantic analysis of Terraform configurations using the TerraformAstAnalyzer class. We'll explore how to extract meaningful insights from Terraform code that can help design effective mutation operators for testing.
1. Introduction and Setup
The TerraformAstAnalyzer provides a way to parse Terraform files into an Abstract Syntax Tree (AST) and extract semantic information, including:

Resource relationships and dependencies
Full attribute paths and values
Dynamic blocks and meta-arguments
Changes between different versions of configurations

In [None]:
import os
import sys
import json
import random
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Markdown, display
import re
from pathlib import Path


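# __file__ is not defined inside a notebook, so resolve the repository root
# from the working directory instead (assumes the notebook is run from a
# direct subdirectory of the repository root)
sys.path.append(str(Path.cwd().parent))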
from src.analysis.terraform_ast import TerraformAstAnalyzer
from src.analysis.diff_utils import (
    parse_patch_to_dataframe,
    analyze_terraform_changes,
    enrich_dataframe_with_terraform_ast
)

2. Parsing Terraform Configuration Files
Let's start by analyzing a simple Terraform configuration. We'll create an example file and parse it with the TerraformAstAnalyzer:

In [2]:
# Create a sample Terraform configuration
sample_tf = """
resource "aws_vpc" "main" {
  cidr_block = "10.0.0.0/16"
  tags = {
    Name = "main-vpc"
    Environment = "production"
  }
}

resource "aws_subnet" "primary" {
  vpc_id     = aws_vpc.main.id
  cidr_block = "10.0.1.0/24"
  availability_zone = "us-east-1a"
  
  tags = {
    Name = "primary-subnet"
  }
}

resource "aws_security_group" "web" {
  name        = "web-sg"
  description = "Allow web traffic"
  vpc_id      = aws_vpc.main.id

  ingress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  ingress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
  
  depends_on = [aws_vpc.main]
}

resource "aws_instance" "web" {
  ami           = "ami-0c55b159cbfafe1f0"
  instance_type = "t2.micro"
  subnet_id     = aws_subnet.primary.id
  
  vpc_security_group_ids = [
    aws_security_group.web.id
  ]
  
  count = 2
  
  tags = {
    Name = "web-server-${count.index}"
  }
}
"""

# Write the sample to a file in the working directory
with open("sample.tf", "w") as f:
    f.write(sample_tf)

# Parse the file using TerraformAstAnalyzer
analyzer = TerraformAstAnalyzer()
ast = analyzer.parse_file("sample.tf")

# Display basic information about the parsed file
print(f"Resources found: {len(ast.get('resource', {}))} types")
for res_type, instances in ast.get('resource', {}).items():
    print(f"  {res_type}: {len(instances)} instance(s)")

Resources found: 0 types
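
As a cross-check on the count printed above, the block headers can also be tallied directly from the text, without going through the analyzer. This is a minimal sketch that assumes every resource header sits on one line in the form `resource "TYPE" "NAME" {`; the helper name `count_resource_blocks` is hypothetical and not part of TerraformAstAnalyzer.

```python
import re
from collections import Counter

# Matches a header such as: resource "aws_vpc" "main" {
RESOURCE_HEADER = re.compile(
    r'^\s*resource\s+"([^"]+)"\s+"([^"]+)"\s*\{', re.MULTILINE
)


def count_resource_blocks(hcl_text):
    """Return a Counter mapping resource type to number of declared blocks.

    Hypothetical helper: a text-level tally, not a real HCL parse.
    """
    return Counter(m.group(1) for m in RESOURCE_HEADER.finditer(hcl_text))


example = '''
resource "aws_vpc" "main" {
  cidr_block = "10.0.0.0/16"
}

resource "aws_subnet" "primary" {
  vpc_id = aws_vpc.main.id
}

resource "aws_subnet" "secondary" {
  vpc_id = aws_vpc.main.id
}
'''

counts = count_resource_blocks(example)
for res_type, n in sorted(counts.items()):
    print(f"{res_type}: {n} block(s)")
```

If this tally and the analyzer's count disagree for the same input, the parsed structure is probably shaped differently from what the loop over `ast.get('resource', {})` expects.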


3. Extracting Semantic Information
Now that we've parsed the file, let's explore the semantic information we can extract:

In [3]:
# Display the first 10 attributes with their paths
print("Attributes with their paths:")
for attr in analyzer.attributes[:10]:
    print(f"  {attr.path} = {attr.value}")

if len(analyzer.attributes) > 10:
    print(f"  ... and {len(analyzer.attributes) - 10} more attributes")

Attributes with their paths:
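
To make the idea of a "full attribute path" concrete, here is a small sketch of how nested dictionaries and lists could be flattened into dotted paths with list indices, in the same style as paths like `ingress[1].cidr_blocks[0]` that appear later in this notebook. The function `flatten_attributes` and the shape of the `parsed` dictionary are assumptions made for illustration; the analyzer's internal representation may differ.

```python
def flatten_attributes(node, prefix=""):
    """Yield (path, value) pairs for every leaf in nested dicts and lists.

    Hypothetical helper: dict keys are joined with '.', list items get '[i]'.
    """
    if isinstance(node, dict):
        for key, value in node.items():
            child = f"{prefix}.{key}" if prefix else str(key)
            yield from flatten_attributes(value, child)
    elif isinstance(node, list):
        for index, value in enumerate(node):
            yield from flatten_attributes(value, f"{prefix}[{index}]")
    else:
        yield prefix, node


# Assumed shape: resource type -> resource name -> attributes
parsed = {
    "aws_security_group": {
        "web": {
            "name": "web-sg",
            "ingress": [
                {"from_port": 80, "cidr_blocks": ["0.0.0.0/0"]},
                {"from_port": 443, "cidr_blocks": ["0.0.0.0/0"]},
            ],
        }
    }
}

paths = dict(flatten_attributes(parsed))
for path, value in paths.items():
    print(f"  {path} = {value}")
```

Because each leaf gets a unique path, two versions of a configuration can later be compared key by key.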


In [4]:
# Display all references between resources
print("\nResource references:")
for ref in analyzer.references:
    print(f"  {ref.source_path} -> {ref.target_path} (via {ref.attribute_path}, type: {ref.reference_type})")

# Visualize the dependency graph
def visualize_dependencies(graph):
    """Visualize a resource dependency graph."""
    plt.figure(figsize=(10, 8))
    
    # Use a layout that spreads nodes out
    pos = nx.spring_layout(graph, seed=42)
    
    # Draw nodes with different colors based on resource type
    node_colors = []
    resource_types = {}
    
    for node in graph.nodes():
        resource_type = node.split('.')[0] if '.' in node else node
        if resource_type not in resource_types:
            resource_types[resource_type] = len(resource_types)
        node_colors.append(resource_types[resource_type])
    
    nx.draw_networkx_nodes(graph, pos, node_color=node_colors, cmap=plt.cm.tab10, 
                          node_size=700, alpha=0.8)
    
    # Draw edges
    nx.draw_networkx_edges(graph, pos, edge_color='gray', arrowsize=15, width=1.5)
    
    # Draw labels
    nx.draw_networkx_labels(graph, pos, font_size=8)
    
    plt.title("Terraform Resource Dependencies")
    plt.axis('off')
    plt.show()

# Visualize the dependency graph if not empty
if analyzer.dependency_graph.nodes:
    visualize_dependencies(analyzer.dependency_graph)


Resource references:
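
For intuition about what a reference graph contains, the sketch below derives dependencies straight from the text and orders the resources so that each one comes after the resources it refers to. It is a regex-and-brace-counting stand-in, not the analyzer's method: `resource_bodies` and `dependency_map` are hypothetical helpers, braces inside string literals are assumed to be balanced, and `graphlib` requires Python 3.9 or later.

```python
import re
from graphlib import TopologicalSorter

HEADER = re.compile(r'resource\s+"([^"]+)"\s+"([^"]+)"\s*\{')
# Candidate references look like <type>.<name>; only declared ones are kept
REFERENCE = re.compile(r'\b([a-z][a-z0-9_]*)\.([A-Za-z0-9_]+)')


def resource_bodies(hcl_text):
    """Map 'type.name' to the text between the block's outer braces."""
    bodies = {}
    for header in HEADER.finditer(hcl_text):
        depth, pos = 1, header.end()
        while depth and pos < len(hcl_text):
            if hcl_text[pos] == "{":
                depth += 1
            elif hcl_text[pos] == "}":
                depth -= 1
            pos += 1
        path = f"{header.group(1)}.{header.group(2)}"
        bodies[path] = hcl_text[header.end():pos - 1]
    return bodies


def dependency_map(hcl_text):
    """Map each resource to the set of declared resources it references."""
    bodies = resource_bodies(hcl_text)
    declared = set(bodies)
    deps = {}
    for path, body in bodies.items():
        found = {f"{t}.{n}" for t, n in REFERENCE.findall(body)}
        deps[path] = (found & declared) - {path}
    return deps


example = '''
resource "aws_vpc" "main" {
  cidr_block = "10.0.0.0/16"
}
resource "aws_subnet" "primary" {
  vpc_id = aws_vpc.main.id
  tags = {
    Name = "primary-subnet"
  }
}
resource "aws_instance" "web" {
  subnet_id = aws_subnet.primary.id
  tags = {
    Name = "web-server-${count.index}"
  }
}
'''

deps = dependency_map(example)
# TopologicalSorter takes node -> predecessors, so dependencies come first
order = list(TopologicalSorter(deps).static_order())
print(order)
```

Intersecting the candidates with the declared resources filters out expressions such as `count.index`, but it also hides dangling references, which is exactly the case the `incorrect_reference` bug pattern later in this notebook is about.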


In [5]:
# Check for resources with count or for_each
for resource_type, instances in ast.get('resource', {}).items():
    for resource_name, attrs in instances.items():
        resource_path = f"{resource_type}.{resource_name}"
        
        count = analyzer.get_resource_count(resource_path)
        for_each = analyzer.get_resource_for_each(resource_path)
        
        if count is not None:
            print(f"Resource {resource_path} uses count: {count}")
        
        if for_each is not None:
            print(f"Resource {resource_path} uses for_each: {for_each}")

# Look for dynamic blocks
dynamic_blocks = analyzer.resolve_dynamic_blocks()
if dynamic_blocks:
    print("\nDynamic blocks found:")
    for resource, blocks in dynamic_blocks.items():
        print(f"  {resource}: {', '.join(blocks)}")
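
The distinction between a meta-argument on the resource itself and a `for_each` that belongs to a nested `dynamic` block matters for mutation design, since they affect different things. The following sketch separates the two by tracking brace depth line by line. Both helper names are hypothetical, and the approach assumes one statement per line, no trailing comments on meta-argument lines, and no unbalanced braces inside strings.

```python
import re

META_ARGUMENT = re.compile(r'^\s*(count|for_each)\s*=\s*(.+?)\s*$')
DYNAMIC_BLOCK = re.compile(r'^\s*dynamic\s+"([^"]+)"\s*\{', re.MULTILINE)


def top_level_meta_arguments(block_text):
    """Return count/for_each expressions set directly on the resource block."""
    found, depth = {}, 0
    for line in block_text.splitlines():
        match = META_ARGUMENT.match(line)
        if match and depth == 1:  # depth 1 = directly inside the resource
            found[match.group(1)] = match.group(2)
        depth += line.count("{") - line.count("}")
    return found


def find_dynamic_blocks(block_text):
    """Return the labels of all dynamic blocks, at any nesting depth."""
    return DYNAMIC_BLOCK.findall(block_text)


example = '''
resource "aws_security_group" "web" {
  for_each = var.environments

  dynamic "ingress" {
    for_each = var.ports
    content {
      from_port = ingress.value
      to_port   = ingress.value
      protocol  = "tcp"
    }
  }
}
'''

print(top_level_meta_arguments(example))
print(find_dynamic_blocks(example))
```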

4. Analyzing Changes Between Versions
A key aspect of mutation testing is understanding changes between different versions of the same file. Let's simulate this by creating a modified version of our sample:

In [6]:
# Create a modified version of the sample
modified_tf = sample_tf.replace(
    'cidr_block = "10.0.0.0/16"',
    'cidr_block = "172.16.0.0/16"\n  enable_dns_support = true'
).replace(
    'instance_type = "t2.micro"',
    'instance_type = "t3.small"'
).replace(
    'count = 2',
    'count = 3'
)

# Write the modified version to disk for reference
# (the comparison below works on the strings, not on the files)
with open("modified.tf", "w") as f:
    f.write(modified_tf)

# Analyze changes
changes = analyze_terraform_changes(sample_tf, modified_tf)

# Print summary of changes
print("\nChange summary:")
print(f"  Added attributes: {len(changes['added'])}")
print(f"  Removed attributes: {len(changes['removed'])}")
print(f"  Modified attributes: {len(changes['modified'])}")
print(f"  Resources with dependency changes: {len(changes.get('dependency_changes', []))}")

# Print added attributes
if changes['added']:
    print("\nAdded attributes:")
    for item in changes['added']:
        print(f"  + {item['path']} = {item['value']}")

# Print removed attributes
if changes['removed']:
    print("\nRemoved attributes:")
    for item in changes['removed']:
        print(f"  - {item['path']} = {item['value']}")

# Print modified attributes
if changes['modified']:
    print("\nModified attributes:")
    for item in changes['modified']:
        print(f"  ~ {item['path']}: {item['old_value']} -> {item['new_value']}")


Change summary:
  Added attributes: 0
  Removed attributes: 0
  Modified attributes: 0
  Resources with dependency changes: 0
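
For the three replaced values and the one added attribute in the cell above, a path-level comparison would be expected to report one addition and three modifications. The sketch below shows that comparison on hand-written attribute maps. `diff_attributes` is a hypothetical helper, the attribute paths are assumed, and the result only mirrors the `added` / `removed` / `modified` keys used above; it is not the implementation of `analyze_terraform_changes`.

```python
def diff_attributes(old, new):
    """Compare two {path: value} maps and group the differences."""
    added = [{"path": p, "value": new[p]} for p in sorted(new.keys() - old.keys())]
    removed = [{"path": p, "value": old[p]} for p in sorted(old.keys() - new.keys())]
    modified = [
        {"path": p, "old_value": old[p], "new_value": new[p]}
        for p in sorted(old.keys() & new.keys())
        if old[p] != new[p]
    ]
    return {"added": added, "removed": removed, "modified": modified}


# Assumed attribute paths for the original and modified samples
old_attrs = {
    "aws_vpc.main.cidr_block": "10.0.0.0/16",
    "aws_instance.web.instance_type": "t2.micro",
    "aws_instance.web.count": 2,
}
new_attrs = {
    "aws_vpc.main.cidr_block": "172.16.0.0/16",
    "aws_vpc.main.enable_dns_support": True,
    "aws_instance.web.instance_type": "t3.small",
    "aws_instance.web.count": 3,
}

changes_sketch = diff_attributes(old_attrs, new_attrs)
for kind in ("added", "removed", "modified"):
    print(f"{kind}: {len(changes_sketch[kind])}")
```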


5. Analyzing Git Diff Fragments
Now let's examine how to analyze git diff fragments, which is crucial for understanding real-world Terraform changes:

In [7]:
# Create a sample git diff
sample_diff = """diff --git a/main.tf b/main.tf
index 1234567..abcdefg 100644
--- a/main.tf
+++ b/main.tf
@@ -10,7 +10,7 @@ resource "aws_vpc" "main" {
 
 resource "aws_subnet" "primary" {
   vpc_id     = aws_vpc.main.id
-  cidr_block = "10.0.1.0/24"
+  cidr_block = "10.0.2.0/24"
   availability_zone = "us-east-1a"
   
   tags = {
@@ -25,6 +25,12 @@ resource "aws_security_group" "web" {
   ingress {
     from_port   = 80
     to_port     = 80
+    protocol    = "tcp"
+    cidr_blocks = ["0.0.0.0/0"]
+  }
+
+  ingress {
+    from_port   = 443
+    to_port     = 443
     protocol    = "tcp"
     cidr_blocks = ["0.0.0.0/0"]
   }
"""

# Parse the diff to a DataFrame
diff_df = parse_patch_to_dataframe(sample_diff)

# Display the DataFrame
print("Basic diff DataFrame:")
display(diff_df.head(10))

# Enrich the DataFrame with Terraform semantic information
enriched_df = enrich_dataframe_with_terraform_ast(diff_df)

# Display the enriched DataFrame
print("\nEnriched diff DataFrame:")
display(enriched_df[enriched_df['change'].isin(['added', 'removed'])].head(10))

Basic diff DataFrame:


index,file,old_lineno,new_lineno,hunk_id,change,content
0,,0.0,0.0,0.0,context,diff --git a/main.tf b/main.tf
1,,1.0,1.0,0.0,context,index 1234567..abcdefg 100644
2,a/main.tf,,,,meta,--- a/main.tf
3,a/main.tf,2.0,2.0,0.0,context,+++ b/main.tf
4,a/main.tf,,,1.0,meta,"@@ -10,7 +10,7 @@ resource ""aws_vpc"" ""main"" {"
5,a/main.tf,10.0,10.0,1.0,context,
6,a/main.tf,11.0,11.0,1.0,context,"resource ""aws_subnet"" ""primary"" {"
7,a/main.tf,12.0,12.0,1.0,context,vpc_id = aws_vpc.main.id
8,a/main.tf,13.0,,1.0,removed,"cidr_block = ""10.0.1.0/24"""
9,a/main.tf,,13.0,1.0,added,"cidr_block = ""10.0.2.0/24"""



Enriched diff DataFrame:


index,file,old_lineno,new_lineno,hunk_id,change,content,attribute_path,resource_type,resource_name,change_type
8,a/main.tf,13.0,,1.0,removed,"cidr_block = ""10.0.1.0/24""",,,,
9,a/main.tf,,13.0,1.0,added,"cidr_block = ""10.0.2.0/24""",,,,
17,a/main.tf,,28.0,2.0,added,"protocol = ""tcp""",ingress[1].protocol,protocol,,added
18,a/main.tf,,29.0,2.0,added,"cidr_blocks = [""0.0.0.0/0""]",ingress[1].cidr_blocks[0],cidr_blocks[0],,added
19,a/main.tf,,30.0,2.0,added,},,,,
20,a/main.tf,,31.0,2.0,added,,,,,
21,a/main.tf,,32.0,2.0,added,ingress {,,,,
22,a/main.tf,,33.0,2.0,added,from_port = 443,,,,
23,a/main.tf,,34.0,2.0,added,to_port = 443,,,,
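
In the basic DataFrame above, header lines such as `diff --git` and `+++ b/main.tf` show up as context rows with line numbers of their own. A parser that takes its line numbers from the `@@` hunk headers can skip those lines and number only the hunk content. The sketch below illustrates this under the assumption of a standard unified diff; `parse_unified_diff` is a hypothetical stand-in and not the implementation of `parse_patch_to_dataframe`.

```python
import re

HUNK_HEADER = re.compile(r'^@@ -(\d+)(?:,\d+)? \+(\d+)(?:,\d+)? @@')


def parse_unified_diff(patch_text):
    """Return one row per hunk line with its change type and line numbers."""
    rows = []
    old_no = new_no = 0
    in_hunk = False
    for line in patch_text.splitlines():
        if line.startswith("diff --git"):
            in_hunk = False  # new file section; wait for its first hunk
            continue
        header = HUNK_HEADER.match(line)
        if header:
            old_no, new_no = int(header.group(1)), int(header.group(2))
            in_hunk = True
            continue
        if not in_hunk or line.startswith("\\"):
            continue  # skip index/---/+++ headers and "\ No newline" markers
        if line.startswith("+"):
            rows.append({"change": "added", "old_lineno": None,
                         "new_lineno": new_no, "content": line[1:]})
            new_no += 1
        elif line.startswith("-"):
            rows.append({"change": "removed", "old_lineno": old_no,
                         "new_lineno": None, "content": line[1:]})
            old_no += 1
        else:
            rows.append({"change": "context", "old_lineno": old_no,
                         "new_lineno": new_no, "content": line[1:]})
            old_no += 1
            new_no += 1
    return rows


patch = """diff --git a/main.tf b/main.tf
--- a/main.tf
+++ b/main.tf
@@ -10,4 +10,4 @@ resource "aws_vpc" "main" {
 resource "aws_subnet" "primary" {
   vpc_id     = aws_vpc.main.id
-  cidr_block = "10.0.1.0/24"
+  cidr_block = "10.0.2.0/24"
   availability_zone = "us-east-1a"
"""

rows = parse_unified_diff(patch)
for row in rows:
    print(row["change"], row["old_lineno"], row["new_lineno"], row["content"].strip())
```

The rows are plain dictionaries, so `pd.DataFrame(rows)` would turn them into a table comparable to the one above.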


6. Identifying Bug Patterns for Mutation Operators
Now let's analyze some common bug patterns in Terraform configurations to guide mutation operator design:

In [8]:
# Create a collection of sample bug patterns
bug_samples = {
    "incorrect_reference": """
    resource "aws_instance" "web" {
      ami           = "ami-0c55b159cbfafe1f0"
      subnet_id     = aws_subnet.wrong.id  # Referencing non-existent resource
    }
    """,
    
    "invalid_cidr": """
    resource "aws_vpc" "main" {
      cidr_block = "10.0.0.0/8"  # Too large for most applications
    }
    """,
    
    "missing_required_tag": """
    resource "aws_instance" "db" {
      ami           = "ami-0c55b159cbfafe1f0"
      instance_type = "t2.micro"
      # Missing required tags
    }
    """,
    
    "security_group_too_open": """
    resource "aws_security_group" "alb" {
      ingress {
        protocol    = "-1"
        from_port   = 0
        to_port     = 0
        cidr_blocks = ["0.0.0.0/0"]  # Too permissive
      }
    }
    """,
    
    "hardcoded_credentials": """
    provider "aws" {
      access_key = "AKIAIOSFODNN7EXAMPLE"
      secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
      region     = "us-west-2"
    }
    """
}

# Analyze each bug pattern
results = []

for pattern_name, content in bug_samples.items():
    analyzer = TerraformAstAnalyzer()
    ast = analyzer.parse_hcl(content)
    
    # Extract attributes
    attributes = [
        {"name": attr.path, "value": str(attr.value)}
        for attr in analyzer.attributes
    ]
    
    # Extract references
    references = [
        {"source": ref.source_path, "target": ref.target_path}
        for ref in analyzer.references
    ]
    
    results.append({
        "pattern": pattern_name,
        "content": content,
        "attributes": attributes,
        "references": references
    })

# Display the results
for result in results:
    print(f"\n=== Bug Pattern: {result['pattern']} ===")
    print("Attributes:")
    for attr in result['attributes']:
        print(f"  {attr['name']} = {attr['value']}")
    
    print("References:")
    if result['references']:
        for ref in result['references']:
            print(f"  {ref['source']} -> {ref['target']}")
    else:
        print("  No references found")


=== Bug Pattern: incorrect_reference ===
Attributes:
References:
  No references found

=== Bug Pattern: invalid_cidr ===
Attributes:
References:
  No references found

=== Bug Pattern: missing_required_tag ===
Attributes:
References:
  No references found

=== Bug Pattern: security_group_too_open ===
Attributes:
References:
  No references found

=== Bug Pattern: hardcoded_credentials ===
Attributes:
  provider[0].aws.access_key = AKIAIOSFODNN7EXAMPLE
  provider[0].aws.secret_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  provider[0].aws.region = us-west-2
References:
  No references found
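
Some of these patterns can be detected from attribute values alone. As an illustration, the sketch below uses the standard library's `ipaddress` module to label CIDR literals as malformed, open to the world, or broader than a chosen threshold. The helper `classify_cidrs` is hypothetical, and the `min_prefix=16` threshold is an arbitrary assumption rather than a recommendation.

```python
import ipaddress
import re

# Quoted IPv4 CIDR literals such as "10.0.0.0/16"
CIDR_LITERAL = re.compile(r'"(\d{1,3}(?:\.\d{1,3}){3}/\d{1,2})"')


def classify_cidrs(hcl_text, min_prefix=16):
    """Return (literal, label) pairs for suspicious CIDR blocks.

    min_prefix is an assumed threshold: shorter prefixes count as too broad.
    """
    findings = []
    for literal in CIDR_LITERAL.findall(hcl_text):
        try:
            network = ipaddress.ip_network(literal, strict=False)
        except ValueError:
            findings.append((literal, "invalid"))
            continue
        if network.prefixlen == 0:
            findings.append((literal, "open_to_world"))
        elif network.prefixlen < min_prefix:
            findings.append((literal, "overly_broad"))
    return findings


example = '''
cidr_block  = "10.0.0.0/8"
cidr_blocks = ["0.0.0.0/0"]
cidr_block  = "10.0.1.0/24"
cidr_block  = "10.0.0.300/24"
'''

findings = classify_cidrs(example)
for literal, label in findings:
    print(f"  {literal}: {label}")
```

Note that `10.0.0.0/8` is a valid CIDR block, so it is labelled as overly broad rather than invalid; only literals that `ipaddress` rejects are reported as invalid.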


7. Designing Mutation Operators
Based on our analysis, we can now design mutation operators specifically for Terraform:

In [9]:
# Define mutation operator templates
mutation_operators = [
    {
        "name": "CIDR_RANGE_MUTATION",
        "description": "Mutate CIDR block ranges",
        "target_pattern": r'cidr_block\s*=\s*"([0-9\.]+)/([0-9]+)"',
        "mutation_logic": "Modify the subnet mask to be wider or narrower",
        "sample_mutations": [
            'cidr_block = "10.0.0.0/16" -> cidr_block = "10.0.0.0/24"',
            'cidr_block = "10.0.0.0/24" -> cidr_block = "10.0.0.0/8"'
        ]
    },
    {
        "name": "REFERENCE_REPLACEMENT",
        "description": "Replace resource references with incorrect ones",
        "target_pattern": r'(\w+)_id\s*=\s*([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)\.id',
        "mutation_logic": "Replace the reference with another resource of similar type",
        "sample_mutations": [
            'subnet_id = aws_subnet.primary.id -> subnet_id = aws_subnet.secondary.id',
            'vpc_id = aws_vpc.main.id -> vpc_id = aws_vpc.wrong.id'
        ]
    },
    {
        "name": "COUNT_MODIFICATION",
        "description": "Modify count parameter",
        "target_pattern": r'count\s*=\s*([0-9]+)',
        "mutation_logic": "Increase, decrease, or zero out the count value",
        "sample_mutations": [
            'count = 2 -> count = 1',
            'count = 2 -> count = 0',
            'count = 2 -> count = 3'
        ]
    },
    {
        "name": "SECURITY_GROUP_RULE_MUTATION",
        "description": "Mutate security group rules",
        "target_pattern": r'(from_port|to_port)\s*=\s*([0-9]+)',
        "mutation_logic": "Modify port ranges to introduce security vulnerabilities",
        "sample_mutations": [
            'from_port = 80 -> from_port = 0',
            'to_port = 443 -> to_port = 65535'
        ]
    },
    {
        "name": "TAG_REMOVAL",
        "description": "Remove required tags",
        "target_pattern": r'tags\s*=\s*\{([^}]*)\}',
        "mutation_logic": "Remove or modify tags that might be required by policy",
        "sample_mutations": [
            'tags = { Name = "example" } -> tags = {}',
            'tags = { Name = "example", Environment = "prod" } -> tags = { Name = "example" }'
        ]
    }
]

# Display the mutation operators
for op in mutation_operators:
    print(f"\n=== Mutation Operator: {op['name']} ===")
    print(f"Description: {op['description']}")
    print(f"Target Pattern: {op['target_pattern']}")
    print("Sample Mutations:")
    for mutation in op['sample_mutations']:
        print(f"  {mutation}")


=== Mutation Operator: CIDR_RANGE_MUTATION ===
Description: Mutate CIDR block ranges
Target Pattern: cidr_block\s*=\s*"([0-9\.]+)/([0-9]+)"
Sample Mutations:
  cidr_block = "10.0.0.0/16" -> cidr_block = "10.0.0.0/24"
  cidr_block = "10.0.0.0/24" -> cidr_block = "10.0.0.0/8"

=== Mutation Operator: REFERENCE_REPLACEMENT ===
Description: Replace resource references with incorrect ones
Target Pattern: (\w+)_id\s*=\s*([a-zA-Z0-9_]+)\.([a-zA-Z0-9_]+)\.id
Sample Mutations:
  subnet_id = aws_subnet.primary.id -> subnet_id = aws_subnet.secondary.id
  vpc_id = aws_vpc.main.id -> vpc_id = aws_vpc.wrong.id

=== Mutation Operator: COUNT_MODIFICATION ===
Description: Modify count parameter
Target Pattern: count\s*=\s*([0-9]+)
Sample Mutations:
  count = 2 -> count = 1
  count = 2 -> count = 0
  count = 2 -> count = 3

=== Mutation Operator: SECURITY_GROUP_RULE_MUTATION ===
Description: Mutate security group rules
Target Pattern: (from_port|to_port)\s*=\s*([0-9]+)
Sample Mutations:
  from_port = 80
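
Before applying any operator, it can help to count how many mutation sites each target pattern finds in a given configuration. The sketch below does this for four of the patterns defined above (copied verbatim) on a small snippet. It also surfaces a limitation: `[^}]*` in the TAG_REMOVAL pattern stops at the first closing brace, so a tags block whose value contains a `${...}` interpolation is matched only partially.

```python
import re

# Patterns copied from the operator definitions above
patterns = {
    "CIDR_RANGE_MUTATION": r'cidr_block\s*=\s*"([0-9\.]+)/([0-9]+)"',
    "COUNT_MODIFICATION": r'count\s*=\s*([0-9]+)',
    "SECURITY_GROUP_RULE_MUTATION": r'(from_port|to_port)\s*=\s*([0-9]+)',
    "TAG_REMOVAL": r'tags\s*=\s*\{([^}]*)\}',
}

snippet = '''
resource "aws_instance" "web" {
  count = 2
  tags = {
    Name = "web-server-${count.index}"
  }
}
resource "aws_vpc" "main" {
  cidr_block = "10.0.0.0/16"
}
'''

# Number of candidate mutation sites per operator
sites = {name: len(re.findall(p, snippet)) for name, p in patterns.items()}
for name, n in sites.items():
    print(f"{name}: {n} site(s)")

# The tags match ends at the brace that closes the interpolation,
# not at the brace that closes the tags block
tag_match = re.search(patterns["TAG_REMOVAL"], snippet)
print(repr(tag_match.group(0)))
```

Replacing such a partial match with `tags = {}` would leave a stray quote and closing brace behind, so a tag operator intended for interpolated values would need brace-aware matching or AST-level edits.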

8. Implementing a Simple Mutation Generator
Finally, let's implement a simple mutation generator for Terraform files:

In [10]:
# Import at cell level so that both the function and the test loop below
# can use these modules even if the setup cell has not been run
import random
import re
from difflib import unified_diff


def apply_mutation(content, operator):
    """Apply a mutation operator to Terraform content.

    Returns (mutated_content, description), or (None, reason) when the
    operator cannot be applied.
    """
    pattern = re.compile(operator["target_pattern"])
    matches = list(pattern.finditer(content))

    if not matches:
        return None, "No matches found for this operator"

    # Choose a random match to mutate
    match = random.choice(matches)

    # Apply basic mutations based on the operator type
    if operator["name"] == "CIDR_RANGE_MUTATION":
        network, mask = match.groups()
        mask = int(mask)
        # Widen or narrow by 8 bits, clamped to /8../30; drop the original
        # mask so the mutation always changes something
        candidates = sorted({max(8, mask - 8), min(30, mask + 8)} - {mask})
        new_mask = random.choice(candidates)
        new_content = content[:match.start()] + f'cidr_block = "{network}/{new_mask}"' + content[match.end():]
        mutation_desc = f'Changed CIDR mask from /{mask} to /{new_mask}'

    elif operator["name"] == "COUNT_MODIFICATION":
        count = int(match.group(1))
        new_count = max(0, count - 1) if count > 0 and random.random() < 0.5 else count + 1
        new_content = content[:match.start()] + f'count = {new_count}' + content[match.end():]
        mutation_desc = f'Changed count from {count} to {new_count}'

    elif operator["name"] == "SECURITY_GROUP_RULE_MUTATION":
        port_type, port = match.groups()
        if port_type == "from_port":
            new_port = 0  # Open up the lower bound
        else:  # to_port
            new_port = 65535  # Open up the upper bound

        new_content = content[:match.start()] + f'{port_type} = {new_port}' + content[match.end():]
        mutation_desc = f'Changed {port_type} from {port} to {new_port}'

    elif operator["name"] == "TAG_REMOVAL":
        # Replace the whole matched tags block with an empty map
        new_content = content[:match.start()] + 'tags = {}' + content[match.end():]
        mutation_desc = 'Removed all tags'

    elif operator["name"] == "REFERENCE_REPLACEMENT":
        resource_type, resource_name = match.group(2), match.group(3)
        new_name = f"{resource_name}_wrong"
        # Only the resource name (group 3) changes; the rest stays as is
        new_content = content[:match.start(3)] + new_name + content[match.end(3):]
        mutation_desc = f'Changed reference from {resource_type}.{resource_name} to {resource_type}.{new_name}'

    else:
        return None, "Unsupported operator"

    return new_content, mutation_desc

# Test the mutation generator on our sample
for _ in range(3):  # Generate 3 mutations (operators are drawn at random and may repeat)
    op = random.choice(mutation_operators)
    mutated, desc = apply_mutation(sample_tf, op)

    if mutated is None:
        print(f"\nSkipped {op['name']}: {desc}")
        continue

    print(f"\n=== Applied Mutation: {op['name']} ===")
    print(f"Description: {desc}")

    # Show diff
    diff = '\n'.join(unified_diff(
        sample_tf.splitlines(),
        mutated.splitlines(),
        fromfile='original.tf',
        tofile='mutated.tf',
        lineterm=''
    ))

    print("\nDiff:")
    print(diff)

    # Analyze if the mutation introduces a semantic change
    changes = analyze_terraform_changes(sample_tf, mutated)

    print("\nSemantic Changes:")
    if changes['modified']:
        for item in changes['modified']:
            print(f"  ~ {item['path']}: {item['old_value']} -> {item['new_value']}")
    else:
        print("  No semantic changes detected.")
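
Random selection is convenient for a demo, but a test run is easier to reproduce when every mutant of an operator is generated in a fixed order. The sketch below enumerates all count mutants for a configuration, using the same three candidates as the COUNT_MODIFICATION sample mutations (zero, one less, one more). The helper `enumerate_count_mutants` is hypothetical, and the added `\b` word boundary is a small deviation from the operator's pattern so that names merely ending in "count" are not matched.

```python
import re

COUNT_PATTERN = re.compile(r'\bcount\s*=\s*([0-9]+)')


def enumerate_count_mutants(content):
    """Yield (description, mutated_text) for every count site and candidate."""
    for match in COUNT_PATTERN.finditer(content):
        original = int(match.group(1))
        # Zero, one less, one more; never the original value itself
        candidates = sorted({0, max(0, original - 1), original + 1} - {original})
        for candidate in candidates:
            mutated = content[:match.start(1)] + str(candidate) + content[match.end(1):]
            yield f"count {original} -> {candidate}", mutated


example = 'resource "aws_instance" "web" {\n  count = 2\n}\n'

mutants = list(enumerate_count_mutants(example))
for description, _ in mutants:
    print(description)
```

Each mutant differs from the original in exactly one value, which keeps a failing check easy to trace back to a single change.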

9. Conclusion and Next Steps
In this notebook, we've explored how to use the TerraformAstAnalyzer for advanced semantic analysis of Terraform configurations. This approach provides several key advantages for mutation testing:

Precise Mutations: By understanding the AST, we can create mutations that are syntactically valid and semantically meaningful.
Resource Awareness: We can target specific resources and their dependencies.
Real-world Bug Patterns: We can base mutations on common bug patterns.

Next steps for improving the mutation testing framework:

Implement a full suite of mutation operators based on real-world bug patterns
Create a testing framework to measure the effectiveness of mutations
Integrate with CI/CD pipelines for continuous validation
Add coverage analysis to determine which parts of the infrastructure are tested
Prioritize mutations based on impact analysis

This approach should make Infrastructure as Code testing more effective and help catch defects in Terraform configurations before they reach production.
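
As a starting point for measuring the effectiveness of mutations, the usual metric is the mutation score: the fraction of mutants that at least one check rejects. The sketch below computes it with two toy predicates standing in for whatever validation is used in practice. All names here (`has_name_tag`, `count_is_positive`, `mutation_score`) are hypothetical, and only checks that pass on the original configuration are counted.

```python
import re


def has_name_tag(text):
    """Toy check: the configuration sets a Name tag somewhere."""
    return re.search(r'\bName\s*=', text) is not None


def count_is_positive(text):
    """Toy check: if a count is set, it is greater than zero."""
    match = re.search(r'\bcount\s*=\s*(\d+)', text)
    return match is None or int(match.group(1)) > 0


def mutation_score(original, mutants, checks):
    """Fraction of mutants rejected by a check that the original passes."""
    baseline = [check for check in checks if check(original)]
    if not mutants or not baseline:
        return 0.0
    killed = sum(
        1 for mutant in mutants if any(not check(mutant) for check in baseline)
    )
    return killed / len(mutants)


original = 'count = 2\ntags = { Name = "web" }\n'
mutants = [
    'count = 0\ntags = { Name = "web" }\n',  # rejected by count_is_positive
    'count = 3\ntags = { Name = "web" }\n',  # passes both checks (survives)
    'count = 2\ntags = {}\n',                # rejected by has_name_tag
]

score = mutation_score(original, mutants, [has_name_tag, count_is_positive])
print(f"Mutation score: {score:.2f}")
```

A surviving mutant, such as the `count = 3` case here, points to a property that no check currently constrains.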