In [0]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.neighbors import KNeighborsRegressor
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
import pandas as pd
from pyspark.sql.functions import sum as _sum
import matplotlib.pyplot as plt
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, when
import seaborn as sns

In [0]:
%fs ls /mnt/Avocado_Project

In [0]:
%fs ls /mnt/Avocado_Project/Silver

In [0]:
df = spark.read.parquet("dbfs:/mnt/Avocado_Project/Silver/avocado_silver.parquet")
display(df)

DATA VISUALIZATION

In [0]:

volume_total_tipos = df.select(
    _sum("SmallHass").alias("SmallHass"),
    _sum("LargeHass").alias("LargeHass"),
    _sum("XLargeHass").alias("XLargeHass")
).toPandas().melt(var_name="Type", value_name="Volume")

plt.figure(figsize=(6,4))
sns.barplot(data=volume_total_tipos, x="Type", y="Volume")
plt.title("Total Volume by Avocado Type (Hass)")
plt.tight_layout()
plt.show()


In [0]:
from pyspark.sql.functions import sum as _sum
import matplotlib.pyplot as plt

# Agrupar volume total por região
volume_por_regiao = df.groupBy("region").agg(_sum("volume").alias("total_volume")).orderBy("total_volume", ascending=False)

# Convert to Pandas DataFrame for plotting
volume_por_regiao_pd = volume_por_regiao.toPandas()

# Opcional: limitar para as 10 regiões com maior volume
volume_top10 = volume_por_regiao_pd.head(10)

# Criar o gráfico
plt.figure(figsize=(8, 8))
plt.pie(volume_top10["total_volume"], labels=volume_top10["region"], autopct='%1.1f%%', startangle=140)
plt.title("Top 10 Regiões com Maior Volume Vendido de Abacates")
plt.axis('equal')  # Garantir o formato circular
plt.tight_layout()
plt.show()



In [0]:
%python
from pyspark.sql.functions import sum as _sum

volume_por_type = df.groupBy("type").agg(_sum("volume").alias("volume")).toPandas()

plt.figure(figsize=(6,4))
sns.barplot(data=volume_por_type, x="type", y="volume")
plt.title("Volume Total por Tipo (Orgânico x Convencional)")
plt.xlabel("Tipo")
plt.ylabel("Volume Total")
plt.tight_layout()
plt.show()

In [0]:
%python
preco_por_type = (
    df.groupby("type")
    .agg({"AveragePrice": "mean"})
    .withColumnRenamed("avg(AveragePrice)", "AveragePrice")
    .toPandas()
    .reset_index()
)

plt.figure(figsize=(6,4))
sns.barplot(data=preco_por_type, x="type", y="AveragePrice")
plt.title("Preço Médio por Tipo de Abacate")
plt.xlabel("Tipo")
plt.ylabel("Preço Médio (USD)")
plt.tight_layout()
plt.show()

MACHINE LEARNING MODEL

In [0]:
df_ml = spark.read.parquet("dbfs:/mnt/Avocado_Project/Silver/avocado_silver_ml_ready.parquet/")
display(df_ml.limit((10)))

In [0]:
df_ml.columns

In [0]:
df_ml.select("region_vec").distinct().count()


In [0]:
# Converter os vetores para arrays
df_ml = df_ml.withColumn("type_vec_arr", vector_to_array("type_vec")) \
             .withColumn("region_vec_arr", vector_to_array("region_vec"))


In [0]:
df_ml = df_ml.withColumn("type_bin", when(col("type_vec_arr")[0] == 1, 0).otherwise(1))


In [0]:
from pyspark.sql.functions import expr

# Encontrar o índice do 1 no vetor da região
df_ml = df_ml.withColumn("region_index", expr("array_position(region_vec_arr, 1) - 1"))

In [0]:
df_ml = df_ml.drop("type_vec", "region_vec", "type_vec_arr", "region_vec_arr")

In [0]:
display(df_ml.select("type_bin", "region_index").distinct().orderBy("region_index"))


In [0]:
df_ml = df_ml.filter(df_ml.region_index != -1)


In [0]:
df_pd = df_ml.toPandas()

# Preparar X e y
X = df_pd.drop(columns=["date", "AveragePrice"])  # ou selecione as colunas manualmente
y = df_pd["AveragePrice"]

In [0]:
# Colunas categóricas
categorical = ["region_index"]

# Colunas numéricas (menos o type_bin porque já está binário)
numeric = [
    "Volume", "SmallHass", "LargeHass", "XLargeHass",
    "TotalBags", "SmallBags", "LargeBags", "xlarge_bags", "Year"
]

# Binária
binary = ["type_bin"]


In [0]:
# Pré-processadores
numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(handle_unknown="ignore"))
])

# Combinar
preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_transformer, numeric),
    ("cat", categorical_transformer, categorical),
    ("bin", "passthrough", binary)
])

In [0]:
# Modelo base
model = RandomForestRegressor(random_state=42)

# Pipeline completo
pipeline = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("regressor", model)
])

In [0]:
# Separar treino/teste
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Modelos e grids
modelos_grids = {
    "DecisionTree": (DecisionTreeRegressor(), {
        "regressor__criterion": ["squared_error", "friedman_mse"],
        "regressor__splitter": ["best", "random"],
        "regressor__min_samples_split": [2, 5],
        "regressor__min_samples_leaf": [1, 5]
    }),
    "RandomForest": (RandomForestRegressor(), {
        "regressor__n_estimators": [50, 100],
        "regressor__max_depth": [None, 10],
        "regressor__min_samples_split": [2, 5]
    }),
    "KNN": (KNeighborsRegressor(), {
        "regressor__n_neighbors": [5, 10],
        "regressor__p": [1, 2]
    }),
    "LinearRegression": (LinearRegression(), {
        "regressor__fit_intercept": [True, False],
        "regressor__positive": [True, False]
    }),
    "SVM": (SVR(), {
        "regressor__C": [1.0, 10.0],
        "regressor__kernel": ["rbf", "linear"],
        "regressor__epsilon": [0.01, 0.1]
    }),
    "MLP": (MLPRegressor(max_iter=300), {
        "regressor__activation": ["relu", "logistic"],
        "regressor__solver": ["adam", "sgd"],
        "regressor__hidden_layer_sizes": [(50,), (100,)]
    })
}

# Executar grid search para todos os modelos
resultados = {}

In [0]:
for nome, (modelo, params) in modelos_grids.items():
    pipeline = Pipeline(steps=[
        ("preprocessor", preprocessor),
        ("regressor", modelo)
    ])
    grid = GridSearchCV(pipeline, params, cv=3, scoring="neg_root_mean_squared_error", n_jobs=-1)
    grid.fit(X_train, y_train)
    y_pred = grid.predict(X_test)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)
    resultados[nome] = {
        "Melhores Parâmetros": grid.best_params_,
        "RMSE": rmse,
        "R²": r2
    }

resultados_df = pd.DataFrame(resultados).T
display(resultados_df)


In [0]:
display(resultados_df)


In [0]:
resultados_df.sort_values(by="RMSE")  # quanto menor, melhor
# ou
resultados_df.sort_values(by="R²", ascending=False)  # quanto maior, melhor


In [0]:
resultados_df["RMSE"].plot(kind="barh", title="RMSE by Model", figsize=(8,5))
plt.xlabel("RMSE")
plt.show()

In [0]:
dados = {
    "Modelo": [
        "Random Forest", "KNN", "MLP (Neural Net)",
        "SVM (RBF)", "Decision Tree", "Regressão Linear"
    ],
    "RMSE": [0.144, 0.145, 0.195, 0.197, 0.171, 0.254],
    "R²": [0.851, 0.849, 0.726, 0.722, 0.789, 0.536]
}

df_modelos = pd.DataFrame(dados).sort_values("RMSE")
# Gráfico comparativo com RMSE e R² lado a lado
fig, ax = plt.subplots(1, 2, figsize=(14, 6))

# Gráfico RMSE
ax[0].barh(df_modelos["Modelo"], df_modelos["RMSE"], color="skyblue")
ax[0].set_title("Ranking de Modelos por RMSE")
ax[0].set_xlabel("RMSE (Erro Médio)")
ax[0].invert_yaxis()
ax[0].grid(True)

# Gráfico R²
df_modelos_r2 = df_modelos.sort_values("R²", ascending=False)
ax[1].barh(df_modelos_r2["Modelo"], df_modelos_r2["R²"], color="lightgreen")
ax[1].set_title("Ranking de Modelos por R²")
ax[1].set_xlabel("R² Score (quanto maior, melhor)")
ax[1].invert_yaxis()
ax[1].grid(True)

plt.tight_layout()
plt.show()


In [0]:
# Salvar versão pronta para machine learning (com colunas vetorizadas)
df_ml.write.mode("overwrite").parquet("dbfs:/mnt/Avocado_Project/Gold/avocado_gold.parquet")
