# Preprocessing

In [3]:
import pathlib
import prefect
import pyspark
import transformers

from IPython.core.display import display
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [9]:
def init_spark(*, spark_driver_memory: str) -> pyspark.sql.SparkSession:
    spark = (
        pyspark.sql.SparkSession.builder
        .master("local[*]")
        .appName("bod-seats")
        .config("spark.driver.memory", spark_driver_memory)
        .getOrCreate()
    )
    display(spark)
    return spark
def load(spark: pyspark.sql.SparkSession, *, path: pathlib.Path) -> pyspark.sql.DataFrame:
    df = spark.read.format("csv").load(str(path), header=True)
    df = df.drop("_c0")
    df = df.withColumnRenamed("BoD Total KEEP", "label")
    df = df.withColumn("label", col("label").cast(ByteType()))
    df = df.withColumnRenamed("Raw Text", "text")
    df.printSchema()
    df.show()
    df.summary().show()
    df = df.na.drop()
    df.summary().show()
    return df
def tokenize(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    tokenizer = transformers.AlbertTokenizerFast.from_pretrained("albert-base-v2")
    @udf(StructType([
        StructField("input_ids", ArrayType(ShortType(), False), False),
        StructField("attention_mask", ArrayType(ShortType(), False), False),
        StructField("length", ArrayType(IntegerType(), False), False),
        StructField("offset_mapping", ArrayType(ShortType(), False), False),
        StructField("special_tokens_mask", ArrayType(ShortType(), False), False),
        StructField("token_type_ids", ArrayType(ShortType(), False), False),
    ]))
    def _tokenize(text: str) -> dict[str, list[int]]:
        encoding = dict(**tokenizer(
            text,
            return_length=True,
            return_offsets_mapping=True,
            return_special_tokens_mask=True,
        ))
        return encoding
    df = df.withColumn("text", _tokenize("text"))
    df.printSchema()
    df.show()
    df.summary().show()
    return df
def save(df: pyspark.sql.DataFrame, *, path: pathlib.Path) -> pyspark.sql.DataFrame:
    df.write.format("parquet").mode("overwrite").save(str(path))
with prefect.Flow("bod-seats") as flow:
    spark = init_spark(spark_driver_memory="16g")
    df = load(spark, path=pathlib.Path().resolve() / "total.csv")
    df = tokenize(df)
    save(df, path=pathlib.Path().resolve() / "total.parquet")
flow.run()


root
 |-- id: string (nullable = true)
 |-- label: byte (nullable = true)
 |-- text: string (nullable = true)

+--------------------+-----+--------------------+
|                  id|label|                text|
+--------------------+-----+--------------------+
|CA_C1961614_02.26...|    0|"﻿ARTICLES OF INC...|
|CA_C1961614_03.12...|    0|"﻿A473381 filed'	...|
|CA_C1961614_04.24...|    0|"﻿A475165k / (pl1...|
|DE_3013451_03.05....|    0|﻿SECRETARY OF STA...|
|DE_3013451_04.28....|    0|"﻿SECRETARY OF ST...|
|DE_3013451_06.10....|    0|﻿SECRETARY OF STA...|
|DE_3013451_08.16....|    0|"﻿SECRETARY OF ST...|
|DE_3013451_12.21....|    0|"﻿SECRETARY OF ST...|
|CA_C2235922_05.25...|    0|"﻿2235922IsARTICL...|
|DE_3115145_07.10....|    0|﻿SECRETARY OF STA...|
|DE_3115145_07.10....|    0|﻿CERTIFICATE OF C...|
|DE_3115145_07.16....|    0|"﻿SECRETARY OF ST...|
|DE_3115145_10.22....|    0|﻿849W P ■ 02-/02C...|
|DE_3115145_11.27....|    0|"﻿Stafg of Delawa...|
|17445_955DREAMS_C...|    0|"﻿State of 

<Success: "All reference tasks succeeded.">