# Fugue (Spark, Ray, Dask) Integration

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/whylabs/whylogs/blob/mainline/python/examples/integrations/Fugue_Profiling.ipynb)


Hi! Perhaps you're already feeling confident with our library, but you really wish there was an easy way to plug our profiling into your existing **Spark, Dask or Ray** clusters or existing **Databricks, Coiled or Anyscale** platforms. Well, glad you've made it here, because this is what we are going to cover in this example notebook 😃

If you want other insights on how to use whylogs, check out our [other examples](https://github.com/whylabs/whylogs/tree/mainline/python/examples); they might be extremely useful!

## Installing the extra dependency

So that you install only what you need from whylogs, the Fugue integration comes as an extra dependency. To install it together with Fugue's Spark support, run the following cell:

```python
%pip install 'whylogs[fugue]' 'fugue[spark]'
```

For other backends and platforms, use the matching command from the table below:

| Run whylogs on... | Installation Command |
|:---|:---|
| Any Spark cluster (including Databricks Notebooks) | `pip install 'whylogs[fugue]' 'fugue[spark]'` |
| Databricks (remote access) | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[databricks]'` |
| Any Ray cluster (including Anyscale Notebooks) | `pip install 'whylogs[fugue]' 'fugue[ray]'` |
| Anyscale (remote access) | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[anyscale]'` |
| Any Dask cluster | `pip install 'whylogs[fugue]' 'fugue[dask]'` |
| Coiled  | `pip install 'whylogs[fugue]' 'fugue-cloudprovider[coiled]'` |
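Before choosing a row, it can help to see which backends are already importable in your environment. The sketch below is a hypothetical helper (not part of whylogs or Fugue) that only checks whether each package's top-level module can be found; it assumes the standard import names `fugue`, `pyspark`, `ray` and `dask`.

```python
import importlib.util

# Hypothetical mapping from each backend in the table to the top-level
# module its package provides.
BACKEND_MODULES = {
    "fugue": "fugue",
    "spark": "pyspark",
    "ray": "ray",
    "dask": "dask",
}


def available_backends(modules=BACKEND_MODULES):
    """Return {backend: bool}, True when the backend's module can be found.

    find_spec only locates the module; it does not import it, so this check
    is cheap and has no side effects on the session.
    """
    return {
        name: importlib.util.find_spec(module) is not None
        for name, module in modules.items()
    }


if __name__ == "__main__":
    for name, ok in available_backends().items():
        print(f"{name:>6}: {'installed' if ok else 'missing'}")
```

A backend reported as missing means you still need the corresponding `pip install` command from the table.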

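You should not normally need to set the following environment variable. It forces protobuf to use its pure-Python implementation, which can work around protobuf version-compatibility errors in some environments. If you do need it, set it before importing whylogs:

```python
import os

# Force the pure-Python protobuf implementation (only needed as a workaround).
os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"
```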

## Constructing a dataset

```python
import pandas as pd
import numpy as np

n = 100
np.random.seed(0)
tdf = pd.DataFrame(
    dict(
        a=np.random.choice([1, 2, 3], n),
        b=np.random.choice(["a", "b"], n),
        c=np.random.random(n),
        d=np.random.choice(["xy", "z"], n),
    )
)
tdf.to_parquet("/tmp/test.parquet")
tdf
```

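| | a | b | c | d |
|---:|---:|:---|---:|:---|
| 0 | 1 | a | 0.533206 | xy |
| 1 | 2 | b | 0.230533 | z |
| 2 | 1 | a | 0.394869 | z |
| 3 | 2 | b | 0.618809 | z |
| 4 | 2 | b | 0.474868 | xy |
| ... | ... | ... | ... | ... |
| 95 | 1 | b | 0.904425 | xy |
| 96 | 3 | a | 0.645785 | z |
| 97 | 1 | a | 0.324683 | xy |
| 98 | 2 | b | 0.519711 | z |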


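## Profiling using whylogs + Fugue

In its simplest form, calling `profile` on a pandas DataFrame is equivalent to `why.log(df).view()`: the data is profiled locally and a `DatasetProfileView` is returned, which you can inspect with `.to_pandas()`.

```python
from whylogs.api.fugue import profile

profile(tdf).to_pandas()
```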

| column | cardinality/est | cardinality/lower_1 | cardinality/upper_1 | counts/n | counts/null | distribution/max | distribution/mean | distribution/median | distribution/min | distribution/n | ... | distribution/stddev | frequent_items/frequent_strings | ints/max | ints/min | type | types/boolean | types/fractional | types/integral | types/object | types/string |
|:---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|:---|---:|:---|---:|---:|:---|---:|---:|---:|---:|---:|
| a | 3.0 | 3.0 | 3.00015 | 100 | 0 | 3.0 | 1.88 | 2.0 | 1.0 | 100 | ... | 0.80754 | [FrequentItem(value='1', est=39, upper=39, low... | 3.0 | 1.0 | SummaryType.COLUMN | 0 | 0 | 100 | 0 | 0 |
| b | 2.0 | 2.0 | 2.0001 | 100 | 0 | | 0.0 | | | 0 | ... | 0.0 | [FrequentItem(value='a', est=57, upper=57, low... | | | SummaryType.COLUMN | 0 | 0 | 0 | 0 | 100 |
| c | 100.000025 | 100.0 | 100.005018 | 100 | 0 | 0.992396 | 0.499929 | 0.487838 | 5.5e-05 | 100 | ... | 0.294085 | | | | SummaryType.COLUMN | 0 | 100 | 0 | 0 | 0 |
| d | 2.0 | 2.0 | 2.0001 | 100 | 0 | | 0.0 | | | 0 | ... | 0.0 | [FrequentItem(value='xy', est=53, upper=53, lo... | | | SummaryType.COLUMN | 0 | 0 | 0 | 0 | 100 |
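To make a few of the columns in this table concrete, here is a hypothetical standard-library helper that computes similarly named quantities (`counts/*`, `types/*`, `distribution/n`, `min`, `max`, `mean`) for a plain Python list. It is an illustration of what those metrics mean, not whylogs' implementation: whylogs relies on approximate sketches (for example, the `cardinality/est` bounds above come from an approximate distinct-count sketch), and its type-inference rules may differ from this simplification.

```python
from statistics import mean


def summarize_column(values):
    """Hypothetical helper: compute a few metrics named after whylogs' profile columns.

    Illustration only; it does not reproduce whylogs' sketches or type rules.
    """
    # In this sketch, None is the only value treated as null.
    non_null = [v for v in values if v is not None]
    # bool is a subclass of int in Python, so booleans are separated first.
    booleans = [v for v in non_null if isinstance(v, bool)]
    integral = [v for v in non_null if isinstance(v, int) and not isinstance(v, bool)]
    fractional = [v for v in non_null if isinstance(v, float)]
    strings = [v for v in non_null if isinstance(v, str)]
    others = [v for v in non_null if not isinstance(v, (int, float, str))]
    numeric = integral + fractional
    return {
        "counts/n": len(values),
        "counts/null": len(values) - len(non_null),
        "types/boolean": len(booleans),
        "types/integral": len(integral),
        "types/fractional": len(fractional),
        "types/string": len(strings),
        "types/object": len(others),
        # Only numeric values feed the distribution metrics, which is why the
        # string columns b and d show distribution/n = 0 in the table above.
        "distribution/n": len(numeric),
        "distribution/min": min(numeric) if numeric else None,
        "distribution/max": max(numeric) if numeric else None,
        "distribution/mean": mean(numeric) if numeric else 0.0,
    }
```

For example, `summarize_column([1, 2, 3, None])` reports a `counts/n` of 4, a `counts/null` of 1 and a `types/integral` of 3.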


Now suppose you want to use Spark to profile the dataset in a distributed way, and that this is how you obtain a `SparkSession`:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
```



To profile the pandas DataFrame on Spark, pass the session as the `engine`:

```python
profile(tdf, engine=spark)
```

```
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7f593b4fc6d0>
```
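Conceptually, distributed profiling works because whylogs profiles are mergeable: each partition can be profiled independently and the partial results combined into a single profile. The sketch below illustrates that idea with a hypothetical `NumericSummary` class that tracks only count, sum, min and max. It is not how whylogs or Fugue is implemented internally; real profile metrics (quantile and cardinality sketches, frequent items) are considerably more involved.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class NumericSummary:
    """Hypothetical mergeable summary of a numeric column (not a whylogs class)."""

    n: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    @classmethod
    def from_values(cls, values):
        # Summarize one partition locally, as a worker would.
        return cls(
            len(values),
            float(sum(values)),
            min(values, default=float("inf")),
            max(values, default=float("-inf")),
        )

    def merge(self, other):
        # Merging is associative and commutative, so partial summaries can be
        # combined in any order, e.g. as results arrive from workers.
        return NumericSummary(
            self.n + other.n,
            self.total + other.total,
            min(self.minimum, other.minimum),
            max(self.maximum, other.maximum),
        )

    @property
    def mean(self):
        return self.total / self.n if self.n else 0.0


def profile_partitioned(values, num_partitions):
    """Split values into chunks, summarize each chunk, then merge the partials."""
    size = max(1, -(-len(values) // num_partitions))  # ceiling division
    chunks = [values[i:i + size] for i in range(0, len(values), size)]
    partials = [NumericSummary.from_values(chunk) for chunk in chunks]
    # Start from the empty summary, which is the identity element for merge.
    return reduce(NumericSummary.merge, partials, NumericSummary())
```

For example, `profile_partitioned(list(range(1, 101)), 4)` yields the same summary as `NumericSummary.from_values(list(range(1, 101)))`, regardless of how the list is partitioned.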

To profile a Spark DataFrame:

```python
spark_df = spark.createDataFrame(tdf)
profile(spark_df, engine=spark)
```

```
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7f593a7e79a0>
```

You can also profile a Parquet file, or a folder of Parquet files, directly from a local path or cloud storage; the data is loaded in a distributed way:

```python
profile("/tmp/test.parquet", engine=spark)
```

```
<whylogs.core.view.dataset_profile_view.DatasetProfileView at 0x7f593a789030>
```

## WIP!!!