# Integración con **PySpark**
Otra alternativa para poder integrar todo este proyecto en conjunto y dentro todo mismo usando python puede ser con [PySpark](https://spark.apache.org/docs/latest/api/python/index.html). A continuación se detalla lo necesario para poder trabajar con esta librería y se analiza esta opción.

## Uso
Para iniciar a usar pyspark, es necesario primero crear una sesión de la siguiente forma (es necesario tener **Java** instalado:

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Fast testing
from pyspark.sql.functions import col

geojson_df = spark.read.json("../assets/basic.json", multiLine=True)
coordinates_df = geojson_df.select(col("coordinates").alias("coords"))

geojson_df.show()
coordinates_df.show()

+--------------------+---------------+
|         coordinates|           type|
+--------------------+---------------+
|[[[1.0, 2.0]], [[...|MultiLineString|
+--------------------+---------------+

+--------------------+
|              coords|
+--------------------+
|[[[1.0, 2.0]], [[...|
+--------------------+



In [7]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, ArrayType, StructType, StructField, StringType
from math import sqrt

# Define UDF to calculate Euclidean distance
def euclidean_distance(coords):
    point1 = coords[0][0]
    point2 = coords[1][0]
    return float(sqrt((point2[0] - point1[0])**2 + (point2[1] - point1[1])**2))

distance_udf = F.udf(euclidean_distance, FloatType())

geojson_df = geojson_df.withColumn('distance', distance_udf(F.col('coordinates')))
geojson_df.show(truncate=False)

+----------------------------+---------------+--------+
|coordinates                 |type           |distance|
+----------------------------+---------------+--------+
|[[[1.0, 2.0]], [[3.0, 4.0]]]|MultiLineString|2.828427|
+----------------------------+---------------+--------+



                                                                                

Algunos *warnings* fueron arrojados, pero no parecen ser vitales.

Pyspark trabaja de una manera más eficiente con `DataFrames` (usados también en Pandas) y estos pueden ser obtenidos a través de archivos que contengan fuentes de datos. En este caso, utilizarémos un ejemplo básico de un geojson (json).

In [2]:
# Leyendo json en Python
import json

with open('../assets/basic.json', 'r') as file:
    example_geojson = json.load(file)

example_geojson

{'type': 'MultiLineString', 'coordinates': [[[1.0, 2.0]], [[3.0, 4.0]]]}

In [5]:
from pyspark.sql.functions import col

geojson_df = spark.read.json("../assets/basic.json", multiLine=True)
coordinates_df = geojson_df.select(col("coordinates").alias("coords"))
print(coordinates_df.rdd.flatMap(lambda x: x.coords).collect())

[[[1.0, 2.0]], [[3.0, 4.0]]]


In [17]:
# Leyendo JSON con PySpark
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType, FloatType
from pyspark.sql.functions import explode, col, lit

# Define the schema
schema = StructType([
    StructField("id", MapType(StringType(), StringType()), True),
    StructField("type", StringType(), True),
    StructField("features", ArrayType(StructType([
        StructField("type", StringType(), True),
        StructField("properties", StructType([
            StructField("name", StringType(), True),
            StructField("tiempo", StringType(), True),
            StructField("geometry", StructType([
                StructField("type", StringType(), True),
                StructField("coordinates", ArrayType(ArrayType(FloatType())), True)
            ]))
        ]))
    ])), True)
])

# Read the GeoJSON file with the defined schema
example_geojson = spark.read.schema(schema).json("../assets/pyspark_example.geojson")
# Handle corrupt records
example_geojson = example_geojson.withColumn("_corrupt_record", lit(None))

# Check for corrupt records
corrupt_records = example_geojson.filter(example_geojson._corrupt_record.isNotNull())
if corrupt_records.count() > 0:
    corrupt_records.show(truncate=False)
else:
    # Explode the 'features' array to get each feature as a separate row
    features = example_geojson.select(explode("features").alias("feature"))
    features.show()
    
    # Select the coordinates from each feature
    coordinates = features.select(col("feature.properties.geometry.coordinates").alias("coordinates"))

    # Show the coordinates
    coordinates.show(truncate=False)

example_geojson.show(truncate=False)
example_geojson.printSchema()

+-------+
|feature|
+-------+
+-------+

+-----------+
|coordinates|
+-----------+
+-----------+

+----+----+--------+---------------+
|id  |type|features|_corrupt_record|
+----+----+--------+---------------+
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
|NULL|NULL|NULL    |NULL           |
+----+----+--------+---------------+
only showing t

In [26]:
# from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType
from pyspark.sql.functions import explode, col

# Define the schema
schema = StructType([
    StructField("id", StructType([
        StructField("$oid", StringType(), True)
    ]), True),
    StructField("type", StringType(), True),
    StructField("features", ArrayType(StructType([
        StructField("type", StringType(), True),
        StructField("properties", StructType([
            StructField("name", StringType(), True),
            StructField("tiempo", StringType(), True),
            StructField("geometry", StructType([
                StructField("type", StringType(), True),
                StructField("coordinates", ArrayType(ArrayType(FloatType())), True)
            ]))
        ]))
    ])), True)
])

# Read the GeoJSON file with the defined schema
example_geojson = spark.read.schema(schema) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .json("../assets/pyspark_example.geojson")

example_geojson.show()

+----+----+--------+
|  id|type|features|
+----+----+--------+
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
|NULL|NULL|    NULL|
+----+----+--------+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import explode, col

features = example_geojson.select(explode("features").alias("feature"))

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `features` cannot be resolved. Did you mean one of the following? [`_corrupt_record`].;
'Project [explode('features) AS feature#10]
+- Relation [_corrupt_record#8] json


Ahora con esto, se pueden acceder a las coordenadas de la siguiente forma:

In [18]:
from pyspark.sql.functions import col, explode

# Explode the 'features' array to access individual features
exploded_df = example_geojson.explode("features")

# Extract coordinates as an array of arrays
coordinates_df = exploded_df.select(col("features.geometry.coordinates").alias("coordinates"))

# Show the DataFrame with coordinates
coordinates_df.show()

AttributeError: 'DataFrame' object has no attribute 'explode'

Algunos *warnings* fueron arrojados, pero no parecen ser vitales.

Una vez teniendo la sesión inicializada, se crea la función usando **UDF**: **U**ser-**D**efined **F**unction.

In [5]:
# Define a User-Defined Function (UDF) for Euclidean distance
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

def euclidean_distance(point1, point2):
  # Assuming point1 and point2 are lists with the same length
  if len(point1) != len(point2):
    raise ValueError("Points must have the same number of dimensions")
  
  # Calculate the squared difference of each dimension
  squared_differences = [(x - y) ** 2 for x, y in zip(point1, point2)]
  # Sum the squared differences
  distance = sum(squared_differences)
  # Take the square root of the sum (Euclidean distance)
  return distance ** 0.5  # More efficient than sqrt()

# Register the UDF with a specific return type (DoubleType)
euclidean_distance_udf = udf(euclidean_distance, DoubleType())

Y ahora esto se puede calcular de la siguiente forma.

In [7]:
# Sample points as Python lists
point1 = [1, 2, 3]
point2 = [4, 5, 6]

# Import the UDF (assuming it's defined in the same Python file)
# from my_functions import euclidean_distance_udf  # Replace with your file path

# Call the UDF with the sample points
distance = euclidean_distance_udf(point1, point2)

# Print the calculated distance
print("Euclidean Distance:", distance)

PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got list.