In [43]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
import math

In [44]:
spark = SparkSession \
    .builder \
    .appName("Data with Nikk the Greek Spark Session") \
    .master("local[4]") \
    .enableHiveSupport() \
    .config("spark.jars.packages", "uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.5") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0") \
    .getOrCreate()

"""
Reference gresearch:
- Parquet files analysis: https://www.gresearch.com/blog/article/parquet-files-know-your-scaling-limits/
- GitHub Spark extension: https://github.com/G-Research/spark-extension
- Parquet methods: https://github.com/G-Research/spark-extension/tree/master/python/gresearch/spark/parquet
"""

'\nReference gresearch:\n- Parquet files analysis: https://www.gresearch.com/blog/article/parquet-files-know-your-scaling-limits/\n- GitHub Spark extension: https://github.com/G-Research/spark-extension\n- Parquet methods: https://github.com/G-Research/spark-extension/tree/master/python/gresearch/spark/parquet\n'

In [45]:
sc = spark.sparkContext

In [46]:
#Turning off AQE as it generates more jobs which might be confusing for this scenario here. 
spark.conf.set("spark.sql.adaptive.enabled", "false")
#to not cache dataframes... this may not create repeatable results
spark.conf.set("spark.databricks.io.cache.enabled", "false")

In [47]:
def sdf_generator(num_rows: int, num_partitions: int = None) -> "DataFrame":
    return (
        spark.range(num_rows, numPartitions=num_partitions)
        .withColumn("date", f.current_date())
        .withColumn("timestamp",f.current_timestamp())
        .withColumn("idstring", f.col("id").cast("string"))
        .withColumn("idfirst", f.col("idstring").substr(0,1))
        .withColumn("idlast", f.col("idstring").substr(-1,1))
        )

In [48]:
sdf = sdf_generator(10000000, 8)
sdf.show()

+---+----------+--------------------+--------+-------+------+
| id|      date|           timestamp|idstring|idfirst|idlast|
+---+----------+--------------------+--------+-------+------+
|  0|2024-05-20|2024-05-20 13:53:...|       0|      0|     0|
|  1|2024-05-20|2024-05-20 13:53:...|       1|      1|     1|
|  2|2024-05-20|2024-05-20 13:53:...|       2|      2|     2|
|  3|2024-05-20|2024-05-20 13:53:...|       3|      3|     3|
|  4|2024-05-20|2024-05-20 13:53:...|       4|      4|     4|
|  5|2024-05-20|2024-05-20 13:53:...|       5|      5|     5|
|  6|2024-05-20|2024-05-20 13:53:...|       6|      6|     6|
|  7|2024-05-20|2024-05-20 13:53:...|       7|      7|     7|
|  8|2024-05-20|2024-05-20 13:53:...|       8|      8|     8|
|  9|2024-05-20|2024-05-20 13:53:...|       9|      9|     9|
| 10|2024-05-20|2024-05-20 13:53:...|      10|      1|     0|
| 11|2024-05-20|2024-05-20 13:53:...|      11|      1|     1|
| 12|2024-05-20|2024-05-20 13:53:...|      12|      1|     2|
| 13|202

In [49]:
sdf_schema = "id bigint, date date, timestamp timestamp, idstring string, idfirst string, idlast string"

How predicate pushdown works on Spark since Spark 3.1.0: https://www.waitingforcode.com/apache-spark-sql/what-new-apache-spark-3.1-predicate-pushdown-json-csv-apache-avro/read

# 1 - JSON

## 1-1 Initial state JSON

How it works: https://github.com/jerryshao/apache-spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L323
https://spark.apache.org/docs/latest/sql-data-sources-json.html

In [50]:
sc.setJobDescription("Save Json")
path_json = "format_json_large.json"
sdf.write.format("json").mode("overwrite").save(path_json)

                                                                                

In [51]:
#set maxPartitions MB for a fair comparison of 8 partitions also during read 
maxPartitionsMB = 160
maxPartitionsBytes = math.ceil(maxPartitionsMB*1024*1024)
spark.conf.set("spark.sql.files.maxPartitionBytes", str(maxPartitionsBytes)+"b")

In [52]:
spark.conf.set("spark.sql.sources.useV1SourceList", "json")
sc.setJobDescription("Load Json all data V1")
sdf_json = spark.read.format("json").schema(sdf_schema).load(path_json)
sdf_json.write.format("noop").mode("overwrite").save()

                                                                                

In [53]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Load Json all data V2")
sdf_json = spark.read.format("json").schema(sdf_schema).load(path_json)
sdf_json.write.format("noop").mode("overwrite").save()

                                                                                

## 1-2 Column filter

In [54]:
spark.conf.set("spark.sql.sources.useV1SourceList", "json")
sc.setJobDescription("JSON Column Filter V1")
sdf_json = spark.read.format("json").schema(sdf_schema).load(path_json)
sdf_json = sdf_json.select("id", "idstring")
sdf_json.write.format("noop").mode("overwrite").save()

                                                                                

In [55]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("JSON Column Filter V2")
sdf_json = spark.read.format("json").schema(sdf_schema).load(path_json)
sdf_json = sdf_json.select("id", "idstring")
sdf_json.write.format("noop").mode("overwrite").save()

                                                                                

## 1-3 Row filter

In [56]:
spark.conf.set("spark.sql.sources.useV1SourceList", "json")
sc.setJobDescription("JSON Row Filter id V1")
sdf_json = spark.read.format("json").schema(sdf_schema).load(path_json)
sdf_json = sdf_json.filter(f.col("id") < 300)
sdf_json.write.format("noop").mode("overwrite").save()

                                                                                

In [57]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("JSON Row Filter id V2")
sdf_json = spark.read.format("json").schema(sdf_schema).load(path_json)
sdf_json = sdf_json.filter(f.col("id") < 300)
sdf_json.write.format("noop").mode("overwrite").save()

                                                                                

## 1-4 Count

In [58]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("JSON Count")
sdf_json = spark.read.format("json").schema(sdf_schema).load(path_json)
sdf_json = sdf_json.filter(f.col("id") < 300)
sdf_json.write.format("noop").mode("overwrite").save()

                                                                                

# 2 - CSV with schema interference

## 2-1 Initial state CSV

How it works: https://github.com/jerryshao/apache-spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L470
https://spark.apache.org/docs/latest/sql-data-sources-csv.html

In [59]:
sc.setJobDescription("Save CSV")
path_csv = "format_csv_large.csv"
sdf.write.format("csv").mode("overwrite").option("header", "True").save(path_csv)

                                                                                

In [60]:
#set maxPartitions MB for a fair comparison of 8 partitions also during read 
maxPartitionsMB = 80
maxPartitionsBytes = math.ceil(maxPartitionsMB*1024*1024)
spark.conf.set("spark.sql.files.maxPartitionBytes", str(maxPartitionsBytes)+"b")

In [61]:
spark.conf.set("spark.sql.sources.useV1SourceList", "csv")
sc.setJobDescription("Load CSV All data V1")
sdf_csv = spark.read.format("csv").options(header=True).schema(sdf_schema).load(path_csv)
sdf_csv.write.format("noop").mode("overwrite").save()

                                                                                

In [62]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Load CSV All data V2")
sdf_csv = spark.read.format("csv").options(header=True).schema(sdf_schema).load(path_csv)
sdf_csv.write.format("noop").mode("overwrite").save()

                                                                                

## 2-2 Column Filter

In [63]:
spark.conf.set("spark.sql.sources.useV1SourceList", "csv")
sc.setJobDescription("CSV Column Filter V1")
sdf_csv = spark.read.format("csv").schema(sdf_schema).load(path_csv)
sdf_csv = sdf_csv.select("id", "idstring")
sdf_csv.write.format("noop").mode("overwrite").save()

                                                                                

In [64]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("CSV Column Filter V2")
sdf_csv = spark.read.format("csv").schema(sdf_schema).load(path_csv)
sdf_csv = sdf_csv.select("id", "idstring")
sdf_csv.write.format("noop").mode("overwrite").save()

                                                                                

## 2-3 Row Filter

In [65]:
spark.conf.set("spark.sql.sources.useV1SourceList", "csv")
sc.setJobDescription("CSV Row Filter id V1")
sdf_csv = spark.read.format("csv").schema(sdf_schema).load(path_csv)
sdf_csv = sdf_csv.filter(f.col("id") < 300)
sdf_csv.write.format("noop").mode("overwrite").save()

                                                                                

In [66]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("CSV Row Filter id V2")
sdf_csv = spark.read.format("csv").schema(sdf_schema).load(path_csv)
sdf_csv = sdf_csv.filter(f.col("id") < 300)
sdf_csv.write.format("noop").mode("overwrite").save()

                                                                                

## 2-4 Count


In [67]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("CSV Count")
sdf_csv = spark.read.format("csv").schema(sdf_schema).load(path_csv)
sdf_csv.count()

                                                                                

10000008

# 3 - Parquet

## 3-1 Initial state Parquet

How it works: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

In [68]:
sc.setJobDescription("Save Parquet")
path_parquet = "format_parquet_large.parquet"
sdf.write.format("parquet").mode("overwrite").save(path_parquet)

                                                                                

In [69]:
#set maxPartitions MB for a fair comparison of 8 partitions also during read 
maxPartitionsMB = 15
maxPartitionsBytes = math.ceil(maxPartitionsMB*1024*1024)
spark.conf.set("spark.sql.files.maxPartitionBytes", str(maxPartitionsBytes)+"b")

Introducing Spark config: spark.sql.sources.useV1SourceList, default: avro,csv,json,kafka,orc,parquet,text

A comma-separated list of data source short names or fully qualified data source 
implementation class names for which Data Source V2 code path is disabled. These 
data sources will fallback to Data Source V1 code path.

Data Source V2 is a more flexible data source class supporting easier implementation of new pushdown options and also might be a bit more efficient.

https://www.youtube.com/watch?v=U5n-evWfYSQ

https://github.com/apache/spark/blob/fe0aa1edff047689a1906eaa86d667e0ddfabfbc/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L3207

In [70]:
#TODO: Check with Spark community why V2 not activated for parquet and aggregate not pushed as default
#TODO: Why statistics eliminated from Scanning Tab... Inefficiency?
#TODO: filter and count does not work -> Does not make sense not beeing integrated new feature

In [71]:
spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")
sc.setJobDescription("Load Parquet all data V1")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet.write.format("noop").mode("overwrite").save()

                                                                                

In [72]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Load Parquet all data V2")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet.write.format("noop").mode("overwrite").save()

                                                                                

## 3-2 Column Filter

In [73]:
spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")
sc.setJobDescription("Parquet Column Filter V1")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet = sdf_parquet.select("id", "idstring")
sdf_parquet.write.format("noop").mode("overwrite").save()

In [74]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Parquet Column Filter V2")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet = sdf_parquet.select("id", "idstring")
sdf_parquet.write.format("noop").mode("overwrite").save()

                                                                                

## 3-3 Row Filter id

In [75]:
spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")
sc.setJobDescription("Parquet Row Filter id V1")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet = sdf_parquet.filter(f.col("id") < 300)
sdf_parquet.write.format("noop").mode("overwrite").save()

In [76]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Parquet Row Filter id V2")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet = sdf_parquet.filter(f.col("id") < 300)
sdf_parquet.write.format("noop").mode("overwrite").save()

## 3-4 Count

In [77]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.parquet.aggregatePushdown", "false")
sc.setJobDescription("Parquet count aggegratePushdown false")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet.count()

10000000

Introducing: spark.sql.parquet.aggregatePushdown, default false, needs data source not to be listed in spark.sql.sources.useV1SourceList

If true, aggregates will be pushed down to Parquet for optimization. Support MIN, MAX and COUNT as aggregate expression. For MIN/MAX, support boolean, integer, float and date type. For COUNT, support all data types. If statistics is missing from any Parquet file footer, exception would be thrown.

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

In [78]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
sc.setJobDescription("Parquet count aggegratePushdown true")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet.count()

10000000

## 3-5 Max

In [79]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.parquet.aggregatePushdown", "false")
sc.setJobDescription("Parquet max aggegratePushdown false")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_max = sdf_parquet.groupBy().max("id")
sdf_max.show()

+-------+
|max(id)|
+-------+
|9999999|
+-------+



In [80]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
sc.setJobDescription("Parquet max aggegratePushdown true")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_max = sdf_parquet.groupBy().max("id")
sdf_max.show()

+-------+
|max(id)|
+-------+
|9999999|
+-------+



## 3-6 filter and a count

In [81]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
sc.setJobDescription("Parquet count with filter 1250000")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet = sdf_parquet.filter(f.col("id") < 1250000)
sdf_parquet.count()

1250000

## 3-7 filter and max

In [82]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
sc.setJobDescription("Parquet max with filter 1250000")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet)
sdf_parquet = sdf_parquet.filter(f.col("id") <= 1249999)
sdf_max = sdf_parquet.groupBy().max("id")
sdf_max.show()

+-------+
|max(id)|
+-------+
|1249999|
+-------+



## 3-8 Partitioned and filter

In [83]:
sc.setJobDescription("Save Parquet Partitioned")
path_parquet_part = "D:/Spark/Data/format_parquet_partitioned.parquet"
sdf.repartition("idlast").write.partitionBy("idlast").format("parquet").mode("overwrite").save(path_parquet_part)

Py4JJavaError: An error occurred while calling o804.save.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "D"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:454)
	at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:530)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Parquet Row Filter idlast Partitioned")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet_part)
sdf_parquet = sdf_parquet.filter(f.col("idlast") == "1")
sdf_parquet.write.format("noop").mode("overwrite").save()

## 3-9 Partitioned, filter and max

In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
sc.setJobDescription("Parquet partitioned max with filter idlast 1")
sdf_parquet = spark.read.format("parquet").schema(sdf_schema).load(path_parquet_part)
sdf_parquet = sdf_parquet.filter(f.col("idlast") == "1")
sdf_max = sdf_parquet.groupBy().max("id")
sdf_max.show()

+-------+
|max(id)|
+-------+
|9999991|
+-------+



Limitations:
- Filter and aggregates work only with partitioning
- Max and Min works not with nested columns or string columns

# 4 - Avro

## 4-1 Initial state AVRO

How it works: https://spark.apache.org/docs/latest/sql-data-sources-avro.html

In [None]:
sc.setJobDescription("Save Avro")
path_avro = "D:/Spark/Data/format_avro_large.avro"
sdf.write.format("avro").mode("overwrite").save(path_avro)

In [None]:
#set maxPartitions MB for a fair comparison of 8 partitions also during read 
maxPartitionsMB = 10
maxPartitionsBytes = math.ceil(maxPartitionsMB*1024*1024)
spark.conf.set("spark.sql.files.maxPartitionBytes", str(maxPartitionsBytes)+"b")

In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "avro")
sc.setJobDescription("Load Avro All data V1")
sdf_avro = spark.read.format("avro").schema(sdf_schema).load(path_avro)
sdf_avro.write.format("noop").mode("overwrite").save()

In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Load Avro All data V2")
sdf_avro = spark.read.format("avro").schema(sdf_schema).load(path_avro)
sdf_avro.write.format("noop").mode("overwrite").save()

## 4-2 Column Filter

Results:
- Load data: 1.6 s

In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "avro")
sc.setJobDescription("Avro Column Filter V1")
sdf_avro = spark.read.format("avro").schema(sdf_schema).load(path_avro)
sdf_avro = sdf_avro.select("id", "idstring")
sdf_avro.write.format("noop").mode("overwrite").save()

In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Avro Column Filter V2")
sdf_avro = spark.read.format("avro").schema(sdf_schema).load(path_avro)
sdf_avro = sdf_avro.select("id", "idstring")
sdf_avro.write.format("noop").mode("overwrite").save()

## 4-3 Row Filter

In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "avro")
sc.setJobDescription("Avro ID Filter V1")
sdf_avro = spark.read.format("avro").schema(sdf_schema).load(path_avro)
sdf_avro = sdf_avro.filter(f.col("id") < 300)
sdf_avro.write.format("noop").mode("overwrite").save()

In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Avro ID Filter V2")
sdf_avro = spark.read.format("avro").schema(sdf_schema).load(path_avro)
sdf_avro = sdf_avro.filter(f.col("id") < 300)
sdf_avro.write.format("noop").mode("overwrite").save()

## 4-4 Count

In [None]:
spark.conf.set("spark.sql.sources.useV1SourceList", "")
sc.setJobDescription("Avro Count")
sdf_avro = spark.read.format("avro").schema(sdf_schema).load(path_avro)
sdf_avro.count()

10000000