# Creating DataFrameReader

In [0]:
# `spark` is not defined anywhere in this notebook; it is expected to be injected
# by the notebook runtime (presumably a SparkSession -- TODO confirm).
# Displaying its type is a quick sanity check that the session is available.
type(spark)

In [0]:
# `spark.read` is the entry point for loading data; keep a handle to it in
# `dfr` (reused by the next cell) and print its class.
dfr = spark.read
reader_type = type(dfr)
print(reader_type)

# Create DataFrame from different sources

## Create DataFrame from CSV

In [0]:
# Load the users CSV through the reader created in the previous cell.
users_csv_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv"

# header=True      -> the first line supplies the column names
# inferSchema=True -> Spark derives column types from the data instead of
#                     reading every column as a string
df = dfr.csv(users_csv_path, header=True, inferSchema=True)

df.printSchema()
df.display()

# Create DataFrame from Delimited file

In [0]:
# A .dat file is read with the CSV reader; only the field separator differs.
users_dat_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.dat"

df = spark.read.csv(
    users_dat_path,
    sep="|",  # fields in this file are pipe-delimited
    header=True,
    inferSchema=True,
)

df.printSchema()
df.display()
 

# Create DataFrame from JSON file

In [0]:
users_json_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_004.json"

# multiLine=True allows a single JSON record to span several lines; without it
# the reader expects one complete JSON object per line.
users_json_df = spark.read.json(path=users_json_path, multiLine=True)
users_json_df.display()

# Custom Schema

In [0]:
# Import only the types this cell uses instead of `import *`, so it is clear
# where each name comes from and the notebook namespace is not polluted.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Explicit schema for the users file: supplying it up front gives full control
# over column names and types, and removes the need for schema inference.
USER_SCHEMA = StructType(
    [
        StructField("id", IntegerType()),
        StructField("age", IntegerType()),
        StructField("gen", StringType()),
        StructField("designation", StringType()),
        StructField("salary", IntegerType()),
    ]
)

# header=True is still needed so the first line is treated as a header row
# rather than as data.
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    schema=USER_SCHEMA
)
df.printSchema()
df.display()

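# Handling bad records

The `mode` option controls what the reader does with records it cannot parse:

1. `PERMISSIVE` (default): keep malformed records; fields that cannot be parsed are set to null and the raw text is kept in a corrupt-record column.
2. `DROPMALFORMED`: silently drop malformed records.
3. `FAILFAST`: raise an exception as soon as a malformed record is encountered.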

In [0]:
# PERMISSIVE is the default mode, spelled out here for comparison with the
# other modes: malformed records are kept instead of being dropped or raising.
access_logs_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json"

permissive_logs = spark.read.json(path=access_logs_path, mode="PERMISSIVE")
permissive_logs.display()

# DROPMALFORMED

In [0]:
# DROPMALFORMED discards records that cannot be parsed, so only well-formed
# rows appear in the result.
access_logs_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json"

well_formed_logs = spark.read.json(path=access_logs_path, mode="DROPMALFORMED")
well_formed_logs.display()

# FAILFAST

In [0]:
# FAILFAST aborts the read with an exception as soon as a malformed record is
# encountered. The failure itself is what this cell demonstrates, so catch and
# print it instead of letting it halt a "Run All" before the later sections.
access_logs_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json"

try:
    spark.read.json(
        path=access_logs_path,
        mode="FAILFAST"
    ).display()
except Exception as exc:  # deliberately broad: the error is only reported, never hidden
    print(f"FAILFAST read aborted: {type(exc).__name__}: {exc}")

# DATAFRAME WRITE API

In [0]:
# Load the users CSV again so this section does not depend on whichever `df`
# an earlier cell happened to leave behind.
df = spark.read.csv(
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv",
    header=True,
    inferSchema=True,
)

# `df.write` is the write-side counterpart of `spark.read`; print its class.
print(type(df.write))

# The default save mode is "errorifexists", which makes this cell fail on every
# run after the first because the output directory already exists. Use
# "overwrite" so the notebook can be re-run from a fresh kernel.
df.write.save(
    format="JSON",
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json",
    mode="overwrite",
)

In [0]:
# Read the JSON output directory back in to verify what was written.
output_json_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"

written_back = spark.read.json(path=output_json_path)
written_back.show()

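# Output modes

- `errorifexists` (default): raise an error if the target path already exists.
- `overwrite`: replace any existing data at the target path.
- `append`: add the new data to whatever already exists at the target path.
- `ignore`: do nothing if the target path already exists.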

In [0]:
# With mode "ignore" the write is skipped when the target path already exists,
# leaving the existing data untouched and raising no error.
output_json_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"

json_writer = df.write.format("JSON").mode("ignore")
json_writer.save(output_json_path)