In [None]:
# Databricks notebook source
# MAGIC %md
# MAGIC ## Apify Actors Data Pipeline - PySpark Implementation
# MAGIC Production-ready implementation with error handling, logging, and schema validation

# COMMAND ----------

# Configuration and imports
import json
import logging
import time
from datetime import datetime
from functools import wraps
from typing import Dict, List, Optional

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, get_json_object, to_timestamp,
    unix_timestamp, round as spark_round,
    concat, lpad, lit, collect_list, array_distinct,
    udf, struct, from_json, date_format, format_string,
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
    DoubleType, TimestampType, LongType, MapType,
    ArrayType,
)

# COMMAND ----------

# Configure the root logger for this notebook: INFO and above, with a
# timestamp, logger name and level on every record. (basicConfig is a no-op
# when the root logger already has handlers.)
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# COMMAND ----------

# Resolve the Apify API token.
# 1. Keep a value an earlier notebook cell already defined as `apify_secret`.
# 2. Otherwise read it from Databricks secrets when `dbutils` is available
#    (scope/key taken from the original commented-out example; adjust them to
#    your workspace).
# 3. Otherwise fall back to "" so the pipeline class fails with its explicit
#    "Apify API key is required" error instead of a NameError here.
try:
    apify_secret  # noqa: B018 - raises NameError when no earlier cell set it
except NameError:
    apify_secret = (
        dbutils.secrets.get(scope="api-keys", key="apify-api-key")
        if "dbutils" in globals()
        else ""
    )
# COMMAND ----------

class ApifyActorsPySpark:
    """
    PySpark implementation of the Apify Actors data pipeline.

    Pulls actors and actor runs from the Apify REST API (``requests`` with a
    bearer token) and loads them into Spark DataFrames. It derives per-run
    metrics (load time, pages scraped, throughput, formatted duration) and can
    persist a DataFrame with ``saveAsTable``.
    """

    # Output schema of get_actors(). The API's nested ``stats`` object is
    # exposed as ``raw_stats``; ``total_runs`` and ``last_run_started_at`` are
    # lifted out of it.
    ACTORS_SCHEMA = StructType([
        StructField("id", StringType(), True),
        StructField("userId", StringType(), True),
        StructField("name", StringType(), True),
        StructField("username", StringType(), True),
        StructField("title", StringType(), True),
        StructField("raw_stats", MapType(StringType(), StringType()), True),
        StructField("total_runs", IntegerType(), True),
        StructField("last_run_started_at", TimestampType(), True),
        StructField("createdAt", TimestampType(), True),
        StructField("modifiedAt", TimestampType(), True),
    ])

    # Ingestion schema for ``/acts`` items, using the API's own field names.
    # Timestamps stay strings here because createDataFrame() schema
    # verification rejects Python str values for TimestampType fields; they
    # are parsed in Spark afterwards.
    _ACTORS_RAW_SCHEMA = StructType([
        StructField("id", StringType(), True),
        StructField("userId", StringType(), True),
        StructField("name", StringType(), True),
        StructField("username", StringType(), True),
        StructField("title", StringType(), True),
        StructField("stats", MapType(StringType(), StringType()), True),
        StructField("createdAt", StringType(), True),
        StructField("modifiedAt", StringType(), True),
    ])

    # Schema for ``/actor-runs`` list items; also the base of the empty
    # run-details frame returned by get_run().
    RUNS_SCHEMA = StructType([
        StructField("id", StringType(), False),
        # NOTE(review): the runs *list* endpoint may not include userId; kept
        # nullable so verification does not reject every row - confirm
        # against the Apify API reference.
        StructField("userId", StringType(), True),
        StructField("actId", StringType(), True),
        StructField("actorTaskId", StringType(), True),
        StructField("status", StringType(), True),
        StructField("buildId", StringType(), True),
        StructField("buildNumber", StringType(), True),
        StructField("defaultDatasetId", StringType(), True),
        StructField("defaultRequestQueueId", StringType(), True),
        StructField("buildNumberInt", StringType(), True),
        StructField("meta", MapType(StringType(), StringType()), True),
        StructField("startedAt", StringType(), True),
        StructField("finishedAt", StringType(), True),
        StructField("usageTotalUsd", DoubleType(), True),
    ])

    # Columns _enrich_runs() reads, with the type used to add them as nulls
    # when a run DataFrame does not have them.
    _RUN_REQUIRED_COLUMNS = {
        "actId": StringType(),
        "startedAt": StringType(),
        "finishedAt": StringType(),
        "usage": MapType(StringType(), DoubleType()),
    }

    # Base delay in seconds for exponential backoff between API retries.
    RETRY_BACKOFF_SECONDS = 1.0

    def __init__(self, spark: Optional[SparkSession] = None, api_key: Optional[str] = None):
        """
        Initialize with a Spark session and an Apify API token.

        Args:
            spark: SparkSession instance; falls back to a module-global
                ``spark`` (as provided by the Databricks runtime) if omitted.
            api_key: Apify API token; falls back to the module-global
                ``apify_secret`` if omitted.

        Raises:
            ValueError: if no Spark session or no API token is available.
        """
        self.spark = spark or globals().get('spark')
        if not self.spark:
            raise ValueError("No Spark session found. Please provide spark session or ensure it's available globally")

        # globals().get avoids a NameError when no notebook cell defined the token.
        self.apify_secret = api_key if api_key is not None else globals().get('apify_secret')
        self.logger = logging.getLogger(self.__class__.__name__)
        self.base_url = "https://api.apify.com/v2"

        # Validate API key
        if not self.apify_secret:
            self.logger.error("Apify API key not configured")
            raise ValueError("Apify API key is required")

        self.logger.info("ApifyActorsPySpark initialized successfully")

    def _make_api_request(self, endpoint: str, max_retries: int = 3) -> Dict:
        """
        GET ``{base_url}/{endpoint}`` and return the decoded JSON body.

        Connection errors, timeouts, HTTP 429 and HTTP 5xx are retried with
        exponential backoff. Other HTTP errors (e.g. 401/404) are raised
        immediately, because repeating the identical request cannot succeed.

        Args:
            endpoint: API path relative to ``base_url`` (e.g. ``"acts"``).
            max_retries: Total number of attempts; must be at least 1.

        Returns:
            Response JSON as a dictionary.

        Raises:
            ValueError: if ``max_retries`` is less than 1.
            requests.exceptions.RequestException: when the request ultimately fails.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")

        url = f"{self.base_url}/{endpoint}"
        headers = {
            'Accept': 'application/json',
            'Authorization': f'Bearer {self.apify_secret}'
        }

        for attempt in range(1, max_retries + 1):
            try:
                response = requests.get(url, headers=headers, timeout=30)
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                # No response (connection error/timeout) -> status is None.
                status = getattr(e.response, "status_code", None)
                retryable = status is None or status == 429 or status >= 500
                self.logger.warning(f"API request failed (attempt {attempt}/{max_retries}): {str(e)}")
                if not retryable or attempt == max_retries:
                    self.logger.error(f"Failed to fetch data from {endpoint} after {attempt} attempts")
                    raise
                time.sleep(self.RETRY_BACKOFF_SECONDS * 2 ** (attempt - 1))

    def _fetch_items(self, endpoint: str) -> List[Dict]:
        """
        Return ``data.items`` from a list endpoint, or ``[]`` when absent/null.

        NOTE(review): only one request is made; Apify list endpoints appear to
        be paginated (offset/limit), so large accounts may be truncated - TODO
        confirm and paginate if needed.
        """
        response_data = self._make_api_request(endpoint) or {}
        return (response_data.get("data") or {}).get("items") or []

    @staticmethod
    def _is_number(value) -> bool:
        """Return True for int/float values, excluding bool (a subclass of int)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _coerce_double_fields(items: List[Dict], schema: StructType) -> List[Dict]:
        """
        Copy ``items``, turning JSON integers in top-level DoubleType fields into floats.

        createDataFrame() schema verification rejects a Python int for a
        DoubleType field. Without this, a value serialized as ``0`` rather than
        ``0.0`` would fail the whole load.
        """
        double_fields = [f.name for f in schema.fields if isinstance(f.dataType, DoubleType)]
        coerced = []
        for item in items:
            row = dict(item)
            for name in double_fields:
                value = row.get(name)
                if isinstance(value, int) and not isinstance(value, bool):
                    row[name] = float(value)
            coerced.append(row)
        return coerced

    @classmethod
    def _normalize_run_details(cls, runs: List[Dict]) -> List[Dict]:
        """
        Return copies of ``runs`` that createDataFrame() schema inference can type.

        Inference has three relevant limitations:
        - it takes a nested dict's value type from its first non-null value;
        - it refuses to merge int and float for the same field across rows;
        - it cannot type a field that is null in every row.

        To avoid those failures:
        - nested dicts whose non-null values are all numbers get float values,
          so an int such as ``usage["DATASET_WRITES"]`` is not rejected or
          nulled next to a float first entry;
        - nested dicts with no non-null values become None;
        - top-level numeric fields that mix int and float across runs become float;
        - top-level fields that are None in every run are dropped.
        """
        rows = []
        for run in runs:
            row = dict(run)
            for key, value in run.items():
                if not isinstance(value, dict):
                    continue
                present = [v for v in value.values() if v is not None]
                if not present:
                    row[key] = None
                elif all(cls._is_number(v) for v in present):
                    row[key] = {k: None if v is None else float(v) for k, v in value.items()}
            rows.append(row)

        for key in set().union(*rows):
            values = [row[key] for row in rows if row.get(key) is not None]
            if not values:
                for row in rows:
                    row.pop(key, None)
            elif all(cls._is_number(v) for v in values) and any(isinstance(v, float) for v in values):
                for row in rows:
                    if row.get(key) is not None:
                        row[key] = float(row[key])
        return rows

    def get_actors(self):
        """
        Fetch actors from ``/acts`` as a DataFrame shaped like ACTORS_SCHEMA.

        Original pandas implementation:
        # actors = pd.DataFrame(items)
        # actors['total_runs'] = actors['stats'].apply(lambda x: x.get('totalRuns') if isinstance(x, dict) else None)
        # actors['last_run_started_at'] = actors['stats'].apply(lambda x: x.get('lastRunStartedAt') if isinstance(x, dict) else None)

        Returns:
            PySpark DataFrame with the ACTORS_SCHEMA columns; empty when the
            API returned no items.
        """
        try:
            self.logger.info("Fetching actors data...")

            items = self._fetch_items("acts")

            if not items:
                self.logger.warning("No actors data returned from API")
                # Return empty DataFrame with schema
                return self.spark.createDataFrame([], self.ACTORS_SCHEMA)

            # Ingest with the API's field names (ACTORS_SCHEMA has no `stats`
            # field, so the nested object would otherwise be silently dropped).
            raw_df = self.spark.createDataFrame(items, self._ACTORS_RAW_SCHEMA)

            stats = col("stats")
            actors_df = raw_df.select(
                col("id"),
                col("userId"),
                col("name"),
                col("username"),
                col("title"),
                stats.alias("raw_stats"),
                # getItem on a null map yields null, so no explicit null guard is needed.
                stats.getItem("totalRuns").cast(IntegerType()).alias("total_runs"),
                to_timestamp(stats.getItem("lastRunStartedAt")).alias("last_run_started_at"),
                to_timestamp(col("createdAt")).alias("createdAt"),
                to_timestamp(col("modifiedAt")).alias("modifiedAt"),
            )

            self.logger.info(f"Successfully processed {len(items)} actors")

            return actors_df

        except Exception as e:
            self.logger.error(f"Error fetching actors: {str(e)}")
            raise

    def get_actor_runs(self):
        """
        Fetch the actor runs list from ``/actor-runs``.

        Original pandas implementation:
        # return pd.DataFrame(response.json().get('data', {}).get('items', []))

        Returns:
            PySpark DataFrame with RUNS_SCHEMA; empty when the API returned no items.
        """
        try:
            self.logger.info("Fetching actor runs data...")

            items = self._fetch_items("actor-runs")

            if not items:
                self.logger.warning("No actor runs data returned from API")
                return self.spark.createDataFrame([], self.RUNS_SCHEMA)

            # Create DataFrame with schema validation
            runs_df = self.spark.createDataFrame(
                self._coerce_double_fields(items, self.RUNS_SCHEMA), self.RUNS_SCHEMA
            )

            self.logger.info(f"Successfully fetched {len(items)} actor runs")

            return runs_df

        except Exception as e:
            self.logger.error(f"Error fetching actor runs: {str(e)}")
            raise

    def get_unique_actor_runs(self):
        """
        Get the distinct run IDs from the actor runs list.

        Original pandas implementation:
        # return self.get_actor_runs()['id'].unique().tolist()

        Returns:
            List of unique run IDs (order not guaranteed).
        """
        try:
            runs_df = self.get_actor_runs()
            # Use Spark's distinct() instead of pandas unique(); collect() returns Rows.
            return [row.id for row in runs_df.select("id").distinct().collect()]

        except Exception as e:
            self.logger.error(f"Error getting unique actor runs: {str(e)}")
            raise

    def get_run(self):
        """
        Fetch full details for every unique run and derive load metrics.

        Original pandas implementation:
        # runs['startedAt'] = pd.to_datetime(runs['startedAt'])
        # runs['time_to_load_in_sec'] = (runs['finishedAt'] - runs['startedAt']).dt.total_seconds()
        # runs['pages_scraped'] = runs['usage'].apply(lambda x: x.get('DATASET_WRITES') if isinstance(x, dict) else None)

        Returns:
            PySpark DataFrame of run details plus the derived columns added by
            ``_enrich_runs``. When nothing could be fetched, an empty frame with
            the same derived columns is returned, so downstream selects still
            resolve.
        """
        try:
            self.logger.info("Fetching detailed run information...")

            unique_runs = self.get_unique_actor_runs()
            if not unique_runs:
                self.logger.warning("No unique runs found")
                return self._empty_run_details()

            runs_list = self._fetch_run_details(unique_runs)
            if not runs_list:
                self.logger.warning("No run details fetched")
                return self._empty_run_details()

            # Schema is inferred so every field the API returns becomes a column;
            # normalization first removes the cases inference cannot handle.
            runs_df = self.spark.createDataFrame(self._normalize_run_details(runs_list))
            runs_df = self._enrich_runs(runs_df)

            self.logger.info(f"Successfully processed {len(runs_list)} run details")

            # Optimize partitioning for downstream operations
            return runs_df.repartition(4, "actId")

        except Exception as e:
            self.logger.error(f"Error in get_run: {str(e)}")
            raise

    def _fetch_run_details(self, run_ids: List[str]) -> List[Dict]:
        """
        Fetch ``/actor-runs/{id}`` for each id, one request at a time.

        Best effort: a failing run is logged and skipped rather than aborting
        the batch. Empty or null ``data`` payloads are skipped as well.
        """
        details = []
        for run_id in run_ids:
            try:
                response_data = self._make_api_request(f"actor-runs/{run_id}") or {}
            except Exception as e:
                self.logger.warning(f"Failed to fetch run {run_id}: {str(e)}")
                continue
            data = response_data.get('data') or {}
            if data:
                details.append(data)
        return details

    def _empty_run_details(self):
        """Return an empty run-details DataFrame with the same derived columns as get_run()."""
        return self._enrich_runs(self.spark.createDataFrame([], self.RUNS_SCHEMA))

    def _enrich_runs(self, runs_df):
        """
        Parse run timestamps and add derived metric columns.

        Adds the following columns:
        - ``time_to_load_in_sec`` (whole seconds) and ``time_to_load_in_min``;
        - ``pages_scraped`` (``usage["DATASET_WRITES"]`` as int);
        - ``pages_scraped_per_sec`` (rounded to 2 places, null unless the
          duration is positive);
        - ``duration`` (``HH:MM:SS``);
        - ``startedAt_str`` / ``finishedAt_str``.

        Any column in ``_RUN_REQUIRED_COLUMNS`` that is missing is first added
        as a typed null.
        """
        for name, data_type in self._RUN_REQUIRED_COLUMNS.items():
            if name not in runs_df.columns:
                runs_df = runs_df.withColumn(name, lit(None).cast(data_type))

        seconds = col("time_to_load_in_sec")
        hours = (seconds / 3600).cast(LongType())
        minutes = ((seconds % 3600) / 60).cast(LongType())
        secs = (seconds % 60).cast(LongType())

        return (
            runs_df
            .withColumn("startedAt", to_timestamp(col("startedAt")))
            .withColumn("finishedAt", to_timestamp(col("finishedAt")))
            .withColumn("time_to_load_in_sec",
                        unix_timestamp(col("finishedAt")) - unix_timestamp(col("startedAt")))
            .withColumn("time_to_load_in_min", seconds / 60)
            .withColumn("pages_scraped",
                        col("usage").getItem("DATASET_WRITES").cast(IntegerType()))
            .withColumn("pages_scraped_per_sec",
                        when(seconds > 0, spark_round(col("pages_scraped") / seconds, 2)))
            # Built from integer seconds (no min*60 float round-trip); format_string
            # instead of lpad, which would truncate hour counts of 100 or more.
            .withColumn("duration",
                        when(seconds.isNotNull(),
                             format_string("%02d:%02d:%02d", hours, minutes, secs)))
            .withColumn("startedAt_str", date_format(col("startedAt"), "yyyy-MM-dd HH:mm:ss"))
            .withColumn("finishedAt_str", date_format(col("finishedAt"), "yyyy-MM-dd HH:mm:ss"))
        )

    def save_to_delta(self, df, table_name: str, mode: str = "overwrite", database: str = "apify"):
        """
        Save a DataFrame as the Delta table ``{database}.{table_name}``.

        Args:
            df: PySpark DataFrame to save.
            table_name: Target table name, without the database prefix.
            mode: Write mode (overwrite, append, etc.).
            database: Target database/schema; defaults to ``apify``.
        """
        target = f"{database}.{table_name}"
        try:
            df.write \
                .format("delta") \
                .mode(mode) \
                .option("overwriteSchema", "true") \
                .saveAsTable(target)

            self.logger.info(f"Successfully saved data to {target}")

        except Exception as e:
            self.logger.error(f"Error saving to Delta table: {str(e)}")
            raise

# COMMAND ----------

# Build the pipeline object from the notebook's Spark session.
api = ApifyActorsPySpark(spark=spark)

# COMMAND ----------

# Example: fetch enriched run details and display the key metric columns.
# Any failure is logged and re-raised so the notebook run stops here.
try:
    runs_df = api.get_run()
    display(
        runs_df.select(
            "id",
            "actId",
            "startedAt_str",
            "finishedAt_str",
            "duration",
            "pages_scraped",
            "pages_scraped_per_sec",
        )
    )

    # To persist the result, uncomment:
    # api.save_to_delta(runs_df, "actor_runs")

except Exception as e:
    logging.error(f"Pipeline failed: {e}")
    raise

# COMMAND ----------

# Display the actor-runs list; failures are logged and the notebook continues.
try:
    actor_runs_df = api.get_actor_runs()
    display(actor_runs_df)
except Exception as e:
    logging.error(f"Failed to get actor runs: {e}")

# COMMAND ----------

# Display the actors table; failures are logged and the notebook continues.
try:
    actors_df = api.get_actors()
    display(actors_df)
except Exception as e:
    logging.error(f"Failed to get actors: {e}")