In [1]:
import sys
import os

# Make the project's `utils` module importable from this notebook kernel.
# PYTHONPATH may hold several directories joined by os.pathsep, so each entry
# is added on its own (appending the raw string would register a single bogus
# path when there is more than one entry). Entries already on sys.path are
# skipped, so re-running this cell does not keep growing sys.path.
for _entry in os.getenv("PYTHONPATH", "/app").split(os.pathsep):
    if _entry and _entry not in sys.path:
        sys.path.append(_entry)
from utils import Utils

In [2]:
# Create the project helper and obtain a SparkSession from it; `spark` is the
# entry point that every later cell uses to read data.
# NOTE(review): how the session is configured (master, app name, ...) lives in
# Utils.get_spark_session and is not visible from this notebook.
utils = Utils()
spark = utils.get_spark_session()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/10 10:05:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# To read data from a file, use spark.read.format(...).load(...). spark.read is
# PySpark's DataFrameReader, which provides a generic interface for loading
# data from various sources. With no options set, the CSV header line is loaded
# as an ordinary data row.
#
# The input directory comes from the INPUTS_DIR environment variable (falling
# back to /app/inputs), so the notebook works regardless of the directory the
# kernel was started from; a bare relative path would depend on that.
emp_df = spark.read.format("csv").load(
    os.path.join(os.getenv("INPUTS_DIR", "/app/inputs"), "emp.csv")
)

In [None]:
# Inspect the schema produced by the reader defaults (no header, no type
# inference): expect generic column names (_c0, _c1, ...) typed as string.
emp_df.printSchema()

In [14]:
# Reader options: header=True takes the column names from the first line, and
# inferSchema=True makes Spark run through the data to infer a type for each
# column (this costs an extra pass over the file).
emp_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(os.path.join(os.getenv("INPUTS_DIR", "/app/inputs"), "emp.csv"))
)

In [None]:
# With header and inferSchema enabled, columns should now have real names and inferred types.
emp_df.printSchema()

In [None]:
# Display rows of the typed DataFrame (show() prints the first 20 rows by default).
emp_df.show()

In [23]:
# The schema can be supplied up front as a DDL string. This skips type
# inference, so Spark does not need an extra pass over the file to work out
# the column types.
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

# header=True is still required: without it Spark would parse the header line
# as a regular data row, which does not match the declared column types.
emp_df_schema = (
    spark.read.format("csv")
    .option("header", True)
    .schema(_schema)
    .load(os.path.join(os.getenv("INPUTS_DIR", "/app/inputs"), "emp.csv"))
)

In [None]:
# The printed schema should match the DDL string declared in _schema.
emp_df_schema.printSchema()

In [None]:
# Display rows read with the explicit schema (no inference pass was needed).
emp_df_schema.show()

In [None]:
# Spark CSV documentation: https://spark.apache.org/docs/latest/sql-data-sources-csv.html

In [None]:
# emp_new.csv is the same dataset as emp.csv, but with some corrupted values.
_schema = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date"

spark.read.format("csv").option("header", True).schema(_schema).load(
    os.path.join(os.getenv("INPUTS_DIR", "/app/inputs"), "emp_new.csv")
).show()
# The file contains a value in 'salary' that is not a valid double and a value
# in 'hire_date' that is not a valid date. In the default mode (PERMISSIVE),
# Spark keeps those rows and sets the fields it cannot parse to null.

In [None]:
# In PERMISSIVE mode, Spark can also keep the raw text of each malformed line.
# It only does so when the user-supplied schema declares an extra string column
# named '_corrupt_record' (the default corrupt-record column name). For
# well-formed rows that column is null.
_schema_corr = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date, _corrupt_record string"

spark.read.format("csv").option("header", True).schema(_schema_corr).load(
    os.path.join(os.getenv("INPUTS_DIR", "/app/inputs"), "emp_new.csv")
).show()

In [37]:
# The corrupt-record column can be renamed with the columnNameOfCorruptRecord
# option. Instead of chaining many .option() calls, several options can be
# passed at once by unpacking a dictionary into .options().
options_dict = {
    "header": True,
    "columnNameOfCorruptRecord": "corr_record"
}

# The renamed column must also be declared (as a string) in the schema.
_schema_corr2 = "employee_id int, department_id int, name string, age int, gender string, salary double, hire_date date, corr_record string"
corrupt_emp = (
    spark.read.format("csv")
    .options(**options_dict)
    .schema(_schema_corr2)
    .load(os.path.join(os.getenv("INPUTS_DIR", "/app/inputs"), "emp_new.csv"))
)

# Keep only the rows that failed to parse. Spark (2.3+) rejects queries on a
# raw CSV source whose referenced columns are *only* the corrupt-record column
# (e.g. selecting corr_record alone). This query is allowed because show()
# selects every column; cache() the DataFrame first if you need corr_record alone.
corrupt_emp.where("corr_record is not null").show()

+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|employee_id|department_id|         name|age|gender| salary| hire_date|         corr_record|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+
|          7|          101|James Johnson| 42|  Male|   NULL|2012-03-15|007,101,James Joh...|
|         11|          104|   David Park| 38|  Male|65000.0|      NULL|011,104,David Par...|
+-----------+-------------+-------------+---+------+-------+----------+--------------------+



In [None]:
# With mode DROPMALFORMED, Spark drops every malformed record entirely. It keeps
# the same rows as reading in PERMISSIVE mode with a corrupt-record column and
# then filtering with .where("corr_record is null").

options_dict = {
    "header": True,
    "mode": "DROPMALFORMED"
}

spark.read.format("csv").options(**options_dict).schema(_schema).load(
    os.path.join(os.getenv("INPUTS_DIR", "/app/inputs"), "emp_new.csv")
).show()

In [None]:
# With mode set to FAILFAST, Spark raises an exception as soon as it meets a
# malformed record, instead of nulling or dropping it.
options_dict = {
    "header": True,
    "mode": "FAILFAST"
}

# The failure is the point of this demo, so it is caught and summarised here
# instead of aborting "Restart & Run All". The read is lazy, so the error is
# expected to surface when the .show() action parses the rows.
try:
    (
        spark.read.format("csv")
        .options(**options_dict)
        .schema(_schema)
        .load(os.path.join(os.getenv("INPUTS_DIR", "/app/inputs"), "emp_new.csv"))
        .show()
    )
except Exception as exc:  # the concrete exception class depends on the PySpark version
    first_line = str(exc).split("\n", 1)[0]
    print(f"FAILFAST aborted the read with {type(exc).__name__}: {first_line}")