In [3]:
import vaex
from vaex.dataframe import DataFrame
import pyarrow as pa

from mismo.block._strings import norm_whitespace, tokenize
from mismo.datasets import load_febrl1
from mismo.util import explode_table

In [4]:
from pathlib import Path

# Resolve the raw receipts parquet file to an absolute path.
# The path is relative, so the result depends on the kernel's working directory.
data_path = Path("../../scg/noatak/data/interim/receipts_raw.parquet").resolve()
data_path

PosixPath('/Users/nickcrews/Documents/projects/scg/noatak/data/interim/receipts_raw.parquet')

In [5]:
# Open the receipts parquet file with vaex and display it.
donations = vaex.open(data_path)
donations

#,donation_date,donation_method,donation_amount,donor_last_business_name,donor_first_name,donor_street,donor_city,donor_state,donor_zipcode,donor_country,donor_occupation,donor_employer,election_name,election_type,election_municipality,election_office,recipient_type,recipient_name,report_year,report_date
0,9/23/2011,Payroll Deduction,$500.00,Anderson,David,412 Front St.,Ketchikan,Alaska,99901,USA,Marine Pilot,Self,--,--,--,--,Group,AK Sea Pilot PAC Fund,2011,1/17/2012
1,9/23/2011,Payroll Deduction,$500.00,Antonsen,Hans,701 Carlanna Lake Rd.,Ketchikan,Alaska,99901,USA,Marine Pilot,Self,--,--,--,--,Group,AK Sea Pilot PAC Fund,2011,1/17/2012
2,9/23/2011,Payroll Deduction,$500.00,Backen,Terrance,2417 Tongass Ave. Ste. 111-301,Ketchikan,Alaska,99901,USA,Marine Pilot,Self,--,--,--,--,Group,AK Sea Pilot PAC Fund,2011,1/17/2012
3,9/23/2011,Payroll Deduction,$500.00,Baken,Jeff,P.O. Box 8751,Ketchikan,Alaska,99901,USA,Marine Pilot,Self,--,--,--,--,Group,AK Sea Pilot PAC Fund,2011,1/17/2012
4,9/23/2011,Payroll Deduction,$500.00,Cathcart,Jim,2417 Tongass Ave. Ste. 111-185,Ketchikan,Alaska,99901,USA,Marine Pilot,Self,--,--,--,--,Group,AK Sea Pilot PAC Fund,2011,1/17/2012
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1137530,12/15/2021,Non-Monetary,$300.00,Eagley,Ronald,4333 Spenard Road,Anchorage,Alaska,99503,USA,Business Owner,Self,2023 - Anchorage Municipal Election,Anchorage Municipal,"Anchorage, City and Borough",School Board,Candidate,Dave Donley,2023,2/13/2022
1137531,1/5/2022,Check,$100.00,Brooks,Maryann,2020 Muldoon Rd # 346,Anchorage,Alaska,99504,USA,retired,retired,2023 - Anchorage Municipal Election,Anchorage Municipal,"Anchorage, City and Borough",School Board,Candidate,Dave Donley,2023,2/13/2022
1137532,1/11/2022,Check,$100.00,Rittger,J Parker,10661 Elies Dr,Anchorage,Alaska,99507,USA,self employed,self,2023 - Anchorage Municipal Election,Anchorage Municipal,"Anchorage, City and Borough",School Board,Candidate,Dave Donley,2023,2/13/2022
1137533,12/15/2021,Check,$100.00,"Rittgers Companies, LLC",--,10661 Elies Dr,Anchorage,Alaska,99507,USA,Sole owner LLC of Parker Rittgers,Sole owner LLC of Parker Rittgers,2023 - Anchorage Municipal Election,Anchorage Municipal,"Anchorage, City and Borough",School Board,Candidate,Dave Donley,2023,2/13/2022


In [6]:
# Small string fixture: repeated and trailing whitespace, apostrophes, a
# single-character string, an empty string, a null and an embedded tab.
df = vaex.from_arrays(
    strings=pa.array(
        ["jane's   house", "Ross' house  ", "a", "", None, "bees\tall cook"]
    )
)
df

#,strings
0,jane's house
1,Ross' house
2,a
3,
4,--
5,bees	all cook


In [7]:
from mismo.block._strings import (
    TokenFingerprinter,
    tokenize,
    SortedAcronymFingerprinter,
)
from mismo.block._blocker import check_fingerprints

# Alternative fingerprinter on the "strings" fixture column, currently disabled:
# fp = SortedAcronymFingerprinter(column="strings")
# Active fingerprinter: works on the "donor_occupation" column.
fp = TokenFingerprinter(column="donor_occupation")

In [8]:
def _group_index_by_key(fingerprints: DataFrame) -> DataFrame:
    """Hash the non-"index" columns into a "key" and list the indices per key."""
    key_cols = [c for c in fingerprints.get_column_names() if c != "index"]
    # Work on a copy so the caller's frame does not gain a "key" column.
    keyed = fingerprints.copy()
    # NOTE(review): the row hash is cast to uint32, so distinct key rows can
    # end up with the same key value -- confirm this is acceptable.
    keyed["key"] = keyed.mismo.hash_rows(key_cols).astype("uint32")
    return keyed.groupby("key").agg({"index": "list"})


def merge_fingerprints(
    fp1: DataFrame, fp2: DataFrame
) -> "tuple[DataFrame, DataFrame]":
    """Group the row indices of two fingerprint frames by hashed key.

    Both frames are validated with ``check_fingerprints`` first. For each
    frame, every column other than ``"index"`` is hashed into a ``"key"``
    column, and the ``"index"`` values are collected into one list per key.

    The join of the two grouped frames on ``"key"`` is intentionally not done
    here: the previous version had an unreachable join after its ``return``,
    and the notebook performs that join in a later cell.

    Parameters
    ----------
    fp1, fp2:
        Fingerprint frames that contain an ``"index"`` column plus one or
        more key columns. Neither frame is modified.

    Returns
    -------
    tuple of two DataFrames
        ``(key_2_idx1, key_2_idx2)``, each with a ``"key"`` column and a
        list-valued ``"index"`` column.
    """
    # Validate both inputs before doing any work on either of them.
    check_fingerprints(fp1)
    check_fingerprints(fp2)
    return _group_index_by_key(fp1), _group_index_by_key(fp2)

In [9]:
# Fingerprint the first 100 and the last 100 donations separately.
fps1 = fp.fingerprint(donations.head(100))
fps2 = fp.fingerprint(donations.tail(100))

In [27]:
# Inspect the fingerprints computed from the first 100 donations.
fps1

#,index,token
0,0,marine
1,0,pilot
2,1,marine
3,1,pilot
4,2,marine
...,...,...
162,95,artist
163,96,retire
164,97,legislator
165,98,retire


In [10]:
# Group both fingerprint sets by hashed key and display the left-hand result.
# NOTE(review): .head(100) keeps only the first 100 fingerprint rows, while
# fps1 is displayed above with more rows than that -- confirm the truncation
# is intended.
m1, m2 = merge_fingerprints(fps1.head(100), fps2.head(100))
m1

#,key,index
0,310667473,"'[0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 13, 14, 15,..."
1,3627299430,[54]
2,1252230475,[54]
3,931022204,[54]
4,3370723147,[54]
...,...,...
11,3137532011,[54]
12,3954775581,[54]
13,4280197019,[54]
14,2542118648,"'[0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 13, 14, 15,..."


In [31]:
# Show every fingerprint row whose "index" value is 54.
fps1[fps1.index == 54]

#,index,token
0,54,distribution
1,54,of
2,54,donate
3,54,fund
4,54,to
...,...,...
11,54,from
12,54,mcadam
13,54,for
14,54,senate


In [11]:
# Move the list-valued "index" columns into their own frames and replace them
# in m1/m2 with a positional "iloc" column (0 .. len - 1).
# NOTE(review): m1 and m2 are rebound here, so running this cell a second time
# operates on frames that no longer have an "index" column.
index_left = m1[["index"]]
index_right = m2[["index"]]
m1 = m1.drop(columns=["index"])
m2 = m2.drop(columns=["index"])
m1["iloc"] = vaex.vrange(0, len(m1), dtype="uint64")
m2["iloc"] = vaex.vrange(0, len(m2), dtype="uint64")
index_left

#,index
0,"'[0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 13, 14, 15,..."
1,[54]
2,[54]
3,[54]
4,[54]
...,...
11,[54]
12,[54]
13,[54]
14,"'[0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 12, 13, 14, 15,..."


In [13]:
# Inner-join the two grouped frames on the hashed key; shared column names get
# "_left" / "_right" suffixes.
links = m1.join(m2, on="key", how="inner", lsuffix="_left", rsuffix="_right")
links

#,key_left,iloc_left,key_right,iloc_right
0,3627299430,1,3627299430,6
1,2698029969,9,2698029969,39
2,2109566629,10,2109566629,45


In [75]:
import pyarrow.compute as pc
from vaex.dataframe import DataFrame
from vaex.expression import Expression


def explode_table(table: pa.Table, column: str) -> pa.Table:
    """Give every element of a list-typed column its own row.

    Modelled on ``pandas.DataFrame.explode``:
    https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.explode.html

    Null entries of ``column`` are filled with ``[None]`` before flattening.
    The exploded column is placed last in the result; the remaining columns
    keep their relative order.
    """
    lists = pc.fill_null(table[column], [None])
    values = pc.list_flatten(lists)
    remaining = [name for name in table.schema.names if name != column]
    if not remaining:
        # Nothing to repeat: the flattened values are the whole result.
        return pa.table({column: values})
    # Repeat each row of the other columns once per element of its list.
    parent_rows = pc.list_parent_indices(lists)
    repeated = table.select(remaining).take(parent_rows)
    element_field = pa.field(column, table.schema.field(column).type.value_type)
    return repeated.append_column(element_field, values)


def cartesian_product_chunk(left: pa.ListArray, right: pa.ListArray) -> pa.Array:
    """Pair every element of ``left[i]`` with every element of ``right[i]``.

    The two list arrays are placed side by side in a table, which is then
    exploded on one column after the other. The result is a struct array with
    fields ``x`` (values from ``left``) and ``y`` (values from ``right``).
    """
    paired: pa.Table = pa.table({"left": left, "right": right})
    for name in ("left", "right"):
        paired = explode_table(paired, name)
    # vaex.DataFrame.apply expects a 1-D array back (its result is an
    # expression), so the two table columns are packed into one
    # pa.StructArray. This should involve minimal copying.
    # StructArray.from_arrays() takes pa.Arrays rather than pa.ChunkedArrays,
    # hence the combine_chunks() calls.
    xs, ys = (paired[name].combine_chunks() for name in ("left", "right"))
    return pa.StructArray.from_arrays((xs, ys), names=("x", "y"))


def cartesian_product(df: DataFrame, left: str, right: str) -> Expression:
    """Return an expression applying ``cartesian_product_chunk`` to two columns.

    ``left`` and ``right`` name the columns of ``df`` handed to
    ``cartesian_product_chunk``; the call is made with ``vectorize=True``.
    """
    arguments = [left, right]
    return df.apply(cartesian_product_chunk, arguments, vectorize=True)


# Two-row frame of list columns used to try out cartesian_product.
# Note: this rebinds the name df that an earlier cell used for the string fixture.
df = vaex.from_arrays(
    x=pa.array([[1, 2], [3, 4]]),
    y=pa.array([[11, 12], [13, 14]]),
)
df

#,x,y
0,"[1, 2]","[11, 12]"
1,"[3, 4]","[13, 14]"


In [82]:
# Print the demo frame as a Markdown table.
print(df.to_pandas_df().to_markdown())

|    | x     | y       |
|---:|:------|:--------|
|  0 | [1 2] | [11 12] |
|  1 | [3 4] | [13 14] |


In [80]:
# Evaluate the cartesian-product expression and wrap the resulting struct
# array in a new single-column frame.
df2 = vaex.from_arrays(carted=cartesian_product(df, "x", "y").evaluate())
# Unpack the 1-D struct array back into separate columns
df2["x"] = df2["carted"].struct.get("x")
df2["y"] = df2["carted"].struct.get("y")
df2

#,carted,x,y
0,"{'x': 1, 'y': 11}",1,11
1,"{'x': 1, 'y': 12}",1,12
2,"{'x': 2, 'y': 11}",2,11
3,"{'x': 2, 'y': 12}",2,12
4,"{'x': 3, 'y': 13}",3,13
5,"{'x': 3, 'y': 14}",3,14
6,"{'x': 4, 'y': 13}",4,13
7,"{'x': 4, 'y': 14}",4,14


In [86]:
# Print only the unpacked x/y columns as a Markdown table.
print(df2[["x", "y"]].to_pandas_df().to_markdown())

|    |   x |   y |
|---:|----:|----:|
|  0 |   1 |  11 |
|  1 |   1 |  12 |
|  2 |   2 |  11 |
|  3 |   2 |  12 |
|  4 |   3 |  13 |
|  5 |   3 |  14 |
|  6 |   4 |  13 |
|  7 |   4 |  14 |


In [51]:
def _cross_join_chunk(iloc_left, iloc_right):
    """Expand matched group positions into (left index, right index) pairs.

    ``iloc_left`` and ``iloc_right`` are row positions into the notebook-level
    frames ``index_left`` and ``index_right``, which this function reads as
    globals. The list-valued ``"index"`` entries found at those positions are
    handed to ``cartesian_product_chunk``, which returns a struct array with
    fields ``x`` (left index) and ``y`` (right index).

    This used to be a line-for-line copy of ``cartesian_product_chunk``; it
    now delegates to it so the explode/pack logic lives in one place.
    """
    left_lists = index_left.take(iloc_left)["index"].values
    right_lists = index_right.take(iloc_right)["index"].values
    return cartesian_product_chunk(left_lists, right_lists)

In [52]:
# Build an expression that runs _cross_join_chunk over the matched positions,
# vectorized and without multiprocessing.
links_exp = links.apply(
    _cross_join_chunk,
    [links.iloc_left, links.iloc_right],
    vectorize=True,
    multiprocessing=False,
)

In [56]:
# Evaluate the expression and wrap the resulting struct array in its own frame.
links_df = vaex.from_arrays(links=links_exp.evaluate())
links_df

#,links
0,"{'x': 54, 'y': 62}"
1,"{'x': 53, 'y': 10}"
2,"{'x': 53, 'y': 15}"
3,"{'x': 53, 'y': 17}"
4,"{'x': 53, 'y': 19}"
...,...
23,"{'x': 9, 'y': 20}"
24,"{'x': 11, 'y': 20}"
25,"{'x': 35, 'y': 20}"
26,"{'x': 39, 'y': 20}"


In [62]:
# NOTE(review): as recorded in this cell's output, this call raised
# "TypeError: eval() arg 1 must be a string, bytes or code object" and
# produced an empty figure -- fix or remove before sharing the notebook.
links_df.plot()



TypeError: eval() arg 1 must be a string, bytes or code object

<Figure size 432x288 with 0 Axes>

In [65]:
import numpy as np

# Frame with NaNs in x and None entries in y, used to try out dropna().
# Note: this rebinds the name df once more.
x = [1.1, np.nan, np.nan, 4.4, 5.5]
y = ["dog", "dog", None, "cat", None]
df = vaex.from_arrays(x=x, y=y)

In [68]:
# Only the rows with neither a NaN in x nor a None in y remain in the output.
df.dropna()

#,x,y
0,1.1,dog
1,4.4,cat


In [57]:
# Attach the cross-join expression to links as a new "links" column.
# NOTE(review): links_df is displayed with 27 rows for this same expression,
# while links displays 3 rows -- the column does not look row-aligned with
# links; confirm before relying on it.
links["links"] = links_exp
links

#,key_left,iloc_left,key_right,iloc_right,links
0,3627299430,1,3627299430,6,"{'x': 54, 'y': 62}"
1,2698029969,9,2698029969,39,"{'x': 53, 'y': 10}"
2,2109566629,10,2109566629,45,"{'x': 53, 'y': 15}"


In [58]:
# Call drop_filter() on links; the displayed frame matches the previous cell's output.
links.drop_filter()

#,key_left,iloc_left,key_right,iloc_right,links
0,3627299430,1,3627299430,6,"{'x': 54, 'y': 62}"
1,2698029969,9,2698029969,39,"{'x': 53, 'y': 10}"
2,2109566629,10,2109566629,45,"{'x': 53, 'y': 15}"


In [5]:
from mismo.block import Equals, FingerprintBlocker

In [24]:
# Build a FingerprintBlocker from Equals("surname") and show its repr.
b = FingerprintBlocker(Equals("surname"))
b

<mismo.block.blocker.FingerprintBlocker at 0x1314609a0>

In [27]:
# Block df against itself and display the resulting index pairs.
# NOTE(review): the df defined in the cells above has no "surname" column, and
# the recorded output contains index values up to 999 -- this output appears
# to come from a different df; confirm which data must be loaded first.
block_map = b.block(df, df)
block_map



Unnamed: 0,index_1,index_2
0,0,0
1,1,1
2,1,46
3,1,222
4,1,258
...,...,...
4715,998,998
4716,999,127
4717,999,536
4718,999,594


In [1]:
from mismo.block._blocker import set_cover

In [2]:
import pandas as pd

In [4]:
# Small set_cover example: cover `universe` using the candidate series below.
# NOTE(review): if the recorded result [0, 2] holds positions into the list
# passed to set_cover, position 2 is s2 rather than s3, which disagrees with
# the inline comments on s2/s3 -- confirm and correct whichever is wrong.
universe = pd.Series([0, 1, 2, 3, 4])
s0 = pd.Series([0, 1])  # this chosen first
s1 = pd.Series([0])  # shouldn't get chosen
s3 = pd.Series([2, 5])  # this chosen second
s2 = pd.Series([2, 5, 6])  # this not chosen
set_cover(universe, [s0, s1, s2, s3])

[0, 2]