# Fugue vs Pandas on Spark

These experiments were run on Databricks Runtime 12.2 LTS with a cluster of 8 machines with 16 cpus each. We also used:

* Fugue 0.8.2dev3
* Polars 0.16.14
* Spark 3.3

## Utils Functions

In [0]:
import polars as pl
import numpy as np
import pandas as pd
from uuid import uuid4
from datetime import datetime
from typing import List,Dict,Any,Iterable
import fugue.api as fa
from triad import Schema

COLS = ["_"+str(x) for x in range(10)]
PL_COLS = [pl.col("_"+str(x)) for x in range(10)]
SPARK_PATH = "/dbfs/ht2-{n}.parquet"
WHOLE_PERIOD = 28*24
PERIOD = 7*24

def make_df(n, start, periods, freq):
    ts = pd.date_range(start, periods=periods, freq=freq, name="ts")
    m = np.random.rand(len(ts),len(COLS))
    df = pd.DataFrame(m, columns=COLS, index=ts).reset_index()
    return df.assign(uid=n)

def make_dfs(df:Iterable[List[Any]]) -> Iterable[pd.DataFrame]:
    for row in df:
        yield make_df(row[0], "2000-01-01", row[1], "min")

# schema: *,greatest:double
def greatest_pd(dfs:Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    for df in dfs:
        yield df.assign(greatest=df[COLS].max(axis=1))

# schema: *,greatest:double
def greatest_pl(dfs:Iterable[pl.DataFrame]) -> Iterable[pl.DataFrame]:
    for df in dfs:
        yield df.with_columns(pl.max(PL_COLS).alias("greatest"))
        
# schema:*
def zscore_pd(df:pd.DataFrame, n) -> pd.DataFrame:
    subdf = df[COLS]
    df = df.sort_values("ts")
    x = subdf.shift(1).rolling(n)
    z=(subdf-x.mean()).abs()/x.std()
    return df.assign(**{k:z[k] for k in COLS}).dropna()

# schema:*
def zscore_pd_gp(df:pd.DataFrame, n) -> pd.DataFrame:
    idf = df.set_index("uid").sort_values(["uid","ts"])
    subdf = idf[COLS]
    x = subdf.groupby("uid").shift(1).rolling(n)
    z=(subdf-x.mean()).abs()/x.std()
    return idf.assign(**{k:z[k] for k in COLS}).dropna().reset_index()

# schema:*
def zscore_pl(df:pl.DataFrame, n:int) -> pl.DataFrame:
    params = {}
    for col in COLS:
        mean = pl.col(col).shift().rolling_mean(n, min_periods=n)
        std = pl.col(col).shift().rolling_std(n, min_periods=n)
        params[col]=(pl.col(col) - mean).abs()/std
    return df.sort("ts").with_columns(**params).drop_nulls()

# schema:*
def zscore_pl_gp(df:pl.DataFrame, n:int) -> pl.DataFrame:
    params = {}
    for col in COLS:
        mean = pl.col(col).shift().rolling_mean(n, min_periods=n).over("uid")
        std = pl.col(col).shift().rolling_std(n, min_periods=n).over("uid")
        params[col]=(pl.col(col) - mean).abs()/std
    return df.sort("ts").with_columns(**params).drop_nulls()

## Testing Utils

In [0]:
from datetime import datetime

NUMBERS = [10000,100000,1000000]

def save(n, path, engine):
    idf = pd.DataFrame(dict(a=range(n),b=WHOLE_PERIOD))

    with fa.engine_context(engine):
        df = fa.transform(idf, make_dfs, Schema("ts:datetime")+[(c,float) for c in COLS]+[("uid",int)], partition=128)  
        fa.save(df, path.format(n=n))

class TestRunner:
    def __init__(self, func):
        self.func = func

    def run_all(self):
        for n in NUMBERS:
            self.run(n)
            
    def pre(self):
        pass
    
    def post(self):
        pass
    
    def path(self, n):
        pass

    def run(self, n):
        self.pre()
        try:
            with fa.engine_context(spark):
                start = datetime.now()
                self.func(self.path(n))
                span = (datetime.now() - start).total_seconds()
            print(f"{self.func.__name__}({n}) --- {span} seconds")
        finally:
            self.post()

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

In [0]:
for n in NUMBERS:
    save(n, SPARK_PATH, spark)

## Databricks Tests

In [0]:
class SparkTestRunner(TestRunner):
    def pre(self):
        spark.catalog.clearCache()
    
    def post(self):
        pass
    
    def path(self, n):
        return SPARK_PATH.format(n=n)
    
def spark_test(func):
    return SparkTestRunner(func)


# Pure Overhead

In [0]:
from pyspark.sql.functions import sum

@spark_test
def koalas_overhead(path):
    print(pp.read_parquet(path).groupby("uid").apply(lambda x:x.head(1))["_0"].sum())

@spark_test
def pandas_udf_overhead(path):
    df = spark.read.parquet(path)
    df = df.groupby("uid").applyInPandas(lambda x:x.head(1), schema=df.schema)
    print(df.select(sum(df["_0"])).toPandas())

In [0]:
koalas_overhead.run_all()

4960.396094375464
koalas_overhead(10000) --- 7.541973 seconds
49938.184014258906
koalas_overhead(100000) --- 40.058433 seconds
499140.48774121556
koalas_overhead(1000000) --- 346.786903 seconds


In [0]:
pandas_udf_overhead.run_all()

       sum(_0)
0  4960.396094
pandas_udf_overhead(10000) --- 2.082695 seconds
        sum(_0)
0  49938.184014
pandas_udf_overhead(100000) --- 9.210402 seconds
        sum(_0)
0  499139.67806
pandas_udf_overhead(1000000) --- 73.292321 seconds


## ZScore - Pandas on Spark 

Pandas on Spark needs special type annotation on output, we have to create a pandas dataframe in order to construct the schema...

In [0]:
import pyspark.pandas as pp

edf = make_df(0,"2022-01-01",20,"D")

# schema:*
def zscore_pp(df:pd.DataFrame, n) -> pp.DataFrame[zip(edf.columns, edf.dtypes)]:
    return zscore_pd(df, n)

@spark_test
def koalas_zscore(path):
    print(pp.read_parquet(path).groupby("uid").apply(zscore_pp,n=PERIOD)["_0"].sum())

In [0]:
# this is to warmup the cluster
koalas_zscore.run(10000)

4378032.511842889
koalas_zscore(10000) --- 49.886775 seconds


In [0]:
koalas_zscore.run_all()

4378032.511842885
koalas_zscore(10000) --- 8.447558 seconds
43808623.11610298
koalas_zscore(100000) --- 64.523119 seconds
438117632.98056763
koalas_zscore(1000000) --- 437.265959 seconds


## ZScore - Fugue

In [0]:
import fugue.api as fa
from pyspark.sql.functions import sum

@spark_test
def zscore_pandas(path):
    df = fa.transform(
        path,
        zscore_pd,
        partition="uid",
        params=dict(n=PERIOD)
    )
    print(df.select(sum(df["_0"])).toPandas())
   
@spark_test
def zscore_pandas_coarse(path):
    df = fa.transform(
        path,
        zscore_pd_gp,
        partition=dict(by="uid", algo="coarse"),
        params=dict(n=PERIOD)
    )
    print(df.select(sum(df["_0"])).toPandas())

@spark_test
def zscore_polars(path):
    df = fa.transform(
        path,
        zscore_pl,
        partition="uid",
        params=dict(n=PERIOD)
    )
    print(df.select(sum(df["_0"])).toPandas())

@spark_test
def zscore_polars_coarse(path):
    df = fa.transform(
        path,
        zscore_pl_gp,
        partition=dict(by="uid", algo="coarse"),
        params=dict(n=PERIOD)
    )
    print(df.select(sum(df["_0"])).toPandas())

In [0]:
zscore_pandas.run_all()

        sum(_0)
0  4.378033e+06
zscore_pandas(10000) --- 5.519285 seconds
        sum(_0)
0  4.380862e+07
zscore_pandas(100000) --- 25.438292 seconds
        sum(_0)
0  4.381177e+08
zscore_pandas(1000000) --- 194.85018 seconds


In [0]:
zscore_pandas_coarse.run_all()

        sum(_0)
0  4.378033e+06
zscore_pandas_coarse(10000) --- 2.361903 seconds
        sum(_0)
0  4.380862e+07
zscore_pandas_coarse(100000) --- 4.726595 seconds
        sum(_0)
0  4.381176e+08
zscore_pandas_coarse(1000000) --- 37.439444 seconds


In [0]:
zscore_polars.run_all()

        sum(_0)
0  4.378033e+06
zscore_polars(10000) --- 3.59433 seconds
        sum(_0)
0  4.380862e+07
zscore_polars(100000) --- 17.652728 seconds
        sum(_0)
0  4.381176e+08
zscore_polars(1000000) --- 144.548372 seconds


In [0]:
zscore_polars_coarse.run_all()

        sum(_0)
0  4.378033e+06
zscore_polars_coarse(10000) --- 1.617953 seconds
        sum(_0)
0  4.380862e+07
zscore_polars_coarse(100000) --- 4.739778 seconds
        sum(_0)
0  4.381176e+08
zscore_polars_coarse(1000000) --- 45.711359 seconds


In [0]:
sql = """
WITH 
    mean_std AS (
        SELECT 
            uid, ts,_0,_1,_2,_3,_4,_5,_6,_7,_8,_9,
            AVG(_0) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_0,
            STDDEV(_0) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_0,
            AVG(_1) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_1,
            STDDEV(_1) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_1,
            AVG(_2) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_2,
            STDDEV(_2) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_2,
            AVG(_3) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_3,
            STDDEV(_3) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_3,
            AVG(_4) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_4,
            STDDEV(_4) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_4,
            AVG(_5) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_5,
            STDDEV(_5) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_5,
            AVG(_6) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_6,
            STDDEV(_6) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_6,
            AVG(_7) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_7,
            STDDEV(_7) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_7,
            AVG(_8) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_8,
            STDDEV(_8) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_8,
            AVG(_9) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS mean_9,
            STDDEV(_9) OVER (PARTITION BY uid ORDER BY ts ROWS BETWEEN {PERIOD} PRECEDING AND 1 PRECEDING) AS std_9,
            ROW_NUMBER() OVER (PARTITION BY uid ORDER BY ts) AS rn
        FROM parquet.`{path}`
    ),
    z AS (
        SELECT 
            uid, ts,
            abs((_0 - mean_0)/std_0) AS z_0,
            abs((_1 - mean_1)/std_1) AS z_1,
            abs((_2 - mean_2)/std_2) AS z_2,
            abs((_3 - mean_3)/std_3) AS z_3,
            abs((_4 - mean_4)/std_4) AS z_4,
            abs((_5 - mean_5)/std_5) AS z_5,
            abs((_6 - mean_6)/std_6) AS z_6,
            abs((_7 - mean_7)/std_7) AS z_7,
            abs((_8 - mean_8)/std_8) AS z_8,
            abs((_9 - mean_9)/std_9) AS z_9
        FROM mean_std
        WHERE rn>{PERIOD} AND mean_0 IS NOT NULL AND std_0 IS NOT NULL
    )
SELECT
    SUM(z_0) AS z_0,
    SUM(z_1) AS z_1,
    SUM(z_2) AS z_2,
    SUM(z_3) AS z_3,
    SUM(z_4) AS z_4,
    SUM(z_0) AS z_5,
    SUM(z_5) AS z_6,
    SUM(z_6) AS z_7,
    SUM(z_7) AS z_8,
    SUM(z_8) AS z_9
FROM z
"""

In [0]:
@spark_test
def zscore_sql(path):
    df = spark.sql(sql.format(path=path, PERIOD=PERIOD))
    print(df.toPandas())

In [0]:
zscore_sql.run_all()

            z_0           z_1           z_2           z_3           z_4  \
0  4.378033e+06  4.380733e+06  4.384808e+06  4.378741e+06  4.380608e+06   

            z_5           z_6           z_7           z_8           z_9  
0  4.378033e+06  4.380569e+06  4.383062e+06  4.380676e+06  4.382290e+06  
zscore_sql(10000) --- 21.696104 seconds
            z_0           z_1           z_2           z_3           z_4  \
0  4.380862e+07  4.381683e+07  4.381767e+07  4.382026e+07  4.379173e+07   

            z_5           z_6           z_7           z_8           z_9  
0  4.380862e+07  4.381301e+07  4.382848e+07  4.381101e+07  4.381253e+07  
zscore_sql(100000) --- 165.218836 seconds
