In [0]:
# Re-import edited project modules (e.g. well_agent) automatically before each cell runs.
%load_ext autoreload
%autoreload 2

## Process Data

This notebook takes all the raw files from the bronze volume and processes them into a single table. It could be optimized to use less RAM, but it does the job.

In [0]:
from well_agent.utils import get_config_path, DotConfig
# Resolve the project config through well_agent's helper. A hard-coded
# 'config.yaml' is resolved relative to the current working directory, and it
# used to override (and make dead) the helper's result.
config_path = get_config_path()
config = DotConfig(config_path)

In [None]:
from databricks.connect import DatabricksSession as SparkSession

# Reuse an existing `spark` object when one is already defined in the notebook
# namespace (presumably injected by the Databricks runtime). Otherwise build a
# serverless session through Databricks Connect.
if "spark" not in locals():
    spark = (
        SparkSession.builder
        .serverless(True)
        .getOrCreate()
    )

## Bronze to Silver Cleaning
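There are a couple of problems with how the raw files were saved for use with Spark: the timestamp is stored in the pandas index rather than as a column, and it needs to be coerced to millisecond precision. We loop through every parquet file and fix the timestamp before reading with Spark, keeping only the columns we need and dropping rows without a `state` label.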

In [0]:
from pathlib import Path
import pandas as pd
# Columns carried from bronze to silver. `timestamp` comes out of the pandas
# index via reset_index() below.
cols_to_keep = ['P-PDG', 'P-MON-CKP', 'P-MON-CKGL', 'P-JUS-CKGL', 'P-JUS-CKP', 'P-TPT', 'T-TPT', 'QGL', 'T-JUS-CKP', 'state', 'timestamp']

for path in Path(config.download.output_dir).rglob('*.parquet'):
  # Mirror the bronze layout under silver.
  # NOTE(review): str.replace rewrites every 'bronze' in the full path, not only
  # the directory component.
  out_path = Path(str(path).replace('bronze', 'silv'))
  if out_path == path:
    # Without this guard the bronze file would be overwritten in place.
    raise ValueError(f"Cannot derive a silver path for {path}: no 'bronze' in path")

  # Skip files converted on a previous run, before paying for the read.
  # (`continue` is required here; a bare `next` is a no-op expression.)
  if out_path.exists():
    continue
  print(path)

  # Move the timestamp index into a column, keep only the needed columns, and
  # drop rows that have no `state` label.
  df_out = (
    pd.read_parquet(path)
    .reset_index()[cols_to_keep]
    .dropna(subset=['state'])
  )

  # rglob recurses, so the output directory may be nested more than one level deep.
  out_path.parent.mkdir(parents=True, exist_ok=True)
  # Write millisecond timestamps (truncating finer precision) and no pandas index.
  df_out.to_parquet(
    out_path,
    coerce_timestamps="ms",
    allow_truncated_timestamps=True,
    index=False,
  )

## Table Generation

Now that the files are clean, we can use Spark to combine them into a single large table.

In [0]:
from pyspark.sql.types import (
    StructType, StructField, DoubleType, IntegerType, TimestampType, LongType, StringType
)

# Columns declared as DoubleType, in schema order (between `timestamp` and `state`).
# NOTE(review): 'P-MON-CKGL' and 'P-JUS-CKGL' are written to the silver files
# but are not declared here, so Spark will not load them. Confirm this is intended.
_SCHEMA_3W_DOUBLE_COLS = ["P-PDG", "P-MON-CKP", "P-JUS-CKP", "P-TPT", "T-TPT", "QGL", "T-JUS-CKP"]

# Explicit read schema for the silver parquet files; every field is nullable.
SCHEMA_3W = StructType(
    [StructField("timestamp", TimestampType(), True)]
    + [StructField(name, DoubleType(), True) for name in _SCHEMA_3W_DOUBLE_COLS]
    + [StructField("state", IntegerType(), True)]
)

In [0]:
# Build the glob over the silver files, whether or not silver_dir ends with '/'.
# NOTE: Hadoop-style globs match `**` within a single path component (like `*`),
# so this reads <silver_dir>/<subdir>/<file>.parquet. Confirm if deeper nesting
# is ever needed.
silver_dir = str(config.process.silver_dir).rstrip("/")
path_pattern = f"{silver_dir}/**/*.parquet"
df = spark.read.schema(SCHEMA_3W).parquet(path_pattern)

In [0]:
from pyspark.sql.functions import col, sum as spark_sum, isnan

# Per-column null counts: cast each isNull() flag to 0/1, then sum it over all rows.
null_count_exprs = [
    spark_sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
na_counts = df.select(null_count_exprs)
display(na_counts)

In [0]:
from well_agent.process import clean_data_spark

In [0]:
# Tag every row with the parquet file it was read from (Spark's hidden
# `_metadata.file_path`), then apply the project's cleaning step.
# NOTE(review): clean_data_spark presumably derives `well_number` (used below)
# from source_file. Verify in well_agent.process.
df = clean_data_spark(df.withColumn("source_file", df["_metadata.file_path"]))

## Forward fill

Fill missing values with the last observed value for the same well, in timestamp order.

In [0]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

well_col = "well_number"  # adjust if your well column is named differently
timestamp_col = "timestamp"
passthrough_cols = {well_col, timestamp_col}

# Running window per well, in timestamp order: from the first row up to and
# including the current row.
# NOTE(review): partitioning by well only lets values carry across different
# source files of the same well. Consider partitioning by source_file if that
# is unwanted.
window_spec = (
    Window.partitionBy(well_col)
    .orderBy(timestamp_col)
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)


def _ffill(name):
    """Return the last non-null value of column `name` seen so far in window_spec."""
    return F.last(F.col(name), ignorenulls=True).over(window_spec).alias(name)


# Key columns pass through unchanged; every other column is forward-filled.
ffill_exprs = [F.col(c) if c in passthrough_cols else _ffill(c) for c in df.columns]

df = df.select(ffill_exprs)

In [0]:
# Persist the result, replacing both the data and the schema of any existing table.
target_table = f"{config.catalog}.{config.schema}.{config.process.table}"
writer = df.write.mode("overwrite").option("overwriteSchema", "true")
writer.saveAsTable(target_table)