## PySpark2PMML
### Python package for converting Apache Spark ML pipelines to PMML.
### https://github.com/jpmml/pyspark2pmml

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import SQLTransformer
from pyspark.sql.types import StringType
from pyspark2pmml import PMMLBuilder

In [2]:
# Extra local JARs put on both the driver and the executor classpath.
spark_jars = "../jars/*"

# Maven coordinates that Spark resolves and downloads when the session starts.
# NOTE(review): JPMML-SparkML release lines track the Spark minor version
# (2.2.x -> Spark 3.2, 2.3.x -> Spark 3.3, ...), while the Kafka connector below
# targets Spark 3.3.2 -- confirm the pmml-sparkml version matches the running Spark.
# NOTE(review): the Kafka artifacts are not used anywhere in this notebook.
spark_packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2",
    "org.apache.kafka:kafka-clients:3.2.3",
    "org.jpmml:pmml-sparkml:2.2.0",
    "org.jpmml:pmml-sparkml-lightgbm:2.2.0",
    "org.jpmml:pmml-sparkml-xgboost:2.2.0",
]

# Local session bound to localhost; memory settings only take effect if this
# call is what launches the JVM (i.e. on a fresh kernel).
spark = (
    SparkSession.builder
    .appName("PMML for Spark-ML")
    .master("local[*]")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.extraClassPath", spark_jars)
    .config("spark.executor.extraClassPath", spark_jars)
    .config("spark.jars.packages", ",".join(spark_packages))
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

In [3]:
# Load the audit dataset: the first row supplies column names and Spark
# infers column types from the data.
df = spark.read.csv("../data/audit.csv", header=True, inferSchema=True)

# Cast the target to string so RFormula indexes it as a categorical label.
df = df.withColumn("Adjusted", df["Adjusted"].cast(StringType()))
df.show()

+---+----------+----------+---------+----------+---------+------+----------+-----+--------+
|Age|Employment| Education|  Marital|Occupation|   Income|Gender|Deductions|Hours|Adjusted|
+---+----------+----------+---------+----------+---------+------+----------+-----+--------+
| 38|   Private|   College|Unmarried|   Service|  81838.0|Female|     false|   72|       0|
| 35|   Private| Associate|   Absent| Transport|  72099.0|  Male|     false|   30|       0|
| 32|   Private|    HSgrad| Divorced|  Clerical|154676.74|  Male|     false|   40|       0|
| 45|   Private|  Bachelor|  Married|    Repair| 27743.82|  Male|     false|   55|       1|
| 60|   Private|   College|  Married| Executive|  7568.23|  Male|     false|   40|       1|
| 74|   Private|    HSgrad|  Married|   Service|  33144.4|  Male|     false|   30|       0|
| 43|   Private|  Bachelor|  Married| Executive| 43391.17|  Male|     false|   50|       1|
| 35|   Private|      Yr12|  Married| Machinist| 59906.65|  Male|     false|   4

In [4]:
# Derived features, computed by a SQLTransformer stage inside the pipeline:
#   - Log_Income: natural logarithm of Income.
#   - Revalue_Employment: collapses Employment into Private / Public / Other.
#     There is no ELSE branch, so any other value (including NULL) maps to NULL.
# String literals use single quotes (standard SQL). Double-quoted text is parsed
# as an identifier when spark.sql.ansi.enabled and
# spark.sql.ansi.doubleQuotedIdentifiers are both turned on.
statement = """
	SELECT *,
	ln(Income) AS Log_Income,
	CASE
		WHEN Employment = 'Consultant' THEN 'Private'
		WHEN Employment = 'Private' THEN 'Private'
		WHEN Employment = 'PSFederal' THEN 'Public'
		WHEN Employment = 'PSLocal' THEN 'Public'
		WHEN Employment = 'PSState' THEN 'Public'
		WHEN Employment = 'SelfEmp' THEN 'Private'
		WHEN Employment = 'Volunteer' THEN 'Other'
	END AS Revalue_Employment
	FROM __THIS__
	"""
sqlTransformer = SQLTransformer(statement=statement)

In [5]:
# Logistic regression with default hyper-parameters, reading RFormula's
# default "features"/"label" output columns.
classifier = LogisticRegression()

# R-style formula: predict Adjusted from every other column, except the raw
# Income and Employment columns (replaced by Log_Income and Revalue_Employment
# from the SQL step), plus a Gender x Marital interaction term.
formula = "Adjusted ~ . - Income - Employment + Gender:Marital"
rFormula = RFormula(formula=formula)

# Derived features -> formula encoding -> classifier.
pipeline = Pipeline(stages=[sqlTransformer, rFormula, classifier])
pipelineModel = pipeline.fit(df)

In [6]:
# Fixed seed so the verification sample (~0.5% of rows), and therefore the
# generated PMML file, is identical on every Restart & Run All.
VERIFICATION_SEED = 42
verification_df = df.sample(withReplacement=False, fraction=0.005, seed=VERIFICATION_SEED)

pmmlBuilder = (
    PMMLBuilder(spark.sparkContext, df, pipelineModel)
    # Embed model verification data (sample rows with expected outputs).
    .verify(verification_df)
    # Ask the converter to encode the logistic regression as a RegressionModel.
    .putOption(classifier, "representation", "RegressionModel")
)

# Last expression: displays the path of the written PMML file.
pmmlBuilder.buildFile("../data/LogisticRegressionAudit.pmml")


'/Users/Shared/dmmil/development/jupyter-app/jupyter-app/python-libararies/../data/LogisticRegressionAudit.pmml'

In [7]:
# Stop the SparkSession (and its underlying SparkContext) now that the PMML file is written.
spark.stop()