In [1]:
import fastparquet
import numpy as np

In [None]:
# Open the "metadetect_desdmv4_cutsv3_jk" parquet data with fastparquet.
# `pf` is the handle the following cells read from.
# NOTE(review): the path has no extension — presumably a partitioned dataset
# directory rather than a single file; confirm.
pf = fastparquet.ParquetFile("metadetect_desdmv4_cutsv3_jk")

In [None]:
# Display the column names of the parquet data opened above.
pf.columns

In [None]:
%%time

# Load the ra/dec/gauss_g_1/gauss_g_2 columns for the rows whose mdet_step is
# "noshear", then convert the result to a plain structured numpy array.
#
# row_filter=True makes fastparquet apply `filters` row by row; without it the
# filters only prune whole row groups, so rows with other mdet_step values can
# still be returned when a row group mixes steps.
#
# to_records(index=False) keeps the pandas RangeIndex from being written into
# the records as an extra "index" field.
dd = pf.to_pandas(
    columns=["ra", "dec", "gauss_g_1", "gauss_g_2"],
    filters=[[("mdet_step", "==", "noshear")]],
    index=False,
    row_filter=True,
).to_records(index=False)

# to_records() returns a np.recarray; rebuild it as a base ndarray with a plain
# structured dtype (this copies the data).
dd = np.array(dd, dtype=dd.dtype.descr)

In [None]:
# Show the shape (row count) of the structured array built in the previous cell.
dd.shape

In [2]:
# Rebind `pf` to the "metadetect_desdmv4_cutsv3.parq" file: every cell below
# that uses `pf` reads from this file, not from the "_jk" data opened earlier.
pf = fastparquet.ParquetFile("metadetect_desdmv4_cutsv3.parq")

In [None]:
%%time

# Count the "noshear" rows by streaming only the mdet_step column one row
# group at a time, printing the running total after each group.
tot = 0
for dd in pf.iter_row_groups(columns=["mdet_step"]):
    tot = tot + np.count_nonzero(dd["mdet_step"] == "noshear")
    print(tot)

144623
289246
433869
578492
723115
867738
1012361
1156984
1301607
1446230
1590853
1735476
1880099
2024722
2169345
2313968
2458591
2603214
2747837
2892460
3037083
3181706
3326329
3470952
3615575
3760198
3904821
4049444
4194067
4338690
4483313
4627936
4772559
4917182
5061805
5206428
5351051
5495674
5640297
5784920
5929543
6074166
6218789
6363412
6508035
6652658
6797281
6941904
7086527
7231150
7375773
7520396
7665019
7809642
7954265
8098888
8243511
8388134
8532757
8677380
8822003
8966626
9111249
9255872
9400495
9545118
9689741
9834364
9978987
10123610
10268233
10412856
10557479
10702102
10846725
10991348
11135971
11280594
11425217
11569840
11714463
11859086
12003709
12148332
12292955
12437578
12582201
12726824
12871447
13016070
13160693
13305316
13449939
13594562
13739185
13883808
14028431
14173054
14317677
14462300
14606923
14751546
14896169
15040792
15185415
15330038
15474661
15619284
15763907
15908530
16053153
16197776
16342399
16487022
16631645
16776268
16920891
17065514
17210137
1735

In [None]:
%%time

# Same load as before, additionally restricted to patch_num == 20, converted to
# a plain structured numpy array.
#
# row_filter=True makes fastparquet apply `filters` row by row; without it the
# filters only prune whole row groups, so rows failing either condition can
# still be returned when a row group mixes values.
#
# to_records(index=False) keeps the pandas RangeIndex from being written into
# the records as an extra "index" field.
dd = pf.to_pandas(
    columns=["ra", "dec", "gauss_g_1", "gauss_g_2"],
    filters=[[("mdet_step", "==", "noshear"), ("patch_num", "==", 20)]],
    index=False,
    row_filter=True,
).to_records(index=False)

# to_records() returns a np.recarray; rebuild it as a base ndarray with a plain
# structured dtype (this copies the data).
dd = np.array(dd, dtype=dd.dtype.descr)

## Much of the time is spent converting out of pandas

In [None]:
%%time

# Time the fastparquet read alone, leaving the result as a pandas DataFrame
# (no conversion to a numpy structured array).
#
# row_filter=True makes fastparquet apply `filters` row by row; without it the
# filters only prune whole row groups, so rows with other mdet_step values can
# still be returned when a row group mixes steps.
dd = pf.to_pandas(
    columns=["ra", "dec", "gauss_g_1", "gauss_g_2"],
    filters=[[("mdet_step", "==", "noshear")]],
    index=False,
    row_filter=True,
)

In [None]:
%%time

# Time the fastparquet read alone for the "noshear" rows of patch_num == 20,
# leaving the result as a pandas DataFrame.
#
# row_filter=True makes fastparquet apply `filters` row by row; without it the
# filters only prune whole row groups, so rows failing either condition can
# still be returned when a row group mixes values.
dd = pf.to_pandas(
    columns=["ra", "dec", "gauss_g_1", "gauss_g_2"],
    filters=[[("mdet_step", "==", "noshear"), ("patch_num", "==", 20)]],
    index=False,
    row_filter=True,
)

## Try astropy

In [None]:
from astropy.table import Table

In [None]:
# Read the same four columns from the .parq file through astropy's parquet
# reader, passing the same mdet_step == "noshear" filter.
# NOTE(review): whether `filters` is applied per row or per row group here is
# not visible from this notebook — confirm against the astropy documentation.
tb = Table.read(
    "metadetect_desdmv4_cutsv3.parq", 
    format="parquet",
    filters=[[("mdet_step", "==", "noshear")]],
    include_names=["ra", "dec", "gauss_g_1", "gauss_g_2"],
)

In [2]:
import os
import numpy as np

import pyarrow as pa
from pyarrow import parquet, dataset


def write_numparquet(filename, recarray, clobber=False):
    """
    Write a numpy recarray in parquet format.

    Parameters
    ----------
    filename : `str`
        Output filename.
    recarray : `np.ndarray`
        Numpy recarray to output
    clobber : `bool`, optional
        If True, overwrite ``filename`` when it already exists.  If False
        (the default), an existing ``filename`` is an error.

    Raises
    ------
    NotImplementedError
        If ``filename`` already exists and ``clobber`` is False.
    ValueError
        If ``recarray`` is not a numpy array with named fields.
    """
    if os.path.exists(filename) and not clobber:
        # The exception type is unchanged from the earlier "no clobbering"
        # behaviour so existing handlers keep working.
        raise NotImplementedError(
            f"File {filename} already exists; pass clobber=True to overwrite it."
        )

    if not isinstance(recarray, np.ndarray) or recarray.dtype.names is None:
        raise ValueError("Input recarray is not a numpy recarray.")

    columns = recarray.dtype.names

    # Parquet strings are variable length, so record each unicode column's
    # fixed width (numpy stores 4 bytes per character) in the schema metadata
    # for read_numparquet to rebuild the 'U<n>' dtype.
    metadata = {
        f'recarray::strlen::{col}': str(recarray[col].dtype.itemsize//4)
        for col in columns
        if recarray[col].dtype.type is np.str_
    }

    type_list = [(col, pa.from_numpy_dtype(recarray[col].dtype.type))
                 for col in columns]
    schema = pa.schema(type_list, metadata=metadata)

    # Build the full table before opening the writer: opening truncates the
    # output file, so a failed conversion must not destroy an existing file.
    arrays = [pa.array(recarray[col]) for col in columns]
    pa_table = pa.Table.from_arrays(arrays, schema=schema)

    with parquet.ParquetWriter(filename, schema) as writer:
        writer.write_table(pa_table)


def read_numparquet(filename, columns=None, filter=None):
    """
    Read a numpy recarray in parquet format.

    Parameters
    ----------
    filename : `str`
        Input filename
    columns : `list` [`str`], optional
        Name of columns to read.  Columns are returned in schema order;
        requested names that are not in the schema are ignored.
    filter : pyarrow expression, optional
        Pyarrow filter expression to filter rows.

    Returns
    -------
    recarray : `np.ndarray`
        Structured array with one field per column read.  If ``columns`` is
        given and none of its names are in the schema, ``np.zeros(0)`` is
        returned instead.
    """
    ds = dataset.dataset(filename, format='parquet', partitioning='hive')

    schema = ds.schema

    # Collect the fixed string widths stored by write_numparquet.
    # schema.metadata is None (not an empty dict) when the file carries no
    # key/value metadata, and keys/values are bytes.
    prefix = b'recarray::strlen::'
    strlens = {}
    for key, value in (schema.metadata or {}).items():
        if key.startswith(prefix):
            strlens[key[len(prefix):].decode()] = int(value.decode())

    names = schema.names

    if columns is not None:
        names = [name for name in names if name in columns]

        if names == []:
            # Should this raise instead?
            return np.zeros(0)

    pa_table = ds.to_table(columns=names, filter=filter)

    # Columns already converted to numpy while sizing strings, kept so they
    # are not converted a second time when filling the output.
    converted = {}

    dtype = []
    for name in names:
        field_type = schema.field(name).type
        if field_type == pa.string():
            if name in strlens:
                strlen = strlens[name]
            else:
                # No recorded width (file not written by write_numparquet):
                # size the field from the data actually read.
                values = pa_table[name].to_numpy()
                converted[name] = values
                strlen = max((len(str(value)) for value in values), default=0)
                strlen = max(strlen, 1)
            dtype.append('U%d' % strlen)
        else:
            dtype.append(field_type.to_pandas_dtype())

    data = np.zeros(pa_table.num_rows, dtype=list(zip(names, dtype)))

    for name in names:
        if name in converted:
            data[name][:] = converted.pop(name)
        else:
            data[name][:] = pa_table[name].to_numpy()

    return data

In [3]:
import pyarrow.compute as pc

In [4]:
%%time

# Read the four columns for mdet_step == "noshear" from the "_jk" data through
# the pyarrow-based read_numparquet helper. The filter is a pyarrow expression;
# the commented-out tail of that line is an optional extra restriction to
# patch_num == 20.
dd = read_numparquet(
    "metadetect_desdmv4_cutsv3_jk", 
    filter=(pc.field("mdet_step") == pc.scalar("noshear")),  # & (pc.field("patch_num") == pc.scalar(20)),
    columns=["ra", "dec", "gauss_g_1", "gauss_g_2"],
)

CPU times: user 5.25 s, sys: 16.2 s, total: 21.4 s
Wall time: 27.3 s


In [5]:
# Show the shape (row count) of the array returned by read_numparquet.
dd.shape

(148852455,)

In [None]:
# NOTE(review): bare integer literal with no recorded source. It looks like a
# reference row count to compare with dd.shape above (148852455); the two
# differ by 9 — confirm where this number comes from and explain the gap.
148852464