In [0]:
import time
import requests

import pandas as pd

import pyspark.sql.types as st
import pyspark.sql.functions as sf

In [0]:
flights = spark.sql("SELECT * FROM picpay_test_cae.default.airports")
display(flights)

id,year,month,day,dep_time,sched_dep_time,dep_delay,arr_time,sched_arr_time,arr_delay,carrier,flight,tailnum,origin,dest,air_time,distance,hour,minute,time_hour,name,_rescued_data
0,2013,1,1,517.0,515,2.0,830.0,819,11.0,UA,1545,N14228,EWR,IAH,227.0,1400,5,15,2013-01-01T05:00:00Z,United Air Lines Inc.,
1,2013,1,1,533.0,529,4.0,850.0,830,20.0,UA,1714,N24211,LGA,IAH,227.0,1416,5,29,2013-01-01T05:00:00Z,United Air Lines Inc.,
2,2013,1,1,542.0,540,2.0,923.0,850,33.0,AA,1141,N619AA,JFK,MIA,160.0,1089,5,40,2013-01-01T05:00:00Z,American Airlines Inc.,
3,2013,1,1,544.0,545,-1.0,1004.0,1022,-18.0,B6,725,N804JB,JFK,BQN,183.0,1576,5,45,2013-01-01T05:00:00Z,JetBlue Airways,
4,2013,1,1,554.0,600,-6.0,812.0,837,-25.0,DL,461,N668DN,LGA,ATL,116.0,762,6,0,2013-01-01T06:00:00Z,Delta Air Lines Inc.,
5,2013,1,1,554.0,558,-4.0,740.0,728,12.0,UA,1696,N39463,EWR,ORD,150.0,719,5,58,2013-01-01T05:00:00Z,United Air Lines Inc.,
6,2013,1,1,555.0,600,-5.0,913.0,854,19.0,B6,507,N516JB,EWR,FLL,158.0,1065,6,0,2013-01-01T06:00:00Z,JetBlue Airways,
7,2013,1,1,557.0,600,-3.0,709.0,723,-14.0,EV,5708,N829AS,LGA,IAD,53.0,229,6,0,2013-01-01T06:00:00Z,ExpressJet Airlines Inc.,
8,2013,1,1,557.0,600,-3.0,838.0,846,-8.0,B6,79,N593JB,JFK,MCO,140.0,944,6,0,2013-01-01T06:00:00Z,JetBlue Airways,
9,2013,1,1,558.0,600,-2.0,753.0,745,8.0,AA,301,N3ALAA,LGA,ORD,138.0,733,6,0,2013-01-01T06:00:00Z,American Airlines Inc.,


# Model


In [0]:
import mlflow
import mlflow.spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, abs
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

for column_name, dtype in flights.dtypes:
    if dtype in ['int', 'double']:
        flights = flights.fillna({column_name: 0})
    elif dtype == 'string':
        flights = flights.fillna({column_name: "missing"})

feature_cols = ['year', 'month', 'day', 'dep_time', 'sched_dep_time', 'dep_delay', 'air_time', 'distance', 'hour', 'minute']
target_col = 'arr_delay'

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
flights = assembler.transform(flights).select('features', target_col)

train_data, test_data = flights.randomSplit([0.8, 0.2], seed=42)

In [0]:
rf = RandomForestRegressor(
    featuresCol='features',
    labelCol=target_col,
    numTrees=100,
    maxDepth=5
)

In [0]:
mlflow.set_experiment("/Users/fernandofelix@copin.ufcg.edu.br/flight_delay_prediction")

from mlflow.models.signature import infer_signature
import mlflow.pyfunc

with mlflow.start_run(run_name="RandomForestFlightDelayModel"):
    rf_model = rf.fit(train_data)

    predictions = rf_model.transform(test_data)

    evaluator = RegressionEvaluator(
        labelCol=target_col,
        predictionCol='prediction',
        metricName='rmse'
    )
    rmse = evaluator.evaluate(predictions)
    print(f"RMSE: {rmse:.2f}")
    mlflow.log_metric("rmse", rmse)

    sample_input = test_data.limit(5).toPandas()
    predictions = predictions.select("prediction").limit(5).toPandas()

    signature = infer_signature(sample_input, predictions)

    mlflow.spark.log_model(
        rf_model,
        "random_forest_flight_model",
        registered_model_name="FlightDelayRandomForestModel",
        signature=signature,
        await_registration_for=60,
    )

    print("Modelo registrado com sucesso no MLflow.")


RMSE: 21.52


2024/12/02 02:07:03 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
Registered model 'FlightDelayRandomForestModel' already exists. Creating a new version of this model...


Modelo registrado com sucesso no MLflow.


Created version '3' of model 'picpay_test_cae.default.flightdelayrandomforestmodel'.


In [0]:
! python --version

Python 3.12.3


In [0]:
!pip freeze

absl-py==1.0.0
accelerate==0.33.0
aiohttp==3.9.5
aiohttp-cors==0.7.0
aiosignal==1.2.0
alembic==1.13.3
annotated-types==0.7.0
anyio==4.2.0
argcomplete==3.5.1
argon2-cffi==21.3.0
argon2-cffi-bindings==21.2.0
arrow==1.2.3
astor==0.8.1
asttokens==2.0.5
astunparse==1.6.3
async-lru==2.0.4
attrs==23.1.0
audioread==3.0.1
azure-core==1.31.0
azure-cosmos==4.3.1
azure-identity==1.19.0
azure-storage-blob==12.23.0
azure-storage-file-datalake==12.17.0
Babel==2.11.0
backoff==2.2.1
bcrypt==3.2.0
beautifulsoup4==4.12.3
black==24.4.2
bleach==4.1.0
blinker==1.7.0
blis==0.7.11
boto3==1.34.69
botocore==1.34.69
Brotli==1.0.9
cachetools==5.3.3
catalogue==2.0.10
category-encoders==2.6.3
certifi==2024.6.2
cffi==1.16.0
chardet==4.0.0
charset-normalizer==2.0.4
circuitbreaker==2.0.0
click==8.1.7
cloudpathlib==0.19.0
cloudpickle==2.2.1
cmdstanpy==1.2.4
colorful==0.5.6
colorlog==6.8.2
comm==0.2.1
composer==0.24.1
confection==0.1.5
configparser==5.2.0
contourpy==1.2.0
coolname==2.2.0
cryptography==42.0.5
cycler==0.1

In [0]:
import pyspark
print(pyspark.__version__)

3.5.2


In [0]:
import mlflow
print(mlflow.__version__)

2.15.1


In [0]:
# Get Hadoop version
hadoop_version = spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()

# Get Hive version
hive_version = spark.version if spark.conf.get("spark.sql.catalogImplementation") == "hive" else "Hive not configured"

print(hadoop_version)

3.3.6


## LOad


In [0]:
import mlflow
logged_model = 'runs:/07dd88439c0c47499ad34dce02d17e33/random_forest_flight_model'

# Load model
loaded_model = mlflow.spark.load_model(logged_model)

# Perform inference via model.transform()
# loaded_model.transform(data)

2024/12/02 04:38:45 INFO mlflow.spark: 'runs:/07dd88439c0c47499ad34dce02d17e33/random_forest_flight_model' resolved as 'dbfs:/databricks/mlflow-tracking/2579919712229027/07dd88439c0c47499ad34dce02d17e33/artifacts/random_forest_flight_model'
