<a href="https://colab.research.google.com/github/XinyaoWa/hydroai-colab/blob/main/Use_RecDP_to_run_TwitterRecSys2021_data_preprocess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Environment Preparation**

**Install Pyspark**

In [1]:
pip install pyspark==3.0.0



**Install RecDP**

In [2]:
pip install pyrecdp



**Import libs**

In [3]:
import gc, os, time
import pandas as pd
import numpy as np
from pyspark.sql import *
from pyspark import *
import pyspark.sql.functions as f
import pyspark.sql.types as t
import pyrecdp
from pyrecdp.data_processor import *
from pyrecdp.encoder import *
from pyrecdp.utils import *

**Prepare folder**

recsys2021: working folder for all temporary data

spark_tmp: Spark's local scratch directory (used as spark.local.dir)

data: intermediate data saved by RecDP during processing

In [4]:
# -p: create the folders only if they do not already exist (no error on re-run)
!mkdir -p recsys2021
os.chdir('recsys2021')
!mkdir -p spark_tmp data
!ls

data  original_TwitterRecSys2021_sample.parquet  spark_tmp


path_prefix: root working folder

current_path: subfolder where RecDP saves intermediate data

dicts_folder: folder for all target-encoding dictionaries

In [5]:
path_prefix = './'
current_path = 'data/'
dicts_folder = "recsys_dicts/"

**Start Spark**

In [6]:
recdp_path = pyrecdp.__path__[0]
scala_udf_jars = recdp_path + "/ScalaProcessUtils/target/recdp-scala-extensions-0.1.0-jar-with-dependencies.jar"

spark = SparkSession.builder.master('local')\
    .config("spark.driver.memory", '4g')\
    .config("spark.local.dir", os.path.join(path_prefix, "spark_tmp"))\
    .config("spark.driver.extraClassPath", scala_udf_jars)\
    .getOrCreate()

## **Dataset Preparation**

**Download sample dataset**

In [7]:
! wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1jm_bqkzV1CYgH885iaYsDoal1N8J-sE6' -O original_TwitterRecSys2021_sample.parquet

--2022-01-30 12:02:06--  https://docs.google.com/uc?export=download&id=1jm_bqkzV1CYgH885iaYsDoal1N8J-sE6
Resolving docs.google.com (docs.google.com)... 172.217.1.206, 2607:f8b0:4004:832::200e
Connecting to docs.google.com (docs.google.com)|172.217.1.206|:443... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://doc-00-1s-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/tk59endgp4iq0acbnq2ha7po5adgia7e/1643544075000/18016102022855825607/*/1jm_bqkzV1CYgH885iaYsDoal1N8J-sE6?e=download [following]
--2022-01-30 12:02:07--  https://doc-00-1s-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/tk59endgp4iq0acbnq2ha7po5adgia7e/1643544075000/18016102022855825607/*/1jm_bqkzV1CYgH885iaYsDoal1N8J-sE6?e=download
Resolving doc-00-1s-docs.googleusercontent.com (doc-00-1s-docs.googleusercontent.com)... 142.251.33.193, 2607:f8b0:4004:837::2001
Connecting to doc-00-1s-docs.googleusercontent.com (doc-00-1s-docs.googleu

## **Data Preprocessing**

**Define RecDP processor**

In [8]:
proc = DataProcessor(spark, path_prefix, current_path=current_path, dicts_path=dicts_folder, shuffle_disk_capacity="10GB", spark_mode='local')

recdp-scala-extension is enabled
per core memory size is 2.000 GB and shuffle_disk maximum capacity is 10.000 GB


**Read Data with Spark**

In [9]:
df = spark.read.parquet(path_prefix + "original_TwitterRecSys2021_sample.parquet")

In [10]:
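# fix misspelled column names in the source data ('enaging' -> 'engaging')
df = df.withColumnRenamed('enaging_user_following_count', 'engaging_user_following_count')
df = df.withColumnRenamed('enaging_user_is_verified', 'engaging_user_is_verified')
# drop the tokens column, which none of the selected features use
df = df.drop("tokens")
gc.collect()
print("data loaded!")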

data loaded!


**Define Features**

target_list: the target (label) columns

features_list: the final preprocessed features

TE_col_features: the column groups used for target encoding; each inner list is encoded as one combined key

In [11]:
target_list = ['reply_timestamp', 'like_timestamp']
features_list = ['engaged_with_user_follower_count', 'engaging_user_is_verified',
                 'TE_engaged_with_user_id_reply_timestamp', 'TE_engaged_with_user_id_like_timestamp',
                 'TE_language_engaged_with_user_id_reply_timestamp', 'TE_language_engaged_with_user_id_like_timestamp',
                 'len_media']
TE_col_features = [['engaged_with_user_id'], ['language', 'engaged_with_user_id'],]
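The TE_* names in features_list follow the pattern 'TE_' + the encoded columns joined by '_' + '_' + the target. As a quick, Spark-free sanity check (a sketch, not part of RecDP), the snippet below rebuilds those names from TE_col_features and target_list using the same rule as the target-encoding loop, and reports any that are missing from features_list:

```python
target_list = ['reply_timestamp', 'like_timestamp']
TE_col_features = [['engaged_with_user_id'], ['language', 'engaged_with_user_id']]
features_list = ['engaged_with_user_follower_count', 'engaging_user_is_verified',
                 'TE_engaged_with_user_id_reply_timestamp', 'TE_engaged_with_user_id_like_timestamp',
                 'TE_language_engaged_with_user_id_reply_timestamp',
                 'TE_language_engaged_with_user_id_like_timestamp', 'len_media']

# Naming rule used when building out_col_list: TE_<cols joined by _>_<target>
te_names = ['TE_' + '_'.join(cols) + '_' + tgt
            for cols in TE_col_features for tgt in target_list]

# Any generated name absent from features_list would be silently dropped by SelectFeature
missing = [name for name in te_names if name not in features_list]
print(te_names)
print("missing from features_list:", missing)
```

An empty missing list means every encoded column will survive the final feature selection.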


**Preprocess some basic features**

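Fill missing present_media values with an empty string

Add a new feature: len_media (the number of tab-separated entries in present_media)

Binarize the two target columns (1 if the timestamp is positive, 0 otherwise)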

In [12]:
# fill missing present_media values with an empty string
op_fillna = FillNA(["present_media"], '')

# add len_media: number of tab-separated entries in present_media (0 if empty)
count_media = f.udf(lambda x: x.count('\t') + 1 if x != '' else 0, t.IntegerType())
op_count_media = FeatureAdd(
    cols={'len_media': 'present_media'}, udfImpl=count_media)

# binarize the targets: 1 if the engagement timestamp is positive, otherwise 0
op_feature_target_classify = FeatureModification(cols={
    "reply_timestamp": "f.when(f.col('reply_timestamp') > 0, 1).otherwise(0)",
    "like_timestamp": "f.when(f.col('like_timestamp') > 0, 1).otherwise(0)"}, op='inline')

# execute the ops in order; the result is saved as decode_data
proc.reset_ops([op_fillna,
                op_count_media,
                op_feature_target_classify])
df = proc.transform(df, name="decode_data")

print("data decoded!")

save data to .//data//decode_data
data decoded!
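In plain Python, the two per-row transforms above behave roughly as sketched below. This only mirrors the count_media UDF and the f.when(...).otherwise(0) expression for illustration; the sample media strings and the timestamp are made up, and the real work runs inside Spark.

```python
def count_media(present_media):
    """Mirror of the notebook's UDF: number of tab-separated entries, 0 for ''."""
    # FillNA has already replaced nulls with '', so None is treated like '' here
    if present_media is None or present_media == '':
        return 0
    return present_media.count('\t') + 1


def binarize(timestamp):
    """Mirror of f.when(col > 0, 1).otherwise(0); a null timestamp falls to otherwise(0)."""
    return 1 if timestamp is not None and timestamp > 0 else 0


samples = ['', 'Photo', 'Photo\tPhoto', 'Video\tGIF\tPhoto']  # hypothetical values
print([count_media(s) for s in samples])               # [0, 1, 2, 3]
print([binarize(ts) for ts in [None, 0, 1612345678]])  # [0, 0, 1]
```

Because a null comparison in Spark is not true, rows without an engagement timestamp end up with target 0, just like the None case here.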


**5-fold Target Encoding**

Add a "fold" column that randomly assigns each row to one of 5 folds (0 to 4)

In [13]:
# floor(rand * 5) maps each row to fold 0-4 with equal probability
df = df.withColumn("fold", f.floor(f.rand(seed=42) * 5).cast("int"))
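The fold expression decides how evenly rows are spread across folds. The sketch below simulates, in plain Python, two ways to map a uniform draw u in [0, 1) to a fold id: half-up rounding of u * (5 - 1), which is what f.round(f.rand() * (5 - 1)) computes, and flooring of u * 5, which is what f.floor(f.rand() * 5) computes. Python's random module stands in for Spark's rand here, so the exact counts will not match Spark's.

```python
import math
import random
from collections import Counter

rng = random.Random(42)
n = 100_000
draws = [rng.random() for _ in range(n)]  # uniform in [0, 1)

# half-up rounding of u * 4: folds 0 and 4 only catch u in [0, 0.125) and [0.875, 1)
round_folds = Counter(math.floor(u * (5 - 1) + 0.5) for u in draws)
# flooring u * 5: every fold catches an interval of width 0.2
floor_folds = Counter(math.floor(u * 5) for u in draws)

for k in range(5):
    print(k, round(round_folds[k] / n, 3), round(floor_folds[k] / n, 3))
```

With rounding, folds 0 and 4 each get about 12.5% of the rows and folds 1 to 3 about 25%; with flooring, each fold gets about 20%, which is what 5-fold encoding usually assumes.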

Calculate mean value for each target

In [14]:
# global mean of each (binarized) target, passed to TargetEncoder via y_mean_list
y_mean_all = []
for tgt in target_list:
    tmp = df.groupBy().mean(tgt).collect()[0]
    y_mean_all.append(tmp[f"avg({tgt})"])

Calculate the target-encoding dictionaries (one per column group in TE_col_features)

In [15]:
te_dfs = []
for c in TE_col_features:
    # e.g. ['language', 'engaged_with_user_id'] -> 'TE_language_engaged_with_user_id'
    out_name = 'TE_' + '_'.join(c)
    # one output column per target, e.g. 'TE_language_engaged_with_user_id_like_timestamp'
    out_col_list = [out_name + '_' + tgt for tgt in target_list]
    encoder = TargetEncoder(proc, c, target_list, out_col_list, out_name, out_dtype=t.FloatType(), y_mean_list=y_mean_all)
    te_train_df, _ = encoder.transform(df)
    # key columns of this dictionary: 'fold' plus the encoded columns
    te_dfs.append({'col_name': ['fold'] + (c if isinstance(c, list) else [c]), 'dict': te_train_df})

print("data encoded!")

data encoded!
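TargetEncoder's internals are not shown in this notebook. As a rough mental model only, the sketch below implements generic out-of-fold (K-fold) target encoding in plain Python: for fold k, each key's statistics come from the rows in the other folds, and an additive-smoothing term pulls rarely seen keys toward the global mean (the role y_mean_all plays above). The function name kfold_target_encode and the smooth weight are hypothetical, and RecDP's exact formula may differ.

```python
from collections import defaultdict


def kfold_target_encode(rows, key_cols, target, y_mean, n_folds=5, smooth=20.0):
    """Hypothetical out-of-fold target encoding.

    Returns {(fold, *key): encoded value}. For fold k, a key's statistics come only
    from rows in the other folds, so a row's own label does not leak into its
    encoding. `smooth` (an assumed additive-smoothing weight) pulls rarely seen
    keys toward the global mean `y_mean`.
    """
    # per-(fold, key) sums and counts of the target
    sums = defaultdict(float)
    counts = defaultdict(int)
    for r in rows:
        fk = (r['fold'],) + tuple(r[c] for c in key_cols)
        sums[fk] += r[target]
        counts[fk] += 1

    # totals for each key across all folds
    keys = {fk[1:] for fk in counts}
    total_sum = defaultdict(float)
    total_cnt = defaultdict(int)
    for fk, cnt in counts.items():
        total_sum[fk[1:]] += sums[fk]
        total_cnt[fk[1:]] += cnt

    encoding = {}
    for fold in range(n_folds):
        for key in keys:
            fk = (fold,) + key
            # out-of-fold stats = totals minus this fold's own contribution
            s = total_sum[key] - sums.get(fk, 0.0)
            c = total_cnt[key] - counts.get(fk, 0)
            encoding[fk] = (s + smooth * y_mean) / (c + smooth)
    return encoding


# toy data: fold id, one key column, and a binarized target
rows = [
    {'fold': 0, 'engaged_with_user_id': 'a', 'like_timestamp': 1},
    {'fold': 1, 'engaged_with_user_id': 'a', 'like_timestamp': 0},
    {'fold': 1, 'engaged_with_user_id': 'a', 'like_timestamp': 1},
    {'fold': 2, 'engaged_with_user_id': 'b', 'like_timestamp': 0},
]
y_mean = sum(r['like_timestamp'] for r in rows) / len(rows)  # 0.5
enc = kfold_target_encode(rows, ['engaged_with_user_id'], 'like_timestamp', y_mean, smooth=1.0)
print(enc[(1, 'a')])  # 0.75: only the fold-0 row (label 1) plus the prior
```

Keying the result by (fold, *key) matches the shape of the col_name lists built in the loop above, where each dictionary is keyed by 'fold' plus the encoded columns.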


Merge the target-encoding dictionaries back into the data on their key columns (fold plus the encoded columns)

In [16]:
op_merge_to_train = ModelMerge(te_dfs)
proc.reset_ops([op_merge_to_train])
df = proc.transform(df, name="te_data")

save data to .//data//te_data
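Conceptually, the merge looks up each row's (fold, key columns) tuple in the matching dictionary. Below is a minimal plain-Python sketch with a hypothetical merge_encoding helper. How ModelMerge handles keys that are missing from a dictionary is not shown in this notebook, so the default fill value is an assumption of the sketch.

```python
def merge_encoding(rows, key_cols, encoding, out_col, default):
    """Hypothetical helper: attach encoded values by looking up (fold, *key)."""
    merged = []
    for r in rows:
        lookup = (r['fold'],) + tuple(r[c] for c in key_cols)
        # keys absent from the dict fall back to `default` (an assumption here)
        merged.append({**r, out_col: encoding.get(lookup, default)})
    return merged


# a tiny pre-computed dictionary keyed by (fold, engaged_with_user_id)
encoding = {(0, 'a'): 0.5, (1, 'a'): 0.75}
rows = [
    {'fold': 0, 'engaged_with_user_id': 'a'},
    {'fold': 1, 'engaged_with_user_id': 'a'},
    {'fold': 2, 'engaged_with_user_id': 'z'},  # key not in the dictionary
]
out = merge_encoding(rows, ['engaged_with_user_id'], encoding,
                     'TE_engaged_with_user_id_like_timestamp', default=0.5)
print([r['TE_engaged_with_user_id_like_timestamp'] for r in out])  # [0.5, 0.75, 0.5]
```

On a Spark DataFrame the same lookup is expressed as a join on the key columns rather than a per-row dictionary access.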


## **Select Final Features**

In [17]:
op_select = SelectFeature(target_list + features_list)
proc.reset_ops([op_select])
df = proc.transform(df, name="preprocessed_data")

save data to .//data//preprocessed_data


In [18]:
df.printSchema()

root
 |-- reply_timestamp: integer (nullable = true)
 |-- like_timestamp: integer (nullable = true)
 |-- engaged_with_user_follower_count: long (nullable = true)
 |-- engaging_user_is_verified: boolean (nullable = true)
 |-- TE_engaged_with_user_id_reply_timestamp: float (nullable = true)
 |-- TE_engaged_with_user_id_like_timestamp: float (nullable = true)
 |-- TE_language_engaged_with_user_id_reply_timestamp: float (nullable = true)
 |-- TE_language_engaged_with_user_id_like_timestamp: float (nullable = true)
 |-- len_media: integer (nullable = true)

