In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Initialize (or reuse) the Spark session.
# `SparkSession.builder` is the documented entry point.
spark = (
    SparkSession.builder
    .appName("PermissiveSchemaWithCorruptColumn")
    .getOrCreate()
)

# Name of the extra string column that receives the raw text of rows the CSV
# parser could not fit to the schema.
CORRUPT_COLUMN = "_corrupt_record"

# Expected CSV layout, plus the special column for corrupt records.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField(CORRUPT_COLUMN, StringType(), True),
])

# Read the CSV in PERMISSIVE mode so malformed rows are kept (with their raw
# text in CORRUPT_COLUMN) instead of failing the read.
# - `columnNameOfCorruptRecord` is set explicitly so the read does not depend
#   on the session-level default matching the schema's column name.
# - The parsed frame is cached once: it feeds two filters and four actions
#   below, and Spark's guidance is to cache the parsed result before running
#   queries that reference only the corrupt-record column.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", CORRUPT_COLUMN)
    .schema(schema)
    .load("./test/data/all.csv")
    .cache()
)

# Isolate the bad data: keep only the raw text of rows that failed to parse.
bad_df = df.filter(df[CORRUPT_COLUMN].isNotNull()).select(CORRUPT_COLUMN)
bad_df.show()
# The text format writes one raw line per row and has no header option,
# so none is passed here.
bad_df.write.format("text").mode("overwrite").save("./test/data/bad")

# Rows that parsed cleanly (corrupt-record column is NULL).
good_df = df.filter(df[CORRUPT_COLUMN].isNull())
good_df.show()
good_df.write.format("csv").option("header", "true").mode("overwrite").save("./test/data/good")

25/04/19 22:03:36 WARN CacheManager: Asked to cache already cached data.


+---------------+
|_corrupt_record|
+---------------+
|   2,Janee,30.0|
+---------------+

+---+-----+---+---------------+
| id| name|age|_corrupt_record|
+---+-----+---+---------------+
|  1| John| 25|           NULL|
|  3|  Bob| 40|           NULL|
|  4|Alice| 29|           NULL|
+---+-----+---+---------------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import from_csv, col

# Initialize (or reuse) the Spark session.
spark = SparkSession.builder \
    .appName("ReprocessCorruptRecord") \
    .getOrCreate()

# Target schema for the re-parsed rows.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# from_csv() takes its schema as a DDL string (or a Column), not as a
# StructType object, so render the StructType above as "id int, name string, ..."
# rather than maintaining a second, hand-written copy of the field list.
# NOTE: field names are emitted unquoted; names containing spaces or other
# special characters would need backtick quoting.
schema_ddl = ", ".join(
    f"{field.name} {field.dataType.simpleString()}" for field in schema.fields
)

# Input data simulating corrupt records
data = [
    ("3", "Jane", None, '2,Jane,30')  # _corrupt_record contains the original CSV row
]

# Schema of the simulated input: every column is a string, and the last one
# carries the raw CSV line to be re-parsed.
schema_with_corrupt_record = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("_corrupt_record", StringType(), True)
])

# Create DataFrame with explicit schema
df = spark.createDataFrame(data, schema=schema_with_corrupt_record)

# Parse the raw CSV text in _corrupt_record into a struct column named
# "reprocessed" whose fields follow `schema`.
parsed_df = df.withColumn(
    "reprocessed",
    from_csv(col("_corrupt_record"), schema_ddl)
)

# Show the results
parsed_df.show(truncate=False)

# Extract the parsed struct fields into separate top-level columns.
final_df = parsed_df.select(
    col("reprocessed.id").alias("id"),
    col("reprocessed.name").alias("name"),
    col("reprocessed.age").alias("age")
)
final_df.show()