In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, sum

import argparse

In [25]:
# Build (or reuse) the Spark session used by every cell below.
# The chain is wrapped in parentheses, so no line-continuation backslashes are needed.
spark = (
    SparkSession.builder
    .appName("S3ParquetReader")
    .master("spark://spark-master:7077")
    # Route s3a:// paths through the Hadoop S3A filesystem implementation.
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # No keys are set in this cell: credential lookup is delegated to the
    # configured provider class.
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    )
    .getOrCreate()
)

In [34]:
# Columns kept when the parquet file is first loaded: the coordinate array
# plus the three columns whose values are inspected in the following cells.
parq_cols = [
    "xyz",
    "Withheld",
    "Synthetic",
    "EdgeOfFlightLine",
]

In [35]:
# Single parquet file (read through the s3a:// scheme) used throughout the notebook.
default_parq_file = "s3a://ubs-datasets/FRACTAL/data/test/TEST-1176_6137-009200000.parquet"

# Load the file and keep only the columns listed in parq_cols.
df = (
    spark.read
    .parquet(default_parq_file)
    .select(*parq_cols)
)

# Mark the pruned frame as cached; as the last expression of the cell this
# also echoes the DataFrame's column/type summary.
df.cache()

DataFrame[xyz: array<double>, Withheld: smallint, Synthetic: smallint, EdgeOfFlightLine: smallint]

In [36]:
# Print the schema of the DataFrame restricted to the selected columns.
df.printSchema()


root
 |-- xyz: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- Withheld: short (nullable = true)
 |-- Synthetic: short (nullable = true)
 |-- EdgeOfFlightLine: short (nullable = true)



In [37]:
# Preview the first 10 rows; truncate=False keeps the xyz arrays from being
# shortened in the printed table.
df.show(10, truncate=False)

+--------------------------------------------+--------+---------+----------------+
|xyz                                         |Withheld|Synthetic|EdgeOfFlightLine|
+--------------------------------------------+--------+---------+----------------+
|[1176998.19, 6136968.24, 492.72]            |0       |0        |0               |
|[1176998.33, 6136968.12, 492.47]            |0       |0        |0               |
|[1176997.96, 6136967.78, 490.55]            |0       |0        |0               |
|[1176999.96, 6136968.43, 489.21000000000004]|0       |0        |0               |
|[1176999.55, 6136968.32, 491.68]            |0       |0        |0               |
|[1176999.58, 6136968.28, 491.24]            |0       |0        |0               |
|[1176999.67, 6136968.19, 490.89]            |0       |0        |0               |
|[1176999.87, 6136968.0200000005, 490.76]    |0       |0        |0               |
|[1176999.58, 6136968.23, 489.28000000000003]|0       |0        |0               |
|[11

In [38]:
# Summary statistics of the dataset: count, mean, stddev, min and max per column.
# The printed table covers the three smallint columns only; the xyz array
# column does not appear in it.
df.describe().show()


+-------+--------+---------+----------------+
|summary|Withheld|Synthetic|EdgeOfFlightLine|
+-------+--------+---------+----------------+
|  count|  141109|   141109|          141109|
|   mean|     0.0|      0.0|             0.0|
| stddev|     0.0|      0.0|             0.0|
|    min|       0|        0|               0|
|    max|       0|        0|               0|
+-------+--------+---------+----------------+



In [23]:
def _distinct_values(frame, column):
    """Return the distinct values of `column` in `frame`, collected to the driver as a list."""
    return [row[column] for row in frame.select(column).distinct().collect()]


# One distinct-value lookup per column, reported together on a single line.
print(
    "Unique values - Withheld:", _distinct_values(df, "Withheld"),
    "| Synthetic:", _distinct_values(df, "Synthetic"),
    "| EdgeOfFlightLine:", _distinct_values(df, "EdgeOfFlightLine"),
)

                                                                                

Unique values - Withheld: [0] | Synthetic: [0] | EdgeOfFlightLine: [0]


In [40]:
# `df` is about to be rebound to a different projection of the file. Release
# the previously cached frame first: once the name is reused there is no
# handle left to unpersist it, and its cached blocks would stay allocated.
# (unpersist() on a frame that is not cached does nothing.)
df.unpersist()

# Columns wanted for the rest of the notebook: coordinates, intensity,
# classification and the four colour/infrared channels.
parq_cols = ["xyz", "Intensity", "Classification", "Red", "Green", "Blue", "Infrared"]

# Re-read the same parquet file, keeping only the wanted columns.
df = spark.read.parquet(default_parq_file).select(*parq_cols)

# Preview the first 10 rows without shortening the xyz arrays.
df.show(10, truncate=False)

                                                                                

+--------------------------------------------+---------+--------------+-----+-----+-----+--------+
|xyz                                         |Intensity|Classification|Red  |Green|Blue |Infrared|
+--------------------------------------------+---------+--------------+-----+-----+-----+--------+
|[1176998.19, 6136968.24, 492.72]            |1112     |5             |15104|19200|18176|35840   |
|[1176998.33, 6136968.12, 492.47]            |1151     |5             |15104|19200|18176|36608   |
|[1176997.96, 6136967.78, 490.55]            |966      |5             |15104|19968|19200|36864   |
|[1176999.96, 6136968.43, 489.21000000000004]|853      |5             |14848|18688|18944|30464   |
|[1176999.55, 6136968.32, 491.68]            |1108     |5             |15616|19712|19456|38400   |
|[1176999.58, 6136968.28, 491.24]            |1163     |5             |15616|19712|19456|38400   |
|[1176999.67, 6136968.19, 490.89]            |1175     |5             |13312|17152|17152|32768   |
|[1176999.

In [51]:
# Height normalisation, step 1: find the lowest z value in the loaded frame.
from pyspark.sql import functions as F

# Copy the third entry of the xyz array into its own "z" column
# (element_at positions start at 1, so 3 is the last of the three values).
df = df.withColumn("z", F.element_at("xyz", 3))

# NOTE(review): the original note said "minimum z per patch", but this
# aggregate has no grouping, so it is the minimum over every row of df.
# Presumably one file holds exactly one patch; TODO confirm.
# An ungrouped aggregate yields a single row, so taking the first row is enough.
min_z = df.agg(F.min("z").alias("z_min")).first()["z_min"]
print(f"Minimum Z (global) = {min_z}")


Minimum Z (global) = 456.48


In [52]:
# Height normalisation, step 2: subtract the minimum z computed earlier from
# every point's z, so the lowest point ends up at 0.
df = df.withColumn(
    "z_norm",
    F.col("z") - F.lit(min_z),
)

In [54]:
# Height normalisation, step 3: rebuild the coordinate array as "xyz_norm",
# keeping the first two entries of xyz unchanged and replacing the third
# with the normalised height.
df = df.withColumn(
    "xyz_norm",
    F.array(
        *[F.element_at("xyz", position) for position in (1, 2)],
        F.col("z_norm"),
    ),
)

In [57]:
# Show the original and the height-normalised coordinate arrays side by side
# for the first 10 rows, without truncating the array text.
df.select("xyz", "xyz_norm").show(10, truncate=False)


+--------------------------------------------+---------------------------------------------------+
|xyz                                         |xyz_norm                                           |
+--------------------------------------------+---------------------------------------------------+
|[1176998.19, 6136968.24, 492.72]            |[1176998.19, 6136968.24, 36.24000000000001]        |
|[1176998.33, 6136968.12, 492.47]            |[1176998.33, 6136968.12, 35.99000000000001]        |
|[1176997.96, 6136967.78, 490.55]            |[1176997.96, 6136967.78, 34.06999999999999]        |
|[1176999.96, 6136968.43, 489.21000000000004]|[1176999.96, 6136968.43, 32.73000000000002]        |
|[1176999.55, 6136968.32, 491.68]            |[1176999.55, 6136968.32, 35.19999999999999]        |
|[1176999.58, 6136968.28, 491.24]            |[1176999.58, 6136968.28, 34.75999999999999]        |
|[1176999.67, 6136968.19, 490.89]            |[1176999.67, 6136968.19, 34.40999999999997]        |
|[1176999.