# Setting up Spark

In [1]:
import findspark
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd

from configs.config import DATA_SOURCE_DIR, PROJECT_DIR
from pyspark.sql import SparkSession
from train.transforms.utils import *
from train.transforms.splitting import stratified_splitting
from train.transforms.scaling import robust_scaling
from train.transforms.outliers import log_transformation
from train.transforms.categorical_data import * 
from train.transforms.categorical_data import *
from train.transforms.correlation import *

In [2]:
# Locate the local Spark installation.
# NOTE(review): pyspark is already imported in the first cell, so this init()
# runs too late to make pyspark importable; if the environment depends on
# findspark for that, move init() above those imports.
findspark.init()
findspark.find()

# Driver networking: listen on all interfaces, advertise the loopback address,
# and use a fixed driver port.
driver_settings = {
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.driver.host": "127.0.0.1",
    "spark.driver.port": "4041",
}

# "local" master runs Spark with a single worker thread.
session_builder = SparkSession.builder.master("local").appName("Feature Engineering")
for setting_name, setting_value in driver_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/31 14:49:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read the raw dataset: first row is the header, and '"' is also used as the
# escape character so doubled quotes inside quoted fields parse correctly.
# No inferSchema is requested, so Spark reads every column as a string.
df = spark.read.options(header=True, escape="\"").csv(str(DATA_SOURCE_DIR))

In [4]:
# Apply the project's type-correction helper to the string-typed CSV frame.
df = df.transform(cast_incorrect_column_type)

# Handle outliers
- Apply a log transformation to selected numeric feature columns to reduce skewness
- Apply robust scaling to center and scale the data, since outliers may remain after the log transformation

In [5]:
# Numeric columns whose skewness the log transformation should reduce.
# NOTE(review): if any of these columns can be 0 or negative, confirm that
# log_transformation guards against it (e.g. uses log1p).
log_target_cols = [
    "person_age",
    "person_income",
    "person_emp_exp",
    "loan_amnt",
    "loan_percent_income",
    "cb_person_cred_hist_length",
]
df = log_transformation(df, target_cols=log_target_cols)

In [6]:
# Robust-scale the same numeric columns that were log-transformed.
# NOTE(review): this runs on the full dataset, before the train/test/val split
# further down, so any statistics robust_scaling derives from `df` include the
# held-out rows (data leakage). Consider fitting on the training split only.
scaling_target_cols = [
    "person_age",
    "person_income",
    "person_emp_exp",
    "loan_amnt",
    "loan_percent_income",
    "cb_person_cred_hist_length",
]
df = robust_scaling(df, target_cols=scaling_target_cols)

# Transforming categorical data (one-hot encoding)

In [7]:
# Detect the categorical columns, build the one-hot encoding pipeline from the
# current frame, and apply it; later cells read the "<col>_encoded" outputs.
# NOTE(review): the pipeline is built from the full dataset before splitting;
# confirm nothing learned here (e.g. category vocabularies) should come from
# the training split only.
categorical_cols = find_categorical_cols(df)
pipeline = onehot_encoding_pipeline(df, categorical_cols)
df = df.transform(pipeline.transform)

In [8]:
# Remove the raw categorical columns and their "<col>_index" helper columns;
# only the "<col>_encoded" columns are needed downstream.
index_cols = [f"{name}_index" for name in categorical_cols]
columns_to_drop = categorical_cols + index_cols
df = df.drop(*columns_to_drop)

In [9]:
# Collect the Spark DataFrame into pandas so the encoded vector columns can be
# expanded and the data split with pandas-based helpers below.
# NOTE(review): toPandas() materializes every row in driver memory; confirm the
# dataset stays small enough for this.
df_pandas = df.toPandas()

                                                                                

In [10]:
def expand_vector_column(frame: pd.DataFrame, col: str) -> pd.DataFrame:
    """Replace a column of Spark ML vectors with one numeric column per slot.

    The new columns are named ``{col}_0``, ``{col}_1``, ... and appended at the
    end. They share ``frame``'s index, so rows stay aligned even when the index
    is not a default RangeIndex.

    Args:
        frame: pandas DataFrame containing the vector column.
        col: Name of the vector column to expand; must exist in ``frame``.

    Returns:
        A new DataFrame without ``col`` and with the expanded columns added.
        ``frame`` itself is not modified.
    """
    # toArray() gives a dense numpy array for both dense and sparse vectors.
    rows = [vector.toArray() for vector in frame[col]]
    # The width comes from the first row. An empty frame yields zero new
    # columns instead of failing with an IndexError.
    width = len(rows[0]) if rows else 0
    expanded = pd.DataFrame(
        rows,
        index=frame.index,
        columns=[f"{col}_{i}" for i in range(width)],
    )
    return pd.concat([frame.drop(columns=col), expanded], axis=1)


# Turn each "<col>_encoded" vector column into flat "<col>_encoded_<i>" columns.
for c in categorical_cols:
    df_pandas = expand_vector_column(df_pandas, c + "_encoded")

# Stratified Splitting

In [11]:
# Split the encoded pandas frame into 80/10/10 train/test/validation parts.
# random_state is fixed, presumably so the split is reproducible; the helper
# returns the parts in (train, test, val) order.
split_params = {
    "train_size": 0.8,
    "test_size": 0.1,
    "val_size": 0.1,
    "random_state": 42,
}
train_df, test_df, val_df = stratified_splitting(df_pandas, **split_params)

# Save the splits as an HDF5 file

In [12]:
# Write the three splits into one file under data/features, one key per split.
# NOTE(review): despite its name, save_to_parquet is called with a ".h5" path,
# a `key` and a `mode`, which look like HDF5-style arguments; confirm which
# format the helper actually writes.
# Only the first write passes mode="w" (presumably recreating the file); the
# later writes use the helper's default mode.
features_file = str(PROJECT_DIR / "data" / "features" / "feature_engineering_2.h5")
splits = (("train", train_df), ("test", test_df), ("val", val_df))
for split_key, split_frame in splits:
    write_opts = {"mode": "w"} if split_key == "train" else {}
    save_to_parquet(split_frame, features_file, key=split_key, **write_opts)

24/12/31 23:42:35 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 933455 ms exceeds timeout 120000 ms
24/12/31 23:42:35 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/31 23:42:39 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$