# Parameterized SQL Queries

In [1]:
import os
import sys

# Point both the PySpark workers and the driver at the interpreter that is
# running this notebook, so the two always use the same Python executable.
os.environ.update(
    {
        "PYSPARK_PYTHON": sys.executable,
        "PYSPARK_DRIVER_PYTHON": sys.executable,
    }
)

In [1]:
import datetime
import pathlib

import delta
import pyspark
import pyspark.sql.functions as F
from delta import configure_spark_with_delta_pip

In [2]:
# Session settings, applied in this order: the Delta SQL extension, memory
# sizing, the Delta catalog, and the shuffle partition count.
delta_session_conf = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.executor.memory": "10G",
    "spark.driver.memory": "25G",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.shuffle.partitions": "2",
}

builder = pyspark.sql.SparkSession.builder.appName("MyApp")
for conf_key, conf_value in delta_session_conf.items():
    builder = builder.config(conf_key, conf_value)

# configure_spark_with_delta_pip wraps the builder before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Users/matthew.powers/opt/miniconda3/envs/pyspark-332-delta-230/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/matthew.powers/.ivy2/cache
The jars for the packages stored in: /Users/matthew.powers/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-175dd9e0-0d2c-4f06-98dc-d2b9e9b5954b;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 98ms :: artifacts dl 4ms
	:: modules in use:
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   

23/10/22 00:17:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## PySpark parameterized queries with named arguments

In [5]:
# Read the Delta table stored under the user's home directory and expose it
# to SQL as the temporary view `h20_1e9`.
h2o_table_path = f"{pathlib.Path.home()}/data/deltalake/G1_1e9_1e2_0_0"
h2o_df = spark.read.format("delta").load(h2o_table_path)
h2o_df.createOrReplaceTempView("h20_1e9")

In [6]:
# Show three rows of the registered view to see its columns.
preview_df = spark.sql("select * from h20_1e9 limit 3")
preview_df.show()

23/10/21 22:35:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----+-----+------------+---+---+-------+---+---+---------+
|  id1|  id2|         id3|id4|id5|    id6| v1| v2|       v3|
+-----+-----+------------+---+---+-------+---+---+---------+
|id015|id010|id0000896846| 10|  1|5773276|  1|  1|33.170577|
|id017|id017|id0007133323| 71| 14|5476891|  3|  2|34.800229|
|id016|id030|id0002048672| 55| 35|4861331|  1|  3|32.064864|
+-----+-----+------------+---+---+-------+---+---+---------+



In [7]:
# Baseline: the filter value is written directly into the SQL text.
query = (
    "SELECT id1, sum(v1) AS v1 \n"
    "FROM h20_1e9\n"
    'WHERE id1 = "id089"\n'
    "GROUP BY id1\n"
)

id089_totals = spark.sql(query)
id089_totals.show()



+-----+--------+
|  id1|      v1|
+-----+--------+
|id089|29990077|
+-----+--------+



                                                                                

In [8]:
# Unfiltered aggregate: sum of v1 for every id1 group.
totals_by_id1_sql = "SELECT id1, sum(v1) AS v1 from h20_1e9 GROUP BY id1"
spark.sql(totals_by_id1_sql).show()



+-----+--------+
|  id1|      v1|
+-----+--------+
|id016|30003304|
|id074|30006309|
|id070|29990210|
|id054|30011978|
|id053|29992360|
|id056|29987234|
|id057|29991822|
|id003|30003365|
|id001|30009448|
|id015|30006177|
|id017|29995061|
|id018|29992469|
|id014|29998476|
|id072|30003522|
|id071|29998357|
|id073|30006820|
|id055|30009993|
|id004|30015990|
|id002|29996534|
|id005|29993888|
+-----+--------+
only showing top 20 rows



                                                                                

In [9]:
# Template with an {id1_val} placeholder; the value is supplied as a keyword
# argument to spark.sql instead of being written into the SQL text.
query = (
    "SELECT id1, sum(v1) AS v1 \n"
    "FROM h20_1e9\n"
    "WHERE id1 = {id1_val} \n"
    "GROUP BY id1"
)

id016_totals = spark.sql(query, id1_val="id016")
id016_totals.show()

+-----+--------+
|  id1|      v1|
+-----+--------+
|id016|30003304|
+-----+--------+



In [10]:
# Reuse the `query` template assigned in the cell above with a different
# id1_val; only the keyword argument changes.
spark.sql(query, id1_val="id018").show()

+-----+--------+
|  id1|      v1|
+-----+--------+
|id018|29992469|
+-----+--------+



                                                                                

In [11]:
# Same aggregate, but with a `:id1_val` named marker whose value is supplied
# through a dictionary of arguments.
query = (
    "SELECT id1, sum(v1) AS v1 \n"
    "FROM h20_1e9\n"
    "WHERE id1 = :id1_val \n"
    "GROUP BY id1"
)

marker_values = {"id1_val": "id016"}
spark.sql(query, args=marker_values).show()

+-----+--------+
|  id1|      v1|
+-----+--------+
|id016|30003304|
+-----+--------+



In [4]:
# Small DataFrame with a single `id` column holding 0 through 9.
mydf = spark.range(start=0, end=10)

In [6]:
# A DataFrame ({mydf}) and a plain value ({param1}) are both supplied as
# keyword arguments for the placeholders in the query text.
multiples_sql = "SELECT id FROM {mydf} WHERE id % {param1} = 0"
spark.sql(multiples_sql, mydf=mydf, param1="3").show()

[Stage 0:>                                                         (0 + 0) / 10]

+---+
| id|
+---+
|  0|
|  3|
|  6|
|  9|
+---+



                                                                                

In [14]:
# Same filter with the value written inline as the SQL literal '3'.
# The query text has no {param1} placeholder, so only the DataFrame is passed.
spark.sql("SELECT id FROM {mydf} WHERE id % '3' = 0", mydf=mydf).show()

+---+
| id|
+---+
|  0|
|  3|
|  6|
|  9|
+---+



## Parameterized query with parameter markers

In [2]:
import datetime
import pathlib

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [3]:
# Plain session (no Delta configuration) for the remaining examples.
demo_builder = SparkSession.builder.appName("demo")
spark = demo_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/22 08:36:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Four sample purchases: (item, amount, purchase_date).
purchase_rows = [
    ("socks", 7.55, datetime.date(2022, 5, 15)),
    ("handbag", 49.99, datetime.date(2022, 5, 16)),
    ("shorts", 25.00, datetime.date(2023, 1, 5)),
    ("socks", 25.00, datetime.date(2023, 12, 23)),
]
purchase_columns = ["item", "amount", "purchase_date"]

purchases_df = spark.createDataFrame(purchase_rows, purchase_columns)

In [5]:
# Display the sample purchases before querying them.
purchases_df.show()

                                                                                

+-------+------+-------------+
|   item|amount|purchase_date|
+-------+------+-------------+
|  socks|  7.55|   2022-05-15|
|handbag| 49.99|   2022-05-16|
| shorts|  25.0|   2023-01-05|
|  socks|  25.0|   2023-12-23|
+-------+------+-------------+



In [7]:
# Register the DataFrame as the temporary view `some_purchases` so SQL text
# can refer to it by name.
purchases_df.createOrReplaceTempView("some_purchases")

In [16]:
# Total amount per item, restricted to the item bound to the `:item` marker.
query = (
    "SELECT item, sum(amount) "
    "from some_purchases "
    "group by item "
    "having item = :item"
)

In [17]:
# Bind the `:item` marker through the `args` dictionary.
item_filter = {"item": "socks"}
spark.sql(query, args=item_filter).show()

+-----+-----------+
| item|sum(amount)|
+-----+-----------+
|socks|      32.55|
+-----+-----------+



## Parameterized query with named args & dictionary of arguments

In [5]:
# Three sample rows: (id, number, the_date).
sample_rows = [
    (1, 3.3, datetime.date(2020, 1, 20)),
    (2, 9.9, datetime.date(2021, 5, 17)),
    (3, 4.5, datetime.date(2022, 10, 5)),
]
sample_columns = ["id", "number", "the_date"]

some_df = spark.createDataFrame(sample_rows, sample_columns)

In [6]:
# Display the sample rows.
# NOTE(review): the traceback recorded under this cell reports a worker/driver
# Python version mismatch (3.12 vs 3.9). It appears to come from a run made
# before PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON were set in the first cell;
# re-run the notebook from a fresh kernel to refresh this output.
some_df.show()

23/10/22 00:27:19 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/matthew.powers/opt/miniconda3/envs/pyspark-341/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 683, in main
    raise RuntimeError(
RuntimeError: Python in worker has different version 3.12 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(Interrupti

Py4JJavaError: An error occurred while calling o89.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (qtk9h72yp0.lan executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/matthew.powers/opt/miniconda3/envs/pyspark-341/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 683, in main
    raise RuntimeError(
RuntimeError: Python in worker has different version 3.12 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/matthew.powers/opt/miniconda3/envs/pyspark-341/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 683, in main
    raise RuntimeError(
RuntimeError: Python in worker has different version 3.12 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [31]:
# Print the column names and types of some_df.
some_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- number: double (nullable = true)
 |-- the_date: date (nullable = true)



In [36]:
# The DataFrame is supplied through the {some_df} placeholder (keyword
# argument); the filter values are bound to the :the_date and :min_value
# markers through `args`.
query = (
    "SELECT * from {some_df} "
    "WHERE the_date > :the_date AND number > :min_value"
)

filter_args = {"the_date": datetime.date(2021, 1, 1), "min_value": 5.0}
spark.sql(query, some_df=some_df, args=filter_args).show()

+---+------+----------+
| id|number|  the_date|
+---+------+----------+
|  2|   9.9|2021-05-17|
+---+------+----------+



In [37]:
# Re-run the same `query` with a lower min_value; only the bound values change.
looser_filter_args = {"the_date": datetime.date(2021, 1, 1), "min_value": 4.0}
spark.sql(query, some_df=some_df, args=looser_filter_args).show()

+---+------+----------+
| id|number|  the_date|
+---+------+----------+
|  2|   9.9|2021-05-17|
|  3|   4.5|2022-10-05|
+---+------+----------+



## String interpolation is worse than parameterized queries

In [42]:
# Counter-example for comparison with the parameterized cells above.
# The DataFrame must first be registered as a named view, and every value is
# held as a plain string.
some_df.createOrReplaceTempView("whatever")
the_date = "2021-01-01"
min_value = "4.0"
table_name = "whatever"

# The values are spliced straight into the SQL text by the f-string: the date
# needs hand-written quotes, and nothing validates or escapes the inputs, so
# this form should not be used with untrusted values.
query = (
    f"SELECT * from {table_name} WHERE the_date > '{the_date}' AND number > {min_value}"
)
spark.sql(query).show()

+---+------+----------+
| id|number|  the_date|
+---+------+----------+
|  2|   9.9|2021-05-17|
|  3|   4.5|2022-10-05|
+---+------+----------+



## Query a PySpark DataFrame with SQL

In [43]:
# Four sample people: (firstname, country).
# NOTE(review): "buglaria" looks like a typo for "bulgaria"; it is kept as-is
# because the recorded outputs in this notebook show the same spelling.
person_rows = [
    ("frank", "usa"),
    ("sourav", "india"),
    ("rahul", "india"),
    ("sim", "buglaria"),
]
person_columns = ["firstname", "country"]

person_df = spark.createDataFrame(person_rows, person_columns)

In [47]:
# Display the sample people before aggregating them.
person_df.show()

+---------+--------+
|firstname| country|
+---------+--------+
|    frank|     usa|
|   sourav|   india|
|    rahul|   india|
|      sim|buglaria|
+---------+--------+



In [46]:
# Count people per country; the DataFrame is supplied through the {person_df}
# placeholder, with no temporary view registered in this cell.
people_per_country_sql = (
    "select country, count(*) as num_ppl from {person_df} group by country"
)
spark.sql(people_per_country_sql, person_df=person_df).show()

+--------+-------+
| country|num_ppl|
+--------+-------+
|     usa|      1|
|   india|      2|
|buglaria|      1|
+--------+-------+

