# Day 69 and 70 - Advanced Read


## JDBC - used to read from / write to external sources
- need drivers
- limitations in free edition / serverless compute

- JDBC connection

#### Read different file formats and show 2 lines of output

In [0]:
# Read the same target folder in each supported file format and preview two rows.
target_root = "/Volumes/workspace/wd36schema/ingestion_volume/target"
divider = '-' * 80

print(divider)
print('csv')
# The CSV output was written with a header row and '~' as the field separator.
dfcsv = (
    spark.read
    .options(header=True, sep='~', inferSchema=True)
    .csv(f"{target_root}/csvout")
)
dfcsv.show(2)

print(divider)
print('json')
dfjson = spark.read.json(f"{target_root}/jsonout")
dfjson.show(2)

print(divider)
print('xml')
# rowTag names the XML element that maps to one DataFrame row.
dfxml = spark.read.option('rowTag', 'customer').xml(f"{target_root}/xmlout")
dfxml.show(2)

print(divider)
print('orc')
dforc = spark.read.orc(f"{target_root}/orcout")
dforc.show(2)

print(divider)
print('parquet')
dfparquet = spark.read.parquet(f"{target_root}/parquetout")
dfparquet.show(2)

print(divider)
print('delta')
dfdelta = spark.read.load(f"{target_root}/deltaout", format='delta')
dfdelta.show(2)

## Creating and reading from delta table

In [0]:
# Create an empty Delta table (no-op if it already exists).
# The name is fully qualified (catalog.schema.table) so the table is created
# exactly where the following cell reads it from, independent of whichever
# catalog/schema happens to be current in the session.
# Catalog Explorer path: workspace -> default -> Tables -> deltatable1
spark.sql("create table if not exists workspace.default.deltatable1(id int, name string) using delta")

In [0]:
# Load workspace.default.deltatable1 through the catalog and preview two rows.
deltatable1_name = "workspace.default.deltatable1"
deltatable_df = spark.read.table(deltatable1_name)
deltatable_df.show(n=2)

In [0]:
# Load the mobile_os_usage table from the default schema and preview two rows.
mobile_os_usage_name = "workspace.default.mobile_os_usage"
deltatable_df = spark.read.table(mobile_os_usage_name)
deltatable_df.show(n=2)

In [0]:
# Load the lh_custtbl table from the wd36schema schema and preview two rows.
lh_custtbl_name = "workspace.wd36schema.lh_custtbl"
deltatable_df = spark.read.table(lh_custtbl_name)
deltatable_df.show(n=2)

### size reduction in serialized file formats


In [0]:
# The source "txns" CSV is about 8.08 MB on the volume.
txns_csv = spark.read.csv("/Volumes/workspace/default/volumewd36/txns")

# Rewrite it as ORC, replacing any previous output in the target folder.
txns_csv.write.orc("/Volumes/workspace/default/volumewd36/txnsout_orc", mode='overwrite')

# The ORC copy is roughly 2 MB, which makes the format suitable for big-data workloads.

# Day 70



## Read modes in csv
**Important**
- 3 modes
- If any data challenges or malformed data such as format issue, column numbers (less/more than expected) issues, etc.

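  - PERMISSIVE (default) - keeps every row; fields that cannot be parsed are set to null
  - DROPMALFORMED - does not permit malformed data; the malformed rows are dropped
  - FAILFAST - fails and exits immediately on the first malformed row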
  

  eg:
- id int, name string, age int
- 1,irfan,43
- 2,agalya
- two,agalya,43
- 3,agalya,30,chn

path: /Volumes/workspace/default/volumewd36/malformed_data.txt

/Volumes/workspace/default/volumewd36/malformed_data_with_commnet.txt








In [0]:
# Expected column layout for every row of the malformed sample file.
struct1 = "custid int, name string, age int"

# PERMISSIVE mode keeps all rows, including the ones that do not match struct1.
df1 = (
    spark.read
    .schema(struct1)
    .options(header=False, sep=',', mode='PERMISSIVE')
    .csv("/Volumes/workspace/default/volumewd36/malformed_data.txt")
)
df1.show()

In [0]:
# DROPMALFORMED mode: rows that do not fit struct1 are discarded by the reader.
df2 = spark.read.csv(
    "/Volumes/workspace/default/volumewd36/malformed_data.txt",
    schema=struct1,
    header=False,
    sep=',',
    mode='DROPMALFORMED',
)
df2.show()

In [0]:
# FAILFAST mode: the read aborts with an exception as soon as a malformed row
# is parsed. The read itself is lazy, so the failure surfaces at show().
df3 = spark.read.schema(struct1).csv("/Volumes/workspace/default/volumewd36/malformed_data.txt", header=False,sep=',',mode='failfast')

# The failure is the point of this demo, so catch and display it rather than
# letting it stop a "Run All" of the notebook.
try:
    df3.show()
except Exception as exc:  # broad on purpose: the exception class varies by Spark runtime
    print(f"FAILFAST aborted the read as expected: {type(exc).__name__}")
    print(str(exc)[:500])

In [0]:
# Same permissive read, but lines starting with '#' are treated as comments and skipped.
df4 = (
    spark.read
    .schema(struct1)
    .options(header=False, sep=',', mode='permissive', comment='#')
    .csv("/Volumes/workspace/default/volumewd36/malformed_data_with_commnet.txt")
)
df4.show()

### To capture corrupted/incorrect records
#### useful for RCA root cause analysis
- needs an extra column, which occupies more space
- normally "corrupted_record string"



In [0]:
# struct1 plus an extra string column that receives the raw text of malformed rows.
struct2 = "custid int, name string, age int, corrupted_record string"

# columnNameOfCorruptRecord tells the permissive reader which schema column
# should hold the original line whenever a row cannot be parsed cleanly.
df5 = spark.read.csv(
    "/Volumes/workspace/default/volumewd36/malformed_data_with_commnet.txt",
    schema=struct2,
    header=False,
    sep=',',
    mode='permissive',
    comment='#',
    columnNameOfCorruptRecord='corrupted_record',
)
df5.show()

In [0]:
# filter the records
df5.filter('corrupted_record is not null').write.csv('/Volumes/workspace/default/volumewd36/rejected',mode='overwrite')
df5_rejected = spark.read.csv("/Volumes/workspace/default/volumewd36/rejected")
df5.filter('corrupted_record is null').write.csv('/Volumes/workspace/default/volumewd36/filtered',mode='overwrite')
df5_filtered = spark.read.csv("/Volumes/workspace/default/volumewd36/filtered")

df5_rejected.show()
df5_filtered = df5_filtered.drop('_c3')
df5_filtered.show()

In [0]:
# FAILFAST read with the corrupt-record schema.
# Fix: the mode was spelled 'failsafe', which is not a Spark parse mode (the
# valid values are PERMISSIVE, DROPMALFORMED and FAILFAST). An unrecognised
# mode falls back to PERMISSIVE, so the cell silently repeated the df5 result
# instead of demonstrating a fail-fast read.
df6 = spark.read.schema(struct2).csv("/Volumes/workspace/default/volumewd36/malformed_data_with_commnet.txt", header=False,sep=',',mode='failfast',
                                     comment='#', columnNameOfCorruptRecord='corrupted_record')

# With FAILFAST the first malformed row raises; show the error instead of
# letting it stop a "Run All" of the notebook.
try:
    df6.show()
except Exception as exc:  # broad on purpose: the exception class varies by Spark runtime
    print(f"FAILFAST aborted the read as expected: {type(exc).__name__}")
    print(str(exc)[:500])

In [0]:
# DROPMALFORMED read using the schema that includes the corrupted_record column.
df7 = (
    spark.read
    .schema(struct2)
    .options(
        header=False,
        sep=',',
        mode='dropmalformed',
        comment='#',
        columnNameOfCorruptRecord='corrupted_record',
    )
    .csv("/Volumes/workspace/default/volumewd36/malformed_data_with_commnet.txt")
)
df7.show()

In [0]:
# Sample file containing special characters inside field values.
spl_characters_path = "/Volumes/workspace/default/volumewd36/malformed_data_with_spl_characters.txt"
strct3 = 'custid int, name string, age int, corrupted_record string'

# Fields are quoted with a single quote, and a backslash escapes a quote
# character inside a quoted value; unparseable rows land in corrupted_record.
df8 = spark.read.csv(
    spl_characters_path,
    schema=strct3,
    header=False,
    sep=',',
    mode='permissive',
    columnNameOfCorruptRecord='corrupted_record',
    quote="'",
    escape="\\",
)
df8.show()


#### Read multiline strings, ignoring leading and trailing spaces, handling NaN and NULL

In [0]:
# Sample file in which a quoted value spans more than one physical line.
multi_line_path = "/Volumes/workspace/default/volumewd36/malformed_data_multi_line.txt"
struct4 = 'custid int, name string, height float, joindt date'

# Step 1: plain read - each physical line is parsed as its own record.
print("no proper data handling for multi line")
df9 = spark.read.csv(multi_line_path, schema=struct4, header=False)
df9.show()

# Step 2: multiLine lets a single-quoted value continue across line breaks.
print("multi line")
multi_line_opts = dict(header=False, mode='permissive', multiLine=True, quote="'")
df9 = spark.read.schema(struct4).options(**multi_line_opts).csv(multi_line_path)
df9.show(truncate=False)

# Step 3: additionally trim whitespace around each value.
print("ignoring leading and trailing white spaces")
trimmed_opts = dict(multi_line_opts, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)
df9 = spark.read.schema(struct4).options(**trimmed_opts).csv(multi_line_path)
display(df9)

# Step 4: additionally declare which input tokens stand for NULL and for NaN.
print("discarding nan(not a number) and null")
null_nan_opts = dict(trimmed_opts, nullValue='na', nanValue=-1)
df9 = spark.read.schema(struct4).options(**null_nan_opts).csv(multi_line_path)
display(df9)

In [0]:
# Handling single AND double quotes in the same file.
# The reader takes only one quote character, so there is no direct way to do
# this: read with quote="'" and strip the leftover double quotes using SQL.
quotes_path = "/Volumes/workspace/default/volumewd36/malformed_data_quotes.txt"
struct4 = 'custid int, name string, height float, joindt date'
print("no direct data handling for quotes - single ands double in single file ")
df9 = (
    spark.read
    .schema(struct4)
    .options(
        header=False,
        mode='dropmalformed',
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        quote="'",
    )
    .csv(quotes_path)
)
df9.show(truncate=False)

# Register the DataFrame as a temp view so it can be queried with SQL.
df9.createOrReplaceTempView("df9")
df9 = spark.sql("select * from df9")
df9.show(truncate=False)

# Remove the double-quote characters that survived in the name column.
strip_double_quotes_sql = """select custid,replace(name, '"','') as name, height, joindt from df9"""
df9 = spark.sql(strip_double_quotes_sql)
df9.show(truncate=False)
