In [1]:
import os
import sys

from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

from importlib.util import find_spec

if find_spec("pyspark") is None:
    # running a notebook on a driver?

    os.environ["SPARK_HOME"] = "/usr/share/spark-3.2"
    os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
    
    from glob import glob

    spark_python = os.path.join(os.environ["SPARK_HOME"], "python")
    py4j = glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0]
    sys.path[:0] = sys_path = [spark_python, py4j]


In [2]:
# %%
import pyspark
import pyspark.sql
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext, SparkSession, DataFrame
import pyspark.sql.functions as F

from pyspark.sql.window import Window
from pyspark.sql.functions import col, udf, concat, lit,broadcast
from pyspark.sql.types import IntegerType, FloatType, StringType, ArrayType, StructType, DoubleType, StructField,DateType

In [3]:
import os
import sys
import datetime
from pathlib import Path
import subprocess

from pyspark import SparkConf
from pyspark.sql import SQLContext, SparkSession
import pyspark.sql.functions as F
from pyspark.ml.classification import GBTClassifier
from pyspark.sql.functions import col, udf, lit
from pyspark.ml.functions import vector_to_array
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
import numpy as np
import config as config

In [4]:
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)

In [5]:
def get_spark_session(**kwargs) -> SparkSession:
    conf = (
        SparkConf()
        .setAppName(kwargs.get("app_name", "test"))
        .set("spark.executor.memory", kwargs.get("executor_memory", "8g"))
        .set("spark.executor.memoryOverhead", "24g")
        .set("spark.driver.memory", kwargs.get("driver_memory", "16g"))
        .set("spark.driver.maxResultSize", kwargs.get("max_result_size", "25g"))
        .set("spark.executor.instances", kwargs.get("num_executors", "128"))
        .set("spark.executor.cores", kwargs.get("num_cores", "4"))
        .set("spark.sql.crossJoin.enabled", True)
        .set("spark.network.timeout", kwargs.get("timeout", "3600s"))
        .set("spark.executor.heartbeatInterval", kwargs.get("heartbeat", "16s"))
        .set("spark.sql.shuffle.partitions", 4000)
        .set("spark.dynamicAllocation.enabled", False)
        .set("spark.yarn.queue", kwargs.get("spark_yarn_queue", "mkplalgo-prod"))
        .set("spark.default.parallelism", "2000")
        .set("spark.sql.execution.arrow.pyspark.enabled", True)
        .set("spark.sql.execution.arrow.pyspark.fallback.enabled", False)
        .set("spark.yarn.priority", "60")
        .set("spark.kryoserializer.buffer.max", "1g")
        .set("spark.files.overwrite", "true")
        .set("spark.sql.broadcastTimeout", 300000)
        .set("spark.rpc.message.maxSize", "1024")
        .set("spark.memory.fraction", "0.8")
        .set("spark.driver.cores", "3")
        .set("spark.yarn.am.cores", "3")
        
        .set("spark.sql.sources.parallelPartitionDiscovery.threshold", "8")
        .set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "2000")
        
        .set("spark.sql.hive.filesourcePartitionFileCacheSize", "1048576000")
        
        # .set("spark.yarn.dist.archives", "hdfs://R2/projects/regds_seller/spark_virtual_env/pyspark_dl_conda_env.tar.gz#environment")
        .set("spark.yarn.dist.archives", "/ldap_home/boyang.yue/py310_conda_env.tar.gz#environment")
        
        
        
        .set("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.2")
        # .set("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    )
    spark_session = (
        SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
    )
    return spark_session

In [6]:
spark = get_spark_session(
    app_name=f"test_lightgbm_with_pyspark",
    executor_memory="16g",
    driver_memory="16g",
    num_cores=2,
    num_executors=100,
    spark_yarn_queue="mkplalgo-dev"
)

sc = spark.sparkContext
print(f"applicationId: {sc.applicationId}")

:: loading settings :: url = jar:file:/opt/spark-3.2.1-sdi-041-bin-3.3.sdi-043/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /ldap_home/boyang.yue/.ivy2/cache
The jars for the packages stored in: /ldap_home/boyang.yue/.ivy2/jars
com.microsoft.azure#synapseml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-eac45bce-4674-4923-88ea-99dbda78880f;1.0
	confs: [default]
	found com.microsoft.azure#synapseml_2.12;0.10.2 in central
	found com.microsoft.azure#synapseml-core_2.12;0.10.2 in central
	found org.scalactic#scalactic_2.12;3.2.14 in central
	found org.scala-lang#scala-reflect;2.12.15 in central
	found io.spray#spray-json_2.12;1.3.5 in central
	found com.jcraft#jsch;0.1.54 in central
	found org.apache.httpcomponents.client5#httpclient5;5.1.3 in central
	found org.apache.httpcomponents.core5#httpcore5;5.1.3 in central
	found org.apache.httpcomponents.core5#httpcore5-h2;5.1.3 in central
	found org.slf4j#slf4j-api;1.7.25 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpmime;4.5.13 in central
	fo

applicationId: application_1678345849510_4508796


23/03/31 16:47:25 WARN TableMapping: net.topology.table.file.name not configured. 
23/03/31 16:47:25 WARN TableMapping: Failed to read topology table. /default-rack will be used for all nodes.


In [7]:
import synapse.ml

In [8]:
print(f"applicationId: {sc.applicationId}")

applicationId: application_1678345849510_4508796


In [9]:
import config as config

In [10]:

def check_hadoop_path(path):
    process = subprocess.Popen(['/usr/share/hadoop-client/bin/hadoop', 'fs', '-find', path])
    status_code = process.wait()
    if status_code == 0:
        return True
    else:
        return False

def findValidPath(grass_date,grass_region,basePath):
    i = 0
    while (i<=30):
        date = start_date_ndays(i, grass_date)
        user_sequence_path = basePath + f"grass_date={date}/grass_region={grass_region}"
        print(user_sequence_path)
        if check_hadoop_path(user_sequence_path+'/_SUCCESS'):
            return user_sequence_path
        i += 1
    return
def start_date_ndays(n, p_date):
    res = datetime.datetime.strptime(
        p_date, '%Y-%m-%d') - datetime.timedelta(days=n)
    res = res.strftime("%Y-%m-%d")
    return res
def generateRelevance(P,c):
        rand = np.random.random()
        if rand<=P:
            return 1
        return max(0,c)
generateRelevance = udf(generateRelevance, IntegerType())


def dataProcessing(df,features):
    vecAssembler = VectorAssembler(outputCol="features")
    vecAssembler.setInputCols(features)
    df = vecAssembler.transform(df)
    df = df.drop(*features)
    return df

In [11]:

grass_region = 'ID'
grass_date = '2023-03-26'

In [12]:
id_features = config.id_features
features = config.features
label = config.label

train_data_path = findValidPath(grass_date,grass_region,config.train_data_path)
save_latest_path = config.save_latest_path+f"grass_region={grass_region}"
save_old_path = config.save_old_path+f"grass_date={grass_date}/grass_region={grass_region}"
# model parameters
position_limit = config.position_limit
iteration=config.iteration
num_tree=config.num_tree
gamma_initial=config.gamma_initial
max_depth=config.max_depth

sample_ratio = config.sample_ratio

hdfs://R2/projects/regds_seller/bundle_deal/PDP/train_data/grass_date=2023-03-26/grass_region=ID
hdfs://R2/projects/regds_seller/bundle_deal/PDP/train_data/grass_date=2023-03-26/grass_region=ID/_SUCCESS


In [39]:
train_data_path

'hdfs://R2/projects/regds_seller/bundle_deal/PDP/train_data/grass_date=2023-03-26/grass_region=ID'

In [13]:
df = spark.read.parquet(train_data_path)
df = df.withColumn('pos_id',col("pos_id").cast("int"))\
        .where((col("pos_id")<=position_limit)&(col("pos_id")>=0))\
        .select(features+label+id_features)\
        .sample(sample_ratio[grass_region])

df = dataProcessing(df,features)


df = df.withColumn("gamma0",lit(gamma_initial))\
        .withColumn("theta0",1/(1+F.log(col("pos_id")+1)))

                                                                                

In [14]:
df = df.repartition(2000)

In [15]:
from synapse.ml.core.platform import *

from synapse.ml.core.platform import materializing_display as display

In [16]:
from synapse.ml.lightgbm import LightGBMClassifier

In [27]:
# print(LightGBMClassifier.__doc__)

In [14]:
# df.count()

                                                                                

32204653

23/03/31 14:34:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...


In [17]:
for i in range(iteration):
    # print(i)
    df = df.withColumn("P_E1_R0_C0",col(f"theta{i}")*(1-col(f"gamma{i}"))/(1-col(f"theta{i}")*col(f"gamma{i}")))\
            .withColumn("P_E0_R1_C0",(1-col(f"theta{i}"))*col(f"gamma{i}")/(1-col(f"theta{i}")*col(f"gamma{i}")))

    df = df.withColumn("Relevance", generateRelevance(col("P_E0_R1_C0"),col("click_label")))
    
    
    
    # gbt = GBTClassifier(maxIter=num_tree, maxDepth=max_depth, labelCol = 'Relevance')
    # gbtModel = Pipeline(stages=[gbt]).fit(df)
    gbt = LightGBMClassifier(numIterations=num_tree, maxDepth=max_depth, labelCol="Relevance")
    gbtModel = gbt.fit(df)
    df = gbtModel.transform(df)
    
    
    
    df_theta = df.groupby("pos_id")\
                    .agg(
                    (F.sum(col("click_label")+(1-col("click_label"))*col("P_E1_R0_C0"))/F.count("pos_id")).alias(f"theta{i+1}")
                    )
    theta = df_theta.toPandas()
    theta_dict = dict(zip(theta['pos_id'],theta[f'theta{i+1}']))
    
    def updateTheta(pos_id):
        return theta_dict[pos_id]
    updateTheta = udf(updateTheta, FloatType())

    
    df = df.withColumn(f"theta{i+1}",updateTheta("pos_id"))\
            .withColumn(f"gamma{i+1}",vector_to_array("probability")[1])\
            .drop("rawPrediction","probability","prediction")

23/03/31 16:50:37 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
Traceback (most recent call last):
  File "/usr/share/spark-3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/share/spark-3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 475, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/ldap_home/boyang.yue/miniconda3/envs/py310_conda_env/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
[Stage 33:>                                                     (0 + 200) / 200]

KeyboardInterrupt: 

In [18]:
df_theta = df_theta.withColumnRenamed(f"theta{iteration}","theta")\
                    .coalesce(1)
df_theta.cache()

DataFrame[pos_id: int, theta: double]

In [19]:
df_theta.count()

                                                                                

101

In [21]:
df_theta.orderBy('pos_id').toPandas()

Unnamed: 0,pos_id,theta
0,0,1.0
1,1,0.88687
2,2,0.833587
3,3,0.77806
4,4,0.540496
5,5,0.508217
6,6,0.457045
7,7,0.446989
8,8,0.426251
9,9,0.42172


In [24]:
df.count()

                                                                                

18552338

In [26]:
df.limit(5).show()



+-----------+---------+-----------+------+--------------------+------+-------------------+-------------------+--------------------+---------+----------+-------------------+----------+-------------------+----------+-------------------+----------+-------------------+----------+-------------------+----------+-------------------+----------+-------------------+----------+-------------------+----------+-------------------+----------+-------------------+
|click_label|  user_id|    item_id|pos_id|            features|gamma0|             theta0|         P_E1_R0_C0|          P_E0_R1_C0|Relevance|    theta1|             gamma1|    theta2|             gamma2|    theta3|             gamma3|    theta4|             gamma4|    theta5|             gamma5|    theta6|             gamma6|    theta7|             gamma7|    theta8|             gamma8|    theta9|             gamma9|   theta10|            gamma10|
+-----------+---------+-----------+------+--------------------+------+-------------------+------

                                                                                



In [22]:
save_latest_path

'hdfs://R2/projects/regds_seller/bundle_deal/PDP/position_bias_estimation/latest/grass_region=SG'

In [23]:
spark.read.parquet('hdfs://R2/projects/regds_seller/bundle_deal/PDP/position_bias_estimation/latest/grass_region=SG').orderBy('pos_id').toPandas()

                                                                                

Unnamed: 0,pos_id,theta
0,0,1.0
1,1,0.869443
2,2,0.814692
3,3,0.759742
4,4,0.52255
5,5,0.493076
6,6,0.444317
7,7,0.434559
8,8,0.414384
9,9,0.41444


In [29]:
old_result = spark.read.parquet('hdfs://R2/projects/regds_seller/bundle_deal/PDP/position_bias_estimation/latest/grass_region=SG')

In [30]:
new_result = df_theta

In [31]:
old_result.createOrReplaceTempView('old_tab')

23/03/31 16:13:04 WARN TrackerClient: No usable SQL lineage information found, no need to lineage data to EDA.


In [32]:
new_result.createOrReplaceTempView('new_tab')

23/03/31 16:13:14 WARN TrackerClient: No usable SQL lineage information found, no need to lineage data to EDA.


In [35]:
spark.sql("""
    select
        o.pos_id as pos_id
        , o.theta as old_theta
        , n.theta as new_theta
    from
        old_tab o
    inner join
        new_tab n
    on
        o.pos_id = n.pos_id
""").orderBy('pos_id').coalesce(1).write.saveAsTable('dev_mpi_prom.gbdt_to_lgbm_sg_20230326')

23/03/31 16:19:57 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


In [36]:
spark.sql("""
    select
        *
    from
        dev_mpi_prom.gbdt_to_lgbm_sg_20230326
""").toPandas()

Unnamed: 0,pos_id,old_theta,new_theta
0,0,1.0,1.0
1,1,0.869443,0.88687
2,2,0.814692,0.833587
3,3,0.759742,0.77806
4,4,0.52255,0.540496
5,5,0.493076,0.508217
6,6,0.444317,0.457045
7,7,0.434559,0.446989
8,8,0.414384,0.426251
9,9,0.41444,0.42172


In [21]:
spark.stop()