# h2o groupby benchmark: query 4 (mean of v1, v2, v3 by id4)

In [5]:
import pandas as pd
import polars as pl

## Polars 1e9 Parquet

In [3]:
%%time

(
    pl.scan_parquet("~/data/G1_1e9_1e2_0_0.parquet")
    .groupby("id4")
    .agg(
        [
            pl.mean("v1").alias("v1_mean"),
            pl.mean("v2").alias("v2_mean"),
            pl.mean("v3").alias("v3_mean"),
        ]
    )
).collect(streaming=True)

CPU times: user 30.3 s, sys: 17.1 s, total: 47.4 s
Wall time: 11.5 s


id4,v1_mean,v2_mean,v3_mean
i64,f64,f64,f64
12,2.99984,8.00007,49.99989
60,2.999751,7.998735,49.991873
24,2.999709,8.00059,49.990037
72,2.999663,8.002505,49.997393
67,2.999492,7.999422,50.005143
84,2.999165,7.998618,49.992159
36,2.999718,7.999917,50.005492
5,2.999809,7.999542,49.993548
48,2.999959,7.999087,49.998138
91,3.000607,7.998143,50.013529


## pandas 1e9 Parquet

In [7]:
%%time

(
    pd.read_parquet("~/data/G1_1e9_1e2_0_0.parquet", columns=["id4", "v1", "v2", "v3"])
    .groupby("id4", as_index=False, sort=False, observed=True, dropna=False)
    .agg({"v1": "mean", "v2": "mean", "v3": "mean"})
)

CPU times: user 32.9 s, sys: 46.8 s, total: 1min 19s
Wall time: 53.7 s


Unnamed: 0,id4,v1,v2,v3
0,31,3.000027,8.000086,50.005195
1,32,2.999118,8.000395,50.003075
2,52,3.000156,8.000799,50.000750
3,37,2.999983,8.000249,50.008905
4,94,2.999754,7.999775,50.009630
...,...,...,...,...
95,27,2.999891,8.002250,49.995202
96,100,2.999844,8.000531,49.988959
97,72,2.999663,8.002505,49.997393
98,83,2.999361,7.998659,49.995630


## DataFusion 1e9 Parquet

In [8]:
import pathlib

from datafusion import SessionContext

In [9]:
# A single DataFusion session: tables are registered on it and SQL is run through it.
ctx: SessionContext = SessionContext()

In [10]:
# Expose the G1_1e9_1e2_0_0 Parquet file to SQL under the table name "x".
parquet_path = f"{pathlib.Path.home()}/data/G1_1e9_1e2_0_0.parquet"
ctx.register_parquet("x", parquet_path)

In [11]:
%%time

res = ctx.sql(
    "select id4, mean(v1) as v1, mean(v2) as v2, mean(v3) as v3 from x group by id4"
)
print(res)

DataFrame()
+-----+--------------------+-------------------+--------------------+
| id4 | v1                 | v2                | v3                 |
+-----+--------------------+-------------------+--------------------+
| 51  | 2.9998978856120826 | 8.000389554888283 | 49.99169947291488  |
| 94  | 2.999753562664744  | 7.999774665861878 | 50.00962994793638  |
| 64  | 2.99973078998151   | 8.001908353275093 | 49.99400574534753  |
| 28  | 2.9998600201990855 | 7.999258307026296 | 49.99759693434262  |
| 54  | 2.999831937917867  | 8.000056820989673 | 50.00737585565297  |
| 48  | 2.9999585949899936 | 7.999086989525733 | 49.99813764497906  |
| 42  | 3.0005378493745725 | 8.003788547788687 | 50.006665325973735 |
| 24  | 2.9997089870710685 | 8.000589723554713 | 49.990037046851555 |
| 75  | 2.9989431071170487 | 7.997714182838368 | 50.01580391849532  |
| 12  | 2.999839676400366  | 8.00007011032024  | 49.999889884748356 |
+-----+--------------------+-------------------+--------------------+
CPU time

## Spark 1e9 Delta

In [1]:
from pathlib import Path

import delta
from delta import *
from pyspark.sql import functions as F

import pyspark

In [2]:
# Session settings, applied to the builder in this order: Delta Lake's SQL
# extension, executor/driver memory, and Delta's catalog for spark_catalog.
spark_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.executor.memory": "10G",
    "spark.driver.memory": "25G",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

builder = pyspark.sql.SparkSession.builder.appName("MyApp")
for key, value in spark_settings.items():
    builder = builder.config(key, value)

# Call the helper through the explicitly imported `delta` package instead of
# the star import. The Ivy log it produces shows io.delta#delta-core_2.12
# being added as a dependency.
spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Users/matthew.powers/opt/miniconda3/envs/pyspark-340-delta-240/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/matthew.powers/.ivy2/cache
The jars for the packages stored in: /Users/matthew.powers/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cc68e7ab-1246-48b1-8189-c19e7e19b161;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 108ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default    

In [4]:
# Load the Delta table and register it as the temp view "x0" for the SQL cell below.
delta_table_path = f"{Path.home()}/data/delta/G1_1e9_1e2_0_0"
x0_df = spark.read.format("delta").load(delta_table_path)
x0_df.createOrReplaceTempView("x0")

In [5]:
%%time

spark.sql(
    "select id4, mean(v1) as v1, mean(v2) as v2, mean(v3) as v3 from x0 group by id4"
).show()

23/06/30 08:13:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+---+------------------+------------------+------------------+
|id4|                v1|                v2|                v3|
+---+------------------+------------------+------------------+
| 31|  3.00002710005962| 8.000085500188101|50.005194701907875|
| 85|2.9997569215610573| 7.999475546519024| 50.00824121783243|
| 65| 3.000018497648949| 7.999512062016918| 49.99618929804722|
| 53|2.9999569305190343| 8.002265294812096|49.980730604806816|
| 78| 3.000358751285417| 7.999743191703941|  50.0013464052305|
| 34| 3.000038691863101| 7.999651073379268|49.990471598797704|
| 81| 2.999849930119025| 7.999589682350752| 49.99841869137996|
| 28|2.9998600201990855| 7.999258307026296|49.997596934342674|
| 76|3.0002825900331844|7.9988081202671175| 50.00206271448731|
| 27|2.9998909979071597|  8.00225034320659|49.995202221886636|
| 26|2.9996000727467673|  8.00324590956905|49.992869292253644|
| 44|3.0004836689000896| 7.999020962952082|50.003574675633864|
| 12| 2.999839676400366|  8.00007011032024|49.999889884

                                                                                