In [1]:
# Start Spark through the project helper script. `-i` runs the script in this notebook's
# namespace, so the objects it creates (presumably `spark` and `sc`, used by later
# cells — TODO confirm in start_spark.py) are visible here.
%run -i start_spark.py --master local[*] --driver-memory 10g
# Alternative launch configurations: lower driver memory, or a standalone cluster master.
#%run -i start_spark.py --master local[*] --driver-memory 2g
#%run -i start_spark.py --master spark://192.168.0.100:7077 --driver-memory 10g

# When True, the raw CSV is read from IBM Cloud Object Storage instead of the local file
USE_IBM_COS = False

Creating SparkContext...
SparkContext created.
Master URL: local[*]


**Do not forget to close the session with spark.stop()**

In [2]:
%matplotlib inline

import coursera_common as cc
import os

from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, StringIndexer, StringIndexerModel, VectorAssembler, VectorIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

from pyspark.sql.types import *
from pyspark.sql.functions import udf, substring, stddev, mean, col

# Data preparation

## ETL: Convert CSV to Parquet

The raw data, available as a CSV file, is converted into a Parquet dataset for efficiency reasons. The CSV file can be stored in IBM Watson Studio Cloud Object Storage to allow parallel reads with Spark.

In [3]:
# Load the payments dataset: reuse the cached Parquet copy when present, otherwise
# read the raw CSV (local file or IBM Cloud Object Storage) and cache it as Parquet.
if os.path.exists( cc.PAYMENTS_PQT_FILENAME ):
    payments = cc.readParquet(spark, cc.PAYMENTS_PQT_FILENAME)
else:
    filename = cc.PAYMENTS_CSV_FILENAME

    # Check if IBM Cloud Object Storage will be used
    if USE_IBM_COS:
        import ibmos2spark

        # Secrets are read from the environment instead of being hardcoded: anything
        # committed in a notebook leaks through version control and shared copies.
        # NOTE: the API key that used to be hardcoded here must be considered
        # compromised and rotated.
        credentials = {
            'endpoint': os.environ.get('COS_ENDPOINT', 'https://s3.eu-geo.objectstorage.service.networklayer.com'),
            'service_id': os.environ['COS_SERVICE_ID'],
            'iam_service_endpoint': os.environ.get('COS_IAM_SERVICE_ENDPOINT', 'https://iam.eu-de.bluemix.net/oidc/token'),
            'api_key': os.environ['COS_API_KEY'],
        }

        configuration_name = os.environ.get('COS_CONFIGURATION_NAME', 'os_7d393bdcf999474b8bdc4df51a6fb856_configs')
        # Use the session's SparkContext explicitly rather than relying on an implicit `sc` global
        cos = ibmos2spark.CloudObjectStorage(spark.sparkContext, credentials, configuration_name, 'bluemix_cos')
        filename = cos.url('PS_20174392719_1491204439457_log.csv', 'advanceddatasciencecapstone-donotdelete-pr-jfzvg92smkwger')

    print('Reading data from {}'.format(filename))
    # Rows that do not match the inferred schema are silently dropped (mode='DROPMALFORMED')
    payments = spark.read.csv(filename, inferSchema=True, header=True, mode='DROPMALFORMED')
    print('CSV file loaded!')

    # Cache as Parquet so later runs take the fast path above
    cc.writeParquet(payments, cc.PAYMENTS_PQT_FILENAME)

print('Schema:')
payments.printSchema()

# Expose the data to Spark SQL queries
payments.createOrReplaceTempView('payments')
print('payments view created!')

payments.show(5)

Restoring data/payments.v2.parquet...
Complete!
Schema:
root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

payments view created!
+----+-------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
| 156|CASH_IN| 27287.59|C1634109320|   8673566.52|    8700854

## Feature Engineering

Features are extracted and encoded. This notebook contains only the common part; model-specific encodings, normalizations, etc. are part of the model notebooks.

It can be observed that the IDs of the origin and destination customers embed the customer type (C: Customer, M: Merchant). This allows a fast encoding of the features. The origin and destination types can also be extracted as separate features, although these two variables will anyway be highly correlated with the customer IDs and might turn out to be redundant.

In [4]:
def encodeName( s ):
    '''
    Transform a name string of type [A-Z][0-9]+ into an integer.

    The ASCII code of the leading letter is shifted left by 12 decimal digits and
    the numeric part is added, e.g. 'C1634109320' -> 67 * 10**12 + 1634109320
    = 67001634109320. Integer arithmetic is used so the result is exact.

    :param s: the string to be converted. It should be of type [A-Z][0-9]+
              with a numeric part of at most 12 digits
    :return: the encoded integer, or None if s is None (a null value in Spark)
    :raises ValueError: if s has no numeric part, the numeric part is not a
              non-negative integer, or it has more than 12 digits (it would
              overflow into the letter code and collide with other names)
    '''
    # Multiplier separating the letter code from the numeric part (room for 12 digits)
    base = 10 ** 12

    if s is None:
        # Spark hands SQL NULLs to the UDF as None: propagate them as null
        return None
    if len(s) < 2:
        raise ValueError('name {!r} has no numeric part'.format(s))

    number = int(s[1:])
    if not 0 <= number < base:
        raise ValueError('numeric part of name {!r} must be in [0, 1e12)'.format(s))
    return ord(s[0]) * base + number

# Spark UDF wrapping encodeName; results are stored as 64-bit integers
udf_encodeName = udf(encodeName, LongType())

# Numeric IDs plus the leading type letter (C/M) for both origin and destination.
# Spark's substring() is 1-based, so position 1 selects the first character.
payments = (
    payments
    .withColumn('vOrig', udf_encodeName(col('nameOrig')))
    .withColumn('vOrigType', substring(col('nameOrig'), 1, 1))
    .withColumn('vDest', udf_encodeName(col('nameDest')))
    .withColumn('vDestType', substring(col('nameDest'), 1, 1))
)

payments.show(5)

+----+-------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------+--------------+---------+
|step|   type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|         vOrig|vOrigType|         vDest|vDestType|
+----+-------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+--------------+---------+--------------+---------+
| 156|CASH_IN| 27287.59|C1634109320|   8673566.52|    8700854.11|C1542157370|     558897.81|     531610.22|      0|             0|67001634109320|        C|67001542157370|        C|
| 156|CASH_IN|165163.88|C1684617623|   8700854.11|    8866017.99|C1163394047|     272799.53|     107635.65|      0|             0|67001684617623|        C|67001163394047|        C|
| 156|CASH_IN| 79644.29| C577027691|   8866017.99|    8945662.28|C1036272123|     226798.95|   

In [5]:
# Which origin customer types occur in the data?
payments.select(col('vOrigType')).distinct().show()

+---------+
|vOrigType|
+---------+
|        C|
+---------+



vOrigType does not add any information, as its value is constant. We can drop this feature from the list.

In [6]:
# Which destination customer types occur in the data?
payments.select(col('vDestType')).distinct().show()

+---------+
|vDestType|
+---------+
|        M|
|        C|
+---------+



The destination can be either a customer or a merchant. We keep this feature.

In [7]:
# Re-register the SQL view: `payments` now has the vOrig/vOrigType/vDest/vDestType
# columns, which the SQL query below needs to see.
payments.createOrReplaceTempView(name='payments')

In [8]:
# Count the fraudulent transactions (isFraud = 1) per origin customer type,
# destination customer type and transaction type. Relies on the 'payments'
# temp view having been re-registered after the vOrigType/vDestType columns were added.
spark.sql("""
select vOrigType, vDestType, type, count(*) as cnt from payments where isFraud = 1 group by vOrigType, vDestType, type
""" ).show()

+---------+---------+--------+----+
|vOrigType|vDestType|    type| cnt|
+---------+---------+--------+----+
|        C|        C|CASH_OUT|4116|
|        C|        C|TRANSFER|4097|
+---------+---------+--------+----+



As observed during the data exploration phase, frauds affect only transactions of type CASH_OUT and TRANSFER. As both vOrigType and vDestType are always C for the fraudulent transactions, these features are not considered relevant.

In [9]:
# When True, numerical features are standardized (z-score) into extra '<name>Norm' columns
SCALE_FEATURES = False

cat_columns = ['type', 'vOrig', 'vDest']
num_columns = ['step', 'amount', 'oldbalanceOrg', 'oldbalanceDest']
lbl_columns = ['isFraud']
dl_columns = []
# Names of the standardized columns; filled only when SCALE_FEATURES is True
num_columns_scaled = []

# Encode categorical variables. Only 'type' needs a StringIndexer:
# vOrig and vDest are already integer-encoded by encodeName.
pipe_stages = [
    StringIndexer(inputCol='type', outputCol='typeEnc')
]

pipeline = Pipeline(stages=pipe_stages)

ps = payments.select(num_columns + cat_columns + lbl_columns)

if SCALE_FEATURES:
    print('Normalizing the numerical features...')
    # Alias the aggregates explicitly instead of relying on Spark's auto-generated
    # column names (e.g. 'avg(x)', 'stddev_samp(x)'), which are not a stable contract.
    mean_stddev = ps.select(
        [mean(c).alias(f'{c}_mean') for c in num_columns] +
        [stddev(c).alias(f'{c}_stddev') for c in num_columns]
    ).first()

    # Generate Normalized version of the numerical features
    for nc in num_columns:
        ps = ps.withColumn(f'{nc}Norm', (col(nc) - mean_stddev[f'{nc}_mean']) / mean_stddev[f'{nc}_stddev'])
        num_columns_scaled.append(f'{nc}Norm')

    print('Encoding variables {}....'.format(['type'] + num_columns))
else:
    print('Encoding variables {}....'.format(['type']))

payments_enc = pipeline.fit(ps).transform(ps)
print('Complete!')

payments_enc.show(5)

Encoding variables ['type']....
Complete!
+----+---------+-------------+--------------+-------+--------------+--------------+-------+-------+
|step|   amount|oldbalanceOrg|oldbalanceDest|   type|         vOrig|         vDest|isFraud|typeEnc|
+----+---------+-------------+--------------+-------+--------------+--------------+-------+-------+
| 156| 27287.59|   8673566.52|     558897.81|CASH_IN|67001634109320|67001542157370|      0|    2.0|
| 156|165163.88|   8700854.11|     272799.53|CASH_IN|67001684617623|67001163394047|      0|    2.0|
| 156| 79644.29|   8866017.99|     226798.95|CASH_IN|67000577027691|67001036272123|      0|    2.0|
| 156|304420.49|   8945662.28|     1160376.3|CASH_IN|67000276094466|67001040552729|      0|    2.0|
| 156|142280.37|   9250082.78|     645067.07|CASH_IN|67001174098243|67000508275836|      0|    2.0|
+----+---------+-------------+--------------+-------+--------------+--------------+-------+-------+
only showing top 5 rows



It turned out that the selected deep learning models benefit from the additional 'step' feature. Therefore, a dedicated vector is assembled for them. The vector used to calculate the correlation matrix over the whole dataset needs the additional 'isFraud' column, to spot possible correlations with the other features.

In [10]:
# Assemble the feature vector columns used downstream:
#  - 'corrFeatures': every feature plus the label, for the correlation matrix
#  - 'features' / 'featuresIndexed': random forest input (no label, no dl-only columns)
#  - 'dlFeatures' / 'dlFeaturesIndexed': deep learning input (random forest features + dl-only columns)

# Max number of distinct values for VectorIndexer to treat a feature as categorical
MAX_CATEGORIES = 10

# Features reserved to the deep learning models. When the numerical features are scaled,
# the normalized '<name>Norm' column replaces the raw one; it must be reserved as well,
# otherwise it would leak into the correlation and random forest vectors.
dl_only_columns = ['step']
if SCALE_FEATURES:
    dl_only_columns = [f'{c}Norm' for c in dl_only_columns]

# Columns excluded from the shared vectors: dl-only features, the raw 'type' string
# (typeEnc is used instead), the customer types and, when scaling, the raw numerical
# columns (their normalized counterparts are used instead).
filtered_columns = dl_only_columns + ['type', 'vOrigType', 'vDestType']
if SCALE_FEATURES:
    filtered_columns = filtered_columns + num_columns

corr_features_cols = [x for x in payments_enc.columns if x not in filtered_columns]

print('Creating features vector for correlation from columns: {}'.format(corr_features_cols))
vassembler = VectorAssembler(inputCols=corr_features_cols, outputCol='corrFeatures')
payments_enc = vassembler.transform(payments_enc)
print('complete!')

# The model feature vectors must not contain the label
features_cols = [x for x in corr_features_cols if x not in lbl_columns]

print('Creating features vector for random forest from columns: {}'.format(features_cols))
vassembler = VectorAssembler(inputCols=features_cols, outputCol='features')
payments_enc = vassembler.transform(payments_enc)
print('complete!')

print('Indexing features vector...')
vi = VectorIndexer(inputCol="features", outputCol="featuresIndexed", maxCategories=MAX_CATEGORIES)
payments_enc = vi.fit(payments_enc).transform(payments_enc)
print('Complete!')

dl_features_cols = dl_only_columns + features_cols

print('Creating features vector for deep learning models from columns: {}'.format(dl_features_cols))
vassembler = VectorAssembler(inputCols=dl_features_cols, outputCol='dlFeatures')
payments_enc = vassembler.transform(payments_enc)
print('complete!')

print('Indexing dl_features vector...')
vi = VectorIndexer(inputCol="dlFeatures", outputCol="dlFeaturesIndexed", maxCategories=MAX_CATEGORIES)
payments_enc = vi.fit(payments_enc).transform(payments_enc)
print('Complete!')

Creating features vector for correlation from columns: ['amount', 'oldbalanceOrg', 'oldbalanceDest', 'vOrig', 'vDest', 'isFraud', 'typeEnc']
complete!
Creating features vector for random forest from columns: ['amount', 'oldbalanceOrg', 'oldbalanceDest', 'vOrig', 'vDest', 'typeEnc']
complete!
Indexing features vector...
Complete!
Creating features vector for deep learning models from columns: ['step', 'amount', 'oldbalanceOrg', 'oldbalanceDest', 'vOrig', 'vDest', 'typeEnc']
complete!
Indexing dl_features vector...
Complete!


In [11]:
# Preview the encoded dataset together with the assembled feature vectors
payments_enc.show(n=5)

+----+---------+-------------+--------------+-------+--------------+--------------+-------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
|step|   amount|oldbalanceOrg|oldbalanceDest|   type|         vOrig|         vDest|isFraud|typeEnc|        corrFeatures|            features|     featuresIndexed|          dlFeatures|   dlFeaturesIndexed|
+----+---------+-------------+--------------+-------+--------------+--------------+-------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 156| 27287.59|   8673566.52|     558897.81|CASH_IN|67001634109320|67001542157370|      0|    2.0|[27287.59,8673566...|[27287.59,8673566...|[27287.59,8673566...|[156.0,27287.59,8...|[156.0,27287.59,8...|
| 156|165163.88|   8700854.11|     272799.53|CASH_IN|67001684617623|67001163394047|      0|    2.0|[165163.88,870085...|[165163.88,870085...|[165163.88,870085...|[156.0,165163.88,.

In [14]:
# Persist the encoded dataset (selected columns plus the assembled feature vectors)
# as Parquet, presumably so the model notebooks can load it without repeating the
# feature engineering.
cc.writeParquet(payments_enc, cc.PAYMENTS_ENC_PQT_FILENAME)

Storing to parquet file data/payments_enc.v6.parquet...
Complete!


The column names used for each feature vector are stored for future use.

In [15]:
# Store, for future use, the column names behind each feature vector
# (correlation, random forest, deep learning) in the features config file.
cc.save_feature_cols( cc.FEATURES_CONFIG_FILENAME, corr_features_cols, features_cols, dl_features_cols)

writing data/payments_enc.v6.parquet.json
Complete!
