In [11]:
from sedona.spark import *
from pyspark.sql.functions import array, col, concat_ws, expr, udf
from pyspark.sql.types import StringType

In [2]:
# Build the Spark/Sedona configuration for the 'fcc' application. The
# bucket-scoped S3A option selects the anonymous credentials provider for the
# wherobots-geodata bucket.
config = (
    SedonaContext.builder()
    .appName('fcc')
    .config(
        "spark.hadoop.fs.s3a.bucket.wherobots-geodata.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    )
    .getOrCreate()
)
sedona = SedonaContext.create(config)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [3]:
# S3 location of the TV service contour text file that the next cell loads.
S3_URL_TV = "s3://wherobots-geodata/TV_service_contour_current.txt"

In [4]:
# Read the pipe-delimited text file as CSV, taking column names from its
# header row.
tv_df = (
    sedona.read.format('csv')
    .option('header', 'true')
    .option('delimiter', '|')
    .load(S3_URL_TV)
)

                                                                                

In [5]:
# Register the DataFrame as the temp view "tv" so it can be referenced by name in Spark SQL.
tv_df.createOrReplaceTempView("tv")

25/05/04 11:00:49 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [6]:
# Column names of the loaded DataFrame, in file order.
all_columns = tv_df.columns

In [7]:
# Take the five columns at positions 5-9 as a small sample to prototype on.
selected_columns = all_columns[5:10]

In [8]:
# Display the sampled column names.
selected_columns

['0', '1', '2', '3', '4']

In [9]:
# Keep only the sampled columns.
tv_df1 = tv_df.select(*selected_columns)

In [10]:
# Preview the first rows of the sample.
tv_df1.show()

[Stage 4:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                   0|                   1|                   2|                   3|                   4|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|59.99147 ,-154.84912|59.99194 ,-154.84785|59.99207 ,-154.84720|59.99234 ,-154.84652|59.99245 ,-154.84585|
|35.94124 ,-87.13583 |35.94138 ,-87.09534 |35.94065 ,-87.07511 |35.93916 ,-87.05493 |35.93721 ,-87.03483 |
|33.40001 ,-114.82803|33.39709 ,-114.81354|33.39595 ,-114.80635|33.39563 ,-114.79913|33.39519 ,-114.79192|
|33.39612 ,-114.82803|33.39320 ,-114.81370|33.39208 ,-114.80660|33.39176 ,-114.79946|33.39131 ,-114.79233|
|37.36947 ,-120.20000|37.36937 ,-120.19332|37.36926 ,-120.18998|37.36910 ,-120.18664|37.36889 ,-120.18331|
|33.88583 ,-116.22777|33.89081 ,-116.22693|33.89404 ,-116.22631|33.89677 ,-116.22559|33.89909 ,-116.22479|
|33.75838 ,-116.23333|33.75713 ,-116.

                                                                                

In [13]:
# Append a copy of the first point (column "0") as one extra trailing column,
# named after the current column count, so every row's point sequence ends
# on the point it started with.
original_columns = tv_df1.columns
closing_column_name = str(len(original_columns))
first_point = col("0")

df_closed = tv_df1.withColumn(closing_column_name, first_point)

df_closed.show(truncate=False)

[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|0                   |1                   |2                   |3                   |4                   |5                   |
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|59.99147 ,-154.84912|59.99194 ,-154.84785|59.99207 ,-154.84720|59.99234 ,-154.84652|59.99245 ,-154.84585|59.99147 ,-154.84912|
|35.94124 ,-87.13583 |35.94138 ,-87.09534 |35.94065 ,-87.07511 |35.93916 ,-87.05493 |35.93721 ,-87.03483 |35.94124 ,-87.13583 |
|33.40001 ,-114.82803|33.39709 ,-114.81354|33.39595 ,-114.80635|33.39563 ,-114.79913|33.39519 ,-114.79192|33.40001 ,-114.82803|
|33.39612 ,-114.82803|33.39320 ,-114.81370|33.39208 ,-114.80660|33.39176 ,-114.79946|33.39131 ,-114.79233|33.39612 ,-114.82803|
|37.36947 ,-120.20000|37.36937 ,-120.19332|37.36926 ,-120.18998|37.36910 ,-120.18664|37.36889 ,-120.1833

                                                                                

In [15]:
def row_to_wkt_dynamic(row):
    """Convert a sequence of "lat,lon" strings into a WKT POLYGON string.

    Args:
        row: Iterable of strings such as "59.99147 ,-154.84912" (latitude
            first, then longitude, comma separated). ``None`` or blank
            entries are skipped. ``row`` itself may be ``None``.

    Returns:
        A WKT string with a single exterior ring whose coordinates are written
        in "lon lat" order. The ring is closed by repeating the first point
        when the input does not already end on it. ``"POLYGON EMPTY"`` is
        returned when no usable point is present.

    Raises:
        ValueError: If a non-blank entry is not exactly two comma-separated
            numbers.
    """
    coords = []
    for point in row or ():
        # Null or blank cells would otherwise fail on .split() / float().
        if point is None or not point.strip():
            continue
        lat, lon = map(float, point.split(','))
        # WKT expects x (longitude) before y (latitude).
        coords.append(f"{lon} {lat}")

    if not coords:
        return "POLYGON EMPTY"

    # A polygon ring must end on its starting point.
    if coords[0] != coords[-1]:
        coords.append(coords[0])
    return f"POLYGON (({', '.join(coords)}))"


# Expose the Python converter to Spark as a UDF that returns a string.
to_wkt_udf = udf(row_to_wkt_dynamic, StringType())

# Pack every column of df_closed into one array column (in column order),
# convert it to WKT text and keep only that text column.
columns = df_closed.columns
points_array = array(*(col(name) for name in columns))

wkt_df = (
    df_closed
    .withColumn("wkt_geometry", to_wkt_udf(points_array))
    .select("wkt_geometry")
)

wkt_df.show(truncate=False)

[Stage 6:>                                                          (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------------------------------------------+
|wkt_geometry                                                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------------------+
|POLYGON ((-154.84912 59.99147, -154.84785 59.99194, -154.8472 59.99207, -154.84652 59.99234, -154.84585 59.99245, -154.84912 59.99147)) |
|POLYGON ((-87.13583 35.94124, -87.09534 35.94138, -87.07511 35.94065, -87.05493 35.93916, -87.03483 35.93721, -87.13583 35.94124))      |
|POLYGON ((-114.82803 33.40001, -114.81354 33.39709, -114.80635 33.39595, -114.79913 33.39563, -114.79192 33.39519, -114.82803 33.40001))|
|POLYGON ((-114.82803 33.39612, -114.8137 33.3932, -114.8066 33.39208, -114.79946 33.39176, -114.79233 33.39131, -114.82803 33.39612))   |
|POLYGON ((-120.2 37.36947,

                                                                                

In [16]:
def row_to_wkt_dynamic(row):
    """Convert a sequence of "lat,lon" strings into a WKT POLYGON string.

    Args:
        row: Iterable of strings such as "59.99147 ,-154.84912" (latitude
            first, then longitude, comma separated). ``None`` or blank
            entries are skipped. ``row`` itself may be ``None``.

    Returns:
        A WKT string with a single exterior ring whose coordinates are written
        in "lon lat" order. The ring is closed by repeating the first point
        when the input does not already end on it. ``"POLYGON EMPTY"`` is
        returned when no usable point is present.

    Raises:
        ValueError: If a non-blank entry is not exactly two comma-separated
            numbers.
    """
    coords = []
    for point in row or ():
        # Null or blank cells would otherwise fail on .split() / float().
        if point is None or not point.strip():
            continue
        lat, lon = map(float, point.split(','))
        # WKT expects x (longitude) before y (latitude).
        coords.append(f"{lon} {lat}")

    if not coords:
        return "POLYGON EMPTY"

    # A polygon ring must end on its starting point.
    if coords[0] != coords[-1]:
        coords.append(coords[0])
    return f"POLYGON (({', '.join(coords)}))"

# Expose the Python converter to Spark as a UDF that returns a string.
to_wkt_udf = udf(row_to_wkt_dynamic, StringType())

# Build the WKT text column from all columns of df_closed, in column order.
columns = df_closed.columns
points_array = array(*(col(name) for name in columns))
df_with_wkt = df_closed.withColumn("wkt_geometry", to_wkt_udf(points_array))

# Parse the WKT text into a Sedona geometry column and keep only that column.
df_geom = df_with_wkt.withColumn("geometry", ST_GeomFromWKT("wkt_geometry"))
final_df = df_geom.select("geometry")

final_df.show(truncate=False)

[Stage 7:>                                                          (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------------------------------------------+
|geometry                                                                                                                                |
+----------------------------------------------------------------------------------------------------------------------------------------+
|POLYGON ((-154.84912 59.99147, -154.84785 59.99194, -154.8472 59.99207, -154.84652 59.99234, -154.84585 59.99245, -154.84912 59.99147)) |
|POLYGON ((-87.13583 35.94124, -87.09534 35.94138, -87.07511 35.94065, -87.05493 35.93916, -87.03483 35.93721, -87.13583 35.94124))      |
|POLYGON ((-114.82803 33.40001, -114.81354 33.39709, -114.80635 33.39595, -114.79913 33.39563, -114.79192 33.39519, -114.82803 33.40001))|
|POLYGON ((-114.82803 33.39612, -114.8137 33.3932, -114.8066 33.39208, -114.79946 33.39176, -114.79233 33.39131, -114.82803 33.39612))   |
|POLYGON ((-120.2 37.36947,

                                                                                

In [17]:
# Render the geometries on an interactive Kepler map as a dataset named "polygons".
SedonaKepler.create_map(final_df, "polygons")

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

KeplerGl(data={'polygons': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20…

In [18]:
# Re-register tv_df as the temp view "tv" (same call as in the earlier cell; replaces the existing view).
tv_df.createOrReplaceTempView("tv")

In [19]:
# Print the column names and types of the full dataset.
tv_df.printSchema()

root
 |-- application_id: string (nullable = true)
 |-- service: string (nullable = true)
 |-- lms_application_id: string (nullable = true)
 |-- dts_site_number: string (nullable = true)
 |-- transmitter_site: string (nullable = true)
 |-- 0: string (nullable = true)
 |-- 1: string (nullable = true)
 |-- 2: string (nullable = true)
 |-- 3: string (nullable = true)
 |-- 4: string (nullable = true)
 |-- 5: string (nullable = true)
 |-- 6: string (nullable = true)
 |-- 7: string (nullable = true)
 |-- 8: string (nullable = true)
 |-- 9: string (nullable = true)
 |-- 10: string (nullable = true)
 |-- 11: string (nullable = true)
 |-- 12: string (nullable = true)
 |-- 13: string (nullable = true)
 |-- 14: string (nullable = true)
 |-- 15: string (nullable = true)
 |-- 16: string (nullable = true)
 |-- 17: string (nullable = true)
 |-- 18: string (nullable = true)
 |-- 19: string (nullable = true)
 |-- 20: string (nullable = true)
 |-- 21: string (nullable = true)
 |-- 22: string (nullable =

In [20]:
# First five columns: the descriptive fields that precede the numbered columns in the schema.
desc_cols = tv_df.columns[:5]
# Numbered columns starting at position 5.
# NOTE(review): this slice stops at 366, but the following cell re-slices with 5:365 — confirm which upper bound is intended.
coord_cols = tv_df.columns[5: 366]

In [21]:
def row_to_wkt_dynamic(row):
    """Convert a sequence of "lat,lon" strings into a WKT POLYGON string.

    Args:
        row: Iterable of strings such as "59.99147 ,-154.84912" (latitude
            first, then longitude, comma separated). ``None`` or blank
            entries are skipped. ``row`` itself may be ``None``.

    Returns:
        A WKT string with a single exterior ring whose coordinates are written
        in "lon lat" order. The ring is closed by repeating the first point
        when the input does not already end on it. ``"POLYGON EMPTY"`` is
        returned when no usable point is present.

    Raises:
        ValueError: If a non-blank entry is not exactly two comma-separated
            numbers.
    """
    coords = []
    for point in row or ():
        # Null or blank cells would otherwise fail on .split() / float().
        if point is None or not point.strip():
            continue
        lat, lon = map(float, point.split(','))
        # lon, lat order, because WKT requires x (longitude) before y (latitude).
        coords.append(f"{lon} {lat}")

    if not coords:
        return "POLYGON EMPTY"

    # A polygon ring must end on its starting point.
    if coords[0] != coords[-1]:
        coords.append(coords[0])
    return f"POLYGON (({', '.join(coords)}))"

# Expose the Python converter to Spark as a UDF that returns a string.
to_wkt_udf = udf(row_to_wkt_dynamic, StringType())

# Split the columns: the first five describe the record, positions 5-364 are
# the numbered point columns fed to the converter.
desc_cols = tv_df.columns[:5]
coord_cols = tv_df.columns[5:365]

# Build the WKT text from the point columns, then parse it into a geometry.
contour_points = array(*(col(name) for name in coord_cols))
df_with_wkt = tv_df.withColumn("wkt_geometry", to_wkt_udf(contour_points))
df_geom = df_with_wkt.withColumn("geometry", ST_GeomFromWKT("wkt_geometry"))

# Keep the descriptive fields alongside the parsed geometry.
final_df = df_geom.select(*desc_cols, "geometry")


In [22]:
# Preview the descriptive fields together with the parsed geometry.
final_df.show()

[Stage 9:>                                                          (0 + 1) / 1]

+--------------+-------+--------------------+---------------+--------------------+--------------------+
|application_id|service|  lms_application_id|dts_site_number|    transmitter_site|            geometry|
+--------------+-------+--------------------+---------------+--------------------+--------------------+
|    2036528   |    LPD|25076ff3729b1e0a0...|             01|59.97375 ,-154.84911|POLYGON ((-154.84...|
|    2020763   |    DTV|25076f9169e0309d0...|             01|35.00250 ,-87.13583 |POLYGON ((-87.135...|
|    2051473   |    LPD|25076ff386f28a460...|             01|33.05058 ,-114.82803|POLYGON ((-114.82...|
|    2047477   |    LPD|25076ff380754a570...|             01|33.05058 ,-114.82803|POLYGON ((-114.82...|
|    2047332   |    LPD|25076ff38197b72e0...|             01|37.21722 ,-120.20000|POLYGON ((-120.2 ...|
|    2045786   |    LPD|25076ff3818b1da20...|             01|33.87083 ,-116.22778|POLYGON ((-116.22...|
|    2042394   |    LPD|25076ff379edbd630...|             01|33.

                                                                                

In [24]:
# Number of rows in final_df.
final_df.count()

                                                                                

8528

In [23]:
# Render the full set of geometries on an interactive Kepler map as a dataset named "polygons2".
SedonaKepler.create_map(final_df, "polygons2")

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

KeplerGl(data={'polygons2': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 2…