
# 📘 Spark DataFrame Reading & Schema Handling - Notes & Tips

## 🔍 Data Reading Modes in Spark
When reading data using Spark, the `.option("mode", "<MODE>")` setting controls how Spark handles malformed or corrupt rows:

- **PERMISSIVE (default)**: Keeps every row. Fields that cannot be parsed are set to null, and the raw text of a malformed row is stored in a special column, `_corrupt_record`, when that column is part of the schema.
- **DROPMALFORMED**: Drops rows that don't match the schema.
- **FAILFAST**: Fails immediately if it encounters bad records.

### Example:
```python
.option("mode", "PERMISSIVE")     # Tolerant: nulls for unparseable fields, raw row kept in _corrupt_record
.option("mode", "DROPMALFORMED")  # Skips bad records
.option("mode", "FAILFAST")       # Throws error on first bad record
```
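
The three modes can be modeled in a few lines of plain Python. This is a toy sketch, not Spark's implementation: the `read_rows` helper and the `EXPECTED_COLUMNS` list are made up for illustration, "malformed" here only means "wrong number of fields", and the two sample rows are modeled on the `Employee.csv` output shown later in these notes.

```python
import csv
import io

# Column names modeled on the Employee.csv example (illustrative assumption).
EXPECTED_COLUMNS = ["id", "name", "age", "salary", "address", "nominee"]


def read_rows(text, mode="PERMISSIVE"):
    """Toy model of the three parse modes; not Spark's implementation."""
    rows = []
    for line in text.strip().splitlines():
        fields = next(csv.reader(io.StringIO(line)))
        malformed = len(fields) != len(EXPECTED_COLUMNS)
        if malformed and mode == "FAILFAST":
            # Abort on the first bad record.
            raise ValueError(f"Malformed CSV record: {line}")
        if malformed and mode == "DROPMALFORMED":
            # Silently skip the bad record.
            continue
        # PERMISSIVE: pad missing fields with None, drop extras, keep the raw text.
        padded = (fields + [None] * len(EXPECTED_COLUMNS))[: len(EXPECTED_COLUMNS)]
        row = dict(zip(EXPECTED_COLUMNS, padded))
        row["_corrupt_record"] = line if malformed else None
        rows.append(row)
    return rows


SAMPLE = """\
1,Manish,26,75000,bihar,nominee1
3,Pritam,22,150000,Bangalore,India,nominee3
"""

permissive = read_rows(SAMPLE, "PERMISSIVE")
dropped = read_rows(SAMPLE, "DROPMALFORMED")
print(len(permissive), len(dropped))
print(permissive[1]["_corrupt_record"])
```

The second sample row has seven fields for six columns, so it survives only in `PERMISSIVE` mode, with its raw text preserved.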

---

## 🛠️ Handling Corrupt Records
Use the `badRecordsPath` option (available on Databricks) to write invalid records to a separate location for debugging.

### Example:
```python
.option("badRecordsPath", "/FileStore/tables/bad_records")
```

Spark writes the malformed rows as JSON files under a timestamped subfolder, which you can read back later:
```python
spark.read.json("/FileStore/tables/bad_records/<timestamp-folder>")
```

---

## 🧱 Schema Creation Techniques

### 1. **DDL Format (String schema)**
```python
schema = "id INT, name STRING, age INT, salary INT, address STRING, nominee STRING"
```

### 2. **StructType Programmatic Schema**
```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("address", StringType(), True),
    StructField("nominee", StringType(), True)
])
```

### 3. **Infer Schema (not recommended for production)**
```python
.option("inferSchema", "true")  # Spark guesses the schema
```
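
One reason inference is risky in production is that the guessed type depends on the data that happens to be in the file. The sketch below is a toy model of that effect, not Spark's inference algorithm; `infer_type` and the sample values are hypothetical.

```python
def infer_type(values):
    """Toy inference: 'int' only if every non-empty value parses as an integer."""
    non_empty = [v for v in values if v not in ("", None)]
    if not non_empty:
        return "string"
    try:
        for value in non_empty:
            int(value)
    except ValueError:
        return "string"
    return "int"


clean_salaries = ["75000", "100000", "150000"]
dirty_salaries = ["75000", "100000", "n/a"]  # one stray value changes the guess

print(infer_type(clean_salaries))
print(infer_type(dirty_salaries))
```

With an explicit schema the column type stays fixed, and the stray value is handled by the read mode instead.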

---

These techniques help ensure your DataFrame reads are robust and ready for real-world messy data. Use `FAILFAST` in pipelines that must abort on the first bad record, and `badRecordsPath` when you want to keep loading the good rows while saving the bad ones for later analysis.
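
One way to keep that choice in a single place is a small helper that returns the reader options as a dictionary. This is only a sketch: `csv_reader_options` is a hypothetical helper, and the default path is the example path used in these notes.

```python
def csv_reader_options(strict, bad_records_path=None):
    """Return CSV reader options for a strict or a tolerant pipeline (hypothetical helper)."""
    options = {"header": "true"}
    if strict:
        # Abort the job on the first malformed record.
        options["mode"] = "FAILFAST"
    elif bad_records_path is not None:
        # Keep loading good rows and write the bad ones to a side location.
        options["badRecordsPath"] = bad_records_path
    else:
        options["mode"] = "PERMISSIVE"
    return options


strict_opts = csv_reader_options(strict=True)
tolerant_opts = csv_reader_options(strict=False, bad_records_path="/FileStore/tables/bad_records")
print(strict_opts)
print(tolerant_opts)
# In a Spark session these could be applied with:
#   spark.read.format("csv").options(**strict_opts).load(path)
```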


In [0]:
spark

In [0]:
# Read the CSV with header=false: the header line comes back as a data row under _c0, _c1, _c2
df=spark.read.format("csv").option("header","false")\
    .option("inferschema","false")\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/2015_summary.csv")
df.show()

+--------------------+-------------------+-----+
|                 _c0|                _c1|  _c2|
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United State

In [0]:
display(df)

_c0,_c1,_c2
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40


In [0]:
# Read the CSV with header=true: the first line supplies the column names
df=spark.read.format("csv").option("header","true")\
    .option("inferschema","false")\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/2015_summary.csv")
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
display(df)

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
from pyspark.sql.types import StructField,StructType,StringType,IntegerType

In [0]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [0]:
my_schema=StructType(
    [
        StructField("DEST_COUNTRY_NAME",StringType(),True),
        StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
        StructField("count",IntegerType(),True)
    ]
)

In [0]:
df=spark.read.format("csv").option("header","false")\
    .option("inferschema","false")\
    .schema(my_schema)\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/2015_summary.csv")
df.show()  # raises: the header line's "count" cannot be parsed as an integer

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-4490155068148989>:6
      1 df=spark.read.format("csv").option("header","false")\
      2     .option("inferschema","false")\
      3      .schema(my_schema)\
      4         .option("mode","FAILFAST")\
      5         .load("/FileStore/tables/2015_summar

In [0]:
# Same read in PERMISSIVE mode: the header line is kept, with count set to null
df=spark.read.format("csv").option("header","false")\
    .option("inferschema","false")\
    .schema(my_schema)\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/2015_summary.csv")
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| null|
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United State

In [0]:
# Skip the header line with skipRows so it is not parsed as data
df=spark.read.format("csv").option("header","false")\
    .option("inferschema","false")\
    .option("skipRows",1)\
    .schema(my_schema)\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/2015_summary.csv")
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
# With the header line skipped, FAILFAST no longer raises
df=spark.read.format("csv").option("header","false")\
    .option("inferschema","false")\
    .option("skipRows",1)\
    .schema(my_schema)\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/2015_summary.csv")
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

DDL stands for Data Definition Language, and it comes from the good ol’ SQL world where you use commands like CREATE TABLE to define schemas and data types.

In Spark, a DDL-style schema is a string-based representation of a schema that looks like the column definitions you'd find in a SQL CREATE TABLE statement.
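
To make the structure of such a string concrete, the sketch below splits a flat DDL schema into column/type pairs in plain Python. It is a toy parser written for illustration (`split_ddl` is a hypothetical helper): it only handles simple `name TYPE` columns, whereas Spark's own parser accepts far more.

```python
def split_ddl(ddl):
    """Split a flat DDL schema string into (column, type) pairs (toy parser)."""
    pairs = []
    for column_def in ddl.split(","):
        # Each definition is "<name> <type>", with arbitrary surrounding whitespace.
        name, type_name = column_def.split()
        pairs.append((name, type_name.upper()))
    return pairs


ddl_schema = "DEST_COUNTRY_NAME STRING,  ORIGIN_COUNTRY_NAME STRING,Count INT"
for name, type_name in split_ddl(ddl_schema):
    print(f"{name}: {type_name}")
```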

In [0]:
# The same schema expressed as a DDL string
ddl_schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, Count INT"
df=spark.read.format("csv").option("header","false")\
    .option("inferschema","false")\
    .option("skipRows",1)\
    .schema(ddl_schema)\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/2015_summary.csv")
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|Count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [0]:
# Handling corrupted data

In [0]:
df=spark.read.format("csv")\
    .option("inferschema","true")\
    .option("mode","FAILFAST")\
    .load("/FileStore/tables/Employee.csv")
df.show()  # FAILFAST fails as soon as it encounters bad data



---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-334292990152322>:5
      1 df=spark.read.format("csv")\
      2              .option("inferschema","true")\
      3             .option("mode","FAILFAST")\
      4             .load("/FileStore/tables/Employee.csv")
----> 5 df.show()

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>

In [0]:
# PERMISSIVE does not fail, but the bad records (ids 3 and 4) stay in the result
df=spark.read.format("csv")\
    .option("inferschema","true")\
    .option("header","true")\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/Employee.csv")
df.show()


+---+--------+---+------+------------+--------+
| id|    name|age|salary|     address| nominee|
+---+--------+---+------+------------+--------+
|  1|  Manish| 26| 75000|       bihar|nominee1|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|
|  3|  Pritam| 22|150000|   Bangalore|   India|
|  4|Prantosh| 17|200000|     Kolkata|   India|
|  5|  Vikash| 31|300000|        null|nominee5|
+---+--------+---+------+------------+--------+



In [0]:
# DROPMALFORMED drops the malformed rows (ids 3 and 4)
df=spark.read.format("csv")\
    .option("inferschema","true")\
    .option("header","true")\
    .option("mode","DROPMALFORMED")\
    .load("/FileStore/tables/Employee.csv")
df.show()



+---+------+---+------+------------+--------+
| id|  name|age|salary|     address| nominee|
+---+------+---+------+------------+--------+
|  1|Manish| 26| 75000|       bihar|nominee1|
|  2|Nikita| 23|100000|uttarpradesh|nominee2|
|  5|Vikash| 31|300000|        null|nominee5|
+---+------+---+------+------------+--------+



In [0]:
from pyspark.sql.types import *

In [0]:
schema=StructType(
    [
        StructField("ID",IntegerType(),True),
        StructField("name",StringType(),True),
        StructField("age",IntegerType(),True),
        StructField("Salary",IntegerType(),True),
        StructField("address",StringType(),True),
        StructField("nominee",StringType(),True),
        StructField("_corrupt_record",StringType(),True)
    ]
)

In [0]:
# PERMISSIVE read with the explicit schema defined above
df=spark.read.format("csv")\
    .option("inferschema","false")\
    .option("header","true")\
    .schema(schema)\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/Employee.csv")
df.show()



+---+--------+---+------+------------+--------+--------------+
| ID|    name|age|Salary|     address| nominee|Corrupt_Record|
+---+--------+---+------+------------+--------+--------------+
|  1|  Manish| 26| 75000|       bihar|nominee1|          null|
|  2|  Nikita| 23|100000|uttarpradesh|nominee2|          null|
|  3|  Pritam| 22|150000|   Bangalore|   India|      nominee3|
|  4|Prantosh| 17|200000|     Kolkata|   India|      nominee4|
|  5|  Vikash| 31|300000|        null|nominee5|          null|
+---+--------+---+------+------------+--------+--------------+



In [0]:
# inferschema is ignored here because an explicit schema is supplied
df=spark.read.format("csv")\
    .option("inferschema","true")\
    .option("header","true")\
    .schema(schema)\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/Employee.csv")
df.show(truncate = False)



+---+--------+---+------+------------+--------+--------------+
|ID |name    |age|Salary|address     |nominee |Corrupt_Record|
+---+--------+---+------+------------+--------+--------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null          |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null          |
|3  |Pritam  |22 |150000|Bangalore   |India   |nominee3      |
|4  |Prantosh|17 |200000|Kolkata     |India   |nominee4      |
|5  |Vikash  |31 |300000|null        |nominee5|null          |
+---+--------+---+------+------------+--------+--------------+



In [0]:
# Same read again; truncate=False prints the full _corrupt_record text
df=spark.read.format("csv")\
    .option("inferschema","true")\
    .option("header","true")\
    .schema(schema)\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/Employee.csv")
df.show(truncate = False)



+---+--------+---+------+------------+--------+-------------------------------------------+
|ID |name    |age|Salary|address     |nominee |_corrupt_record                            |
+---+--------+---+------+------------+--------+-------------------------------------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null                                       |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null                                       |
|3  |Pritam  |22 |150000|Bangalore   |India   |3,Pritam,22,150000,Bangalore,India,nominee3|
|4  |Prantosh|17 |200000|Kolkata     |India   |4,Prantosh,17,200000,Kolkata,India,nominee4|
|5  |Vikash  |31 |300000|null        |nominee5|null                                       |
+---+--------+---+------+------------+--------+-------------------------------------------+



In [0]:
# Same result with inferschema disabled: the explicit schema drives the parsing
df=spark.read.format("csv")\
    .option("inferschema","false")\
    .option("header","true")\
    .schema(schema)\
    .option("mode","PERMISSIVE")\
    .load("/FileStore/tables/Employee.csv")
df.show(truncate = False)



+---+--------+---+------+------------+--------+-------------------------------------------+
|ID |name    |age|Salary|address     |nominee |_corrupt_record                            |
+---+--------+---+------+------------+--------+-------------------------------------------+
|1  |Manish  |26 |75000 |bihar       |nominee1|null                                       |
|2  |Nikita  |23 |100000|uttarpradesh|nominee2|null                                       |
|3  |Pritam  |22 |150000|Bangalore   |India   |3,Pritam,22,150000,Bangalore,India,nominee3|
|4  |Prantosh|17 |200000|Kolkata     |India   |4,Prantosh,17,200000,Kolkata,India,nominee4|
|5  |Vikash  |31 |300000|null        |nominee5|null                                       |
+---+--------+---+------+------------+--------+-------------------------------------------+



In [0]:
# Listing files in DBFS directory
dbutils.fs.ls('/')

Out[2]: [FileInfo(path='dbfs:/FileStore/', name='FileStore/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/', name='databricks-datasets/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-results/', name='databricks-results/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/', name='mnt/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/', name='user/', size=0, modificationTime=0)]

In [0]:
# Write malformed rows to badRecordsPath; only the well-formed rows are loaded
df=spark.read.format("csv")\
    .option("inferschema","false")\
    .option("header","true")\
    .schema(schema)\
    .option("badRecordsPath","/FileStore/tables/bad_recods")\
    .load("/FileStore/tables/Employee.csv")
df.show(truncate = False)



+---+------+---+------+------------+--------+---------------+
|ID |name  |age|Salary|address     |nominee |_corrupt_record|
+---+------+---+------+------------+--------+---------------+
|1  |Manish|26 |75000 |bihar       |nominee1|null           |
|2  |Nikita|23 |100000|uttarpradesh|nominee2|null           |
|5  |Vikash|31 |300000|null        |nominee5|null           |
+---+------+---+------+------------+--------+---------------+



In [0]:
# Listing files in DBFS directory
dbutils.fs.ls('/FileStore/tables/bad_recods')

Out[26]: [FileInfo(path='dbfs:/FileStore/tables/bad_recods/20250420T094451/', name='20250420T094451/', size=0, modificationTime=0)]

In [0]:
# Listing files in DBFS directory
files = dbutils.fs.ls("/FileStore/tables/bad_records")
for f in files:
    print(f.name)


20250420T092938/


In [0]:
# Listing files in DBFS directory
dbutils.fs.ls("/FileStore/tables/bad_records/20250420T092938")


Out[22]: [FileInfo(path='dbfs:/FileStore/tables/bad_records/20250420T092938/bad_records/', name='bad_records/', size=0, modificationTime=0)]

In [0]:
%fs
ls      /FileStore/tables/bad_recods/20250420T094451/bad_records/                                                     

path,name,size,modificationTime
dbfs:/FileStore/tables/bad_recods/20250420T094451/bad_records/part-00000-4c1e22bb-622b-4b94-b5b8-d0f018ea03ed,part-00000-4c1e22bb-622b-4b94-b5b8-d0f018ea03ed,484,1745142292000


In [0]:
# Replace with the actual subfolder inside bad_records
# Reading data using Spark DataFrame Reader
bad_data_df = spark.read.format("json").load("/FileStore/tables/bad_recods/20250420T094451/bad_records")
# Displaying the DataFrame content
bad_data_df.show(truncate=False)

+-----------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|path                               |reason                                                                                                                          |record                                     |
+-----------------------------------+--------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------+
|dbfs:/FileStore/tables/Employee.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3|3,Pritam,22,150000,Bangalore,India,nominee3|
|dbfs:/FileStore/tables/Employee.csv|org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 4,Prantosh,17,200000,Kolkata,India

In [0]:
display(bad_data_df)

path,reason,record
dbfs:/FileStore/tables/Employee.csv,"org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3","3,Pritam,22,150000,Bangalore,India,nominee3"
dbfs:/FileStore/tables/Employee.csv,"org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: 4,Prantosh,17,200000,Kolkata,India,nominee4","4,Prantosh,17,200000,Kolkata,India,nominee4"
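
Once the bad records are available as JSON, a little post-processing makes them easier to triage. The sketch below uses only the standard library and assumes each line of the part file is one JSON object with the `path`, `reason`, and `record` keys shown above; `summarize_bad_record` is a hypothetical helper, and the sample line is rebuilt from the first row of that output.

```python
import json
import re

# One bad-record entry, rebuilt from the path/reason/record columns shown above.
sample_line = json.dumps({
    "path": "dbfs:/FileStore/tables/Employee.csv",
    "reason": "org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] "
              "Malformed CSV record: 3,Pritam,22,150000,Bangalore,India,nominee3",
    "record": "3,Pritam,22,150000,Bangalore,India,nominee3",
})


def summarize_bad_record(line):
    """Return (error_class, field_count) for one JSON bad-record line."""
    entry = json.loads(line)
    # The error class is the bracketed, upper-case token inside the reason text.
    match = re.search(r"\[([A-Z_]+)\]", entry["reason"])
    error_class = match.group(1) if match else None
    return error_class, len(entry["record"].split(","))


print(summarize_bad_record(sample_line))
```

The field count of seven against a six-column file explains why the row was rejected.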

