In [1]:
from pathlib import Path

import polars as pl
import pyarrow as pa
from pyarrow import compute as pc
from pyarrow import json as pajson
from pyarrow import parquet as pq


In [2]:
# Count the distinct id prefixes (everything before the last "/") in one input file.
page_ids = pq.read_table("parquet_files/1010000.parquet")['ids'].to_pylist()
len({page_id.rsplit("/", 1)[0] for page_id in page_ids})

438

In [3]:
# How many parquet files sit directly inside the input directory.
len(list(Path("parquet_files/").glob("*.parquet")))

328

In [4]:
# Load one sample input file into `f` for the compression experiment that follows.
# glob() yields files in filesystem order, so sort the paths to make the chosen
# sample the same on every run, and fail loudly if the directory is empty
# instead of leaving `f` undefined for later cells.
sample_paths = sorted(Path("parquet_files/").glob("*.parquet"))
if not sample_paths:
    raise FileNotFoundError("no *.parquet files found in parquet_files/")
p = sample_paths[0]
f = pq.read_table(p)

In [5]:
# Write the sample table twice with identical zstd settings: once in its
# original row order and once sorted by "ids", so the file sizes can be compared.
zstd_level5 = {"compression": "zstd", "compression_level": 5}
pq.write_table(f, "test0.parquet", **zstd_level5)
ids_sorted = f.sort_by("ids")
pq.write_table(ids_sorted, "test1.parquet", **zstd_level5)

In [6]:
# List the two test files with their sizes, largest first (-S), to compare the
# unsorted (test0) and ids-sorted (test1) writes.
ls -lS test*.parquet

-rw-r--r--  1 benschmidt  staff  22267452 Apr 20 00:33 test0.parquet
-rw-r--r--  1 benschmidt  staff  22265343 Apr 20 00:33 test1.parquet


In [8]:
# Read the metadata and add a "newspaper" column holding the part of dc:title
# captured by the regex, i.e. the text in front of a " - " separator.
# Titles the pattern does not match get a null newspaper.
meta_raw = pajson.read_json("metadata/all.json")
papers = pc.extract_regex(meta_raw['dc:title'], "(?P<match>.*) - ").flatten()[0]
meta = meta_raw.append_column("newspaper", papers)

In [9]:
# A single input file kept for ad-hoc inspection; the per-file processing loop
# later in the notebook rebinds `parq`.
parq = Path("parquet_files/10000.parquet")


pyarrow.Table
dc:identifier: string
dc:language: string
dc:source: string
dc:subject: string
dc:title: string
dc:type: string
dc:extent: string
dc:isPartOf: string
dc:issued: string
dc:spatial: string
@id: string
dc:relation: string
dc:hasPart: string
newspaper: string
----
dc:identifier: [["http://data.theeuropeanlibrary.org/BibliographicResource/3000118999474"]]
dc:language: [["deu"]]
dc:source: [[null]]
dc:subject: [["http://d-nb.info/gnd/4018202-2"]]
dc:title: [["Frauenblätter"]]
dc:type: [["http://data.europeana.eu/concept/base/18"]]
dc:extent: [[null]]
dc:isPartOf: [["http://data.theeuropeanlibrary.org/Collection/a0600"]]
dc:issued: [["1871-12-16 - 1872-12-15"]]
dc:spatial: [["http://d-nb.info/gnd/4021912-4"]]
...

In [76]:
# For every input file: derive the newspaper id and page number from each page
# id, join the rows with the metadata, and write one parquet file per newspaper
# under ./papers/<newspaper>/<input stem>.parquet.

# The metadata does not change between input files, so convert it only once.
meta_arrow = pl.from_arrow(meta)

for parq in Path("parquet_files/").glob("*.parquet"):
    print(parq, end = "\r")
    ids = pq.read_table(parq, columns = ['ids', 'pages'])
    # Raw strings keep the regex backslashes intact ("\d" is not a valid Python
    # string escape). The first pattern strips the trailing "/<digits>"; the
    # second keeps only those digits and parses them as the page number.
    paper_ids = pc.replace_substring_regex(ids['ids'], r"/[\d]+$", "")
    pages = pc.replace_substring_regex(ids['ids'], r".*/([\d]+)$", r"\1").cast(pa.int32())
    # Pick the two source columns by name so the renaming below cannot be
    # affected by the order in which the reader returns them.
    all_data = pa.table(
        [ids['ids'], ids['pages'], paper_ids, pages],
        names = ['@id', 'nc:text', 'newspaper_id', 'page'],
    )
    plarrow = pl.from_arrow(all_data).join(meta_arrow, left_on=['newspaper_id'], right_on = ['@id'])
    # NOTE(review): `with_column` was renamed to `with_columns` in later polars
    # releases; confirm against the polars version this notebook is pinned to.
    plarrow = plarrow.drop('dc:issued').with_column(plarrow['dc:issued'].str.strptime(pl.Date, "%Y-%m-%d"))
    joint = plarrow.to_arrow()

    for paper in pc.unique(joint['newspaper']):
        if not paper.is_valid:
            # A null newspaper can never equal any row in the filter below, so
            # nothing would be written for it; skip it rather than creating a
            # "None" folder holding an empty, unreadable .parquet file.
            continue
        fout = Path(f"./papers/{paper}/{parq.stem}.parquet")
        fout.parent.mkdir(exist_ok=True, parents = True)
        matches = joint.filter(pc.equal(joint['newspaper'], paper))
        if len(matches):
            # write_table creates the file itself; no placeholder is needed.
            pq.write_table(matches, fout)

parquet_files/805000.parquett

In [77]:
# Merge the per-input-file parquet pieces of each newspaper folder into a
# single date-sorted file papers/<newspaper>.parquet, then remove the folder.
for folder in Path("papers").glob("*"):
    if not folder.is_dir():
        # Already-merged .parquet files live next to the folders; skip them.
        continue
    print(folder, end = "\r")
    parts = sorted(folder.glob("*.parquet"))
    if len(parts) == 0:
        continue
    tables = [pq.read_table(p) for p in parts]
    tabs = pa.concat_tables(tables)
    # Append ".parquet" to the full folder name. with_suffix() would replace
    # everything after the last "." in a newspaper name that contains a dot,
    # truncating the name and risking collisions between newspapers.
    merged = folder.with_name(folder.name + ".parquet")
    pq.write_table(tabs.sort_by('dc:issued'), merged, compression = "zstd", compression_level=10)
    # Delete the pieces only after the merged file has been written, so a
    # failure while concatenating or writing does not lose data.
    for p in parts:
        p.unlink()
    folder.rmdir()


papers/Österreichische Buchdrucker-Zeitung.parquetchen Kaiserstaat.parquetē, philologikē te emporikē ephēmeris

In [145]:
# Split every merged newspaper file larger than 100 MiB into several smaller,
# date-bounded parquet files written next to the original.
for p in Path("papers").glob("*.parquet"):
    size = p.stat().st_size / 1024 / 1024  # size on disk in MiB
    if size > 100:
        print(p)
        r = pq.read_table(p)
        d = r['dc:issued']
        n_chunks = int((size // 100) + 1)
        # NB: relies on the table being sorted by date. Cutoffs are the dates
        # found at evenly spaced row positions, plus the date of the final row
        # so that the last chunk has an upper bound.
        rows_per_chunk = len(r) // n_chunks
        cutoffs = [d[i * rows_per_chunk] for i in range(n_chunks)]
        cutoffs.append(d[len(r) - 1])
        print(len(cutoffs))
        # The loop index runs 0 .. len(cutoffs) - 2, so this is the final chunk.
        last_chunk = len(cutoffs) - 2
        for i in range(len(cutoffs) - 1):
            dates = f"{cutoffs[i].as_py()}--{cutoffs[i + 1].as_py()}"
            fout = str(p.with_suffix("")) + f"-{dates}.parquet"
            # Chunks are half-open [start, end) so no date lands in two files;
            # only the last chunk is closed, otherwise rows dated with the
            # final cutoff would not be written anywhere.
            less = pc.less_equal if i == last_chunk else pc.less
            subset = r.filter(pc.and_(pc.greater_equal(d, cutoffs[i]), less(d, cutoffs[i + 1])))
            if len(subset) == 0:
                # Repeated cutoff dates produce an empty range; nothing to write.
                continue
            # NOTE(review): rows with a null dc:issued match no chunk and are
            # dropped by the filter; confirm none are expected here.
            pq.write_table(subset, fout, compression = "zstd", compression_level = 10)


papers/Wiener Zeitung.parquet
13
papers/Klagenfurter Zeitung.parquet
3
papers/Neue Freie Presse.parquet
4
papers/Neues Fremden-Blatt.parquet
3
papers/Die Presse.parquet
6
papers/Das Vaterland.parquet
3
papers/Morgen-Post.parquet
3
papers/Fremden-Blatt.parquet
4


In [139]:
# Split one specific newspaper file into date-bounded chunks.
p = Path("papers/Wiener Zeitung.parquet")

# Load the table and size that belong to `p`, instead of reusing whatever
# `r`, `size` and `cutoffs` an earlier cell happened to leave in the kernel.
r = pq.read_table(p)
size = p.stat().st_size / 1024 / 1024  # size on disk in MiB
d = r['dc:issued']

# One cutoff per full 100 MiB of file (at least one, which also avoids a
# division by zero for small files), taken at evenly spaced row positions,
# plus the date of the final row. Assumes `r` is sorted by date — TODO confirm.
n_cuts = max(1, int(size // 100))
rows_per_chunk = len(r) // n_cuts
cutoffs = [d[k * rows_per_chunk] for k in range(n_cuts)]
cutoffs.append(d[len(r) - 1])

# The loop index runs 0 .. len(cutoffs) - 2, so this is the final chunk.
last_chunk = len(cutoffs) - 2
for i in range(len(cutoffs) - 1):
    dates = f"{cutoffs[i].as_py()}--{cutoffs[i + 1].as_py()}"
    fout = str(p.with_suffix("")) + f"-{dates}.parquet"
    # Half-open [start, end) ranges everywhere except the last chunk, which
    # must also include rows dated with the final cutoff.
    less = pc.less_equal if i == last_chunk else pc.less
    subset = r.filter(pc.and_(pc.greater_equal(d, cutoffs[i]), less(d, cutoffs[i + 1])))
    if len(subset) == 0:
        # Repeated cutoff dates produce an empty range; nothing to write.
        continue
    pq.write_table(subset, fout)


In [142]:
# Inspect the chunk-boundary dates for the file currently bound to `p`,
# sampling the table currently bound to `r` at evenly spaced row positions.
size = p.stat().st_size / 1024 / 1024  # size on disk in MiB
# At least one chunk, so files under 100 MiB do not divide by zero.
n_cuts = max(1, int(size // 100))
rows_per_chunk = len(r) // n_cuts
cutoffs = [r['dc:issued'][k * rows_per_chunk] for k in range(n_cuts)]
cutoffs

[<pyarrow.Date32Scalar: datetime.date(1847, 7, 2)>,
 <pyarrow.Date32Scalar: datetime.date(1853, 12, 21)>,
 <pyarrow.Date32Scalar: datetime.date(1857, 5, 12)>,
 <pyarrow.Date32Scalar: datetime.date(1860, 1, 11)>,
 <pyarrow.Date32Scalar: datetime.date(1862, 4, 18)>,
 <pyarrow.Date32Scalar: datetime.date(1864, 3, 13)>,
 <pyarrow.Date32Scalar: datetime.date(1866, 1, 5)>,
 <pyarrow.Date32Scalar: datetime.date(1867, 8, 5)>,
 <pyarrow.Date32Scalar: datetime.date(1869, 4, 18)>,
 <pyarrow.Date32Scalar: datetime.date(1870, 9, 28)>,
 <pyarrow.Date32Scalar: datetime.date(1872, 3, 3)>]

In [121]:
# Scratch cell: pins `i` to 0 — presumably to step through one iteration of the
# chunk-splitting loop body by hand (TODO confirm; otherwise safe to delete).
i = 0

In [152]:
# Experiment: see what extract_regex returns for nested named groups on a
# one-element string array.
sample_text = pa.array(["foo bar, is the thing"])
nested_groups = r"(?P<words>(?P<word>\w+)+)"
pc.extract_regex(sample_text, nested_groups)

<pyarrow.lib.StructArray object at 0x1108bd900>
-- is_valid: all not null
-- child 0 type: string
  [
    "foo"
  ]
-- child 1 type: string
  [
    "foo"
  ]