In [0]:
flight_data_path = '/FileStore/tables/flight_data.csv'

In [0]:
#initiate spark session
spark

In [0]:
flight_df = spark.read.format('csv')\
            .option('header','true')\
            .option("inferschema",'false')\
            .option("mode","FAILFAST")\
            .load(flight_data_path)
        

In [0]:
flight_df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
#show top 5 rows

flight_df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
#print schema

flight_df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [0]:
#infer schema is true

flight_df_schema_True = spark.read.format('csv')\
            .option('header','true')\
            .option("inferschema",'true')\
            .option("mode","FAILFAST")\
            .load(flight_data_path)

In [0]:
flight_df_schema_True.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [0]:
#create manual schema

from pyspark.sql.types import StructField,StructType,StringType,IntegerType

my_schema = StructType([
                    StructField("DEST_COUNTRY_NAME",StringType(),False),
                    StructField("ORIGIN_COUNTRY_NAME",StringType(),False),
                    StructField("count",IntegerType(),False)])

In [0]:
#manual schema

flight_df_man_schema = spark.read.format('csv')\
            .option('header','false')\
            .option('skipRows',1)\
            .option("inferschema",'false')\
            .schema(my_schema)\
            .option("mode","PERMISSIVE")\
            .load(flight_data_path)

In [0]:
flight_df_man_schema.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
%fs
ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/2015_summary.csv,2015_summary.csv,7080,1687933946000
dbfs:/FileStore/tables/employee-1.csv,employee-1.csv,200,1688192849000
dbfs:/FileStore/tables/employee.csv,employee.csv,204,1688192595000
dbfs:/FileStore/tables/flight_data-1.csv,flight_data-1.csv,7080,1687938593000
dbfs:/FileStore/tables/flight_data-2.csv,flight_data-2.csv,7080,1687938743000
dbfs:/FileStore/tables/flight_data-3.csv,flight_data-3.csv,7080,1687938780000
dbfs:/FileStore/tables/flight_data.csv,flight_data.csv,7080,1687933980000


In [0]:
# employee data path
employee_data_path = '/FileStore/tables/employee-1.csv'

In [0]:
#Topic = Handling corrupted data - permissive

employee_df = spark.read.format('csv')\
            .option('header','true')\
            .option("inferschema",'true')\
            .option("mode","PERMISSIVE")\
            .load(employee_data_path)
employee_df.show()

+---+------+----+------+---------+---------+
| id|  name|age |salary|  address|  nominee|
+---+------+----+------+---------+---------+
|  1|manish|  26| 75000|   bihar |    nom 1|
|  2|nikita|  24| 60000|       up|    nom 2|
|  3|vishak|  27| 65000|bengaluru|karnatkta|
|  4|  ashu|  31| 80000|   bhopal|       mp|
|  5|  riya|  29| 85000|     null|    nom 5|
+---+------+----+------+---------+---------+



In [0]:
#Topic = Handling corrupted data - Dropmalformed

employee_df_dm = spark.read.format('csv')\
            .option('header','true')\
            .option("inferschema",'true')\
            .option("mode","DROPMALFORMED")\
            .load(employee_data_path)
employee_df_dm.show()

+---+------+----+------+-------+-------+
| id|  name|age |salary|address|nominee|
+---+------+----+------+-------+-------+
|  1|manish|  26| 75000| bihar |  nom 1|
|  2|nikita|  24| 60000|     up|  nom 2|
|  5|  riya|  29| 85000|   null|  nom 5|
+---+------+----+------+-------+-------+



In [0]:
#Topic = Handling corrupted data - Dropmalformed

employee_df_ff = spark.read.format('csv')\
            .option('header','true')\
            .option("inferschema",'true')\
            .option("mode","FAILFAST")\
            .load(employee_data_path)
employee_df_ff.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2995567228245922>:8[0m
[1;32m      1[0m [38;5;66;03m#Topic = Handling corrupted data - Dropmalformed[39;00m
[1;32m      3[0m employee_df_ff [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mformat([38;5;124m'[39m[38;5;124mcsv[39m[38;5;124m'[39m)\
[1;32m      4[0m             [38;5;241m.[39moption([38;5;124m'[39m[38;5;124mheader[39m[38;5;124m'[39m,[38;5;124m'[39m[38;5;124mtrue[39m[38;5;124m'[39m)\
[1;32m      5[0m             [38;5;241m.[39moption([38;5;124m"[39m[38;5;124minferschema[39m[38;5;124m"[39m,[38;5;124m'[39m[38;5;124mtrue[39m[38;5;124m'[39m)\
[1;32m      6[0m             [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mmode[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mFAILFAST[39m[38;5;124m"[39m)\
[1;32m      7[0m  

In [0]:
#Print bad record by creating schema

from pyspark.sql.types import StringType,StructField,StructType,IntegerType


In [0]:
emp_schema = StructType([
    StructField("id",IntegerType(),True),
    StructField("name",StringType(),True),
    StructField("age",IntegerType(),True),
    StructField("salary",IntegerType(),True),
    StructField("address",StringType(),True),
    StructField("nominee",StringType(),True),
    StructField("_corrupt_record",StringType(),True)
    ])

In [0]:
#Topic = Handling corrupted data - look at corrupt records

employee_df_cr = spark.read.format('csv')\
            .option('header','true')\
            .option("inferschema",'true')\
            .option("mode","Permissive")\
            .schema(emp_schema)\
            .load(employee_data_path)
employee_df_cr.show()

+---+------+---+------+---------+---------+--------------------+
| id|  name|age|salary|  address|  nominee|     _corrupt_record|
+---+------+---+------+---------+---------+--------------------+
|  1|manish| 26| 75000|   bihar |    nom 1|                null|
|  2|nikita| 24| 60000|       up|    nom 2|                null|
|  3|vishak| 27| 65000|bengaluru|karnatkta|3,vishak,27,65000...|
|  4|  ashu| 31| 80000|   bhopal|       mp|4,ashu,31,80000,b...|
|  5|  riya| 29| 85000|     null|    nom 5|                null|
+---+------+---+------+---------+---------+--------------------+



In [0]:
#Topic = Handling corrupted data - look at corrupt records,dont truncate

employee_df_cr = spark.read.format('csv')\
            .option('header','true')\
            .option("inferschema",'true')\
            .option("mode","Permissive")\
            .schema(emp_schema)\
            .load(employee_data_path)
employee_df_cr.show(truncate=False)

+---+------+---+------+---------+---------+-------------------------------------------+
|id |name  |age|salary|address  |nominee  |_corrupt_record                            |
+---+------+---+------+---------+---------+-------------------------------------------+
|1  |manish|26 |75000 |bihar    |nom 1    |null                                       |
|2  |nikita|24 |60000 |up       |nom 2    |null                                       |
|3  |vishak|27 |65000 |bengaluru|karnatkta|3,vishak,27,65000,bengaluru,karnatkta,nom 3|
|4  |ashu  |31 |80000 |bhopal   | mp      |4,ashu,31,80000,bhopal, mp,nom 4           |
|5  |riya  |29 |85000 |null     |nom 5    |null                                       |
+---+------+---+------+---------+---------+-------------------------------------------+



In [0]:
#Topic = Handling corrupted data - look at corrupt records,dont truncate,badrecordspath

employee_df_cr = spark.read.format('csv')\
            .option('header','true')\
            .option("inferschema",'true')\
            .schema(emp_schema)\
            .option("badRecordsPath","/FileStore/tables/bad_records")\
            .load(employee_data_path)
employee_df_cr.show(truncate=False)

+---+------+---+------+-------+-------+---------------+
|id |name  |age|salary|address|nominee|_corrupt_record|
+---+------+---+------+-------+-------+---------------+
|1  |manish|26 |75000 |bihar  |nom 1  |null           |
|2  |nikita|24 |60000 |up     |nom 2  |null           |
|5  |riya  |29 |85000 |null   |nom 5  |null           |
+---+------+---+------+-------+-------+---------------+



In [0]:
%fs
ls /FileStore/tables/bad_records/20230701T112717/bad_records

path,name,size,modificationTime
dbfs:/FileStore/tables/bad_records/20230701T112717/bad_records/part-00000-1e6b3f7f-5bed-4aa6-800f-ce34fdc8a39a,part-00000-1e6b3f7f-5bed-4aa6-800f-ce34fdc8a39a,466,1688210839000


In [0]:
bad_data_df = spark.read.format("json").load("/FileStore/tables/bad_records/20230701T112717/bad_records")

bad_data_df.show()


+--------------------+--------------------+--------------------+
|                path|              reason|              record|
+--------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|org.apache.spark....|3,vishak,27,65000...|
|dbfs:/FileStore/t...|org.apache.spark....|4,ashu,31,80000,b...|
+--------------------+--------------------+--------------------+

