<center><img src="images/logo2.png"/></center>

<center><h1>Sedona intro</h1></center>
<center><h3><a href = 'https://ilum.cloud'>ilum.cloud</a></h3></center>
<center>Welcome to the Ilum Interactive Capabilities Tutorial! In this section you will learn how to start processing spatial data with Apache Sedona on Ilum. Let's dive in!</center>

**Apache Sedona** is a distributed system for processing large-scale spatial data. It extends existing cluster computing systems, such as Apache Spark and Apache Flink, with a set of out-of-the-box distributed Spatial Datasets and Spatial SQL that efficiently load, process, and analyze large-scale spatial data across machines.

Key features of Apache Sedona include:

- Support for a wide range of spatial data formats, including GeoJSON, SHP, GPX, and GeoParquet.
- Efficient spatial operations, such as distance calculations, spatial joins, and aggregations.
- Scalability to handle large-scale spatial datasets.
- Ease of use, with a simple and intuitive API.

### Reference
- [Ilum Documentation](https://ilum.cloud/docs)
- [Apache Sedona Documentation](https://sedona.apache.org/)


<div class="alert alert-block alert-danger">
  <b>Warning:</b> Before creating a session, ensure the following:
  <ul>
    <li>
      The Spark image is set to <b>ilum/spark:3.5.0-sedona-1.6.0</b> in your cluster settings.
    </li>
    <li>
      <a href="https://repo1.maven.org/maven2/org/datasyslab/geotools-wrapper/1.6.0-31.0/geotools-wrapper-1.6.0-31.0.jar">geotools-wrapper-1.6.0-31.0.jar</a> is in the ilum-files/jars bucket.
    </li>
    <li>
      <a href="https://repo1.maven.org/maven2/org/apache/sedona/sedona-spark-shaded-3.5_2.12/1.6.0/sedona-spark-shaded-3.5_2.12-1.6.0.jar">sedona-spark-shaded-3.5_2.12-1.6.0.jar</a> is in the ilum-files/jars bucket.
    </li>
  </ul>
  <p>All of the above steps are described in detail here: <a href="https://docs.google.com/document/d/1fV7k7K8UOP1LnIzNPlXM8l0Tot3KgA60uowBWYkLXxI/edit">Apache Sedona with Ilum</a> (link to be updated once the blog post is published).</p>
</div>

To start interacting with the remote cluster, we'll need to load the sparkmagic extension. You can do this by running the following command:

In [None]:
%load_ext sparkmagic.magics

Next, we'll need to set up an endpoint. An endpoint is simply a URL that points to a specific Spark cluster. You can choose Scala or Python as the session language; this tutorial uses Python. Run the following to open the session manager:

In [None]:
%manage_spark

**Example properties**:\
{"conf": {"spark.sql.extensions": "org.apache.sedona.sql.SedonaSqlExtensions", "spark.serializer": "org.apache.spark.serializer.KryoSerializer", "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator", "spark.jars": "s3a://ilum-files/jars/geotools-wrapper-1.6.0-31.0.jar,s3a://ilum-files/jars/sedona-spark-shaded-3.5_2.12-1.6.0.jar"},"driverMemory": "1000M", "executorCores": 2}
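Since the properties are plain JSON, it can be less error-prone to assemble them from a Python dictionary in a local (ipykernel) cell and paste the printed string into the `%manage_spark` form. The sketch below only reuses the values from the example above; the variable names are illustrative.

```python
import json

# Jars uploaded to the ilum-files/jars bucket (same paths as in the example above).
jars = [
    "s3a://ilum-files/jars/geotools-wrapper-1.6.0-31.0.jar",
    "s3a://ilum-files/jars/sedona-spark-shaded-3.5_2.12-1.6.0.jar",
]

properties = {
    "conf": {
        "spark.sql.extensions": "org.apache.sedona.sql.SedonaSqlExtensions",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator",
        # Spark expects a single comma-separated string here.
        "spark.jars": ",".join(jars),
    },
    "driverMemory": "1000M",
    "executorCores": 2,
}

# Serialize to the JSON string expected by the properties field.
properties_json = json.dumps(properties)
print(properties_json)
```

Keeping the jar list separate makes it easier to swap versions without breaking the comma-separated `spark.jars` value.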


##### **Fueling the Geospatial Analysis Engine: Uploading Data**
Before we unleash the awesome power of Sedona for geospatial analysis, we need some fuel: data! In this case, we'll need two specific datasets:

1. **ne_50m_admin_0_countries_lakes.zip**: This file contains data on countries and lakes, providing the geographical context for our analysis.
2. **ne_50m_airports.zip**: This file holds information on airports, perfect for exploring potential travel routes or analyzing airport distribution.

The data is available for download [here](https://github.com/ilum-cloud/ilum-python-examples/tree/main/sedona-example). Head over to the Data section, create a new folder named "geodata" within the ilum-files bucket, and upload the two data files into it.

Now that we've fed our data pipeline, we're ready to leverage Sedona's geospatial magic!

Before we start processing, we need to import the necessary libraries.

In [None]:
%%spark 

    from sedona.spark import *
    import geopandas as gpd
    import s3fs


Another necessary step before starting to process spatial data is SedonaContext initialization using the SparkSession (available as spark).  Think of it as the secret handshake that lets Spark understand the language of spatial data.

In [None]:
%%spark

    SedonaContext.create(spark)

Because the datasets are stored as zip archives, we access them differently in this example: through s3fs and GeoPandas rather than a Spark reader. The credentials below are the defaults set when Ilum is deployed. If they don't work, contact your Ilum platform administrator.

In [None]:
%%spark

    s3 = s3fs.S3FileSystem(
          key='minioadmin',
          secret='minioadmin',
          endpoint_url='http://ilum-minio:9000/'
       )
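Hard-coded credentials are convenient for a demo but easy to leak. As a minimal sketch, the connection arguments could be collected from environment variables, falling back to the defaults above. The variable names `ILUM_S3_KEY`, `ILUM_S3_SECRET`, and `ILUM_S3_ENDPOINT` are hypothetical; Ilum does not define them.

```python
import os


def s3_options(env=os.environ):
    """Build keyword arguments for s3fs.S3FileSystem.

    The environment variable names are hypothetical; the fallbacks are the
    default credentials used in the cell above.
    """
    return {
        "key": env.get("ILUM_S3_KEY", "minioadmin"),
        "secret": env.get("ILUM_S3_SECRET", "minioadmin"),
        "endpoint_url": env.get("ILUM_S3_ENDPOINT", "http://ilum-minio:9000/"),
    }


# With an empty mapping every value falls back to its default.
options = s3_options({})
print(options["endpoint_url"])
```

Inside a `%%spark` cell this would be used as `s3 = s3fs.S3FileSystem(**s3_options())`, assuming the variables are set on the Spark driver.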

The cells below read each file from the bucket into a GeoPandas GeoDataFrame, convert it to a Spark DataFrame, and register it as a temporary view.

Countries:

In [None]:
%%spark

    countries_gpd = gpd.read_file(s3.open('s3://ilum-files/geodata/ne_50m_admin_0_countries_lakes.zip'), engine='pyogrio')
    countries_df = spark.createDataFrame(countries_gpd)
    countries_df.createOrReplaceTempView("country")
    countries_df.show()

Airports:

In [None]:
%%spark

    airports_gpd = gpd.read_file(s3.open('s3://ilum-files/geodata/ne_50m_airports.zip'), engine='pyogrio')
    airports_df = spark.createDataFrame(airports_gpd)
    airports_df.createOrReplaceTempView("airport")
    airports_df.show()

#### Sedona enables spatial data processing through:

##### ***SQL API***

Once you've imported the data, you can create a spatial join using ***SQL API***. In this case, we want to match airports with countries based on their location. 

In [None]:
%%spark

    result = spark.sql("SELECT c.geometry as country_geom, c.NAME_EN, a.geometry as airport_geom, a.name FROM country c, airport a WHERE ST_Contains(c.geometry, a.geometry)")
    result.createOrReplaceTempView("result")
    result.show()
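To build intuition for what `ST_Contains(c.geometry, a.geometry)` tests, here is a plain-Python sketch of a point-in-polygon check (ray casting) and the naive nested-loop join it implies. This is not how Sedona evaluates the predicate: the sketch ignores holes, multipolygons, and boundary cases, and the toy names and coordinates are made up.

```python
def contains(polygon, point):
    """Ray-casting test: is `point` inside the simple `polygon`?

    `polygon` is a list of (x, y) vertices. Points exactly on the boundary
    are not handled specially.
    """
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does this edge straddle the horizontal line through the point?
        if (y1 > y) != (y2 > y):
            # x coordinate where the edge crosses that line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


# Toy stand-ins for the `country` and `airport` views.
countries = {
    "Squareland": [(0, 0), (4, 0), (4, 4), (0, 4)],
    "Triangulia": [(5, 0), (9, 0), (7, 4)],
}
airports = {"SQ1": (1, 1), "SQ2": (3, 2), "TR1": (7, 1), "SEA": (20, 20)}

# Naive join: test every country against every airport.
pairs = [
    (country, airport)
    for country, polygon in countries.items()
    for airport, location in airports.items()
    if contains(polygon, location)
]
print(pairs)
```

The nested loop compares every country with every airport, which does not scale; the spatial partitioning and indexing shown in the RDD API section exist to avoid this all-pairs comparison.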

##### ***RDD API***


Below we convert the Spark DataFrames into SpatialRDDs (Sedona's spatially aware RDD type) using the Sedona Adapter, which enables spatial operations based on their "geometry" columns. We then spatially partition both SpatialRDDs with the same partitioner, build an index, and perform a spatial join between airports and countries to find matching locations.

In [None]:
%%spark

    airports_rdd = Adapter.toSpatialRdd(airports_df, "geometry")
    # Drop the duplicate name column in countries_df
    countries_df = countries_df.drop("NAME")
    countries_rdd = Adapter.toSpatialRdd(countries_df, "geometry")
    
    airports_rdd.analyze()
    countries_rdd.analyze()
    
    # 4 is the num partitions used in spatial partitioning. This is an optional parameter
    airports_rdd.spatialPartitioning(GridType.KDBTREE, 4)
    countries_rdd.spatialPartitioning(airports_rdd.getPartitioner())
    
    buildOnSpatialPartitionedRDD = True
    usingIndex = True
    considerBoundaryIntersection = True
    airports_rdd.buildIndex(IndexType.QUADTREE, buildOnSpatialPartitionedRDD)
    
    result_pair_rdd = JoinQueryRaw.SpatialJoinQueryFlat(airports_rdd, countries_rdd, usingIndex, considerBoundaryIntersection)
    
    result2 = Adapter.toDf(result_pair_rdd, countries_rdd.fieldNames, airports_rdd.fieldNames, spark)
    
    result2.createOrReplaceTempView("join_result_with_all_cols")
    # Select the columns needed in the join
    result2 = spark.sql("SELECT leftgeometry as country_geom, NAME_EN, rightgeometry as airport_geom, name FROM join_result_with_all_cols")
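The idea behind spatial partitioning can be illustrated without Spark: assign every geometry to the grid cells its bounding box touches, then compare only geometries that share a cell. The sketch below uses a uniform grid and bounding boxes for simplicity, whereas the cell above uses a KDB-tree partitioner and a quadtree index; the cell size and the toy data are assumptions.

```python
from collections import defaultdict

CELL = 5  # cell size of the illustrative uniform grid


def cells_for_bbox(min_x, min_y, max_x, max_y):
    """Yield every grid cell (col, row) overlapped by a bounding box."""
    for col in range(int(min_x // CELL), int(max_x // CELL) + 1):
        for row in range(int(min_y // CELL), int(max_y // CELL) + 1):
            yield (col, row)


# Toy data: country bounding boxes (min_x, min_y, max_x, max_y) and airport points.
country_boxes = {"A": (0, 0, 4, 4), "B": (6, 0, 12, 3)}
airport_points = {"a1": (1, 1), "b1": (11, 2), "x1": (20, 20)}

# Partition both datasets with the same grid, mirroring the reuse of one
# partitioner for both SpatialRDDs.
partitions = defaultdict(lambda: {"countries": set(), "airports": set()})
for name, box in country_boxes.items():
    for cell in cells_for_bbox(*box):
        partitions[cell]["countries"].add(name)
for name, (x, y) in airport_points.items():
    partitions[(int(x // CELL), int(y // CELL))]["airports"].add(name)

# Only geometries that share a partition are compared with each other.
candidates = set()
for part in partitions.values():
    for country in part["countries"]:
        min_x, min_y, max_x, max_y = country_boxes[country]
        for airport in part["airports"]:
            x, y = airport_points[airport]
            if min_x <= x <= max_x and min_y <= y <= max_y:
                candidates.add((country, airport))

print(sorted(candidates))
```

Airport `x1` lands in a cell with no country, so it is never compared against anything. Collecting the matches in a set also prevents duplicates when a country spans several cells.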

##### Print spatial join results

In [None]:
%%spark
    
    print("The result of SQL API")
    result.show()
    print("The result of RDD API")
    result2.show()

Next, we'll group airports based on the country they belong to.

In [None]:
%%spark 

    groupedresult = spark.sql("SELECT c.NAME_EN, c.country_geom, count(*) as airports_count FROM result c GROUP BY c.NAME_EN, c.country_geom")
    groupedresult.createOrReplaceTempView("groupedresult")
    groupedresult.show()
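For readers less familiar with SQL, the aggregation is a per-key count. A plain-Python equivalent on a few made-up rows is sketched below; it only illustrates the `GROUP BY ... count(*)` logic and does not run on the cluster.

```python
from collections import Counter

# Made-up rows standing in for the `result` view: (country name, airport name).
rows = [
    ("Country A", "Airport 1"),
    ("Country A", "Airport 2"),
    ("Country B", "Airport 3"),
]

# Count how many rows (airports) fall under each country key.
airports_count = Counter(country for country, _airport in rows)
print(dict(airports_count))
```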

The last processing step is to write the result to object storage in GeoParquet format, so it can be visualized or processed further.

In [None]:
%%spark
    
    groupedresult.write.mode('overwrite').format("geoparquet").save("s3a://ilum-files/geodata/airports.parquet")

#### Visualization

For visualization, we combine the join result with the per-country airport counts into a single table.

In [None]:
%%spark -o visualresult_df

    # Join the tables with proper column aliases
    visual_data = spark.sql("""
    SELECT a.NAME_EN AS country_name, 
           a.name AS airport_name, 
           a.airport_geom, 
           c.airports_count
    FROM result AS a
    JOIN groupedresult AS c ON a.NAME_EN = c.NAME_EN
    """)
    
    # Convert Spark DataFrame to GeoPandas DataFrame
    gdf = gpd.GeoDataFrame(visual_data.toPandas(), geometry='airport_geom')
    
    # Extract longitude and latitude coordinates from the geometry
    gdf['longitude'] = gdf['airport_geom'].x
    gdf['latitude'] = gdf['airport_geom'].y
    
    # Drop the 'airport_geom' column
    gdf = gdf.drop('airport_geom', axis=1)
    
    # Create a new Spark DataFrame from the GeoPandas DataFrame
    visualresult_df = spark.createDataFrame(gdf)
    visualresult_df.show()

<div class="alert alert-block alert-warning">
  <b>Warning:</b> Visualization processing is occurring within a different kernel!
  <ul>
    <li>
      Due to the limitations of the kernel we are using for computation, advanced visualization methods are not available. Therefore, all visualizations will be performed within the ipykernel environment.
    </li>
  </ul>
</div>


Preprocessed data will be exchanged between kernels in the form of DataFrames.

To get started with pygwalker in the ipykernel kernel, you'll need to install the following Python packages:

In [None]:
pip install pandas pygwalker

Once the environment is prepared, you can simply call the visualization tool, passing your DataFrame as an argument. For detailed guidance on setting parameters for optimal visualization, please refer to our [blog post](https://sedona.apache.org/) (link to be updated).

In [None]:
    import pygwalker as pyg

    walker = pyg.walk(visualresult_df, kernel_computation=True)

### Cleaning up

Now that you're done with your work, you should clean up your sessions.
Simply click on the Delete buttons!

![Ilum session clean](../../images/clean_ilum_jupyter_session.png)

In [None]:
%manage_spark