# Training Demo
Demonstrates reading a dataset table from Avro and writing Cleanlab columns to both Avro and Parquet.

Cleanlab columns are split into separate files by analysis type (in this example, Cleanlab and OOD (out-of-distribution)).

In [14]:
%load_ext autoreload
%autoreload 2

import os
import pathlib
import shutil
import sys
from typing import Iterator

import pyspark.sql
import pyspark.sql.functions

# make the helper modules in ../src/ importable
sys.path.insert(0, "../src/")
from random_cleanlab_columns import generate_random_ood_columns, generate_random_cleanlab_columns
from sample_data import DATA_DIR, fetch_dataset



In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:3.3.2 pyspark-shell'

spark = pyspark.sql.SparkSession.builder.appName(
    "dataset_view_demo"
).getOrCreate()
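The `spark-avro` connector is not bundled with PySpark, which is why the cell above passes it through `--packages`. The artifact name encodes the Scala build (`_2.12`), and its version should match the installed Spark (3.3.2 here). A small helper such as the hypothetical `avro_package_coordinate` below can keep those values in sync. Note that `PYSPARK_SUBMIT_ARGS` is only read when the first `SparkSession` (and its JVM) is created, so it must be set before `getOrCreate()`, and in practice the value has to end with `pyspark-shell`.

```python
def avro_package_coordinate(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate for the spark-avro connector (hypothetical helper)."""
    return f"org.apache.spark:spark-avro_{scala_version}:{spark_version}"


def pyspark_submit_args(packages: list[str]) -> str:
    """Format a PYSPARK_SUBMIT_ARGS value; the trailing 'pyspark-shell' is required."""
    return f"--packages {','.join(packages)} pyspark-shell"


# reproduces the value set in the cell above
args = pyspark_submit_args([avro_package_coordinate("3.3.2")])
print(args)  # --packages org.apache.spark:spark-avro_2.12:3.3.2 pyspark-shell
```

In a live session, `pyspark.__version__` could be passed as `spark_version` instead of a hard-coded string.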



In [5]:
# download dataset to use
DATASET_TO_USE = "Tweets-1M.csv"
DATASET_PATH = fetch_dataset(DATASET_TO_USE)

In [6]:
# load dataset to dataframe
dataset_df = spark.read.option("header", True).option("escape", '"').format("csv").load(str(DATASET_PATH))
dataset_df.show()

# save dataset to Avro
AVRO_DATASET_PATH = DATASET_PATH.with_suffix(".avro")
dataset_df.write.format("avro").save(str(AVRO_DATASET_PATH), mode="overwrite")

+--------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+
|            tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|      tweet_location|       user_timezone|
+--------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+
|                   0|          neutral|                         1.0|          null|                     null|Virgi
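The `escape` option in the CSV read above matters because tweet text contains quotes. Spark's CSV reader uses a backslash as its default escape character, whereas files in the common RFC 4180 style escape a quote inside a quoted field by doubling it (`""`); setting `escape` to `'"'` tells Spark to read the doubled form. The standard-library sketch below uses Python's `csv` module, whose `doublequote=True` default follows the same convention, to show how such a row should parse. The sample row is made up for illustration.

```python
import csv
import io

# a made-up row in the doubled-quote style; the text field contains quotes and a comma
raw = 'tweet_id,text\n1,"She said ""delayed again"", then left"\n'

rows = list(csv.reader(io.StringIO(raw)))  # doublequote=True is the csv module default
header, first = rows
print(dict(zip(header, first)))  # {'tweet_id': '1', 'text': 'She said "delayed again", then left'}
```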

## Training Demo
The following demo shows how to write Cleanlab columns to persistent storage in both Avro and Parquet format.

Note: the results of the demos are ugly because the datasets have flaws!

### Create Cleanlab/OOD columns
This shows how to generate randomized Cleanlab and OOD columns for a PySpark table.
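The implementation of `generate_random_cleanlab_columns` lives in `../src/` and is not shown here. Judging only from the column names in the output below (`cleanlab_suggested_label`, `cleanlab_corrected_label`, `cleanlab_action`, `cleanlab_label_quality`), a pure-Python sketch of one randomized row might look like the following. The label set, the hypothetical `random_cleanlab_row` helper, and the use of uniform random scores are assumptions, not the repository's code.

```python
import random
from typing import Optional, TypedDict

# assumed label set, taken from the airline_sentiment values visible in the output
LABELS = ("negative", "neutral", "positive")


class CleanlabRow(TypedDict):
    tweet_id: str
    airline_sentiment: str
    cleanlab_suggested_label: str
    cleanlab_corrected_label: Optional[str]
    cleanlab_action: Optional[str]
    cleanlab_label_quality: float


def random_cleanlab_row(tweet_id: str, given_label: str, rng: random.Random) -> CleanlabRow:
    """Hypothetical stand-in for one row of randomized Cleanlab columns."""
    return {
        "tweet_id": tweet_id,
        "airline_sentiment": given_label,
        "cleanlab_suggested_label": rng.choice(LABELS),
        # left as None, matching the nulls shown in the output
        "cleanlab_corrected_label": None,
        "cleanlab_action": None,
        "cleanlab_label_quality": rng.random(),  # uniform in [0, 1)
    }


rng = random.Random(0)  # seeded so the sketch is reproducible
print(random_cleanlab_row("0", "neutral", rng))
```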

In [15]:
# load spark table from avro
dataset_table = spark.read.format("avro").load(str(AVRO_DATASET_PATH))

id_column = "tweet_id"
label_column = "airline_sentiment"

# generate randomized cleanlab columns
cl_cols = generate_random_cleanlab_columns(dataset_table.alias("cl_cols_df"), id_column=id_column, label_column=label_column)
cl_cols.show()

# generate randomized OOD columns
ood_cols = generate_random_ood_columns(dataset_table.alias("ood_cols_df"), id_column=id_column)
ood_cols.show()


+--------+-----------------+------------------------+------------------------+---------------+----------------------+
|tweet_id|airline_sentiment|cleanlab_suggested_label|cleanlab_corrected_label|cleanlab_action|cleanlab_label_quality|
+--------+-----------------+------------------------+------------------------+---------------+----------------------+
|       0|          neutral|                 neutral|                    null|           null|    0.6876725562745707|
|       0|          neutral|                positive|                    null|           null|    0.5061493433191113|
|       0|          neutral|                 neutral|                    null|           null|    0.6403897667359648|
|       0|          neutral|                negative|                    null|           null|    0.5641236245015528|
|       0|          neutral|                negative|                    null|           null|    0.5832089912792499|
|       0|          neutral|                negative|   

### Save to Avro -- Batched vs Un-batched
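Batched saving splits each table into contiguous row ranges and writes each range with `mode="append"`, while the un-batched path writes the whole table once with `mode="overwrite"`. The boundary arithmetic is easy to get off by one, so here is a pure-Python sketch of it using half-open `[start, end)` ranges in which the last batch absorbs the remainder. `batch_bounds` is a hypothetical helper, not part of the notebook.

```python
def batch_bounds(num_rows: int, num_batches: int) -> list[tuple[int, int]]:
    """Split row indices 0..num_rows-1 into contiguous half-open [start, end) ranges."""
    if num_batches < 1:
        raise ValueError("num_batches must be at least 1")
    size = num_rows // num_batches
    starts = [n * size for n in range(num_batches)]
    ends = starts[1:] + [num_rows]  # the last batch absorbs num_rows % num_batches
    return list(zip(starts, ends))


print(batch_bounds(23, 5))  # [(0, 4), (4, 8), (8, 12), (12, 16), (16, 23)]
```

Every index appears in exactly one range, including index 0 and the final index, which is the property the batched writer needs.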

In [8]:
avro_filename_template = str(AVRO_DATASET_PATH).replace(".avro", "-{filetype}-{save_mode}.avro")

def _create_avro_filename(filetype: str, save_mode: str) -> pathlib.Path:
    return pathlib.Path(avro_filename_template.format(filetype=filetype, save_mode=save_mode))

def save_df(df_table: pyspark.sql.DataFrame, filetype: str, save_mode: str):
    outfile = str(_create_avro_filename(filetype, save_mode))

    df_table.write.format("avro").save(outfile, mode=save_mode)
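`avro_filename_template` derives output paths with `str.replace`, which would also rewrite any `.avro` that happens to appear in a parent directory name. A `pathlib`-based variant, sketched below as the hypothetical `derived_output_path`, only touches the file name. Assuming the dataset file is named `Tweets-1M.avro`, it yields sibling outputs such as `Tweets-1M-cl_cols-append.avro`.

```python
import pathlib


def derived_output_path(dataset_path: pathlib.PurePath, filetype: str, save_mode: str,
                        suffix: str = ".avro") -> pathlib.PurePath:
    """Build '<stem>-<filetype>-<save_mode><suffix>' next to dataset_path (hypothetical helper)."""
    return dataset_path.with_name(f"{dataset_path.stem}-{filetype}-{save_mode}{suffix}")


# assumed location; the parent directory deliberately contains ".avro"
example = pathlib.PurePosixPath("/data/archive.avro.d/Tweets-1M.avro")
print(derived_output_path(example, "cl_cols", "append"))  # /data/archive.avro.d/Tweets-1M-cl_cols-append.avro
```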

In [11]:
# saving to Avro in batches
num_batches = 10
save_mode = "append"

def _get_slices(df_table: pyspark.sql.DataFrame, num_slices: int) -> Iterator[pyspark.sql.DataFrame]:
    """Yield `num_slices` contiguous, non-overlapping slices that together cover every row."""
    num_rows = df_table.count()
    slice_size = num_rows // num_slices
    # half-open [start, end) boundaries; the last slice absorbs any remainder
    slice_indices = [n * slice_size for n in range(num_slices)] + [num_rows]

    # monotonically_increasing_id() is unique but NOT consecutive across partitions,
    # so derive a dense 0-based index with row_number() over that ordering
    # (a window without partitionBy moves all rows into a single partition)
    order = pyspark.sql.Window.orderBy(pyspark.sql.functions.monotonically_increasing_id())
    df_table = df_table.withColumn("__index", pyspark.sql.functions.row_number().over(order) - 1)
    index_col = pyspark.sql.functions.col("__index")
    for start, end in zip(slice_indices[:-1], slice_indices[1:]):
        # drop the helper column so batched output has the same schema as un-batched output
        yield df_table.filter((index_col >= start) & (index_col < end)).drop("__index")


# cleanup existing
for filetype in ("cl_cols", "ood_cols"):
    outdir = _create_avro_filename(filetype, save_mode)
    if outdir.is_dir():
        shutil.rmtree(outdir)

# save cleanlab columns
for cl_cols_slice in _get_slices(cl_cols, num_batches):
    save_df(cl_cols_slice, filetype="cl_cols", save_mode=save_mode)

# save ood columns
for ood_cols_slice in _get_slices(ood_cols, num_batches):
    save_df(ood_cols_slice, filetype="ood_cols", save_mode=save_mode)


23/02/28 22:39:02 WARN AvroSerializer: Writing Avro files with non-nullable Avro schema and nullable catalyst schema will throw runtime exception if there is a record with null value.

In [12]:
# save to Avro un-batched
save_mode = "overwrite"

save_df(cl_cols, filetype="cl_cols", save_mode=save_mode)
save_df(ood_cols, filetype="ood_cols", save_mode=save_mode)

23/02/28 22:40:17 WARN AvroSerializer: Writing Avro files with non-nullable Avro schema and nullable catalyst schema will throw runtime exception if there is a record with null value.
23/02/28 22:40:17 WARN AvroSerializer: Writing Avro files with non-nullable Avro schema and nullable catalyst schema will throw runtime exception if there is a record with null value.
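The batched and un-batched cells differ mainly in the save mode. With `"append"`, each write adds new part files to the existing output directory, which is why the batched cell deletes old output first; with `"overwrite"`, each write replaces whatever was there. The toy in-memory model below summarizes the four standard `DataFrameWriter` modes (`append`, `overwrite`, `ignore`, and the default `error`/`errorifexists`). The `write` function and the dict standing in for the filesystem are hypothetical and only mimic the observable behavior, not Spark's implementation.

```python
def write(store: dict[str, list[str]], path: str, parts: list[str], mode: str = "errorifexists") -> None:
    """Toy model of Spark save modes; `store` maps an output directory to its part files."""
    exists = path in store
    if mode == "append":
        store.setdefault(path, []).extend(parts)  # keep existing parts, add new ones
    elif mode == "overwrite":
        store[path] = list(parts)  # replace everything at the path
    elif mode == "ignore":
        if not exists:  # silently do nothing if the path already exists
            store[path] = list(parts)
    elif mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"path {path} already exists")
        store[path] = list(parts)
    else:
        raise ValueError(f"unknown save mode: {mode}")


fs: dict[str, list[str]] = {}
for batch in range(3):  # batched: three appends accumulate
    write(fs, "cl_cols-append.avro", [f"part-{batch}"], mode="append")
write(fs, "cl_cols-overwrite.avro", ["part-a"], mode="overwrite")
write(fs, "cl_cols-overwrite.avro", ["part-b"], mode="overwrite")  # a re-run replaces the output
print(fs)  # {'cl_cols-append.avro': ['part-0', 'part-1', 'part-2'], 'cl_cols-overwrite.avro': ['part-b']}
```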



### Save to Parquet

In [16]:
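parquet_filename_template = str(AVRO_DATASET_PATH).replace(".avro", "-{filetype}.parquet")

# the default save mode raises if the path exists; overwrite (like the Avro writes above) so the cell can be re-run
cl_cols.write.parquet(parquet_filename_template.format(filetype="cl_cols"), mode="overwrite")
ood_cols.write.parquet(parquet_filename_template.format(filetype="ood_cols"), mode="overwrite")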
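Each of these writes produces a directory rather than a single file: Spark writes one `part-*` file per task and, by default, an empty `_SUCCESS` marker once the job commits. A quick standard-library check such as the hypothetical `list_part_files` below can confirm that a write finished and show how many part files it produced (for the batched Avro output, each append adds more). Exact part-file names vary, so the sketch only matches the `part-` prefix, which also skips the hidden `.crc` checksum files that appear on a local filesystem.

```python
import pathlib
import tempfile


def list_part_files(output_dir: pathlib.Path) -> list[pathlib.Path]:
    """Return the part files in a Spark output directory, raising if the job did not commit."""
    if not (output_dir / "_SUCCESS").exists():
        raise RuntimeError(f"{output_dir} has no _SUCCESS marker; the write may be incomplete")
    # hidden checksum files start with '.', so a 'part-' prefix match skips them
    return sorted(p for p in output_dir.iterdir() if p.name.startswith("part-"))


# simulate a finished write in a temporary directory (file names are illustrative)
with tempfile.TemporaryDirectory() as tmp:
    out = pathlib.Path(tmp) / "Tweets-1M-cl_cols.parquet"
    out.mkdir()
    for name in ("_SUCCESS", "part-00000-abc.snappy.parquet",
                 ".part-00000-abc.snappy.parquet.crc", "part-00001-abc.snappy.parquet"):
        (out / name).touch()
    parts = [p.name for p in list_part_files(out)]
print(parts)  # ['part-00000-abc.snappy.parquet', 'part-00001-abc.snappy.parquet']
```

Against the real output, something like `list_part_files(pathlib.Path(parquet_filename_template.format(filetype="cl_cols")))` would list the files written above.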
