In [1]:
import os
import logging.config

import numpy as np
import pandas as pd

from typing import List

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import PipelineModel

from lightautoml.dataset.roles import CategoryRole
from lightautoml.dataset.roles import DatetimeRole
from lightautoml.dataset.roles import FoldsRole
from lightautoml.dataset.roles import NumericRole
from lightautoml.dataset.roles import TargetRole

from lightautoml.spark.dataset.base import SparkDataset
from lightautoml.spark.reader.base import SparkToSparkReader
from lightautoml.spark.tasks.base import SparkTask
from lightautoml.spark.pipelines.features.lgb_pipeline import SparkLGBSimpleFeatures

from lightautoml.spark.transformers.categorical import SparkOrdinalEncoderTransformer
from lightautoml.spark.transformers.datetime import SparkTimeToNumTransformer

from lightautoml.spark.utils import (log_exec_timer, 
                                     logging_config, 
                                     VERBOSE_LOGGING_FORMAT, 
                                     SparkDataFrame)

from pyspark.ml import PipelineModel

from pyspark.ml import Transformer

In [2]:
# Configure logging from the project's dict-config helper, passing INFO as the
# level and '/tmp/lama.log' as the log filename.
logging.config.dictConfig(logging_config(level=logging.INFO, log_filename='/tmp/lama.log'))
# NOTE(review): basicConfig() is a no-op when the root logger already has
# handlers; whether the dictConfig call above installs any depends on what
# logging_config() returns — confirm this line still has an effect.
logging.basicConfig(level=logging.INFO, format=VERBOSE_LOGGING_FORMAT)
# Logger for messages emitted by this notebook itself.
logger = logging.getLogger(__name__)

In [3]:
def get_spark_session():
    """Return the SparkSession used by this notebook.

    When the ``SCRIPT_ENV`` environment variable equals ``"cluster"`` the
    session is obtained with ``getOrCreate()`` and no extra options. Otherwise
    a ``local[*]`` session is built with the options listed below, after which
    the checkpoint directory is set to ``/tmp/spark_checkpoints`` and the
    Spark log level to ``WARN``.

    Returns:
        The created (or already existing) SparkSession.
    """
    running_on_cluster = os.environ.get("SCRIPT_ENV") == "cluster"
    if running_on_cluster:
        return SparkSession.builder.getOrCreate()

    # Options for the local session, applied in this order.
    local_options = [
        ("spark.jars", "../../jars/spark-lightautoml_2.12-0.1.jar"),
        ("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.9.5,org.apache.hadoop:hadoop-azure:3.3.2"),
        ("spark.jars.repositories", "https://mmlspark.azureedge.net/maven"),
        ("spark.sql.shuffle.partitions", "16"),
        ("spark.driver.memory", "57g"),
        ("spark.executor.memory", "57g"),
        ("spark.sql.execution.arrow.pyspark.enabled", "true"),
    ]

    builder = SparkSession.builder.master("local[*]")
    for option_name, option_value in local_options:
        builder = builder.config(option_name, option_value)
    session = builder.getOrCreate()

    context = session.sparkContext
    context.setCheckpointDir("/tmp/spark_checkpoints")
    context.setLogLevel("WARN")

    return session

In [4]:
# Session shared by the Spark cells below (local or cluster, depending on SCRIPT_ENV).
spark = get_spark_session()

22/05/05 11:59:43 WARN Utils: Your hostname, desktop resolves to a loopback address: 127.0.1.1; using 192.168.0.104 instead (on interface wlp7s0)
22/05/05 11:59:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
https://mmlspark.azureedge.net/maven added as a remote repository with the name: repo-1


:: loading settings :: url = jar:file:/home/azamat/projects/LightAutoML/.venv/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/azamat/.ivy2/cache
The jars for the packages stored in: /home/azamat/.ivy2/jars
com.microsoft.azure#synapseml_2.12 added as a dependency
org.apache.hadoop#hadoop-azure added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3ab76869-b833-4881-a899-638526db5a9b;1.0
	confs: [default]
	found com.microsoft.azure#synapseml_2.12;0.9.5 in central
	found com.microsoft.azure#synapseml-core_2.12;0.9.5 in central
	found org.scalactic#scalactic_2.12;3.0.5 in central
	found org.scala-lang#scala-reflect;2.12.4 in central
	found io.spray#spray-json_2.12;1.3.2 in central
	found com.jcraft#jsch;0.1.54 in central
	found org.apache.httpcomponents#httpclient;4.5.6 in central
	found org.apache.httpcomponents#httpcore;4.4.10 in central
	found commons-logging#commons-logging;1.2 in central
	found commons-codec#commons-codec;1.10 in central
	found org.apache.httpcomponents#httpmime;4.5.6 in central
	found com.linkedin.isolation-forest#isolation-f

22/05/05 11:59:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Loading and preparing dataset

In [7]:
# Read only the columns this demo uses: the target, the columns later given
# categorical and numeric roles, and the two day-offset columns that the next
# cell turns into date strings.
data = pd.read_csv(
    "../data/sampled_app_train.csv",
    usecols=[
        "TARGET",
        "NAME_CONTRACT_TYPE",
        "AMT_CREDIT",
        "NAME_TYPE_SUITE",
        "AMT_GOODS_PRICE",
        "DAYS_BIRTH",
        "DAYS_EMPLOYED",
    ],
)

In [8]:
# Turn the day-offset columns into date strings relative to 2018-01-01.
#
# * The guard makes the cell safe to re-run: once the raw DAYS_* columns have
#   been replaced there is nothing left to convert, so a second run is a no-op
#   instead of a KeyError.
# * pd.to_timedelta(..., unit="D") converts day counts explicitly and does not
#   rely on astype() accepting a day-resolution timedelta dtype (handling of
#   such dtypes changed in pandas 2.0).
# * DAYS_EMPLOYED is clipped to at most 0 before conversion, so EMP_DATE never
#   lands after the reference date.
if {"DAYS_BIRTH", "DAYS_EMPLOYED"}.issubset(data.columns):
    reference_date = pd.Timestamp("2018-01-01")
    data = data.assign(
        BIRTH_DATE=(reference_date + pd.to_timedelta(data["DAYS_BIRTH"], unit="D")).astype(str),
        EMP_DATE=(
            reference_date + pd.to_timedelta(data["DAYS_EMPLOYED"].clip(upper=0), unit="D")
        ).astype(str),
    ).drop(columns=["DAYS_BIRTH", "DAYS_EMPLOYED"])

In [9]:
# Assign each row to one of 5 folds (values 0..4). A seeded generator keeps the
# assignment identical across kernel restarts, so downstream results are reproducible.
data["__fold__"] = np.random.default_rng(42).integers(0, 5, len(data))

In [10]:
# Move the pandas frame into Spark.
dataset_sdf = spark.createDataFrame(data)
# Append an id column named SparkDataset.ID_COLUMN and mark the result for caching.
dataset_sdf = dataset_sdf.select(
    '*',
    F.monotonically_increasing_id().alias(SparkDataset.ID_COLUMN)
).cache()
# Writing to the 'noop' format executes the plan without producing output,
# which populates the cache requested above.
dataset_sdf.write.mode('overwrite').format('noop').save()
# Cast __fold__ to int and put it first; the remaining columns keep their order.
dataset_sdf = dataset_sdf.select(F.col("__fold__").cast("int").alias("__fold__"), *[c for c in dataset_sdf.columns if c != "__fold__"])

                                                                                

In [11]:
dataset_sdf.printSchema()

root
 |-- __fold__: integer (nullable = true)
 |-- TARGET: long (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- BIRTH_DATE: string (nullable = true)
 |-- EMP_DATE: string (nullable = true)
 |-- _id: long (nullable = false)



# Pipeline init and fit

In [12]:
# Task definition, created with the "binary" task name.
task = SparkTask("binary")

# Role -> column(s) mapping passed to the reader's fit_read() in the next cell.
# Keys are role instances; values are a single column name or a list of names.
check_roles = {
    TargetRole(): "TARGET",
    CategoryRole(dtype=str): ["NAME_CONTRACT_TYPE", "NAME_TYPE_SUITE"],
    NumericRole(np.float32): ["AMT_CREDIT", "AMT_GOODS_PRICE"],
    DatetimeRole(seasonality=["y", "m", "wd"]): ["BIRTH_DATE", "EMP_DATE"],
    FoldsRole(): "__fold__",
}

In [13]:
# Build the reader for the task and read the Spark DataFrame with the explicit
# roles defined above.
# NOTE(review): advanced_roles=False presumably turns off automatic role
# refinement — confirm against the reader's documentation.
sreader = SparkToSparkReader(task=task, advanced_roles=False)
sdataset = sreader.fit_read(dataset_sdf, roles=check_roles)

2022-05-05 12:00:25,495 INFO base base.py:225 Reader starting fit_read
2022-05-05 12:00:25,500 INFO base base.py:226 [1mTrain data columns: ['__fold__', 'TARGET', 'NAME_CONTRACT_TYPE', 'AMT_CREDIT', 'AMT_GOODS_PRICE', 'NAME_TYPE_SUITE', 'BIRTH_DATE', 'EMP_DATE', '_id'][0m

2022-05-05 12:00:25,578 INFO utils utils.py:241 Cacher default_cacher (RDD Id: 14). Starting to materialize data.
2022-05-05 12:00:25,972 INFO utils utils.py:243 Cacher default_cacher (RDD Id: 21). Finished data materialization.
2022-05-05 12:00:28,420 INFO base base.py:375 Reader finished fit_read          


In [14]:
# Feature pipeline whose fitted transformer is saved, reloaded and timed below.
pipe = SparkLGBSimpleFeatures(cacher_key='cacher_key')

features pipeline ctr


In [15]:
# Set the pipeline's input roles from the dataset before fitting.
pipe.input_roles = sdataset.roles
# Fit the pipeline; the returned dataset is displayed as the cell output and not kept.
pipe.fit_transform(sdataset)

2022-05-05 12:00:31,243 INFO base base.py:148 SparkFeaturePipeline is started
2022-05-05 12:00:31,248 INFO base base.py:220 Number of layers in the current feature pipeline <lightautoml.spark.pipelines.features.lgb_pipeline.SparkLGBSimpleFeatures object at 0x7ff31c810490>: 1
2022-05-05 12:00:31,250 INFO base base.py:169 In transformer <class 'lightautoml.spark.transformers.base.SparkChangeRolesTransformer'>. Columns: ['AMT_CREDIT', 'AMT_GOODS_PRICE', 'BIRTH_DATE', 'EMP_DATE', 'NAME_CONTRACT_TYPE', 'NAME_TYPE_SUITE', 'TARGET', '__fold__', '_id']
2022-05-05 12:00:31,252 INFO base base.py:169 In transformer <class 'lightautoml.spark.transformers.datetime.SparkTimeToNumTransformer'>. Columns: ['AMT_CREDIT', 'AMT_GOODS_PRICE', 'BIRTH_DATE', 'EMP_DATE', 'NAME_CONTRACT_TYPE', 'NAME_TYPE_SUITE', 'TARGET', '__fold__', '_id']
2022-05-05 12:00:31,322 INFO categorical categorical.py:222 [<class 'lightautoml.spark.transformers.categorical.SparkOrdinalEncoderEstimator'> (ORD)] fit is started
2022-05

SparkDataset (DataFrame[_id: bigint, TARGET: int, __fold__: int, NAME_CONTRACT_TYPE: string, AMT_CREDIT: double, AMT_GOODS_PRICE: double, NAME_TYPE_SUITE: string, BIRTH_DATE: string, EMP_DATE: string, dtdiff__BIRTH_DATE: double, dtdiff__EMP_DATE: double, ord__NAME_CONTRACT_TYPE: double, ord__NAME_TYPE_SUITE: double])

# Save pipeline transformer

In [16]:
# Persist the fitted transformer to the local filesystem, overwriting any
# previous save at this path.
pipe.transformer.write().overwrite().save("file:///tmp/SparkLGBSimpleFeatures")

2022-05-05 12:00:34,464 INFO mlwriters mlwriters.py:51 Save SparkChangeRolesTransformer to 'file:///tmp/SparkLGBSimpleFeatures/stages/0_PipelineModel_08f61fb776d0/stages/0_PipelineModel_72e080f5b198/stages/0_PipelineModel_ee518362475b/stages/0_SparkChangeRolesTransformer_e6edd42c2ab2'
2022-05-05 12:00:35,706 INFO mlwriters mlwriters.py:51 Save SparkTimeToNumTransformer to 'file:///tmp/SparkLGBSimpleFeatures/stages/0_PipelineModel_08f61fb776d0/stages/0_PipelineModel_72e080f5b198/stages/0_PipelineModel_ee518362475b/stages/1_SparkTimeToNumTransformer_0235408d5ee1'
2022-05-05 12:00:36,292 INFO mlwriters mlwriters.py:143 Save SparkOrdinalEncoderTransformer to 'file:///tmp/SparkLGBSimpleFeatures/stages/0_PipelineModel_08f61fb776d0/stages/0_PipelineModel_72e080f5b198/stages/0_PipelineModel_ee518362475b/stages/2_SparkOrdinalEncoderTransformer_2baa9d837965'


# Loading pipeline model from file

In [17]:
# Load the transformer saved above back as a PipelineModel.
pipeline_model = PipelineModel.load("file:///tmp/SparkLGBSimpleFeatures")



# Expanding nested stages

In [18]:
# Flat list of leaf stages produced by the most recent expand_pipeline() call.
stages = []


def _iter_leaf_stages(pipeline_model):
    """Yield the non-PipelineModel stages of ``pipeline_model`` depth-first, in order."""
    for stage in pipeline_model.stages:
        # Exact type check (not isinstance): only plain PipelineModel containers
        # are unpacked; any other stage type is kept as a single stage.
        if type(stage) is PipelineModel:
            yield from _iter_leaf_stages(stage)
        else:
            yield stage


def expand_pipeline(pipeline_model):
    """Flatten a nested PipelineModel into the module-level ``stages`` list.

    The list is rewritten in place on every call, so calling this again (for
    example by re-running the cell) yields the same result instead of
    appending a duplicate set of stages.

    Args:
        pipeline_model: object with a ``stages`` attribute whose items are
            either leaf stages or nested ``PipelineModel`` instances.

    Returns:
        The module-level ``stages`` list, holding the leaf stages in
        depth-first order.
    """
    # Build the full snapshot first: ``pipeline_model.stages`` may be the very
    # same list object as ``stages``.
    flattened = list(_iter_leaf_stages(pipeline_model))
    stages[:] = flattened
    return stages

In [19]:
# Collect the loaded model's leaf stages into the module-level `stages` list.
expand_pipeline(pipeline_model)

In [20]:
stages

[SparkChangeRolesTransformer_e6edd42c2ab2,
 SparkTimeToNumTransformer_0235408d5ee1,
 SparkOrdinalEncoderTransformer_2baa9d837965,
 DropColumnsTransformer_645632f801cd,
 NoOpTransformer_1295e8289d2e]

In [21]:
# Rebuild a single-level PipelineModel from the flattened stage list.
model = PipelineModel(stages=stages)

# Attaching Debug Transformer after stages

In [22]:
class SparkDebugTransformer(Transformer):
    """Pass-through stage that caches and materializes its input under a timer.

    The timer is labelled with this stage's ``uid``. Because the materialization
    executes the plan built so far, the logged time covers the work of the
    stages that precede this one in the pipeline.
    """

    def __init__(self):
        super(SparkDebugTransformer, self).__init__()

    def _transform(self, dataset: SparkDataFrame) -> SparkDataFrame:
        """Cache ``dataset``, force its computation, and return the cached frame."""
        timer_label = f"{self.uid}"
        with log_exec_timer(timer_label):
            cached = dataset.cache()
            # A 'noop' write runs the plan without producing any output.
            cached.write.mode('overwrite').format('noop').save()
        return cached

In [23]:
def attach_debug_transformer(pipeline: PipelineModel, after_transformers: List):
    """Return a new PipelineModel with debug stages inserted after selected stages.

    Args:
        pipeline: model whose ``stages`` are copied, in order, into the result.
        after_transformers: stage classes; a fresh ``SparkDebugTransformer`` is
            placed right after every stage whose exact type is in this list.

    Returns:
        A new ``PipelineModel``; the ``stages`` list of ``pipeline`` is not modified.
    """
    instrumented = []
    for current in pipeline.stages:
        if type(current) in after_transformers:
            instrumented.extend([current, SparkDebugTransformer()])
        else:
            instrumented.append(current)

    return PipelineModel(stages=instrumented)

In [24]:
# Insert a debug stage right after the ordinal-encoder and time-to-num stages.
# NOTE(review): `model` is rebound from its own previous value, so re-running
# this cell adds another debug stage after each matched transformer.
model = attach_debug_transformer(model, [SparkOrdinalEncoderTransformer, SparkTimeToNumTransformer])

In [25]:
model.stages

[SparkChangeRolesTransformer_e6edd42c2ab2,
 SparkTimeToNumTransformer_0235408d5ee1,
 SparkDebugTransformer_6b62308ab2f5,
 SparkOrdinalEncoderTransformer_2baa9d837965,
 SparkDebugTransformer_4cb016c6be0f,
 DropColumnsTransformer_645632f801cd,
 NoOpTransformer_1295e8289d2e]

# Increasing dataset size

In [26]:
dataset_sdf.count()

10000

In [27]:
# Each source row is replicated this many times.
dataset_increase_factor = 30000
# The enlarged dataset is repartitioned into execs * cores partitions.
execs = 1
cores = 8

if dataset_increase_factor > 1:
    # NOTE: this rebinds dataset_sdf from its own previous value, so running the
    # cell a second time multiplies the already enlarged dataset again.

    # Replicate rows by exploding a constant array with one element per copy.
    # array_repeat builds that array as a single expression rather than one
    # literal per copy.
    dataset_sdf = (
        dataset_sdf
        .withColumn("new_col", F.explode(F.array_repeat(F.lit(0), dataset_increase_factor)))
        .drop("new_col")
    )
    # Replicated rows share the original ids, so regenerate the id column.
    dataset_sdf = dataset_sdf.select(
        *[c for c in dataset_sdf.columns if c != SparkDataset.ID_COLUMN],
        F.monotonically_increasing_id().alias(SparkDataset.ID_COLUMN),
    )
    # Cache only the final, repartitioned frame: caching the intermediate frame
    # as well would keep a second full copy of the enlarged data, and a repeated
    # cache() on the same frame only produces a CacheManager warning.
    dataset_sdf = dataset_sdf.repartition(execs * cores, SparkDataset.ID_COLUMN).cache()
    # The 'noop' write executes the plan and thereby populates the cache.
    dataset_sdf.write.mode('overwrite').format('noop').save()
    print(f"Duplicated dataset size: {dataset_sdf.count()}")

22/05/05 12:01:46 WARN CacheManager: Asked to cache already cached data.
                                                                                

Duplicated dataset size: 300000000


In [28]:
dataset_sdf.count()

300000000

# Pipeline inference

In [29]:
# Time inference over the enlarged dataset. The count() in the timer label is
# evaluated before the timer starts. The SparkDebugTransformer stages inside
# `model` materialize their input during transform(), and the final 'noop'
# write forces the remaining stages to execute inside the timed block.
with log_exec_timer(f"infer on dataset with {dataset_sdf.count()} row"):
    preds = model.transform(dataset_sdf)
    preds = preds.cache()
    preds.write.mode('overwrite').format('noop').save()

2022-05-05 12:04:06,104 INFO base base.py:169 In transformer <class 'lightautoml.spark.transformers.base.SparkChangeRolesTransformer'>. Columns: ['AMT_CREDIT', 'AMT_GOODS_PRICE', 'BIRTH_DATE', 'EMP_DATE', 'NAME_CONTRACT_TYPE', 'NAME_TYPE_SUITE', 'TARGET', '__fold__', '_id']
2022-05-05 12:04:06,105 INFO base base.py:169 In transformer <class 'lightautoml.spark.transformers.datetime.SparkTimeToNumTransformer'>. Columns: ['AMT_CREDIT', 'AMT_GOODS_PRICE', 'BIRTH_DATE', 'EMP_DATE', 'NAME_CONTRACT_TYPE', 'NAME_TYPE_SUITE', 'TARGET', '__fold__', '_id']
2022-05-05 12:06:08,200 INFO utils utils.py:121 Exec time of SparkDebugTransformer_6b62308ab2f5 timer: 122.031888
2022-05-05 12:06:08,202 INFO base base.py:169 In transformer <class 'lightautoml.spark.transformers.categorical.SparkOrdinalEncoderTransformer'>. Columns: ['AMT_CREDIT', 'AMT_GOODS_PRICE', 'BIRTH_DATE', 'EMP_DATE', 'NAME_CONTRACT_TYPE', 'NAME_TYPE_SUITE', 'TARGET', '__fold__', '_id', 'dtdiff__BIRTH_DATE', 'dtdiff__EMP_DATE']
2022-05