# Integração

* Verificar `parse.ipynb` antes para instruções

* Une datasets pelo critério de distância mínima:
    1. Para cada ponto de tempetura, obtém a sua data e encontra todos os pontos de $CO_2$ nessa mesma data;
    2. Calcula a distância entre o ponto de temperatura e todos os outros pontos de $CO_2$ anteriormente selecionados;
    3. Escolhe aquele de mínima distância como o ponto equivalente entre os *datasets*.

<br />

* Cálculo da distância:
    * ✔️ **Abordagem 1:** plana — $d(P_1, P_2) = \sqrt{(x_1-x_2)^2 + (y_1-y_2)^2}$
    * ❓ **Abordagem 2:** esférica — Faz sentido? É necessária? Como fazer?

<br />

* Sobre a implementação:
    * Não encontrei uma forma direta e elegante para fazer a integração apenas através da API do PySpark, então optei por apelar ao SQL 🙏;
    * A título de exploração, foi feita uma integração utilizando `CROSS JOIN` e aplicando à risca o método;
        * Tal alternativa é inviável, haja vista que geraria registros numa ordem de grandeza de $10^{6 + 8}$.
    * A título de usababilidade, foi feita a integração utilizando `LEFT JOIN` e `ROUND`, que possui o exato mesmo *result set* da alternativa precedente.

# Bibliotecas e Configurações

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.config(
        "spark.jars.packages",
        "io.xskipper:xskipper-core_2.12:1.3.0"
).getOrCreate()

In [None]:
from xskipper import Xskipper

In [None]:
metadata_path = "./tmp/metadata"

config = dict([
    ("io.xskipper.parquet.mdlocation", metadata_path),
    ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")
])

Xskipper.setConf(spark, config)

# Leitura

In [None]:
tpr_parsed_path = './data/tpr_data_parsed.csv'
co2_parsed_path = './data/co2_data_parsed.csv'

tpr_data = spark.read.options(header='True').format("csv").load(tpr_parsed_path)
co2_data = spark.read.options(header='True').format("csv").load(co2_parsed_path)

In [None]:
tpr_xskipper = Xskipper(spark, tpr_parsed_path)

if tpr_xskipper.isIndexed(): tpr_xskipper.dropIndex()

tpr_xskipper.indexBuilder()                   \
            .addMinMaxIndex("t_date")         \
            .addValueListIndex("t_longitude") \
            .addValueListIndex("t_latitude")  \
            .build(tpr_data)


co2_xskipper = Xskipper(spark, co2_parsed_path)

if co2_xskipper.isIndexed(): co2_xskipper.dropIndex()

co2_xskipper.indexBuilder()                   \
            .addValueListIndex("c_date")         \
            .addValueListIndex("c_longitude") \
            .addValueListIndex("c_latitude")  \
            .build(co2_data)

# Legacy

In [None]:
if Xskipper.isEnabled(spark): Xskipper.disable(spark)

In [None]:
tpr_data.limit(int(1e3)).createOrReplaceTempView("tpr_data")
co2_data.limit(int(1e5)).createOrReplaceTempView("co2_data")

In [None]:
# Raw query de integralização

cross_data = spark.sql(
    """
    SELECT *
    FROM (
        -- Ordenação crescente das distâncias,
        -- orientado a data, latitude e longitude
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY
                    t_date,
                    t_latitude,
                    t_longitude
                ORDER BY distance ASC
            ) AS row_num
        FROM (
            -- CROSS JOIN para cálculo de distâncias entre
            -- todos os pontos dos datasets em uma mesma data
            SELECT
                *,
                SQRT(POW(t_latitude - c_latitude, 2) + POW(t_longitude - c_longitude, 2)) AS distance
            FROM tpr_data AS T
            CROSS JOIN co2_data AS C
            ON t_date = c_date
        )
    )
    WHERE row_num = 1; -- Escolha dos registros com menor distância
    """
)

In [None]:
cross_data.show(10)

# Merge

In [None]:
if not Xskipper.isEnabled(spark): Xskipper.enable(spark)

tpr_data.limit(int(1e3)).createOrReplaceTempView("tpr_data")
co2_data.limit(int(1e5)).createOrReplaceTempView("co2_data")

# Raw query de integralização

cross_data = spark.sql(
    """
    SELECT *
    FROM tpr_data
    LEFT JOIN co2_data
    ON
        t_date = c_date AND
        t_latitude_rnd = c_latitude AND
        t_longitude_rnd = c_longitude;
    """
)

cross_data.show(10)

***

Integralização de fato

***

In [None]:
if not Xskipper.isEnabled(spark): Xskipper.enable(spark)

tpr_data.limit(1).createOrReplaceTempView("tpr_data")
co2_data.limit(int(1e5)).createOrReplaceTempView("co2_data")

# Raw query de integralização

cross_data = spark.sql(
    """
    SELECT *
    FROM tpr_data
    LEFT JOIN co2_data
    ON
        t_date = c_date AND
        t_latitude_rnd = c_latitude AND
        t_longitude_rnd = c_longitude;
    """
)

cross_data.show(10)