In [0]:
#importing sparksession
 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
 
spark = SparkSession.builder.appName("Read").getOrCreate()

In [0]:
data = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/FileStore/FileStore/data-2.csv")

data.display()

id,name,age
101.0,Sophia,28
102.0,Liam,35
103.0,Olivia,undefined
104.0,,40
,,
105.0,Noah,-5
106.0,Emma,thirty


In [0]:
mycsv_schema = StructType([
    StructField("id",IntegerType(),True),
    StructField("name",StringType(),True),
    StructField("age",IntegerType(),True),
    StructField("_corrupt_record",StringType(),True)
])

In [0]:
df_validated = data.withColumn("age", when(col("age") > 0, col("age")).otherwise(None))
df_validated.display()

id,name,age
101.0,Sophia,28.0
102.0,Liam,35.0
103.0,Olivia,
104.0,,40.0
,,
105.0,Noah,
106.0,Emma,


**PERMISSIVE**


In [0]:
data1 = spark.read.format("csv").schema(mycsv_schema).option("mode", "PERMISSIVE").load("/FileStore/FileStore/data-2.csv")
data1 = data1.withColumn("age", when(col("age") > 0, col("age")).otherwise(None))

data1.display()

id,name,age,_corrupt_record
,name,,"id,name,age"
101.0,Sophia,28.0,
102.0,Liam,35.0,
103.0,Olivia,,"103,Olivia,undefined"
104.0,,40.0,
,,,"NA,,"
105.0,Noah,,
106.0,Emma,,"106,Emma,thirty"


**DROPMAL FORMED**


In [0]:
data2 = spark.read.format("csv").schema(mycsv_schema).option("mode", "DROPMALFORMED").load("/FileStore/FileStore/data-2.csv")
data2 = data2.withColumn("age", when(col("age") > 0, col("age")).otherwise(None))
data2.display()

id,name,age,_corrupt_record
101,Sophia,28.0,
102,Liam,35.0,
104,,40.0,
105,Noah,,


**FAILFAST**

In [0]:
data3 = spark.read.format("csv").schema(mycsv_schema).option("mode", "FAILFAST").load("/FileStore/FileStore/data-2.csv")
data3 = data3.withColumn("age", when(col("age") > 0, col("age")).otherwise(None))
data3.display()

**CORRUPT DATA STORAGE**

In [0]:
bad_data = spark.read.format("csv") \
    .schema(mycsv_schema) \
    .option("header", "TRUE") \
    .option("badRecordsPath", "/FileStore/FileStore/baddata") \
    .load("/FileStore/FileStore/data.csv")

bad_data = bad_data.withColumn("age", when(col("age") > 0, col("age")).otherwise(None))

bad_data.display()

id,name,age,_corrupt_record
101,Sophia,28.0,
102,Liam,35.0,
104,,40.0,
105,Noah,,


In [0]:
dbutils.fs.ls("/FileStore/FileStore/baddata/20241127T001931/bad_records/")

Out[53]: [FileInfo(path='dbfs:/FileStore/FileStore/baddata/20241127T001931/bad_records/part-00000-f01b128f-8ca7-462c-831c-2466a669ca44', name='part-00000-f01b128f-8ca7-462c-831c-2466a669ca44', size=431, modificationTime=1732666773000)]

In [0]:
baddata_read = spark.read.format("csv").load("/FileStore/FileStore/baddata/20241127T001931/bad_records/")


baddata_read.display()

_c0,_c1,_c2,_c3,_c4
"{""path"":""dbfs:/FileStore/FileStore/data.csv""","""record"":""103",Olivia,"undefined""","""reason"":""java.lang.NumberFormatException: For input string: \""undefined\""""}"
"{""path"":""dbfs:/FileStore/FileStore/data.csv""","""record"":""NA",,""",""reason"":""java.lang.NumberFormatException: For input string: \""NA\""""}",
"{""path"":""dbfs:/FileStore/FileStore/data.csv""","""record"":""106",Emma,"thirty""","""reason"":""java.lang.NumberFormatException: For input string: \""thirty\""""}"
