# Create DataFrame Reader

In [0]:
# `spark` is never created in this notebook, so it must be supplied by the runtime; inspect its class.
spark.__class__

In [0]:
# `spark.read` hands back the reader object that the cells below use to load CSV/JSON data.
dfr = spark.read
print(dfr.__class__)

# Creating DataFrames from Different Sources

## Create DataFrame from CSV

In [0]:
# Read a CSV whose first line holds the column names.
# inferSchema=True makes Spark scan the data to choose a type for each column;
# without it, every column is loaded as a string.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv")
)
df.printSchema()  # column names with their inferred types
df.display()      # tabular view of the rows

## Create DataFrame from a File with a Custom Delimiter (`sep`)


In [0]:
# Same reader, but with an explicit field separator: `sep` tells the CSV reader which
# character splits one field from the next (the default is a comma).
# NOTE(review): the previous cell reads users_001.csv with the default comma separator;
# if the file really is comma-delimited, sep=" " splits on spaces instead and the result
# will not match that schema — confirm which file/delimiter this demo intends.
df = spark.read.options(header=True, inferSchema=True, sep=" ").csv(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv"
)
df.printSchema()
df.display()  # display data in the form of rows and columns

## Create DataFrame from JSON

In [0]:
# multiLine=True lets a single JSON record span several lines (e.g. pretty-printed JSON)
# instead of the default one-record-per-line layout.
(
    spark.read
    .option("multiLine", True)
    .json("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_004.json")
    .display()
)

# Custom Schema

In [0]:
# Supplying an explicit schema avoids inferSchema, which costs an extra pass over the
# data, and fixes each column's name and type up front with a StructType.

# Import only the types used here; a star import hides where names come from.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# One StructField per column: (column name, data type). Fields are nullable by default.
USER_SCHEMA = StructType(
    [
        StructField("id", IntegerType()),
        StructField("age", IntegerType()),
        StructField("gen", StringType()),
        StructField("designation", StringType()),
        StructField("salary", IntegerType()),
    ]
)

# header=True still skips the file's first line; the column names come from USER_SCHEMA.
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    schema=USER_SCHEMA,
)
df.printSchema()
df.display()
 

# Handling Bad Records

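There are three parse modes, selected with the `mode` option:
1. `PERMISSIVE` (the default mode): malformed records are kept. Their raw text goes into a column called `_corrupt_record`, and the fields that could not be parsed are set to null.
2. `DROPMALFORMED`: malformed records are dropped.
3. `FAILFAST`: the read fails with an error as soon as a malformed record is found, so no DataFrame is created.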

In [0]:
# PERMISSIVE (the default): malformed lines are kept, with their raw text in `_corrupt_record`.
spark.read.option("mode", "PERMISSIVE").json(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json"
).display()

In [0]:
# DROPMALFORMED: records that cannot be parsed are discarded from the result.
reader = spark.read.options(mode="DROPMALFORMED")
reader.json("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json").display()

In [0]:
# FAILFAST: the read raises an error at the first malformed record, so no DataFrame is produced.
# That failure is the point of this demo, so catch it and show the message; otherwise
# "Run All" would stop here and never reach the writer cells below.
try:
    spark.read.json(
        path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json",
        mode="FAILFAST",
    ).display()
except Exception as err:  # exception class depends on the client (classic Py4J vs. Spark Connect)
    # JVM-side errors can carry a long stack trace; show only the beginning.
    print(f"FAILFAST read failed with {type(err).__name__}: {str(err)[:500]}")

# DataFrame Writer API


## Convert from CSV to JSON

In [0]:
#Here we are reading the data in the form of CSV and writing it in the form of JSON using df.write function
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferSchema=True,
)
print(type(df.write))
df.write.save(
    format="JSON",
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json",
)
 
 

In [0]:
# Load the JSON files written above back into a DataFrame and print the rows as text.
spark.read.format("json").load(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"
).show()

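# Save Modes
`DataFrameWriter.mode` decides what a write does when data already exists at the target path. These are batch *save* modes, not Structured Streaming *output* modes.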

### errorifexists
The default mode: if data already exists at the target path, the write fails with an error.

In [0]:
# errorifexists (the default save mode): the write is refused because output_json
# already holds data written by an earlier cell. The error is the expected outcome,
# so catch it and show it instead of letting it stop "Run All".
from pyspark.errors import AnalysisException  # base class; available in PySpark >= 3.4

try:
    df.write.save(
        format="JSON",
        path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json",
        mode="errorifexists",  # same as leaving `mode` unset
    )
except AnalysisException as err:
    print(f"Write refused: {err}")

### overwrite
Any existing data at the target path is replaced by the DataFrame's contents.

In [0]:
# overwrite: replace whatever is stored at output_json with the rows of df.
df.write.mode("overwrite").format("JSON").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"
)

### append
The DataFrame's rows are added to the data already stored at the target path.

In [0]:
# append: add the rows of df to the data already stored at output_json.
df.write.mode("append").format("JSON").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"
)

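### ignore
If data already exists at the target path, the write is silently skipped and the existing data is left unchanged.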

In [0]:
# ignore: output_json already exists, so nothing is written and no error is raised.
df.write.mode("ignore").format("JSON").save(
    "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"
)