Vamos a usar un fichero de ejemplo que ya está almacenado en databricks. Podriamos subir otro fichero si lo deseamos



In [0]:
df = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

display(df)

Al usar la opcion `.option("inferSchema", "false")`, no intentara inferir el schema y todas las columnas son leidas como Strings

Todas las opciones para el formato `csv`: https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option

In [0]:
df.printSchema()

Si se lo indicamos spark intentará inferir los tipos.

Veamos que tal funciona

In [0]:
df_infered_types = spark.read.format("csv") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("sep", ",") \
  .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

df_infered_types.printSchema()

display(df_infered_types)

Lo ha hecho bastante bien, ha identificado correctamente 'Call Date' y 'Watch Date' como fechas pero no ha podido identificar 'Available DtTm' como timestamp (fecha+hora)

La inferencia de tipos es muy útil pero en muchas ocasiones necesitaremos especificar el esquema para un correcto parseo de los datos. Siempre será más interesante hacerlo ya en la carga que implementar transformaciones a posteriori.

Como hacerlo? Existen dos opciones para espcificar esquemas en Spark:

* Usando StructType 
* Usando DDL

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType, IntegerType, BooleanType, DoubleType

valid_schema = StructType(
    [
        StructField('Call Number', IntegerType(), True), 
        StructField('Unit ID', StringType(), True), 
        StructField('Incident Number', IntegerType(), True), 
        StructField('CallType', StringType(), True), 
        StructField('Call Date', DateType(), True), 
        StructField('Watch Date', DateType(), True), 
        StructField('Call Final Disposition', StringType(), True), 
        StructField('Available DtTm', TimestampType(), True),
        StructField('Address', StringType(), True), 
        StructField('City', StringType(), True), 
        StructField('Zipcode of Incident', IntegerType(), True), 
        StructField('Battalion', StringType(), True), 
        StructField('Station Area', StringType(), True), 
        StructField('Box', StringType(), True), 
        StructField('OrigPriority', StringType(), True), 
        StructField('Priority', StringType(), True), 
        StructField('Final Priority', IntegerType(), True), 
        StructField('ALS Unit', BooleanType(), True), 
        StructField('Call Type Group', StringType(), True), 
        StructField('NumAlarms', IntegerType(), True), 
        StructField('UnitType', StringType(), True), 
        StructField('Unit sequence in call dispatch', IntegerType(), True), 
        StructField('Fire Prevention District', StringType(), True), 
        StructField('Supervisor District', StringType(), True), 
        StructField('Neighborhood', StringType(), True), 
        StructField('Location', StringType(), True), 
        StructField('RowID', StringType(), True), 
        StructField('Delay', DoubleType(), True)
        ]
    )

df_explicit_types_using_structtype = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .schema(valid_schema) \
  .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

df_explicit_types_using_structtype.printSchema()

display(df_explicit_types_using_structtype)

Hay un problema, no ha sido capaz de parsear correctamente las fechas.

Se debe a que el formato de la fecha es el formato por defecto, debemos especifirlo con `.option("dateFormat", "dd/MM/yyyy")` y lo mismo sucede con el timestamp, ejemplo: `01/11/2002 01:51:54 AM`, patron correspondiente `MM/dd/yyyy hh:mm:ss a`

Como definr los patrones: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html

In [0]:
df_explicit_types_using_structtype = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .option("dateFormat", "MM/dd/yyyy") \
  .option("timestampFormat", "MM/dd/yyyy hh:mm:ss a") \
  .schema(valid_schema) \
  .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

df_explicit_types_using_structtype.printSchema()

display(df_explicit_types_using_structtype)

Tambien se puede definir el esquema con DLL, misma sintaxis que usariamos en un CREATE TABLE. Un string con los nombres de las columnas y su tipo separados por coma

In [0]:
valid_schema_ddl = """Call_Number INTEGER,
    Unit_ID STRING,
    Incident_Number INTEGER,
    CallType STRING,
    Call_Date DATE,
    Watch_Date DATE,
    Call_Final_Disposition STRING,
    Available_DtTm TIMESTAMP,
    Address STRING,
    City STRING,
    Zipcode_of_Incident INTEGER,
    Battalion STRING,
    Station_Area STRING,
    Box STRING,
    OrigPriority STRING,
    Priority STRING,
    Final_Priority INTEGER,
    ALS_Unit BOOLEAN,
    Call_Type_Group STRING,
    NumAlarms INTEGER,
    UnitType STRING,
    Unit_sequence_in_call_dispatch INTEGER,
    Fire_Prevention_District STRING,
    Supervisor_District STRING,
    Neighborhood STRING,
    Location STRING,
    RowID STRING,
    Delay DOUBLE"""

## Modos de lectura

Probemos los distintos modos dorzando un error al no espcificar los patrones

In [0]:
df_failfast = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .option("mode", "FAILFAST") \
  .schema(valid_schema_ddl) \
  .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

display(df_failfast)

In [0]:
df_permissive = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .option("mode", "PERMISSIVE") \
  .schema(valid_schema_ddl) \
  .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

display(df_permissive)

In [0]:
df_dropmalformed = spark.read.format("csv") \
  .option("inferSchema", "false") \
  .option("header", "true") \
  .option("sep", ",") \
  .option("mode", "DROPMALFORMED") \
  .schema(valid_schema_ddl) \
  .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

display(df_dropmalformed)