In [1]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("CorruptRecord")
    .master("spark://spark-master:7077")
    # Resource settings passed straight through as Spark configuration keys.
    .config("spark.executor.cores", "2")
    .config("spark.cores.max", "4")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

# Bare expression so the notebook displays the session as the cell output.
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/18 05:33:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# CSV columns, in file order: id,name,age,salary,address,nominee
# `_bad_record` is an extra nullable string column; the read that uses this
# schema names it as the corrupt-record column.
employee_schema = (
    StructType()
    .add('id', IntegerType(), True)
    .add('name', StringType(), True)
    .add('age', IntegerType(), True)
    .add('salary', IntegerType(), True)
    .add('address', StringType(), True)
    .add('nominee', StringType(), True)
    .add('_bad_record', StringType(), True)
)

In [3]:
# Read the employee CSV with an explicit schema in PERMISSIVE mode, so rows
# that do not match the schema are kept and their raw text is placed in the
# column named by `columnNameOfCorruptRecord`.
employee_df = (
    spark.read.format('csv')
    .option('header', 'true')
    # The option key is `inferSchema`; a misspelled key is silently ignored.
    # Inference stays off because the schema is supplied explicitly below.
    .option('inferSchema', 'false')
    .schema(employee_schema)
    .option('mode', 'PERMISSIVE')
    .option('columnNameOfCorruptRecord', '_bad_record')
    .load('/opt/spark-data/input/employee_data.csv')
)

employee_df.show(truncate=False)

                                                                                

+---+--------+---+------+---------+--------+--------------------------------------------+
|id |name    |age|salary|address  |nominee |_bad_record                                 |
+---+--------+---+------+---------+--------+--------------------------------------------+
|1  |Manish  |26 |75000 |bihar    |nominee1|NULL                                        |
|2  |Nikita  |23 |100000|up       |nominee2|NULL                                        |
|3  |Pritam  |22 |150000|Bangalore|India   |3,Pritam,22,150000,Bangalore,India,nominee3 |
|4  |Prantosh|17 |200000|Kolakata |India   |4,Prantosh,17,200000,Kolakata,India,nominee4|
|5  |Vikash  |31 |300000|NULL     |nominee5|NULL                                        |
+---+--------+---+------+---------+--------+--------------------------------------------+



In [4]:
## Example: storing bad records in a file instead of in a column.
## The schema is redefined here without the `_bad_record` column.

employee_schema = StructType([
    StructField(name='id', dataType=IntegerType(), nullable=True),
    StructField(name='name', dataType=StringType(), nullable=True),
    StructField(name='age', dataType=IntegerType(), nullable=True),
    StructField(name='salary', dataType=IntegerType(), nullable=True),
    StructField(name='address', dataType=StringType(), nullable=True),
    StructField(name='nominee', dataType=StringType(), nullable=True),
])

In [5]:
# Read the employee CSV and ask Spark to write unparseable rows under
# `badRecordsPath` rather than keeping them in a DataFrame column.
#
# The chain is wrapped in parentheses instead of using backslash
# continuations: a `#` comment cannot sit inside a backslash-continued
# statement (it ends the logical line and the next line then fails with
# "IndentationError: unexpected indent"), whereas comments are legal inside
# parentheses.
employee_df = (
    spark.read.format('csv')
    .option('header', 'true')
    # The option key is `inferSchema`; a misspelled key is silently ignored.
    .option('inferSchema', 'false')
    .schema(employee_schema)
    # `mode` is deliberately not set: .option('mode', 'PERMISSIVE') is not
    # allowed together with `badRecordsPath` -- at least on Databricks.
    .option('badRecordsPath', '/opt/spark-data/output/')
    .load('/opt/spark-data/input/employee_data.csv')
)

employee_df.show(truncate=False)

[Stage 1:>                                                          (0 + 1) / 1]

+---+--------+---+------+---------+--------+
|id |name    |age|salary|address  |nominee |
+---+--------+---+------+---------+--------+
|1  |Manish  |26 |75000 |bihar    |nominee1|
|2  |Nikita  |23 |100000|up       |nominee2|
|3  |Pritam  |22 |150000|Bangalore|India   |
|4  |Prantosh|17 |200000|Kolakata |India   |
|5  |Vikash  |31 |300000|NULL     |nominee5|
+---+--------+---+------+---------+--------+



                                                                                

In [6]:
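## This did not work for me: `badRecordsPath` is not available in vanilla Spark; it is only present in Databricks.
## Solution for me: read in PERMISSIVE mode with a corrupt-record column, then split the good and bad rows manually.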

In [7]:
# Schema with an extra nullable string column, `_bad_record`, that receives
# the raw text of any row that does not match the schema.
employee_schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('salary', IntegerType(), True),
    StructField('address', StringType(), True),
    StructField('nominee', StringType(), True),
    StructField('_bad_record', StringType(), True)
])

employee_df = (
    spark.read.format('csv')
    .option('header', 'true')
    # The option key is `inferSchema`; a misspelled key is silently ignored.
    # Inference stays off because the schema is supplied explicitly below.
    .option('inferSchema', 'false')
    .schema(employee_schema)
    .option('mode', 'PERMISSIVE')
    .option('columnNameOfCorruptRecord', '_bad_record')
    .load('/opt/spark-data/input/employee_data.csv')
    # Cache the parsed rows before splitting on the corrupt-record column.
    # Since Spark 2.3, a query on a raw CSV source that references only the
    # corrupt-record column (e.g. `employee_df_bad.count()`) is rejected;
    # the Spark documentation recommends caching or saving the parsed result
    # first.
    .cache()
)

# Split on whether the row parsed cleanly: a NULL `_bad_record` means the row
# matched the schema, a non-NULL value holds the raw malformed line.
employee_df_good = employee_df.filter('_bad_record IS NULL')
employee_df_bad = employee_df.filter('_bad_record IS NOT NULL')

In [8]:
# Rows that parsed cleanly (`_bad_record` is NULL), shown without truncation.
employee_df_good.show(truncate=False)

+---+------+---+------+-------+--------+-----------+
|id |name  |age|salary|address|nominee |_bad_record|
+---+------+---+------+-------+--------+-----------+
|1  |Manish|26 |75000 |bihar  |nominee1|NULL       |
|2  |Nikita|23 |100000|up     |nominee2|NULL       |
|5  |Vikash|31 |300000|NULL   |nominee5|NULL       |
+---+------+---+------+-------+--------+-----------+



In [9]:
# Rows flagged as malformed; `_bad_record` holds the original raw line.
employee_df_bad.show(truncate=False)

+---+--------+---+------+---------+-------+--------------------------------------------+
|id |name    |age|salary|address  |nominee|_bad_record                                 |
+---+--------+---+------+---------+-------+--------------------------------------------+
|3  |Pritam  |22 |150000|Bangalore|India  |3,Pritam,22,150000,Bangalore,India,nominee3 |
|4  |Prantosh|17 |200000|Kolakata |India  |4,Prantosh,17,200000,Kolakata,India,nominee4|
+---+--------+---+------+---------+-------+--------------------------------------------+

