In [0]:
# Fully qualified (catalog.schema.table) names used throughout the notebook.
# The catalog is declared once so the schema that gets created and the
# feature table that is later written cannot drift apart.
CATALOG_NAME = "tmp_cat_dev"
SCHEMA_NAME = "aranda_ml"
TABLE_NAME = f"{CATALOG_NAME}.aranda.casos"
FEATURES_TABLE_NAME = f"{CATALOG_NAME}.{SCHEMA_NAME}.features"

# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE SCHEMA raises
# once the schema already exists (e.g. on Restart & Run All).
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG_NAME}.{SCHEMA_NAME}")

# Load the source table that the rest of the notebook transforms.
df = spark.read.table(TABLE_NAME)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id
# Project the source table down to the columns used for feature engineering:
# the row identifier, the three text fields and the categorical attributes.
df = df.select(
    *[
        'id',
        'IncidenttypeID',
        'IncidentSubject',
        'IncidentDescription',
        'IncidentCommentary',
        'GroupID',
        'CategoryID',
        'ServiceID',
        'SlaID',
        'RegistrytypeID',
        'PriorityID',
        'CityID',
        'DepartamentoID',
        'IncidentAnsInTime',
    ]
)

In [0]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
# Columns that get a numeric "<name>_idx" companion column.
categorical_columns = [
    'IncidenttypeID',
    'GroupID',
    'CategoryID',
    'ServiceID',
    'SlaID',
    'RegistrytypeID',
    'PriorityID',
    'CityID',
    'DepartamentoID',
    'IncidentAnsInTime',
]

# One StringIndexer stage per categorical column.
# NOTE(review): handleInvalid="skip" is documented by Spark to filter out rows
# with invalid values instead of raising — confirm dropping rows is intended.
indexers = [
    StringIndexer(
        inputCol=column_name,
        outputCol=f"{column_name}_idx",
        handleInvalid="skip",
    )
    for column_name in categorical_columns
]

# Fit all indexers in a single pipeline, then append the index columns to df.
pipeline = Pipeline(stages=indexers)
model = pipeline.fit(df)
df = model.transform(df)

In [0]:
from pyspark.sql.functions import concat, concat_ws, col, lit, regexp_replace

# Merge the three text fields into a single "Description" column, separated by
# single spaces. concat_ws skips NULL inputs, whereas concat returns NULL as
# soon as any input is NULL — which previously wiped the whole Description for
# every case missing a subject, a description or a commentary.
# A row where all three fields are NULL now yields an empty string.
df_concatenado = df.withColumn(
    "Description",
    concat_ws(
        " ",
        col("IncidentSubject"),
        col("IncidentDescription"),
        col("IncidentCommentary"),
    ),
)

# Collapse any run of two or more whitespace characters into one space.
df_concatenado = df_concatenado.withColumn(
    "Description",
    regexp_replace(col("Description"), "\\s{2,}", " "),
)

# Keep the identifier, the merged text, the original categorical values and
# their "_idx" encodings; the three source text columns are dropped.
df_concatenado = df_concatenado.select(
    'id',
    'IncidenttypeID',
    'Description',
    'GroupID',
    'CategoryID',
    'ServiceID',
    'SlaID',
    'RegistrytypeID',
    'PriorityID',
    'CityID',
    'DepartamentoID',
    'IncidentAnsInTime',
    'IncidenttypeID_idx',
    'GroupID_idx',
    'CategoryID_idx',
    'ServiceID_idx',
    'SlaID_idx',
    'RegistrytypeID_idx',
    'PriorityID_idx',
    'CityID_idx',
    'DepartamentoID_idx',
    'IncidentAnsInTime_idx',
)

In [0]:
from databricks.feature_store import FeatureStoreClient
# Publish the engineered DataFrame as a feature table keyed by "id".
# NOTE(review): create_table is called unconditionally — confirm how it behaves
# when the table already exists before relying on re-running this cell.
fs_client = FeatureStoreClient()
fs_client.create_table(
    name=FEATURES_TABLE_NAME,
    df=df_concatenado,
    primary_keys=["id"],
    description="Features de casos.",
)

# Confirmation message shown once the call above returns.
print(f"Tabla {FEATURES_TABLE_NAME} creada en el Feature Store.")