## PROMPT
Generate a pipeline for Databricks:

Get Data information from two web addresses:

Space launches: https://api.spacexdata.com/v3/launches

Space rockets: https://api.spacexdata.com/v3/rockets

Combine each launch record with the name of the corresponding rocket.

Send the final result to the web address: https://httpbin.org/post

The script must provide status updates on its progress, report any errors encountered, confirm the outcome of the final data sending step, and measure/report execution times.


In [0]:
import requests
import json
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def log_status(message):
    """Print ``message`` to stdout, prefixed with the current local time.

    The prefix has the form ``[YYYY-MM-DD HH:MM:SS]``.
    """
    now = datetime.now()
    print(f"[{now:%Y-%m-%d %H:%M:%S}] {message}")

def fetch_data(url, description, timeout=30):
    """Fetch JSON from ``url`` and return the decoded payload.

    Args:
        url: Endpoint to issue the GET request against.
        description: Human-readable label for the data, used in log messages.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: On connection problems,
            timeouts or 4XX/5XX responses. Logged, then re-raised.
        ValueError: If the response body is not valid JSON. Logged, then
            re-raised.
    """
    start_time = time.time()
    log_status(f"Fetching {description} from {url}")

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raises exception for 4XX/5XX errors
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        log_status(f"Error fetching {description}: {str(e)}")
        raise

    elapsed = time.time() - start_time
    # A non-list payload is reported as a single record.
    record_count = len(data) if isinstance(data, list) else 1
    log_status(f"Successfully fetched {record_count} {description} records in {elapsed:.2f} seconds")
    return data

def send_data(url, data, timeout=30):
    """POST ``data`` as a JSON body to ``url`` and report the outcome.

    Args:
        url: Endpoint that receives the POST request.
        data: JSON-serializable payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        True when the request completed without an HTTP error status,
        False when a ``requests`` exception occurred (the error is logged,
        not raised).
    """
    start_time = time.time()
    log_status(f"Sending combined data to {url}")

    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(url, data=json.dumps(data), headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        log_status(f"Error sending data: {str(e)}")
        # Compare against None explicitly: a Response object is falsy for
        # 4XX/5XX statuses, so a plain truthiness test would skip exactly the
        # error bodies we want to log.
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            log_status(f"Response content: {error_response.text[:200]}...")
        return False

    elapsed = time.time() - start_time
    log_status(f"Data successfully sent in {elapsed:.2f} seconds")
    log_status(f"Response status: {response.status_code}")
    log_status(f"Response content: {response.text[:200]}...")  # Truncate long responses
    return True

def main():
    """Run the SpaceX pipeline: fetch launches and rockets, join them, send the result.

    Steps, each reported through ``log_status``:
      1. Fetch launches and rockets from the SpaceX v3 API.
      2. Load both into Spark DataFrames and left-join every launch with the
         ``rocket_name`` of the rocket whose ``rocket_id`` matches.
      3. POST the combined records to https://httpbin.org/post.

    Any exception is logged and re-raised; the total execution time is
    logged in every case.
    """
    # Record total pipeline execution time
    pipeline_start = time.time()

    try:
        # Initialize Spark session
        spark = SparkSession.builder.appName("SpaceXDataPipeline").getOrCreate()

        # 1. Fetch data from APIs
        launches_url = "https://api.spacexdata.com/v3/launches"
        rockets_url = "https://api.spacexdata.com/v3/rockets"

        launches_data = fetch_data(launches_url, "launches")
        rockets_data = fetch_data(rockets_url, "rockets")

        # 2. Process and combine data
        log_status("Starting data processing...")
        process_start = time.time()

        # Create DataFrames
        # NOTE(review): both schemas are inferred from the raw nested JSON
        # here; confirm that inference succeeds on the full launches payload.
        launches_df = spark.createDataFrame(launches_data)
        rockets_df = spark.createDataFrame(rockets_data)

        # Select only needed columns from rockets
        rockets_df = rockets_df.select(col("rocket_id"), col("rocket_name"))

        # Join launches with rocket names
        combined_df = launches_df.join(rockets_df,
                                       launches_df.rocket.rocket_id == rockets_df.rocket_id,
                                       "left")

        # Drop the redundant rocket_id column
        combined_df = combined_df.drop("rocket_id")

        # Convert back to plain Python structures for sending. recursive=True
        # also converts nested Row values to dicts; a nested Row is a tuple
        # subclass, so json.dumps would otherwise emit it as a bare array
        # without field names.
        combined_data = [row.asDict(recursive=True) for row in combined_df.collect()]

        process_elapsed = time.time() - process_start
        log_status(f"Data processing completed in {process_elapsed:.2f} seconds")
        log_status(f"Combined {len(combined_data)} launch records with rocket names")

        # 3. Send data to endpoint
        target_url = "https://httpbin.org/post"
        send_success = send_data(target_url, combined_data)

        if send_success:
            log_status("Pipeline completed successfully")
        else:
            log_status("Pipeline completed with errors in sending step")

    except Exception as e:
        log_status(f"Pipeline failed with error: {str(e)}")
        raise
    finally:
        pipeline_elapsed = time.time() - pipeline_start
        log_status(f"Total pipeline execution time: {pipeline_elapsed:.2f} seconds")

if __name__ == "__main__":
    main()

DeepSeek's code needed a correction, so we gave the error output back to the LLM and asked it to rewrite the code.

In [0]:
import requests
import json
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, MapType, ArrayType

def log_status(message):
    """Print ``message`` to stdout, prefixed with the current local time.

    The prefix has the form ``[YYYY-MM-DD HH:MM:SS]``.
    """
    now = datetime.now()
    print(f"[{now:%Y-%m-%d %H:%M:%S}] {message}")

def fetch_data(url, description, timeout=30):
    """Fetch JSON from ``url`` and return the decoded payload.

    Args:
        url: Endpoint to issue the GET request against.
        description: Human-readable label for the data, used in log messages.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: On connection problems,
            timeouts or 4XX/5XX responses. Logged, then re-raised.
        ValueError: If the response body is not valid JSON. Logged, then
            re-raised.
    """
    start_time = time.time()
    log_status(f"Fetching {description} from {url}")

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raises exception for 4XX/5XX errors
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        log_status(f"Error fetching {description}: {str(e)}")
        raise

    elapsed = time.time() - start_time
    # A non-list payload is reported as a single record.
    record_count = len(data) if isinstance(data, list) else 1
    log_status(f"Successfully fetched {record_count} {description} records in {elapsed:.2f} seconds")
    return data

def send_data(url, data, timeout=30):
    """POST ``data`` as a JSON body to ``url`` and report the outcome.

    Args:
        url: Endpoint that receives the POST request.
        data: JSON-serializable payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        True when the request completed without an HTTP error status,
        False when a ``requests`` exception occurred (the error is logged,
        not raised).
    """
    start_time = time.time()
    log_status(f"Sending combined data to {url}")

    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(url, data=json.dumps(data), headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        log_status(f"Error sending data: {str(e)}")
        # Compare against None explicitly: a Response object is falsy for
        # 4XX/5XX statuses, so a plain truthiness test would skip exactly the
        # error bodies we want to log.
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            log_status(f"Response content: {error_response.text[:200]}...")
        return False

    elapsed = time.time() - start_time
    log_status(f"Data successfully sent in {elapsed:.2f} seconds")
    log_status(f"Response status: {response.status_code}")
    log_status(f"Response content: {response.text[:200]}...")  # Truncate long responses
    return True

def get_launches_schema():
    """Build the explicit Spark schema used for launch records.

    All fields are nullable. ``rocket`` is a struct of three string fields
    and ``links`` is a string-to-string map.
    """
    rocket_struct = StructType([
        StructField(field_name, StringType(), True)
        for field_name in ("rocket_id", "rocket_name", "rocket_type")
    ])
    string_map = MapType(StringType(), StringType())

    fields = [
        StructField("flight_number", IntegerType(), True),
        StructField("mission_name", StringType(), True),
        StructField("launch_year", StringType(), True),
        StructField("launch_date_utc", StringType(), True),
        StructField("rocket", rocket_struct, True),
        StructField("launch_success", BooleanType(), True),
        StructField("links", string_map, True),
        StructField("details", StringType(), True),
    ]
    return StructType(fields)

def get_rockets_schema():
    """Build the explicit Spark schema used for rocket records.

    Every column is nullable; the column order matches the tuple below.
    """
    columns = (
        ("rocket_id", StringType()),
        ("rocket_name", StringType()),
        ("rocket_type", StringType()),
        ("active", BooleanType()),
        ("cost_per_launch", IntegerType()),
        ("success_rate_pct", IntegerType()),
        ("first_flight", StringType()),
        ("country", StringType()),
        ("company", StringType()),
    )
    return StructType([StructField(name, dtype, True) for name, dtype in columns])

def transform_launch_data(launch):
    """Reduce a raw launch record to the fields of the launches schema.

    Args:
        launch: One launch record (a dict) as decoded from the API response.

    Returns:
        A dict with the keys ``flight_number``, ``mission_name``,
        ``launch_year``, ``launch_date_utc``, ``rocket``, ``launch_success``,
        ``links`` and ``details``. ``rocket.rocket_name`` is always ``None``
        here; it is filled in later by the join with the rockets data.
        ``links`` maps each key to a string (or ``None``): dict and list
        values are JSON-encoded, other non-string values go through ``str``.
    """
    def as_string(value):
        # The schema declares link values as strings, so containers are
        # JSON-encoded instead of being handed to Spark as Python objects.
        if value is None or isinstance(value, str):
            return value
        if isinstance(value, (dict, list)):
            return json.dumps(value)
        return str(value)

    # ``or {}`` also covers a key that is present but null, which
    # ``.get(key, {})`` would not.
    rocket = launch.get("rocket") or {}
    links = launch.get("links") or {}

    return {
        "flight_number": launch.get("flight_number"),
        "mission_name": launch.get("mission_name"),
        "launch_year": launch.get("launch_year"),
        "launch_date_utc": launch.get("launch_date_utc"),
        "rocket": {
            "rocket_id": rocket.get("rocket_id"),
            "rocket_name": None,  # Will be filled in join
            "rocket_type": rocket.get("rocket_type")
        },
        "launch_success": launch.get("launch_success"),
        "links": {key: as_string(value) for key, value in links.items()},
        "details": launch.get("details")
    }

def main():
    """Run the SpaceX pipeline: fetch launches and rockets, join them, send the result.

    Steps, each reported through ``log_status``:
      1. Fetch launches and rockets from the SpaceX v3 API.
      2. Load both into Spark DataFrames with explicit schemas, left-join on
         the rocket id and write the matching rocket name into the nested
         ``rocket.rocket_name`` field of every launch.
      3. POST the combined records to https://httpbin.org/post.

    Any exception is logged and re-raised; the total execution time is
    logged in every case.
    """
    # Record total pipeline execution time
    pipeline_start = time.time()

    try:
        # Initialize Spark session
        spark = SparkSession.builder.appName("SpaceXDataPipeline").getOrCreate()

        # 1. Fetch data from APIs
        launches_url = "https://api.spacexdata.com/v3/launches"
        rockets_url = "https://api.spacexdata.com/v3/rockets"

        launches_data = fetch_data(launches_url, "launches")
        rockets_data = fetch_data(rockets_url, "rockets")

        # 2. Process and combine data
        log_status("Starting data processing...")
        process_start = time.time()

        # Transform data to match schemas
        transformed_launches = [transform_launch_data(launch) for launch in launches_data]

        # Create DataFrames with explicit schemas
        launches_df = spark.createDataFrame(transformed_launches, schema=get_launches_schema())
        rockets_df = spark.createDataFrame(rockets_data, schema=get_rockets_schema())

        # Select only the join key and the name. Any other rockets column
        # would survive the join and end up as a stray top-level field in
        # the payload.
        rockets_df = rockets_df.select(
            col("rocket_id"),
            col("rocket_name").alias("rocket_name_from_join")
        )

        # Join launches with rocket names
        combined_df = launches_df.join(
            rockets_df,
            launches_df.rocket.rocket_id == rockets_df.rocket_id,
            "left"
        )

        # Update the rocket_name in the nested struct
        combined_df = combined_df.withColumn(
            "rocket",
            col("rocket").withField(
                "rocket_name",
                col("rocket_name_from_join")
            )
        ).drop("rocket_name_from_join", "rocket_id")

        # Convert back to plain Python structures for sending. Row has no
        # json() method; asDict(recursive=True) also turns the nested
        # ``rocket`` struct into a dict.
        combined_data = [row.asDict(recursive=True) for row in combined_df.collect()]

        process_elapsed = time.time() - process_start
        log_status(f"Data processing completed in {process_elapsed:.2f} seconds")
        log_status(f"Combined {len(combined_data)} launch records with rocket names")

        # 3. Send data to endpoint
        target_url = "https://httpbin.org/post"
        send_success = send_data(target_url, combined_data)

        if send_success:
            log_status("Pipeline completed successfully")
        else:
            log_status("Pipeline completed with errors in sending step")

    except Exception as e:
        log_status(f"Pipeline failed with error: {str(e)}")
        raise
    finally:
        pipeline_elapsed = time.time() - pipeline_start
        log_status(f"Total pipeline execution time: {pipeline_elapsed:.2f} seconds")

if __name__ == "__main__":
    main()

Another error appeared, so I gave the output back to the LLM again.

In [0]:
import requests
import json
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, MapType

def log_status(message):
    """Print ``message`` to stdout, prefixed with the current local time.

    The prefix has the form ``[YYYY-MM-DD HH:MM:SS]``.
    """
    now = datetime.now()
    print(f"[{now:%Y-%m-%d %H:%M:%S}] {message}")

def fetch_data(url, description, timeout=30):
    """Fetch JSON from ``url`` and return the decoded payload.

    Args:
        url: Endpoint to issue the GET request against.
        description: Human-readable label for the data, used in log messages.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: On connection problems,
            timeouts or 4XX/5XX responses. Logged, then re-raised.
        ValueError: If the response body is not valid JSON. Logged, then
            re-raised.
    """
    start_time = time.time()
    log_status(f"Fetching {description} from {url}")

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raises exception for 4XX/5XX errors
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        log_status(f"Error fetching {description}: {str(e)}")
        raise

    elapsed = time.time() - start_time
    # A non-list payload is reported as a single record.
    record_count = len(data) if isinstance(data, list) else 1
    log_status(f"Successfully fetched {record_count} {description} records in {elapsed:.2f} seconds")
    return data

def send_data(url, data, timeout=30):
    """POST ``data`` as a JSON body to ``url`` and report the outcome.

    Args:
        url: Endpoint that receives the POST request.
        data: JSON-serializable payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        True when the request completed without an HTTP error status,
        False when a ``requests`` exception occurred (the error is logged,
        not raised).
    """
    start_time = time.time()
    log_status(f"Sending combined data to {url}")

    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(url, data=json.dumps(data), headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        log_status(f"Error sending data: {str(e)}")
        # Compare against None explicitly: a Response object is falsy for
        # 4XX/5XX statuses, so a plain truthiness test would skip exactly the
        # error bodies we want to log.
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            log_status(f"Response content: {error_response.text[:200]}...")
        return False

    elapsed = time.time() - start_time
    log_status(f"Data successfully sent in {elapsed:.2f} seconds")
    log_status(f"Response status: {response.status_code}")
    log_status(f"Response content: {response.text[:200]}...")  # Truncate long responses
    return True

def get_launches_schema():
    """Build the explicit Spark schema used for launch records.

    All fields are nullable. ``rocket`` is a struct of three string fields
    and ``links`` is a string-to-string map.
    """
    rocket_struct = StructType([
        StructField(field_name, StringType(), True)
        for field_name in ("rocket_id", "rocket_name", "rocket_type")
    ])
    string_map = MapType(StringType(), StringType())

    fields = [
        StructField("flight_number", IntegerType(), True),
        StructField("mission_name", StringType(), True),
        StructField("launch_year", StringType(), True),
        StructField("launch_date_utc", StringType(), True),
        StructField("rocket", rocket_struct, True),
        StructField("launch_success", BooleanType(), True),
        StructField("links", string_map, True),
        StructField("details", StringType(), True),
    ]
    return StructType(fields)

def get_rockets_schema():
    """Build the explicit Spark schema used for rocket records.

    Every column is nullable; the column order matches the tuple below.
    """
    columns = (
        ("rocket_id", StringType()),
        ("rocket_name", StringType()),
        ("rocket_type", StringType()),
        ("active", BooleanType()),
        ("cost_per_launch", IntegerType()),
        ("success_rate_pct", IntegerType()),
        ("first_flight", StringType()),
        ("country", StringType()),
        ("company", StringType()),
    )
    return StructType([StructField(name, dtype, True) for name, dtype in columns])

def transform_launch_data(launch):
    """Reduce a raw launch record to the fields of the launches schema.

    Args:
        launch: One launch record (a dict) as decoded from the API response.

    Returns:
        A dict with the keys ``flight_number``, ``mission_name``,
        ``launch_year``, ``launch_date_utc``, ``rocket``, ``launch_success``,
        ``links`` and ``details``. ``rocket.rocket_name`` is always ``None``
        here; it is filled in later by the join with the rockets data.
        Every ``links`` value is a string: dicts and lists are JSON-encoded,
        ``None`` becomes ``""`` and anything else goes through ``str``.
    """
    # ``or {}`` also covers a key that is present but null, which
    # ``.get(key, {})`` would not.
    rocket = launch.get("rocket") or {}
    links = launch.get("links") or {}
    simplified_links = {}

    # Convert all link values to strings
    for key, value in links.items():
        if isinstance(value, (dict, list)):
            # JSON-encode containers. Lists are included so they do not fall
            # through to str(), which would emit a Python repr rather than
            # valid JSON.
            simplified_links[key] = json.dumps(value)
        elif value is None:
            simplified_links[key] = ""
        else:
            simplified_links[key] = str(value)

    return {
        "flight_number": launch.get("flight_number"),
        "mission_name": launch.get("mission_name"),
        "launch_year": launch.get("launch_year"),
        "launch_date_utc": launch.get("launch_date_utc"),
        "rocket": {
            "rocket_id": rocket.get("rocket_id"),
            "rocket_name": None,  # Will be filled in join
            "rocket_type": rocket.get("rocket_type")
        },
        "launch_success": launch.get("launch_success"),
        "links": simplified_links,
        "details": launch.get("details", "")
    }

def main():
    """Run the SpaceX pipeline: fetch launches and rockets, join them, send the result.

    Steps, each reported through ``log_status``:
      1. Fetch launches and rockets from the SpaceX v3 API.
      2. Load both into Spark DataFrames with explicit schemas, left-join on
         the rocket id and write the matching rocket name into the nested
         ``rocket.rocket_name`` field of every launch.
      3. POST the combined records to https://httpbin.org/post.

    Any exception is logged and re-raised; the total execution time is
    logged in every case.
    """
    # Record total pipeline execution time
    pipeline_start = time.time()

    try:
        # Initialize Spark session
        spark = SparkSession.builder.appName("SpaceXDataPipeline").getOrCreate()

        # 1. Fetch data from APIs
        launches_url = "https://api.spacexdata.com/v3/launches"
        rockets_url = "https://api.spacexdata.com/v3/rockets"

        launches_data = fetch_data(launches_url, "launches")
        rockets_data = fetch_data(rockets_url, "rockets")

        # 2. Process and combine data
        log_status("Starting data processing...")
        process_start = time.time()

        # Transform data to match schemas
        transformed_launches = [transform_launch_data(launch) for launch in launches_data]

        # Create DataFrames with explicit schemas
        launches_df = spark.createDataFrame(transformed_launches, schema=get_launches_schema())
        rockets_df = spark.createDataFrame(rockets_data, schema=get_rockets_schema())

        # Select only the join key and the name. Any other rockets column
        # would survive the join and end up as a stray top-level field in
        # the payload.
        rockets_df = rockets_df.select(
            col("rocket_id"),
            col("rocket_name").alias("rocket_name_from_join")
        )

        # Join launches with rocket names
        combined_df = launches_df.join(
            rockets_df,
            launches_df.rocket.rocket_id == rockets_df.rocket_id,
            "left"
        )

        # Update the rocket_name in the nested struct
        combined_df = combined_df.withColumn(
            "rocket",
            col("rocket").withField(
                "rocket_name",
                col("rocket_name_from_join")
            )
        ).drop("rocket_name_from_join", "rocket_id")

        # Convert DataFrame to JSON properly
        combined_data = [json.loads(row) for row in combined_df.toJSON().collect()]

        process_elapsed = time.time() - process_start
        log_status(f"Data processing completed in {process_elapsed:.2f} seconds")
        log_status(f"Combined {len(combined_data)} launch records with rocket names")

        # 3. Send data to endpoint
        target_url = "https://httpbin.org/post"
        send_success = send_data(target_url, combined_data)

        if send_success:
            log_status("Pipeline completed successfully")
        else:
            log_status("Pipeline completed with errors in sending step")

    except Exception as e:
        log_status(f"Pipeline failed with error: {str(e)}")
        raise
    finally:
        pipeline_elapsed = time.time() - pipeline_start
        log_status(f"Total pipeline execution time: {pipeline_elapsed:.2f} seconds")

if __name__ == "__main__":
    main()

Trying a fourth version:

In [0]:
import requests
import json
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct, to_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, MapType

def log_status(message):
    """Print ``message`` to stdout, prefixed with the current local time.

    The prefix has the form ``[YYYY-MM-DD HH:MM:SS]``.
    """
    now = datetime.now()
    print(f"[{now:%Y-%m-%d %H:%M:%S}] {message}")

def fetch_data(url, description, timeout=30):
    """Fetch JSON from ``url`` and return the decoded payload.

    Args:
        url: Endpoint to issue the GET request against.
        description: Human-readable label for the data, used in log messages.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: On connection problems,
            timeouts or 4XX/5XX responses. Logged, then re-raised.
        ValueError: If the response body is not valid JSON. Logged, then
            re-raised.
    """
    start_time = time.time()
    log_status(f"Fetching {description} from {url}")

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        log_status(f"Error fetching {description}: {str(e)}")
        raise

    elapsed = time.time() - start_time
    # A non-list payload is reported as a single record.
    record_count = len(data) if isinstance(data, list) else 1
    log_status(f"Successfully fetched {record_count} {description} records in {elapsed:.2f} seconds")
    return data

def send_data(url, data, timeout=30):
    """POST ``data`` as a JSON body to ``url`` and report the outcome.

    Args:
        url: Endpoint that receives the POST request.
        data: JSON-serializable payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        True when the request completed without an HTTP error status,
        False when a ``requests`` exception occurred (the error is logged,
        not raised).
    """
    start_time = time.time()
    log_status(f"Sending combined data to {url}")

    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(url, data=json.dumps(data), headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        log_status(f"Error sending data: {str(e)}")
        # Compare against None explicitly: a Response object is falsy for
        # 4XX/5XX statuses, so a plain truthiness test would skip exactly the
        # error bodies we want to log.
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            log_status(f"Response content: {error_response.text[:200]}...")
        return False

    elapsed = time.time() - start_time
    log_status(f"Data successfully sent in {elapsed:.2f} seconds")
    log_status(f"Response status: {response.status_code}")
    log_status(f"Response content: {response.text[:200]}...")
    return True

def get_launches_schema():
    """Build the explicit Spark schema used for launch records.

    All fields are nullable. ``rocket`` is a struct of three string fields
    and ``links`` is a string-to-string map.
    """
    rocket_struct = StructType([
        StructField(field_name, StringType(), True)
        for field_name in ("rocket_id", "rocket_name", "rocket_type")
    ])
    string_map = MapType(StringType(), StringType())

    fields = [
        StructField("flight_number", IntegerType(), True),
        StructField("mission_name", StringType(), True),
        StructField("launch_year", StringType(), True),
        StructField("launch_date_utc", StringType(), True),
        StructField("rocket", rocket_struct, True),
        StructField("launch_success", BooleanType(), True),
        StructField("links", string_map, True),
        StructField("details", StringType(), True),
    ]
    return StructType(fields)

def get_rockets_schema():
    """Build the explicit Spark schema used for rocket records.

    Every column is nullable; the column order matches the tuple below.
    """
    columns = (
        ("rocket_id", StringType()),
        ("rocket_name", StringType()),
        ("rocket_type", StringType()),
        ("active", BooleanType()),
        ("cost_per_launch", IntegerType()),
        ("success_rate_pct", IntegerType()),
        ("first_flight", StringType()),
        ("country", StringType()),
        ("company", StringType()),
    )
    return StructType([StructField(name, dtype, True) for name, dtype in columns])

def transform_launch_data(launch):
    """Reduce a raw launch record to the fields of the launches schema.

    Args:
        launch: One launch record (a dict) as decoded from the API response.

    Returns:
        A dict with the keys ``flight_number``, ``mission_name``,
        ``launch_year``, ``launch_date_utc``, ``rocket``, ``launch_success``,
        ``links`` and ``details``. ``rocket.rocket_name`` is always ``None``
        here; it is filled in later by the join with the rockets data.
        Every ``links`` value is a string: dicts and lists are JSON-encoded,
        ``None`` becomes ``""`` and anything else goes through ``str``.
    """
    # ``or {}`` also covers a key that is present but null, which
    # ``.get(key, {})`` would not.
    rocket = launch.get("rocket") or {}
    links = launch.get("links") or {}
    simplified_links = {}

    for key, value in links.items():
        if isinstance(value, (dict, list)):
            # JSON-encode containers. Lists are included so they do not fall
            # through to str(), which would emit a Python repr rather than
            # valid JSON.
            simplified_links[key] = json.dumps(value)
        elif value is None:
            simplified_links[key] = ""
        else:
            simplified_links[key] = str(value)

    return {
        "flight_number": launch.get("flight_number"),
        "mission_name": launch.get("mission_name"),
        "launch_year": launch.get("launch_year"),
        "launch_date_utc": launch.get("launch_date_utc"),
        "rocket": {
            "rocket_id": rocket.get("rocket_id"),
            "rocket_name": None,  # Filled in by the join
            "rocket_type": rocket.get("rocket_type")
        },
        "launch_success": launch.get("launch_success"),
        "links": simplified_links,
        "details": launch.get("details", "")
    }

def convert_row_to_dict(row):
    """Convert a Spark ``Row`` into plain, JSON-serializable Python data.

    Args:
        row: A row object exposing ``__fields__`` (its field names, as a list
            of strings) and yielding its values, in field order, when
            iterated.

    Returns:
        A dict keyed by field name. Nested rows and dicts become dicts,
        lists and tuples become lists, all other values are returned
        unchanged.
    """
    def convert_value(value):
        field_names = getattr(value, "__fields__", None)
        if field_names is not None:
            # __fields__ holds plain name strings, so pair each name with the
            # value at the same position instead of reading a ``.name``
            # attribute from it.
            return {name: convert_value(item) for name, item in zip(field_names, value)}
        if isinstance(value, dict):
            return {key: convert_value(item) for key, item in value.items()}
        if isinstance(value, (list, tuple)):
            return [convert_value(item) for item in value]
        return value

    return convert_value(row)

def main():
    """Run the pipeline end to end: fetch, join, convert, send.

    Launches and rockets are fetched, loaded into Spark with explicit
    schemas and left-joined on the rocket id; the rocket name from the join
    is written into each launch's nested ``rocket.rocket_name`` field. The
    collected rows are converted with ``convert_row_to_dict`` and posted to
    https://httpbin.org/post.

    Progress and timings go through ``log_status``. Any exception is logged
    and re-raised, and the total execution time is logged in every case.
    """
    started_at = time.time()

    try:
        session = SparkSession.builder.appName("SpaceXDataPipeline").getOrCreate()

        # Step 1: pull both datasets from the API
        raw_launches = fetch_data("https://api.spacexdata.com/v3/launches", "launches")
        raw_rockets = fetch_data("https://api.spacexdata.com/v3/rockets", "rockets")

        # Step 2: shape, join and convert
        log_status("Starting data processing...")
        processing_started_at = time.time()

        launch_records = list(map(transform_launch_data, raw_launches))
        launches_df = session.createDataFrame(launch_records, schema=get_launches_schema())
        all_rockets_df = session.createDataFrame(raw_rockets, schema=get_rockets_schema())

        # Keep only the join key and the (renamed) rocket name
        rockets_df = all_rockets_df.select(
            col("rocket_id"),
            col("rocket_name").alias("rocket_name_from_join")
        )

        join_condition = launches_df.rocket.rocket_id == rockets_df.rocket_id
        joined_df = launches_df.join(rockets_df, join_condition, "left")

        # Overwrite the nested rocket_name, then discard the helper columns
        rocket_with_name = col("rocket").withField("rocket_name", col("rocket_name_from_join"))
        combined_df = joined_df.withColumn("rocket", rocket_with_name)
        combined_df = combined_df.drop("rocket_name_from_join", "rocket_id")

        combined_data = []
        for row in combined_df.collect():
            combined_data.append(convert_row_to_dict(row))

        processing_seconds = time.time() - processing_started_at
        log_status(f"Data processing completed in {processing_seconds:.2f} seconds")
        log_status(f"Combined {len(combined_data)} launch records with rocket names")

        # Step 3: deliver the payload
        delivered = send_data("https://httpbin.org/post", combined_data)

        if not delivered:
            log_status("Pipeline completed with errors in sending step")
        else:
            log_status("Pipeline completed successfully")

    except Exception as error:
        log_status(f"Pipeline failed with error: {str(error)}")
        raise
    finally:
        total_seconds = time.time() - started_at
        log_status(f"Total pipeline execution time: {total_seconds:.2f} seconds")

if __name__ == "__main__":
    main()

Trying another version:


In [0]:
import requests
import json
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, MapType

def log_status(message):
    """Print ``message`` to stdout, prefixed with the current local time.

    The prefix has the form ``[YYYY-MM-DD HH:MM:SS]``.
    """
    now = datetime.now()
    print(f"[{now:%Y-%m-%d %H:%M:%S}] {message}")

def fetch_data(url, description, timeout=30):
    """Fetch JSON from ``url`` and return the decoded payload.

    Args:
        url: Endpoint to issue the GET request against.
        description: Human-readable label for the data, used in log messages.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: On connection problems,
            timeouts or 4XX/5XX responses. Logged, then re-raised.
        ValueError: If the response body is not valid JSON. Logged, then
            re-raised.
    """
    start_time = time.time()
    log_status(f"Fetching {description} from {url}")

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        log_status(f"Error fetching {description}: {str(e)}")
        raise

    elapsed = time.time() - start_time
    # A non-list payload is reported as a single record.
    record_count = len(data) if isinstance(data, list) else 1
    log_status(f"Successfully fetched {record_count} {description} records in {elapsed:.2f} seconds")
    return data

def send_data(url, data, timeout=30):
    """POST ``data`` as a JSON body to ``url`` and report the outcome.

    Args:
        url: Endpoint that receives the POST request.
        data: JSON-serializable payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive
            host.

    Returns:
        True when the request completed without an HTTP error status,
        False when a ``requests`` exception occurred (the error is logged,
        not raised).
    """
    start_time = time.time()
    log_status(f"Sending combined data to {url}")

    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(url, data=json.dumps(data), headers=headers, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        log_status(f"Error sending data: {str(e)}")
        # Compare against None explicitly: a Response object is falsy for
        # 4XX/5XX statuses, so a plain truthiness test would skip exactly the
        # error bodies we want to log.
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            log_status(f"Response content: {error_response.text[:200]}...")
        return False

    elapsed = time.time() - start_time
    log_status(f"Data successfully sent in {elapsed:.2f} seconds")
    log_status(f"Response status: {response.status_code}")
    log_status(f"Response content: {response.text[:200]}...")
    return True

def get_launches_schema():
    """Build the explicit Spark schema used for launch records.

    All fields are nullable. ``rocket`` is a struct of three string fields
    and ``links`` is a string-to-string map.
    """
    rocket_struct = StructType([
        StructField(field_name, StringType(), True)
        for field_name in ("rocket_id", "rocket_name", "rocket_type")
    ])
    string_map = MapType(StringType(), StringType())

    fields = [
        StructField("flight_number", IntegerType(), True),
        StructField("mission_name", StringType(), True),
        StructField("launch_year", StringType(), True),
        StructField("launch_date_utc", StringType(), True),
        StructField("rocket", rocket_struct, True),
        StructField("launch_success", BooleanType(), True),
        StructField("links", string_map, True),
        StructField("details", StringType(), True),
    ]
    return StructType(fields)

def get_rockets_schema():
    """Build the explicit Spark schema used for rocket records.

    Every column is nullable; the column order matches the tuple below.
    """
    columns = (
        ("rocket_id", StringType()),
        ("rocket_name", StringType()),
        ("rocket_type", StringType()),
        ("active", BooleanType()),
        ("cost_per_launch", IntegerType()),
        ("success_rate_pct", IntegerType()),
        ("first_flight", StringType()),
        ("country", StringType()),
        ("company", StringType()),
    )
    return StructType([StructField(name, dtype, True) for name, dtype in columns])

def transform_launch_data(launch):
    """Reduce a raw launch record to the fields of the launches schema.

    Args:
        launch: One launch record (a dict) as decoded from the API response.

    Returns:
        A dict with the keys ``flight_number``, ``mission_name``,
        ``launch_year``, ``launch_date_utc``, ``rocket``, ``launch_success``,
        ``links`` and ``details``. ``rocket.rocket_name`` is always ``None``
        here; it is filled in later by the join with the rockets data.
        Every ``links`` value is a string: dicts and lists are JSON-encoded,
        ``None`` becomes ``""`` and anything else goes through ``str``.
    """
    # ``or {}`` also covers a key that is present but null, which
    # ``.get(key, {})`` would not.
    rocket = launch.get("rocket") or {}
    links = launch.get("links") or {}
    simplified_links = {}

    for key, value in links.items():
        if isinstance(value, (dict, list)):
            # JSON-encode containers. Lists are included so they do not fall
            # through to str(), which would emit a Python repr rather than
            # valid JSON.
            simplified_links[key] = json.dumps(value)
        elif value is None:
            simplified_links[key] = ""
        else:
            simplified_links[key] = str(value)

    return {
        "flight_number": launch.get("flight_number"),
        "mission_name": launch.get("mission_name"),
        "launch_year": launch.get("launch_year"),
        "launch_date_utc": launch.get("launch_date_utc"),
        "rocket": {
            "rocket_id": rocket.get("rocket_id"),
            "rocket_name": None,  # Filled in by the join
            "rocket_type": rocket.get("rocket_type")
        },
        "launch_success": launch.get("launch_success"),
        "links": simplified_links,
        "details": launch.get("details", "")
    }

def row_to_dict(row):
    """Recursively convert a Spark ``Row`` into plain Python data.

    Args:
        row: A row object exposing ``__fields__`` (its field names, as a list
            of strings) and yielding its values, in field order, when
            iterated. Dicts, lists, tuples and scalars are accepted too,
            which is what makes the recursion work.

    Returns:
        Rows and dicts become dicts, lists and tuples become lists, any
        other value is returned unchanged.
    """
    field_names = getattr(row, "__fields__", None)
    if field_names is not None:
        # __fields__ holds plain name strings, so pair each name with the
        # value at the same position instead of reading a ``.name``
        # attribute from it.
        return {name: row_to_dict(value) for name, value in zip(field_names, row)}
    if isinstance(row, dict):
        return {k: row_to_dict(v) for k, v in row.items()}
    if isinstance(row, (list, tuple)):
        return [row_to_dict(v) for v in row]
    return row

def main():
    """Run the whole pipeline: fetch both datasets, join them in Spark, post the result.

    Progress and per-stage timings are reported through ``log_status``. Any
    exception is logged and re-raised, and the total run time is logged
    whether the run succeeded or failed.
    """
    started_at = time.time()

    try:
        spark = SparkSession.builder.appName("SpaceXDataPipeline").getOrCreate()

        # Stage 1: download the raw launch and rocket records.
        raw_launches = fetch_data("https://api.spacexdata.com/v3/launches", "launches")
        raw_rockets = fetch_data("https://api.spacexdata.com/v3/rockets", "rockets")

        # Stage 2: load both datasets into Spark and attach rocket names.
        log_status("Starting data processing...")
        stage_started_at = time.time()

        launch_records = list(map(transform_launch_data, raw_launches))
        launches_df = spark.createDataFrame(launch_records, schema=get_launches_schema())

        # Keep only the join key and the name, renamed so it cannot be
        # confused with the nested rocket.rocket_name field.
        rocket_names_df = spark.createDataFrame(
            raw_rockets, schema=get_rockets_schema()
        ).select(
            col("rocket_id"),
            col("rocket_name").alias("rocket_name_from_join"),
        )

        same_rocket = launches_df.rocket.rocket_id == rocket_names_df.rocket_id
        joined_df = launches_df.join(rocket_names_df, same_rocket, "left")

        # Copy the joined name into the nested struct, then drop the helper columns.
        rocket_with_name = col("rocket").withField(
            "rocket_name", col("rocket_name_from_join")
        )
        combined_df = joined_df.withColumn("rocket", rocket_with_name).drop(
            "rocket_name_from_join", "rocket_id"
        )

        # Bring the result to the driver as plain dictionaries.
        combined_data = list(map(row_to_dict, combined_df.collect()))

        stage_seconds = time.time() - stage_started_at
        log_status(f"Data processing completed in {stage_seconds:.2f} seconds")
        log_status(f"Combined {len(combined_data)} launch records with rocket names")

        # Stage 3: post the combined records and report the outcome.
        delivered = send_data("https://httpbin.org/post", combined_data)
        log_status(
            "Pipeline completed successfully"
            if delivered
            else "Pipeline completed with errors in sending step"
        )

    except Exception as exc:
        log_status(f"Pipeline failed with error: {exc!s}")
        raise
    finally:
        total_seconds = time.time() - started_at
        log_status(f"Total pipeline execution time: {total_seconds:.2f} seconds")

# Entry point: run the pipeline only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()

try again

In [0]:
import requests
import json
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, MapType

def log_status(message):
    """Print *message* prefixed with the current time as ``[YYYY-MM-DD HH:MM:SS]``."""
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    print(f"[{stamp}] {message}")

def fetch_data(url, description, timeout=30):
    """Fetch and decode a JSON payload from *url*, logging progress and timing.

    Args:
        url: Endpoint to GET.
        description: Label for the dataset, used only in log messages.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive
            host, which would stall the whole pipeline.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.RequestException: On any request failure caught
            here (including 4XX/5XX statuses via ``raise_for_status``); the
            error is logged before being re-raised.
    """
    try:
        start_time = time.time()
        log_status(f"Fetching {description} from {url}")

        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Turn 4XX/5XX statuses into exceptions

        data = response.json()
        elapsed = time.time() - start_time
        # A non-list payload is reported as a single record.
        record_count = len(data) if isinstance(data, list) else 1
        log_status(f"Successfully fetched {record_count} {description} records in {elapsed:.2f} seconds")
        return data

    except requests.exceptions.RequestException as e:
        log_status(f"Error fetching {description}: {str(e)}")
        raise

def send_data(url, data, timeout=30):
    """POST *data* as JSON to *url* and report whether the send succeeded.

    Args:
        url: Endpoint to POST to.
        data: JSON-serializable payload.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive
            host.

    Returns:
        True when the request completed with a non-error status, False when a
        ``requests`` error occurred (the error is logged, not raised).

    Raises:
        TypeError: If *data* cannot be serialized by ``json.dumps``; only
            ``requests`` errors are handled here.
    """
    try:
        start_time = time.time()
        log_status(f"Sending combined data to {url}")

        headers = {'Content-Type': 'application/json'}
        response = requests.post(
            url, data=json.dumps(data), headers=headers, timeout=timeout
        )
        response.raise_for_status()

        elapsed = time.time() - start_time
        log_status(f"Data successfully sent in {elapsed:.2f} seconds")
        log_status(f"Response status: {response.status_code}")
        log_status(f"Response content: {response.text[:200]}...")
        return True

    except requests.exceptions.RequestException as e:
        log_status(f"Error sending data: {str(e)}")
        # A Response is falsy for 4XX/5XX statuses, so a truthiness test would
        # skip exactly the error bodies worth logging; compare against None.
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            log_status(f"Response content: {error_response.text[:200]}...")
        return False

def get_launches_schema():
    """Return the Spark ``StructType`` for launch records.

    Every field is nullable. ``rocket`` is a nested struct of three string
    fields and ``links`` is a string-to-string map.
    """
    nullable = True

    rocket_struct = StructType([
        StructField(name, StringType(), nullable)
        for name in ("rocket_id", "rocket_name", "rocket_type")
    ])

    # (column name, Spark type) pairs in schema order.
    columns = [
        ("flight_number", IntegerType()),
        ("mission_name", StringType()),
        ("launch_year", StringType()),
        ("launch_date_utc", StringType()),
        ("rocket", rocket_struct),
        ("launch_success", BooleanType()),
        ("links", MapType(StringType(), StringType())),
        ("details", StringType()),
    ]
    return StructType([StructField(name, dtype, nullable) for name, dtype in columns])

def get_rockets_schema():
    """Return the Spark ``StructType`` for rocket records; every field is nullable."""
    # (column name, Spark type) pairs in schema order.
    columns = (
        ("rocket_id", StringType()),
        ("rocket_name", StringType()),
        ("rocket_type", StringType()),
        ("active", BooleanType()),
        ("cost_per_launch", IntegerType()),
        ("success_rate_pct", IntegerType()),
        ("first_flight", StringType()),
        ("country", StringType()),
        ("company", StringType()),
    )
    fields = [StructField(name, dtype, True) for name, dtype in columns]
    return StructType(fields)

def transform_launch_data(launch):
    """Reduce one raw launch record to the fields kept by the launches schema.

    Args:
        launch: A launch record (dict) as decoded from the launches response.

    Returns:
        dict with the selected scalar fields, a nested ``rocket`` dict whose
        ``rocket_name`` is always ``None`` at this stage, and a ``links`` dict
        in which every value has been converted to a string.
    """
    # ``.get(key, {})`` only covers a missing key; an explicit null in the
    # payload comes back as None and would break the lookups below.
    links = launch.get("links") or {}
    rocket = launch.get("rocket") or {}

    simplified_links = {}
    for key, value in links.items():
        if value is None:
            simplified_links[key] = ""
        elif isinstance(value, (dict, list)):
            # Encode every container as JSON so lists are serialized the same
            # way as dicts instead of falling through to a Python repr.
            simplified_links[key] = json.dumps(value)
        else:
            simplified_links[key] = str(value)

    return {
        "flight_number": launch.get("flight_number"),
        "mission_name": launch.get("mission_name"),
        "launch_year": launch.get("launch_year"),
        "launch_date_utc": launch.get("launch_date_utc"),
        "rocket": {
            "rocket_id": rocket.get("rocket_id"),
            "rocket_name": None,
            "rocket_type": rocket.get("rocket_type")
        },
        "launch_success": launch.get("launch_success"),
        "links": simplified_links,
        "details": launch.get("details", "")
    }

def row_to_dict(row):
    """Recursively turn a Spark Row (or nested container) into plain Python data.

    ``None`` and ``str``/``int``/``float``/``bool`` values pass through
    unchanged. Objects exposing ``asDict`` and plain dicts become dicts, lists
    and tuples become lists, and anything else is converted with ``str``.
    """
    if row is None or isinstance(row, (str, int, float, bool)):
        return row

    if hasattr(row, "asDict"):
        # Checked before list/tuple so Row-like objects keep their field names.
        items = row.asDict().items()
    elif isinstance(row, dict):
        items = row.items()
    elif isinstance(row, (list, tuple)):
        return list(map(row_to_dict, row))
    else:
        # Last resort for any other type.
        return str(row)

    return {name: row_to_dict(value) for name, value in items}

def main():
    """Fetch launches and rockets, join them in Spark, and post the combined data.

    Each stage logs its progress and timing through ``log_status``. Any
    exception is logged and re-raised; the total run time is always logged.
    """
    run_started = time.time()

    try:
        spark = SparkSession.builder.appName("SpaceXDataPipeline").getOrCreate()

        # --- Fetch ---------------------------------------------------------
        launch_payload = fetch_data("https://api.spacexdata.com/v3/launches", "launches")
        rocket_payload = fetch_data("https://api.spacexdata.com/v3/rockets", "rockets")

        # --- Process -------------------------------------------------------
        log_status("Starting data processing...")
        processing_started = time.time()

        launches_df = spark.createDataFrame(
            list(map(transform_launch_data, launch_payload)),
            schema=get_launches_schema(),
        )
        all_rockets_df = spark.createDataFrame(rocket_payload, schema=get_rockets_schema())

        # Only the key and the name are needed; the alias keeps the joined
        # name distinct from the nested rocket.rocket_name field.
        name_lookup_df = all_rockets_df.select(
            col("rocket_id"),
            col("rocket_name").alias("rocket_name_from_join"),
        )

        joined_df = launches_df.join(
            name_lookup_df,
            launches_df.rocket.rocket_id == name_lookup_df.rocket_id,
            "left",
        )

        # Write the looked-up name into the nested struct and discard the
        # columns that only existed for the join.
        named_rocket = col("rocket").withField("rocket_name", col("rocket_name_from_join"))
        combined_df = joined_df.withColumn("rocket", named_rocket)
        combined_df = combined_df.drop("rocket_name_from_join", "rocket_id")

        # Collect to the driver and convert each Row with the universal converter.
        combined_data = list(map(row_to_dict, combined_df.collect()))

        processing_seconds = time.time() - processing_started
        log_status(f"Data processing completed in {processing_seconds:.2f} seconds")
        log_status(f"Combined {len(combined_data)} launch records with rocket names")

        # --- Send ----------------------------------------------------------
        if send_data("https://httpbin.org/post", combined_data):
            outcome = "Pipeline completed successfully"
        else:
            outcome = "Pipeline completed with errors in sending step"
        log_status(outcome)

    except Exception as error:
        log_status(f"Pipeline failed with error: {error!s}")
        raise
    finally:
        run_seconds = time.time() - run_started
        log_status(f"Total pipeline execution time: {run_seconds:.2f} seconds")

# Entry point: run the pipeline only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()