# Automunge integration with Apache Spark

The Automunge library can be thought of as an extension of the Pandas dataframe library. As such, any external library with built-in Pandas support should be capable of direct integration in a push-button manner. In this notebook we demonstrate the integration of Automunge preprocessing into a Spark workflow.

Apache Spark is a data processing engine that, in some respects, supports more sophisticated forms of data processing than Pandas, since it can be applied to streaming data in a distributed environment. In this notebook we will provide demonstrations using PySpark, Spark's Python API.

The easiest way to integrate Pandas functions into a PySpark workflow is probably through Spark's native pyspark.pandas module (the pandas API on Spark). In this notebook we will demonstrate calling the pandas_on_spark.apply_batch() method on a pandas-on-Spark dataframe, which applies a pandas function to the data one batch at a time, with each batch passed to the function as an ordinary pandas dataframe.
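
To build intuition for what apply_batch() expects, here is a minimal local sketch, in plain pandas, of the split-apply-concatenate pattern: the frame is cut into row batches, the function is applied to each batch, and the results are concatenated. The helper `emulate_apply_batch`, the example function, and the fixed batch size are hypothetical illustrations; Spark chooses the actual batch boundaries itself, so this is not Spark's implementation.

```python
import pandas as pd


def emulate_apply_batch(df: pd.DataFrame, func, batch_size: int = 2) -> pd.DataFrame:
    """Hypothetical local stand-in for pandas_on_spark.apply_batch().

    Splits df into row batches, applies func to each batch, and concatenates
    the results. As with apply_batch, func must return a pandas DataFrame.
    """
    outputs = []
    for start in range(0, len(df), batch_size):
        batch = df.iloc[start:start + batch_size]
        result = func(batch)
        if not isinstance(result, pd.DataFrame):
            raise TypeError("func must return a pandas DataFrame")
        outputs.append(result)
    return pd.concat(outputs)


def add_fare_per_class(batch: pd.DataFrame) -> pd.DataFrame:
    # Row-wise function: each output row depends only on its own input row
    out = batch.copy()
    out["FarePerClass"] = out["Fare"] / out["Pclass"]
    return out


df = pd.DataFrame({"Pclass": [1, 2, 3, 3, 1], "Fare": [80.0, 20.0, 9.0, 6.0, 50.0]})
print(emulate_apply_batch(df, add_fare_per_class))
```

A function that returns a tuple of dataframes (as postmunge() does by default) would be rejected by this check, which is why a single-dataframe return matters later in the notebook.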

This demonstration is not a fully optimized implementation; for example, we understand that Spark may benefit from additional specification of returned data types. This notebook can be thought of as a simple proof of concept for integrating data preprocessing with the Automunge API directly into a Spark session.

In [25]:
#install Automunge if not yet installed:
!pip install Automunge



In [26]:
#install pyspark if not yet installed:
!pip install pyspark



In [27]:
#This notebook also assumes that Java is installed
#if you are not ready to install it in your local session,
#this notebook can be run in an online Colab notebook


In [28]:
#imports

import pandas as pd

from Automunge import *
am = AutoMunge()

import pyspark.pandas as ps


In [29]:
#we'll also assume the availability of a dataset
#here we'll rely on the Titanic set, a simple tabular benchmark
#the Titanic set can be accessed from Kaggle among other places
#we'll assume the 'train.csv' and 'test.csv' files
#are in the same folder as this notebook
#and that the train set includes a label column ('Survived')
#not found in the test set, as well as an ID column ('PassengerId')

#titanic set
df_train = pd.read_csv('train.csv')
df_test = pd.read_csv('test.csv')

#Automunge convention to accommodate different/extra columns in train versus test set
labels_column = 'Survived'
trainID_column = 'PassengerId'
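
These conventions matter because postmunge() is intended to receive test data carrying the same feature columns as the training data, while the label column (and optionally the ID column) may be absent. A quick sanity check before encoding might look like the sketch below. The helper `feature_columns` is hypothetical, and the column lists are the standard Titanic headers written out by hand rather than read from the files.

```python
def feature_columns(columns, labels_column=None, id_column=None):
    """Hypothetical helper: return the columns that are neither label nor ID."""
    excluded = {labels_column, id_column}
    return [c for c in columns if c not in excluded]


# Standard Titanic headers, written out by hand for illustration
train_columns = ["PassengerId", "Survived", "Pclass", "Name", "Sex", "Age",
                 "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked"]
test_columns = ["PassengerId", "Pclass", "Name", "Sex", "Age",
                "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked"]

train_features = feature_columns(train_columns, "Survived", "PassengerId")
test_features = feature_columns(test_columns, "Survived", "PassengerId")

# Once label and ID columns are set aside, the feature columns should agree
assert train_features == test_features
print(sorted(set(train_columns) - set(test_columns)))  # ['Survived']
```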

The workflow to prepare streams of data in Spark would be as follows:
- identify the in-memory-scale training corpus to be used to establish a basis
- run that training corpus through automunge() as a pandas dataframe
- this will result in an encoded training dataframe and a populated postprocess_dict, which can be used to prepare additional data
- you can then either translate the training corpus to a Spark dataframe, or simply use it to train a supervised learning model in whatever convention is desired
- streams of additional test data channeled through Spark can then be encoded on a consistent basis (e.g. for inference) with apply_batch() and postmunge()
- to pass parameters to postmunge(), we can create a wrapper function that accepts a dataframe as input and internally specifies the set of parameters to pass to postmunge()
- note that a small deviation from default postmunge() usage is appropriate: since apply_batch() expects the pandas function to return a single dataframe, the multiple dataframes returned by postmunge() by default need to be translated to a single dataframe
- the wrapper function can thus either combine the returned sets into a single dataframe itself, or make use of the postmunge() returnedsets parameter, which adjusts which sets postmunge() returns
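
For the first option in the last point, combining several returned sets into one dataframe, a minimal sketch of the merging step is shown below using small stand-in frames. The helper `combine_returned_sets` is hypothetical; which sets postmunge() actually returns for each returnedsets option, and how an absent set is represented, should be checked against the Automunge README.

```python
import pandas as pd


def combine_returned_sets(*sets):
    """Hypothetical helper: column-wise join of sets that share a row index."""
    # Skip sets that are None or have no columns (an assumption about
    # how an absent set might be represented)
    nonempty = [s for s in sets if s is not None and s.shape[1] > 0]
    for s in nonempty[1:]:
        # All sets must describe the same rows in the same order
        if not s.index.equals(nonempty[0].index):
            raise ValueError("returned sets have mismatched row indexes")
    return pd.concat(nonempty, axis=1)


# Stand-ins for an encoded test set and its ID set
test = pd.DataFrame({"Pclass_nmbr": [0.83, -0.37], "Sex_bnry": [1, 0]}, index=[0, 1])
test_ID = pd.DataFrame({"PassengerId": [892, 893]}, index=[0, 1])

single = combine_returned_sets(test, test_ID)
print(single)
```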


In [None]:
#here we populate a postprocess_dict using the pandas df_train set as a basis
#where postprocess_dict can be used as a key to consistently prepare additional data
#using the postmunge(.) function

#although this operation could be sped up by omitting MLinfill,
#postmunge latency is less sensitive to ML infill since it is just running inference

#the returned sets (like train and labels) could be used to train a model

train, train_ID, labels, \
val, val_ID, val_labels, \
test, test_ID, test_labels, \
postprocess_dict = \
am.automunge(df_train,
             labels_column = labels_column,
             trainID_column = trainID_column,
             MLinfill = True,
             shuffletrain = False,
             printstatus=False,
             )

train.head()
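
The role postprocess_dict plays here, a basis fitted once on the training data and then reused unchanged on every later batch, can be sketched in plain pandas. In the sketch a simple z-score normalization stands in for automunge()/postmunge(); the names `fit_basis`, `encode_batch`, and the `clip` parameter are hypothetical. functools.partial plays the part of a wrapper function that bakes in the fixed parameters, so the result accepts only a dataframe.

```python
from functools import partial

import pandas as pd


def fit_basis(train: pd.DataFrame) -> dict:
    # Stand-in for automunge(): record per-column statistics from training data only
    return {col: (train[col].mean(), train[col].std()) for col in train.columns}


def encode_batch(df: pd.DataFrame, basis: dict, clip: float) -> pd.DataFrame:
    # Stand-in for postmunge(): encode using the stored training statistics
    out = pd.DataFrame(index=df.index)
    for col, (mean, std) in basis.items():
        out[col + "_nmbr"] = ((df[col] - mean) / std).clip(-clip, clip)
    return out


train = pd.DataFrame({"Age": [22.0, 38.0, 26.0, 35.0], "Fare": [7.25, 71.28, 7.93, 53.10]})
test = pd.DataFrame({"Age": [34.5, 47.0, 62.0], "Fare": [7.83, 7.00, 9.69]})

basis = fit_basis(train)
# The "wrapper": fixed parameters bound in advance, so it takes only a dataframe
wrapper = partial(encode_batch, basis=basis, clip=5.0)

whole = wrapper(test)
batched = pd.concat([wrapper(test.iloc[:2]), wrapper(test.iloc[2:])])
# Because the basis is fixed, batch boundaries do not change the encoding
pd.testing.assert_frame_equal(whole, batched)
```

Binding the basis explicitly with partial is an alternative to reading a global variable inside the wrapper; in either case Spark has to serialize the referenced objects to ship them to its workers.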

In [32]:
#in order to apply postmunge(.) to encode Spark dataframes
#one way to simplify the operation is to prepare a wrapper function
#which handles postmunge parameters internally
#and returns a single dataframe

# here is the postmunge wrapper function

#first here is a support function for data type conversions
def pd_dtype_convert(df):

    #since Spark doesn't support unsigned integer data types
    #and since Automunge defaults to uint for ordinal type encodings,
    #we convert each uint column to the next-wider signed int
    #(uint64 is left unchanged here)

    for column in df.columns:

        if str(df[column].dtype) == 'uint8':
            df[column] = df[column].astype('int16')

        elif str(df[column].dtype) == 'uint16':
            df[column] = df[column].astype('int32')

        elif str(df[column].dtype) == 'uint32':
            df[column] = df[column].astype('int64')

    return df

#now here is the postmunge wrapper function for spark's apply_batch()
#where postmunge is used to encode additional data on a training data basis

def postmunge_wrapper_for_spark_ab(pandas_df):
  #returns a pandas dataframe encoded by postmunge
  #assumes a postprocess_dict is available in memory
  #(a Python function can read a global variable such as postprocess_dict
  #even when it is not explicitly passed as an argument)

  #here are the postmunge parameters
  #normally we would defer to defaults when not specified
  #however since this is for a wrapper function
  #let's go ahead and make explicit specifications

  testID_column = False
  pandasoutput = 'dataframe'
  printstatus = False #'summary'
  inplace = True #False
  dupl_rows = False
  TrainLabelFreqLevel = False
  featureeval = False
  traindata = False
  noise_augment = 0
  driftreport = False
  inversion = False
  # returnedsets = True #specified below
  shuffletrain = False
  entropy_seeds = False
  random_generator = False
  sampling_dict = False
  randomseed = False
  encrypt_key = False
  logger = {}

  #refer to readme docs on postmunge returnedsets parameter
  #for this scenario where postmunge returns a single dataframe
  test = \
  am.postmunge(postprocess_dict,
  pandas_df,
  returnedsets = False, #this results in postmunge returning a single set
  testID_column = testID_column,
  pandasoutput = pandasoutput,
  printstatus = printstatus,
  inplace = inplace,
  dupl_rows = dupl_rows,
  TrainLabelFreqLevel = TrainLabelFreqLevel,
  featureeval = featureeval,
  traindata = traindata,
  noise_augment = noise_augment,
  driftreport = driftreport,
  inversion = inversion,
  # returnedsets = returnedsets,
  shuffletrain = shuffletrain,
  entropy_seeds = entropy_seeds,
  random_generator = random_generator,
  sampling_dict = sampling_dict,
  randomseed = randomseed,
  encrypt_key = encrypt_key,
  logger = logger,
  )

  test = pd_dtype_convert(test)

  return test
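
The uint conversion in pd_dtype_convert() can also be written as a lookup table. The sketch below is an alternative formulation, not part of Automunge, and unlike the original it returns a new frame rather than modifying columns in place. It also handles uint64: since no signed integer type can hold every uint64 value, this hypothetical version falls back to float64, an assumption that trades precision for very large values (above 2**53) against silent overflow. Whether that fallback suits your data is a judgment call.

```python
import pandas as pd

# Each unsigned type maps to the next-wider signed type that holds all of its values.
# uint64 has no such signed type; this sketch assumes float64 is an acceptable
# fallback (values above 2**53 would lose precision).
UINT_TO_SIGNED = {
    "uint8": "int16",
    "uint16": "int32",
    "uint32": "int64",
    "uint64": "float64",
}


def pd_dtype_convert_mapped(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical table-driven variant of pd_dtype_convert(); returns a new frame."""
    conversions = {
        col: UINT_TO_SIGNED[str(dtype)]
        for col, dtype in df.dtypes.items()
        if str(dtype) in UINT_TO_SIGNED
    }
    return df.astype(conversions)


df = pd.DataFrame({
    "ord_small": pd.Series([0, 1, 255], dtype="uint8"),
    "ord_large": pd.Series([0, 65535, 2], dtype="uint16"),
    "Age_nmbr": [0.33, 1.19, -0.18],
})
converted = pd_dtype_convert_mapped(df)
print(converted.dtypes)
```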


In [None]:
#then encoding Spark DataFrames
#can be demonstrated by creating a test dataset in Spark
#we can use the same Titanic test set to demonstrate


from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("My Spark App").getOrCreate()

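# Load the test set as a Spark DataFrame
# (alternatively, convert the pandas DataFrame: spark_df_test = spark.createDataFrame(df_test))
# inferSchema parses numeric columns as numbers rather than strings
spark_df_test = spark.read.option("header", True).option("inferSchema", True).csv("test.csv")

# Wrap as a pandas-on-Spark DataFrame so the pandas_on_spark accessor is available
spark_df_test = ps.DataFrame(spark_df_test)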

print(spark_df_test.head())

test_sparkdf_encoded = spark_df_test.pandas_on_spark.apply_batch(postmunge_wrapper_for_spark_ab)

# print(test_sparkdf_encoded.head())

In [34]:
type(test_sparkdf_encoded)

pyspark.pandas.frame.DataFrame

In [36]:
test_sparkdf_encoded.head()

,Pclass_nmbr,Sex_bnry,Age_nmbr,SibSp_nmbr,Parch_nmbr,Ticket_nmbr,Fare_nmbr,Pclass_NArw,Name_NArw,Name_hash_0,Name_hash_1,Name_hash_2,Name_hash_3,Name_hash_4,Name_hash_5,Name_hash_6,Name_hash_7,Name_hash_8,Name_hash_9,Name_hash_10,Name_hash_11,Name_hash_12,Name_hash_13,Sex_NArw,Age_NArw,SibSp_NArw,Parch_NArw,Ticket_NArw,Fare_NArw,Cabin_NArw,Cabin_1010_0,Cabin_1010_1,Cabin_1010_2,Cabin_1010_3,Cabin_1010_4,Cabin_1010_5,Cabin_1010_6,Cabin_1010_7,Embarked_NArw,Embarked_1010_0,Embarked_1010_1
0,0.826913,1,0.330491,-0.474279,-0.473408,0.149684,-0.490508,0,0,344,1004,494,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0
1,0.826913,0,1.190988,0.43255,-0.473408,0.218302,-0.507194,0,0,437,538,494,71,581,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,1
2,-0.369157,1,2.223584,-0.474279,-0.473408,-0.042498,-0.453112,0,0,716,1004,585,504,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0
3,0.826913,1,-0.185806,-0.474279,-0.473408,0.116273,-0.473739,0,0,108,1004,826,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,1
4,0.826913,0,-0.530005,0.43255,0.767199,6.024011,-0.400792,0,0,706,538,849,292,519,921,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,1
