In [1]:
from itertools import product
import numpy as np
from pathlib import Path

from dask import delayed
import dask.dataframe as dd
import geopandas as gpd
import h3pandas
import pandas as pd

In [2]:
input_dir = Path('../data/output')
output_dir = Path('../data/output-joining')

d_output = output_dir / 'raster_combined-discrete.gpkg'
c_output = output_dir / 'raster_combined-continuous.gpkg'

d_files = list(input_dir.glob('discrete*.parquet'))
c_files = list(input_dir.glob('continuous*.parquet'))

N = 10000

In [3]:
@np.vectorize
def is_prime(n):
    if np.isnan(n):
        return False
    if n % 2 == 0 and n > 2:
        return False
    return all(n % i for i in range(3, int(np.sqrt(n)) + 1, 2))

@np.vectorize
def is_polygonal(s, x):
    if np.isnan(x):
        return 0
    assert s > 2 and s % 1 == 0 and x % 1 == 0
    n = (np.sqrt(8 * (s - 2) * x + (s - 4) ** 2) + (s - 4)) / (2 * (s - 2))
    return n % 1 == 0

@np.vectorize
def is_fibonacci(n):
    a, b = 0,1
    while a < n:
        a, b = b, a + b
    return a == n

@np.vectorize
def is_perfect(n):
    sum = 1
    i = 2
    while i * i <= n:
        if n % i == 0:
            sum = sum + i + n/i
        i += 1
    return sum == n and n != 1

def is_triangular(n):
    return is_polygonal(3, n)

def is_rectangular(n):
    return is_polygonal(4, n)

def is_pentagonal(n):
    return is_polygonal(5, n)

def is_hexagonal(n):
    return is_polygonal(6, n)

funcs = [
    is_prime,
    is_triangular, is_rectangular, is_pentagonal, is_hexagonal,
    is_fibonacci,
    is_perfect
]


In [4]:
@delayed
def read_and_label(filename, label, meta_type=np.int32):
    df = pd.read_parquet(filename)
    df['value'] = df['value'].astype(meta_type)
    df = df.rename(columns={"value": label})
    return df

@delayed
def join(dfs):
    joined_df = dfs[0]
    for df in dfs[1:]:
        joined_df = joined_df.merge(df, on='h3_12', how='outer')
    return joined_df

def classify(df, scale=1):
    series = []
    func_col_combos = list(product(funcs, df.columns))
    for func, col in func_col_combos:
        series.append((df[col]*scale).apply(np.floor).apply(func).astype(int))
    return pd.concat(series, axis=1).sum(axis=1).to_frame()

In [5]:
%%time
# Read, join and classify discrete parquet data
df = join([read_and_label(file, label, meta_type=np.int32) for file, label in zip(d_files[:N], range(0, N))]).compute()
df = df.fillna(value=0).astype(np.int32)
    
classify(df).rename(columns={0: 'satisfaction'}).h3.h3_to_geo_boundary().to_file(d_output, driver='GPKG', mode='w')

CPU times: user 3min 18s, sys: 3.34 s, total: 3min 21s
Wall time: 3min 22s


In [6]:
%%time
df = join([read_and_label(file, label, meta_type=np.float32) for file, label in zip(c_files[:N], range(0, N))]).compute()
df = df.fillna(value=0.0).astype(np.float32)

classify(df, scale=100).rename(columns={0: 'satisfaction'}).h3.h3_to_geo_boundary().to_file(c_output, driver='GPKG', mode='w')

CPU times: user 3min 30s, sys: 976 ms, total: 3min 31s
Wall time: 3min 32s
