In [1]:
import pyspark.sql.functions as F
from datetime import datetime, timedelta, date
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import sum, to_date, col, when, month, dayofmonth, explode, array, struct, expr, lit , avg
import geopandas as gpd
import matplotlib.pyplot as plt
from shapely import wkt
import pandas as pd

**Load dataframes**

In [2]:
%run Meta/connection_information

In [3]:
#create mount point
mssparkutils.fs.mount( 
    ADLS_PATH+"/base/extern/wiwb/", 
    "/bwiwb/", 
    {"linkedService":"AzureDataLakeStorage1"} 
) 

In [4]:
#load geojson polygon file
cells = gpd.read_file(f"/data/polygons_irc.geojson")
cells.index = cells["index"]
cells = cells.drop(columns=['index'])

In [5]:
#load dataframes
BASE_GIS_FOLDER = '/base/wsbd/gis/WEB_Beheerregister_Afvalwaterketen/zuiveringseenheden'
BASE_IRC_FOLDER = '/base/extern/wiwb/irc_combined'
ENRICHED_FOLDER = '/enriched/extern/waterketen'

# Needed to check when pipeline was last succesfull
data_source = 'neerslag_per_zuivering'
# Get today's date
today_date = date.today()

In [6]:
def get_max_date_value(data_source: str) -> int:
    try:
        # Get the maximum timestamp value from the specified column
        max_timestamp = spark.read.format('delta').load(f'{ENRICHED_FOLDER}/{data_source}').select(F.max(F.col("date_hour"))).first()[0]
        # Convert the maximum timestamp to a datetime object
        max_datetime = datetime.strptime(str(max_timestamp), '%Y-%m-%d %H')
        # Calculate the difference in days between max date and today
        date_difference = (today_date - max_datetime.date()).days
    except:
        return 0
    return date_difference

In [7]:
# filter data based last date in the source file
date_difference = get_max_date_value(data_source)
days_ago = today_date - timedelta(days=date_difference)

year, month, day = days_ago.year, days_ago.month, days_ago.day
DATE_FROM = date(year, month, day)

# load irc to DataFrame
df_irc = spark.read \
            .format('delta') \
            .load(BASE_IRC_FOLDER)

df_irc = df_irc.filter((df_irc.timestamp >= DATE_FROM))

**Zet coordinaat systeem van de polygon geosjson om van rd new naar wgs84**

In [51]:
# Create a GeoDataFrame from the Pandas DataFrame
cells = gpd.GeoDataFrame(cells, geometry="geometry")

# Define the RD New and WGS84 CRS
crs_rd = "EPSG:28992"
crs_wgs84 = "EPSG:4326"

# Set the CRS of the original GeoDataFrame to RD New
cells.crs = crs_rd

# Convert the coordinates to WGS84 using .to_crs()
cells_84 = cells.to_crs(crs_wgs84)

# Plot the geometries using GeoPandas
#cells.plot()

**Laad gislaag in**

In [9]:
# load zuiveringseenheden to DataFrame
zuiveringseenheden_df = (
    spark
    .read
    .format("delta")
    .load(BASE_GIS_FOLDER))

In [10]:
# Convert the Spark DataFrame to a Pandas DataFrame
zuiveringseenheden_df_pandas = zuiveringseenheden_df.toPandas()

# Convert the WKT geometries to Shapely geometries
zuiveringseenheden_df_pandas["geometry"] = gpd.GeoSeries.from_wkt(zuiveringseenheden_df_pandas["geometrie"])
zuiveringseenheden_df_pandas = zuiveringseenheden_df_pandas.drop(columns=['geometrie'])

# Create a GeoDataFrame from the Pandas DataFrame
gpd_zuiveringseenheden = gpd.GeoDataFrame(zuiveringseenheden_df_pandas, geometry="geometry", crs="EPSG:4326")

# Plot the geometries using GeoPandas
#zuiveringseenheden_df.plot()

In [20]:
# # Create a new plot
# fig, ax = plt.subplots()

# # Plot the geometries from the first GeoDataFrame
# cells_84.plot(ax=ax, color='blue', label='raster cellen WIWB')

# # Plot the geometries from the second GeoDataFrame
# gpd_zuiveringseenheden.plot(ax=ax, color='red', label='Zuiveringseenheden')

# # Set plot title and legend
# plt.title("Polygon laag ten opzichte van zuiveringseenhedn")
# plt.legend()

# # Show the plot
# plt.show()

**Make an intersection of the zuiveringseenheden and the polygonen and use the selected polygons to select the right data from the wiwb dataframe**

In [75]:
# Reset the index
cells_84 = cells_84.reset_index()

# Perform the spatial join using 'union' as the operation in overlay
union_gdf = cells_84.overlay(gpd_zuiveringseenheden, how='union')

union_gdf['area'] = union_gdf.area

# Drop rows with NaN values in the specified column
union_gdf = union_gdf.dropna(subset=['index','naam'])

In [76]:
# Calculate percentage of surface that belongs to the zuivering
# Multiply the 'area' column by 10000 and round the percentage column
union_gdf['percentage'] = (union_gdf['area'] * 10000 / 0.011895).round(0)

# Drop geometry and area
union_gdf = union_gdf.drop(columns=['geometry','area','naam'])

# Convert Pandas DataFrame to PySpark DataFrame
union_spark = spark.createDataFrame(union_gdf)

In [33]:
# resample the data to daily data, summing the precipitation
df_irc = df_irc.groupBy(to_date('timestamp').alias('date'))\
                .sum()

# adjust col names to match dict
remove_sum_col_names = list(map(lambda x: x.replace("sum(", "").replace(")", ""), df_irc.columns[1:]))
df_irc = df_irc.toDF("date", *remove_sum_col_names)

In [34]:
#function to melt dataframe
def to_explode(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("polygon"), col(c).alias("precipation")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.polygon", "kvs.precipation"])

In [42]:
columns_to_drop = ['year', 'month', 'day']
df_irc = df_irc.drop(*columns_to_drop)

#start function
df_irc_explode = to_explode(df_irc, ['date'])

In [77]:
#join irc and percentage zuiveringseenheid
df_irc_total = df_irc_explode.join(union_spark,df_irc_explode.polygon ==  union_spark.index,"fullouter")

# Drop rows where "index" is null
df_irc_total = df_irc_total.filter(col("index").isNotNull())

In [79]:
#percentage surface * amount of precipation
df_irc_total = df_irc_total\
  .withColumn('precipitation_percentage', (F.col('percentage')*F.col('precipation'))/100)

In [80]:
#aggregate zuivering days and precipitation
zuiveringen_precipitation = df_irc_total.groupby("date","CODE")\
    .agg(avg("precipitation_percentage").cast('double').alias("precipitation"),\
         )

In [37]:
# save DataFrame
zuiveringen_precipitation.write \
        .format("delta") \
        .mode("append") \
        .save(f'{ENRICHED_FOLDER}/neerslag_per_zuivering')
