## Create vector DGL output combining all results

**Attributes**
- ID (numeric)
- Land Ownership (string)
- Habitat (string; two names comma-separated where two habitats apply, 'Multiple Habitats' where three or more apply)
- Classification (combination of Land Ownership and Habitat)
- Habitat flags (one 0/1 integer column per habitat)

In [0]:
%pip install keplergl pydeck mapclassify rtree pygeos geopandas==1.0.0
dbutils.library.restartPython()

In [0]:
import geopandas as gpd
from pathlib import Path
from functools import reduce

In [0]:
from sedona.spark import *
from pyspark.sql.functions import expr, when, col
from pyspark.sql import functions as F

# Build the Sedona context on top of the notebook's existing Spark session.
sedona = SedonaContext.create(spark)
# Clear anything cached in the SQL context by earlier runs.
sqlContext.clearCache()

# User name taken from the Databricks notebook context.
username = (
    dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
)

Load in grid

In [0]:
# Define size of grid square (presumably metres — it is substituted into the
# "<size>m_england_grid.parquet" file name when the grid is loaded).
grid_square_size = 10

In [0]:
# Folder holding the model grids, written as a local /dbfs path.
grid_path = Path(
    "/dbfs/mnt/lab-res-a1001005/esd_project/Defra_Land/Model_Grids"
)
# Same location with the "/dbfs" prefix swapped for the "dbfs:" scheme, which
# is the form passed to the Spark reader.
alt_grid_path = str(grid_path).replace("/dbfs", "dbfs:")

In [0]:
# Load the England grid for the configured square size, decode the WKB
# geometry, tag it with SRID 27700 and repair invalid shapes.
grid_file = f"{alt_grid_path}/{grid_square_size}m_england_grid.parquet"
decoded_geometry = expr("st_makevalid(st_setsrid(ST_GeomFromWKB(geometry),27700))")

eng_grid = sedona.read.format("parquet").load(grid_file)
eng_grid = eng_grid.withColumn("geometry", decoded_geometry).repartition(1305)

eng_grid.createOrReplaceTempView("eng_grid")

Iterate through habitats and add boolean mask

Also add freehold/leasehold information on first iteration

In [0]:
# Habitat name -> source indicator columns in that habitat's asset table.
# In the loop below a grid cell is flagged for the habitat when any one of the
# listed columns equals 1 (subject to the ownership condition applied there).
habitats_cols = {
    'Arable': ['le_comb', 'phi_comb', 'lcm_comb', 'crome'],
    'Coastal': ["le_comb","phi_comb","lcm_comb","ne_marine"],
    'Saltmarsh': ["le_saltmarsh","phi_saltmarsh","lcm_saltmarsh","ne_marine_saltmarsh"],
    'Grassland': ["le_unimproved_grass","phi_comb","lcm_comb"],
    'Moorland': ["le_comb","phi_comb","lcm_comb"],
    'Upland_Bog': ["le_bog","phi_blanket_bog","lcm_bog"],
    'Woodland_Dense': ["le_comb","lcm_comb","nfi_dense","phi_deciduous_woodland"],
    'Woodland_Sparse': ["le_scrub","nfi_sparse","wood_pasture_park","phi_traditional_orchard","fr_tow"],
    'Urban': ["le_urban","lcm_comb","ons_urban"],
    'Water': ["le_comb","lcm_comb","phi_comb","os_ngd_water"]
}

In [0]:
# Folder holding the per-habitat asset tables, written as a local /dbfs path.
asset_path = Path(
    "/dbfs/mnt/lab-res-a1001005/esd_project/Defra_Land/Final/Asset_Tables"
)
# Same location in "dbfs:" form for Spark reads.
alt_asset_path = str(asset_path).replace("/dbfs", "dbfs:")

In [0]:
# First-iteration marker for the habitat loop: while it is 1 the loop seeds the
# combined table with land ownership, then sets it to 2.
n = 1

In [0]:
# For every habitat: read its asset table, derive a 0/1 flag column named after
# the habitat, and left-join that flag onto the combined grid table by ID.
for hab, cols in habitats_cols.items():

    print(hab)

    # Two asset tables are named with the words swapped relative to the key.
    hab_read = {
        'Woodland_Dense': 'Dense_Woodland',
        'Woodland_Sparse': 'Sparse_Woodland',
    }.get(hab, hab)

    data_table = sedona.read.format("parquet").load(
        f"{alt_asset_path}/10m_x_assets_combined_{hab_read.lower()}.parquet"
    )
    data_table.createOrReplaceTempView("data_table")

    # Add Freehold/Leasehold information, on the first pass only (n == 1).
    # Freehold takes precedence when both indicators are set.
    if n == 1:
        ownership_label = (
            when(col("dgl_fh") == 1, "Freehold")
            .when(col("dgl_lh") == 1, "Leasehold")
            .otherwise(None)
            .alias("Land_Ownership")
        )
        ownership = data_table.select(col("id"), ownership_label)

        data_combined = eng_grid.join(ownership, on="ID", how="left")
        data_combined.createOrReplaceTempView("data_combined")

        n = 2

    # Only freehold or leasehold cells can be flagged; Upland Bog additionally
    # requires the moorland_line indicator.
    held_land = (F.col("dgl_fh") == 1) | (F.col("dgl_lh") == 1)
    if hab == 'Upland_Bog':
        condition = held_land & (F.col("moorland_line") == 1)
    else:
        condition = held_land

    # True when at least one of this habitat's source columns equals 1.
    any_source_hit = reduce(
        lambda acc, source_name: acc | (F.col(source_name) == 1),
        cols,
        F.lit(False),
    )

    data_score = data_table.withColumn(
        hab,
        F.when(condition & any_source_hit, F.lit(1)).otherwise(0),
    ).select('id', hab)

    data_combined = data_combined.join(data_score, on="ID", how="left")
    data_combined.createOrReplaceTempView("data_combined")

In [0]:
# Keep only grid cells that received a land-ownership label.
has_ownership = F.col("Land_Ownership").isNotNull()
data_footprint = data_combined.where(has_ownership)
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Working column name -> name used in the published output.
# Saltmarsh and Urban keep their names and so are not listed.
new_column_names = {
    "Land_Ownership": "Land Ownership",
    "Arable": "Enclosed Farmland",
    "Coastal": "Coastal Margins",
    "Grassland": "Semi-Natural Grassland",
    "Moorland": "Moorland and Heath",
    "Upland_Bog": "Upland Bog",
    "Woodland_Dense": "Woodland (Dense)",
    "Woodland_Sparse": "Woodland (Sparse)",
    "Water": "Freshwater and Wetlands"
}

# Apply the renames one after another, threading the DataFrame through.
data_footprint = reduce(
    lambda frame, name_pair: frame.withColumnRenamed(name_pair[0], name_pair[1]),
    new_column_names.items(),
    data_footprint,
)
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# The ten habitat flag columns (output names) counted per grid cell.
columns_to_sum = [
    "Enclosed Farmland",
    "Coastal Margins",
    "Saltmarsh",
    "Semi-Natural Grassland",
    "Moorland and Heath",
    "Upland Bog",
    "Woodland (Dense)",
    "Woodland (Sparse)",
    "Urban",
    "Freshwater and Wetlands",
]

In [0]:
# Count how many habitat flags are set on each cell; a missing flag counts as 0.
flag_values = [F.coalesce(F.col(name), F.lit(0)) for name in columns_to_sum]
data_footprint = data_footprint.withColumn("sum", sum(flag_values))
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Re-register the current data_footprint DataFrame under the same view name.
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Cells with exactly one flag take that habitat's name. Everything else gets
# the placeholder string "None", which the following cells resolve.
flagged_names = [
    F.when(F.col(name) == 1, F.lit(name)).otherwise(None)
    for name in columns_to_sum
]
single_habitat = F.concat_ws(", ", *flagged_names)

data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(F.col("sum") == 1, single_habitat).otherwise("None"),
)
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Unresolved cells flagged for exactly Coastal Margins + Saltmarsh are
# reported as "Saltmarsh".
coastal_saltmarsh_only = (
    (F.col("Habitat") == "None")
    & (F.col("sum") == 2)
    & (F.col("Coastal Margins") == 1)
    & (F.col("Saltmarsh") == 1)
)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(coastal_saltmarsh_only, F.lit("Saltmarsh")).otherwise(F.col("Habitat")),
)
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Unresolved cells flagged for exactly Moorland and Heath + Upland Bog are
# reported as "Upland Bog".
moorland_bog_only = (
    (F.col("Habitat") == "None")
    & (F.col("sum") == 2)
    & (F.col("Moorland and Heath") == 1)
    & (F.col("Upland Bog") == 1)
)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(moorland_bog_only, F.lit("Upland Bog")).otherwise(F.col("Habitat")),
)
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Unresolved cells flagged for exactly Woodland (Dense) + Woodland (Sparse)
# are reported as "Woodland (Mixed)".
dense_sparse_only = (
    (F.col("Habitat") == "None")
    & (F.col("sum") == 2)
    & (F.col("Woodland (Dense)") == 1)
    & (F.col("Woodland (Sparse)") == 1)
)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(dense_sparse_only, F.lit("Woodland (Mixed)")).otherwise(F.col("Habitat")),
)
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Unresolved cells with five or more habitat flags are labelled
# "Multiple Habitats". The comparison is >= rather than == so that cells with
# six or more flags are not left on the "None" placeholder (no later cell
# handles counts above five).
many_habitats = (F.col("Habitat") == "None") & (F.col("sum") >= 5)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(many_habitats, F.lit("Multiple Habitats")).otherwise(F.col("Habitat")),
)
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Alphabetically sorted, comma-separated names of the habitats flagged on each
# cell (unflagged entries are null and are skipped by concat_ws).
flagged_sorted = F.array_sort(
    F.array(*[F.when(F.col(c) == 1, F.lit(c)).otherwise(None) for c in columns_to_sum])
)
data_footprint = data_footprint.withColumn(
    "matching_columns", F.concat_ws(", ", flagged_sorted)
)

# Any cell still unresolved with exactly two flags gets both names.
two_habitats = (F.col("Habitat") == "None") & (F.col("sum") == 2)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(two_habitats, F.col("matching_columns")).otherwise(F.col("Habitat")),
)

# The helper column is only needed for the update above.
data_footprint = data_footprint.drop("matching_columns")
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Three flags where two of them are the Coastal Margins + Saltmarsh pair:
# the pair collapses to "Saltmarsh", leaving two names to report.
#
# Names of the flagged habitats with "Coastal Margins" removed. Null entries
# (unflagged habitats) also fail the predicate and are dropped by the filter.
flagged_without_coastal = F.filter(
    F.array(*[F.when(F.col(c) == 1, F.lit(c)).otherwise(None) for c in columns_to_sum]),
    lambda x: x != "Coastal Margins",
)
data_footprint = data_footprint.withColumn(
    "matching_columns",
    F.concat_ws(", ", F.array_sort(flagged_without_coastal)),
)

# Both halves of the pair must be flagged. Without the Coastal Margins check,
# Saltmarsh plus two unrelated habitats would be written out as three names
# instead of falling through to the later "Multiple Habitats" step.
saltmarsh_pair_plus_one = (
    (F.col("Habitat") == "None")
    & (F.col("sum") == 3)
    & (F.col("Saltmarsh") == 1)
    & (F.col("Coastal Margins") == 1)
)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(saltmarsh_pair_plus_one, F.col("matching_columns")).otherwise(F.col("Habitat")),
)

# Drop the intermediate column
data_footprint = data_footprint.drop("matching_columns")
data_footprint.createOrReplaceTempView("data_footprint")


In [0]:
# Three flags where two of them are the Moorland and Heath + Upland Bog pair:
# the pair collapses to "Upland Bog", leaving two names to report.
#
# Names of the flagged habitats with "Moorland and Heath" removed. Null entries
# (unflagged habitats) also fail the predicate and are dropped by the filter.
flagged_without_moorland = F.filter(
    F.array(*[F.when(F.col(c) == 1, F.lit(c)).otherwise(None) for c in columns_to_sum]),
    lambda x: x != "Moorland and Heath",
)
data_footprint = data_footprint.withColumn(
    "matching_columns",
    F.concat_ws(", ", F.array_sort(flagged_without_moorland)),
)

# Both halves of the pair must be flagged. Without the Moorland and Heath
# check, Upland Bog plus two unrelated habitats would be written out as three
# names instead of falling through to the later "Multiple Habitats" step.
bog_pair_plus_one = (
    (F.col("Habitat") == "None")
    & (F.col("sum") == 3)
    & (F.col("Upland Bog") == 1)
    & (F.col("Moorland and Heath") == 1)
)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(bog_pair_plus_one, F.col("matching_columns")).otherwise(F.col("Habitat")),
)

# Drop the intermediate column
data_footprint = data_footprint.drop("matching_columns")
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Per-cell array holding each flagged habitat's name (null where not flagged).
matching_array = F.array(*[F.when(F.col(c) == 1, F.lit(c)).otherwise(None) for c in columns_to_sum])

both_woodlands = (
    F.array_contains(matching_array, "Woodland (Dense)")
    & F.array_contains(matching_array, "Woodland (Sparse)")
)
# Everything flagged apart from the two woodland classes.
non_woodland = F.filter(
    matching_array,
    lambda x: (x != "Woodland (Dense)") & (x != "Woodland (Sparse)"),
)
# Dense + Sparse become a single "Woodland (Mixed)" entry, sorted with the rest.
with_mixed_woodland = F.array_sort(
    F.array_union(F.array(F.lit("Woodland (Mixed)")), non_woodland)
)
label_parts = F.when(both_woodlands, with_mixed_woodland).otherwise(
    # Rows without both woodland flags keep their names, sorted.
    F.array_sort(matching_array)
)
data_footprint = data_footprint.withColumn(
    "matching_columns", F.concat_ws(", ", label_parts)
)

# Unresolved three-flag cells containing both woodland classes take the
# collapsed label.
woodland_pair_plus_one = (
    (F.col("Habitat") == "None")
    & (F.col("sum") == 3)
    & (F.col("Woodland (Dense)") == 1)
    & (F.col("Woodland (Sparse)") == 1)
)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(woodland_pair_plus_one, F.col("matching_columns")).otherwise(F.col("Habitat")),
)

# The helper column is only needed for the update above.
data_footprint = data_footprint.drop("matching_columns")
data_footprint.createOrReplaceTempView("data_footprint")


In [0]:
# Three-flag cells not resolved by the pair-collapsing steps above become
# "Multiple Habitats".
unresolved_three = (F.col("Habitat") == "None") & (F.col("sum") == 3)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(unresolved_three, F.lit("Multiple Habitats")).otherwise(F.col("Habitat")),
)
data_footprint.createOrReplaceTempView("data_footprint")

In [0]:
# Four-flag combinations made of two collapsible pairs, with the two-name
# label each one receives.
rules = [
    (
        {"Woodland (Dense)", "Woodland (Sparse)", "Coastal Margins", "Saltmarsh"},
        "Saltmarsh, Woodland (Mixed)",
    ),
    (
        {"Woodland (Dense)", "Woodland (Sparse)", "Upland Bog", "Moorland and Heath"},
        "Upland Bog, Woodland (Mixed)",
    ),
    (
        {"Upland Bog", "Moorland and Heath", "Coastal Margins", "Saltmarsh"},
        "Saltmarsh, Upland Bog",
    ),
]

# Per-cell array holding each flagged habitat's name (null where not flagged).
matching_array = F.array(*[F.when(F.col(c) == 1, F.lit(c)).otherwise(None) for c in columns_to_sum])

# One candidate per rule: the rule's label when all four of its habitats are
# flagged on the cell, otherwise null.
rule_labels = [
    F.when(
        F.size(
            F.array_intersect(
                matching_array,
                F.array(*[F.lit(name) for name in rule_set]),
            )
        ) == 4,
        F.lit(habitat),
    )
    for rule_set, habitat in rules
]

# First matching rule wins; four-flag cells matching no rule are "Multiple Habitats".
four_flag_label = F.coalesce(*rule_labels, F.lit("Multiple Habitats"))

unresolved_four = (F.col("Habitat") == "None") & (F.col("sum") == 4)
data_footprint = data_footprint.withColumn(
    "Habitat",
    F.when(unresolved_four, four_flag_label).otherwise(F.col("Habitat")),
)

data_footprint.createOrReplaceTempView("data_footprint")


In [0]:
# Show the classified footprint table in the notebook for a visual check.
data_footprint.display()

In [0]:
# Classification is "<Land Ownership> - <Habitat>".
classification_label = F.concat_ws(" - ", F.col("Land Ownership"), F.col("Habitat"))
data_footprint = data_footprint.withColumn("Classification", classification_label)

In [0]:
# Output columns in publication order; the flag count "sum" is exposed as "Count".
output_columns = [
    'id',
    'Land Ownership',
    'Habitat',
    'Classification',
    'geometry',
    'Woodland (Dense)',
    'Woodland (Sparse)',
    'Moorland and Heath',
    'Upland Bog',
    'Semi-Natural Grassland',
    'Coastal Margins',
    'Saltmarsh',
    'Enclosed Farmland',
    'Freshwater and Wetlands',
    'Urban',
]
final = data_footprint.select(*output_columns, col('sum').alias('Count'))

In [0]:
# Collect the result to the driver and wrap it as a GeoDataFrame in EPSG:27700.
final_pd = final.toPandas()
final_gpd = gpd.GeoDataFrame(final_pd, geometry='geometry', crs='epsg:27700')

In [0]:
# Output folder, written as a local /dbfs path for the GeoPandas/shutil writes.
out_path = Path(
    "/dbfs/mnt/lab-res-a1001005/esd_project/Defra_Land/Final/"
)

# Same location in "dbfs:" form, as required by download_link / dbutils.fs.
alt_out_path = str(out_path).replace("/dbfs", "dbfs:")

In [0]:
# Save the full per-cell table as GeoParquet.
final_gpd.to_parquet(f"{out_path}/DGL_Habitats.parquet")

In [0]:
def download_link(filepath):
    """Copy a DBFS file into FileStore and return an HTML download link for it.

    Args:
        filepath: Source file location in ``dbfs:/`` form (not ``/dbfs/``).

    Returns:
        An HTML ``<a>`` snippet pointing at the workspace ``/files/`` URL of
        the copied file, suitable for ``displayHTML``.
    """
    # Bare file name without the leading "/". rpartition also returns the whole
    # string when there is no "/" at all, where an rfind-based slice would keep
    # only the last character.
    filename = filepath.rpartition("/")[2]
    # Copy the file to FileStore, which is served under /files/.
    dbutils.fs.cp(filepath, f"dbfs:/FileStore/{filename}")
    # Construct download url
    workspace_url = spark.conf.get('spark.databricks.workspaceUrl')
    org_id = spark.conf.get('spark.databricks.clusterUsageTags.orgId')
    url = f"https://{workspace_url}/files/{filename}?o={org_id}"
    # Quote the href so the attribute is well-formed whatever the URL contains.
    return f"<a href='{url}' target='_blank'>Download file: {filename}</a>"

In [0]:
# Show a download link for the per-cell GeoParquet file.
displayHTML(download_link(f"{alt_out_path}/DGL_Habitats.parquet"))

In [0]:
# Merge all cells sharing a Classification, split the merged multi-part
# geometries back into single parts, and renumber the rows.
final_gpd_diss = final_gpd.dissolve(by='Classification')
final_gpd_diss = final_gpd_diss.explode(index_parts=False)
final_gpd_diss = final_gpd_diss.reset_index(drop=True)

In [0]:
# Display the dissolved table in the notebook.
final_gpd_diss

In [0]:
# Rebuild Classification: it was the dissolve key, so it moved to the index and
# was discarded by reset_index(drop=True).
final_gpd_diss['Classification'] = final_gpd_diss['Land Ownership'] + ' - ' + final_gpd_diss['Habitat']

In [0]:
# Area of each dissolved polygon, in the units of the layer's CRS.
final_gpd_diss['Area'] = final_gpd_diss.geometry.area

In [0]:
# Reduce the summary layer to the reporting columns; every other column is dropped.
keep = ['Land Ownership','Habitat','Classification','Area','geometry']
final_gpd_diss = final_gpd_diss[keep]
# Display the trimmed table in the notebook.
final_gpd_diss

In [0]:
# Assign EPSG:27700 to the summary layer before it is written out.
# NOTE(review): the layer presumably already carries this CRS from final_gpd — confirm whether this is still needed.
final_gpd_diss.crs = 'epsg:27700'

In [0]:
# Save the dissolved summary layer as GeoParquet.
final_gpd_diss.to_parquet(f"{out_path}/DGL_Habitats_Summary.parquet")

In [0]:
# Show a download link for the summary GeoParquet file.
displayHTML(download_link(f"{alt_out_path}/DGL_Habitats_Summary.parquet"))

In [0]:
import shutil
import tempfile

# Export the dissolved summary layer as a GeoPackage and offer it for download.
data_name = 'DGL_Habitats_Summary'

# The file is written to a local temporary directory first and then copied to
# the output folder (presumably because the GPKG driver cannot write directly
# to the /dbfs mount — confirm).
with tempfile.TemporaryDirectory() as tmp:
    output_file = tmp + '/' + data_name + '.gpkg'
    # Write the summary (dissolved) layer: the file and layer are both named
    # "DGL_Habitats_Summary", so the per-cell final_gpd table is the wrong input.
    final_gpd_diss.to_file(output_file, layer=data_name, driver='GPKG')
    shutil.copy(output_file, str(out_path) + '/' + data_name + '.gpkg')

displayHTML(download_link(alt_out_path + '/' + data_name + '.gpkg'))