In [0]:
from pyspark.sql import SparkSession
from sedona.spark import SedonaContext
from sedona.spark.utils import KryoSerializer, SedonaKryoRegistrator
from sedona.spark import SedonaContext 

# Spark session configured for Apache Sedona: Kryo serialization with Sedona's registrator.
# NOTE(review): if a SparkSession already exists (e.g. on a Databricks cluster), getOrCreate()
# returns it and static settings such as spark.serializer may not take effect — confirm
# Kryo is configured at cluster level.
session_builder = (
    SparkSession.builder.appName("roof-top-metrics")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
)

# Wrap the session with SedonaContext so the ST_* SQL functions used later are available.
spark = SedonaContext.create(session_builder.getOrCreate())

### Load search logs from CSV

In [0]:
from pyspark.sql.functions import col, expr

SEARCH_LOGS_PATH = "/mnt/opas/opas-source/apt-roof-top-accuracy-improvement/rooftop_prod_logs_run_id_20251209_00032117.csv"

search_logs = spark.read.csv(SEARCH_LOGS_PATH, header=True, inferSchema=True, sep=",", quote='"', escape='"')

print(f"Total Search Count: {search_logs.count()}")
usa_search_logs = search_logs.filter("country == 'USA'")

# One aggregation pass over the USA slice instead of three separate count() jobs.
# count_if only counts rows where the predicate is true (false/NULL are skipped),
# which matches the semantics of the original filter(...).count() calls.
usa_counts = usa_search_logs.agg(
    expr("count(*)").alias("total"),
    expr("count_if(rooftop_match = 1)").alias("rooftop"),
    expr("count_if(rooftop_match = 0)").alias("outside"),
).first()
usa_total_count = usa_counts["total"]
usa_rooftop_count = usa_counts["rooftop"]
usa_outside_rooftop_count = usa_counts["outside"]


def _percent_of(part, whole):
    """Return ``part`` as a percentage of ``whole``, or 0.0 when ``whole`` is 0.

    Guards against ZeroDivisionError when the CSV contains no USA searches.
    """
    return part / whole * 100 if whole else 0.0


# Calculate percentages
rooftop_percentage = _percent_of(usa_rooftop_count, usa_total_count)
outside_rooftop_percentage = _percent_of(usa_outside_rooftop_count, usa_total_count)

# Print the results
print(f"USA Search Count: {usa_total_count} ({100:.2f}%)")
print(f"USA Search On RoofTop Count: {usa_rooftop_count} ({rooftop_percentage:.2f}%)")
print(f"USA Search Outside RoofTop Count: {usa_outside_rooftop_count} ({outside_rooftop_percentage:.2f}%)")

search_logs.display()

### Country-specific search logs

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Assuming you have a Spark DataFrame named search_logs
# Group by country and rooftop_match to get the count
from pyspark.sql import Window

# Count searches per (country, rooftop_match) bucket.
grouped_result = search_logs.groupBy("country", "rooftop_match").count()

# Total count per country, kept as its own frame for later inspection (lazy, no job runs here).
total_count_per_country = grouped_result.groupBy("country").agg(F.sum("count").alias("total_count"))

# Attach each country's total via a window rather than joining total_count_per_country back on
# "country": an equi-join never matches NULL = NULL, so searches with a NULL country silently
# disappeared from the result. A window partition treats all NULL countries as one group.
per_country = Window.partitionBy("country")
result_with_percentage = (
    grouped_result
    .withColumn("total_count", F.sum("count").over(per_country))
    .withColumn("percentage", F.format_number(F.col("count") / F.col("total_count") * 100, 2))
    .orderBy(F.col("country").desc())
)

# Show the result
display(result_with_percentage)


### Load APT Dataset

In [0]:
from pyspark.sql.functions import col, expr

# APT layer 14533, restricted to records whose tag metadata:country is USA.
usa_country_tag = expr("exists(tags, x -> x.tagKey.key = 'metadata:country' AND x.value = 'USA')")
apt_dataset = spark.table("preprocess_prod.layer_14533").filter(usa_country_tag)

display(apt_dataset)
# DBTITLE 1


In [0]:
from pyspark.sql import functions as F

# 2**64 - 1: masking with it reinterprets a (presumably signed) 64-bit int as unsigned.
_UINT64_MASK = (1 << 64) - 1


def convert_to_unsigned(layer_id, high, low):
    """Build the key ``"<layer_id>_<high>_<low>"`` with ``high``/``low`` as unsigned 64-bit ints.

    Masking with 2**64 - 1 maps a negative value to its unsigned two's-complement
    equivalent (e.g. -1 -> 18446744073709551615); non-negative values are unchanged.

    Args:
        layer_id: Layer identifier placed verbatim at the start of the key.
        high: High part of the id (presumably a signed 64-bit long — confirm).
        low: Low part of the id (presumably a signed 64-bit long — confirm).

    Returns:
        The key string, or None when any part is null. Used as a Spark UDF, a null
        input previously raised TypeError in ``int(None)`` and failed the whole job.
    """
    if layer_id is None or high is None or low is None:
        return None
    unsigned_high = int(high) & _UINT64_MASK
    unsigned_low = int(low) & _UINT64_MASK
    return f"{layer_id}_{unsigned_high}_{unsigned_low}"


# Build the string key from the id struct (layerId/high/low) so it can be matched
# against search_logs.provider_uid, plus a WKT point column from lat/lng.
unsigned_id_udf = F.udf(convert_to_unsigned, "string")
apt_Data_with_unsigned_id = (
    apt_dataset
    .withColumn("unsigned_id", unsigned_id_udf("id.layerId", "id.high", "id.low"))
    # NOTE(review): WKT points are conventionally POINT(x y), i.e. POINT(lng lat); this
    # builds POINT(lat lng). Confirm the order expected by consumers of `location`.
    .withColumn("location", F.expr("CONCAT('POINT(', lat, ' ', lng, ')')"))
)
display(apt_Data_with_unsigned_id)

### Find search-log APT IDs missing from the layer data

In [0]:
from pyspark.sql.functions import col, explode

# Searches whose provider_uid has no matching unsigned_id in the APT layer.
# The layer was filtered to USA when loaded, so compare against USA searches only;
# using all search_logs reported every non-USA search as "missing".
missing_ids_from_search_log = usa_search_logs.alias("search").join(
    apt_Data_with_unsigned_id.alias("layer"),
    col("search.provider_uid") == col("layer.unsigned_id"),
    "leftanti",
)
missing_ids_from_search_log.display()
print(missing_ids_from_search_log.count())


### Load APT Relocated CSV

In [0]:
RELOCATED_APT_CSV_PATH = "/mnt/opas/opas-source/apt-roof-top-accuracy-improvement/relocated_usa_apt.csv"

# The pipe-delimited CSV has no header row, so Spark names columns _c0, _c1, ...
# Map those positional names to meaningful ones.
RELOCATED_APT_COLUMNS = {
    "_c0": "signed_id",  # previously misspelled "singed_id" — update any external references
    "_c1": "location",
    "_c2": "relocated_location",
    "_c3": "relocated_bfp",
    "_c4": "uuid",
    "_c5": "comment",
    "_c6": "hn_number",
    "_c7": "street_name",
    "_c8": "apt_id",
}

relocated_apt_ds = spark.read.csv(RELOCATED_APT_CSV_PATH, header=False, inferSchema=True, sep="|", quote='"', escape='"')
for positional_name, column_name in RELOCATED_APT_COLUMNS.items():
    # withColumnRenamed is a no-op for a missing column, matching the original chain.
    relocated_apt_ds = relocated_apt_ds.withColumnRenamed(positional_name, column_name)

display(relocated_apt_ds)

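### Load BFP (building footprints) from the OSM Visualization of Interest layer
NOTE: the header previously said layer 10698, but the code below reads `preprocess_dev.layer_53801` — confirm which layer is intended.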

In [0]:
# NOTE(review): this reads from preprocess_dev while the APT and MCR inputs come from
# preprocess_prod — confirm the mixed environments are intended.
viz_dataset = spark.table("preprocess_dev.layer_53801")

# Keep only features that have a WKT geometry and are tagged building=yes.
is_building = expr("exists(tags, x -> x.tagKey.key = 'building' AND x.value = 'yes')")
bfp_dataset = viz_dataset.where(col("wkt").isNotNull() & is_building)

for label, frame in (("viz_dataset", viz_dataset), ("bfp_dataset", bfp_dataset)):
    print(f"{label} Count: {frame.count()}")
display(bfp_dataset)

In [0]:
from pyspark.sql.functions import col, expr, when

# Left-join each relocated APT point to every footprint it intersects. An APT that
# intersects several footprints produces several rows.
bfp_intersects_relocated_point = expr(
    "ST_Intersects(ST_GeomFromText(bfp.wkt), ST_GeomFromText(relocated_apt.relocated_location))"
)
relocated_apt_ds_with_bfp_data = (
    relocated_apt_ds.alias("relocated_apt")
    .join(bfp_dataset.alias("bfp"), bfp_intersects_relocated_point, "left")
    # Unmatched left-join rows carry a null bfp.wkt, so non-null means "inside a footprint".
    .withColumn("apt_inside_bfp", col("bfp.wkt").isNotNull())
)

display(relocated_apt_ds_with_bfp_data)

In [0]:
from pyspark.sql.functions import col, expr, when

num_partitions = 1000
# NOTE: these reassign the notebook-level frames, so later cells see the repartitioned versions.
relocated_apt_ds = relocated_apt_ds.repartition(num_partitions)
bfp_dataset = bfp_dataset.repartition(num_partitions)

# Left-join relocated APT points to intersecting footprints.
# The former .hint("shuffle") was dropped: "shuffle" is not a Spark join-hint name
# (valid ones include BROADCAST, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL), so Spark
# ignored it with a warning and removing it leaves the plan unchanged.
relocated_apt_ds_with_bfp_data = (
    relocated_apt_ds.alias("relocated_apt")
    .join(
        bfp_dataset.alias("bfp"),
        expr("ST_Intersects(ST_GeomFromText(bfp.wkt), ST_GeomFromText(relocated_apt.relocated_location))"),
        "left",
    )
    # Unmatched rows carry a null bfp.wkt.
    .withColumn("apt_inside_bfp", col("bfp.wkt").isNotNull())
)

display(relocated_apt_ds_with_bfp_data)

### Load BFP from MCR
MCR data has already been loaded by `LoadDataFromMcr.scala` (orbis-addressing-bulk-apt-tools) into the Delta table `preprocess_prod.bfp`.

In [0]:
# MCR building footprints, restricted to the USA license zone.
usa_license_zone = col("licenseZone") == "USA"
mcr_usa_bfp_dataset = spark.table("preprocess_prod.bfp").where(usa_license_zone)

display(mcr_usa_bfp_dataset)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

num_partitions = 1000

# Repartition datasets. NOTE: these reassign the notebook-level frames, so later cells
# see the repartitioned relocated frame and the wkt-only BFP frame.
relocated_apt_ds = relocated_apt_ds.repartition(num_partitions)
mcr_usa_bfp_dataset = mcr_usa_bfp_dataset.select("wkt").repartition(num_partitions)  # Select only necessary columns

# The previous cache()/unpersist() pair was removed: unpersist() ran before any action,
# so nothing was ever materialised in the cache.

# Left-join each relocated APT point to intersecting MCR footprints.
# NOTE(review): broadcasting assumes the USA footprint set fits in memory on every
# executor — confirm its size before running on full data.
relocated_apt_ds_with_bfp_data = (
    relocated_apt_ds.alias("relocated_apt")
    .join(
        F.broadcast(mcr_usa_bfp_dataset.alias("bfp")),
        F.expr("ST_Intersects(ST_GeomFromText(bfp.wkt), ST_GeomFromText(relocated_apt.relocated_location))"),
        "left",
    )
    # Unmatched rows carry a null bfp.wkt.
    .withColumn("apt_inside_bfp", F.col("bfp.wkt").isNotNull())
    # An APT intersecting several footprints yields several rows; keep one per relocated
    # APT record. Deduplicating on relocated_location alone also merged *different* APTs
    # that were relocated to the same point.
    .dropDuplicates(relocated_apt_ds.columns)
)

display(relocated_apt_ds_with_bfp_data)

In [0]:
# Row counts before and after the most recent spatial join (presumably expected to match
# when each relocated APT yields exactly one joined row — confirm).
# The old "viz_dataset" label prefix was a copy-paste leftover and misdescribed these frames.
print(f"relocated_apt_ds count: {relocated_apt_ds.count()}")
print(f"relocated_apt_ds_with_bfp_data count: {relocated_apt_ds_with_bfp_data.count()}")

In [0]:
num_partitions = 1000
# Cap on MCR footprints joined below. NOTE(review): limit() keeps an arbitrary subset, so
# apt_inside_bfp == False here does not prove an APT lies outside every footprint.
# This is presumably a speed-up for experimentation; confirm before using these numbers.
BFP_ROW_LIMIT = 1_000_000

relocated_apt_ds = relocated_apt_ds.repartition(num_partitions)
# Separate name so mcr_usa_bfp_dataset itself is not truncated for cells re-run afterwards.
mcr_usa_bfp_sample = mcr_usa_bfp_dataset.limit(BFP_ROW_LIMIT).repartition(num_partitions)

# The former .hint("shuffle") was dropped: "shuffle" is not a Spark join-hint name, so
# Spark ignored it with a warning and removing it leaves the plan unchanged.
relocated_apt_ds_with_bfp_data = (
    relocated_apt_ds.alias("relocated_apt")
    .join(
        mcr_usa_bfp_sample.alias("bfp"),
        expr("ST_Intersects(ST_GeomFromText(bfp.wkt), ST_GeomFromText(relocated_apt.relocated_location))"),
        "left",
    )
    # Unmatched rows carry a null bfp.wkt.
    .withColumn("apt_inside_bfp", col("bfp.wkt").isNotNull())
)

display(relocated_apt_ds_with_bfp_data)