#### Data calculations and statistics

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import dlt

In [0]:
@dlt.view(name="gold_unpivoted")
def gold_unpivoted():
  """Reshape `all_wells_gold` from wide to long format.

  Each input row is expanded by Spark's `stack` generator into 16 rows,
  one per listed log column, so the result has the columns
  `well_id`, `DEPTH`, `log_name` and `values`.

  Returns:
    A DataFrame with one row per (well_id, DEPTH, log_name).
  """
  # Pairs of ('label', column): the label becomes `log_name`, the column's
  # content becomes `values`.
  # NOTE(review): the labels 'Perm' and 'Fluvialfacie' differ from their
  # source column names (PERM, Fluvialfacies) -- confirm this is intended
  # before changing, since downstream tables key on `log_name`.
  stack_expression = "stack(16, 'GR', GR, 'ILD', ILD, 'ILM', ILM, 'CALI', CALI, 'DPHI', DPHI, 'DT', DT, 'DT2', DT2, 'NPHI', NPHI, 'RHOB', RHOB, 'SFLU', SFLU, 'SP', SP, 'TEN', TEN, 'POROSITY', POROSITY, 'Perm', PERM, 'Fluvialfacie', Fluvialfacies, 'NetGross', NetGross) as (log_name, values)"

  wide_logs = dlt.read("all_wells_gold")
  long_logs = wide_logs.selectExpr("well_id", "DEPTH", stack_expression)
  return long_logs

@dlt.table(
    name = "gold_unpivoted_snapshot",
    comment = "Direct batch snapshot from Gold, without streaming, for windows calculations",
    table_properties = {"layer": "gold", "type": "well log"}
)
def create_gold_unpivoted_snapshot():
    """Materialize the `gold_unpivoted` view as a table with numeric values.

    Selects `well_id`, `DEPTH` and `log_name` from `LIVE.gold_unpivoted` and
    casts `values` to DOUBLE so downstream aggregations operate on a
    numeric column.

    Returns:
        A DataFrame with columns `well_id`, `DEPTH`, `log_name`, `values`.
    """
    # A single projection is enough: the previous nested subquery selected
    # the same columns and applied the same cast a second time.
    return spark.sql("""
        SELECT well_id, DEPTH, log_name, CAST(values AS DOUBLE) AS values
        FROM LIVE.gold_unpivoted
    """)

@dlt.table(
    name="gold_unpivoted_cleaned",
    comment="Cleaned unpivoted table that contains all wells in gold layer without null values",
    table_properties={"layer" : "gold", "type" : "well log"},
)
def gold_unpivoted_cleaned():
  """Return the snapshot rows that carry a usable measurement.

  Reads `gold_unpivoted_snapshot`, casts `values` to double, drops rows
  whose value is NULL or equal to -999.25, and sorts the result by
  `well_id`, `DEPTH`, `log_name` (all ascending).

  Returns:
    A DataFrame with columns `well_id`, `DEPTH`, `log_name`, `values`.
  """
  values_as_double = col("values").cast("double").alias("values")
  # -999.25 is treated as "no data" here -- presumably the source files'
  # missing-value sentinel; confirm against the ingestion layer.
  has_measurement = col("values").isNotNull() & (col("values") != lit(-999.25))

  snapshot = dlt.read("gold_unpivoted_snapshot")
  projected = snapshot.select("well_id", "DEPTH", "log_name", values_as_double)
  measured = projected.filter(has_measurement)
  return measured.orderBy("well_id", "DEPTH", "log_name")

In [0]:
# Making the corresponding aggregations
@dlt.table(
    name = "gold_statistics_summary",
    comment = "Table that contains logs statistical summary per well in gold layer",
    table_properties = {"layer" : "gold", "type" : "well log"}
)
def gold_statistics_summary():
  """Build one summary row per (well_id, log_name).

  Combines two inputs:

  * `gold_unpivoted_snapshot` -- min / max / avg / stddev of the valid
    values, plus the count of non-null values (`total_values_count`) and
    the count of valid values (`valid_values_count`). A value is valid
    when it is not NULL and not equal to -999.25. Groups without any valid
    value are dropped.
  * `gold_unpivoted_cleaned` -- the smallest and largest DEPTH of each log
    (`min_depth_valid`, `max_depth_valid`) and the sum of the differences
    between consecutive depths (`valid_interval_meters`). That sum is NULL
    for a log with a single sample, because the first row of each
    partition has no predecessor.

  Returns:
    A DataFrame with columns `well_id`, `log_name`, `min_depth_valid`,
    `max_depth_valid`, `valid_interval_meters`, `min_value`, `max_value`,
    `avg_value`, `stddev_value`, `total_values_count`,
    `valid_values_count`.
  """
  # NOTE: min / max / avg / stddev / count / sum below are the Spark SQL
  # functions brought in by the file-level star import, not Python builtins.

  # Yields the value when it is a real measurement and NULL otherwise, so
  # every aggregate below ignores invalid readings. The sentinel is compared
  # numerically, matching the filter used by `gold_unpivoted_cleaned`.
  valid_value = when(
      (col("values") != lit(-999.25)) & col("values").isNotNull(),
      col("values"),
  )

  # Making the corresponding aggregations. No sort is applied here: the
  # join below does not preserve row order, so sorting would be wasted work.
  df_agg = (dlt.read("gold_unpivoted_snapshot")
            .groupBy("well_id", "log_name")
            .agg(
                min(valid_value).alias("min_value"),
                max(valid_value).alias("max_value"),
                avg(valid_value).alias("avg_value"),
                stddev(valid_value).alias("stddev_value"),
                count("values").alias("total_values_count"),
                count(valid_value).alias("valid_values_count"))
            .where((col("total_values_count") != 0) & (col("valid_values_count") != 0))
  )

  # Depth-ordered window used to look at the previous sample of each log
  depth_order = Window.partitionBy("well_id", "log_name").orderBy(col("DEPTH").asc())

  # One pass over the cleaned table: the difference to the previous depth is
  # computed per row, then the depth range and the summed differences are
  # aggregated together per (well_id, log_name).
  df_depth = (dlt.read("gold_unpivoted_cleaned")
              .withColumn("depth_diff", col("DEPTH") - lag("DEPTH", 1).over(depth_order))
              .groupBy("well_id", "log_name")
              .agg(
                  min("DEPTH").alias("min_depth_valid"),
                  max("DEPTH").alias("max_depth_valid"),
                  sum("depth_diff").alias("valid_interval_meters"))
  )

  # Joining everything to make the final table
  return df_depth.join(df_agg, on=["well_id", "log_name"], how="inner")