In [18]:
import findspark
findspark.init()

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession

spark=SparkSession.builder.appName("corrupt data handling").getOrCreate()

filepath="/Users/bindu/Documents/sales.txt"
df=spark.read.format("csv")\
.option("header","true")\
.option("delimiter","|")\
.load(filepath)

In [19]:
df.show()

+----+----+----------+------+----+
|  id|name|  sales_dt|amount|flag|
+----+----+----------+------+----+
|   1| bin|01-01-2024|  1000|   y|
| 2tt|   a|03-01-2024|  1320|   y|
|   3|   b|04-01-2024|  2000|   y|
|   4|   d|05-01-2024|  3000|   n|
|5qwe|   f|06-01-2024|  4000|   n|
|   6|   r|07-01-2024|  5000|   n|
+----+----+----------+------+----+



In [24]:
#Data in id 2 and 5 are corrupted
#handle by passing those data as null

from pyspark.sql.types import *
schema=StructType([
StructField('id',IntegerType()),
StructField('name',StringType()),
StructField('sales_dt',StringType()),
StructField('amount',IntegerType()),
StructField('flag',StringType())])

dfschema=spark.read.format("csv")\
.option("header","true")\
.option("delimiter","|")\
.schema(schema)\
.load(filepath)



In [25]:
dfschema.show()

+----+----+----------+------+----+
|  id|name|  sales_dt|amount|flag|
+----+----+----------+------+----+
|   1| bin|01-01-2024|  1000|   y|
|null|   a|03-01-2024|  1320|   y|
|   3|   b|04-01-2024|  2000|   y|
|   4|   d|05-01-2024|  3000|   n|
|null|   f|06-01-2024|  4000|   n|
|   6|   r|07-01-2024|  5000|   n|
+----+----+----------+------+----+



In [28]:
#remove the corrupted data column value alone using permissive option
dfpermissive=spark.read.format("csv")\
.option("header","true")\
.option("delimiter","|")\
.option('mode','PERMISSIVE').schema(schema)\
.load(filepath)

dfpermissive.show()


+----+----+----------+------+----+
|  id|name|  sales_dt|amount|flag|
+----+----+----------+------+----+
|   1| bin|01-01-2024|  1000|   y|
|null|   a|03-01-2024|  1320|   y|
|   3|   b|04-01-2024|  2000|   y|
|   4|   d|05-01-2024|  3000|   n|
|null|   f|06-01-2024|  4000|   n|
|   6|   r|07-01-2024|  5000|   n|
+----+----+----------+------+----+



In [29]:
#remove the corrupted data row itself using dropmalformed option
dfdrop=spark.read.format("csv")\
.option("header","true")\
.option("delimiter","|")\
.option('mode','DROPMALFORMED').schema(schema)\
.load(filepath)

dfdrop.show()

+---+----+----------+------+----+
| id|name|  sales_dt|amount|flag|
+---+----+----------+------+----+
|  1| bin|01-01-2024|  1000|   y|
|  3|   b|04-01-2024|  2000|   y|
|  4|   d|05-01-2024|  3000|   n|
|  6|   r|07-01-2024|  5000|   n|
+---+----+----------+------+----+



In [30]:
#fail the session itself
dfp=spark.read.format("csv")\
.option("header","true")\
.option("delimiter","|")\
.option('mode','FAILFAST').schema(schema)\
.load(filepath)

dfp.show()

Py4JJavaError: An error occurred while calling o165.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10) (host.docker.internal executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "2tt"
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:330)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:275)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 23 more
Caused by: java.lang.NumberFormatException: For input string: "2tt"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at scala.collection.StringOps$.toInt$extension(StringOps.scala:915)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6(UnivocityParser.scala:163)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6$adapted(UnivocityParser.scala:163)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:259)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$5(UnivocityParser.scala:163)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:312)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.malformedRecordsDetectedInRecordParsingError(QueryExecutionErrors.scala:1417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:68)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:421)
	at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:587)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "2tt"
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:330)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:275)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:417)
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
	... 23 more
Caused by: java.lang.NumberFormatException: For input string: "2tt"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at scala.collection.StringOps$.toInt$extension(StringOps.scala:915)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6(UnivocityParser.scala:163)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6$adapted(UnivocityParser.scala:163)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:259)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$5(UnivocityParser.scala:163)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:312)
	... 26 more


In [41]:
#copy the corrupted data to other file

schemaE=StructType([
StructField('id',IntegerType()),
StructField('name',StringType()),
StructField('sales_dt',StringType()),
StructField('amount',IntegerType()),
StructField('flag',StringType()),
StructField('errorRecords',StringType())])


dfc=spark.read.format('csv').option("header","true").option("delimiter","|").option('mode','PERMISSIVE').schema(schemaE)\
.option('columnNameOfCorruptRecord','errorRecords')\
.load(filepath)

dfc.show()


+----+----+----------+------+----+--------------------+
|  id|name|  sales_dt|amount|flag|        errorRecords|
+----+----+----------+------+----+--------------------+
|   1| bin|01-01-2024|  1000|   y|                null|
|null|   a|03-01-2024|  1320|   y|2tt|a|03-01-2024|...|
|   3|   b|04-01-2024|  2000|   y|                null|
|   4|   d|05-01-2024|  3000|   n|                null|
|null|   f|06-01-2024|  4000|   n|5qwe|f|06-01-2024...|
|   6|   r|07-01-2024|  5000|   n|                null|
+----+----+----------+------+----+--------------------+



In [45]:
#filter the non corrupt data
df_non_error=dfc.filter(col('errorRecords').isNull())
df_non_error.show()

+---+----+----------+------+----+------------+
| id|name|  sales_dt|amount|flag|errorRecords|
+---+----+----------+------+----+------------+
|  1| bin|01-01-2024|  1000|   y|        null|
|  3|   b|04-01-2024|  2000|   y|        null|
|  4|   d|05-01-2024|  3000|   n|        null|
|  6|   r|07-01-2024|  5000|   n|        null|
+---+----+----------+------+----+------------+



In [46]:
#filter the corrupt data
df_non_error=dfc.filter(col('errorRecords').isNotNull())
df_non_error.show()

+----+----+----------+------+----+--------------------+
|  id|name|  sales_dt|amount|flag|        errorRecords|
+----+----+----------+------+----+--------------------+
|null|   a|03-01-2024|  1320|   y|2tt|a|03-01-2024|...|
|null|   f|06-01-2024|  4000|   n|5qwe|f|06-01-2024...|
+----+----+----------+------+----+--------------------+

