In [1]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("PySpark").getOrCreate()

In [7]:
# Read a csv file into dataframe
emp = spark.read.format("csv").load("data/input/emp.csv")

In [8]:
emp.show()

+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+
|       _c0|      _c1|     _c2|                 _c3|         _c4|                 _c5|                _c6|          _c7|   _c8|
+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+
|EmployeeID|FirstName|LastName|               Email| PhoneNumber|            JobTitle|         Department|DateOfJoining|Salary|
|       101|     John|     Doe|john.doe@example.com|123-456-7890|   Software Engineer|                 IT|   2021-05-10| 85000|
|       102|     Jane|   Smith|jane.smith@exampl...|987-654-3210|      Data Scientist|     Data Analytics|   2019-11-20| 95000|
|       103|      Bob| Johnson|bob.johnson@examp...|555-123-4567|          HR Manager|    Human Resources|   2017-07-15| 78000|
|       104|    Alice|Williams|alice.williams@ex...|444-321-7654|Marketing Coordin...|          Marketin

In [9]:
emp.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)



In [10]:
emp = spark.read.format("csv").options(header=True).load("data/input/emp.csv")

In [11]:
emp.show()

+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+
|EmployeeID|FirstName|LastName|               Email| PhoneNumber|            JobTitle|         Department|DateOfJoining|Salary|
+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+
|       101|     John|     Doe|john.doe@example.com|123-456-7890|   Software Engineer|                 IT|   2021-05-10| 85000|
|       102|     Jane|   Smith|jane.smith@exampl...|987-654-3210|      Data Scientist|     Data Analytics|   2019-11-20| 95000|
|       103|      Bob| Johnson|bob.johnson@examp...|555-123-4567|          HR Manager|    Human Resources|   2017-07-15| 78000|
|       104|    Alice|Williams|alice.williams@ex...|444-321-7654|Marketing Coordin...|          Marketing|   2022-03-01| 60000|
|       105|  Charlie|   Brown|charlie.brown@exa...|333-456-7891|     Project Manager|         Operation

In [12]:
emp.printSchema()

root
 |-- EmployeeID: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- DateOfJoining: string (nullable = true)
 |-- Salary: string (nullable = true)



In [14]:
# Let spark identify column datatype
option = {
    "header": True,
    "inferSchema": True
}
emp = spark.read.format("csv").options(**option).load("data/input/emp.csv")

In [15]:
emp.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- DateOfJoining: date (nullable = true)
 |-- Salary: integer (nullable = true)



In [18]:
# Create schema
_schema = "EmployeeID int, FirstName string, LastName string, Email string, PhoneNumber string, JobTitle string, Department string, DateOfJoining date, Salary int"
emp = spark.read.format("csv").options(header=True).schema(_schema).load("data/input/emp.csv")

In [19]:
emp.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- DateOfJoining: date (nullable = true)
 |-- Salary: integer (nullable = true)



In [36]:
# Handle bad records - PERMISSIVE (Default Mode)
_schema = "EmployeeID int, FirstName string, LastName string, Email string, PhoneNumber string, JobTitle string, Department string, DateOfJoining date, Salary int, _corrupt_record string"
emp_p = spark.read.format("csv").options(header=True).schema(_schema).load("data/input/emp_new.csv")

In [39]:
emp_p.where("_corrupt_record is null").show()

+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+---------------+
|EmployeeID|FirstName|LastName|               Email| PhoneNumber|            JobTitle|         Department|DateOfJoining|Salary|_corrupt_record|
+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+---------------+
|       101|     John|     Doe|john.doe@example.com|123-456-7890|   Software Engineer|                 IT|   2021-05-10| 85000|           NULL|
|       102|     Jane|   Smith|jane.smith@exampl...|987-654-3210|      Data Scientist|     Data Analytics|   2019-11-20| 95000|           NULL|
|       103|      Bob| Johnson|bob.johnson@examp...|555-123-4567|          HR Manager|    Human Resources|   2017-07-15| 78000|           NULL|
|       104|    Alice|Williams|alice.williams@ex...|444-321-7654|Marketing Coordin...|          Marketing|   2022-03-01| 60000|         

In [45]:
# Rename default _corrupt_record column
_schema = "EmployeeID int, FirstName string, LastName string, Email string, PhoneNumber string, JobTitle string, Department string, DateOfJoining date, Salary int, bad_record string"
emp_p_renamed = spark.read.format("csv").schema(_schema).option("columnNameOfCorruptRecord","bad_record").option("header",True).load("data/input/emp_new.csv")
emp_p_renamed.where("bad_record is null").show()

+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+----------+
|EmployeeID|FirstName|LastName|               Email| PhoneNumber|            JobTitle|         Department|DateOfJoining|Salary|bad_record|
+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+----------+
|       101|     John|     Doe|john.doe@example.com|123-456-7890|   Software Engineer|                 IT|   2021-05-10| 85000|      NULL|
|       102|     Jane|   Smith|jane.smith@exampl...|987-654-3210|      Data Scientist|     Data Analytics|   2019-11-20| 95000|      NULL|
|       103|      Bob| Johnson|bob.johnson@examp...|555-123-4567|          HR Manager|    Human Resources|   2017-07-15| 78000|      NULL|
|       104|    Alice|Williams|alice.williams@ex...|444-321-7654|Marketing Coordin...|          Marketing|   2022-03-01| 60000|      NULL|
|       105|  Charlie|   Br

In [46]:
# Handle bad records - DROPMALFORMED (Drops bad records directly)
_schema = "EmployeeID int, FirstName string, LastName string, Email string, PhoneNumber string, JobTitle string, Department string, DateOfJoining date, Salary int"
df_m = spark.read.format("csv").schema(_schema).option("header",True).option("mode","DROPMALFORMED").load("data/input/emp_new.csv")

In [47]:
df_m.show()

+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+
|EmployeeID|FirstName|LastName|               Email| PhoneNumber|            JobTitle|         Department|DateOfJoining|Salary|
+----------+---------+--------+--------------------+------------+--------------------+-------------------+-------------+------+
|       101|     John|     Doe|john.doe@example.com|123-456-7890|   Software Engineer|                 IT|   2021-05-10| 85000|
|       102|     Jane|   Smith|jane.smith@exampl...|987-654-3210|      Data Scientist|     Data Analytics|   2019-11-20| 95000|
|       103|      Bob| Johnson|bob.johnson@examp...|555-123-4567|          HR Manager|    Human Resources|   2017-07-15| 78000|
|       104|    Alice|Williams|alice.williams@ex...|444-321-7654|Marketing Coordin...|          Marketing|   2022-03-01| 60000|
|       105|  Charlie|   Brown|charlie.brown@exa...|        NULL|     Project Manager|         Operation

In [48]:
# Handle bad records - FAILFAST (Fails job directly when encounter bad records)
_schema = "EmployeeID int, FirstName string, LastName string, Email string, PhoneNumber string, JobTitle string, Department string, DateOfJoining date, Salary int"
df_m = spark.read.format("csv").schema(_schema).option("header",True).option("mode","FAILFAST").load("data/input/emp_new.csv")

In [49]:
df_m.show()

Py4JJavaError: An error occurred while calling o229.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 25) (4f44685e94d2 executor driver): org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [108,John,Doe,john.doe@example.com,123-456-7890,Software Engineer,IT,null,85000].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. 
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1610)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:79)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:456)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.time.format.DateTimeParseException: Text 'Manager' could not be parsed at index 0
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:365)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:307)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:452)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 26 more
Caused by: java.time.format.DateTimeParseException: Text 'Manager' could not be parsed at index 0
	at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2052)
	at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1880)
	at org.apache.spark.sql.catalyst.util.Iso8601DateFormatter.parse(DateFormatter.scala:57)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$18(UnivocityParser.scala:221)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:291)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$17(UnivocityParser.scala:219)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:346)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [108,John,Doe,john.doe@example.com,123-456-7890,Software Engineer,IT,null,85000].
Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'. 
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1610)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:79)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:456)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.time.format.DateTimeParseException: Text 'Manager' could not be parsed at index 0
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:365)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:307)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:452)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 26 more
Caused by: java.time.format.DateTimeParseException: Text 'Manager' could not be parsed at index 0
	at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2052)
	at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1880)
	at org.apache.spark.sql.catalyst.util.Iso8601DateFormatter.parse(DateFormatter.scala:57)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$18(UnivocityParser.scala:221)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:291)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$17(UnivocityParser.scala:219)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:346)
	... 29 more
