In [None]:
import os
import time
import json
import findspark
import shutil
import glob

# -----------------------------
# Config (paths relative to root)
# -----------------------------
INPUT_FILE = "project/data/unstructured/unstructured_students_data.jsonl"
OUTPUT_DIR = "project/data/bronze_output"
# The final single-file artifact is placed inside OUTPUT_DIR itself.
OUTPUT_FILE_FINAL = os.path.join(OUTPUT_DIR, "bronze_students.jsonl")

# -----------------------------
# Start from a clean slate: drop any previous output directory
# -----------------------------
output_dir_already_present = os.path.exists(OUTPUT_DIR)
if output_dir_already_present:
    # Recursively removes the directory together with everything under it.
    shutil.rmtree(OUTPUT_DIR)

# -----------------------------
# Initialize Spark
# -----------------------------
# Let findspark set up the environment before attempting the pyspark imports.
findspark.init()

try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import *
except ImportError:
    pyspark_available = False
    print("PySpark not available. Install with: pip install pyspark")
else:
    pyspark_available = True

# Guard clause: nothing below can run without PySpark.
if not pyspark_available:
    raise SystemExit("PySpark not available, exiting.")

spark = (
    SparkSession.builder
    .appName("Student Dataset Bronze Layer")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Keep Spark's own logging down to warnings and errors.
spark.sparkContext.setLogLevel("WARN")
print("Spark session initialized successfully!")
print(f"Spark version: {spark.version}")

# -----------------------------
# Read raw JSONL as RDD
# -----------------------------
print(f"Reading from: {INPUT_FILE}")
# textFile() yields an RDD of strings, one element per line of the input file.
spark_context = spark.sparkContext
raw_rdd = spark_context.textFile(INPUT_FILE)
partition_count = raw_rdd.getNumPartitions()
print(f"Raw RDD partition count: {partition_count}")

# -----------------------------
# Parsing function - PURE RDD APPROACH
# -----------------------------
def create_bronze_record(line):
    """Wrap one raw JSONL line in a bronze-layer record, returned as JSON text.

    A line that parses to a JSON object is returned with three metadata
    fields added: ``_ingestion_timestamp`` (``time.time()``), ``_source``
    (``'jsonl_file'``) and ``_status`` (``'valid'``).

    Any other line -- malformed JSON, or valid JSON that is not an object
    (array, number, string, ``null``...) -- is preserved verbatim under
    ``_raw_data`` in a record whose ``_status`` is ``'parse_error'`` and whose
    ``_error_message`` explains why it was rejected.

    Args:
        line: One line of text from the input file.

    Returns:
        A JSON-encoded string holding either the enriched record or the
        error record. Bad text lines are reported this way instead of
        raising, so one malformed line does not abort the whole job.
    """
    ingested_at = time.time()
    error_message = None
    data = None

    try:
        data = json.loads(line)
    except Exception as e:
        # Deliberately broad: this is the per-record error boundary, and any
        # parsing failure must become an error record rather than a crash.
        error_message = str(e)
    else:
        # Only JSON objects can carry the metadata fields. Reject everything
        # else explicitly so the stored error message is meaningful.
        if not isinstance(data, dict):
            error_message = (
                f"Expected a JSON object, got {type(data).__name__}"
            )

    if error_message is None:
        data['_ingestion_timestamp'] = ingested_at
        data['_source'] = 'jsonl_file'
        data['_status'] = 'valid'
        return json.dumps(data)

    error_record = {
        '_raw_data': line,
        '_ingestion_timestamp': ingested_at,
        '_source': 'jsonl_file',
        '_status': 'parse_error',
        '_error_message': error_message,
    }
    return json.dumps(error_record)

# -----------------------------
# Apply parsing and get quality metrics
# -----------------------------
# Process the RDD to get JSON strings
# Parse every line into a bronze JSON string and mark the result for caching:
# this RDD is consumed by several separate actions further down (status tally,
# error samples, save), and without cache() each of them would re-read and
# re-parse the input file from scratch.
bronze_json_rdd = raw_rdd.map(create_bronze_record).cache()

# Count total records. map() is one-to-one, so this equals the number of raw
# input lines; running the count on the cached RDD also populates the cache.
total_records = bronze_json_rdd.count()

# Count valid vs error records by checking the JSON content
def count_records_status(json_str):
    """Map one bronze JSON string to a ``(valid, error)`` pair of 0/1 counters.

    Returns ``(1, 0)`` when the record's ``_status`` field equals ``'valid'``
    and ``(0, 1)`` for anything else, including a missing ``_status``.
    """
    status = json.loads(json_str).get('_status')
    if status == 'valid':
        return (1, 0)
    return (0, 1)

# Tally (valid, error) counts across all records. fold() with a (0, 0) zero
# value is used rather than reduce() because reduce() raises ValueError when
# the RDD is empty (e.g. an empty input file).
status_counts = bronze_json_rdd.map(count_records_status).fold(
    (0, 0),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
valid_records, error_records = status_counts

print("\n=== Bronze Layer Data Quality ===")
print(f"Total records: {total_records}")
print(f"Valid records: {valid_records}")
print(f"Parse errors: {error_records}")
if total_records:
    print(f"Success rate: {(valid_records / total_records) * 100:.1f}%")
else:
    # Avoid ZeroDivisionError when there was nothing to ingest.
    print("Success rate: n/a (no records)")

# Show some examples of parse errors
if error_records > 0:
    print("\n=== Parse Error Examples ===")
    # take(3) fetches at most three error records for display.
    error_samples = bronze_json_rdd.filter(
        lambda json_str: json.loads(json_str).get('_status') == 'parse_error'
    ).take(3)
    for i, error_json in enumerate(error_samples):
        error_data = json.loads(error_json)
        print(f"Error example {i}:")
        # Only the first 100 characters of the raw line are shown.
        print(f"  Raw data: {error_data.get('_raw_data', '')[:100]}...")
        print(f"  Error: {error_data.get('_error_message', '')}")
        print()

# -----------------------------
# Save Bronze layer as JSONL
# -----------------------------
print("Saving bronze layer...")
# coalesce(1) collapses the RDD to one partition so that the output directory
# is expected to contain a single part file.
bronze_json_rdd.coalesce(1).saveAsTextFile(OUTPUT_DIR)

# Rename part file to final filename (sorted() makes the pick deterministic
# should more than one part file ever be present).
part_files = sorted(glob.glob(os.path.join(OUTPUT_DIR, 'part-*')))
if part_files:
    part_file = part_files[0]
    os.rename(part_file, OUTPUT_FILE_FINAL)
    print(f"✓ Renamed {part_file} to {OUTPUT_FILE_FINAL}")

    # Clean up Spark output directory
    success_file = os.path.join(OUTPUT_DIR, '_SUCCESS')
    if os.path.exists(success_file):
        os.remove(success_file)
    # The directory is removed only when nothing is left in it. While the
    # final file lives inside OUTPUT_DIR, it is non-empty and therefore kept.
    if os.path.exists(OUTPUT_DIR) and not os.listdir(OUTPUT_DIR):
        os.rmdir(OUTPUT_DIR)

    print(f"✓ Bronze layer saved to: {OUTPUT_FILE_FINAL}")

    # Verify output
    if os.path.exists(OUTPUT_FILE_FINAL):
        file_size = os.path.getsize(OUTPUT_FILE_FINAL)

        # Single pass over the file: count every line and keep the first
        # three for the preview. The context manager guarantees the handle
        # is closed.
        preview_lines = []
        line_count = 0
        with open(OUTPUT_FILE_FINAL, 'r', encoding='utf-8') as f:
            for line in f:
                if line_count < 3:
                    preview_lines.append(line)
                line_count += 1
        print(f"✓ Output file: {file_size} bytes, {line_count} lines")

        # Show first few lines
        print("\n=== First 3 lines of output ===")
        for i, line in enumerate(preview_lines):
            stripped = line.strip()
            # Pretty-print when the line is valid JSON; otherwise fall back
            # to the raw text. Only JSON decoding errors are handled here.
            try:
                rendered = json.dumps(json.loads(stripped), indent=2)[:200]
            except ValueError:
                rendered = stripped[:100]
            print(f"Line {i}: {rendered}...")
else:
    print("❌ No output files were created!")

# -----------------------------
# Stop Spark
# -----------------------------
# End of the job: stop the SparkSession created earlier in this script.
spark.stop()
print("Spark session stopped.")