Description
Expected behavior
| value | current_timestamp | network_operator_name | dl_load_date | isDishNrCell | isSimDish | nrNci | AOI_ID | Cluster_ID |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| (nestedjsonvalue) | 8/22/2023 5:44:11 PM | Digicel | 8/22/2023 | FALSE | TRUE | 3569856325 | ALB | ALB-01-Downtown |
Actual behavior
| value | current_timestamp | network_operator_name | dl_load_date | isDishNrCell | isSimDish | nrNci | AOI_ID | Cluster_ID |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| (nestedjsonvalue) | 8/22/2023 5:44:11 PM | Digicel | 8/22/2023 | FALSE | TRUE | 3569856325 |  |  |
Steps to reproduce the problem
The actual dataframe is a streaming Dataset running on a Spark cluster.
- Create a Spark session (a minimal session sketch is included after the shapefile-loading code below)
- Get the shapefiles from their location (S3 here)
code:
import org.apache.spark.sql.DataFrame
import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
import org.apache.sedona.sql.utils.Adapter

// Read the AOI shapefile from S3, reproject it, and convert it to a DataFrame.
def getAoiShapeDf: DataFrame = {
  val aoiShapefileLocation = "s3://bucket/opensource_loc/top_shp/aoi_oto/"
  val aoiShapeRdd = ShapefileReader.readToGeometryRDD(session.sparkContext, aoiShapefileLocation)
  aoiShapeRdd.CRSTransform("epsg:4326", "epsg:5070", false)
  val aoiShapeDf = Adapter.toDf(aoiShapeRdd, session)
  aoiShapeDf
}

// Same steps for the cluster shapefile.
def getClusterShapeDf: DataFrame = {
  val clusterShapefileLocation = "s3://bucket/opensource_loc/top_shp/cluster_oto/"
  val clusterShapeRdd = ShapefileReader.readToGeometryRDD(session.sparkContext, clusterShapefileLocation)
  clusterShapeRdd.CRSTransform("epsg:4326", "epsg:5070", false)
  val clusterShapeDf = Adapter.toDf(clusterShapeRdd, session)
  clusterShapeDf
}
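For reference, the session value used in the snippets above is assumed to be a SparkSession with the Sedona SQL functions registered. A minimal sketch of that setup follows; the app name and serializer settings are my assumptions, not copied from the actual job.
code:
import org.apache.spark.sql.SparkSession
import org.apache.sedona.sql.utils.SedonaSQLRegistrator

// Minimal sketch of the assumed session; the app name is a placeholder.
val session: SparkSession = SparkSession.builder()
  .appName("sedona-aoi-enrichment")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator")
  .getOrCreate()

// Register the ST_* SQL functions (ST_Point, ST_Transform, ST_Contains, ...).
SedonaSQLRegistrator.registerAll(session)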
- Join the shapefile dataframes with the actual (streaming) dataframe on an ST_Contains join condition.
code:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, expr}

// Left-outer join the streaming data to the broadcast AOI shapes on ST_Contains.
def enrichWithAoi(dataframe: DataFrame, clientLatColumn: String, clientLongColumn: String): DataFrame = {
  val networkAoiShape = broadcast(this.getAoiShapeDf.select("geometry", "AOI_ID"))
  val ueDataWithGeom = dataframe.withColumn("aoiGeoPoint",
    expr(s"ST_TRANSFORM(ST_POINT(CAST($clientLatColumn AS DOUBLE), CAST($clientLongColumn AS DOUBLE)), 'EPSG:4326', 'EPSG:5070')"))
  val aoiShapeJoin = ueDataWithGeom.alias("roamingAoiData").join(networkAoiShape.alias("shapeData"),
    expr("ST_Contains(shapeData.geometry, roamingAoiData.aoiGeoPoint)"), "LeftOuter")
  aoiShapeJoin.drop("geometry", "aoiGeoPoint")
}

// Same pattern for the cluster shapes.
def enrichWithCluster(dataframe: DataFrame, clientLatColumn: String, clientLongColumn: String): DataFrame = {
  val networkClusterShape = broadcast(this.getClusterShapeDf.select("geometry", "Cluster_ID"))
  val ueClusterDataWithGeom = dataframe.withColumn("clusterGeoPoint",
    expr(s"ST_TRANSFORM(ST_POINT(CAST($clientLatColumn AS DOUBLE), CAST($clientLongColumn AS DOUBLE)), 'EPSG:4326', 'EPSG:5070')"))
  val clusterShapeJoin = ueClusterDataWithGeom.alias("roamingClusterData").join(networkClusterShape.alias("shapeData"),
    expr("ST_Contains(shapeData.geometry, roamingClusterData.clusterGeoPoint)"), "LeftOuter")
  clusterShapeJoin.drop("geometry", "clusterGeoPoint")
}
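For completeness, here is a hypothetical end-to-end wiring of the two enrichment steps on a streaming input. The input path, the schema, and the client_lat/client_long column names are placeholders (not taken from the real job), and enrichWithAoi/enrichWithCluster are assumed to be in scope.
code:
import org.apache.spark.sql.types._

// Placeholder schema; the real payload carries a nested JSON value.
val ueSchema = StructType(Seq(
  StructField("value", StringType),
  StructField("client_lat", DoubleType),
  StructField("client_long", DoubleType)))

// Placeholder streaming source; the real job may use a different source or format.
val ueStream = session.readStream
  .schema(ueSchema)
  .json("s3://bucket/ue-events/")

// Chain the AOI and Cluster enrichment, then write to the console sink for inspection.
val enriched = enrichWithCluster(
  enrichWithAoi(ueStream, "client_lat", "client_long"),
  "client_lat", "client_long")

enriched.writeStream
  .format("console")
  .outputMode("append")
  .start()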
I tried providing an explicit schema for the shapefiles as well; the result is the same.
Settings
- EMR Serverless 6.9.0
- Spark 3.3.2
- Scala 2.12
- JDK 11
Sedona version = ?
implementation group: 'org.apache.sedona', name: 'sedona-python-adapter-3.0_2.12', version: '1.3.1-incubating'
implementation group: 'org.apache.sedona', name: 'sedona-viz-3.0_2.12', version: '1.4.1'
implementation group: 'org.apache.sedona', name: 'sedona-common', version: '1.4.1'
implementation group: 'org.apache.sedona', name: 'sedona-sql-3.0_2.12', version: '1.4.1'
Apache Spark version = ?
3.3.2
API type = Scala, Java, Python?
Scala
Scala version = 2.11, 2.12, 2.13?
2.12
JRE version = 1.8, 1.11?
JDK 11
Environment = Standalone, AWS EC2, EMR, Azure, Databricks?
AWS EMR Serverless