In [1]:
import os
import json
import random
import datetime
import logging
from collections import defaultdict
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Tuple

# Constants
N = 5000  # Number of JSON files (reduced for testing)
M_MIN = 50  # Minimum number of records per file
M_MAX = 100  # Maximum number of records per file
K_MIN = 100  # Minimum number of cities
K_MAX = 200  # Maximum number of cities
NULL_PROBABILITY = random.uniform(0.001, 0.005)  # Probability of NULL in a flight record property
LOG_FILE = 'flight_data_processing.log'

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def generate_random_cities() -> List[str]:
    """Generate a list of random city names."""
    # Generate a list of cities with names formatted as 'City_X', where X is an index
    return [f"City_{i}" for i in range(random.randint(K_MIN, K_MAX))]

def maybe_null(value: Any) -> Any:
    """Return None with a given probability, otherwise return the value."""
    # Determine whether to return the original value or None based on NULL_PROBABILITY
    return value if random.random() > NULL_PROBABILITY else None

def generate_random_flight(cities: List[str]) -> Dict[str, Any]:
    """Generate a random flight record with some fields potentially set to None."""
    date = datetime.datetime.now().date().isoformat()  # Current date in ISO format
    origin_city = random.choice(cities)  # Randomly select an origin city
    destination_city = random.choice([city for city in cities if city != origin_city])  # Ensure destination is different
    flight_duration_secs = maybe_null(random.randint(1800, 7200))  # Random flight duration (30 min to 2 hours)
    passengers_on_board = maybe_null(random.randint(10, 300))  # Random number of passengers (10 to 300)

    # Create the flight record with some fields potentially set to None
    return {
        "date": maybe_null(date),
        "origin_city": maybe_null(origin_city),
        "destination_city": maybe_null(destination_city),
        "flight_duration_secs": flight_duration_secs,
        "# of passengers on board": passengers_on_board
    }

def generate_json_files(cities: List[str], output_dir: str) -> None:
    """Generate JSON files with random flight records."""
    # Ensure the output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    for i in range(N):
        num_records = random.randint(M_MIN, M_MAX)  # Random number of records per file
        flights = [generate_random_flight(cities) for _ in range(num_records)]  # Generate flight records
        file_name = f"{datetime.datetime.now().strftime('%m-%y')}-{random.choice(cities)}-flights.json"  # File name
        file_path = os.path.join(output_dir, file_name)  # Full file path
        
        # Write the flight records to a JSON file
        with open(file_path, "w") as file:
            json.dump(flights, file, indent=4)
    
    # Log the generation of files
    logging.info(f"Generated {N} flight JSON files.")

def analyze_file(file_path: Path) -> Tuple[int, int, Dict[str, List[int]], Dict[str, int], Dict[str, int]]:
    """Analyze a single JSON file to extract flight statistics."""
    try:
        with open(file_path, 'r') as file:
            flights = json.load(file)  # Load the flight records from the JSON file
        
        total_records = 0  # Counter for total records
        dirty_records = 0  # Counter for dirty (incomplete) records
        destination_city_stats = defaultdict(list)  # Dictionary to collect flight durations
        passengers_arrived = defaultdict(int)  # Dictionary to count passengers arrived
        passengers_departed = defaultdict(int)  # Dictionary to count passengers departed

        for flight in flights:
            # Check if the record has any None values
            if None in flight.values():
                dirty_records += 1
                continue
            
            total_records += 1  # Increment total records counter

            # Extract relevant fields
            destination_city = flight['destination_city']
            flight_duration = flight['flight_duration_secs']
            passengers = flight['# of passengers on board']

            # Collect flight duration for the destination city
            if flight_duration is not None:
                destination_city_stats[destination_city].append(flight_duration)

            # Update passenger counts for origin and destination cities
            if passengers is not None:
                origin_city = flight['origin_city']
                passengers_departed[origin_city] += passengers
                passengers_arrived[destination_city] += passengers

        return total_records, dirty_records, destination_city_stats, passengers_arrived, passengers_departed
    except Exception as e:
        logging.error(f"Error processing file {file_path}: {e}")
        return 0, 0, defaultdict(list), defaultdict(int), defaultdict(int)

def combine_results(results: List[Tuple[int, int, Dict[str, List[int]], Dict[str, int], Dict[str, int]]]) -> Dict[str, Any]:
    """Combine results from analyzing multiple files."""
    total_records = 0  # Counter for total records
    dirty_records = 0  # Counter for dirty records
    combined_destination_city_stats = defaultdict(list)  # Combined flight durations
    combined_passengers_arrived = defaultdict(int)  # Combined passengers arrived
    combined_passengers_departed = defaultdict(int)  # Combined passengers departed

    for result in results:
        total_records += result[0]
        dirty_records += result[1]
        
        # Combine flight durations for each city
        for city, durations in result[2].items():
            combined_destination_city_stats[city].extend(durations)
        
        # Combine passenger counts for each city
        for city, passengers in result[3].items():
            combined_passengers_arrived[city] += passengers
        for city, passengers in result[4].items():
            combined_passengers_departed[city] += passengers

    # Find top 25 destinations by the number of recorded flights
    top_25_destinations = sorted(combined_destination_city_stats, key=lambda x: len(combined_destination_city_stats[x]), reverse=True)[:25]
    
    # Calculate average and 95th percentile flight durations for top 25 destinations
    avg_flight_duration = {city: sum(combined_destination_city_stats[city]) / len(combined_destination_city_stats[city]) for city in top_25_destinations}
    p95_flight_duration = {city: sorted(combined_destination_city_stats[city])[int(0.95 * len(combined_destination_city_stats[city])) - 1] for city in top_25_destinations}

    # Find the cities with maximum passengers arrived and departed
    max_passengers_arrived = max(combined_passengers_arrived.items(), key=lambda x: x[1])
    max_passengers_departed = max(combined_passengers_departed.items(), key=lambda x: x[1])

    # Return the combined results
    return {
        "total_records": total_records,
        "dirty_records": dirty_records,
        "avg_flight_duration": avg_flight_duration,
        "p95_flight_duration": p95_flight_duration,
        "max_passengers_arrived": max_passengers_arrived,
        "max_passengers_departed": max_passengers_departed
    }

def analyze_files(directory: str) -> Dict[str, Any]:
    """Analyze JSON files in the specified directory to extract flight statistics."""
    start_time = datetime.datetime.now()  # Record start time
    file_paths = list(Path(directory).rglob('*.json'))  # List all JSON files in the directory

    results = []

    # Use ThreadPoolExecutor for concurrent processing of files
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        futures = {executor.submit(analyze_file, file_path): file_path for file_path in file_paths}
        
        # Collect results as futures complete
        for future in as_completed(futures):
            result = future.result()
            results.append(result)

    # Combine results from all files
    combined_results = combine_results(results)
    duration = datetime.datetime.now() - start_time  # Calculate total run duration
    combined_results["run_duration"] = duration.total_seconds()

    return combined_results

def main() -> None:
    """Main function to generate JSON files and analyze them."""
    try:
        cities = generate_random_cities()  # Generate random cities
        output_dir = "./flights"  # Output directory for JSON files
        generate_json_files(cities, output_dir)  # Generate flight data JSON files
        logging.info(f"Generated {N} flight JSON files.")
        
        analysis_result = analyze_files(output_dir)  # Analyze the generated JSON files
        print(json.dumps(analysis_result, indent=4))  # Print the analysis results
        logging.info("Analysis completed successfully.")
    except Exception as e:
        logging.error(f"An error occurred: {e}")

# Test file generation and analysis in the notebook
if __name__ == "__main__":
    cities = generate_random_cities()  # Generate random cities
    output_dir = "./test_flights"  # Output directory for JSON files
    generate_json_files(cities, output_dir)  # Generate flight data JSON files
    analysis_result = analyze_files(output_dir)  # Analyze the generated JSON files
    print(json.dumps(analysis_result, indent=4))  


2024-08-08 22:42:04,291 - INFO - Generated 5000 flight JSON files.


{
    "total_records": 10472,
    "dirty_records": 157,
    "avg_flight_duration": {
        "City_45": 4557.848484848485,
        "City_49": 4331.489361702128,
        "City_54": 4486.602150537635,
        "City_59": 4557.258064516129,
        "City_68": 4540.271739130435,
        "City_79": 4509.802197802198,
        "City_80": 4683.197802197802,
        "City_46": 4506.175824175824,
        "City_22": 4884.344444444445,
        "City_103": 4649.366666666667,
        "City_23": 4926.4,
        "City_78": 4230.177777777778,
        "City_66": 4661.797752808989,
        "City_36": 4211.741573033708,
        "City_92": 4339.157303370786,
        "City_40": 4671.438202247191,
        "City_16": 4492.921348314607,
        "City_126": 4619.079545454545,
        "City_52": 4556.284090909091,
        "City_93": 4609.886363636364,
        "City_50": 4354.454545454545,
        "City_76": 4354.19540229885,
        "City_39": 4464.137931034483,
        "City_55": 4414.183908045977,
        "City