## Handling corrupted records

In [2]:
from pyspark.sql import SparkSession

In [4]:
spark = (
    SparkSession.builder
        .appName("Handling corrupted records")
        .master("local[*]")
        .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/03 23:48:43 WARN Utils: Your hostname, Shrees-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.28.47.103 instead (on interface en0)
25/11/03 23:48:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/03 23:48:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/03 23:48:44 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
spark

In [6]:
# PERMISSIVE is the default mode: every row is kept and fields that cannot be parsed become NULL
data_df = spark.read.format("csv")\
    .option("header","true")\
    .option("inferSchema","true")\
    .option("mode","PERMISSIVE")\
    .load("data.csv")

In [7]:
data_df.show()

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        NULL| nominee|
+---+--------+---+------+------------+--------+
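PERMISSIVE is one of three parse modes the CSV reader accepts; the other two are DROPMALFORMED (skip rows Spark considers malformed) and FAILFAST (raise on the first such row). A minimal sketch for trying each mode on the same file follows. `read_csv_with_mode` and `PARSE_MODES` are hypothetical names introduced here, and the helper assumes a `SparkSession` like the `spark` object created earlier.

```python
# Parse modes accepted by Spark's CSV reader.
PARSE_MODES = ("PERMISSIVE", "DROPMALFORMED", "FAILFAST")


def read_csv_with_mode(spark, path, mode):
    """Read a headered CSV using the given parse mode (hypothetical helper)."""
    mode = mode.upper()
    if mode not in PARSE_MODES:
        raise ValueError(f"unknown parse mode: {mode!r}")
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("mode", mode)
        .load(path)
    )
```

For example, `read_csv_with_mode(spark, "data.csv", "DROPMALFORMED").show()` can be compared against the table above. With FAILFAST the exception may only surface once an action such as `show()` runs, so it is safer to wrap both the read and the action in `try`/`except`.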



In [8]:
from pyspark.sql.types import StringType, StructField, StructType, IntegerType

In [9]:
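# Note: Spark's default corrupt-record column is named "_corrupt_record".
# A field named "corrupt_record" is read as an ordinary seventh data column
# unless the columnNameOfCorruptRecord option is set to match it.
data_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("address", StringType(), True),
    StructField("nominee", StringType(), True),
    StructField("corrupt_record", StringType(), True),
])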

In [10]:
# An explicit schema is supplied, so inferSchema is not needed here
data_df = spark.read.format("csv")\
    .option("header","true")\
    .schema(data_schema)\
    .option("mode","PERMISSIVE")\
    .load("data.csv")

In [11]:
data_df.show(5)

+---+--------+---+------+------------+--------+--------------+
| id|    name|age|salary|     address| nominee|corrupt_record|
+---+--------+---+------+------------+--------+--------------+
|  1|  Manish| 26| 75000|       bihar|nominee1|          NULL|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|          NULL|
|  3|  Pritam| 22|150000|   Bangalore|   India|      nominee3|
|  4|Prantosh| 17|200000|     Kolkata|   India|      nominee4|
|  5|  Vikash| 31|300000|        NULL| nominee|          NULL|
+---+--------+---+------+------------+--------+--------------+



25/11/03 23:48:48 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 6, schema size: 7
CSV file: file:///Users/shreejhariya/iCloud_Drive_(Archive)/Documents/Workspace/Pyspark/data.csv
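In the table above, `corrupt_record` holds only the extra token (`nominee3`, `nominee4`) rather than the whole rejected line. That is consistent with Spark treating the field as a plain seventh column: the header warning reports 6 header columns against 7 schema fields. A minimal sketch of the usual way to capture the full raw line is to name the field `_corrupt_record`, Spark's default, and state it explicitly through `columnNameOfCorruptRecord`. The helper names and the DDL schema string are assumptions made for illustration; the exact rows flagged can depend on the Spark version.

```python
# Assumption: the six data columns from the notebook, plus Spark's default
# corrupt-record column name, "_corrupt_record", as the last field.
SCHEMA_DDL = (
    "id INT, name STRING, age INT, salary INT, "
    "address STRING, nominee STRING, _corrupt_record STRING"
)


def read_with_corrupt_column(spark, path):
    """Read the CSV in PERMISSIVE mode, keeping raw bad lines in _corrupt_record."""
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("mode", "PERMISSIVE")
        .option("columnNameOfCorruptRecord", "_corrupt_record")
        .schema(SCHEMA_DDL)
        .load(path)
    )
    # Cache before querying: Spark rejects queries on raw CSV/JSON files
    # that reference only the corrupt-record column.
    return df.cache()


def split_good_and_bad(df):
    """Return (clean rows without the helper column, rejected rows)."""
    good = df.filter("_corrupt_record IS NULL").drop("_corrupt_record")
    bad = df.filter("_corrupt_record IS NOT NULL")
    return good, bad
```

Usage would look like `good_df, bad_df = split_good_and_bad(read_with_corrupt_column(spark, "data.csv"))`, followed by `bad_df.show(truncate=False)` to inspect the rejected lines.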


In [40]:
# Attempt to save corrupted rows (bad records) to a separate location.
# Note: badRecordsPath is documented as a Databricks option; open-source Spark
# may ignore it, in which case nothing is written to this path.
data_df = spark.read.format("csv")\
    .option("header","true")\
    .schema(data_schema)\
    .option("badRecordsPath","/iCloud_Drive_(Archive)/Documents/Workspace/bad_records")\
    .load("data.csv")

In [42]:
data_df.show()

+---+--------+---+------+------------+--------+--------------+
| id|    name|age|salary|     address| nominee|corrupt_record|
+---+--------+---+------+------------+--------+--------------+
|  1|  Manish| 26| 75000|       bihar|nominee1|          NULL|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|          NULL|
|  3|  Pritam| 22|150000|   Bangalore|   India|      nominee3|
|  4|Prantosh| 17|200000|     Kolkata|   India|      nominee4|
|  5|  Vikash| 31|300000|        NULL| nominee|          NULL|
+---+--------+---+------+------------+--------+--------------+



25/11/03 23:52:57 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 6, schema size: 7
CSV file: file:///Users/shreejhariya/iCloud_Drive_(Archive)/Documents/Workspace/Pyspark/data.csv
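The output above is identical to the earlier PERMISSIVE read, which suggests the `badRecordsPath` option had no effect in this local session. One portable alternative is to write the rejected rows out explicitly. The sketch below assumes `df` was read in PERMISSIVE mode with a schema whose last field is `_corrupt_record` (Spark's default corrupt-record column name) and then cached; `write_bad_records` and `out_dir` are hypothetical names.

```python
def write_bad_records(df, out_dir):
    """Write the raw text of rejected rows to out_dir as plain text files."""
    # Keep only the raw line of each rejected row; the text writer
    # requires exactly one string column.
    bad = df.filter("_corrupt_record IS NOT NULL").select("_corrupt_record")
    bad.write.mode("overwrite").text(out_dir)
    return bad
```

Because `mode("overwrite")` replaces whatever is already in `out_dir`, a dated or run-specific directory is a safer choice when earlier rejects need to be kept.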


In [44]:
%ls /iCloud_Drive_(Archive)/Documents/Workspace/Pyspark/bad_records

zsh:1: no matches found: /iCloud_Drive_(Archive)/Documents/Workspace/Pyspark/bad_records
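The `no matches found` message comes from zsh itself: unquoted parentheses are treated as a glob pattern, so the error says nothing about whether the directory exists. Note also that the path listed here contains `Pyspark/`, while the path given to `badRecordsPath` does not. A minimal sketch that avoids the shell entirely uses `pathlib`; `list_bad_records` is a hypothetical helper name.

```python
from pathlib import Path


def list_bad_records(bad_records_dir):
    """Return all files under bad_records_dir, or [] if it does not exist."""
    root = Path(bad_records_dir)
    if not root.is_dir():
        return []
    # Walk the tree recursively and keep regular files only.
    return sorted(str(p) for p in root.rglob("*") if p.is_file())
```

Calling `list_bad_records("/iCloud_Drive_(Archive)/Documents/Workspace/bad_records")` returns an empty list when nothing was written, which is the likely result if the reader ignored `badRecordsPath`.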
