In [53]:
import findspark
findspark.init()
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder, MinMaxScaler
from hdfs.client import Client
from pyspark.sql import SparkSession
import sys
import argparse



def apply_pyspark_feature_one_column(df, feature, inputCols, outputCols, fit=True, transform=True):
    configured_feature = feature().setInputCols(inputCols).setOutputCol(outputCols)
    if fit:
        df = configured_feature.fit(df)
    if transform:
        df = configured_feature.transform(df)   
    return df

def apply_pyspark_feature_multiple_columns(df, feature, inputCols, outputCols, fit=True, transform=True):
    configured_feature = feature().setInputCols(inputCols).setOutputCols(outputCols)
    if fit:
        df = configured_feature.fit(df)
        print(df)
    if transform:
        df = configured_feature.transform(df)   
    return df



In [55]:
#parser = argparse.ArgumentParser()
#parser.add_argument("--partition", help="some useful description.")
#args = parser.parse_args()
#if args.partition:
#    partition = args.partition
#else:
#    partition = sys.argv[1]
partition = 18

file_name = f"/user/airflow/input_files/partition_{partition}.parquet"
target = 'TX_FRAUD'
spark = SparkSession.builder.master("yarn").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
df = spark.read.format("parquet").load(file_name)

numericColumns = list(map(lambda x: x[0], 
                    filter(lambda x: x[1] == "double" or x[1] == "int", 
                        df.dtypes)))
stringColumns = ['TERMINAL_ID']
stringColumnsIndexed = list(map(lambda x: x + "_Indexed", stringColumns))

numeric = apply_pyspark_feature_one_column(df, VectorAssembler, inputCols=numericColumns, outputCols='features', fit=False)
indexed = apply_pyspark_feature_multiple_columns(numeric, StringIndexer, inputCols=stringColumns, outputCols=stringColumnsIndexed)

catColumns = list(map(lambda x: x + "_Coded", stringColumnsIndexed))
encoded = apply_pyspark_feature_multiple_columns(indexed, OneHotEncoder, inputCols=stringColumnsIndexed, outputCols=catColumns)
scaled = apply_pyspark_feature_one_column(encoded, MinMaxScaler, inputCols="features", outputCols="scaledFeatures")

featureColumns = ['scaledFeatures'] + catColumns

result = apply_pyspark_feature_one_column(scaled, VectorAssembler, inputCols=featureColumns, outputCols="featuresFinal", fit=False).select('featuresFinal', target)

result.write.format("parquet").mode("overwrite").save(f"/user/airflow/processed_files/partition_{partition}.parquet")


StringIndexerModel: uid=StringIndexer_e71900dd06f5, handleInvalid=error, numInputCols=1, numOutputCols=1


AttributeError: 'StringIndexer' object has no attribute 'transform'

In [56]:
spark.stop()

In [43]:
stringColumnsIndexed

['TERMINAL_ID_Indexed']

In [44]:
catColumns

['TERMINAL_ID_Indexed_Coded']

In [24]:
stringColumnsIndexed

'TERMINAL_ID_Indexed'

In [12]:
stringColumnsIndexed

['TERMINAL_ID_Indexed']

In [13]:
stringColumns

['TERMINAL_ID']

In [15]:
numeric = apply_pyspark_feature(df, VectorAssembler, inputCols=numericColumns, outputCols='features', fit=False)

In [16]:
numeric.dtypes

[('TRANSACTION_ID', 'bigint'),
 ('TX_DATETIME', 'timestamp'),
 ('CUSTOMER_ID', 'bigint'),
 ('TERMINAL_ID', 'bigint'),
 ('TX_AMOUNT', 'double'),
 ('TX_TIME_SECONDS', 'bigint'),
 ('TX_TIME_DAYS', 'bigint'),
 ('TX_FRAUD', 'bigint'),
 ('TX_FRAUD_SCENARIO', 'bigint'),
 ('features', 'vector')]

In [14]:
numeric

TRANSACTION_ID,TX_DATETIME,CUSTOMER_ID,TERMINAL_ID,TX_AMOUNT,TX_TIME_SECONDS,TX_TIME_DAYS,TX_FRAUD,TX_FRAUD_SCENARIO,features
0,2018-04-01 00:51:53,14,28,7.47,3113,0,0,0,[7.47]
1,2018-04-01 01:11:00,8,16,17.84,4260,0,0,0,[17.84]
2,2018-04-01 01:18:19,181,20,156.38,4699,0,0,0,[156.38]
3,2018-04-01 01:21:07,39,82,104.85,4867,0,0,0,[104.85]
4,2018-04-01 01:28:15,298,24,11.9,5295,0,0,0,[11.9]
5,2018-04-01 01:42:41,64,78,57.73,6161,0,0,0,[57.73]
6,2018-04-01 01:57:13,242,95,30.75,7033,0,0,0,[30.75]
7,2018-04-01 01:59:15,159,10,68.81,7155,0,0,0,[68.81]
8,2018-04-01 02:11:29,451,55,50.34,7889,0,0,0,[50.34]
9,2018-04-01 02:13:54,354,49,55.71,8034,0,0,0,[55.71]


In [None]:
numeric.show(10)