In [0]:
%pip install databricks-feature-engineering
%restart_python 

In [0]:
# Declare one free-text widget per notebook parameter. Each widget uses its own
# name as the label and starts out empty.
for _widget_name in (
    "app_catalog_name",
    "app_schema_name",
    "project_catalog_name",
    "project_schema_name",
    "feature_lookup_id",
    "eol_view",
    "label",
):
    dbutils.widgets.text(_widget_name, "", _widget_name)



In [0]:
# Read the string-valued parameters from their widgets in a single pass.
(
    app_catalog_name,
    app_schema_name,
    project_catalog_name,
    project_schema_name,
    eol_view,
    label,
) = (
    dbutils.widgets.get(_widget_key)
    for _widget_key in (
        "app_catalog_name",
        "app_schema_name",
        "project_catalog_name",
        "project_schema_name",
        "eol_view",
        "label",
    )
)
# The lookup ID is the only numeric parameter; int() raises ValueError when the
# widget is empty or not a whole number.
feature_lookup_id = int(dbutils.widgets.get("feature_lookup_id"))


In [0]:
import sys
import json
from typing import List, Dict, Any
from databricks.feature_engineering.entities.feature_lookup import FeatureLookup
from databricks.feature_engineering import FeatureEngineeringClient, FeatureFunction

# Feature Engineering client shared by the notebook; used further down to build the training set.
fe = FeatureEngineeringClient()

In [0]:
def get_feature_lookup_by_id(feature_lookup_id: int):
    """Fetch one row of the ``feature_lookups`` table by its ``id``.

    The table is resolved from the notebook-level ``app_catalog_name`` and
    ``app_schema_name`` variables.

    Args:
        feature_lookup_id: Value matched against the table's ``id`` column.

    Returns:
        The first matching row, or ``None`` when no row matches or when the
        query fails (the error is printed, not raised).
    """
    try:
        # Coerce to int so only a numeric literal is ever interpolated into the SQL text.
        lookup_id = int(feature_lookup_id)
        query = f"SELECT * FROM {app_catalog_name}.{app_schema_name}.feature_lookups WHERE id = {lookup_id}"
        # first() already yields None for an empty result, so a separate
        # count() (a second Spark action over the same query) is unnecessary.
        row = spark.sql(query).first()
    except Exception as e:
        # Best-effort lookup: report the failure and let the caller handle None.
        print(f"Error: {e}")
        return None

    if row is None:
        print(f"No feature lookup found with ID {feature_lookup_id}")
    return row

In [0]:

def _parse_feature_entry(feat):
    """Turn one entry of the ``features`` array into a dict, or return None if it is unusable."""
    if isinstance(feat, str):
        try:
            parsed = json.loads(feat)
        except json.JSONDecodeError:
            print(f"Warning: Skipping feature that is not valid JSON: {feat}")
            return None
    else:
        parsed = feat

    # Covers both non-str/non-dict inputs and JSON that decodes to a list, number, etc.
    if not isinstance(parsed, dict):
        print(f"Warning: Unexpected feature format: {feat}")
        return None
    return parsed


def generate_feature_lookups(feature_lookup_id: int):
    """Build ``FeatureLookup`` objects for a stored feature-lookup definition.

    Each entry of the record's ``features`` field may be a dict or a JSON
    string encoding one. The keys read from it are ``table``, ``features``,
    ``lookup_key`` and the optional ``timestamp_key``.

    Args:
        feature_lookup_id: ID passed to ``get_feature_lookup_by_id``.

    Returns:
        A ``(feature_lookups, name)`` tuple: the list of ``FeatureLookup``
        objects (empty when the record has no usable features) and the
        record's ``name`` field.

    Raises:
        ValueError: If no record is returned for ``feature_lookup_id``.
    """
    feature_lookup = get_feature_lookup_by_id(feature_lookup_id)

    if feature_lookup is None:
        raise ValueError(f"No feature lookup found with ID {feature_lookup_id}")

    features = feature_lookup['features']
    name = feature_lookup["name"]

    if not features:
        print("Warning: No features found in this feature lookup")
        # Same (list, name) shape as the normal path so callers can always unpack.
        return [], name

    feature_lookup_objects = []

    for feat in features:
        feat_dict = _parse_feature_entry(feat)
        if feat_dict is None:
            # One malformed entry must not discard the entries that follow it.
            continue

        table_name = feat_dict.get('table', '')
        feature_names = feat_dict.get('features', [])
        lookup_key = feat_dict.get('lookup_key')
        timestamp_key = feat_dict.get('timestamp_key')

        if not table_name:
            print(f"Warning: Skipping feature with no table name: {feat_dict}")
            continue

        if not lookup_key:
            # Entries without a lookup key are skipped rather than joined on a guessed key.
            print(f"Warning: No lookup key specified for table {table_name}")
            continue

        lookup_kwargs = {
            "table_name": table_name,
            "feature_names": feature_names,
            "lookup_key": lookup_key,
        }
        # Only pass a timestamp key when one is configured.
        if timestamp_key:
            lookup_kwargs["timestamp_lookup_key"] = timestamp_key

        feature_lookup_objects.append(FeatureLookup(**lookup_kwargs))

    # Return only after every entry has been processed.
    return feature_lookup_objects, name
   

In [0]:
import time

def create_table_with_epoch(df, split, catalog, schema, base_name):
    """Randomly split ``df`` and save the two parts as epoch-suffixed tables.

    Args:
        df: DataFrame to split.
        split: Weight given to the eval part; the train part gets ``1.0 - split``.
        catalog: Catalog the tables are written to.
        schema: Schema the tables are written to.
        base_name: Prefix for both table names; spaces become underscores.

    Returns:
        ``(train_table_name, eval_table_name)`` without the catalog/schema prefix.
    """
    prefix = base_name.replace(' ', '_')
    # Whole-second Unix timestamp shared by both table names.
    stamp = int(time.time())
    table_names = tuple(f"{prefix}_{role}_{stamp}" for role in ("train", "eval"))

    # Fixed seed, as in the original split.
    train_part, eval_part = df.randomSplit([1.0 - split, split], seed=42)

    for part, table_name in zip((train_part, eval_part), table_names):
        part.write.saveAsTable(f"{catalog}.{schema}.{table_name}")

    return table_names
    

In [0]:
# Build the FeatureLookup list for the configured ID, and keep the record's `name`
# field, which is later used as the prefix of the output table names.
feature_lookups, name = generate_feature_lookups(feature_lookup_id)

In [0]:
# Load every column of the relation named by the `eol_view` widget; this is the
# base DataFrame that the looked-up features are joined onto.
df = spark.sql(f"SELECT * FROM {eol_view}")
# Print the input column names to the run output.
print(df.columns)


In [0]:
# Join the looked-up features onto the base DataFrame.
# The original columns of `df` are excluded from the result, except the label.
# NOTE(review): the label is kept out of the exclusion list because the training
# data needs it, and the client appears to reject an excluded label - confirm
# against the create_training_set documentation.
training_set = fe.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label=label,
  # Keyword was misspelled `exclude_colunmns`, which the client does not accept.
  exclude_columns=[column for column in df.columns if column != label]
)

# Materialize the joined training data as a DataFrame.
training_df = training_set.load_df()

In [0]:
# Split the joined data and save both parts in the project catalog/schema.
# 0.2 is the `split` weight given to the eval table; the train table gets the remaining 0.8.
train_table_name, eval_table_name = create_table_with_epoch(training_df, 0.2, project_catalog_name, project_schema_name, name)

In [0]:
# Publish the generated table names as job task values (presumably read by
# later tasks in the same job - confirm against the job definition).
dbutils.jobs.taskValues.set(key="train_table_name", value=train_table_name)
dbutils.jobs.taskValues.set(key="eval_table_name", value=eval_table_name)

In [0]:
import json

# End the notebook run, handing both table names back as a JSON string.
dbutils.notebook.exit(json.dumps({
    'train_table_name': train_table_name,
    'eval_table_name': eval_table_name
}))
