#### Important - There are 3 typical read modes, and the default read mode is permissive.
##### 1. permissive: Fields that cannot be parsed are set to null, and the raw corrupt record is placed in a string column (named _corrupt_record by default, configurable with columnNameOfCorruptRecord).
##### 2. dropMalformed: Drops all rows containing corrupt records.
##### 3. failFast: Throws an exception as soon as a corrupt record is encountered.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType  # explicit imports instead of *

custom_schema = StructType([StructField('ID', StringType(), True), StructField('Name', StringType(), True), StructField('Age', StringType(), True), StructField('Salary', StringType(), True), StructField('JoinDate', StringType(), True), StructField('LastLogin', StringType(), True), StructField('Notes', StringType(), True),StructField('Corrupted_data', StringType(), True)])

df1 = spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/malformed_data.txt",schema=custom_schema,mode="permissive",comment="#",header=True,multiLine=True,columnNameOfCorruptRecord="Corrupted_data")
print(df1.count())
display(df1)

df2 = spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/malformed_data.txt",schema=custom_schema,mode="dropmalformed",comment="#",header=True,multiLine=True,columnNameOfCorruptRecord="Corrupted_data")
print(df2.count())
display(df2)

# failFast: the actions below (count/display) raise an exception if the file contains any corrupt record
df3 = spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/malformed_data.txt",schema=custom_schema,mode="failfast",comment="#",header=True,multiLine=True,columnNameOfCorruptRecord="Corrupted_data")
print(df3.count())
display(df3)
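
To make the difference between the three modes concrete, here is a minimal plain-Python sketch. It is a simplified model, not Spark's parser: the schema, the sample rows and the helper `read_rows` are hypothetical, and a row counts as malformed only when a value cannot be cast to its column type or the token count is wrong.

```python
import csv

# Hypothetical schema for illustration: (column name, cast function)
SCHEMA = [("cid", int), ("name", str), ("amt", float)]

def read_rows(text, mode="permissive", corrupt_col="_corrupt_record"):
    """Simplified model of permissive / dropMalformed / failFast."""
    mode = mode.lower()  # this sketch compares mode names case-insensitively
    if mode not in {"permissive", "dropmalformed", "failfast"}:
        raise ValueError(f"Unknown mode: {mode}")
    rows = []
    for line in text.splitlines():
        if not line.strip():
            continue
        tokens = next(csv.reader([line]))
        record = {}
        bad = len(tokens) != len(SCHEMA)
        for i, (col, cast) in enumerate(SCHEMA):
            try:
                record[col] = cast(tokens[i])
            except (IndexError, ValueError):
                record[col] = None  # the unparseable field becomes null
                bad = True
        if bad and mode == "failfast":
            raise ValueError(f"Malformed record: {line!r}")
        if bad and mode == "dropmalformed":
            continue  # silently skip the bad row
        record[corrupt_col] = line if bad else None  # keep raw text for analysis
        rows.append(record)
    return rows

SAMPLE = "1,Asha,100.50\n2,Ravi,ten\nx,Meena,75.00\n4,Kiran,20.00\n"

permissive_rows = read_rows(SAMPLE, "permissive")
dropped_rows = read_rows(SAMPLE, "dropMalformed")
try:
    read_rows(SAMPLE, "failFast")
    failfast_error = None
except ValueError as exc:
    failfast_error = str(exc)

print(len(permissive_rows), len(dropped_rows), failfast_error)
```

In this model permissive keeps all four rows and tags two of them, dropMalformed keeps only the two clean rows, and failFast stops at the first bad row.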

#### CSV Advanced Features - Very Important, Important, Not Important (just get to know these once and for all)
**Very Important:** path, schema, sep, header, inferSchema, samplingRatio, mode, columnNameOfCorruptRecord <br>
**Important:** quote, escape, comment, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, dateFormat, timestampFormat, multiLine, enforceSchema, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter <br>
**Not Important:** encoding, positiveInf, negativeInf, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, charToEscapeQuoteEscaping, emptyValue, locale, lineSep, unescapedQuoteHandling


In [0]:
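# Read every column as a string so that no row is rejected on type grounds
custom_schema = "cid string, name string, amount string, dop string"
df1 = spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/malformeddata.txt",schema=custom_schema,comment="#")
# Keep rows whose amount contains no letters: upper() and lower() only agree when there is nothing to re-case
df1.where("upper(amount) == lower(amount)").show()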
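
The `upper(amount) == lower(amount)` filter above is a quick way to spot values without letters. A small plain-Python sketch of the same idea follows; the helper name `has_no_letters` and the sample values are made up for illustration. Note that the check only says "no cased letters", not "valid number".

```python
def has_no_letters(value):
    """True when upper- and lower-casing give the same string (no cased letters)."""
    return value is not None and value.upper() == value.lower()

samples = ["1500.00", "12a5", "N/A", "-42", ""]
checks = {s: has_no_letters(s) for s in samples}
print(checks)
```

An empty string or a value such as `--` also passes this check, so a cast to a numeric type is still needed before trusting the column.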

In [0]:
# Very Important options: path, schema, sep, header, inferSchema, samplingRatio, mode, columnNameOfCorruptRecord
struct1="cid int,name string,amt decimal(10,2),dop date"

df1=spark.read.schema(struct1).csv("/Volumes/we47/we47schema/we47volume/we47directory/malformeddata.txt",header=False,sep=",",comment='#',mode='dropMalformed') #Blindly drop malformed records
display(df1)

struct1="cid int,name string,amt decimal(10,2),dop date,corrupt_record string"
df1=spark.read.schema(struct1).csv("/Volumes/we47/we47schema/we47volume/we47directory/malformeddata.txt",header=False,sep=",",comment='#',mode='permissive',columnNameOfCorruptRecord='corrupt_record') #Permit malformed records and keep them for an RCA (root cause analysis)
display(df1.where("corrupt_record is not null"))

df1=spark.read.schema(struct1).csv("/Volumes/we47/we47schema/we47volume/we47directory/malformeddata.txt",header=False,sep=",",comment='#',mode='failFast') #Fail immediately when a malformed record occurs
display(df1)

In [0]:
# Important options: quote, escape, comment, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, dateFormat, timestampFormat, multiLine, enforceSchema, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, nullValue, nanValue

struct1="cid int,name string,amt decimal(10,2),dop date"
# modifiedAfter is documented to take a timestamp of the form YYYY-MM-DDTHH:mm:ss
df1=spark.read.schema(struct1).csv("/Volumes/we47/we47schema/we47volume/we47directory/malformeddata1.txt",header=False,sep=",",comment='#',quote="'",escape='|',ignoreLeadingWhiteSpace=True,ignoreTrailingWhiteSpace=True,dateFormat='yyyy-dd-MM',mode="permissive",modifiedAfter='2025-12-20T00:00:00',multiLine=True,nullValue='na')
display(df1)
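
The `dateFormat='yyyy-dd-MM'` pattern used above means year, then day, then month. As a rough plain-Python analogy (assuming `%Y-%d-%m` as the `strptime` counterpart, and with made-up sample values), the sketch below shows why the pattern must match the data: a wrong pattern either fails or silently swaps day and month.

```python
from datetime import datetime

def parse_year_day_month(text):
    """Parse text laid out as year-day-month, e.g. 2025-20-12."""
    return datetime.strptime(text, "%Y-%d-%m").date()

# Day 20, month 12
parsed = parse_year_day_month("2025-20-12")

# An ambiguous value parses without error, but as 5 June rather than 6 May
ambiguous = parse_year_day_month("2025-05-06")

# A year-month-day value whose day is above 12 cannot be read with this pattern
try:
    parse_year_day_month("2025-12-20")
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True

print(parsed, ambiguous, mismatch_rejected)
```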

#### JSON Advanced Features
**Very Important** - path, schema, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, pathGlobFilter, recursiveFileLookup<br>
JSON has no header, inferSchema, or sep options.<br>
**Important** - primitivesAsString (treat all primitive columns as strings), prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, lineSep, samplingRatio, dropFieldIfAllNull, modifiedBefore, modifiedAfter, useUnsafeRow (a performance optimization applied when the data is loaded into Spark memory) <br>
**Not Important** (just get to know these once and for all) - allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, allowUnquotedControlChars, encoding, locale, allowNonNumericNumbers<br>

In [0]:
df_json=spark.read.json("/Volumes/we47/we47schema/we47volume/we47directory/simple_json.txt",samplingRatio=0.1,prefersDecimal=True)

#primitivesAsString
# df_json=spark.read.json("/Volumes/we47/we47schema/we47volume/we47directory/simple_json.txt",samplingRatio=0.1,primitivesAsString=True)

df_json.printSchema()
df_json.show()
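
As a loose analogy for `prefersDecimal` and `primitivesAsString`, the standard `json` module lets you choose how numbers are materialized. This is only a sketch of the idea with a made-up record; it does not reproduce Spark's schema inference, and unlike `primitivesAsString` the Python hooks leave booleans untouched.

```python
import json
from decimal import Decimal

record = '{"id": 7, "amt": 1234.10, "active": true}'

# Default: floating-point values become binary floats
as_default = json.loads(record)

# Similar in spirit to prefersDecimal: keep the exact decimal digits
as_decimal = json.loads(record, parse_float=Decimal)

# Similar in spirit to primitivesAsString: numbers stay as text
as_strings = json.loads(record, parse_int=str, parse_float=str)

print(as_default, as_decimal, as_strings)
```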

In [0]:
schema1 = "amt float,dop date,id int,name string,corrupt_record string"
df1_json = spark.read.json("/Volumes/we47/we47schema/we47volume/we47directory/simple_json2.txt",schema=schema1,mode="permissive",columnNameOfCorruptRecord="corrupt_record",lineSep="~",dateFormat="yyyy-dd-MM",allowComments=True,allowSingleQuotes=True,allowUnquotedFieldNames=True)
df1_json.printSchema()
df1_json.show()
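
To see why `allowComments`, `allowSingleQuotes` and `allowUnquotedFieldNames` matter, here is a sketch using Python's strict `json` module on records separated by `~` (mirroring `lineSep="~"`). The sample records and the helper `strict_parse` are hypothetical. A strict parser rejects every relaxed record, which is roughly what would land in the corrupt-record column if the options were left off.

```python
import json

records = [
    '{"id": 1, "name": "Asha"}',     # strict JSON
    "{'id': 2, 'name': 'Ravi'}",     # single quotes
    '{id: 3, "name": "Meena"}',      # unquoted field name
    '{"id": 4 /* note */}',          # comment inside the record
]
raw = "~".join(records)

def strict_parse(text, line_sep="~"):
    """Split on the record separator and parse each chunk with strict JSON rules."""
    parsed, rejected = [], []
    for chunk in text.split(line_sep):
        try:
            parsed.append(json.loads(chunk))
        except json.JSONDecodeError:
            rejected.append(chunk)
    return parsed, rejected

parsed, rejected = strict_parse(raw)
print(len(parsed), len(rejected))
```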

In [0]:
strt1="id int,name string,amt float, dop date,custom_corruptrow string"
df_json=spark.read.json("/Volumes/we47/we47schema/we47volume/we47directory/simple_json_multiline3.txt",allowComments=True,allowSingleQuotes=True,allowUnquotedFieldNames=True,columnNameOfCorruptRecord="custom_corruptrow",schema=strt1,dateFormat='yyyy-dd-MM',multiLine=True)
#primitivesAsString
df_json.printSchema()
df_json.show(10,False)

#### Serialized data Advanced Features - orc, parquet/delta (very important; we learn these in depth)
- path or paths
- **mergeSchema** - SCHEMA EVOLUTION. An important interview property (bring it up proactively in the interview)
- pathGlobFilter
- recursiveFileLookup
- modifiedBefore
- modifiedAfter
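
The file-selection options in the list above can be pictured with a plain-Python sketch. It is a simplified model under stated assumptions, not how Spark lists files: the helper `select_files` and the file names are hypothetical, the glob is matched against the file name only, the time bounds are treated as exclusive, and recursion into subdirectories is not modeled.

```python
import fnmatch
import os
import tempfile
from datetime import datetime

def select_files(directory, glob="*", modified_after=None, modified_before=None):
    """Pick files by name pattern and modification time (flat directory only)."""
    selected = []
    for entry in os.scandir(directory):
        if not entry.is_file():
            continue  # subdirectories are ignored in this sketch
        modified = datetime.fromtimestamp(entry.stat().st_mtime)
        if not fnmatch.fnmatchcase(entry.name, glob):
            continue
        if modified_after is not None and modified <= modified_after:
            continue
        if modified_before is not None and modified >= modified_before:
            continue
        selected.append(entry.name)
    return sorted(selected)

with tempfile.TemporaryDirectory() as root:
    stamps = {
        "old.orc": datetime(2025, 12, 1),
        "new.orc": datetime(2025, 12, 25),
        "new.csv": datetime(2025, 12, 25),
    }
    for name, stamp in stamps.items():
        path = os.path.join(root, name)
        open(path, "w").close()
        os.utime(path, (stamp.timestamp(), stamp.timestamp()))  # set the modification time

    cutoff = datetime(2025, 12, 20)
    orc_only = select_files(root, glob="*.orc")
    recent_orc = select_files(root, glob="*.orc", modified_after=cutoff)
    older = select_files(root, modified_before=cutoff)

print(orc_only, recent_orc, older)
```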

Problem statement:
The source sends data in whatever shape it wants, so the number of columns changes between deliveries:
- Day1/source1 - 5 cols
- Day2/source2 - 7 cols

1. Read the data into a DataFrame from CSV/JSON.
2. Write it in ORC/Parquet format to a single location.
3. Read it back from ORC/Parquet using the mergeSchema option.

In [0]:
# Convert the CSV data into ORC format to demonstrate SCHEMA EVOLUTION (a changing schema); later files are appended to the same location
spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/source1.txt",inferSchema=True,header=True).write.orc("/Volumes/we47/we47schema/we47volume/we47directory/orc_targetdata_merged/")
spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/source2.txt",inferSchema=True,header=True).write.orc("/Volumes/we47/we47schema/we47volume/we47directory/orc_targetdata_merged/",mode='append')
spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/source3.txt",inferSchema=True,header=True).write.orc("/Volumes/we47/we47schema/we47volume/we47directory/orc_targetdata_merged/",mode='append')

In [0]:
orc_data_merged = spark.read.orc("/Volumes/we47/we47schema/we47volume/we47directory/orc_targetdata_merged/",mergeSchema=True)
display(orc_data_merged)

In [0]:
# Convert the CSV data into Parquet format to demonstrate SCHEMA EVOLUTION (a changing schema); later files are appended to the same location
spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/source1.txt",inferSchema=True,header=True).write.parquet("/Volumes/we47/we47schema/we47volume/we47directory/parquet_targetdata_merged/")
spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/source2.txt",inferSchema=True,header=True).write.parquet("/Volumes/we47/we47schema/we47volume/we47directory/parquet_targetdata_merged/",mode='append')
spark.read.csv("/Volumes/we47/we47schema/we47volume/we47directory/source3.txt",inferSchema=True,header=True).write.parquet("/Volumes/we47/we47schema/we47volume/we47directory/parquet_targetdata_merged/",mode='append')

In [0]:
parquet_data_merged = spark.read.parquet("/Volumes/we47/we47schema/we47volume/we47directory/parquet_targetdata_merged/",mergeSchema=True)
display(parquet_data_merged)
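
Conceptually, the mergeSchema reads above return the union of the columns found across all files, with null wherever a file did not carry a column. The plain-Python sketch below illustrates only that idea; the helper `merge_schemas` and the sample rows are hypothetical, the column counts are smaller than the 5 and 7 in the problem statement, and type reconciliation between files is not modeled.

```python
def merge_schemas(batches):
    """Union column names in first-seen order, then pad every row with None."""
    columns = []
    for batch in batches:
        for row in batch:
            for col in row:
                if col not in columns:
                    columns.append(col)
    merged = [
        {col: row.get(col) for col in columns}
        for batch in batches
        for row in batch
    ]
    return columns, merged

# Day 1 arrives with 3 columns, day 2 with 5 columns
day1 = [{"id": 1, "name": "Asha", "city": "Pune"}]
day2 = [{"id": 2, "name": "Ravi", "city": "Delhi", "age": 31, "dept": "ops"}]

columns, rows = merge_schemas([day1, day2])
print(columns)
for row in rows:
    print(row)
```

The day 1 row comes back with `age` and `dept` set to None, which is how the older files look after a merged read.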