In [1]:
import numpy as np
from typing import Tuple, List, Set

from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as f

In [2]:
# Keep only the first 10,000 lines of each daily sds011-* file, then concatenate
# the three subsets into data/subset.csv, which the Spark cell reads.
# NOTE(review): if the daily files start with a header row, every subset (and so
# the combined file) still contains it — confirm the loader can cope with that.
!head -10000 "data/sds011-2020-06-28.csv" > "data/subset1.csv"
!head -10000 "data/sds011-2020-06-29.csv" > "data/subset2.csv"
!head -10000 "data/sds011-2020-06-30.csv" > "data/subset3.csv"
!cat "data/subset1.csv" "data/subset2.csv" "data/subset3.csv" > "data/subset.csv"

In [3]:
# Divisors used by deg_to_grid_coordinates to turn degrees into integer grid indices.
# NOTE(review): the names and magnitudes suggest "degrees per kilometre"
# (0.008983112 is roughly 1 / 111.32) — confirm the intended cell size.
KM_LAT = 0.008983112
# NOTE(review): a per-kilometre longitude step depends on latitude; this fixed value
# presumably targets one reference latitude — confirm it suits the data's region.
KM_LON = 0.011972

In [4]:
def deg_to_grid_coordinates(lat_lon: Tuple[float, float]) -> Tuple[int, int]:
    """Convert a (latitude, longitude) pair in degrees to integer grid indices.

    Args:
        lat_lon: Two-element tuple ordered as (latitude, longitude).

    Returns:
        ``(x, y)`` where ``x`` is the longitude floor-divided by ``KM_LON`` and
        ``y`` is the latitude floor-divided by ``KM_LAT``. Note the order flips:
        the input is (lat, lon) but the result is (lon-index, lat-index).
    """
    lat, lon = lat_lon
    # Floor division (not truncation) so negative coordinates round towards
    # negative infinity and cells stay contiguous across zero.
    return int(lon // KM_LON), int(lat // KM_LAT)


def possible_neighbour(
    p1: Tuple[float, float], p2: Tuple[float, float]
) -> bool:
    """Tell whether two grid cells share an edge.

    Args:
        p1: ``(x, y)`` coordinates of the first cell.
        p2: ``(x, y)`` coordinates of the second cell.

    Returns:
        ``True`` when the cells differ by exactly 1 along one axis and are equal
        along the other; ``False`` otherwise (identical cells and diagonal
        cells are not neighbours).
    """
    x1, y1 = p1
    x2, y2 = p2
    same_row_adjacent = abs(x1 - x2) == 1 and y1 == y2
    same_column_adjacent = x1 == x2 and abs(y1 - y2) == 1
    return same_row_adjacent or same_column_adjacent


def get_tuple(row: Row, field: str = None):
    """Extract the ``_1`` / ``_2`` members of a two-field record as a tuple.

    Args:
        row: Record supporting lookup by field name.
        field: When given, the pair is read from ``row[field]`` instead of
            from ``row`` itself.

    Returns:
        ``(pair["_1"], pair["_2"])`` where ``pair`` is ``row`` or ``row[field]``.
    """
    # Resolve the nested record once rather than looking it up per member.
    pair = row if field is None else row[field]
    return pair["_1"], pair["_2"]


def add_to_set(set: Set[Tuple[int, int]], item: Tuple[int, int]):
    """Insert ``item`` into the given set and hand the same set back.

    The set is modified in place; the return value is that same object, which
    lets the call be used inside an expression such as a lambda.
    """
    set.update((item,))
    return set

In [14]:
# Place every sensor on the grid defined by deg_to_grid_coordinates, then pair
# each sensor's cell with the occupied cells that are directly adjacent to it.
spark = SparkSession.builder.master("local[*]").appName("q4").getOrCreate()
spark_context = spark.sparkContext

try:
    # One Row(id, grid_components) per distinct (sensor id, grid cell).
    data = (
        spark_context.textFile("data/subset.csv")
        .map(lambda line: line.strip())
        .filter(lambda line: len(line) > 1)
        .map(lambda line: line.split(";"))
        # Columns 2 and 3 are handed to deg_to_grid_coordinates as (lat, lon).
        .map(lambda all_cols: (all_cols[0], (float(all_cols[2]), float(all_cols[3]))))
        .mapValues(deg_to_grid_coordinates)
        # De-duplicate after gridding: several raw readings of one sensor map to
        # the same cell, and every surviving row is multiplied by the cross join.
        .distinct()
        .map(lambda id_pos: Row(id=id_pos[0], grid_components=id_pos[1]))
    )
    grid_sparse = spark.createDataFrame(data)
    # NOTE(review): built with its own createDataFrame call, presumably so that
    # `grid_sparse2.id` below is distinguishable from the left side's `id` —
    # confirm before deriving it from grid_sparse instead.
    grid_sparse2 = spark.createDataFrame(data).withColumnRenamed(
        "grid_components", "potential_neighbours"
    )
    grid_sparse = (
        grid_sparse.alias("a")
        .crossJoin(grid_sparse2.alias("b"))
        .drop(grid_sparse2.id)
        .where("grid_components != potential_neighbours")
        .groupBy(["id", "grid_components"])
        # Explicit alias so the lookup below does not depend on the
        # auto-generated aggregate column name.
        .agg(f.collect_set("potential_neighbours").alias("candidate_cells"))
        # -> (id, (own cell, set of every other occupied cell))
        .rdd.map(
            lambda row: (
                row["id"],
                (
                    get_tuple(row, "grid_components"),
                    {get_tuple(cell) for cell in row["candidate_cells"]},
                ),
            )
        )
        # Keep only the candidates that are edge-adjacent to the own cell.
        .mapValues(
            lambda gc_pn: (
                gc_pn[0],
                {pn for pn in gc_pn[1] if possible_neighbour(pn, gc_pn[0])},
            )
        )
        # Emit a 1-tuple holding the list of the own cell plus its neighbours.
        .map(lambda id_gc_n: (list(add_to_set(id_gc_n[1][1], id_gc_n[1][0])),))
    )

    print(grid_sparse.take(5))
finally:
    # Single shutdown point: runs on success and on failure alike.
    spark_context.stop()

                                                                                

[([(1074, 5321)],), ([(836, 5980), (836, 5981)],), ([(674, 5402)],), ([(727, 5458), (727, 5457)],), ([(687, 5914), (688, 5914)],)]
