In [0]:
# Greeting banner printed when the notebook starts.
welcome_message = "Welcome to learn Databricks with fun!"
print(welcome_message)

# Create a DataFrame Reader

In [0]:
# `spark` is not defined in this notebook; presumably it is the session object
# injected by the Databricks runtime. Show its class as the cell output.
session_type = type(spark)
session_type

In [0]:
# `spark.read` hands back the reader object used by every load in this notebook.
dfr = spark.read

# Display the reader's class as the cell output.
reader_type = type(dfr)
reader_type

# Create DataFrame from different sources

### Create DataFrame from CSV

In [0]:
# Load the sample users CSV: the first line supplies the column names and
# Spark is asked to infer each column's type from the data.
users_csv_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv"

df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(users_csv_path)
)

df.printSchema()  # inferred column names and types
df.display()      # show the rows in the notebook output

### Create DataFrame from DAT

In [0]:
# A .dat file is read with the CSV reader; only the field separator differs
# (pipe instead of comma).
users_dat_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.dat"

df = (
    spark.read
    .options(header=True, inferSchema=True, sep="|")
    .csv(path=users_dat_path)
)

df.printSchema()  # inferred column names and types
df.display()      # show the rows in the notebook output
 

### Create DataFrame from JSON

In [0]:
# Load the sample users JSON file with the multiLine reader option enabled.
users_json_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.json"

df = (
    spark.read
    .option("multiLine", True)
    .json(users_json_path)
)

df.printSchema()  # schema derived from the JSON documents
df.display()      # show the rows in the notebook output

# Custom Schema

In [0]:
from pyspark.sql.types import *

# Explicit schema for the users dataset, supplied to the reader instead of
# relying on inferSchema.
# NOTE(review): "designationdes" looks like a typo for "designation" — confirm
# against the CSV header before renaming.
USER_SCHEMA = (
    StructType()
    .add("id", IntegerType())
    .add("age", IntegerType())
    .add("gender", StringType())
    .add("designationdes", StringType())
    .add("salary", IntegerType())
)

# Read the same CSV as before, this time applying USER_SCHEMA.
users_csv_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv"

df = (
    spark.read
    .option("header", True)
    .schema(USER_SCHEMA)
    .csv(users_csv_path)
)

df.printSchema()  # should mirror USER_SCHEMA
df.display()      # show the rows in the notebook output

### How to handle bad records?

In [0]:
# Baseline: read the access logs with the reader's default options and show
# the result.
access_logs_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json"

access_logs_df = spark.read.json(access_logs_path)
access_logs_df.display()

#### PERMISSIVE
- Spark keeps every row: for a malformed record, the raw text is stored in a special column called `_corrupt_record`, the parsed fields are set to `null`, and processing continues.

In [0]:
# Read the access logs in PERMISSIVE mode and show the result.
access_logs_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json"

permissive_df = (
    spark.read
    .option("mode", "PERMISSIVE")
    .json(path=access_logs_path)
)
permissive_df.display()

#### DROPMALFORMED
- Spark drops any row that has bad or corrupted data and keeps only the clean rows.

In [0]:
# Read the access logs in DROPMALFORMED mode and show the result.
access_logs_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json"

clean_rows_df = (
    spark.read
    .option("mode", "DROPMALFORMED")
    .json(path=access_logs_path)
)
clean_rows_df.display()

#### FAILFAST
- Spark stops immediately when it finds the first corrupted/bad row. Throws an error and does not load the data.

In [0]:
# FAILFAST demonstration: the read is expected to raise as soon as Spark meets
# a malformed record. The error *is* the lesson, so it is caught and printed
# instead of aborting the notebook; that keeps "Run All" going for the cells
# that follow.
try:
    spark.read.json(
        path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/access_logs.json",
        mode="FAILFAST",
    ).display()
except Exception as err:  # the concrete exception class depends on the Spark runtime
    # Only the first line of the message is shown; the rest is usually a long
    # JVM stack trace.
    first_line = (str(err).splitlines() or [""])[0]
    print(f"FAILFAST aborted the read -> {type(err).__name__}: {first_line}")

# DataFrame Writer API

### Convert `CSV` into `JSON`.

In [0]:
# Source DataFrame for the writer examples: the users CSV read with the
# explicit USER_SCHEMA.
users_csv_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/sample_dataset/users_001.csv"

df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(USER_SCHEMA)
    .load(path=users_csv_path)
)

In [0]:
# `df.write` is the writer counterpart of `spark.read`; print its class.
writer_type = type(df.write)
print(writer_type)

In [0]:
# Convert the users CSV data to JSON by writing `df` to the output folder.
# With no explicit mode the writer refuses to write to an existing path, so a
# second run of this notebook would fail here. "overwrite" replaces the output
# from the previous run and makes the cell safe to re-run.
df.write.save(
    format="JSON",
    path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json",
    mode="overwrite",
)

In [0]:
# Read the freshly written JSON output back and show it to verify the write.
output_json_path = "/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json"

written_df = spark.read.json(path=output_json_path)
written_df.display()

### Output modes

##### Error-if-exists

In [0]:
# Local import keeps this cell self-contained.
from pyspark.sql.utils import AnalysisException

# "errorifexists" demonstration: the target folder was already written by the
# earlier "Convert CSV into JSON" cell, so this write is expected to be
# rejected. The rejection is caught and printed so that the notebook keeps
# running and the explain() call below is actually reached.
try:
    df.write.save(
        format="JSON",
        path="/Volumes/quickstart_catalog/quickstart_schema/sandbox/output_json",
        mode="errorifexists"
    )
except AnalysisException as err:
    # Only the first line of the message is shown to keep the output short.
    first_line = (str(err).splitlines() or [""])[0]
    print(f"Write rejected because the path already exists -> {first_line}")

# Print the extended query plans for `df`.
df.explain(True)