In [2]:
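# %pip install mlflow==2.11.1   # one-time setup; use %pip so it targets this kernel (version matches the captured output)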

Collecting mlflow
  Obtaining dependency information for mlflow from https://files.pythonhosted.org/packages/d7/70/167ba0b7bf8912571af0f22711122ed5eb0c71c250e0d9e8c045b60b4a62/mlflow-2.11.1-py3-none-any.whl.metadata
  Downloading mlflow-2.11.1-py3-none-any.whl.metadata (15 kB)
Collecting entrypoints<1 (from mlflow)
  Obtaining dependency information for entrypoints<1 from https://files.pythonhosted.org/packages/35/a8/365059bbcd4572cbc41de17fd5b682be5868b218c3c5479071865cab9078/entrypoints-0.4-py3-none-any.whl.metadata
  Downloading entrypoints-0.4-py3-none-any.whl.metadata (2.6 kB)
Collecting gitpython<4,>=3.1.9 (from mlflow)
  Obtaining dependency information for gitpython<4,>=3.1.9 from https://files.pythonhosted.org/packages/67/c7/995360c87dd74e27539ccbfecddfb58e08f140d849fcd7f35d2ed1a5f80f/GitPython-3.1.42-py3-none-any.whl.metadata
  Downloading GitPython-3.1.42-py3-none-any.whl.metadata (12 kB)
Collecting sqlparse<1,>=0.4.0 (from mlflow)
  Obtaining dependency information for sqlp

In [None]:
# Example being followed (ejemplo que se está siguiendo): https://towardsdatascience.com/finding-donors-classification-project-with-pyspark-485fb3c94e5e

In [42]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import Row
from pyspark.sql.functions import col

""" ML Flow packages """
import mlflow
import mlflow.pyfunc
import mlflow.sklearn        
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec      
from mlflow.models import infer_signature

In [3]:
# Start (or reuse) the Spark session shared by every later cell.
spark = (
    SparkSession.builder
    .appName("Spark Example DataFrame")
    .getOrCreate()
)

24/03/08 12:58:09 WARN Utils: Your hostname, fmedrano resolves to a loopback address: 127.0.1.1; using 192.168.0.18 instead (on interface wlp0s20f3)
24/03/08 12:58:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/08 12:58:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [23]:
# Load the file into a Spark DataFrame.
file_location = "adult.csv"
file_type = "csv"
# CSV reader options; Spark ignores them for other file formats.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
csv_reader_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}
df = (
    spark.read.format(file_type)
    .options(**csv_reader_options)
    .load(file_location)
)
# Outside Databricks, display() of a Spark DataFrame shows only its schema repr.
display(df)

DataFrame[Age: int, Workclass: string, Final Weight: int, Education: string, EducationNum: int, Marital Status: string, Occupation: string, Relationship: string, Race: string, Gender: string, Capital Gain: int, capital loss: int, Hours per Week: int, Native Country: string, Income: string]

In [24]:
df.show(n=20, truncate=True)  # preview the first rows of the raw data

+---+-----------------+------------+-------------+------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|Age|        Workclass|Final Weight|    Education|EducationNum|      Marital Status|        Occupation|  Relationship|               Race| Gender|Capital Gain|capital loss|Hours per Week|Native Country|Income|
+---+-----------------+------------+-------------+------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov|       77516|    Bachelors|          13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc|       83311|    Bachelors|          13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|           0| 

In [28]:
# Normalise numeric column types: Age/EducationNum -> int (already int when
# inferSchema is on), the gain/loss/hours columns -> double.
# NOTE(review): the CSV header spells one column "capital loss"; addressing it as
# "Capital Loss" relies on Spark's default case-insensitive resolution, and the
# withColumn call below also renames it to "Capital Loss".
column_casts = {
    "Age": "int",
    "EducationNum": "int",
    "Capital Gain": "double",
    "Capital Loss": "double",
    "Hours per Week": "double",
}
for column_name, target_type in column_casts.items():
    df = df.withColumn(column_name, col(column_name).cast(target_type))



In [31]:
# Spark's describe() summary table for every column (values come back as strings).
df_summary = df.describe()
display(df_summary)

DataFrame[summary: string, Age: string, Workclass: string, Final Weight: string, Education: string, EducationNum: string, Marital Status: string, Occupation: string, Relationship: string, Race: string, Gender: string, Capital Gain: string, Capital Loss: string, Hours per Week: string, Native Country: string, Income: string]

In [32]:
# 1. Prepare the data: derive a binary label from `Income`, then drop the raw column.
from pyspark.sql import functions as F

# Trim before comparing so '<=50K' values with stray surrounding whitespace still map to 0.
# NOTE(review): every other value (including null) becomes 1 (>50K) — confirm the
# column only holds the two expected classes.
df = df.withColumn('>50K', F.when(F.trim(F.col('Income')) == '<=50K', 0).otherwise(1))
# The label replaces the original string column.
df = df.drop('Income')
# Show the resulting columns.
df.columns

['Age',
 'Workclass',
 'Final Weight',
 'Education',
 'EducationNum',
 'Marital Status',
 'Occupation',
 'Relationship',
 'Race',
 'Gender',
 'Capital Gain',
 'Capital Loss',
 'Hours per Week',
 'Native Country',
 '>50K']

In [38]:
# Vectorise numerical features and one-hot encode categorical features.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import (DecisionTreeClassifier, GBTClassifier, RandomForestClassifier, LogisticRegression)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Columns treated as categorical: each is string-indexed, then one-hot encoded.
# NOTE(review): 'Hours per Week' is cast to double alongside the numeric columns in an
# earlier cell, yet it is one-hot encoded here (one slot per distinct value). Confirm
# this is intended rather than listing it in `numerical_columns`.
categorical_columns = [
    'Workclass',
    'Education',
    'Marital Status',
    'Occupation',
    'Relationship',
    'Race',
    'Gender',
    'Hours per Week',
    'Native Country',
]

# Stage 1: map each categorical value to a numeric index in "<col>_indexed".
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_indexed")
    for name in categorical_columns
]

# Stage 2: one-hot encode each index into "<col>_indexed_encoded"; dropLast=False
# keeps a slot for every category.
encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol=f"{indexer.getOutputCol()}_encoded",
        dropLast=False,
    )
    for indexer in indexers
]

# Stage 3: concatenate the encoded categoricals followed by the numeric columns
# into a single "features" vector.
categorical_encoded = [encoder.getOutputCol() for encoder in encoders]
numerical_columns = ['Age', 'EducationNum', 'Capital Gain', 'Capital Loss']
inputcols = categorical_encoded + numerical_columns
assembler = VectorAssembler(inputCols=inputcols, outputCol="features")

# Fit every stage on the full DataFrame and apply it.
# NOTE(review): fitting before the train/test split lets the indexers see test-set categories.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
display(transformed)

# Keep only what the classifier needs: the feature vector and the binary label.
final_data = transformed.select('features', '>50K')

DataFrame[Age: int, Workclass: string, Final Weight: int, Education: string, EducationNum: int, Marital Status: string, Occupation: string, Relationship: string, Race: string, Gender: string, Capital Gain: double, Capital Loss: double, Hours per Week: double, Native Country: string, >50K: int, Workclass_indexed: double, Education_indexed: double, Marital Status_indexed: double, Occupation_indexed: double, Relationship_indexed: double, Race_indexed: double, Gender_indexed: double, Hours per Week_indexed: double, Native Country_indexed: double, Workclass_indexed_encoded: vector, Education_indexed_encoded: vector, Marital Status_indexed_encoded: vector, Occupation_indexed_encoded: vector, Relationship_indexed_encoded: vector, Race_indexed_encoded: vector, Gender_indexed_encoded: vector, Hours per Week_indexed_encoded: vector, Native Country_indexed_encoded: vector, features: vector]

In [47]:
# Feature-only view (label dropped) of the prepared data, e.g. for inspection or model
# signature inference. Derived from `final_data` because `df_train` is only created
# later, in the training cell, so reading it here fails on a fresh kernel.
df_test = final_data.drop('>50K')

In [48]:
df_test.show(n=20, truncate=True)  # preview the feature vectors

+--------------------+
|            features|
+--------------------+
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
|(200,[0,9,25,32,4...|
+--------------------+
only showing top 20 rows



In [59]:
# Point MLflow at the tracking server. Honour MLFLOW_TRACKING_URI when it is set so the
# notebook runs unchanged against another server; otherwise use the local default.
import os

MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI", "http://localhost:5000")
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment("MLFLow_Model")

<Experiment: artifact_location='mlflow-artifacts:/416436688303622593', creation_time=1709904237106, experiment_id='416436688303622593', last_update_time=1709904237106, lifecycle_stage='active', name='MLFLow_Model', tags={}>

In [60]:
# Model implementation: train a random forest inside an MLflow run and log the fitted
# model, its hyperparameters and a held-out AUC to the tracking server.
NUM_TREES = 150
SPLIT_WEIGHTS = [0.8, 0.2]
SPLIT_SEED = 42  # fixed seed so the train/test split is reproducible across re-runs

with mlflow.start_run() as run:
    rfc = RandomForestClassifier(numTrees=NUM_TREES, labelCol='>50K', featuresCol='features')
    df_train, df_test = final_data.randomSplit(SPLIT_WEIGHTS, seed=SPLIT_SEED)
    rfc_model = rfc.fit(df_train)
    mlflow.log_params({
        "numTrees": NUM_TREES,
        "split_weights": str(SPLIT_WEIGHTS),
        "split_seed": SPLIT_SEED,
    })
    # TODO: log a model signature via infer_signature (features-only input; the output
    # should come from the model's "prediction" column, not the '>50K' label).
    # NOTE(review): with an `mlflow-artifacts:/` artifact store, Spark's own writer raises
    # UnsupportedFileSystemException (see captured output), yet the run still reports the
    # model as logged. This appears to be a non-fatal fallback inside mlflow.spark.log_model.
    mlflow.spark.log_model(rfc_model, 'model')
    # run_id is the non-deprecated name for the same identifier as run_uuid.
    print('Model logged in run {}'.format(run.info.run_id))
    # Real metric instead of a placeholder: area under ROC on the held-out split.
    test_auc = BinaryClassificationEvaluator(labelCol='>50K').evaluate(rfc_model.transform(df_test))
    mlflow.log_metric("test_areaUnderROC", test_auc)

24/03/08 14:34:05 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "mlflow-artifacts"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInst

Model logged in run 8f6ab2799c57408cb74703a9c1ed720a


In [None]:
# Evaluate model performance: binary evaluator keyed on the '>50K' label column,
# with the raw-prediction column and metric spelled out explicitly.
my_eval = BinaryClassificationEvaluator(
    labelCol='>50K',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
