In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, LongType

import os

In [2]:
# Obtain the Spark session for this notebook (an already running session is
# reused, otherwise a new one is created) under the app name 'Schema handling'.
spark = SparkSession.builder.appName('Schema handling').getOrCreate()

There are two JSON files with incompatible schemas — the `score` column is a Long in the first file and a Double in the second:

#### Content of file-1.json:
{"user_id": 100, "score": 50} <br>
{"user_id": 200, "score": 51}

#### Content of file-2.json:
{"user_id": 300, "score": 45.0} <br>
{"user_id": 400, "score": 46.0}

In [3]:
spark.version

'2.4.5'

In [4]:
# Locate the input data relative to the project root, taken to be the parent
# of the current working directory.
base_path = os.getcwd()

# os.path.dirname understands the platform's path separator and keeps the
# filesystem root intact; splitting on '/' by hand turned a cwd such as
# '/work' into '' and left Windows-style paths unchanged.
project_path = os.path.dirname(base_path)

data_input_path = os.path.join(project_path, 'data/incompatible-json-schema')

In [5]:
# Explicit read schema: both columns are declared as (nullable) longs, which
# matches file-1 but not the double-typed score values in file-2.
schema = (
    StructType()
    .add('score', LongType())
    .add('user_id', LongType())
)

In [6]:
# Read with the default parse mode (mode='PERMISSIVE'): values that do not
# fit the declared schema show up as nulls instead of raising an error.

df = spark.read.json(data_input_path, schema=schema)

In [7]:
df.printSchema()

root
 |-- score: long (nullable = true)
 |-- user_id: long (nullable = true)



In [8]:
df.show()

+-----+-------+
|score|user_id|
+-----+-------+
| null|   null|
| null|   null|
|   50|    100|
|   51|    200|
+-----+-------+



In [9]:
df.count()

4

In [10]:
df.filter(col('score').isNotNull()).count()

2

In [11]:
# All 4 user_ids come back non-null when user_id is the only column needed,
# although show() above prints null user_ids for the two rows whose score
# failed to parse — presumably only the columns a query needs are parsed.
df.filter(col('user_id').isNotNull()).count()

4

In [12]:
df.collect()

[Row(score=None, user_id=None),
 Row(score=None, user_id=None),
 Row(score=50, user_id=100),
 Row(score=51, user_id=200)]

In [13]:
df.select('user_id').collect()

[Row(user_id=300), Row(user_id=400), Row(user_id=100), Row(user_id=200)]

In [14]:
df.select('score').collect()

[Row(score=None), Row(score=None), Row(score=50), Row(score=51)]

In [15]:
# Read with mode='DROPMALFORMED': records that do not fit the declared
# schema are dropped from the result instead of being kept as nulls.

df = spark.read.json(data_input_path, schema=schema, mode='DROPMALFORMED')

In [16]:
df.printSchema()

root
 |-- score: long (nullable = true)
 |-- user_id: long (nullable = true)



In [17]:
df.show()

+-----+-------+
|score|user_id|
+-----+-------+
|   50|    100|
|   51|    200|
+-----+-------+



In [18]:
# count() does not need any column values, so (presumably) no record is
# parsed and nothing is dropped: it reports 4 although show() above lists
# only 2 rows.
df.count()

4

In [19]:
df.filter(col('score').isNotNull()).count()

2

In [20]:
# Only user_id is needed here, so the two records whose score is a double
# are not rejected and all 4 rows are counted, whereas show() above dropped
# them — presumably the parser only validates the columns a query needs.
df.filter(col('user_id').isNotNull()).count()

4

In [21]:
df.collect()

[Row(score=50, user_id=100), Row(score=51, user_id=200)]

In [22]:
df.select('user_id').collect()

[Row(user_id=300), Row(user_id=400), Row(user_id=100), Row(user_id=200)]

In [23]:
df.select('score').collect()

[Row(score=50), Row(score=51)]

In [24]:
df.select('user_id').show()

+-------+
|user_id|
+-------+
|    300|
|    400|
|    100|
|    200|
+-------+



The DROPMALFORMED mode leads to confusing situations: the total count returns 4, yet the collected list has only two records. Similarly, the show function displays only 2 records, but when we select only user_id it displays four records.

In [25]:
# Read with mode='FAILFAST': a record that does not fit the declared schema
# makes the query fail as soon as that record is actually parsed.

df = spark.read.json(data_input_path, schema=schema, mode='FAILFAST')

In [26]:
# No error here even in FAILFAST mode: count() needs no column values, so
# (presumably) the mismatching score values are never parsed.
df.count()

4

In [27]:
df.printSchema()

root
 |-- score: long (nullable = true)
 |-- user_id: long (nullable = true)



In [28]:
# This will throw Py4JJavaError:
# Py4JJavaError: An error occurred while calling o91.showString.
# : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 26.0 failed 1 times, 
# most recent failure: Lost task 0.0 in stage 26.0 (TID 39, 192.168.0.12, executor driver): 
# org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. 
# To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
                            

# df.show()

In [29]:
# Works under FAILFAST as well: only user_id is needed, so (presumably) the
# mismatching score values are never parsed and no error is raised.
df.select('user_id').show()

+-------+
|user_id|
+-------+
|    300|
|    400|
|    100|
|    200|
+-------+



## Parquet format
<br>
Let's now see the behaviour for the same data, but in Parquet format.

In [30]:
data_input_path = os.path.join(project_path, 'data/incompatible-parquet-schema')

In [35]:
# Load the Parquet directory with the same explicit (long, long) schema.
df = spark.read.format('parquet').schema(schema).load(data_input_path)

In [37]:
# count() succeeds; the type mismatch only surfaces once the score column
# is actually read (see the error quoted in the next cell).
df.count()

4

In [39]:
# This will throw Py4JJavaError:
# Py4JJavaError: An error occurred while calling o104.showString.
# : org.apache.spark.SparkException: Job aborted due to stage failure: 
# Task 0 in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage 32.0 (TID 46, localhost, 
# executor driver): org.apache.spark.sql.execution.QueryExecutionException: 
# Parquet column cannot be converted in file ..file_name... Column: [score], Expected: bigint, Found: DOUBLE
                                        

# df.show()

In [40]:
# selecting the column with the correct data type works fine
# spark is doing column pruning and looks only on the user_id col

df.select('user_id').show()

+-------+
|user_id|
+-------+
|    300|
|    400|
|    100|
|    200|
+-------+



In [41]:
# No explicit schema this time. Judging by the error quoted below
# (Expected: double, Found: INT64), Spark settles on score being a double,
# so now it is the long-typed file that cannot be read.
df = spark.read.parquet(data_input_path)

In [42]:
df.count()

4

In [43]:
# This will throw Py4JJavaError:
# Py4JJavaError: An error occurred while calling o104.showString.
# : org.apache.spark.SparkException: Job aborted due to stage failure: 
# Task 0 in stage 32.0 failed 1 times, most recent failure: Lost task 0.0 in stage 32.0 (TID 46, localhost, 
# executor driver): org.apache.spark.sql.execution.QueryExecutionException: 
# Parquet column cannot be converted in file ..file_name... Column: [score], Expected: double, Found: INT64
                                        

# df.show()

In [44]:
# the same behaviour as in the case when we provided the schema
df.select('user_id').show()

+-------+
|user_id|
+-------+
|    300|
|    400|
|    100|
|    200|
+-------+

