In [0]:
%run /Workspace/Users/simon.nagy@hiflylabs.com/.bundle/dbxSandbox/dev/files/sandboxProject/includeFiles/Classroom-Setup

## Reading CSV files
1. path definition
2. constructing dataframe
3. using manual schema definition
4. using DDL schema definition

In [0]:
# Location of the tab-separated users CSV. `datasets_dir` is not defined in this
# notebook (presumably provided by the Classroom-Setup notebook run above — confirm).
source_csv_path = f"{datasets_dir}/users/users-500k.csv"

In [0]:
# Load the users CSV and let Spark determine the column types itself.
# The file is tab-delimited and its first row holds the column names.
# inferSchema costs an extra pass over the data to work out the types.
df = spark.read.csv(source_csv_path, sep="\t", header=True, inferSchema=True)

# Peek at the first five rows
df.head(5)

Create the schema by hand using `StructType`, specifying each column's name and data type.

In [0]:
from pyspark.sql.types import LongType, StringType, StructType, StructField

# Hand-written schema: one nullable StructField per (column name, data type) pair.
schema_definition = StructType([
  StructField(column_name, column_type, True)
  for column_name, column_type in (
    ("user_id", StringType()),
    ("user_first_touch_timestamp", LongType()),
    ("email", StringType()),
  )
])

# Tab-separated read with a header row, this time with the schema supplied up front
df = spark.read.csv(
  source_csv_path,
  schema=schema_definition,
  sep="\t",
  header=True,
)

# Because the schema is given explicitly, no job is invoked to infer it,
# so this read should perform better than the inferSchema variant.

The schema can also be defined as a DDL (data definition language) string.

In [0]:
# Schema expressed as a DDL string: comma-separated "<column name> <type>" pairs.
ddl_schema = "user_id string, user_first_touch_timestamp long, email string"

# The reader accepts the DDL string in place of a StructType
df = spark.read.csv(source_csv_path, schema=ddl_schema, sep="\t", header=True)

## Reading JSON files

In [0]:
# Location of the events JSON file, under the same `datasets_dir` root as the users CSV.
source_json_path = f"{datasets_dir}/events/events-500k.json"

In [0]:
# Read the events JSON file. No schema is supplied, so Spark infers one by
# scanning the data; the JSON reader does this on its own whenever a schema
# is missing. "inferSchema" is a CSV reader option that the JSON source does
# not define, so it is not set here.
df = (
    spark
    .read
    .json(source_json_path)
)

# Show the inferred column names and types
df.printSchema()

The schema can be specified explicitly, just as with the CSV file; Spark then does not need to run a job to infer it, so the read is faster.
Also, in Scala you can print the inferred schema as a DDL string (`df.schema.toDDL`) and reuse it as the schema declaration.

In [0]:
# Render `df` in the notebook (the events DataFrame when cells are run top to bottom).
display(df)

## Data Frame writing
1. writing into a directory, by naming PATH, and saving as parquet
2. writing into a table, using `saveAsTable()`

In [0]:
# Parquet output directory inside the working directory.
# The leading "/" is required: without it the name is glued onto the last
# path segment of working_dir, so the data would land next to the working
# directory instead of inside it. This matches how the Delta output path is
# built later in this notebook.
output_dir = working_dir + "/output.parquet"

(df
 .write
 .option("compression", "snappy")
 .mode("overwrite")  # replace any output left over from a previous run
 .parquet(output_dir)
)

In [0]:
# List the contents of the output directory and render the listing
output_listing = dbutils.fs.ls(output_dir)
display(output_listing)

In [0]:
# Persist df as the table "outputTable", overwriting it if it already exists
df.write.saveAsTable("outputTable", mode="overwrite")

# Print the value of database_name (presumably the database the table lives in — confirm)
print(database_name)

In [0]:
%sql
-- Read back every row of the table written with saveAsTable
SELECT * FROM outputTable

## Writing results to Delta Table
The output path must be a directory ("folder"): Delta writes its data files and transaction log into it.

In [0]:
# Target directory for the Delta output, under the working directory
output_dir = working_dir + "/delta/events"

# Write df in Delta format, replacing whatever already exists at that path
df.write.save(output_dir, format="delta", mode="overwrite")

In [0]:
# Helper not defined in this notebook (presumably from Classroom-Setup).
# NOTE(review): by its name it removes what this lesson created — confirm what it deletes.
classroom_cleanup()