# Predict Missing Wine Ratings Using PySpark

In this demo, we train and use multiple models to impute missing values.  We start with a dataset of wines consisting of key features like acidity. Some of the records are missing feature values. In addition, wine quality scores are given to some but not all of the wines. 

We will build a workflow that trains a linear model to impute the missing features from the other features and then trains a decision tree model to rate the unrated wines using the imputed features.

**Throughout this notebook, you'll see decorators (`@op` and `@metric`) above functions. These decorators allow Aqueduct to run your functions as part of a workflow automatically.**

**To run this notebook, you will have to connect the following integrations:**
- A Databricks or Spark integration
- A data integration (e.g., Snowflake)
- S3 (must also be used as metadata store)

In [None]:
import aqueduct
from aqueduct.decorator import op, check, metric

# You can use `localhost` if you're running this notebook on the same machine as the server.
# If you're running your notebook on a separate machine from your
# Aqueduct server, change this to the address of your Aqueduct server.
address = "http://localhost:8080"

# If you're running your notebook on a separate machine from your
# Aqueduct server, you will have to copy your API key here rather than
# using `get_apikey()`.
api_key = aqueduct.get_apikey()
client = aqueduct.Client(api_key, address)

In [None]:
aqueduct.global_config({'engine': '<spark or databricks integration>', 'lazy': True})

## Getting the Data 

In this demo, we will use the `wine` table in a Snowflake data warehouse.

In [None]:
snowflake_warehouse = client.integration("<snowflake integration>")
wine_table = snowflake_warehouse.sql("select * from wine;")

## Cleaning the Data
There are some missing values in the residual sugar column that we need to clean. Here we replace each missing residual sugar value with a value predicted from the other numeric columns.
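
To make the idea concrete before the PySpark version, here is a minimal plain-Python sketch of regression-based imputation. It assumes a single predictor and made-up numbers (neither comes from the wine table): fit a line on the rows where the target is known, then fill only the missing targets with the line's prediction.

```python
# Toy rows of (predictor, residual_sugar); None marks a missing value.
# These numbers are invented for illustration only.
rows = [(1.0, 2.0), (2.0, 4.0), (3.0, None), (4.0, 8.0)]

# Train only on rows where the target is present.
known = [(x, y) for x, y in rows if y is not None]
mean_x = sum(x for x, _ in known) / len(known)
mean_y = sum(y for _, y in known) / len(known)

# Ordinary least squares for a single predictor.
slope = sum((x - mean_x) * (y - mean_y) for x, y in known) / sum(
    (x - mean_x) ** 2 for x, _ in known
)
intercept = mean_y - slope * mean_x

# Keep observed values; fill missing ones with the model's prediction.
imputed = [(x, y if y is not None else intercept + slope * x) for x, y in rows]
print(imputed)
```

The operator in the next cell follows the same pattern, but uses several predictor columns and runs on Spark.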

In [None]:
@op()
def fix_residual_sugar(df):
    """
    This function takes in a DataFrame representing wines data and cleans
    the DataFrame by replacing any missing values in the `residual_sugar`
    column with the values that would be predicted based on the other columns.
    """
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql.functions import col, when
    from pyspark.sql.types import FloatType

    # Cast RESIDUAL_SUGAR to a numeric type; values that cannot be parsed become null
    df = df.withColumn("RESIDUAL_SUGAR", col("RESIDUAL_SUGAR").cast(FloatType()))
    print("missing residual sugar values:", df.filter(col("RESIDUAL_SUGAR").isNull()).count())

    # Keep only the numeric feature columns. The loop variable is called `name`
    # so that it does not shadow the imported `col` function.
    numeric_cols = [name for name, dtype in df.dtypes
                    if name not in ["QUALITY", "RESIDUAL_SUGAR", "ID"]
                    and dtype in ['double', 'float']]
    # Fit a LinearRegression model on the other numeric columns
    assembler = VectorAssembler(inputCols=numeric_cols, outputCol="FEATURES")
    df = assembler.transform(df)

    lr = LinearRegression(featuresCol="FEATURES", labelCol="RESIDUAL_SUGAR")
    training_df = df.filter(col("RESIDUAL_SUGAR").isNotNull())
    model = lr.fit(training_df)

    # Use the trained model to predict the missing values of `residual_sugar`
    predicted_sugar = model.transform(df.filter(col("RESIDUAL_SUGAR").isNull())).select("ID", "PREDICTION")

    # Replace the null values with the predicted values
    df = df.join(predicted_sugar, "ID", "left_outer").withColumnRenamed("PREDICTION", "PREDICTED_SUGAR")
    df = df.withColumn("RESIDUAL_SUGAR", 
                       when(col("RESIDUAL_SUGAR").isNull(), col("PREDICTED_SUGAR"))
                       .otherwise(col("RESIDUAL_SUGAR")))\
            .drop("PREDICTED_SUGAR")
    print("missing residual sugar values after prediction:", df.filter(col("RESIDUAL_SUGAR").isNull()).count())
    
    return df

In [None]:
wines_cleaned = fix_residual_sugar(wine_table)

## Tracking the Number of Labeled Wines

As a sanity check, we want to make sure there are enough wines with quality scores to render reliable predictions.

In [None]:
@metric()
def get_number_labeled_wines(df):
    """
    This function takes in a DataFrame of wine data and returns
    how many wines have a quality value. Missing values may be
    encoded as `\\N` in the underlying DataFrame, so the column is
    cast to a numeric type first; anything non-numeric becomes null.
    """
    from pyspark.sql.functions import col
    from pyspark.sql.types import DoubleType

    # Count only the rows whose QUALITY parses as a number.
    return df.filter(col("QUALITY").cast(DoubleType()).isNotNull()).count()

In [None]:
num_labeled = get_number_labeled_wines(wines_cleaned)
num_labeled.bound(lower=1000, severity="error")

## Predicting the Quality of Wines

In the following operator we:
1. Fit a decision tree model to the wines that do have quality ratings
2. Make quality rating predictions for all the wines in the table.
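
The two steps above can be sketched on toy data with a depth-1 regression tree (a "stump") written in plain Python. Everything here is illustrative: the feature values, the labels, and the helper `fit_stump` are hypothetical, and Spark's `DecisionTreeRegressor` chooses its split candidates differently.

```python
def fit_stump(xs, ys):
    """Fit a depth-1 regression tree on one feature by minimizing squared error."""
    best = None
    # Try each distinct value (except the largest) as a split threshold.
    for threshold in sorted(set(xs))[:-1]:
        left = [y for x, y in zip(xs, ys) if x <= threshold]
        right = [y for x, y in zip(xs, ys) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = sum((y - left_mean) ** 2 for y in left) + sum(
            (y - right_mean) ** 2 for y in right
        )
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    # Each leaf predicts the mean label of the training rows that fall into it.
    return lambda x: left_mean if x <= threshold else right_mean


# Made-up feature values and quality labels; None marks an unrated wine.
feature = [9.0, 9.5, 12.0, 12.5, 10.0, 13.0]
quality = [5.0, 5.0, 7.0, 7.0, None, None]

# Step 1: fit only on the wines that have a quality rating.
labeled = [(x, y) for x, y in zip(feature, quality) if y is not None]
model = fit_stump([x for x, _ in labeled], [y for _, y in labeled])

# Step 2: predict for every wine, rated or not.
predictions = [model(x) for x in feature]
print(predictions)
```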

In [None]:
@op()
def predict_quality(df):
    """
    This function takes in data about wines and fills in any missing
    values for the wine quality by building a machine learning model
    that predicts the quality of the wine itself. The expectation for
    this function is that many or most of the wines will already be labeled
    with their quality. This function uses the existing wine quality
    labels as guidance to train its model and fills in missing
    values with the model.

    Under the hood, this function uses PySpark ML's DecisionTreeRegressor
    model to predict the wines' qualities.
    """
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import DecisionTreeRegressor
    from pyspark.sql.functions import col
    from pyspark.sql.types import DoubleType

    # Cast the quality column to a numeric type; non-numeric values such as "\N" become null
    df = df.withColumn("QUALITY", col("QUALITY").cast(DoubleType()))
    print("missing quality values:", df.filter(col("QUALITY").isNull()).count())

    # Keep only the numeric feature columns. The loop variable is called `name`
    # so that it does not shadow the imported `col` function.
    numeric_cols = [name for name, dtype in df.dtypes
                    if name not in ["QUALITY", "ID"]
                    and dtype in ['double', 'float']]

    # Fit a model to the columns that are of numerical types but aren't the wine's ID or the quality that we're predicting
    assembler = VectorAssembler(inputCols=numeric_cols, outputCol="QUALITY_FEATURES")
    df = assembler.transform(df)
    dt = DecisionTreeRegressor(featuresCol="QUALITY_FEATURES", labelCol="QUALITY", maxDepth=3)
    training_df = df.filter(col("QUALITY").isNotNull())
    model = dt.fit(training_df)

    # Add a `pred_quality` column with the predicted quality for each wine
    df = model.transform(df).withColumnRenamed("PREDICTION", "PRED_QUALITY")
    
    return df.drop("QUALITY_FEATURES", "FEATURES")

In [None]:
predicted_quality = predict_quality(wines_cleaned)

## Checking Our Predictions

As a sanity check, we also verify that the wine quality predictions are reasonable. We'll do this by defining another `metric` on the `predicted_quality` table that calculates the RMSE of the predictions for the wines for which we have actual labels.
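
For reference, RMSE is the square root of the mean of the squared residuals. The plain-Python sketch below uses made-up scores (not values from the wine table) and also computes the mean absolute error, to show that the two are different quantities.

```python
import math

# Made-up actual and predicted quality scores, for illustration only.
actual = [5.0, 6.0, 7.0, 5.0]
predicted = [5.0, 6.0, 5.0, 5.0]

residuals = [a - p for a, p in zip(actual, predicted)]

# RMSE: square the residuals, average them, then take the square root.
toy_rmse = math.sqrt(sum(r ** 2 for r in residuals) / len(residuals))

# MAE: average the absolute residuals; large misses are penalized less.
toy_mae = sum(abs(r) for r in residuals) / len(residuals)

print(toy_rmse, toy_mae)  # prints: 1.0 0.5
```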

In [None]:
@metric()
def get_rmse(df):
    """
    This metric function takes in a DataFrame and assumes it has two columns,
    `QUALITY` and `PRED_QUALITY`. It uses Spark SQL functions to calculate the
    root mean squared error of the predicted quality column. It ignores any rows
    for which the quality column does not have a valid value.
    """
    from pyspark.sql.functions import avg, col, sqrt

    # Keep only the rows that have an actual quality label
    labeled = df.filter(col("QUALITY").isNotNull())

    # RMSE is the square root of the mean squared residual
    squared_error = (col("QUALITY") - col("PRED_QUALITY")) ** 2
    rmse = labeled.agg(sqrt(avg(squared_error)).alias("RMSE")).first()[0]

    return rmse

In [None]:
rmse = get_rmse(predicted_quality)
rmse.bound(upper=1.0)
rmse.bound(upper=3.0, severity="error")

## Saving the Predicted Wine Quality



In [None]:
snowflake_warehouse.save(predicted_quality, table_name="pred_wine_quality", update_mode="replace")

## Schedule Workflow to Run Daily



In [None]:
from textwrap import dedent

client.publish_flow(
    "WineRating",
    dedent(
        """
        This workflow builds a model to predict missing ratings for wines 
        and then uses that model to fill in missing ratings.
        """
    ),
    schedule=aqueduct.daily(),
    artifacts=[predicted_quality, rmse, num_labeled],
)