# Performance test with native execution engine using the NYC taxi - yellow dataset from https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-yellow?tabs=pyspark
In this example, I am using a Fabric environment with the native execution engine enabled.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

StatementMeta(, 0ecf7285-ca05-4d00-963e-99da582a512e, 3, Finished, Available, Finished)

In [2]:
# Create Spark session
spark = SparkSession.builder \
    .appName("NativeExecutionEngine") \
    .getOrCreate()

StatementMeta(, 0ecf7285-ca05-4d00-963e-99da582a512e, 4, Finished, Available, Finished)

In [3]:
# Get the NYC Taxi yellow data from Microsoft: https://learn.microsoft.com/en-us/azure/open-datasets/dataset-taxi-yellow?tabs=pyspark

# Azure storage access info
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
blob_sas_token = r""

# Allow SPARK to read from Blob remotely
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
print('Remote blob path: ' + wasbs_path)

# SPARK read parquet, note that it won't load any data yet by now
df_yellow = spark.read.parquet(wasbs_path)
print('Register the DataFrame as a SQL temporary view: source')
df_yellow.createOrReplaceTempView('yellow')

StatementMeta(, 0ecf7285-ca05-4d00-963e-99da582a512e, 5, Finished, Available, Finished)

Remote blob path: wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow
Register the DataFrame as a SQL temporary view: source


In [4]:
# Count the number of rows
df_yellow.count()

StatementMeta(, 0ecf7285-ca05-4d00-963e-99da582a512e, 6, Finished, Available, Finished)

1571671152

In [7]:
# Executes a SQL query using Spark SQL to sum up all values in the 'totalAmount' column from the 'yellow' table
yellow_sum = spark.sql('SELECT SUM(totalAmount) FROM yellow')

# Triggers the execution of the query and retrieves the result as a list of Row objects
yellow_sum.collect()

# Displays the execution plan of the query
yellow_sum.explain()

StatementMeta(, 0ecf7285-ca05-4d00-963e-99da582a512e, 9, Finished, Available, Finished)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(6) HashAggregateTransformer(keys=[], functions=[sum(totalAmount#714)], isStreamingAgg=false)
      +- ^(6) InputIteratorTransformer[sum#1052]
         +- ShuffleQueryStage 0
            +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=460], [id=#460], [OUTPUT] List(sum:DoubleType)
               +- ^(5) FlushableHashAggregateTransformer(keys=[], functions=[partial_sum(totalAmount#714)], isStreamingAgg=false)
                  +- ^(5) ProjectExecTransformer [totalAmount#714]
                     +- ^(5) NativeFileScan parquet [totalAmount#714,puYear#715,puMonth#716] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<totalAmount:double>
+- == Initial Plan ==
   HashAggregate(keys=[], functions=[sum(total