In [0]:
spark

### **Read `csv` file in spark**


[Reference](https://spark.apache.org/docs/latest/sql-data-sources-csv.html)

🔶 DataFrame Reader API  &nbsp;&nbsp;➜&nbsp;&nbsp;  <code>spark.read</code>

<br>

<pre>
  spark.read.format(...)\
            .option("key", "value")\
            .schema("...")\
            .load("file path")
</pre>

<br>

**.format( )** &nbsp;&nbsp;&nbsp;&nbsp;---&nbsp;&nbsp;&nbsp; optional &nbsp;&nbsp;&nbsp;---&nbsp;&nbsp;&nbsp; <code>csv</code> , &nbsp;<code>json</code> , &nbsp;<code>jdbc</code> , &nbsp;<code>parquet</code> &nbsp;(if omitted, the format set in <code>spark.sql.sources.default</code> is used; tables are read with <code>spark.read.table( )</code>) <br>
**.option( )** &nbsp;&nbsp;&nbsp;&nbsp;---&nbsp;&nbsp;&nbsp; optional &nbsp;&nbsp;&nbsp;---&nbsp;&nbsp;&nbsp; <code>header</code> , &nbsp;<code>inferSchema</code> , &nbsp;<code>mode</code> <br>
**.schema( )** &nbsp;&nbsp;&nbsp;---&nbsp;&nbsp;&nbsp; optional &nbsp;&nbsp;&nbsp;---&nbsp;&nbsp;&nbsp;&nbsp; a schema you define manually instead of letting Spark infer it <br>
**.load( )** &nbsp;&nbsp;-------------------------&nbsp;&nbsp;&nbsp;&nbsp; path to where your data resides
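To see what the `header` option changes in the cells that follow, here is a plain-Python sketch. `column_names_and_rows` is a hypothetical helper, not part of PySpark; it only mimics the naming rule visible in the outputs: with `header` = `false` the columns are called `_c0`, `_c1`, ... and the header line stays as a data row, while with `true` the first line supplies the column names.

```python
import csv
import io


def column_names_and_rows(text, header):
    """Hypothetical helper (not a Spark API): mimic how `header` picks column names."""
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        # header=true: the first line supplies the names and is not treated as data
        return rows[0], rows[1:]
    # header=false: names default to _c0, _c1, ... and the first line stays a data row
    return [f"_c{i}" for i in range(len(rows[0]))], rows


sample = (
    "DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count\n"
    "United States,Romania,1\n"
    "United States,Ireland,264\n"
)

names, data = column_names_and_rows(sample, header=False)
print(names, len(data))  # ['_c0', '_c1', '_c2'] 3
names, data = column_names_and_rows(sample, header=True)
print(names, len(data))  # ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count'] 2
```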

In [0]:
flight_data = spark.read.format("csv").option("header", "false").option("inferschema", "false").option("mode", "FAILFAST").load("/FileStore/tables/flight_data.csv")

flight_data.show(5)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_data = spark.read.format("csv")\
                        .option("header", "false")\
                        .option("inferschema", "false")\
                        .option("mode", "FAILFAST")\
                        .load("/FileStore/tables/flight_data.csv")
                        
flight_data.show(5)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_data = spark.read.format("CSV")\
                        .option("header", "False")\
                        .option("inferschema", "False")\
                        .option("mode", "failfast")\
                        .load("/FileStore/tables/flight_data.csv")
                        
flight_data.show(5)

+-----------------+-------------------+-----+
|              _c0|                _c1|  _c2|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows

■ &nbsp;Note that the format name, the option keys and the option values are case-insensitive: `"CSV"`, `"False"` and `"failfast"` give the same result as `"csv"`, `"false"` and `"FAILFAST"`.



In [0]:
type(flight_data)

Out[6]: pyspark.sql.dataframe.DataFrame

In [0]:
flight_data.columns

Out[7]: ['_c0', '_c1', '_c2']

In [0]:
flight_data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [0]:
flight_data_header = spark.read.format("csv")\
                               .option("header", "true")\
                               .option("inferschema", "false")\
                               .option("mode", "FAILFAST")\
                               .load("/FileStore/tables/flight_data.csv")
                        
flight_data_header.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_data_header.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [0]:
flight_data_header.columns

Out[11]: ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [0]:
flight_data_header_schema = spark.read.format("csv")\
                                      .option("header", "true")\
                                      .option("inferschema", "true")\
                                      .option("mode", "FAILFAST")\
                                      .load("/FileStore/tables/flight_data.csv")

flight_data_header_schema.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_data_header_schema.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



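■ &nbsp;Note that the `count` column is now inferred as **integer** type, which was not the case earlier: with `inferschema` set to `true`, Spark scans the data to work out each column's type (at the cost of one extra pass over the data), whereas with `false` every column is read as **string**.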
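A heavily simplified sketch of what `inferSchema` does for each column, assuming only two candidate types: a column becomes integer if every non-empty value parses as an integer, otherwise string. `infer_schema` and `infer_column_type` are hypothetical helpers; Spark's real inference considers more types (for example long, double, timestamp and boolean).

```python
def infer_column_type(values):
    """Hypothetical, simplified inference: 'integer' or 'string' only."""
    seen_value = False
    for value in values:
        if value == "":
            continue  # empty fields are treated as null and do not decide the type
        seen_value = True
        try:
            int(value)
        except ValueError:
            return "string"
    # a column with no non-empty values falls back to string
    return "integer" if seen_value else "string"


def infer_schema(header, rows):
    """Map each column name to the type inferred from that column's values."""
    columns = zip(*rows)
    return {name: infer_column_type(col) for name, col in zip(header, columns)}


header = ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"]
rows = [
    ["United States", "Romania", "1"],
    ["United States", "Ireland", "264"],
    ["Egypt", "United States", "24"],
]
print(infer_schema(header, rows))
# {'DEST_COUNTRY_NAME': 'string', 'ORIGIN_COUNTRY_NAME': 'string', 'count': 'integer'}
```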

### Schema in spark

Possible Interview Questions ⭐
 - > How do you create a schema in PySpark? <br>
    What are the other ways of creating it?
 - > What are StructType & StructField in a schema?
 - > What if my data has a header?

🔶 There are **2 ways of creating a schema** :

1. using PySpark classes like <code>StructType</code> & <code>StructField</code> 

      <code>StructField</code>  &nbsp;&nbsp;===>&nbsp;&nbsp;  defines a single **column** : its name, data type and whether it is nullable <br>

      <code>StructType </code>  &nbsp;&nbsp;&nbsp;===>&nbsp;&nbsp;  defines the structure of a **DataFrame** <br>
                                &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;**list** or collection of <code>StructField</code> objects<br><br>


      <pre> <br>
            from pyspark.sql.types import StructType, StructField, IntegerType, StringType
            
            my_schema = StructType([
                                    StructField("Id", IntegerType(), True),                        <i># Column Name ,  Data Type ,  Nullable = True</i>
                                    StructField("Name", StringType(), True),
                                    StructField("Age", StringType(), True)
                                  ])

      </pre>

2. using a DDL-formatted string

      <br>


      <pre><br>
            ddl_my_schema = "Id integer, Name string, Age string"

      </pre>
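Both forms carry the same information. As an illustration, the hypothetical helper `to_ddl` below (not a PySpark function) turns the column names and type names of the `StructType` example into the DDL string shown above; it takes no nullable flag because columns in a DDL string are nullable by default.

```python
def to_ddl(columns):
    """Hypothetical helper (not a PySpark API): render (name, type) pairs as a DDL schema string."""
    # DDL columns are nullable by default, matching the True flags in the StructType example
    return ", ".join(f"{name} {type_name}" for name, type_name in columns)


# the same columns as the StructField("Id", IntegerType(), True), ... example
columns = [("Id", "integer"), ("Name", "string"), ("Age", "string")]
ddl_my_schema = to_ddl(columns)
print(ddl_my_schema)  # Id integer, Name string, Age string
```

In PySpark the resulting string can be passed straight to the reader, e.g. `.schema(ddl_my_schema)`, since `DataFrameReader.schema` accepts either a `StructType` or a DDL string.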

In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

my_schema = StructType(
                        [
                            StructField('DEST_COUNTRY_NAME', StringType(), True),
                            StructField('ORIGIN_COUNTRY_NAME', StringType(), True),
                            StructField('count', IntegerType(), True)
                        ]
            )

In [0]:
flight_data = spark.read.format("csv")\
                        .option("header", "false")\
                        .option("inferschema", "false")\
                        .schema(my_schema)\
                        .option("mode", "FAILFAST")\
                        .load("/FileStore/tables/flight_data.csv")

flight_data.show(5)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-2123660797405044>:8
      1 flight_data = spark.read.format("csv")\
      2                         .option("header", "false")\
      3                         .option("inferschema", "false")\
      4                         .schema(my_schema)\
      5                         .option("mode", "FAILFAST"

🔶 &nbsp;Let's try to **resolve the error** :

In [0]:
%fs
ls /FileStore/tables/


path,name,size,modificationTime
dbfs:/FileStore/tables/employee_file.csv,employee_file.csv,230,1736739949000
dbfs:/FileStore/tables/flight_data.csv,flight_data.csv,7121,1736578138000


In [0]:
flight_data = spark.read.format("csv")\
                        .option("header", "false")\
                        .option("inferschema", "false")\
                        .schema(my_schema)\
                        .option("mode", "PERMISSIVE")\
                        .load("/FileStore/tables/flight_data.csv")

flight_data.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
+-----------------+-------------------+-----+
only showing top 5 rows



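As you can see, <code>mode = "FAILFAST"</code> failed **because the first record is malformed**: with <code>header = "false"</code>, the header line <code>DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count</code> is read as data, and the string <code>count</code> cannot be cast to the <code>IntegerType</code> declared for the <code>count</code> column. <code>FAILFAST</code> aborts on such a record, whereas <code>PERMISSIVE</code> keeps the row and sets the unparseable field to <code>null</code> (allowed here because the column is nullable).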
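The failure can be reproduced in plain Python. This is a simplified sketch, not Spark code: `parse_count` is a hypothetical helper that applies the integer type of the `count` column to each raw value, raising in `FAILFAST` and returning `None` (standing in for Spark's `null`) in `PERMISSIVE`.

```python
def parse_count(raw, mode):
    """Hypothetical helper: apply the IntegerType of the `count` column to one raw value."""
    try:
        return int(raw)
    except ValueError:
        if mode == "FAILFAST":
            raise ValueError(f"Malformed record: cannot cast {raw!r} to integer") from None
        return None  # PERMISSIVE: the unparseable field becomes null (None)


# with header=false the header line is read as an ordinary data row
lines = [
    ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"],
    ["United States", "Romania", "1"],
]

permissive = [(dest, origin, parse_count(count, "PERMISSIVE")) for dest, origin, count in lines]
print(permissive)
# [('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', None), ('United States', 'Romania', 1)]

try:
    [(dest, origin, parse_count(count, "FAILFAST")) for dest, origin, count in lines]
except ValueError as err:
    print(err)  # Malformed record: cannot cast 'count' to integer
```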

⭐ One way to get rid of this malformed record (the header line) is the <code>skipRows</code> key in the <code>option()</code> method, here skipping the first line of the file:

In [0]:
flight_data = spark.read.format("csv")\
                        .option("header", "false")\
                        .option("skipRows", 1)\
                        .option("inferschema", "false")\
                        .schema(my_schema)\
                        .option("mode", "PERMISSIVE")\
                        .load("/FileStore/tables/flight_data.csv")

flight_data.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



⭐ However, if you set **header to true**, Spark treats the first line as a header instead of data, so the csv file loads with <code>mode = "FAILFAST"</code> too and **no malformed record appears**.

In [0]:
flight_data = spark.read.format("csv")\
                        .option("header", "true")\
                        .option("inferschema", "false")\
                        .schema(my_schema)\
                        .option("mode", "FAILFAST")\
                        .load("/FileStore/tables/flight_data.csv")

flight_data.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### Error Handling &nbsp;&nbsp;-&nbsp;&nbsp; Handling corrupted records in spark

Possible Interview Questions ⭐
  - Have you ever worked with corrupt records?
  - When do you say a record is corrupt?
  - What happens when we encounter corrupt records in the different read modes?
  - How can we print bad records?
  - Where do you store corrupt records and how can we access them later?

🔶 &nbsp;**Mode** &nbsp;- There are 3 different read modes :
  > - <code>PERMISSIVE</code> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;-&nbsp;&nbsp;&nbsp; default <br>
  > &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;-&nbsp;&nbsp;&nbsp; fields that cannot be parsed are set to NULL; if the schema contains a corrupt-record column (e.g. <code>_corrupt_record</code>), the entire raw row is also saved in that column
  >
  > - <code>DROPMALFORMED</code> &nbsp;&nbsp;&nbsp;-&nbsp;&nbsp;&nbsp; drops the corrupted record &nbsp;(every row with at least one error is dropped entirely)
  >
  > - <code>FAILFAST</code> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;-&nbsp;&nbsp;&nbsp; throws an error and fails the entire execution if there is any malformed record in the dataset

★ &nbsp;<code>badRecordsPath</code>  &nbsp;&nbsp;&nbsp;&nbsp;===>&nbsp;&nbsp;&nbsp;&nbsp; **Corrupted records are saved at a specific path** &nbsp;(a Databricks option)


Let's take the following **sample data** in **comma-separated values** (CSV) format as an example :

<br>

<pre><br>
    id,name,age,salary,address,nominee                        ----->  total 6 columns
    1,Manish,26,75000,bihar,nominee1                          ----->  total 6 columns
    2,Nikita,23,100000,uttarpradesh,nominee2                  ----->  total 6 columns
    3,Pritam,22,150000,Bangalore,India,nominee3               ----->  total 7 columns            ----->  malformed record
    4,Prantosh,17,200000,Kolkata,India,nominee4               ----->  total 7 columns            ----->  malformed record
    5,Vikash,31,300000,,nominee5                              ----->  total 6 columns
<br>
</pre>
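The three modes can be mimicked on this sample in plain Python. This is a simplified sketch under stated assumptions: `read_with_mode` is a hypothetical helper that judges a row malformed only when its field count differs from the header's, keeps every value as a string, and turns empty fields into `None` (like the `null` address for Vikash). In `PERMISSIVE` mode it keeps only the first six fields of a too-long row and can optionally append the raw line as a corrupt-record value.

```python
import csv

SAMPLE = """id,name,age,salary,address,nominee
1,Manish,26,75000,bihar,nominee1
2,Nikita,23,100000,uttarpradesh,nominee2
3,Pritam,22,150000,Bangalore,India,nominee3
4,Prantosh,17,200000,Kolkata,India,nominee4
5,Vikash,31,300000,,nominee5
"""


def read_with_mode(text, mode, corrupt_column=False):
    """Hypothetical sketch of the CSV read modes; values stay strings."""
    lines = text.splitlines()
    parsed = list(csv.reader(lines))
    width = len(parsed[0])  # number of header columns
    result = []
    for raw, fields in zip(lines[1:], parsed[1:]):
        malformed = len(fields) != width  # the only check in this sketch
        if malformed and mode == "FAILFAST":
            raise ValueError(f"Malformed CSV record: {raw}")
        if malformed and mode == "DROPMALFORMED":
            continue
        # PERMISSIVE: empty -> None, extra fields dropped, missing fields padded with None
        values = [f if f != "" else None for f in fields]
        row = (values + [None] * width)[:width]
        if corrupt_column:
            row.append(raw if malformed else None)
        result.append(row)
    return result


print(len(read_with_mode(SAMPLE, "PERMISSIVE")))     # 5
print(len(read_with_mode(SAMPLE, "DROPMALFORMED")))  # 3
try:
    read_with_mode(SAMPLE, "FAILFAST")
except ValueError as err:
    print(err)  # Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3
```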

Let's now see how many records are loaded in the <code>PERMISSIVE</code> , <code>DROPMALFORMED</code> & <code>FAILFAST</code> modes respectively.

In [0]:
employee_df = spark.read.format("csv")\
                        .option("header", "true")\
                        .option("inferschema", "true")\
                        .option("mode", "permissive")\
                        .load("/FileStore/tables/employee_file.csv")
                        
employee_df.show(5)

+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
employee_df = spark.read.format("csv")\
                        .option("header", "true")\
                        .option("inferschema", "true")\
                        .option("mode", "dropmalformed")\
                        .load("/FileStore/tables/employee_file.csv")
                        
employee_df.show(5)

+---+------+---+------+------------+--------+
| id|  name|age|salary|     address| nominee|
+---+------+---+------+------------+--------+
|  1|Manish| 26| 75000|       bihar|nominee1|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|
|  5|Vikash| 31|300000|        null|nominee5|
+---+------+---+------+------------+--------+



In [0]:
employee_df = spark.read.format("csv")\
                        .option("header", "true")\
                        .option("inferschema", "true")\
                        .option("mode", "failfast")\
                        .load("/FileStore/tables/employee_file.csv")
                        
employee_df.show(5)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-1518284113161485>:7
      1 employee_df = spark.read.format("csv")\
      2                         .option("header", "true")\
      3                         .option("inferschema", "true")\
      4                         .option("mode", "failfast")\
      5                         .load(

🔶 Let's now see **how we can print bad records**.

In [0]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[int, str, NoneType] = None, maxCharsPerColumn: Union[int, str, NoneType] = None, maxMalformedLogPerPartition: Union[int, str, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] 

■ &nbsp;As you can see, the <code>columnNameOfCorruptRecord</code> parameter lets us store each **corrupted record** in a dedicated **separate column**. <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;You can give this column any name you like; by default Spark uses a column named <code>_corrupt_record</code>.

■ &nbsp;But for that we need to **first update the schema** by adding a string column that will hold the corrupted records.

In [0]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

employee_schema = StructType(
                              [
                                StructField("id", IntegerType(), True),
                                StructField("name", StringType(), True),
                                StructField("age", IntegerType(), True),
                                StructField("salary", IntegerType(), True),
                                StructField("address", StringType(), True),
                                StructField("nominee", StringType(), True),
                                StructField("_corrupt_record", StringType(), True)             # extra column for storing the corrupt record(s)
                              ]
                  )

In [0]:
employee_df = spark.read.format("csv")\
                        .option("header", "true")\
                        .option("inferSchema", "false")\
                        .schema(employee_schema)\
                        .option("mode", "PERMISSIVE")\
                        .load("/FileStore/tables/employee_file.csv")

employee_df.show(5)

+---+--------+---+------+------------+--------+--------------------+
| id|    name|age|salary|     address| nominee|     _corrupt_record|
+---+--------+---+------+------------+--------+--------------------+
|  1|  Manish| 26| 75000|       bihar|nominee1|                null|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|                null|
|  3|  Pritam| 22|150000|   Bangalore|   India|3,Pritam,22,15000...|
|  4|Prantosh| 17|200000|     Kolkata|   India|4,Prantosh,17,200...|
|  5|  Vikash| 31|300000|        null|nominee5|                null|
+---+--------+---+------+------------+--------+--------------------+



In [0]:
employee_df.show(5, truncate = False)

+---+--------+---+------+------------+--------+-------------------------------------------+
|id |name    |age|salary|address     |nominee |_corrupt_record                            |
+---+--------+---+------+------------+--------+-------------------------------------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null                                       |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null                                       |
|3  |Pritam  |22 |150000|Bangalore   |India   |3,Pritam,22,150000,Bangalore,India,nominee3|
|4  |Prantosh|17 |200000|Kolkata     |India   |4,Prantosh,17,200000,Kolkata,India,nominee4|
|5  |Vikash  |31 |300000|null        |nominee5|null                                       |
+---+--------+---+------+------------+--------+-------------------------------------------+



In a real-life scenario, you may have a very large number of corrupt records, which would be difficult to inspect like this. You will usually want to store them somewhere for later reference.

🔶 So now the question is **where do you store corrupt records so that they can be accessed later?** <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; The answer is the <code>badRecordsPath</code> key in the <code>option( )</code> method. 
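The cells below show the folder layout that `badRecordsPath` produces here: a timestamped folder containing `bad_records/part-...` files with one JSON object per bad record, holding `path`, `reason` and `record` fields. The plain-Python sketch that follows imitates only that layout. `write_bad_records` and `read_bad_records` are hypothetical helpers, the file name `part-00000` and the `reason` text are placeholders, and this is not how Databricks implements the option.

```python
import json
import os
import tempfile
from datetime import datetime


def write_bad_records(base_dir, source_path, bad_lines):
    """Hypothetical: write bad records as JSON lines under <base>/<timestamp>/bad_records/."""
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S")  # e.g. 20250113T121639
    out_dir = os.path.join(base_dir, stamp, "bad_records")
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, "part-00000"), "w", encoding="utf-8") as fh:
        for line in bad_lines:
            # placeholder reason; the real files contain the Spark exception message
            entry = {"path": source_path, "reason": "Malformed CSV record", "record": line}
            fh.write(json.dumps(entry) + "\n")
    return out_dir


def read_bad_records(out_dir):
    """Hypothetical: load every JSON line from every file in the bad_records folder."""
    records = []
    for name in sorted(os.listdir(out_dir)):
        with open(os.path.join(out_dir, name), encoding="utf-8") as fh:
            records.extend(json.loads(line) for line in fh if line.strip())
    return records


with tempfile.TemporaryDirectory() as base:
    folder = write_bad_records(
        base,
        "dbfs:/FileStore/tables/employee_file.csv",
        ["3,Pritam,22,150000,Bangalore,India,nominee3",
         "4,Prantosh,17,200000,Kolkata,India,nominee4"],
    )
    for rec in read_bad_records(folder):
        print(rec["record"])
```

Reading the folder back as JSON mirrors what the notebook does later with `spark.read.format("json")`.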

In [0]:
%fs
ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/employee_file.csv,employee_file.csv,230,1736739949000
dbfs:/FileStore/tables/flight_data.csv,flight_data.csv,7121,1736578138000


In [0]:
employee_df = spark.read.format("csv")\
                        .option("header", "true")\
                        .option("inferSchema", "false")\
                        .schema(employee_schema)\
                        .option("mode", "PERMISSIVE")\
                        .option("badRecordsPath", "/FileStore/tables/")\
                        .load("/FileStore/tables/employee_file.csv")

employee_df.show(5)

---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
File <command-907689892493747>:9
      1 employee_df = spark.read.format("csv")\
      2                         .option("header", "true")\
      3                         .option("inferSchema", "false")\
   (...)
      6                         .option("badRecordsPath", "/FileStore/tables/")\
      7

🔶 &nbsp;Here, setting <code>mode</code> to <code>PERMISSIVE</code> together with <code>badRecordsPath</code> raised an <code>IllegalArgumentException</code>, so the next cell drops the <code>mode</code> option. The malformed rows are then written under <code>badRecordsPath</code> instead of appearing in the DataFrame.

In [0]:
employee_df = spark.read.format("csv")\
                        .option("header", "true")\
                        .option("inferSchema", "false")\
                        .schema(employee_schema)\
                        .option("badRecordsPath", "/FileStore/tables/")\
                        .load("/FileStore/tables/employee_file.csv")

employee_df.show(5)

+---+------+---+------+------------+--------+---------------+
| id|  name|age|salary|     address| nominee|_corrupt_record|
+---+------+---+------+------------+--------+---------------+
|  1|Manish| 26| 75000|       bihar|nominee1|           null|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|           null|
|  5|Vikash| 31|300000|        null|nominee5|           null|
+---+------+---+------+------------+--------+---------------+



In [0]:
%fs
ls /FileStore/tables/                          

path,name,size,modificationTime
dbfs:/FileStore/tables/20250113T121639/,20250113T121639/,0,0
dbfs:/FileStore/tables/employee_file.csv,employee_file.csv,230,1736739949000
dbfs:/FileStore/tables/flight_data.csv,flight_data.csv,7121,1736578138000


In [0]:
%fs
ls /FileStore/tables/20250113T121639/

path,name,size,modificationTime
dbfs:/FileStore/tables/20250113T121639/bad_records/,bad_records/,0,0


In [0]:
%fs
ls /FileStore/tables/20250113T121639/bad_records/

path,name,size,modificationTime
dbfs:/FileStore/tables/20250113T121639/bad_records/part-00000-966eb689-84c8-41b4-af24-5df7942bc023,part-00000-966eb689-84c8-41b4-af24-5df7942bc023,494,1736770601000


In [0]:
bad_records_df = spark.read.format("json").load("/FileStore/tables/20250113T121639/bad_records/")
bad_records_df.show()

+--------------------+--------------------+--------------------+
|                path|              reason|              record|
+--------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|org.apache.spark....|3,Pritam,22,15000...|
|dbfs:/FileStore/t...|org.apache.spark....|4,Prantosh,17,200...|
+--------------------+--------------------+--------------------+



In [0]:
bad_records_df.show(truncate=False)

+----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|path                                    |reason                                                                                                                          |record                                     |
+----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|dbfs:/FileStore/tables/employee_file.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3|3,Pritam,22,150000,Bangalore,India,nominee3|
|dbfs:/FileStore/tables/employee_file.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 4,Prantos