In [None]:
# Download the hadoop-aws 3.3.1 JAR from Maven Central into Spark's jars directory
# (/usr/local/spark/jars). The Spark configuration further down sets fs.s3a.impl to
# org.apache.hadoop.fs.s3a.S3AFileSystem, which this artifact is expected to supply.
# Requires sudo and network access; -O overwrites any file already at that path.
!sudo wget -O /usr/local/spark/jars/hadoop-aws-3.3.1.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar

In [None]:
# Download the AWS Java SDK bundle 1.11.901 from Maven Central into Spark's jars directory
# (/usr/local/spark/jars), alongside the hadoop-aws JAR fetched in the previous cell.
# NOTE(review): the SDK bundle version presumably has to match the one hadoop-aws 3.3.1 was
# built against — confirm before bumping either version.
!sudo wget -O /usr/local/spark/jars/aws-java-sdk-bundle-1.11.901.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar

In [3]:
from pyspark.conf import SparkConf
from pyspark.ml.feature import Imputer, StringIndexer, IndexToString
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import when, count, col, lit, udf, isnan
from pyspark.sql.types import *
from pyspark import SparkContext
import os

In [4]:
# --- Object storage connection ---------------------------------------------------------
# Endpoint and credentials are read from the environment so that real secrets never have
# to be committed with the notebook. The fallbacks are the local-development values this
# notebook has always used, so an unconfigured environment behaves exactly as before.
AWS_S3_CUSTOM_ENDPOINT = os.environ.get("AWS_S3_CUSTOM_ENDPOINT", "http://storage:9000")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "minioadmin")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "minioadmin")

# --- Input -----------------------------------------------------------------------------
# Directory holding the raw train_transaction.csv / train_identity.csv files.
FRAUD_DETECTION_SRC_DIR = "s3a://ml-data/fraud-data-source"

# --- Output ----------------------------------------------------------------------------
# Hive database and fully-qualified table that receive the imputed data set.
FRAUD_DETECTION_DB = "ml_fraud_detection_db"
FRAUD_DETECTION_SRC_TBL = f"{FRAUD_DETECTION_DB}.tb_fraud"

# Storage locations backing the database and the table, respectively.
FRAUD_DETECTION_OUTPUT_DB_DIR = f"s3a://ml-data/dev/{FRAUD_DETECTION_DB}"
FRAUD_DETECTION_OUTPUT_DIR = f"{FRAUD_DETECTION_OUTPUT_DB_DIR}/tb_fraud"

# Target column; excluded from both the categorical and the numerical feature lists.
LABEL_COLUMN = "isFraud"

In [5]:
# Base Spark configuration: master, resource sizing, output-file housekeeping and the
# Hive metastore endpoint.
# NOTE(review): with a local[...] master the executor.* and dynamicAllocation settings
# presumably have no effect — confirm whether they are still wanted.
conf = SparkConf().setMaster("local[6]")
conf = conf.setAll(
    [
        ("spark.driver.memory", "10g"),
        ("spark.executor.memory", "4g"),
        ("spark.executor.cores", "1"),
        ("spark.dynamicAllocation.enabled", "true"),
        # Do not write Parquet summary-metadata files or _SUCCESS markers next to the data.
        ("spark.hadoop.parquet.enable.summary-metadata", "false"),
        ("spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"),
        ("hive.metastore.uris", "thrift://hive-metastore:9083"),
    ]
)

In [7]:
# S3A filesystem settings: credentials, custom endpoint, path-style addressing and the
# filesystem implementation class used for s3a:// URIs.
conf = conf.setAll(
    [
        ("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID),
        ("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY),
        ("spark.hadoop.fs.s3a.endpoint", AWS_S3_CUSTOM_ENDPOINT),
        ("spark.hadoop.fs.s3a.path.style.access", True),
        ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ]
)

In [8]:
# Create (or reuse) the Hive-enabled session from the configuration assembled above.
spark = (
    SparkSession.builder
    .appName("EnrichProcessor")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

In [9]:
# Load both raw CSV files; the first line is the header and column types are inferred.
train_transaction_df = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv(f"{FRAUD_DETECTION_SRC_DIR}/train_transaction.csv")
)
train_identity_df = (
    spark.read
    .options(inferSchema=True, header=True)
    .csv(f"{FRAUD_DETECTION_SRC_DIR}/train_identity.csv")
)

In [10]:
# Use 12 shuffle partitions for the join/sort below and for later shuffles in this session.
spark.conf.set("spark.sql.shuffle.partitions", "12")

# Left join: every transaction is kept, identity columns are null when there is no match.
# The joined frame is then sorted by transactionDT.
union_df = (
    train_transaction_df
    .join(train_identity_df, on="transactionID", how="left")
    .orderBy("transactionDT")
)

In [11]:
# Keep only the feature columns used downstream, followed by the label column.
final_df = union_df.select(
    [
        "TransactionAmt", "ProductCD",
        "card1", "card2", "card3", "card4", "card5", "card6",
        "dist1",
        "P_emaildomain", "R_emaildomain",
        "C1", "C5", "C13",
        "D1", "D3", "D4", "D5", "D10", "D11", "D15",
        "M2", "M3", "M4", "M5", "M6", "M7", "M8", "M9",
        "V2", "V3", "V4", "V6", "V8", "V10", "V12", "V15", "V19",
        "V22", "V23", "V25", "V29", "V35",
        LABEL_COLUMN,
    ]
)

In [12]:
def get_categorical_columns(df: DataFrame, label_column_name: str) -> list:
    """Return the names of all string-typed columns of ``df`` except the label column.

    :param df: frame whose schema is inspected.
    :param label_column_name: column name that is never included in the result.
    :return: column names in schema order.
    """
    return [
        field.name
        for field in df.schema.fields
        if field.dataType.typeName() == 'string' and field.name != label_column_name
    ]

def get_numerical_columns(df: DataFrame, label_column_name: str) -> list:
    """Return the names of all non-string columns of ``df`` except the label column.

    Every column whose type name is not ``'string'`` is returned, whatever its
    actual type.

    :param df: frame whose schema is inspected.
    :param label_column_name: column name that is never included in the result.
    :return: column names in schema order.
    """
    return [
        field.name
        for field in df.schema.fields
        if field.dataType.typeName() != 'string' and field.name != label_column_name
    ]

In [13]:
def calculate(df, columns, strategy="mean"):
    """Compute one imputation value per column.

    :param df: source frame; only ``columns`` are read from it.
    :param columns: names of the columns to compute a value for.
    :param strategy: ``"mean"`` (average, via ``calculate_mean``) or ``"mode"``
        (most frequent value, via ``calculate_mode``).
    :return: dict mapping each column name to its imputation value.
    :raises ValueError: if ``strategy`` is neither ``"mean"`` nor ``"mode"``.
    """
    # Fail fast: an unrecognised strategy used to produce an empty dict, which only
    # surfaced later as a KeyError when the values were looked up per column.
    if strategy not in ("mean", "mode"):
        raise ValueError(
            f"Unsupported imputation strategy {strategy!r}; expected 'mean' or 'mode'"
        )

    df = df.select(columns)
    imputed_values = dict()
    if strategy == "mode":
        # The mode needs a separate aggregation per column.
        for c in columns:
            calculate_mode(c, df, imputed_values)
    else:
        # All means are computed together in a single aggregation.
        calculate_mean(columns, df, imputed_values)
    return imputed_values


def calculate_mean(columns, df, imputed_values):
    """Store the average of every column in ``columns`` into ``imputed_values``.

    All averages are computed in one global aggregation over ``df``. Because a
    global aggregate always yields exactly one row, an empty frame (or a column
    that is entirely null) produces ``None`` for that column instead of raising.

    :param columns: names of the columns to average.
    :param df: frame containing those columns.
    :param imputed_values: dict updated in place with ``{column: average}``.
    :return: None
    """
    if not columns:
        # Nothing to aggregate; an empty projection is not a valid aggregate query.
        return

    def _quote(name):
        # Backtick-quote the identifier so names containing spaces, dots or
        # reserved words survive SQL expression parsing.
        return "`" + name.replace("`", "``") + "`"

    avg_expressions = [f"avg({_quote(c)}) as {_quote(c)}" for c in columns]
    avg_row = df.selectExpr(*avg_expressions).first()
    for c in columns:
        imputed_values[c] = avg_row[c]


def calculate_mode(c, df, imputed_values):
    """Store the most frequent non-null value of column ``c`` into ``imputed_values``.

    Rows where ``c`` is null are ignored. Ties between equally frequent values are
    broken by taking the smallest value, so the result is deterministic. If the
    column has no non-null value at all, ``None`` is stored.

    :param c: name of the column to analyse.
    :param df: frame containing that column.
    :param imputed_values: dict updated in place with ``{c: mode}``.
    :return: None
    """
    # Count occurrences of every non-null value, then put the highest count first.
    # (Ranking inside a window partitioned by the count itself gives every row
    # rank 1, so it cannot be used to pick the most frequent value.)
    value_counts = df.where(df[c].isNotNull()).groupBy(c).count()
    top_row = value_counts.orderBy(["count", c], ascending=[False, True]).first()
    imputed_values[c] = top_row[c] if top_row is not None else None

In [14]:
# Split the feature columns by kind (string vs. everything else); the label is in neither.
categorical_columns, numerical_columns = (
    get_categorical_columns(final_df, LABEL_COLUMN),
    get_numerical_columns(final_df, LABEL_COLUMN),
)
print(categorical_columns)
print(numerical_columns)

['ProductCD', 'card4', 'card6', 'P_emaildomain', 'R_emaildomain', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'M9']
['TransactionAmt', 'card1', 'card2', 'card3', 'card5', 'dist1', 'C1', 'C5', 'C13', 'D1', 'D3', 'D4', 'D5', 'D10', 'D11', 'D15', 'V2', 'V3', 'V4', 'V6', 'V8', 'V10', 'V12', 'V15', 'V19', 'V22', 'V23', 'V25', 'V29', 'V35']


In [15]:
# Imputation value for each categorical column: its most frequent value.
mode_values = calculate(
    df=final_df,
    columns=categorical_columns,
    strategy="mode",
)

In [16]:
# Replace missing (null or NaN) categorical values with the column's mode; numerical
# columns and the label are passed through untouched.
cat_imputed_final_df = final_df.select(
    *[
        when(isnan(c) | col(c).isNull(), mode_values[c])
        .otherwise(final_df[c])
        .alias(c)
        for c in categorical_columns
    ],
    *numerical_columns,
    LABEL_COLUMN,
)

In [17]:
# Imputation value for each numerical column: its average.
mean_values = calculate(
    df=cat_imputed_final_df,
    columns=numerical_columns,
    strategy="mean",
)

In [18]:
# Replace missing (null or NaN) numerical values with the column's mean; the already
# imputed categorical columns and the label are passed through untouched.
# The fallback value is resolved by name against the frame being selected from
# (col(c)) rather than through a reference to the upstream final_df, which only
# resolves while the numeric columns reach this frame unmodified.
num_imputed_final_df = cat_imputed_final_df.select(
    [
        when(isnan(c) | col(c).isNull(), mean_values[c]).otherwise(col(c)).alias(c)
        for c in numerical_columns
    ]
    + categorical_columns
    + [LABEL_COLUMN]
)

# Expose the result to Spark SQL so it can be inserted into the Hive table.
num_imputed_final_df.createOrReplaceTempView("imputed_final_tmp_tbl")

In [20]:
# Make sure the target Hive database exists, backed by FRAUD_DETECTION_OUTPUT_DB_DIR.
# Disabled alternative, kept for reference: write the Parquet files directly instead of
# going through a Hive table —
#   num_imputed_final_df.coalesce(1).write.parquet(FRAUD_DETECTION_OUTPUT_DIR, mode="overwrite", compression="snappy")
table_df = spark.sql(
    f"CREATE DATABASE IF NOT EXISTS {FRAUD_DETECTION_DB} "
    f"LOCATION '{FRAUD_DETECTION_OUTPUT_DB_DIR}'"
)

In [21]:
# One "<name> <type>" DDL fragment per column of the imputed frame, in schema order.
final_output_table_columns = [
    f"{column_field.name} {column_field.dataType.simpleString()}"
    for column_field in num_imputed_final_df.schema.fields
]

In [22]:
# Comma-separated column definitions, ready to be placed between the parentheses of the
# CREATE TABLE statement.
final_output_table_columns_str = ",".join(final_output_table_columns)

In [23]:
# Display the generated column definitions for a visual check before creating the table.
final_output_table_columns_str

'TransactionAmt double,card1 double,card2 double,card3 double,card5 double,dist1 double,C1 double,C5 double,C13 double,D1 double,D3 double,D4 double,D5 double,D10 double,D11 double,D15 double,V2 double,V3 double,V4 double,V6 double,V8 double,V10 double,V12 double,V15 double,V19 double,V22 double,V23 double,V25 double,V29 double,V35 double,ProductCD string,card4 string,card6 string,P_emaildomain string,R_emaildomain string,M2 string,M3 string,M4 string,M5 string,M6 string,M7 string,M8 string,M9 string,isFraud int'

In [25]:
# Recreate the target table from scratch so its columns always match the current schema
# of the imputed frame.
spark.sql(f"DROP TABLE IF EXISTS {FRAUD_DETECTION_SRC_TBL}")
create_table_df = spark.sql(
    f"CREATE TABLE IF NOT EXISTS {FRAUD_DETECTION_SRC_TBL} "
    f"({final_output_table_columns_str}) "
    f"STORED AS PARQUET LOCATION '{FRAUD_DETECTION_OUTPUT_DIR}'"
)

In [26]:
# Replace the table contents with the imputed rows from the temp view.
# NOTE(review): "distribute by 4" distributes on the constant expression 4, not into
# 4 partitions, so presumably every row lands in the same partition — confirm whether a
# single output file is intended or whether a repartition to 4 was meant.
inserted_response_df = spark.sql(
    f"INSERT OVERWRITE {FRAUD_DETECTION_SRC_TBL} "
    "select * from imputed_final_tmp_tbl distribute by 4"
)

In [27]:
# Sanity check: number of rows now stored in the target table.
(
    spark
    .sql(f"select count(*) from {FRAUD_DETECTION_SRC_TBL}")
    .show()
)

+--------+
|count(1)|
+--------+
|  590540|
+--------+

