In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session running on a 'local' master.
# The spark-avro artifact is requested through spark.jars.packages so that
# the Avro data source is available to the reads further down.
spark = (
    SparkSession.builder
    .master('local')
    .appName('read-avro-attempts')
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.11:2.4.5")
    .getOrCreate()
)

# Handle to the underlying SparkContext of the session.
sc = spark.sparkContext

# Generate test data

In [1]:
import datetime

# Set up parameters shared by the data generation below and by the read attempts later on.
file_format = "avro"  # passed to every generator call and reused as the reader format
entity = "test_data_{}".format(file_format)
data_path = "/home/jovyan/work/spark-data/raw/{}".format(entity)  # dataset root; partitions appear as date=YYYY-MM-DD below it (see output)
num_rows = 10  # rows requested per generated partition

# Each step below first executes a helper notebook with %run and then calls a
# gen_data_* function (presumably defined by that notebook -- verify in ./modules)
# for one monthly date. Per the recorded output, a call creates one partition
# under data_path and prints its row count and schema.
# The order matters: every step targets a later date than the previous one.

# Step 1 (2020-01-01): simple flat schema.
%run ./modules/01_create_simple_schema.ipynb 
gen_data_simple_schema(data_path, datetime.date(2020,1,1), num_rows, file_format)

# Step 2 (2020-02-01): adds a nested struct (the `address` struct in the recorded output).
%run ./modules/02_add_nested_structure_to_schema.ipynb
gen_data_add_nested_struct(data_path, datetime.date(2020,2,1), num_rows, file_format)

# Step 3 (2020-03-01): judging by the module name, adds column(s) to the schema.
%run ./modules/03_add_column_to_schema.ipynb
gen_data_add_columns(data_path, datetime.date(2020,3,1), num_rows, file_format)

# Step 4 (2020-04-01): judging by the module name, changes a datatype and adds a struct.
%run ./modules/04_change_datatype_add_struct.ipynb
gen_data_change_datatype_add_struct(data_path, datetime.date(2020,4,1), num_rows, file_format)

# Step 5 (2020-05-01): judging by the module name, renames a column.
%run ./modules/05_change_column_name.ipynb
gen_data_change_column_name(data_path, datetime.date(2020,5,1), num_rows, file_format)

# Step 6 (2020-06-01): judging by the module name, removes a column.
%run ./modules/06_remove_column.ipynb
gen_data_remove_column(data_path, datetime.date(2020,6,1), num_rows, file_format)

Partition created: /home/jovyan/work/spark-data/raw/test_data_avro/date=2020-01-01
# Rows: 10
Schema:
root
 |-- identifier: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- date: date (nullable = true)



Partition created: /home/jovyan/work/spark-data/raw/test_data_avro/date=2020-02-01
# Rows: 10
Schema:
root
 |-- identifier: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- postal_code: string (nullable = true)
 |-- date: date (nullable = true)



Partition created: /home/jovyan/work/spark-data/

# Attempt 1 - Try to read all partitions without schema merge option

In [2]:
# Attempt 1: read the whole dataset directory with no reader options set.
df_at1 = (
    spark.read
    .format(file_format)
    .load(data_path)
)

In [3]:
# Print the schema Spark assigned to the attempt-1 DataFrame.
df_at1.printSchema()

root
 |-- identifier: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- postal_code: string (nullable = true)
 |-- date: date (nullable = true)



In [4]:
from pyspark.sql.functions import col

# Row count per `date` value, listed in ascending date order.
(
    df_at1
    .groupBy(col("date"))
    .count()
    .orderBy(col("date"))
    .show()
)

+----------+-----+
|      date|count|
+----------+-----+
|2020-01-01|   10|
|2020-02-01|   10|
|2020-03-01|   10|
|2020-04-01|   10|
|2020-05-01|   10|
|2020-06-01|   10|
+----------+-----+



In [5]:
# Materialize rows. In the recorded run this raises IncompatibleSchemaException
# at path address.postal_code (avroType "int" vs. sqlType StringType); the
# failure is the result this attempt is meant to demonstrate.
df_at1.show()

Py4JJavaError: An error occurred while calling o1032.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 20.0 failed 1 times, most recent failure: Lost task 0.0 in stage 20.0 (TID 273, localhost, executor driver): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.postal_code is not compatible (avroType = "int", sqlType = StringType).
Source Avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"identifier","type":["string","null"]},{"name":"first_name","type":["string","null"]},{"name":"last_name","type":["string","null"]},{"name":"occupation","type":["string","null"]},{"name":"age","type":["int","null"]},{"name":"address","type":[{"type":"record","name":"address","namespace":"topLevelRecord","fields":[{"name":"address_details","type":[{"type":"record","name":"address_details","namespace":"topLevelRecord.address","fields":[{"name":"street","type":[{"type":"record","name":"street","namespace":"topLevelRecord.address.address_details","fields":[{"name":"street_name","type":["string","null"]},{"name":"latitude","type":["float","null"]},{"name":"longitude","type":["float","null"]}]},"null"]},{"name":"number","type":["int","null"]}]},"null"]},{"name":"city","type":["string","null"]},{"name":"country","type":["string","null"]},{"name":"country_code","type":["string","null"]},{"name":"state","type":["string","null"]},{"name":"postal_code","type":["int","null"]}]},"null"]},{"name":"title","type":["string","null"]}]}.
Target Catalyst type: StructType(StructField(identifier,StringType,true), StructField(first_name,StringType,true), StructField(last_name,StringType,true), StructField(occupation,StringType,true), StructField(age,IntegerType,true), StructField(address,StructType(StructField(address,StringType,true), StructField(city,StringType,true), StructField(country,StringType,true), StructField(state,StringType,true), StructField(postal_code,StringType,true)),true))
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:275)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:161)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:193)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:156)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.postal_code is not compatible (avroType = "int", sqlType = StringType).
Source Avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"identifier","type":["string","null"]},{"name":"first_name","type":["string","null"]},{"name":"last_name","type":["string","null"]},{"name":"occupation","type":["string","null"]},{"name":"age","type":["int","null"]},{"name":"address","type":[{"type":"record","name":"address","namespace":"topLevelRecord","fields":[{"name":"address_details","type":[{"type":"record","name":"address_details","namespace":"topLevelRecord.address","fields":[{"name":"street","type":[{"type":"record","name":"street","namespace":"topLevelRecord.address.address_details","fields":[{"name":"street_name","type":["string","null"]},{"name":"latitude","type":["float","null"]},{"name":"longitude","type":["float","null"]}]},"null"]},{"name":"number","type":["int","null"]}]},"null"]},{"name":"city","type":["string","null"]},{"name":"country","type":["string","null"]},{"name":"country_code","type":["string","null"]},{"name":"state","type":["string","null"]},{"name":"postal_code","type":["int","null"]}]},"null"]},{"name":"title","type":["string","null"]}]}.
Target Catalyst type: StructType(StructField(identifier,StringType,true), StructField(first_name,StringType,true), StructField(last_name,StringType,true), StructField(occupation,StringType,true), StructField(age,IntegerType,true), StructField(address,StructType(StructField(address,StringType,true), StructField(city,StringType,true), StructField(country,StringType,true), StructField(state,StringType,true), StructField(postal_code,StringType,true)),true))
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:275)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:161)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:193)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:156)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


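**Conclusion:** Without any reader option, Spark applies a single inferred schema to the whole dataset (here it matches the schema of the 2020-02-01 partition). Counting rows per `date` works for all six partitions, but materializing rows with `show()` fails because `address.postal_code` is stored as `int` in at least one partition while the inferred schema expects `string`. Error message: *org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.postal_code is not compatible (avroType = "int", sqlType = StringType).*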

# Attempt 2 - Try to read all partitions with schema merge option

In [6]:
# Attempt 2: same read as attempt 1, but with the mergeSchema reader option set to "true".
df_at2 = (
    spark.read
    .option("mergeSchema", "true")
    .format(file_format)
    .load(data_path)
)

In [7]:
# Print the schema of the attempt-2 DataFrame to compare it with the attempt-1 schema.
df_at2.printSchema()

root
 |-- identifier: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- postal_code: string (nullable = true)
 |-- date: date (nullable = true)



In [8]:
from pyspark.sql.functions import col

# Row count per `date` value, listed in ascending date order.
(
    df_at2
    .groupBy(col("date"))
    .count()
    .orderBy(col("date"))
    .show()
)

+----------+-----+
|      date|count|
+----------+-----+
|2020-01-01|   10|
|2020-02-01|   10|
|2020-03-01|   10|
|2020-04-01|   10|
|2020-05-01|   10|
|2020-06-01|   10|
+----------+-----+



In [9]:
# Materialize rows. In the recorded run this raises the same
# IncompatibleSchemaException at address.postal_code as attempt 1
# (avroType "int" vs. sqlType StringType).
df_at2.show()

Py4JJavaError: An error occurred while calling o1091.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 475, localhost, executor driver): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.postal_code is not compatible (avroType = "int", sqlType = StringType).
Source Avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"identifier","type":["string","null"]},{"name":"first_name","type":["string","null"]},{"name":"last_name","type":["string","null"]},{"name":"occupation","type":["string","null"]},{"name":"age","type":["int","null"]},{"name":"address","type":[{"type":"record","name":"address","namespace":"topLevelRecord","fields":[{"name":"address_details","type":[{"type":"record","name":"address_details","namespace":"topLevelRecord.address","fields":[{"name":"street","type":[{"type":"record","name":"street","namespace":"topLevelRecord.address.address_details","fields":[{"name":"street_name","type":["string","null"]},{"name":"latitude","type":["float","null"]},{"name":"longitude","type":["float","null"]}]},"null"]},{"name":"number","type":["int","null"]}]},"null"]},{"name":"city","type":["string","null"]},{"name":"country","type":["string","null"]},{"name":"country_code","type":["string","null"]},{"name":"state","type":["string","null"]},{"name":"postal_code","type":["int","null"]}]},"null"]},{"name":"title","type":["string","null"]}]}.
Target Catalyst type: StructType(StructField(identifier,StringType,true), StructField(first_name,StringType,true), StructField(last_name,StringType,true), StructField(occupation,StringType,true), StructField(age,IntegerType,true), StructField(address,StructType(StructField(address,StringType,true), StructField(city,StringType,true), StructField(country,StringType,true), StructField(state,StringType,true), StructField(postal_code,StringType,true)),true))
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:275)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:161)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:193)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:156)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.postal_code is not compatible (avroType = "int", sqlType = StringType).
Source Avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"identifier","type":["string","null"]},{"name":"first_name","type":["string","null"]},{"name":"last_name","type":["string","null"]},{"name":"occupation","type":["string","null"]},{"name":"age","type":["int","null"]},{"name":"address","type":[{"type":"record","name":"address","namespace":"topLevelRecord","fields":[{"name":"address_details","type":[{"type":"record","name":"address_details","namespace":"topLevelRecord.address","fields":[{"name":"street","type":[{"type":"record","name":"street","namespace":"topLevelRecord.address.address_details","fields":[{"name":"street_name","type":["string","null"]},{"name":"latitude","type":["float","null"]},{"name":"longitude","type":["float","null"]}]},"null"]},{"name":"number","type":["int","null"]}]},"null"]},{"name":"city","type":["string","null"]},{"name":"country","type":["string","null"]},{"name":"country_code","type":["string","null"]},{"name":"state","type":["string","null"]},{"name":"postal_code","type":["int","null"]}]},"null"]},{"name":"title","type":["string","null"]}]}.
Target Catalyst type: StructType(StructField(identifier,StringType,true), StructField(first_name,StringType,true), StructField(last_name,StringType,true), StructField(occupation,StringType,true), StructField(age,IntegerType,true), StructField(address,StructType(StructField(address,StringType,true), StructField(city,StringType,true), StructField(country,StringType,true), StructField(state,StringType,true), StructField(postal_code,StringType,true)),true))
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:275)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:161)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:193)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:156)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


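**Conclusion:** Same error as attempt 1. The `mergeSchema` option had no visible effect here: the printed schema is identical to the one from attempt 1, counting rows per `date` still works, and `show()` still fails because it cannot deal with the different datatypes in `address.postal_code`. Error message: *org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.postal_code is not compatible (avroType = "int", sqlType = StringType).*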

# Attempt 3 - Try to read forcing a schema on read

In [10]:
from pyspark.sql.types import StructType
import json

# Hand-written target schema in Spark's StructType JSON format.
# Every leaf field is declared as a nullable string, and the struct lists
# lat/latitude, long/longitude and title/title_name side by side
# (presumably the union of field names seen across partitions — TODO confirm).
schema_json = '{"fields":[{"metadata":{},"name":"address","nullable":true,"type":{"fields":[{"metadata":{},"name":"address","nullable":true,"type":"string"},{"metadata":{},"name":"address_details","nullable":true,"type":{"fields":[{"metadata":{},"name":"number","nullable":true,"type":"string"},{"metadata":{},"name":"street","nullable":true,"type":{"fields":[{"metadata":{},"name":"lat","nullable":true,"type":"string"},{"metadata":{},"name":"latitude","nullable":true,"type":"string"},{"metadata":{},"name":"long","nullable":true,"type":"string"},{"metadata":{},"name":"longitude","nullable":true,"type":"string"},{"metadata":{},"name":"street_name","nullable":true,"type":"string"}],"type":"struct"}}],"type":"struct"}},{"metadata":{},"name":"city","nullable":true,"type":"string"},{"metadata":{},"name":"country","nullable":true,"type":"string"},{"metadata":{},"name":"country_code","nullable":true,"type":"string"},{"metadata":{},"name":"postal_code","nullable":true,"type":"string"},{"metadata":{},"name":"state","nullable":true,"type":"string"}],"type":"struct"}},{"metadata":{},"name":"age","nullable":true,"type":"string"},{"metadata":{},"name":"date","nullable":true,"type":"string"},{"metadata":{},"name":"first_name","nullable":true,"type":"string"},{"metadata":{},"name":"identifier","nullable":true,"type":"string"},{"metadata":{},"name":"last_name","nullable":true,"type":"string"},{"metadata":{},"name":"occupation","nullable":true,"type":"string"},{"metadata":{},"name":"title","nullable":true,"type":"string"},{"metadata":{},"name":"title_name","nullable":true,"type":"string"}],"type":"struct"}'
# Parse the JSON text into a dict and rebuild the StructType from it.
schema = StructType.fromJson(json.loads(schema_json))

In [11]:
# Display the parsed StructType to verify the JSON was converted as expected.
schema

StructType(List(StructField(address,StructType(List(StructField(address,StringType,true),StructField(address_details,StructType(List(StructField(number,StringType,true),StructField(street,StructType(List(StructField(lat,StringType,true),StructField(latitude,StringType,true),StructField(long,StringType,true),StructField(longitude,StringType,true),StructField(street_name,StringType,true))),true))),true),StructField(city,StringType,true),StructField(country,StringType,true),StructField(country_code,StringType,true),StructField(postal_code,StringType,true),StructField(state,StringType,true))),true),StructField(age,StringType,true),StructField(date,StringType,true),StructField(first_name,StringType,true),StructField(identifier,StringType,true),StructField(last_name,StringType,true),StructField(occupation,StringType,true),StructField(title,StringType,true),StructField(title_name,StringType,true)))

In [12]:
from pyspark.sql.functions import explode

# Attempt 3: read every partition while forcing the hand-written all-string schema
# instead of letting Spark infer one from the files.
# NOTE(review): "mergeSchema" is documented for the Parquet/ORC sources; it is not
# listed among the spark-avro 2.4 options, so it is presumably ignored here — confirm.
df_at3 = (
    spark.read
    .format(file_format)
    .schema(schema)
    .option("mergeSchema", "true")
    .load(data_path)
)

In [13]:
# Inspect the schema attached to df_at3.
df_at3.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- address_details: struct (nullable = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- street: struct (nullable = true)
 |    |    |    |-- lat: string (nullable = true)
 |    |    |    |-- latitude: string (nullable = true)
 |    |    |    |-- long: string (nullable = true)
 |    |    |    |-- longitude: string (nullable = true)
 |    |    |    |-- street_name: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- country_code: string (nullable = true)
 |    |-- postal_code: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- age: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- identifier: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- title: string (nullable = true)
 |-- title_name: string (nullab

In [14]:
# show() is the first action run on df_at3, so this is where the Avro files are
# actually deserialized and any file-vs-schema type mismatch surfaces.
df_at3.show()

Py4JJavaError: An error occurred while calling o1161.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 1 times, most recent failure: Lost task 0.0 in stage 24.0 (TID 476, localhost, executor driver): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.address_details.number is not compatible (avroType = "int", sqlType = StringType).
Source Avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"identifier","type":["string","null"]},{"name":"first_name","type":["string","null"]},{"name":"last_name","type":["string","null"]},{"name":"occupation","type":["string","null"]},{"name":"age","type":["int","null"]},{"name":"address","type":[{"type":"record","name":"address","namespace":"topLevelRecord","fields":[{"name":"address_details","type":[{"type":"record","name":"address_details","namespace":"topLevelRecord.address","fields":[{"name":"street","type":[{"type":"record","name":"street","namespace":"topLevelRecord.address.address_details","fields":[{"name":"street_name","type":["string","null"]},{"name":"latitude","type":["float","null"]},{"name":"longitude","type":["float","null"]}]},"null"]},{"name":"number","type":["int","null"]}]},"null"]},{"name":"city","type":["string","null"]},{"name":"country","type":["string","null"]},{"name":"country_code","type":["string","null"]},{"name":"state","type":["string","null"]},{"name":"postal_code","type":["int","null"]}]},"null"]},{"name":"title","type":["string","null"]}]}.
Target Catalyst type: StructType(StructField(address,StructType(StructField(address,StringType,true), StructField(address_details,StructType(StructField(number,StringType,true), StructField(street,StructType(StructField(lat,StringType,true), StructField(latitude,StringType,true), StructField(long,StringType,true), StructField(longitude,StringType,true), StructField(street_name,StringType,true)),true)),true), StructField(city,StringType,true), StructField(country,StringType,true), StructField(country_code,StringType,true), StructField(postal_code,StringType,true), StructField(state,StringType,true)),true), StructField(age,StringType,true), StructField(first_name,StringType,true), StructField(identifier,StringType,true), StructField(last_name,StringType,true), StructField(occupation,StringType,true), StructField(title,StringType,true), StructField(title_name,StringType,true))
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:275)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:161)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:161)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:193)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:156)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.address_details.number is not compatible (avroType = "int", sqlType = StringType).
Source Avro schema: {"type":"record","name":"topLevelRecord","fields":[{"name":"identifier","type":["string","null"]},{"name":"first_name","type":["string","null"]},{"name":"last_name","type":["string","null"]},{"name":"occupation","type":["string","null"]},{"name":"age","type":["int","null"]},{"name":"address","type":[{"type":"record","name":"address","namespace":"topLevelRecord","fields":[{"name":"address_details","type":[{"type":"record","name":"address_details","namespace":"topLevelRecord.address","fields":[{"name":"street","type":[{"type":"record","name":"street","namespace":"topLevelRecord.address.address_details","fields":[{"name":"street_name","type":["string","null"]},{"name":"latitude","type":["float","null"]},{"name":"longitude","type":["float","null"]}]},"null"]},{"name":"number","type":["int","null"]}]},"null"]},{"name":"city","type":["string","null"]},{"name":"country","type":["string","null"]},{"name":"country_code","type":["string","null"]},{"name":"state","type":["string","null"]},{"name":"postal_code","type":["int","null"]}]},"null"]},{"name":"title","type":["string","null"]}]}.
Target Catalyst type: StructType(StructField(address,StructType(StructField(address,StringType,true), StructField(address_details,StructType(StructField(number,StringType,true), StructField(street,StructType(StructField(lat,StringType,true), StructField(latitude,StringType,true), StructField(long,StringType,true), StructField(longitude,StringType,true), StructField(street_name,StringType,true)),true)),true), StructField(city,StringType,true), StructField(country,StringType,true), StructField(country_code,StringType,true), StructField(postal_code,StringType,true), StructField(state,StringType,true)),true), StructField(age,StringType,true), StructField(first_name,StringType,true), StructField(identifier,StringType,true), StructField(last_name,StringType,true), StructField(occupation,StringType,true), StructField(title,StringType,true), StructField(title_name,StringType,true))
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:275)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:161)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.org$apache$spark$sql$avro$AvroDeserializer$$newWriter(AvroDeserializer.scala:161)
	at org.apache.spark.sql.avro.AvroDeserializer.getRecordWriter(AvroDeserializer.scala:308)
	at org.apache.spark.sql.avro.AvroDeserializer.<init>(AvroDeserializer.scala:53)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:193)
	at org.apache.spark.sql.avro.AvroFileFormat$$anonfun$buildReader$1.apply(AvroFileFormat.scala:156)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


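**Conclusion:** Forcing an all-string schema on read does not help. The Avro reader does not cast the types stored in the files to the types requested in the schema, so the read fails as soon as they differ — here `address.address_details.number` is stored as `int` but requested as `string`:
*org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro to catalyst because schema at path address.address_details.number is not compatible (avroType = "int", sqlType = StringType).*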

# Attempt 4: Read each partition at a time and union dataframes

In [15]:
import os
from pyspark.sql.functions import lit

# Attempt 4: read each "<column>=<value>" partition directory on its own,
# re-attach the partition column as a literal, and union the results.
# This cell is expected to raise an AnalysisException once two partitions with a
# different number of columns are unioned — that failure is the demonstration.
df = None

# NOTE: os.listdir returns entries in arbitrary order, so which pair of
# partitions trips the union first is not fixed.
for partition_dir in [d for d in os.listdir(data_path) if "=" in d]:
    # Split once so a value that itself contains "=" is kept intact.
    partition_col, partition_value = partition_dir.split("=", 1)

    df_temp = (
        spark.read.format(file_format)
        .load(os.path.join(data_path, partition_dir))
        .withColumn(partition_col, lit(partition_value))
    )

    # The first partition seeds the result; later ones are appended by position.
    df = df_temp if df is None else df.union(df_temp)

AnalysisException: "Union can only be performed on tables with the same number of columns, but the first table has 7 columns and the second table has 8 columns;;\n'Union\n:- Project [identifier#1577, first_name#1578, last_name#1579, occupation#1580, age#1581, address#1582, 2020-02-01 AS date#1589]\n:  +- Relation[identifier#1577,first_name#1578,last_name#1579,occupation#1580,age#1581,address#1582] avro\n+- Project [identifier#1597, first_name#1598, last_name#1599, occupation#1600, age#1601, address#1602, title#1603, 2020-03-01 AS date#1611]\n   +- Relation[identifier#1597,first_name#1598,last_name#1599,occupation#1600,age#1601,address#1602,title#1603] avro\n"

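**Conclusion:** Each partition can be read on its own, but `union` fails as soon as two partitions have different schemas, because it requires both DataFrames to have the same number of columns: *Union can only be performed on tables with the same number of columns, but the first table has 7 columns and the second table has 8 columns*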

# Final solution

In [27]:
# Load functions
%run ./modules/mergeSchema-functions.ipynb
%run ./modules/helpers.ipynb

# Control file handed to merge_schemas (its exact role is presumably defined in
# modules/mergeSchema-functions.ipynb — verify there).
ctrl_file = "/home/jovyan/work/spark-data/raw/last_read_control/{}.json".format(entity)

# Read all partitions and merge the schemas.
# The value returned by merge_schemas is fed straight into spark.read.json, so it
# is presumably JSON path(s) or an RDD of JSON strings — TODO confirm.
merged_json = merge_schemas(data_path, file_format, "F", ctrl_file)
df = spark.read.json(merged_json)

Different schemas identified:
{
    "0": {
        "init_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-01-01",
        "final_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-01-01"
    },
    "1": {
        "init_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-02-01",
        "final_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-02-01"
    },
    "2": {
        "init_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-03-01",
        "final_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-03-01"
    },
    "3": {
        "init_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-04-01",
        "final_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-04-01"
    },
    "4": {
        "init_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-05-01",
        "final_path": "/home/jovyan/work/spark-data/raw/test_data_avro/date=2020-05-01"
    }

In [28]:
# Check the schema of the DataFrame produced by the merged read.
df.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- address_details: struct (nullable = true)
 |    |    |-- number: string (nullable = true)
 |    |    |-- street: struct (nullable = true)
 |    |    |    |-- lat: string (nullable = true)
 |    |    |    |-- latitude: string (nullable = true)
 |    |    |    |-- long: string (nullable = true)
 |    |    |    |-- longitude: string (nullable = true)
 |    |    |    |-- street_name: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- country_code: string (nullable = true)
 |    |-- postal_code: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- age: string (nullable = true)
 |-- date: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- identifier: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- title: string (nullable = t

In [29]:
from pyspark.sql.functions import col

# Count rows per partition (one row per distinct "date" value), ordered by date,
# and bring the small result to the driver as a pandas DataFrame for display.
(
    df.select(col("date"))
    .groupBy("date")
    .count()
    .sort("date")
    .toPandas()
)

Unnamed: 0,date,count
0,2020-01-01,10
1,2020-02-01,10
2,2020-03-01,10
3,2020-04-01,10
4,2020-05-01,10
5,2020-06-01,10


In [30]:
# Preview the merged DataFrame (the nested address struct is shown as one column).
df.show()

+--------------------+---+----------+----------+----------+----------+--------------------+-----+----------+
|             address|age|      date|first_name|identifier| last_name|          occupation|title|title_name|
+--------------------+---+----------+----------+----------+----------+--------------------+-----+----------+
|                null| 42|2020-01-01|  Wilfredo|  89-16/49|     Pratt|           Lifeguard| null|      null|
|                null| 53|2020-01-01|      Jann|  02-92/94|      Ryan|             Almoner| null|      null|
|                null| 64|2020-01-01|    Tiesha|  21-53/62|     Joyce|Instrument Superv...| null|      null|
|                null| 57|2020-01-01|    Raymon|  14-29/10|    Branch|      Health Advisor| null|      null|
|                null| 30|2020-01-01|     Elvin|  85-95/54|       Orr|         Book-Keeper| null|      null|
|                null| 39|2020-01-01|     Roman|  01-55/73|    Acosta|          Tour Guide| null|      null|
|                nu

In [31]:
from pyspark.sql.types import StructType, ArrayType  

def flatten(schema, prefix=None):
    """Build select expressions that flatten a nested schema.

    Walks ``schema.fields`` recursively. For every leaf field it emits a string
    of the form ``"a.b.c AS a_b_c"``: the dotted path to the field, aliased to
    the same path with dots replaced by underscores.

    Args:
        schema: A StructType whose fields are inspected.
        prefix: Dotted path of the enclosing struct, or None at the top level.

    Returns:
        A list of ``"<path> AS <alias>"`` strings, in schema order.
    """
    expressions = []
    for struct_field in schema.fields:
        if prefix:
            path = prefix + '.' + struct_field.name
        else:
            path = struct_field.name

        # For arrays, look at the element type so arrays of structs are descended into.
        inner_type = struct_field.dataType
        if isinstance(inner_type, ArrayType):
            inner_type = inner_type.elementType

        if not isinstance(inner_type, StructType):
            # Leaf field: select it by dotted path and alias it with underscores.
            expressions.append(path + " AS " + path.replace(".","_"))
            continue

        # Nested struct: recurse with the current path as the new prefix.
        expressions.extend(flatten(inner_type, prefix=path))

    return expressions

In [32]:
# Project every leaf field of the nested schema into its own top-level column,
# named after its dotted path with underscores (see flatten).
df_flat = df.selectExpr(*flatten(df.schema))

In [33]:
from pyspark.sql.functions import col, when

# Null count by partition and column: every non-"date" column is replaced by a
# 1/0 flag (1 when the value is null), so the per-date sum is the number of
# null rows for that column in that partition.
(
    df_flat
    .select(
        "date",
        *[
            when(col(column_name).isNull(), 1).otherwise(0).alias(column_name)
            for column_name in df_flat.columns
            if column_name != "date"
        ]
    )
    .groupBy("date")
    .sum()
    .sort("date")
    .toPandas()
)

Unnamed: 0,date,sum(address_address),sum(address_address_details_number),sum(address_address_details_street_lat),sum(address_address_details_street_latitude),sum(address_address_details_street_long),sum(address_address_details_street_longitude),sum(address_address_details_street_street_name),sum(address_city),sum(address_country),sum(address_country_code),sum(address_postal_code),sum(address_state),sum(age),sum(first_name),sum(identifier),sum(last_name),sum(occupation),sum(title),sum(title_name)
0,2020-01-01,10,10,10,10,10,10,10,10,10,10,10,10,0,0,0,0,0,10,10
1,2020-02-01,0,10,10,10,10,10,10,0,0,10,0,0,0,0,0,0,0,10,10
2,2020-03-01,0,10,10,10,10,10,10,0,0,0,0,0,0,0,0,0,0,0,10
3,2020-04-01,10,0,10,0,10,0,0,0,0,0,0,0,0,0,0,0,0,0,10
4,2020-05-01,10,0,0,10,0,10,0,0,0,0,0,0,0,0,0,0,0,10,0
5,2020-06-01,10,0,10,10,10,10,0,0,0,10,0,0,0,0,0,0,0,10,10
