# Spark Window Functions

In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Build (or reuse) a local session. getOrCreate() makes this cell safe to re-run on a
# live kernel; constructing SparkContext directly a second time raises
# "Cannot run multiple SparkContexts at once".
spark = (
    SparkSession.builder
    .master('local[2]')  # local[n] runs Spark locally with n threads
    .appName('local')
    .getOrCreate()
)
sc = spark.sparkContext  # underlying SparkContext, kept under its original name

In [8]:
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id, udf, pandas_udf, sum, count, round, col, greatest
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StringType, DoubleType

from scipy import signal
import numpy as np
import pandas as pd

### Try Example UDF used in Bounded Spark Window

* Spark 3.x supports pandas UDFs over bounded windows; Spark 2.4 and earlier only support unbounded windows
  * You may need to adjust the Spark `.conf` file to avoid a `java.lang.UnsupportedOperationException`: https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta
* Spark 3.x prefers Python type hints on the function signature to declare the kind of pandas UDF, instead of passing a `PandasUDFType` argument
  * https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html

In [30]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.window import Window

# Single-column frame 'v' holding the even numbers 0, 2, ..., 14.
df1 = spark.range(0, 16, 2).toDF('v')

# Un-partitioned window ordered by 'v' whose frame is the four rows immediately
# before the current row (the current row itself is excluded).
w1 = (
    Window
    .partitionBy()
    .orderBy('v')
    .rowsBetween(-4, -1)
)

@pandas_udf('double')
def myavg(v: pd.Series) -> float:
    """Reduce the values of one window frame to their arithmetic mean.

    Args:
        v: Values of the column that fall inside the current window frame.

    Returns:
        The mean of ``v`` as computed by ``pd.Series.mean``.
    """
    frame_mean = v.mean()
    return frame_mean

# Evaluate the UDF over the bounded window and print the result next to 'v'.
v_mean_col = myavg(df1['v']).over(w1)
df1.withColumn('v_mean', v_mean_col).show()

+---+------+
|  v|v_mean|
+---+------+
|  0|  null|
|  2|   0.0|
|  4|   1.0|
|  6|   2.0|
|  8|   3.0|
| 10|   5.0|
| 12|   7.0|
| 14|   9.0|
+---+------+



### Try Exponential Sum in Moving Window

In [13]:
# Each group ('A', then 'B') gets the same 18 rows, described here as runs:
#   qty       -> 7 x 10 followed by 11 x 20
#   has_color -> 7 x 0, 4 x 1, 4 x 0, 2 x 1, 1 x 0
qty_pattern = [10] * 7 + [20] * 11
color_pattern = [0] * 7 + [1] * 4 + [0] * 4 + [1] * 2 + [0]

rows = [
    (name, qty, has_color)
    for name in ('A', 'B')
    for qty, has_color in zip(qty_pattern, color_pattern)
]

df = spark.createDataFrame(rows, ['name', 'qty', 'has_color'])

# Assign a 1-based sequential row id. The window has no partition key, so Spark
# numbers all rows in a single partition -- acceptable for this small demo frame.
df = df.withColumn('rid', row_number().over(Window.orderBy(monotonically_increasing_id())))

# Move 'rid' to the front. The column is spelled 'name' exactly as in the schema so
# that later references (e.g. partitioning by 'name') do not depend on Spark's
# case-insensitive column resolution.
df = df.select('rid', 'name', 'qty', 'has_color').cache()

df.show(n=10)

+---+----+---+---------+
|rid|Name|qty|has_color|
+---+----+---+---------+
|  1|   A| 10|        0|
|  2|   A| 10|        0|
|  3|   A| 10|        0|
|  4|   A| 10|        0|
|  5|   A| 10|        0|
|  6|   A| 10|        0|
|  7|   A| 10|        0|
|  8|   A| 20|        1|
|  9|   A| 20|        1|
| 10|   A| 20|        1|
+---+----+---+---------+
only showing top 10 rows



In [15]:
window_size = 7
group = 'name'

# Frame: up to `window_size` rows preceding the current row within the same group,
# ordered by 'rid'. The current row itself is not part of the frame.
win_spec = Window.partitionBy([group]).orderBy('rid').rowsBetween(-window_size, -1)

# col0: number of non-null has_color values in the frame (0 for a group's first row).
df = df.withColumn('col0', count('has_color').over(win_spec))

# col1: sum of has_color over the frame. The sum over an empty frame is null, so
# replace it with 0 -- restricted to col1 so nulls in other columns are left alone.
df = df.withColumn('col1', sum('has_color').over(win_spec)).fillna(0, subset=['col1'])

df.show(n=40)

+---+----+---+---------+----+----+
|rid|Name|qty|has_color|col0|col1|
+---+----+---+---------+----+----+
|  1|   A| 10|        0|   0|   0|
|  2|   A| 10|        0|   1|   0|
|  3|   A| 10|        0|   2|   0|
|  4|   A| 10|        0|   3|   0|
|  5|   A| 10|        0|   4|   0|
|  6|   A| 10|        0|   5|   0|
|  7|   A| 10|        0|   6|   0|
|  8|   A| 20|        1|   7|   0|
|  9|   A| 20|        1|   7|   1|
| 10|   A| 20|        1|   7|   2|
| 11|   A| 20|        1|   7|   3|
| 12|   A| 20|        0|   7|   4|
| 13|   A| 20|        0|   7|   4|
| 14|   A| 20|        0|   7|   4|
| 15|   A| 20|        0|   7|   4|
| 16|   A| 20|        1|   7|   3|
| 17|   A| 20|        1|   7|   3|
| 18|   A| 20|        0|   7|   3|
| 19|   B| 10|        0|   0|   0|
| 20|   B| 10|        0|   1|   0|
| 21|   B| 10|        0|   2|   0|
| 22|   B| 10|        0|   3|   0|
| 23|   B| 10|        0|   4|   0|
| 24|   B| 10|        0|   5|   0|
| 25|   B| 10|        0|   6|   0|
| 26|   B| 20|      

In [53]:
# Pick tau so that exp(-n / tau) falls from 1 at n = 0 to 1 / window_size at
# n = window_size - 1.
tau = -(window_size - 1) / np.log(1.0 / window_size)

# One-sided exponential window, then reversed so the largest weight (1.0) ends up
# in the last position.
decaying = signal.windows.exponential(window_size, tau=tau, center=0, sym=False)
weights = decaying[::-1].copy()

print(tau)
print(weights)

3.083390054218504
[0.14285714 0.19758394 0.27327588 0.37796447 0.52275796 0.72302003
 1.        ]


In [52]:
@pandas_udf('double')
def get_weighted_window(v: pd.Series) -> float:
    """Return the weighted sum of one window frame.

    The frame values are multiplied element-wise with the last ``len(v)`` entries
    of the module-level ``weights`` array and summed, so the final value in ``v``
    always pairs with the final weight.

    Args:
        v: Values of the column inside the current window frame. Assumed to hold
            at most ``len(weights)`` rows, which the window spec in this notebook
            guarantees; a longer frame makes ``np.dot`` raise a shape error.

    Returns:
        The dot product as a plain Python float, or ``0.0`` for an empty frame.
    """
    v_len = len(v)
    if v_len == 0:
        # Guard is required: `weights[-0:]` selects the whole array, not an empty
        # slice, so the dot product below would fail on an empty frame.
        return 0.0
    # np.dot of two 1-D inputs already yields a scalar; no extra reduction needed.
    return float(np.dot(weights[-v_len:], v))

# Frame: up to `window_size` rows before the current row, per group, ordered by 'rid'.
win_spec = (
    Window
    .partitionBy([group])
    .orderBy('rid')
    .rowsBetween(-window_size, -1)
)

# col2: exponentially weighted sum of has_color over that frame.
weighted_col = get_weighted_window('has_color').over(win_spec)
df = df.withColumn('col2', weighted_col)
df.show(n=40)

+---+----+---+---------+----+----+------------------+
|rid|Name|qty|has_color|col0|col1|              col2|
+---+----+---+---------+----+----+------------------+
|  1|   A| 10|        0|   0|   0|               0.0|
|  2|   A| 10|        0|   1|   0|               0.0|
|  3|   A| 10|        0|   2|   0|               0.0|
|  4|   A| 10|        0|   3|   0|               0.0|
|  5|   A| 10|        0|   4|   0|               0.0|
|  6|   A| 10|        0|   5|   0|               0.0|
|  7|   A| 10|        0|   6|   0|               0.0|
|  8|   A| 20|        1|   7|   0|               0.0|
|  9|   A| 20|        1|   7|   1|               1.0|
| 10|   A| 20|        1|   7|   2| 1.723020026399484|
| 11|   A| 20|        1|   7|   3| 2.245777984974194|
| 12|   A| 20|        0|   7|   4| 2.623742457983421|
| 13|   A| 20|        0|   7|   4|1.8970183412366195|
| 14|   A| 20|        0|   7|   4|1.3715822511612057|
| 15|   A| 20|        0|   7|   4|0.9916814354436382|
| 16|   A| 20|        1|   7