In [0]:
import pandas as pd
import iarray_spark as iaspark
import iarray as ia
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

from time import time

In [0]:
# Benchmark array geometry. The sizes in the trailing comments assume the
# float64 dtype set below (8 bytes per item).
shape = (512 * 1024, 1024)  # 4 GB
chunks = (64 * 1024, 64)  # 32 MB
blocks = (4 * 1024, 16)  # 512 KB

# Much smaller alternative geometry, kept commented out:
# shape = (1024, 1024)
# chunks = (1024, 512)
# blocks = (1024, 256)

dtype = np.float64

# iarray container: the path it is created at, and the /dbfs path it is copied to.
urlpath_a = "/iaspark_a.iarr"
urlpath_dbfs_a = "/dbfs/iaspark_a.iarr"


# Parquet file holding the same data: the path it is written to (also the path
# handed to spark.read.parquet), and the /dbfs path it is copied to.
urlpath_parquet_a = "/iaspark_a.parquet"
urlpath_parquet_dbfs_a = "/dbfs/iaspark_a.parquet"

# Register the geometry, codec and dtype above as iarray's defaults for the
# calls that follow.
ia.set_config_defaults(chunks=chunks, blocks=blocks, btune=False, codec=ia.Codec.LZ4, dtype=dtype)

# Axis passed to the iarray mean reduction; the Spark benchmark averages each
# column, which presumably matches reducing along axis 0 -- confirmed at the end
# by comparing both results.
axis = 0

In [0]:
import os
import shutil

# Start from a clean slate by removing whatever a previous run left behind.
# A missing path is simply skipped; any real failure (permissions, I/O) is
# raised instead of being silently swallowed.
if os.path.exists(urlpath_a):
    ia.remove_urlpath(urlpath_a)

# pq.write_table below produces a single file, which shutil.rmtree cannot
# delete (it raises NotADirectoryError). Handle both the plain-file case and a
# directory that some other writer may have left at this path.
if os.path.isdir(urlpath_parquet_a):
    shutil.rmtree(urlpath_parquet_a)
elif os.path.exists(urlpath_parquet_a):
    os.remove(urlpath_parquet_a)

# Create the random array directly on disk, then copy the container to /dbfs.
a = ia.random.uniform(shape, fp_mantissa_bits=4, urlpath=urlpath_a)
_ = shutil.copy(urlpath_a, urlpath_dbfs_a)

# Write the same values as Parquet: one column per array column (named "0",
# "1", ...) and one row group per chunk of rows.
# NOTE(review): `a.data` presumably materializes the whole array in memory --
# confirm there is enough RAM for the configured shape.
df = pd.DataFrame(a.data, columns=[str(i) for i in range(shape[1])])
pa_table = pa.Table.from_pandas(df)
_ = pq.write_table(pa_table, row_group_size=chunks[0], where=urlpath_parquet_a)

_ = shutil.copy(urlpath_parquet_a, urlpath_parquet_dbfs_a)


In [0]:
# Drop the references to the array, the DataFrame and the Arrow table; the
# benchmarks below read the data back from disk.
del a, df, pa_table

In [0]:
import os

# On-disk size, in bytes, of each container copied under /dbfs.
a_size = os.path.getsize(urlpath_dbfs_a)
a_parquet_size = os.path.getsize(urlpath_parquet_dbfs_a)

# Uncompressed size in bytes: bytes per item times number of items.
size = np.dtype(dtype).itemsize * np.prod(shape)

# Report each file's size in GB and, in parentheses, uncompressed / on-disk size.
print(f"{urlpath_dbfs_a}: {a_size / 1024 ** 3:.4f} GB ({size / a_size:.2f}x)")
print(f"{urlpath_parquet_dbfs_a}: {a_parquet_size / 1024 ** 3:.4f} GB ({size / a_parquet_size:.2f}x)")

In [0]:
# Number of timed repetitions of each benchmark loop below.
n = 3

In [0]:
import pyspark.sql.functions as f

# Benchmark: per-column mean of the Parquet data computed with a Spark DataFrame.
# `spark` is not defined in this notebook; it is expected to be the session
# object provided by the notebook environment.
t_parquet = np.empty(n, dtype=np.float64)

for i in range(n):
    t0 = time()
    df_spark = spark.read.parquet(urlpath_parquet_a).repartition(16)
    # toPandas() collects the result, so the measured interval covers the read,
    # the repartition and the aggregation.
    df_spark_out = df_spark.agg(*[f.avg(c).alias(c) for c in df_spark.columns]).toPandas()
    t1 = time()
    t_parquet[i] = t1 - t0

# Mean and standard deviation are taken over the same set of runs. Previously
# the deviation skipped the first run while the mean included it, and the
# deviation was nan when n == 1.
print(f"time parquet (pyspark.DataFrame): {t_parquet.mean():.2f} \u00B1 {t_parquet.std():.2f} s")

In [0]:
# Display the raw per-run timings, in seconds, of the Parquet benchmark.
t_parquet

In [0]:

# Benchmark: mean along `axis` of the iarray container, computed by iarray_spark.
# `sc` is not defined in this notebook; it is expected to be the Spark context
# provided by the notebook environment.
t_iarray = np.empty(n, dtype=np.float64)

for i in range(n):
    t0 = time()
    out_iarray = iaspark.mean(urlpath_dbfs_a, axis=axis, spark_context=sc)
    t1 = time()
    t_iarray[i] = t1 - t0

# Mean and standard deviation are taken over the same set of runs. Previously
# the deviation skipped the first run while the mean included it, and the
# deviation was nan when n == 1.
print(f"time iarray: {t_iarray.mean():.2f} \u00B1 {t_iarray.std():.2f} s")


In [0]:
# Display the raw per-run timings, in seconds, of the iarray benchmark.
t_iarray

In [0]:
# Sanity check: both engines must produce the same means.
# Flatten the one-row pandas result so it can be compared element-wise with the
# iarray output.
out_spark_np = df_spark_out.to_numpy().ravel()
out_np = out_iarray.data

np.testing.assert_almost_equal(out_np, out_spark_np)