In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file in it.
input_files = (
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_files:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
!pip install pyspark

In [None]:
def read_csv_point_cloud(csv_file):
    """Load a point-cloud CSV into a pandas DataFrame.

    The file is read with ``header=None``, so the first line is treated as
    data and the columns are labelled with integers starting at 0.

    Args:
        csv_file: Path or file-like object accepted by ``pd.read_csv``.

    Returns:
        The parsed ``pandas.DataFrame``.
    """
    return pd.read_csv(csv_file, header=None)

In [None]:
# Location of the point-cloud CSV inside the Kaggle input directory.
csv_file = "/kaggle/input/vaihingen/area2_cov_multi.csv"
# Read it without a header row (read_csv_point_cloud passes header=None).
points = read_csv_point_cloud(csv_file)

In [None]:
# Column 0 is treated as one space-delimited string per record; split it on
# single spaces and store the seven resulting fields under named columns.
points[['x', 'y', 'z', 'cl', 'cs', 'cp', 'label']] = points[0].str.split(' ', expand=True)
# Display the frame to inspect the result of the split.
points

In [None]:
# Keep only the coordinate columns and convert them from strings to floats.
# Selecting the columns (instead of dropping the others in place) means that
# re-running this cell does not raise a KeyError for columns that are already
# gone, and the frame is never mutated in place.
points = points[['x', 'y', 'z']].astype(float)

In [None]:
# Optional: uncomment to keep only the first 10,000 rows for a faster trial run.
# points = points[0:10000]

In [None]:
# Write the frame to points.csv in the current working directory. The pandas
# index is written as well (to_csv's default), so the file starts with an
# unnamed index column.
points.to_csv('points.csv')

In [None]:
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, FloatType, StructField, StructType
from scipy.spatial import KDTree

# Initialize Spark session
spark = SparkSession.builder \
    .appName("PointCloudEigenfeatures") \
    .getOrCreate()

# Schema for points.csv. The file was written by pandas together with its row
# index, so the first CSV column is that index; it is read as "_c0" and dropped
# below.
# x/y/z are read as DoubleType: FloatType is single precision (about 7
# significant digits), which would round large coordinate values before the
# 0.25-radius neighbourhood search is run on them.
schema = StructType([
    StructField("_c0", FloatType(), True),
    StructField("x", DoubleType(), True),
    StructField("y", DoubleType(), True),
    StructField("z", DoubleType(), True)
])

# Load your point cloud data with the defined schema
df = spark.read.csv("/kaggle/working/points.csv", header=True, schema=schema)

# Drop the unnecessary first column
df = df.drop("_c0")

# Convert DataFrame to NumPy array (collects every row to the driver)
point_cloud = np.array(df.collect())

In [None]:
# Broadcast the NumPy point cloud; compute_eigenfeatures reads it back through
# broadcast_point_cloud.value.
broadcast_point_cloud = spark.sparkContext.broadcast(point_cloud)

In [None]:
from scipy.spatial import KDTree
def _kdtree_for(broadcast):
    """Return a KD-tree over ``broadcast.value``, building it on first use.

    The tree is stored as an attribute on the broadcast handle, so later calls
    that see the same handle reuse it instead of re-indexing the whole cloud.
    """
    tree = getattr(broadcast, "_eigenfeature_kdtree", None)
    if tree is None:
        tree = KDTree(broadcast.value)
        broadcast._eigenfeature_kdtree = tree
    return tree


def compute_eigenfeatures(point, radius=0.25):
    """Compute covariance eigenvalues of the neighbourhood around ``point``.

    Neighbours are all points of the broadcast point cloud that lie within
    ``radius`` of ``point``.

    Args:
        point: NumPy array holding the query coordinates (must support
            ``.tolist()``).
        radius: Search radius for the neighbourhood. Defaults to 0.25, the
            value that was previously hard-coded.

    Returns:
        A tuple ``(point_as_list, eigenvalues_as_list)``. The eigenvalues are
        floats in ascending order; ``[0.0, 0.0, 0.0]`` is returned when fewer
        than two neighbours are found, because a covariance cannot be estimated.
    """
    # Extract the point cloud from the broadcast variable
    point_cloud = broadcast_point_cloud.value

    # Reuse one KD-tree per broadcast handle; building a tree over the whole
    # cloud for every single query point would dominate the runtime.
    kdtree = _kdtree_for(broadcast_point_cloud)

    # Find neighbors within the radius
    indices = kdtree.query_ball_point(point, radius)
    neighbors = point_cloud[indices]

    if len(neighbors) > 1:
        cov_matrix = np.cov(neighbors, rowvar=False)
        # Only the eigenvalues are needed, so skip computing eigenvectors.
        eigenvalues = np.linalg.eigvalsh(cov_matrix)
    else:
        # Float zeros keep the element type identical to the branch above, so
        # every result row has the same (floating-point) eigenvalue type.
        eigenvalues = np.zeros(3)

    return (point.tolist(), eigenvalues.tolist())

# Convert the DataFrame to an RDD for parallel processing
# Each row becomes a NumPy array [x, y, z], the input form compute_eigenfeatures expects.
rdd = df.rdd.map(lambda row: np.array([row['x'], row['y'], row['z']]))

In [None]:
# Apply the eigenfeature computation in parallel
# (map is lazy; the work is triggered by collect() below)
eigenfeatures_rdd = rdd.map(compute_eigenfeatures)

# Collect the results
# Brings every (point, eigenvalues) pair back to the driver as a Python list.
results = eigenfeatures_rdd.collect()

# Convert results to a DataFrame for further processing or saving
result_df = spark.createDataFrame(results, schema=["point", "eigenvalues"])
# Print the first rows of the result.
result_df.show()

In [None]:
# Convert the Spark result DataFrame into a pandas DataFrame on the driver.
results_df = result_df.toPandas()

In [None]:
# Rebind results_df to a NumPy array built from the pandas frame.
# NOTE(review): despite its name, results_df is no longer a DataFrame after this cell.
results_df = np.array(results_df)

In [None]:
# Display the current value of results_df.
results_df

In [None]:
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

def compute_covariance_eigenvalues(neighbors):
    """Return the covariance eigenvalues of a neighbourhood, in ascending order.

    Args:
        neighbors: Sequence of ``[x, y, z]`` coordinate triples, or ``None``
            (a UDF receives ``None`` for a null column value).

    Returns:
        A list of three floats in ascending order. ``[0.0, 0.0, 0.0]`` is
        returned when ``neighbors`` is ``None`` or holds fewer than three
        points.
    """
    # Guard against a null input before calling len() on it.
    if neighbors is None or len(neighbors) < 3:
        return [0.0, 0.0, 0.0]  # Not enough points to define a plane

    points = np.array(neighbors, dtype=float)
    # One row per point, one column per coordinate.
    cov_matrix = np.cov(points, rowvar=False)

    # eigvalsh already returns the eigenvalues in ascending order, so no
    # additional sort is required.
    return np.linalg.eigvalsh(cov_matrix).tolist()

# Register the UDF
# Wraps compute_covariance_eigenvalues as a Spark UDF whose declared return
# type is an array of floats.
compute_eigenvalues_udf = udf(compute_covariance_eigenvalues, ArrayType(FloatType()))

In [None]:
from pyspark.sql.functions import col, collect_list, lit

# Define the radius for neighborhood search
radius = 0.25

# `point_cloud_df` is not created anywhere earlier in this notebook, so on a
# fresh kernel this cell failed with a NameError. Bind it to the x/y/z Spark
# DataFrame `df` that was loaded from points.csv.
# NOTE(review): confirm `df` is the intended source for this section.
point_cloud_df = df

# Broadcast the point cloud data (a list of Row objects collected on the driver)
point_cloud_broadcast = spark.sparkContext.broadcast(point_cloud_df.collect())

def get_neighbors(point, radius, all_points):
    """Return the coordinates of every point within ``radius`` of ``point``.

    Args:
        point: Sequence ``(px, py, pz)`` with the query coordinates.
        radius: Maximum Euclidean distance (inclusive) for a point to count as
            a neighbour.
        all_points: Iterable of indexable rows whose first three fields are
            x, y, z. Any further fields (for example a label) are ignored.

    Returns:
        A list of ``[x, y, z]`` lists, in the order the rows were visited.
    """
    px, py, pz = point
    neighbors = []
    for row in all_points:
        # Read the coordinates by position instead of unpacking a fixed number
        # of fields, so rows with only x/y/z work as well as rows that carry
        # extra columns.
        x, y, z = row[0], row[1], row[2]
        if np.sqrt((x - px) ** 2 + (y - py) ** 2 + (z - pz) ** 2) <= radius:
            neighbors.append([x, y, z])
    return neighbors

# UDF to find neighbors
def get_neighbors_udf(px, py, pz):
    """Look up the neighbours of the point ``(px, py, pz)``.

    Searches the rows held in ``point_cloud_broadcast`` using the module-level
    ``radius`` and returns the list produced by ``get_neighbors``.
    """
    return get_neighbors([px, py, pz], radius, point_cloud_broadcast.value)

# Register the UDF
# The element type is DoubleType so neighbour coordinates keep full precision;
# FloatType would narrow them to single precision before the covariance step.
find_neighbors_udf = udf(get_neighbors_udf, ArrayType(ArrayType(DoubleType())))

# Add neighbors to the DataFrame
point_cloud_with_neighbors = point_cloud_df.withColumn("neighbors", find_neighbors_udf(col("x"), col("y"), col("z")))

# Compute eigenvalues for each point's neighborhood
point_cloud_with_eigen = point_cloud_with_neighbors.withColumn("eigenvalues", compute_eigenvalues_udf(col("neighbors")))

# Show result
point_cloud_with_eigen.select("x", "y", "z", "eigenvalues").show()