In [1]:
import configparser

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import (expr, broadcast, substring as s_substring, trim as s_trim)
from pyspark.sql.types import (StructType, StructField, IntegerType, StringType)

from sedona.register import SedonaRegistrator
from sedona.utils import KryoSerializer, SedonaKryoRegistrator
from sedona.utils.adapter import Adapter
from sedona.core.formatMapper.shapefileParser import ShapefileReader 

# import geopandas as gpd

In [2]:
# Build the Spark configuration: options from the shared config file first,
# then the Sedona and dependency settings this notebook adds on top.
sparkConf = SparkConf()

parser = configparser.ConfigParser()
# Keep option names exactly as written; ConfigParser lower-cases them by default.
parser.optionxform = str

# Read the file inside a context manager so the handle is closed after parsing
# (the bare open() call previously left it open for the life of the kernel).
with open('../sparkconf.cfg') as config_file:
    parser.read_file(config_file)

# Copy every option of every section into the Spark configuration.
for section, config in parser.items():
    for key, value in config.items():
        sparkConf.set(key, value)

# Serialize with Kryo, using the registrator shipped with Sedona.
sparkConf.set("spark.serializer", KryoSerializer.getName)
sparkConf.set("spark.kryo.registrator", SedonaKryoRegistrator.getName)

# Ship the Python dependencies archive; the "#deps" fragment is the name the
# executor paths below expect under /opt/spark/work-dir.
# NOTE(review): this pre-signed URL embeds a credential and signature and carries
# X-Amz-Date=20220228 with X-Amz-Expires=604800 (seconds), so it stops working
# once that window passes. Consider supplying it from outside the notebook
# instead of committing it.
sparkConf.set("spark.archives", "https://minio.minio-tenant/dutrajardim-etls/dependencies.zip?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=admin%2F20220228%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20220228T111424Z&X-Amz-Expires=604800&X-Amz-SignedHeaders=host&X-Amz-Signature=a20e1526dd993354ae90c90c7cb76da4726a5ae00c9920ceec8e2a35f3482c3c#deps")
sparkConf.set("spark.executorEnv.PYTHONPATH", "/opt/spark/work-dir/deps")
sparkConf.set("spark.executorEnv.LD_LIBRARY_PATH", "/opt/spark/work-dir/deps/Shapely.libs")

# Reuse an already-running session if there is one, otherwise start a new one.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

In [3]:
# Register Sedona with the active Spark session.
SedonaRegistrator.registerAll(spark)
# Bare expression: shows the session object as this cell's output.
spark

                                                                                

In [14]:
# Read the shapefiles matched by the adm_0 glob and flatten each feature into
# a tuple: (geometry as WKT, *tab-separated attribute values).
rdd_adm0 = (
    ShapefileReader
    .readToGeometryRDD(spark.sparkContext, "s3a://dutrajardim-fi/src/shapes/gadm40/adm_0/*/")
    .rawSpatialRDD
    .map(lambda feature: (feature.geom.wkt, *feature.userData.split("\t")))
)

# Every column is a non-nullable string; the order must match the tuple above.
schema = StructType([
    StructField(column, StringType(), False)
    for column in ("geometry", "name", "id")
])

# Keep id, geometry and name. The name is re-encoded to ISO-8859-1 bytes and
# decoded as UTF-8 (presumably to repair text read with the wrong charset).
sdf_adm0 = rdd_adm0.toDF(schema=schema).selectExpr(
    "id",
    "geometry",
    "DECODE(ENCODE(name, 'ISO-8859-1'), 'UTF-8') as name",
)

# Static partition-overwrite mode: the overwrite below replaces the existing
# output as a whole (this table has no partition columns).
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")

(
    sdf_adm0.write
    .format("parquet")
    .mode("overwrite")
    .save("s3a://dutrajardim-fi/tables/shapes/adm0.parquet")
)

                                                                                

In [15]:
# Read the shapefiles matched by the adm_1 glob and flatten each feature into
# a tuple: (geometry as WKT, *tab-separated attribute values).
rdd_adm1 = (
    ShapefileReader
    .readToGeometryRDD(spark.sparkContext, "s3a://dutrajardim-fi/src/shapes/gadm40/adm_1/*/")
    .rawSpatialRDD
    .map(lambda feature: (feature.geom.wkt, *feature.userData.split("\t")))
)

# Every column is a non-nullable string; the order must match the tuple above.
schema = StructType([
    StructField(column, StringType(), False)
    for column in (
        "geometry",
        "adm0",
        "adm0_name",
        "id",
        "name",
        "varname",
        "nl_name",
        "type",
        "eng_type",
        "cc",
        "hasc",
        "iso",
    )
])

# Keep id, geometry, name and the parent adm0 code. The name is re-encoded to
# ISO-8859-1 bytes and decoded as UTF-8 (presumably to repair text read with
# the wrong charset).
sdf_adm1 = rdd_adm1.toDF(schema=schema).selectExpr(
    "id",
    "geometry",
    "DECODE(ENCODE(name, 'ISO-8859-1'), 'UTF-8') as name",
    "adm0",
)

# Static partition-overwrite mode: the overwrite below replaces the existing
# output as a whole, not only the adm0 partitions present in this DataFrame.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")

(
    sdf_adm1.write
    .partitionBy("adm0")
    .format("parquet")
    .mode("overwrite")
    .save("s3a://dutrajardim-fi/tables/shapes/adm1.parquet")
)

                                                                                

In [16]:
# Read the shapefiles matched by the adm_2 glob and flatten each feature into
# a tuple: (geometry as WKT, *tab-separated attribute values).
rdd_adm2 = (
    ShapefileReader
    .readToGeometryRDD(spark.sparkContext, "s3a://dutrajardim-fi/src/shapes/gadm40/adm_2/*/")
    .rawSpatialRDD
    .map(lambda feature: (feature.geom.wkt, *feature.userData.split("\t")))
)

# Every column is a non-nullable string; the order must match the tuple above.
schema = StructType([
    StructField(column, StringType(), False)
    for column in (
        "geometry",
        "adm0",
        "adm0_name",
        "adm1_name",
        "adm1_nl_name",
        "id",
        "name",
        "varname",
        "nl_name",
        "type",
        "eng_type",
        "cc",
        "hasc",
    )
])

# Keep id, geometry, name and adm0, and derive adm1 from the id: the first two
# dot-separated pieces of id joined by '.', followed by '_1'. The name is
# re-encoded to ISO-8859-1 bytes and decoded as UTF-8 (presumably to repair
# text read with the wrong charset).
sdf_adm2 = rdd_adm2.toDF(schema=schema).selectExpr(
    "id",
    "geometry",
    "DECODE(ENCODE(name, 'ISO-8859-1'), 'UTF-8') as name",
    "adm0",
    "CONCAT(CONCAT_WS('.', SLICE(SPLIT(id, '\\\\.'), 1, 2)), '_1') as adm1",
)

# Static partition-overwrite mode: the overwrite below replaces the existing
# output as a whole, not only the partitions present in this DataFrame.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")

(
    sdf_adm2.write
    .partitionBy(["adm0", "adm1"])
    .format("parquet")
    .mode("overwrite")
    .save("s3a://dutrajardim-fi/tables/shapes/adm2.parquet")
)

                                                                                

In [19]:
# Read the shapefiles matched by the adm_3 glob and flatten each feature into
# a tuple: (geometry as WKT, *tab-separated attribute values).
rdd_adm3 = (
    ShapefileReader
    .readToGeometryRDD(spark.sparkContext, "s3a://dutrajardim-fi/src/shapes/gadm40/adm_3/*/")
    .rawSpatialRDD
    .map(lambda feature: (feature.geom.wkt, *feature.userData.split("\t")))
)

# Every column is a non-nullable string; the order must match the tuple above.
schema = StructType([
    StructField(column, StringType(), False)
    for column in (
        "geometry",
        "adm0",
        "adm0_name",
        "adm1_name",
        "adm1_nl_name",
        "adm2_name",
        "adm2_nl_name",
        "id",
        "name",
        "varname",
        "nl_name",
        "type",
        "eng_type",
        "cc",
        "hasc",
    )
])

# Keep id, geometry, name and adm0, and derive adm1 from the id: the first two
# dot-separated pieces of id joined by '.', followed by '_1'. The name is
# re-encoded to ISO-8859-1 bytes and decoded as UTF-8 (presumably to repair
# text read with the wrong charset).
sdf_adm3 = rdd_adm3.toDF(schema=schema).selectExpr(
    "id",
    "geometry",
    "DECODE(ENCODE(name, 'ISO-8859-1'), 'UTF-8') as name",
    "adm0",
    "CONCAT(CONCAT_WS('.', SLICE(SPLIT(id, '\\\\.'), 1, 2)), '_1') as adm1",
)

# Static partition-overwrite mode: the overwrite below replaces the existing
# output as a whole, not only the partitions present in this DataFrame.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")

(
    sdf_adm3.write
    .partitionBy(["adm0", "adm1"])
    .format("parquet")
    .mode("overwrite")
    .save("s3a://dutrajardim-fi/tables/shapes/adm3.parquet")
)

                                                                                

In [20]:
# Stop the Spark session now that all tables have been written.
spark.stop()

22/02/28 18:42:22 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
