Scenario: The Multi-Server Security Scanner
Imagine you are building a tool that scans a list of servers to check for vulnerabilities.

The Requirements:

Ping Server: Check if the server is alive (takes ~0.5s).

Scan Ports: If it's alive, scan it for open ports (takes ~1.5s).

Fetch SSL Certificate: Simultaneously with the port scan, grab the SSL certificate (takes ~1.0s).

Logging: Every scan result (success or failure) must be appended to a shared global list called audit_log.

The Constraints:

The SSL check uses an old, blocking library (simulated here with time.sleep).

The port scan is natively async (simulated here with asyncio.sleep).

You must not scan more than 3 servers at a time (to avoid being flagged as a DDoS attack).

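The entire check for one server must not take longer than 2.2 seconds. A healthy server needs about 2.0s (0.5s ping, then the 1.5s port scan and the 1.0s SSL check in parallel), so it fits within the limit.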
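A minimal sketch of two of these constraints. It first shows why the blocking call should go through asyncio.to_thread rather than be called directly inside a coroutine, then shows asyncio.wait_for enforcing a time budget. The names blocking_call, async_call, blocking_inline, timed and demo are hypothetical, and SCALE is an assumption that shrinks every duration so the demo finishes in under a second. It is written as a standalone script; in a notebook cell, replace asyncio.run(demo()) with await demo().

```python
import asyncio
import time

SCALE = 0.2  # assumption: shrink all durations 5x so the demo runs quickly

def blocking_call():
    # Stand-in for the old, blocking SSL library
    time.sleep(1.0 * SCALE)
    return "ssl"

async def async_call():
    # Stand-in for the natively async port scan
    await asyncio.sleep(1.5 * SCALE)
    return "ports"

async def blocking_inline():
    # Anti-pattern: calling the blocking function directly stalls the event loop
    return blocking_call()

async def timed(*awaitables):
    # Run the awaitables together and report the wall-clock time taken
    start = time.perf_counter()
    results = await asyncio.gather(*awaitables)
    return results, time.perf_counter() - start

async def demo():
    # Blocking call inside a coroutine: the two durations add up
    _, inline = await timed(blocking_inline(), async_call())
    # Blocking call in a worker thread: the two durations overlap
    _, threaded = await timed(asyncio.to_thread(blocking_call), async_call())
    # A check that overruns its budget raises TimeoutError
    try:
        await asyncio.wait_for(async_call(), timeout=0.5 * SCALE)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return inline, threaded, timed_out

inline_s, threaded_s, timed_out = asyncio.run(demo())
print(f"inline: {inline_s:.2f}s, threaded: {threaded_s:.2f}s, timed out: {timed_out}")
```

The inline run should take roughly the sum of the two durations, while the threaded run should take roughly the longer of the two. Note that wait_for cancels the awaiting task, but a worker thread started by to_thread cannot be interrupted and keeps running until the blocking call returns.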

In [7]:
import asyncio
import time
import csv
# Shared resource
audit_log = []
# 1. TODO: Create a Lock for the audit_log
audit_lock = asyncio.Lock()

# 2. TODO: Create a Semaphore to limit to 3 servers at once
thread_limit = asyncio.Semaphore(3)

def blocking_ssl_check(ip):
    # This is our "old" library call
    time.sleep(1.0)
    return f"SSL Valid for {ip}"

async def async_port_scan(ip):
    # This is a modern async call
    await asyncio.sleep(1.5)
    return f"Ports 80, 443 open on {ip}"

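async def ping_server(ip):
    # Simulated liveness check (~0.5s); a real tool would send an actual probe
    await asyncio.sleep(0.5)
    return True

async def check_server(ip):
    # Ping first; scan only if the server is alive
    if not await ping_server(ip):
        return "Unreachable", "Not scanned"
    # Run the SSL check (blocking, via a thread) and the port scan (async) in parallel
    return await asyncio.gather(
        asyncio.to_thread(blocking_ssl_check, ip),
        async_port_scan(ip)
    )

async def scan_server(ip):
    # The semaphore caps the number of servers scanned at once
    async with thread_limit:
        print(f"--- Starting scan for {ip} ---")
        try:
            # 2.2s budget for the whole check: ping, then SSL check + port scan
            ssl_status, port_scan_status = await asyncio.wait_for(
                check_server(ip), timeout=2.2
            )
        except asyncio.TimeoutError:
            print(f"TIMED OUT: {ip} was too slow.")
            # Failures are logged too, as the requirements ask
            ssl_status = port_scan_status = "TIMED OUT"

        results = [
            ip,
            ssl_status,
            port_scan_status,
            time.strftime("%Y-%m-%d %H:%M:%S")
        ]

        # The lock serializes the audit_log append and the CSV write
        async with audit_lock:
            audit_log.append(results)
            print(f"Log updated for {ip}")
            await asyncio.to_thread(save_to_csv, results)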

def save_to_csv(data):
    with open('audit_results.csv', mode='a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(data)

async def main():
    with open('audit_results.csv', 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["IP Address", "SSL Status", "Open Ports", "Timestamp"])
    
    ips = [f"192.168.1.{i}" for i in range(1, 11)] # 10 servers
    # TODO: Run all 10 scans concurrently
    scans = [scan_server(ip) for ip in ips]
    await asyncio.gather(*scans)
    
    print(f"Final Audit Log: {len(audit_log)} entries.")
    

# Top-level await works in a notebook cell; in a plain script use asyncio.run(main())
await main()

--- Starting scan for 192.168.1.1 ---
--- Starting scan for 192.168.1.2 ---
--- Starting scan for 192.168.1.3 ---
Log updated for 192.168.1.1
Log updated for 192.168.1.3
--- Starting scan for 192.168.1.4 ---
Log updated for 192.168.1.2
--- Starting scan for 192.168.1.5 ---
--- Starting scan for 192.168.1.6 ---
Log updated for 192.168.1.4
Log updated for 192.168.1.6
--- Starting scan for 192.168.1.7 ---
Log updated for 192.168.1.5
--- Starting scan for 192.168.1.8 ---
--- Starting scan for 192.168.1.9 ---
Log updated for 192.168.1.7
Log updated for 192.168.1.9
--- Starting scan for 192.168.1.10 ---
Log updated for 192.168.1.8
Log updated for 192.168.1.10
Final Audit Log: 10 entries.
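
The output above shows scans starting in groups of three, which is the semaphore at work. One way to check the cap directly is to track how many scans are active at the same moment. This is a sketch only: fake_scan, run_all, MAX_CONCURRENT and the state dictionary are hypothetical names, and the 0.05s sleep stands in for the real scan. It is written as a standalone script; in a notebook cell, replace asyncio.run(run_all()) with await run_all().

```python
import asyncio

MAX_CONCURRENT = 3  # same cap as the scanner above

async def fake_scan(ip, limit, state):
    async with limit:
        # Count this scan as active and record the highest count seen so far
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # hypothetical stand-in for the real scan
        state["active"] -= 1
    return ip

async def run_all(n_servers=10):
    # Create the semaphore inside the running loop
    limit = asyncio.Semaphore(MAX_CONCURRENT)
    state = {"active": 0, "peak": 0}
    ips = [f"192.168.1.{i}" for i in range(1, n_servers + 1)]
    done = await asyncio.gather(*(fake_scan(ip, limit, state) for ip in ips))
    return done, state["peak"]

scanned, peak = asyncio.run(run_all())
print(f"{len(scanned)} scans finished, peak concurrency {peak}")
```

The counter needs no lock here because there is no await between reading and writing it, and only one coroutine runs at a time on the event loop.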
