# Tutorial: Sending PySpark DataFrame to Arize

In the current version of the Arize Python SDK, only pandas DataFrames are supported. Spark DataFrames are built on RDDs rather than pandas, so to log them to Arize we use `mapInPandas`, which passes the data to our logging function as batches of pandas DataFrames.

# Install Dependencies in Colab

In [None]:
!pip install -q pyspark
!pip install -q arize

# Parallelizing PySpark DataFrame
We first create a dummy PySpark DataFrame to send.


In [None]:
import pyspark
from pyspark.sql import Row, SparkSession
import pandas as pd
import uuid

spark = SparkSession.builder.getOrCreate()

# Read some dummy data for logging to Arize later
data = pd.read_csv(
    "https://storage.googleapis.com/arize-assets/fixtures/compare-model-a.csv"
)
df_pandas = data[
    ["loan_amount", "interest_rate", "grade", "prediction", "score"]
]

# Repeat the rows 5 times and give every row a unique prediction ID
df_pandas = pd.concat([df_pandas] * 5, ignore_index=True)
df_pandas["prediction_id"] = [str(uuid.uuid4()) for _ in range(len(df_pandas))]
df_pandas = df_pandas.astype(
    {"grade": "string", "prediction": "string", "prediction_id": "string"}
)

print("This is a pandas DataFrame:")
display(df_pandas)

# Create the PySpark DataFrame from the pandas DataFrame
df_spark = spark.createDataFrame(df_pandas)

print("\nThis is the schema of the corresponding Spark DataFrame:")
df_spark.printSchema()

# Using `mapInPandas` to log each partition to Arize
`mapInPandas` maps an iterator of batches in the current DataFrame using a Python native function that takes and outputs pandas DataFrames, and returns the result as a Spark DataFrame.

The function should take an iterator of `pandas.DataFrame`s and return another iterator of `pandas.DataFrame`s. All columns are passed together to the function, and the DataFrames it yields are combined into a single Spark DataFrame. The size of each `pandas.DataFrame` can be controlled with `spark.sql.execution.arrow.maxRecordsPerBatch`.
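
The iterator-in, iterator-out contract described above can be tried without a Spark session. The following is a minimal sketch in plain pandas, not Spark's actual implementation: `iter_batches` and `add_row_count` are hypothetical helpers, and `max_records` only plays the role that `spark.sql.execution.arrow.maxRecordsPerBatch` plays in Spark.

```python
from typing import Iterator

import pandas as pd


def iter_batches(df: pd.DataFrame, max_records: int) -> Iterator[pd.DataFrame]:
    """Hypothetical stand-in for Spark: yield slices of at most max_records rows."""
    for start in range(0, len(df), max_records):
        yield df.iloc[start : start + max_records]


def add_row_count(dfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Same shape as a function passed to mapInPandas: iterator in, iterator out."""
    for df in dfs:
        out = df.copy()
        out["batch_rows"] = len(df)  # every row records the size of its batch
        yield out


toy = pd.DataFrame({"loan_amount": [100, 200, 300, 400, 500]})
batches = list(add_row_count(iter_batches(toy, max_records=2)))
result = pd.concat(batches, ignore_index=True)
print(result)
```

With five rows and `max_records=2`, the function sees three batches (2, 2, and 1 rows). In Spark, a function like `add_row_count` would be passed to `mapInPandas` together with an output schema that includes the extra `batch_rows` column.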

We should send `df_spark` to Arize with at least one of: `shap`, `prediction_labels`, `actual_labels`.

## How To Log to Arize:

You will need to update the `API_KEY` and `SPACE_ID` in the code cell below.

### Setting up Arize Client:
First, copy the Arize `API_KEY` and `SPACE_ID` from your Space Settings page, as shown below.

<img src="https://storage.googleapis.com/arize-assets/fixtures/copy-id-and-key.png" width="700">

In [None]:
from arize.pandas.logger import Client, Schema
from arize.utils.types import ModelTypes, Environments

SPACE_ID = "SPACE_ID"
API_KEY = "API_KEY"

# Fail fast if the placeholders were not replaced, before creating the client
if SPACE_ID == "SPACE_ID" or API_KEY == "API_KEY":
    raise ValueError("❌ CHANGE SPACE_ID AND/OR API_KEY")

arize_client = Client(space_id=SPACE_ID, api_key=API_KEY)
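
Outside of a tutorial, you may prefer not to paste credentials into the notebook. The sketch below reads them from environment variables and fails early if either is missing. The helper `read_arize_credentials` and the variable names `ARIZE_SPACE_ID` and `ARIZE_API_KEY` are assumptions chosen for this example, not something this tutorial defines.

```python
import os


def read_arize_credentials(env=os.environ):
    """Return (space_id, api_key) from env, raising if either one is missing."""
    # ARIZE_SPACE_ID and ARIZE_API_KEY are assumed names chosen for this sketch.
    names = ("ARIZE_SPACE_ID", "ARIZE_API_KEY")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise ValueError(f"Missing credentials: {', '.join(missing)}")
    return env["ARIZE_SPACE_ID"], env["ARIZE_API_KEY"]


# Demonstration with a plain dict standing in for the real environment
demo_env = {"ARIZE_SPACE_ID": "my-space", "ARIZE_API_KEY": "my-key"}
space_id, api_key = read_arize_credentials(demo_env)
print(space_id, api_key)
```

Called with no arguments, the helper reads from `os.environ`, and the two returned values can be passed to `Client(space_id=..., api_key=...)` exactly as in the cell above.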

# Define Logging Function

In [None]:
import time


# Function passed to mapInPandas: takes an iterator of pandas DataFrames
# and yields each DataFrame once it has been logged to Arize
def log_to_arize(dfs):
    # The schema is the same for every batch, so build it once
    pandas_df_schema = Schema(
        prediction_id_column_name="prediction_id",  # REQUIRED
        prediction_label_column_name="prediction",
        prediction_score_column_name="score",
        feature_column_names=["loan_amount", "interest_rate", "grade"],
    )

    for df in dfs:
        # Log to Arize; if the response is not 200, wait 10 seconds and
        # try again, up to 10 attempts in total
        max_tries = 10
        for i in range(max_tries):
            response = arize_client.log(
                dataframe=df,
                schema=pandas_df_schema,
                model_id="pyspark-loan-model",
                model_version="1.0",
                model_type=ModelTypes.SCORE_CATEGORICAL,
                environment=Environments.PRODUCTION,
            )

            if response.status_code != 200:
                time.sleep(10)
            else:
                yield df
                break
        else:
            # Every attempt failed: raise instead of silently dropping the batch
            raise RuntimeError(
                f"Logging to Arize failed after {max_tries} attempts "
                f"(last status code: {response.status_code})"
            )
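
The retry pattern used in `log_to_arize` can also be factored into a small helper that is testable without Arize or Spark. This is a sketch under stated assumptions: `call_with_retries` is a hypothetical name, `send` is any zero-argument callable returning an object with a `status_code` attribute, and the fake sender below only imitates a response.

```python
import time
from types import SimpleNamespace


def call_with_retries(send, max_tries=10, wait_seconds=10, sleep=time.sleep):
    """Call send() until it returns status 200; raise if every attempt fails."""
    if max_tries < 1:
        raise ValueError("max_tries must be at least 1")
    for attempt in range(1, max_tries + 1):
        response = send()
        if response.status_code == 200:
            return response
        if attempt < max_tries:
            sleep(wait_seconds)  # no need to wait after the final attempt
    raise RuntimeError(
        f"Giving up after {max_tries} attempts "
        f"(last status code: {response.status_code})"
    )


# Fake sender for illustration: fails twice, then succeeds
statuses = iter([500, 503, 200])
waits = []  # records the requested waits instead of actually sleeping
response = call_with_retries(
    lambda: SimpleNamespace(status_code=next(statuses)),
    max_tries=5,
    wait_seconds=10,
    sleep=waits.append,
)
print(response.status_code, waits)
```

Injecting `sleep` keeps the example fast to run; in the notebook, `send` would wrap the `arize_client.log(...)` call for a single batch.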

# Logging Example
Here we take our Spark DataFrame and apply the `mapInPandas` method, passing our `log_to_arize` function and the Spark schema as arguments. We then call `count` to ensure that the entire Spark DataFrame is iterated over.

The output should be the number of inferences that were sent to the Arize platform.

In [None]:
df_spark.mapInPandas(log_to_arize, df_spark.schema).count()
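
The reason for calling `count` is that `mapInPandas` is a lazy transformation: nothing is logged until an action forces Spark to evaluate it. The plain-Python analogy below uses a generator to show the same effect. It is only an illustration, with the hypothetical `log_batches` standing in for `log_to_arize` and a list append standing in for the call to Arize.

```python
logged = []


def log_batches(batches):
    """Generator with a side effect, analogous in shape to log_to_arize."""
    for batch in batches:
        logged.append(batch)  # stands in for the call to arize_client.log
        yield batch


pipeline = log_batches([[1, 2], [3, 4], [5]])
before = len(logged)  # nothing has been logged yet
consumed = sum(len(batch) for batch in pipeline)  # consuming plays the role of count()
after = len(logged)  # every batch has now been logged
print(before, consumed, after)
```

Creating `pipeline` runs none of the function body; only consuming it triggers the side effect, just as `count` triggers the logging in the cell above.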

# **Overview**


Arize is an end-to-end ML observability and model monitoring platform. The platform is designed to help ML engineers and data science practitioners surface and fix issues with ML models in production faster with:
- Automated ML monitoring and model monitoring
- Workflows to troubleshoot model performance
- Real-time visualizations for model performance monitoring, data quality monitoring, and drift monitoring
- Model prediction cohort analysis
- Pre-deployment model validation
- Integrated model explainability

### Website
Visit Us At: https://arize.com/model-monitoring/

### Additional Resources
- [What is ML observability?](https://arize.com/what-is-ml-observability/)
- [Playbook to model monitoring in production](https://arize.com/the-playbook-to-monitor-your-models-performance-in-production/)
- [Using statistical distance metrics for ML monitoring and observability](https://arize.com/using-statistical-distance-metrics-for-machine-learning-observability/)
- [ML infrastructure tools for data preparation](https://arize.com/ml-infrastructure-tools-for-data-preparation/)
- [ML infrastructure tools for model building](https://arize.com/ml-infrastructure-tools-for-model-building/)
- [ML infrastructure tools for production](https://arize.com/ml-infrastructure-tools-for-production-part-1/)
- [ML infrastructure tools for model deployment and model serving](https://arize.com/ml-infrastructure-tools-for-production-part-2-model-deployment-and-serving/)
- [ML infrastructure tools for ML monitoring and observability](https://arize.com/ml-infrastructure-tools-ml-observability/)

Visit the [Arize Blog](https://arize.com/blog) and [Resource Center](https://arize.com/resource-hub/) for more resources on ML observability and model monitoring.
