# Distributed probability map query testing

This notebook contains a prototype for the impact map using the scaler and the pointer:

Basically the idea would be to query the sourcing location data using the clien filters, so for each georegion we will need to do the following:

    distributed impact = (Value *  H3 prod values (pointer))/ Scaler

### TESTED OPTIONS   
 1. Fully dynamic query (original query)
 2. Fully dynamic query grouping indicator records with the same georegion and h3dataid first
 3. Fully expanded materialized view of the h3data for each indicator record
 4. Materialized view of h3data for unique georegion and h3dataid pairs
 5. Materialized view of h3data for unique georegion and h3dataid pairs with an index

All queries for one inidicator and year but no filters otherwise.

### RESULTS
 1. Query: ~**30s**. Less than ideal as will scale with number of sourcing locations
 2. Query: ~**10s**. Still too slow for dynamic querying? Will scale sub-linearly with number of sourcing locations
 3. Create view: ~**30s**. Query: ~**10s**. In this case I limited the materialized view to only one indicator and year since it otherwise ran me out of disk space and crashed PG+Docker. Mulitply by #indicators * #years(!).
 4. Create view: ~**10s**. Query: ~**2s**. Probably as good as we can get.
 5. Create view: ~**11s**. Query: ~**2s**. Same as above. PG decided not to use any index I created since the table is essentially the same.

### OTHER OPTIONS:
 - Use #2 but also cache the results of the filtered query in a cache table (not mat.view)
 


In [1]:
#import libraries
from psycopg2.pool import ThreadedConnectionPool

In [2]:
## env file for gcs upload
env_path = ".env"
with open(env_path) as f:
    env = {}
    for line in f:
        env_key, _val = line.split("=", 1)
        env_value = _val.split("\n")[0]
        env[env_key] = env_value

In [3]:
# conect to ddbb
postgres_thread_pool = ThreadedConnectionPool(1, 50,
                                              host=env['API_POSTGRES_HOST'],
                                              port=env['API_POSTGRES_PORT'],
                                              user=env['API_POSTGRES_USERNAME'],
                                              password=env['API_POSTGRES_PASSWORD']
                                              )
#get list of sourcing records to iterate:
conn = postgres_thread_pool.getconn()
cursor = conn.cursor()

In [4]:
def psql(query):
    try:
        cursor.execute(query)
        return cursor.fetchall()
    except Exception as e:
        conn.rollback()
        print(e)

In [5]:
## SELECT FIRST THE GEOREGION
SQL_GET_H3_UNCOMPACT_GEO_REGION = """
CREATE OR REPLACE FUNCTION get_h3_uncompact_geo_region(geo_region_id uuid, h3_resolution int)
RETURNS TABLE (h3index h3index) AS 
$$
    SELECT h3_uncompact(geo_region."h3Compact"::h3index[], h3_resolution) h3index
    FROM geo_region WHERE geo_region.id = geo_region_id
$$ 
LANGUAGE SQL;
"""

SQL_GET_H3_MATERIAL_TABLE_COLUMN = """
CREATE OR REPLACE FUNCTION get_h3_material_table_column(h3DataId uuid)
RETURNS TABLE (h3_table_name varchar, h3_column_name varchar) AS
$$
    SELECT h3_data."h3tableName", h3_data."h3columnName"
    FROM h3_data
    WHERE h3_data.id = h3DataId 
    LIMIT 1;
$$
LANGUAGE SQL;
"""

SQL_GET_H3_DATA_OVER_GEO_REGION = SQL_GET_H3_MATERIAL_TABLE_COLUMN+SQL_GET_H3_UNCOMPACT_GEO_REGION+"""
CREATE OR REPLACE FUNCTION get_h3_data_over_georegion(
    geo_region_id uuid, 
    h3DataId uuid
)
RETURNS TABLE (h3index h3index, value float) AS
$$
    DECLARE
        material_h3_table_name varchar;
        material_h3_column_name varchar;
        h3_resolution integer := 6;
        value float;

    BEGIN
        -- Get h3data table name and column name for given material
        SELECT * INTO material_h3_table_name, material_h3_column_name
        FROM get_h3_material_table_column(h3DataId);

        -- Sum table column over region
        RETURN QUERY EXECUTE format(
            'SELECT 
                h3grid.h3index,
                h3grid.%I::float
                FROM
                    get_h3_uncompact_geo_region($1, $2) geo_region
                    INNER JOIN %I h3grid ON h3grid.h3index = geo_region.h3index
            WHERE h3grid.%I > 0
            ', material_h3_column_name, material_h3_table_name, material_h3_column_name)
            USING geo_region_id, h3_resolution;
    END;
$$
LANGUAGE plpgsql;
"""

# CREATE FUNCTIONS
psql(
    SQL_GET_H3_DATA_OVER_GEO_REGION+\
    SQL_GET_H3_UNCOMPACT_GEO_REGION+\
"""COMMIT;""")


no results to fetch


In [6]:
#filters that can be provided by the client
indicator_id = 'e2c00251-fe31-4330-8c38-604535d795dc'
filters = ''
year = 2015

# 1. Fully dynamic query

In [7]:
DYNAMIC_QUERY = f"""
    SELECT
        h3data.h3index,
        sum(h3data.value)
    FROM sourcing_location sl
    INNER JOIN sourcing_records sr ON sr."sourcingLocationId" = sl.id 
    INNER JOIN indicator_record ir ON ir."sourcingRecordId" = sr.id,
    LATERAL (
        SELECT
            h3index,
            ir.value/ir.scaler * value as value
        FROM get_h3_data_over_georegion(sl."geoRegionId", ir."materialH3DataId")
    ) h3data
    WHERE sr.year = {year}
        AND ir."indicatorId" = '{indicator_id}'
        AND ir.scaler > 0
        {filters}
GROUP by h3data.h3index"""
psql("EXPLAIN ANALYZE " + DYNAMIC_QUERY)

[('HashAggregate  (cost=74827.58..74829.58 rows=200 width=16) (actual time=30010.030..33328.922 rows=1034650 loops=1)',),
 ('  Group Key: get_h3_data_over_georegion.h3index',),
 ('  Batches: 69  Memory Usage: 4265kB  Disk Usage: 329656kB',),
 ('  ->  Nested Loop  (cost=850.26..51417.58 rows=2341000 width=32) (actual time=11.679..26551.331 rows=6592617 loops=1)',),
 ('        ->  Hash Join  (cost=850.01..4597.33 rows=2341 width=48) (actual time=7.047..52.692 rows=2320 loops=1)',),
 ('              Hash Cond: (sr."sourcingLocationId" = sl.id)',),
 ('              ->  Hash Join  (cost=692.40..4433.57 rows=2341 width=48) (actual time=5.355..48.591 rows=2320 loops=1)',),
 ('                    Hash Cond: (ir."sourcingRecordId" = sr.id)',),
 ('                    ->  Seq Scan on indicator_record ir  (cost=0.00..3673.56 rows=25751 width=48) (actual time=0.034..32.060 rows=25520 loops=1)',),
 ('                          Filter: ((scaler > \'0\'::double precision) AND ("indicatorId" = \'e2c0025

# 2. Dynamic query combining records with the same geoRegion and h3DataId first

In [8]:
DYNAMIC_QUERY_GROUPED_GEOREGION_H3DATA = f"""
SELECT
    h3data.h3index,
    sum(h3data.value)
FROM (
    SELECT
        sl."geoRegionId", 
        ir."materialH3DataId" as "h3DataId", 
        sum(ir.value/ir.scaler) as scaled_value
    FROM sourcing_location sl
    INNER JOIN sourcing_records sr ON sr."sourcingLocationId" = sl.id 
    INNER JOIN indicator_record ir ON ir."sourcingRecordId" = sr.id
    WHERE sr.year = {year}
        AND ir."indicatorId" = '{indicator_id}'
        AND ir.scaler > 0
        {filters}
    GROUP BY sl."geoRegionId", "h3DataId"
) reduced,
LATERAL (
    SELECT
        h3index,
        reduced.scaled_value * value as value
    FROM get_h3_data_over_georegion(reduced."geoRegionId", reduced."h3DataId")
) h3data
GROUP by h3data.h3index"""
psql("EXPLAIN ANALYZE " + DYNAMIC_QUERY_GROUPED_GEOREGION_H3DATA)

[('HashAggregate  (cost=69045.31..69047.31 rows=200 width=16) (actual time=8586.190..9357.036 rows=1034650 loops=1)',),
 ('  Group Key: get_h3_data_over_georegion.h3index',),
 ('  Batches: 69  Memory Usage: 4265kB  Disk Usage: 62248kB',),
 ('  ->  Nested Loop  (cost=4620.99..51487.81 rows=2341000 width=24) (actual time=32.597..7958.810 rows=1441804 loops=1)',),
 ('        ->  HashAggregate  (cost=4620.74..4644.15 rows=2341 width=40) (actual time=31.508..32.957 rows=1437 loops=1)',),
 ('              Group Key: sl."geoRegionId", ir."materialH3DataId"',),
 ('              Batches: 1  Memory Usage: 369kB',),
 ('              ->  Hash Join  (cost=850.01..4597.33 rows=2341 width=48) (actual time=5.324..30.170 rows=2320 loops=1)',),
 ('                    Hash Cond: (sr."sourcingLocationId" = sl.id)',),
 ('                    ->  Hash Join  (cost=692.40..4433.57 rows=2341 width=48) (actual time=4.084..28.040 rows=2320 loops=1)',),
 ('                          Hash Cond: (ir."sourcingRecordId

# 3.1 Create fully expanded materialized view of the h3data for each indicator record

In [9]:
### NOTE: limiting to one year and indicator since it crashed my DB

CREATE_FULL_MATERIALIZED_VIEW = f"""
DROP MATERIALIZED VIEW IF EXISTS full_h3_data_over_geo_region;

CREATE MATERIALIZED VIEW full_h3_data_over_geo_region AS
SELECT
    ir.id as "indicatorRecordId",
    h3data.h3index as h3index,
    h3data.value as h3value
FROM sourcing_location sl
INNER JOIN sourcing_records sr ON sr."sourcingLocationId" = sl.id 
INNER JOIN indicator_record ir ON ir."sourcingRecordId" = sr.id,
LATERAL (
    SELECT
        h3index,
        ir.value/ir.scaler * value as value
    FROM get_h3_data_over_georegion(sl."geoRegionId", ir."materialH3DataId")
) h3data
WHERE sr.year = {year}                          -- REMOVE FOR FULL VIEW
    AND ir."indicatorId" = '{indicator_id}'     -- REMOVE FOR FULL VIEW
    AND ir.scaler > 0;

COMMIT;
"""
psql(CREATE_FULL_MATERIALIZED_VIEW)

no results to fetch


In [10]:
psql("""
DROP INDEX IF EXISTS idx_full_h3_data_over_geo_region;
CREATE INDEX idx_full_h3_data_over_geo_region ON full_h3_data_over_geo_region ("indicatorRecordId", "h3index");
COMMIT;
""")

no results to fetch


In [11]:
# 2.2 Query fully expanded materialized view of the h3data for each indicator record
QUERY_FULL_MATERIALIZED_VIEW = f"""
    SELECT
        h3data.h3index,
        sum(h3data.h3value)
    FROM sourcing_location sl
    INNER JOIN sourcing_records sr ON sr."sourcingLocationId" = sl.id 
    INNER JOIN indicator_record ir ON ir."sourcingRecordId" = sr.id
    LEFT JOIN full_h3_data_over_geo_region h3data ON h3data."indicatorRecordId" = ir.id
    WHERE 
        sr.year = {year} AND
        ir."indicatorId" = '{indicator_id}'
        {filters}
    GROUP by h3data.h3index
"""
psql("EXPLAIN ANALYZE "+QUERY_FULL_MATERIALIZED_VIEW)

[('HashAggregate  (cost=141791.04..141793.04 rows=200 width=16) (actual time=7973.957..10489.259 rows=1034651 loops=1)',),
 ('  Group Key: h3data.h3index',),
 ('  Batches: 69  Memory Usage: 4265kB  Disk Usage: 232592kB',),
 ('  ->  Hash Join  (cost=4556.17..141034.14 rows=151381 width=16) (actual time=48.318..5058.983 rows=6592814 loops=1)',),
 ('        Hash Cond: (sr."sourcingLocationId" = sl.id)',),
 ('        ->  Hash Join  (cost=4438.70..140518.63 rows=151381 width=32) (actual time=31.275..3744.770 rows=6592814 loops=1)',),
 ('              Hash Cond: (ir."sourcingRecordId" = sr.id)',),
 ('              ->  Hash Right Join  (cost=3746.30..135454.76 rows=1665067 width=32) (actual time=26.326..2427.000 rows=6617974 loops=1)',),
 ('                    Hash Cond: (h3data."indicatorRecordId" = ir.id)',),
 ('                    ->  Seq Scan on full_h3_data_over_geo_region h3data  (cost=0.00..114402.17 rows=6592617 width=32) (actual time=0.035..780.131 rows=6592617 loops=1)',),
 ('      

# 4.1 Create materialized view of h3data for each unique geoRegion and h3DataId

In [12]:

CREATE_MATERIALIZED_VIEW_GEOREGION_H3DATA = f"""
DROP MATERIALIZED VIEW IF EXISTS georegion_h3data;

CREATE MATERIALIZED VIEW georegion_h3data AS
SELECT
    reduced."geoRegionId" as "geoRegionId", 
    reduced."h3DataId" as "h3DataId",
    h3data."h3index" as "h3index",
    h3data.value as value
FROM (
    SELECT DISTINCT
        sl."geoRegionId" as "geoRegionId", 
        ir."materialH3DataId" as "h3DataId"
    FROM sourcing_location sl
    LEFT JOIN sourcing_records sr ON sr."sourcingLocationId" = sl."id"
    LEFT JOIN indicator_record ir ON ir."sourcingRecordId" = sr."id"
) reduced,
LATERAL (
    SELECT
        h3index,
        value
    FROM get_h3_data_over_georegion(reduced."geoRegionId", reduced."h3DataId")
) h3data;

COMMIT;
"""
psql(CREATE_MATERIALIZED_VIEW_GEOREGION_H3DATA)

no results to fetch


In [13]:
# 4.2 query materialized view of h3data for each georegion and h3dataid
QUERY_MATERIALIZED_VIEW_JOIN_GEOREGION_H3DATA = f"""
SELECT
    h3data.h3index,
    sum(h3data.value * reduced.scaled_value)
FROM (
    SELECT
        sl."geoRegionId", 
        ir."materialH3DataId" as "h3DataId", 
        sum(ir.value/ir.scaler) as scaled_value
    FROM sourcing_location sl
    INNER JOIN sourcing_records sr ON sr."sourcingLocationId" = sl.id 
    INNER JOIN indicator_record ir ON ir."sourcingRecordId" = sr.id
    WHERE sr.year = {year}
        AND ir."indicatorId" = '{indicator_id}'
        AND ir.scaler > 0
        {filters}
    GROUP BY sl."geoRegionId", "h3DataId"
) reduced
LEFT JOIN georegion_h3data h3data 
    ON h3data."geoRegionId" = reduced."geoRegionId" 
    AND h3data."h3DataId" = reduced."h3DataId"
GROUP by h3data.h3index"""
psql("EXPLAIN ANALYZE "+QUERY_MATERIALIZED_VIEW_JOIN_GEOREGION_H3DATA)

[('HashAggregate  (cost=40892.58..40894.58 rows=200 width=16) (actual time=1451.376..2175.502 rows=1034651 loops=1)',),
 ('  Group Key: h3data.h3index',),
 ('  Batches: 69  Memory Usage: 4265kB  Disk Usage: 57552kB',),
 ('  ->  Hash Right Join  (cost=4702.68..40258.06 rows=84602 width=24) (actual time=34.093..763.479 rows=1441805 loops=1)',),
 ('        Hash Cond: ((h3data."geoRegionId" = reduced."geoRegionId") AND (h3data."h3DataId" = reduced."h3DataId"))',),
 ('        ->  Seq Scan on georegion_h3data h3data  (cost=0.00..27965.70 rows=1445570 width=48) (actual time=0.024..245.088 rows=1445519 loops=1)',),
 ('        ->  Hash  (cost=4667.56..4667.56 rows=2341 width=40) (actual time=34.062..34.068 rows=1437 loops=1)',),
 ('              Buckets: 4096  Batches: 1  Memory Usage: 134kB',),
 ('              ->  Subquery Scan on reduced  (cost=4620.74..4667.56 rows=2341 width=40) (actual time=33.257..33.789 rows=1437 loops=1)',),
 ('                    ->  HashAggregate  (cost=4620.74..4644

# 5.1 Same as 4 but with added index

In [14]:
CREATE_MATERIALIZED_VIEW_GEOREGION_H3DATA_WITH_INDEX = """
DROP MATERIALIZED VIEW IF EXISTS georegion_h3data_indexed;

CREATE MATERIALIZED VIEW georegion_h3data_indexed AS
SELECT
    reduced."geoRegionId" as "geoRegionId", 
    reduced."h3DataId" as "h3DataId",
    h3data."h3index" as "h3index",
    h3data.value as value
FROM (
    SELECT DISTINCT
        sl."geoRegionId" as "geoRegionId", 
        ir."materialH3DataId" as "h3DataId"
    FROM sourcing_location sl
    LEFT JOIN sourcing_records sr ON sr."sourcingLocationId" = sl."id"
    LEFT JOIN indicator_record ir ON ir."sourcingRecordId" = sr."id"
) reduced,
LATERAL (
    SELECT
        h3index,
        value
    FROM get_h3_data_over_georegion(reduced."geoRegionId", reduced."h3DataId")
) h3data;

DROP INDEX IF EXISTS idx_georegion_h3data_indexed;
CREATE INDEX idx_georegion_h3data_indexed ON georegion_h3data_indexed ("geoRegionId", "h3DataId", "h3index") INCLUDE ("value");

COMMIT;
"""
psql(CREATE_MATERIALIZED_VIEW_GEOREGION_H3DATA_WITH_INDEX)

no results to fetch


In [15]:
# 5.2 Same as 4 but with added index
QUERY_MATERIALIZED_VIEW_JOIN_GEOREGION_H3DATA_WITH_INDEX = f"""
SELECT
    h3data.h3index,
    sum(h3data.value * reduced.scaled_value)
FROM (
    SELECT
        sl."geoRegionId", 
        ir."materialH3DataId" as "h3DataId", 
        sum(ir.value/ir.scaler) as scaled_value
    FROM sourcing_location sl
    INNER JOIN sourcing_records sr ON sr."sourcingLocationId" = sl.id 
    INNER JOIN indicator_record ir ON ir."sourcingRecordId" = sr.id
    WHERE sr.year = {year}
        AND ir."indicatorId" = '{indicator_id}'
        AND ir.scaler > 0
        {filters}
    GROUP BY sl."geoRegionId", "h3DataId"
) reduced
LEFT JOIN georegion_h3data_indexed h3data 
    ON h3data."geoRegionId" = reduced."geoRegionId" 
    AND h3data."h3DataId" = reduced."h3DataId"
GROUP by h3data.h3index"""

psql("EXPLAIN ANALYZE "+QUERY_MATERIALIZED_VIEW_JOIN_GEOREGION_H3DATA_WITH_INDEX)

[('HashAggregate  (cost=40891.78..40893.78 rows=200 width=16) (actual time=1489.562..2197.623 rows=1034651 loops=1)',),
 ('  Group Key: h3data.h3index',),
 ('  Batches: 69  Memory Usage: 4265kB  Disk Usage: 57552kB',),
 ('  ->  Hash Right Join  (cost=4702.68..40257.28 rows=84599 width=24) (actual time=34.767..793.682 rows=1441805 loops=1)',),
 ('        Hash Cond: ((h3data."geoRegionId" = reduced."geoRegionId") AND (h3data."h3DataId" = reduced."h3DataId"))',),
 ('        ->  Seq Scan on georegion_h3data_indexed h3data  (cost=0.00..27965.19 rows=1445519 width=48) (actual time=0.029..263.701 rows=1445519 loops=1)',),
 ('        ->  Hash  (cost=4667.56..4667.56 rows=2341 width=40) (actual time=34.730..34.736 rows=1437 loops=1)',),
 ('              Buckets: 4096  Batches: 1  Memory Usage: 134kB',),
 ('              ->  Subquery Scan on reduced  (cost=4620.74..4667.56 rows=2341 width=40) (actual time=33.920..34.422 rows=1437 loops=1)',),
 ('                    ->  HashAggregate  (cost=4620.