In [0]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# One SparkSession shared by every cell below; getOrCreate() hands back the
# already-active session instead of starting a second one.
corrupt_spark_records = (
    SparkSession.builder
    .appName("handle corrupt in Apache Spark")
    .getOrCreate()
)



In [0]:
columns = StructType([
    StructField("Emp_Id", IntegerType(), True),
    StructField("Emp_name", StringType(), True),
    StructField("Address", StringType(), True),
    # Not a column of the file: in PERMISSIVE mode (the default) Spark puts the raw
    # text of any line it cannot parse against this schema into this field.
    StructField("_corrupt_record", StringType(), True)
])

# Source: /FileStore/tables/corrupt_records.csv
# No `inferSchema` option: an explicit `.schema(...)` is supplied, so inference is
# skipped anyway and the option would only suggest otherwise.
# `.cache()` because the parsed result is queried three times below; Spark's docs
# recommend caching the parsed result before running queries on the
# corrupt-record column, and it also avoids re-parsing the file per action.
emp_df = corrupt_spark_records.read.format("csv")\
    .option("header", True)\
    .schema(columns)\
    .option("columnNameOfCorruptRecord", "_corrupt_record")\
    .load("/FileStore/tables/corrupt_records.csv")\
    .cache()

emp_df.show(truncate=False)
emp_df.printSchema()

# Clean rows: a null `_corrupt_record` means the line parsed cleanly against the
# schema. Drop the helper column so only real data columns go on to further processing.
emp_df.where(col("_corrupt_record").isNull()).drop("_corrupt_record").show()

# Corrupted rows: `_corrupt_record` holds the raw text of each line that failed to parse.
emp_df.where(col("_corrupt_record").isNotNull()).show(truncate=False)


In [0]:
#  Dealing With Bad or Corrupt Records in Apache Spark
#  /FileStore/tables/bad_records_handling.csv

BAD_RECORDS_CSV = "/FileStore/tables/bad_records_handling.csv"
CORRUPT_COL = "corrupt_record"

data_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField(CORRUPT_COL, StringType(), True)
])

# The same fields minus the corrupt-record column, i.e. only the columns that exist
# in the file. Spark only treats a schema field as the corrupt-record column when its
# name equals `columnNameOfCorruptRecord` (default `_corrupt_record`); otherwise
# "corrupt_record" is expected as a real fifth CSV column, and every 4-field line
# would mismatch the schema width.
value_schema = StructType([field for field in data_schema.fields if field.name != CORRUPT_COL])


def read_bad_records_csv(schema, **options):
    """Read BAD_RECORDS_CSV with its header row, the given schema and extra reader options.

    Every demo below goes through this helper so none of them forgets `header` and
    accidentally parses the header line ("id", "salary", ...) as a corrupt data row.
    """
    return corrupt_spark_records.read.format("csv")\
        .option("header", True)\
        .schema(schema)\
        .options(**options)\
        .load(BAD_RECORDS_CSV)


#  To deal with these cases, we have the following options:

# 1. PERMISSIVE: the default mode. Spark loads both complete and corrupted rows;
#    fields it cannot parse are set to null.
print("Mode -> PERMISSIVE")
corrupt_record_PERMISSIVE = read_bad_records_csv(value_schema, mode="PERMISSIVE")
corrupt_record_PERMISSIVE.show(truncate=False)

# 2. DROPMALFORMED: drops the corrupted records and keeps only the correct ones.
print("Mode -> DROPMALFORMED")
corrupt_record_DROPMALFORMED = read_bad_records_csv(value_schema, mode="DROPMALFORMED")
corrupt_record_DROPMALFORMED.show(truncate=False)

# 3. FAILFAST: Spark throws an exception and halts loading as soon as it meets a bad
#    or corrupted record. Reading is lazy, so the error surfaces at `.show()`; it is
#    caught and printed here so the remaining demos in this cell still run.
print("Mode -> FailFast")
corrupt_record_FailFast = read_bad_records_csv(value_schema, mode="FAILFAST")
try:
    corrupt_record_FailFast.show(truncate=False)
except Exception as err:  # concrete Py4J/Spark exception class varies by Spark version
    print("FAILFAST stopped the read:", next(iter(str(err).splitlines()), repr(err)))

# 4. columnNameOfCorruptRecord option: stores the raw text of every corrupted record in
#    an extra column. That column must be declared in the schema (StringType) and its
#    name passed here, because it differs from the default `_corrupt_record`.
columnNameOfCorruptRecord_df = read_bad_records_csv(
    data_schema, columnNameOfCorruptRecord=CORRUPT_COL
)
columnNameOfCorruptRecord_df.show(truncate=False)

# 5. badRecordsPath (Databricks): Spark keeps only the correct records; corrupted or bad
#    records are excluded and written to files under badRecordsPath/<timestamp>/bad_records/.
badRecordsPath_df = read_bad_records_csv(value_schema, badRecordsPath="/FileStore/")
badRecordsPath_df.show(truncate=False)



Mode -> PERMISSIVE
Mode -> DROPMALFORMED
Mode -> FailFast


In [0]:
# badRecordsPath writes each run to <badRecordsPath>/<yyyyMMddTHHmmss>/bad_records/,
# so the timestamped folder (and the part-file name inside it) changes on every run.
# Locate the newest run under the badRecordsPath used above instead of hard-coding one.
import re

BAD_RECORDS_ROOT = "/FileStore/"
bad_record_runs = sorted(
    entry.path
    for entry in dbutils.fs.ls(BAD_RECORDS_ROOT)
    if re.fullmatch(r"\d{8}T\d{6}/?", entry.name)
)
if bad_record_runs:
    # yyyyMMddTHHmmss sorts lexicographically in chronological order, so the last is newest.
    display(dbutils.fs.ls(bad_record_runs[-1].rstrip("/") + "/bad_records/"))
else:
    print(f"No badRecordsPath output found under {BAD_RECORDS_ROOT}")

path,name,size,modificationTime
dbfs:/FileStore/20231102T090905/bad_records/part-00000-9c447e30-98a5-4233-ad19-f546ad42c7dc,part-00000-9c447e30-98a5-4233-ad19-f546ad42c7dc,472,1698916147000
