In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
import logging
from datetime import datetime
import os

# ---------------------------------------------------------------------------
# Configuration constants
# ---------------------------------------------------------------------------

# Destination of the run log written by this script.
LOG_FILE_PATH = "/kaggle/working/data_loading.log"

# Raw review dump that is read as JSON.
INPUT_FILE_PATH = "/kaggle/input/all-amazon-review/All_Amazon_Review.json"

# Location the loaded data is written to in Parquet format.
OUTPUT_FILE_PATH = "/kaggle/working/processed_data.parquet"

# Application name passed to the Spark session builder.
SPARK_APP_NAME = "Amazon Review Data Loading"

# Spark settings applied one by one to the session builder.
# Values are kept as strings, exactly as they are handed to Spark.
SPARK_MEMORY_CONFIG = {
    "spark.driver.memory": "8g",
    "spark.executor.memory": "8g",
    "spark.sql.shuffle.partitions": "200",
    "spark.memory.fraction": "0.8",
}

# Configure logging
# The root logger is used on purpose: the rest of the script logs through the
# module-level logging.info()/warning()/error() helpers, which write to the
# root logger as well.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Always bind file_handler, even when no log file can be opened, so the
# cleanup code at the end of the script can reference it without raising
# a NameError.
file_handler = None

if os.access('/kaggle/working/', os.W_OK):
    # mode='w' starts a fresh log file on every run.
    file_handler = logging.FileHandler(LOG_FILE_PATH, mode='w')
    file_handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    print(f"Logging configured successfully. Log file: {LOG_FILE_PATH}")
else:
    # Make the missing log file visible instead of failing silently.
    print("Warning: /kaggle/working/ is not writable; file logging is disabled.")

# Stop any existing Spark session
# A session that is already running is stopped so that the session created
# further down is built from this script's own configuration.
# getActiveSession() is used instead of builder.getOrCreate(): getOrCreate()
# would start a brand-new default session when none exists, only for it to be
# stopped again right away.
# NOTE(review): starting such a throwaway session first may also prevent
# JVM-level settings like spark.driver.memory from being applied later --
# confirm against the PySpark version in use.
try:
    existing_session = SparkSession.getActiveSession()
    if existing_session is not None:
        existing_session.stop()
        logging.info("Stopped existing Spark session.")
        print("Stopped existing Spark session.")
    else:
        logging.info("No active Spark session found.")
        print("No active Spark session found.")
except Exception as e:
    # Best effort only: a failure here must not prevent the load below.
    logging.warning(f"No active Spark session found or error stopping session: {e}")
    print(f"No active Spark session found or error stopping session: {e}")

# Initialize Spark Session
# The whole load / inspect / save pipeline runs inside a single try block so
# that the Spark session is stopped and the log output is flushed even when
# one of the steps fails.
try:
    # Local imports: names needed only by this step.
    from pyspark.sql.functions import col, count, when
    from pyspark.sql.types import DoubleType

    spark_builder = SparkSession.builder.appName(SPARK_APP_NAME)

    # Set Spark configurations
    for key, value in SPARK_MEMORY_CONFIG.items():
        spark_builder = spark_builder.config(key, value)

    # Enable GPU only if available. The GPU resource settings are applied only
    # when the discovery script exists on this machine; requesting a GPU per
    # task on a host that cannot offer one can leave tasks unschedulable.
    gpu_discovery_script = "/opt/sparkRapidsPlugin/getGpusResources.sh"
    if os.path.isfile(gpu_discovery_script):
        spark_builder = (
            spark_builder
            .config("spark.executor.resource.gpu.amount", "1")
            .config("spark.task.resource.gpu.amount", "1")
            .config("spark.executor.resource.gpu.discoveryScript", gpu_discovery_script)
        )
        logging.info("GPU resource settings enabled.")
    else:
        logging.info("GPU discovery script not found; GPU resource settings skipped.")

    spark = spark_builder.getOrCreate()
    logging.info("Spark session started successfully.")
    print("Spark session started successfully.")

    # Define schema for optimization (avoids a schema-inference pass).
    # "overall" is read as a double: the rating is presumably stored as a
    # float such as 5.0 (verify against the input file). A float value in a
    # column declared as integer makes Spark treat the record as malformed,
    # which in PERMISSIVE mode yields null fields instead of data.
    # A double column also accepts plain integer values.
    schema = StructType([
        StructField("reviewerID", StringType(), True),
        StructField("asin", StringType(), True),
        StructField("reviewText", StringType(), True),
        StructField("overall", DoubleType(), True),
        StructField("unixReviewTime", LongType(), True)
    ])

    # Start data loading
    start_time = datetime.now()
    logging.info("Data loading started.")
    print("Data loading started.")

    # Read the JSON file with the explicit schema. PERMISSIVE mode keeps
    # malformed records (as nulls) instead of aborting the read.
    df = spark.read.json(INPUT_FILE_PATH, schema=schema, mode="PERMISSIVE")
    logging.info("Data read successfully.")
    print("Data read successfully.")

    # Optimize partitions for memory
    df = df.coalesce(50)  # Reduce partitions to avoid memory overload
    logging.info("Data repartitioned successfully.")
    print("Data repartitioned successfully.")

    # Display only a few records to avoid crashing
    logging.info("Displaying sample data...")
    print("Displaying sample data...")
    df.limit(10).show(truncate=False)  # Reduce to 10 rows

    # Cheap sanity check without a full-table scan: fetch at most 10 rows and
    # report how many came back (the rows themselves were shown above).
    logging.info("Displaying sample record count...")
    sample_record_count = len(df.take(10))
    print(f"Sample records loaded: {sample_record_count}")

    # Check for missing values: one aggregation over all columns, where each
    # output column holds the number of nulls in the matching input column.
    logging.info("Checking for missing values...")
    print("Checking for missing values...")
    null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
    null_counts.show()

    # Save processed data in Parquet format (overwrite mode)
    logging.info("Saving processed data as Parquet...")
    print("Saving processed data as Parquet...")
    df.write.mode("overwrite").parquet(OUTPUT_FILE_PATH)
    logging.info("Data saved successfully.")
    print("Data saved successfully.")

    end_time = datetime.now()
    logging.info(f"Data loading completed in {end_time - start_time}.")
    print(f"Data loading completed in {end_time - start_time}.")

except Exception as e:
    # Top-level boundary of the script: report the failure and keep going to
    # the cleanup below. logging.exception() also records the traceback.
    logging.exception(f"Error during data loading: {e}")
    print(f"Error occurred: {e}")

finally:
    if 'spark' in locals():
        spark.stop()
        logging.info("Spark session stopped.")
        print("Spark session stopped.")
    # Ensure logs are flushed
    for handler in logger.handlers:
        handler.flush()
    # Close and detach only the file handler created by this script; other
    # handlers on the root logger belong to the environment and stay open.
    # The lookup tolerates the case where no file handler was ever created.
    owned_handler = globals().get('file_handler')
    if owned_handler is not None:
        owned_handler.close()
        logger.removeHandler(owned_handler)


Logging configured successfully. Log file: /kaggle/working/data_loading.log
Stopped existing Spark session.
Spark session started successfully.
Data loading started.
Data read successfully.
Data repartitioned successfully.
Displaying sample data...
