In [2]:
from pyspark.sql import SparkSession
import os

json_path = 'Files/properties/data/*.json'

# Read all JSONs, capturing corrupt records
df = spark.read.option("badRecordsPath", "/tmp/badRecords").option("mode", "PERMISSIVE").json(json_path)

# Show corrupt records if any
corrupt_df = df.filter(df['_corrupt_record'].isNotNull())
if corrupt_df.count() > 0:
    print("Corrupt files detected. Sample corrupt records:")
    corrupt_df.show(truncate=False)
else:
    print("No corrupt files found.")

# Filter out corrupt records and get columns from valid JSONs
valid_df = df.filter(df['_corrupt_record'].isNull()).drop('_corrupt_record')
print("Columns from valid JSON files:")
for col in valid_df.columns:
    print(col)

# Optionally, print schema for nested fields
valid_df.printSchema()

StatementMeta(, 4cf66bc3-7a37-4924-a620-ae370eb84dd4, 4, Finished, Available, Finished)

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().