# Oil Extraction Production Forecasting
<br/>
<img src="https://www.nsenergybusiness.com/wp-content/uploads/sites/4/2022/07/refinery-ga56d4972f_640.jpg" />

In [0]:
# IMPORTANT: do not change the values in this first section.
catalog, db = "workshop", "default"

# Look up the "user" tag on the notebook context; it is used later to build
# the MLflow experiment path.
current_user = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .tags()
    .get("user")
    .get()
)

# IMPORTANT: this must be unique for every participant, and it must be the
# name of the table you created for this lab.
src_table = "ademianczuk_oil_yield"

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient

# Client used to talk to the feature tables
fe = FeatureEngineeringClient()

# Load the source feature table ("<src_table>_features") for this lab
df = fe.read_table(name=f"{catalog}.{db}.{src_table}_features")

We can handle data normalization in one of two ways: we can transform the data as it lands in the feature tables, which we would normally do as part of the ingestion pipeline, or we can apply the transformation late, in a wrapper function around the compiled model. Both approaches have benefits and drawbacks; for this lab we'll simulate pre-processing the features as though that step were part of the ingestion pipeline.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pandas as pd
from scipy.stats import boxcox, yeojohnson

# Bring the three numeric columns to the driver as a pandas DataFrame so SciPy can fit on them
df_pd = df.select("yield_bbl", "precipitation", "temperature").toPandas()

# Fit each transform. SciPy returns (transformed values, fitted lambda); the
# transformed values overwrite the column and the lambda is unwrapped from
# numpy.float64 to a native Python float right away.
# yield_bbl and precipitation are shifted by +1 before Box-Cox to avoid zeros.
df_pd["yield_bbl"], lambda_yield = boxcox(df_pd["yield_bbl"] + 1)
lambda_yield = float(lambda_yield)

df_pd["precipitation"], lambda_precip = boxcox(df_pd["precipitation"] + 1)
lambda_precip = float(lambda_precip)

# temperature is fitted with Yeo-Johnson and is not shifted
df_pd["temperature"], lambda_temp = yeojohnson(df_pd["temperature"])
lambda_temp = float(lambda_temp)

# Spark copy of the pandas-transformed columns
df_transformed = spark.createDataFrame(df_pd)

# Two nullable columns: the lambda's name and its fitted value
schema = (
    StructType()
    .add("feature_name", StringType(), True)
    .add("lambda_value", DoubleType(), True)
)

# One (name, value) row per fitted lambda
df_lambdas = spark.createDataFrame(
    list(
        zip(
            ("lambda_yield", "lambda_precipitation", "lambda_temp"),
            (lambda_yield, lambda_precip, lambda_temp),
        )
    ),
    schema,
)

# Persist the lambdas as a Delta table, replacing any previous contents
lambdas_writer = df_lambdas.write.format("delta").mode("overwrite")
lambdas_writer.saveAsTable(f"{catalog}.{db}.{src_table}_lambdas")

print(f"Stored Box-Cox lambdas: {lambda_yield}, {lambda_precip}, {lambda_temp}")

In [0]:
from pyspark.sql.functions import col, log, when, lit
import math

# Read the stored lambdas back from the "<src_table>_lambdas" table and index
# them by feature name
df_lambdas = spark.read.table(f"{catalog}.{db}.{src_table}_lambdas")
lambda_dict = dict(
    (row["feature_name"], row["lambda_value"]) for row in df_lambdas.collect()
)

# Unpack the three lambdas used by the transformations below
lambda_yield, lambda_precip, lambda_temp = (
    lambda_dict[name]
    for name in ("lambda_yield", "lambda_precipitation", "lambda_temp")
)

# Define the Box-Cox and Yeo-Johnson transformation functions in PySpark
def boxcox_pyspark(column, lambda_value):
    """Return a Column that applies the shifted Box-Cox transform to `column`.

    The input is shifted by +1 (matching the shift used when the lambda was
    fitted), then transformed as log(x + 1) when lambda is 0 and
    ((x + 1) ** lambda - 1) / lambda otherwise.

    Args:
        column: Name of the input column.
        lambda_value: Fitted lambda, as a Python number or a Column
            (e.g. `lit(value)`).

    Returns:
        A PySpark Column expression holding the transformed values.
    """
    # lit() returns a Column unchanged and wraps plain numbers, so both
    # call styles work; a bare float would otherwise make `== 0` a Python bool.
    lam = lit(lambda_value)
    shifted = col(column) + 1
    return when(lam == 0, log(shifted)).otherwise((shifted ** lam - 1) / lam)


def yeojohnson_pyspark(column, lambda_value):
    """Return a Column that applies the Yeo-Johnson transform to `column`.

    For x >= 0 the result is log(x + 1) when lambda is 0 and
    ((x + 1) ** lambda - 1) / lambda otherwise. For x < 0 the result is
    -log(1 - x) when lambda is 2 and
    -((1 - x) ** (2 - lambda) - 1) / (2 - lambda) otherwise.

    Args:
        column: Name of the input column.
        lambda_value: Fitted lambda, as a Python number or a Column.

    Returns:
        A PySpark Column expression holding the transformed values.
    """
    lam = lit(lambda_value)
    x = col(column)
    non_negative = when(lam == 0, log(x + 1)).otherwise(((x + 1) ** lam - 1) / lam)
    negative = when(lam == 2, -log(1 - x)).otherwise(
        -((1 - x) ** (2 - lam) - 1) / (2 - lam)
    )
    return when(x >= 0, non_negative).otherwise(negative)


# Apply transformations in PySpark
df = df.withColumn("yield_bbl_transformed", boxcox_pyspark("yield_bbl", lit(lambda_yield)))
df = df.withColumn("precipitation_transformed", boxcox_pyspark("precipitation", lit(lambda_precip)))
# The temperature lambda was fitted with Yeo-Johnson, so it must be applied with
# the same transform; the Box-Cox formula only matches it for values >= 0.
df = df.withColumn("temperature_transformed", yeojohnson_pyspark("temperature", lit(lambda_temp)))

# Nothing is written in this cell; the columns are only added to `df`.
print("Box-Cox / Yeo-Johnson transformed feature columns added.")

In [0]:
from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# Register the transformed DataFrame as the "<src_table>_features_transformed"
# feature table. The primary key is the (`id`, `date`) pair, and `date` is
# also passed as the time series column.
# NOTE(review): presumably this call fails if the table already exists —
# confirm before re-running this cell.
customer_feature_table = fe.create_table(
    name=f"{catalog}.{db}.{src_table}_features_transformed",
    df=df,
    schema=df.schema,
    primary_keys=["id", "date"],
    timeseries_columns="date",
    description="oil yield features - transformed",
)

In [0]:
import mlflow

# Select the named experiment under the current user's folder
mlflow.set_experiment(f"/Users/{current_user}/Oil Extraction Production Forecasting")

# Parameters to record, in logging order: the three fitted lambdas, then the
# names of the transformed feature table and the lambdas table
run_params = {
    "lambda_yield": lambda_yield,
    "lambda_precipitation": lambda_precip,
    "lambda_temp": lambda_temp,
    "transformed_feature_table": f"{catalog}.{db}.{src_table}_features_transformed",
    "lambda_values_table": f"{catalog}.{db}.{src_table}_lambdas",
}

# Log every parameter inside a single MLflow run
with mlflow.start_run(run_name=f"{src_table} BoxCox Transformation"):
    for param_name, param_value in run_params.items():
        mlflow.log_param(param_name, param_value)

    print("Logged Box-Cox transformation details to MLflow.")