In [1]:
from pyspark.ml.param.shared import (
HasInputCol, 
HasOutputCol, 
HasInputCols, 
HasOutputCols
)

from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

from pyspark.ml.param import Param, Params, TypeConverters

from pyspark import keyword_only

from pyspark.ml import Transformer
from pyspark.ml import Model
from pyspark.ml import Estimator

from pyspark.sql import DataFrame, Column
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark=(
    SparkSession.builder.
    appName("Recipes ML Model - Are you a dessert?").
    config("spark.driver.memory","8g").
    getOrCreate()
)

class ScalarNAFiller(
    Transformer, 
    HasInputCol, 
    HasOutputCol, 
    HasInputCols, 
    HasOutputCols,
    DefaultParamsReadable,
    DefaultParamsWritable,
):

    #estas madres existen!
    filler = Param(
        parent=Params._dummy(),
        name="filler",
        doc="Value we want to replace our null values with.",
        typeConverter=TypeConverters.toFloat,
    )

    @keyword_only    
    def __init__(
        self, 
        inputCol=None, 
        outputCol=None,
        inputCols=None,
        outputCols=None,
        filler=None
    ):
        super().__init__()
        self._setDefault(filler=None)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    #esta tiene que estar definida!
    #esta cosa regresa this
    @keyword_only
    def setParams(
        self, 
        inputCol=None, 
        outputCol=None,
        inputCols=None,
        outputCols=None,
        filler=None
    ):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setFiller(self, new_filler):
        return self.setParams(filler=new_filler)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    def setInputCols(self, new_inputCols):
        return self.setParams(inputCols=new_inputCols)

    def setOutputCols(self, new_outputCols):
        return self.setParams(outputCols=new_outputCols)

    def getFiller(self):
        return self.getOrDefault(self.filler)

    def checkParams(self):
        if self.isSet("inputCol") and self.isSet("inputCols"):
            raise ValueError("Only one of InputCol or InputCols must be set.")

        if not self.isSet("inputCol") and not self.isSet("inputCols"):
            raise ValueError("One of InputCol or InputCols must be set.")
        
        if self.isSet("inputCols"):
            if len(self.getInputCols()) != len(self.getOutputCols()):
                raise ValueError(
                    "The length of InputCols does not match the length of outputCols."
                )
            
        
    def _transform(self, dataset: DataFrame):
        self.checkParams()
        

        input_columns = (
            [self.getInputCol()] 
            if self.isSet("inputCol") 
            else self.getInputCols()
        )
        
        output_columns = (
            [self.getOutputCol()] 
            if self.isSet("outputCol") 
            else self.getOutputCols()
        )

        answer = dataset

        if input_columns != output_columns:
            for i,o in zip(input_columns,output_columns):
                answer = answer.withColumn(o, F.col(i))

        print(answer is dataset)
        
        na_filler = self.getFiller()
        return answer.fillna(na_filler,subset=output_columns)


class _ExtremeValueCapperParams(
    HasInputCol,
    HasOutputCol,
    DefaultParamsReadable,
    DefaultParamsWritable,
):

    boundary = Param(
        parent=Params._dummy(),
        name="boundary",
        doc="Multiple of standard deviation for the cap and floor. Default = 0.0",
        typeConverter=TypeConverters.toFloat,
    )

    def __init__(self, *args):
        #print(type(super()))
        super().__init__(*args)
        self._setDefault(boundary=0.0)

    #se supone que no puedes hacer setboundary porque es el modelo
    #solo en el estimador puedes hacer setboundary
    def getBoundary(self):
        return self.getOrDefault(self.boundary)

class ExtremeValueCapperModel(Model, _ExtremeValueCapperParams):

    cap = Param(
        Params._dummy(),
        "cap",
        "Upper bound of the values inputCol can take."
        " Values Will be capped to this value.",
        typeConverter=TypeConverters.toFloat
    )

    floor = Param(
        Params._dummy(),
        "floor",
        "Lower bound of the values inputCol can take."
        " Values Will be floored to this value.",
        typeConverter=TypeConverters.toFloat
    )

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, cap=None, floor=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, cap=None, floor=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)


    def setCap(self, new_cap):
        return self.setParams(cap=new_cap)

    def setFloor(self, new_floor):
        return self.setParams(floor=new_floor)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)

    #el get cap esta en el mixin
    
    def getCap(self):
        return self.getOrDefault(self.cap)

    def getFloor(self):
        return self.getOrDefault(self.floor)

    
    def _transform(self, dataset):
        if not self.isSet("inputCol"):
            raise ValueError("No input column set for the ExtremeValueCapper model.")

        #print(self.getOutputCol())

        #antes son puros strings
        #aqui se crea la columna
        input_column = dataset[self.getInputCol()]
        output_column = self.getOutputCol()    

        cap_value = self.getOrDefault("cap")
        floor_value = self.getOrDefault("floor")

        return dataset.withColumn(
            output_column, 
            F.when(input_column > cap_value, cap_value).
            when(input_column < floor_value, floor_value).
            otherwise(input_column)
        )

class ExtremeValueCapper(Estimator, _ExtremeValueCapperParams):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, boundary=None):
        super().__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, boundary=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    #se hacen todos los sets y los gets the parametros de nosotros

    def setBoundary(self, new_boundary):
        self.setParams(boundary=new_boundary)

    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)

    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)
        

    def _fit(self, dataset):
        input_column = self.getInputCol()
        output_column = self.getOutputCol()
        boundary = self.getBoundary()
        
        avg, stddev = dataset.agg(
            F.mean(input_column), 
            F.stddev(input_column)
        ).head()
    
        cap = avg + boundary * stddev
        floor = avg - boundary * stddev
    
        return ExtremeValueCapperModel(
            inputCol=input_column, 
            outputCol=output_column, 
            cap=cap, 
            floor=floor
        )

spark

In [2]:

from pyspark.ml import Pipeline
import pyspark.ml.feature as MF
from pyspark.ml.classification import LogisticRegression
from typing import Optional

food=spark.read.csv("epi_r.csv",inferSchema=True, header=True)

def sanitize_coloumn_name(name):

    answer = name
    
    for i,j in (
        (" ","_",),
        ("-","_",),
        ("/","_",),
        ("&","and",),
    ):
        answer = answer.replace(i,j)

    return "".join(
        [char for char in answer if char.isalpha() or char.isdigit() or char == "_"]
    )

food=food.toDF(*[sanitize_coloumn_name(name) for name in food.columns])


food = (
    food.where(
        (F.col("cakeweek").isin([0.0,1.0]) | F.col("cakeweek").isNull()) &
        (F.col("wasteless").isin([0.0,1.0]) | F.col("wasteless").isNull())
    )
)

IDENTIFIERS = ["title"]

TARGET = ["dessert"]

CONTINUOUS = ["rating","calories","protein","fat","sodium"]

BINARY = [x for x in food.columns if x not in IDENTIFIERS and x not in TARGET and x not in CONTINUOUS]

print(len(BINARY))

food = food.dropna(how="all",subset=[x for x in food.columns if x not in IDENTIFIERS])
food = food.dropna(subset=TARGET)

@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:

    if not value:
        return True

    try:
        _ = float(value)
        
    except ValueError:
        return False

    return True

for column in ["rating", "calories"]:
    food = food.where(is_a_number(F.col(column)))
    #la va a reemplazar porq ese nombre ya existe!
    food = food.withColumn(column, F.col(column).cast(T.DoubleType()))

#ERASE TOO RARE FEATURES BINARY

#TODO:CHECK IF THIS WORKS WITHOUT FILLING NA BINARY
#food = food.fillna(value=0.0, subset=BINARY)

#esto sigue funcionando sin el filla

a=[F.sum(F.col(x)).alias(x) for x in BINARY]
b=food.select(*a).head().asDict()
num_rows = food.count()
too_rare_features = [k for k,v in b.items() if v < 10 or v > (num_rows - 10)]
BINARY = list(set(BINARY) - set(too_rare_features))

print(len(BINARY))

#RATIOS

food = food.withColumn("protein_ratio",F.col("protein") * 4 / F.col("calories"))
food = food.withColumn("fat_ratio",F.col("fat") * 9 / F.col("calories"))
food = food.fillna(0.0,subset=["protein_ratio","fat_ratio"])
CONTINUOUS += ["protein_ratio","fat_ratio"]


#CONTINUOUS

scalarBIN=ScalarNAFiller(inputCols=BINARY, outputCols=BINARY, filler=0.0)

evc_cal=ExtremeValueCapper(
    inputCol="calories",outputCol="calories", boundary=2.0
)

evc_pro=ExtremeValueCapper(
    inputCol="protein",outputCol="protein", boundary=2.0
)

evc_fat=ExtremeValueCapper(
    inputCol="fat",outputCol="fat", boundary=2.0
)

evc_sod=ExtremeValueCapper(
    inputCol="sodium",outputCol="sodium", boundary=2.0
)

imputer = MF.Imputer(
    strategy="mean",
    inputCols=["calories","protein","fat","sodium"],
    outputCols=["calories_i","protein_i","fat_i","sodium_i"]
)

continuous_assembler = MF.VectorAssembler(
    inputCols=["rating", "calories_i", "protein_i", "fat_i", "sodium_i"], 
    outputCol="continuous"
)

continuous_scaler = MF.MinMaxScaler(
    inputCol="continuous",
    outputCol="continuous_scaled"
)

#whole assembler

preml_assembler=MF.VectorAssembler(
    inputCols= BINARY + ["continuous_scaled"] + ["protein_ratio","fat_ratio"],
    outputCol= "features"
)

lr=LogisticRegression(
    featuresCol="features", 
    labelCol="dessert", 
    predictionCol="prediction"
)

food_pipeline = Pipeline(
    stages=[
        scalarBIN, #este es igual
        
        evc_cal,
        evc_fat,
        evc_pro,
        evc_sod,
        
        imputer,
        
        continuous_assembler,
        
        continuous_scaler,
        
        preml_assembler,
        
        lr
    ]
)



673
506


In [3]:
food_pipeline

Pipeline_435a9ac45deb

In [4]:
train,test=food.randomSplit([0.7,0.3],13)

#se cachea en memory!
train.cache()

#son minimos cuadrados con el probabilidad al final softmax!
food_pipeline_model=food_pipeline.fit(train)
results=food_pipeline_model.transform(test)


True
True


In [5]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

results
evaluator = BinaryClassificationEvaluator(
 labelCol="dessert",
 rawPredictionCol="rawPrediction",
 metricName="areaUnderROC",
)

accuracy = evaluator.evaluate(results)

In [6]:
print(f"Area under ROC = {accuracy} ")

Area under ROC = 0.9904326300584526 


In [7]:
food_pipeline_model.save("food")

In [10]:
from pyspark.ml.pipeline import PipelineModel

ale=PipelineModel.read().load("food")

In [12]:
ale.stages

[ScalarNAFiller_874f52567f8b,
 ExtremeValueCapperModel_5413e5c3d191,
 ExtremeValueCapperModel_97b15fb36990,
 ExtremeValueCapperModel_7bffc99319bc,
 ExtremeValueCapperModel_d2f14567598d,
 ImputerModel: uid=Imputer_f6c0dffb3820, strategy=mean, missingValue=NaN, numInputCols=4, numOutputCols=4,
 VectorAssembler_b93a0c0f2d30,
 MinMaxScalerModel: uid=MinMaxScaler_edde5f5690bc, numFeatures=5, min=0.0, max=1.0,
 VectorAssembler_cb2ba25b7574,
 LogisticRegressionModel: uid=LogisticRegression_de89d5965c91, numClasses=2, numFeatures=513]