
**Potential Interview Questions**

1. Have you worked with corrupted records?
2. When do you consider a record corrupted?
3. What happens when we encounter corrupted records in different read modes?
4. How can we print bad records?
5. Where do you store corrupted records, and how can we access them later?


 When do you consider a record corrupted?
 ------------------------------------------
 
 **Example of a corrupted record in JSON**

 {
  key:value
 }

 {
  key:value
 }

 {
  key:value
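
In the JSON example above, the third object never receives its closing brace, so a parser cannot turn it into a record. A minimal sketch of that check in plain Python (no Spark involved) is shown here; it assumes one JSON object per line, and `classify_json_lines` and `sample` are hypothetical names introduced only for illustration.

```python
import json


def classify_json_lines(lines):
    """Split JSON Lines input into parsed records and raw corrupted lines."""
    good, bad = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        try:
            good.append(json.loads(line))
        except json.JSONDecodeError:
            # The line could not be parsed, so keep the raw text for inspection.
            bad.append(line)
    return good, bad


# Hypothetical sample mirroring the example: the last object is not closed.
sample = [
    '{"key": "value"}',
    '{"key": "value"}',
    '{"key": "value"',
]

good, bad = classify_json_lines(sample)
print(len(good), "valid,", len(bad), "corrupted")
```

Keeping the raw text of the failed line, rather than discarding it, is the same idea Spark follows when it stores a corrupted record in a dedicated column.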


**Example of a corrupted record in CSV**

id,name,age,salary,address,nominee

1,Soumya,23,15000,Odisha,nominee1

2,Jyotsna,23,19000,Mumbai,nominee2

3,Pratisha,17,20000,Kolkata,India,nominee3

4,Pritam,22,100000,Uttarpradesh,India,nominee4

5,Vikash,31,30000,,nominee5
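
In the CSV example above, the rows with id 3 and 4 carry seven values against a six-column header, while the row with id 5 is still valid because an empty address is just a missing value. The sketch below flags rows by field count using only the standard library; it is a simplified stand-in for what a CSV reader checks, and `split_by_field_count` is a hypothetical helper name.

```python
import csv
import io

# The sample file from above, inlined so the snippet is self-contained.
raw = """id,name,age,salary,address,nominee
1,Soumya,23,15000,Odisha,nominee1
2,Jyotsna,23,19000,Mumbai,nominee2
3,Pratisha,17,20000,Kolkata,India,nominee3
4,Pritam,22,100000,Uttarpradesh,India,nominee4
5,Vikash,31,30000,,nominee5
"""


def split_by_field_count(text):
    """Return (header, good_rows, bad_rows) based on the number of fields."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    good, bad = [], []
    for row in reader:
        if not row:
            continue  # skip empty lines
        if len(row) == len(header):
            good.append(row)
        else:
            bad.append(row)
    return header, good, bad


header, good, bad = split_by_field_count(raw)
print("valid ids:", [row[0] for row in good])
print("corrupted ids:", [row[0] for row in bad])
```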

What happens when we encounter corrupted records in different read modes?
---------------------------------------------------------------------------

In [0]:
employee_details = spark.read.format("csv")\
                           .option("header","true")\
                           .option("inferSchema","true")\
                           .option("mode","PERMISSIVE")\
                           .load('/FileStore/tables/employee_details.csv')

print('Total records in permissive mode is ',employee_details.count())

Total records in permissive mode is  5


In [0]:
employee_details = spark.read.format("csv")\
                           .option("header","true")\
                           .option("inferSchema","true")\
                           .option("mode","DROPMALFORMED")\
                           .load('/FileStore/tables/employee_details.csv')

# A bare employee_details.count() does not need any column values, so Spark's CSV column pruning
# can skip parsing the fields. Malformed rows may then still be counted, even though
# employee_details.show(), which parses every column, leaves them out.
# Chaining dropna(how='all') before count() makes Spark read all columns, so the malformed rows are dropped.
# how='all' removes only rows in which every column is null. The default how='any' would also remove
# the valid row with id 5, because its address is null.
# Refer to the dropna() documentation for details.
print('Total records in DROPMALFORMED mode is ',employee_details.dropna(how='all').count())


Total records in DROPMALFORMED mode is  3


In [0]:
employee_details = spark.read.format("csv")\
                           .option("header","true")\
                           .option("inferSchema","true")\
                           .option("mode","FAILFAST")\
                           .load('/FileStore/tables/employee_details.csv')
# FAILFAST throws an exception as soon as it meets a corrupted record.
# Our file contains two corrupted records (ids 3 and 4), so the call below fails.
employee_details.show()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-1934429077472934>:7
      1 person_details = spark.read.format("csv")\
      2                            .option("header","true")\
      3                            .option("inferSchema","true")\
      4                            .option("mode","FAILFAST")\
      5                            .l
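
The three modes can be mimicked in plain Python to make the difference concrete. This is only a toy emulation under simplifying assumptions (a row is malformed when its field count differs from the header, and fields are split on commas); it is not Spark's parser, and `read_with_mode` and `SAMPLE_LINES` are hypothetical names.

```python
SAMPLE_LINES = [
    "id,name,age,salary,address,nominee",
    "1,Soumya,23,15000,Odisha,nominee1",
    "2,Jyotsna,23,19000,Mumbai,nominee2",
    "3,Pratisha,17,20000,Kolkata,India,nominee3",
    "4,Pritam,22,100000,Uttarpradesh,India,nominee4",
    "5,Vikash,31,30000,,nominee5",
]


def read_with_mode(lines, mode="PERMISSIVE"):
    """Toy emulation of the three read modes; not Spark's actual parser."""
    if mode not in ("PERMISSIVE", "DROPMALFORMED", "FAILFAST"):
        raise ValueError(f"unknown mode: {mode}")
    header = lines[0].split(",")
    rows = []
    for line in lines[1:]:
        fields = line.split(",")
        if len(fields) == len(header):
            rows.append(dict(zip(header, fields)))
        elif mode == "PERMISSIVE":
            # Keep the row and carry the raw text along instead of discarding it.
            rows.append({"corrupt_record": line})
        elif mode == "DROPMALFORMED":
            continue  # silently skip the malformed row
        else:  # FAILFAST
            raise ValueError(f"Malformed CSV record: {line}")
    return rows


print("PERMISSIVE rows:", len(read_with_mode(SAMPLE_LINES, "PERMISSIVE")))
print("DROPMALFORMED rows:", len(read_with_mode(SAMPLE_LINES, "DROPMALFORMED")))
try:
    read_with_mode(SAMPLE_LINES, "FAILFAST")
except ValueError as err:
    print("FAILFAST stopped:", err)
```

The row counts of the first two calls line up with the notebook results above: all five rows survive in PERMISSIVE mode, and three remain once the malformed rows are dropped.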

How can we print bad records?
------------------------------

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the schema using StructType and StructField
emp_schema = StructType([
                      StructField("id", IntegerType(), True),
                      StructField("name", StringType(), True),
                      StructField("age", IntegerType(), True),
                      StructField("salary", IntegerType(), True),
                      StructField("address", StringType(), True),
                      StructField("nominee", StringType(), True),
                      StructField("corrupt_record",StringType(),True)
                    ])

# Note: Passing True as the nullable argument allows null values in the column

# Creating employee details dataframe
# The corrupt-record column is only filled in PERMISSIVE mode (which is also the default mode),
# so the bad records are kept in the DataFrame only when the mode is PERMISSIVE.

employee_details = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "corrupt_record")\
    .schema(emp_schema) \
    .load('/FileStore/tables/employee_details.csv')

employee_details.show(truncate=False)

# Note: If we name the column for corrupted records "_corrupt_record" (Spark's default name) in the schema,
# we can omit the option .option("columnNameOfCorruptRecord", "corrupt_record"). The process will still work correctly.


+---+-------+---+------+-------+--------+--------------+
|id |name   |age|salary|address|nominee |corrupt_record|
+---+-------+---+------+-------+--------+--------------+
|1  |Soumya |23 |15000 |Odisha |nominee1|null          |
|2  |Jyotsna|23 |19000 |Mumbai |nominee2|null          |
|5  |Vikash |31 |30000 |null   |nominee5|null          |
+---+-------+---+------+-------+--------+--------------+



Where do you store corrupted records and how can we access them later?
------------------------------------------------------------------------

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the schema using StructType and StructField
emp_schema = StructType([
                      StructField("id", IntegerType(), True),
                      StructField("name", StringType(), True),
                      StructField("age", IntegerType(), True),
                      StructField("salary", IntegerType(), True),
                      StructField("address", StringType(), True),
                      StructField("nominee", StringType(), True)
                    ])

# Creating employee details dataframe
# Do not set the "mode" option when using "badRecordsPath" (a Databricks-specific option)

employee_details = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("badRecordsPath","/FileStore/tables/bad_records")\
    .schema(emp_schema) \
    .load('/FileStore/tables/employee_details.csv')

employee_details.show(truncate=False)

# Note: The corrupted records are always stored as JSON files under the badRecordsPath location

+---+-------+---+------+-------+--------+
|id |name   |age|salary|address|nominee |
+---+-------+---+------+-------+--------+
|1  |Soumya |23 |15000 |Odisha |nominee1|
|2  |Jyotsna|23 |19000 |Mumbai |nominee2|
|5  |Vikash |31 |30000 |null   |nominee5|
+---+-------+---+------+-------+--------+



In [0]:
display(dbutils.fs.ls('/FileStore/tables/bad_records/20231212T140533/bad_records'))

path,name,size,modificationTime
dbfs:/FileStore/tables/bad_records/20231212T140533/bad_records/part-00000-e9467212-69c6-415b-ac0c-63192ec4b770,part-00000-e9467212-69c6-415b-ac0c-63192ec4b770,504,1702389935000


In [0]:
emp_bad_records_df = spark.read.format("json").load('dbfs:/FileStore/tables/bad_records/20231212T140533/bad_records/part-00000-e9467212-69c6-415b-ac0c-63192ec4b770')
emp_bad_records_df.show(truncate=False)

+-------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+
|path                                       |reason                                                                                                                             |record                                        |
+-------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------+
|dbfs:/FileStore/tables/employee_details.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 3,Pratisha,17,20000,Kolkata,India,nominee3    |3,Pratisha,17,20000,Kolkata,India,nominee3    |
|dbfs:/FileStore/tables/employee_details.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_

In [0]:
from pyspark.sql.functions import col
bad_records = emp_bad_records_df.select(col('record'))
bad_records.show(truncate=False)

+----------------------------------------------+
|record                                        |
+----------------------------------------------+
|3,Pratisha,17,20000,Kolkata,India,nominee3    |
|4,Pritam,22,100000,Uttarpradesh,India,nominee4|
+----------------------------------------------+
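
Because the bad-records file is JSON with `path`, `reason`, and `record` fields (as the output above shows), it can also be inspected outside Spark, for example after downloading it. The sketch below assumes one JSON object per line and uses an inlined sample line copied from the first row of the output; `load_bad_records` and `sample_file_text` are hypothetical names.

```python
import json

# One sample entry, copied from the first row of the output shown above.
sample_file_text = (
    '{"path": "dbfs:/FileStore/tables/employee_details.csv", '
    '"reason": "org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] '
    'Malformed CSV record: 3,Pratisha,17,20000,Kolkata,India,nominee3", '
    '"record": "3,Pratisha,17,20000,Kolkata,India,nominee3"}\n'
)


def load_bad_records(text):
    """Parse bad-record entries and split each raw CSV record into fields."""
    entries = []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        entry = json.loads(line)
        # The raw record is plain CSV text, so a comma split recovers the values.
        entry["fields"] = entry["record"].split(",")
        entries.append(entry)
    return entries


entries = load_bad_records(sample_file_text)
for entry in entries:
    print(entry["fields"][0], "has", len(entry["fields"]), "fields")
```

Splitting the raw record back into fields makes it easier to decide how to repair a row, for instance by merging the two address parts before reloading it.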

