In [0]:
# How to handle corrupt records and read them in different modes

'''
id,name,age,salary,address,nominee
1,Manish,26,75000,bihar,nominee1
2,Nikita,23,100000,uttarpradesh,nominee2
3,Pritam,22,150000,Bangalore,India,nominee3
4,Prantosh,17,200000,Kolkata,India,nominee4
5,Vikash,31,300000,,nominee5

This is what my data looks like. Look at ids 3 and 4: they are corrupted, because each has 7 values for a 6-column header (an extra "India" field). Below we will see how to handle them in different ways.
'''

In [0]:
# PERMISSIVE mode: malformed rows are kept rather than dropped or rejected.
# In the output, rows 3 and 4 are still present, but their extra trailing
# value ("nominee3" / "nominee4") is silently lost.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "PERMISSIVE")
    .load("/FileStore/tables/csv.txt")
)

df.show()

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
# DROPMALFORMED mode: corrupt rows are not taken into consideration at all,
# so they are excluded from the resulting DataFrame.
# Only 3 of the 5 rows come back here; the 2 corrupted rows (ids 3 and 4) are
# gone. With FAILFAST you would not even get this much: an error is thrown
# instead of returning any rows.
df1 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("/FileStore/tables/csv.txt")
)

df1.show()

+---+------+---+------+------------+--------+
| id|  name|age|salary|     address| nominee|
+---+------+---+------+------------+--------+
|  1|Manish| 26| 75000|       bihar|nominee1|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|
|  5|Vikash| 31|300000|        null|nominee5|
+---+------+---+------+------------+--------+



In [0]:
# Show the PERMISSIVE-mode DataFrame again: `df` (unlike `df1`) still holds all 5 rows.
df.show()

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
# How to surface the corrupted records themselves
#
# PERMISSIVE mode can keep the raw text of every malformed line in a dedicated
# string column. To get it, read the file with an explicit schema that contains
# that column: any row where the column is non-null is a corrupted row.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("address", StringType(), True),
    StructField("nominee", StringType(), True),
    # This name must match the `columnNameOfCorruptRecord` option set on the
    # reader below. Under any other name the field is just an ordinary seventh
    # data column: it receives the next value from the line, not the full
    # raw record.
    StructField("_corrupt_record", StringType(), True),
])

df2 = (
    spark.read
    .format("csv")
    .option("header", "true")
    # "inferschema" is intentionally not set: it has no effect once an
    # explicit schema is supplied via .schema(...).
    .option("mode", "PERMISSIVE")
    # Spell out the corrupt-record column instead of relying on the session
    # default, so the schema field above and the reader cannot drift apart.
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(schema)
    .load("/FileStore/tables/csv.txt")
)

df2.show(truncate=False)  # truncate=False prints full cell values instead of cutting them off

# In the output, rows 3 and 4 have `_corrupt_record` filled with the original line.
# NOTE(review): per the Spark SQL migration guide (2.3+), a query that references
# ONLY the corrupt-record column of a raw CSV read is rejected; cache or save the
# DataFrame first if you need e.g. a count of corrupt rows.

+---+--------+---+------+------------+--------+-------------------------------------------+
|id |name    |age|salary|address     |nominee |_corrupt_record                            |
+---+--------+---+------+------------+--------+-------------------------------------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null                                       |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null                                       |
|3  |Pritam  |22 |150000|Bangalore   |India   |3,Pritam,22,150000,Bangalore,India,nominee3|
|4  |Prantosh|17 |200000|Kolkata     |India   |4,Prantosh,17,200000,Kolkata,India,nominee4|
|5  |Vikash  |31 |300000|null        |nominee5|null                                       |
+---+--------+---+------+------------+--------+-------------------------------------------+



In [0]:
# How to store corrupt rows in a separate location so they can be cleaned later
#
# With the `badRecordsPath` option (see the Databricks "bad records and files"
# docs), rows that fail to parse are written to files under the given path
# instead of being returned, leaving the main DataFrame with only the good rows.
#
# NOTE: this cell reuses `schema` from the previous cell and rebinds `df2`,
# replacing the PERMISSIVE result that was stored under the same name.
df2 = (
    spark.read
    .format("csv")
    .option("header", "true")
    # "inferschema" is intentionally not set: it has no effect once an
    # explicit schema is supplied via .schema(...).
    .schema(schema)
    .option("badRecordsPath", "/FileStore/tables/bad_records")
    .load("/FileStore/tables/csv.txt")
)

# Author's notes: an explicit schema is needed when using badRecordsPath, and no
# `mode` option is set because badRecordsPath is said to apply its own handling.
# NOTE(review): confirm both points against the Databricks documentation.
#
# The bad records are stored as JSON files by default.

# In the output, only rows 1, 2 and 5 remain; `_corrupt_record` is still listed
# (it is part of `schema`) but is null for every row.
df2.show()

+---+------+---+------+------------+--------+---------------+
| id|  name|age|salary|     address| nominee|_corrupt_record|
+---+------+---+------+------------+--------+---------------+
|  1|Manish| 26| 75000|       bihar|nominee1|           null|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|           null|
|  5|Vikash| 31|300000|        null|nominee5|           null|
+---+------+---+------+------------+--------+---------------+



In [0]:
%fs
ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121,1741277334000
dbfs:/FileStore/tables/bad_records/,bad_records/,0,0
dbfs:/FileStore/tables/csv.txt,csv.txt,230,1741448093000


In [0]:
# List the bad-records folder. The "20250308T161530" segment is specific to this run, so copy the actual path from the Databricks catalog / file browser for your own files.
%fs
ls /FileStore/tables/bad_records/20250308T161530/bad_records/



path,name,size,modificationTime
dbfs:/FileStore/tables/bad_records/20250308T161530/bad_records/part-00000-787ea353-c81e-474d-b4b5-56fab9ca38a1,part-00000-787ea353-c81e-474d-b4b5-56fab9ca38a1,474,1741450532000


In [0]:
# Load the diverted bad rows back in for inspection.
# The file is read with the json source; the result has the columns `path`,
# `reason` and `record` (the original CSV line). This is how bad records can be
# stored and looked at later.
bad_df = (
    spark.read
    .format("json")
    .load("/FileStore/tables/bad_records/20250308T161530/bad_records/part-00000-787ea353-c81e-474d-b4b5-56fab9ca38a1")
)

bad_df.show()

+--------------------+--------------------+--------------------+
|                path|              reason|              record|
+--------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|org.apache.spark....|3,Pritam,22,15000...|
|dbfs:/FileStore/t...|org.apache.spark....|4,Prantosh,17,200...|
+--------------------+--------------------+--------------------+

