In [1]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.pipeline import Pipeline
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.session import SparkSession
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.types import Row, StringType

In [2]:
def convert_value_to_row(df: DataFrame, value_col=0):
    """Parse a comma-separated text column into a one-column ``features`` DataFrame.

    Each row of ``df`` is expected to hold a comma-separated line of numbers in
    ``value_col`` (by default the first column, e.g. the single ``value`` column
    produced by ``spark.read.text``). Every field except the last is parsed as a
    float and packed into a dense vector. The last field is dropped; presumably it
    is a label (TODO confirm against the producer of the data).

    Args:
        df: Input DataFrame whose ``value_col`` column contains the text lines.
        value_col: Column position (int) or column name (str) holding the text.
            Defaults to 0, the first column.

    Returns:
        A DataFrame with a single ``features`` column of dense vectors.
    """
    def _to_features(row):
        # DataFrame.rdd yields Row objects, not strings, so pull the text out first.
        fields = row[value_col].split(',')[:-1]
        # toDF needs tuple-like records; a bare DenseVector cannot be used as a row.
        return (Vectors.dense([float(v) for v in fields]),)

    return df.rdd.map(_to_features).toDF(['features'])

In [3]:
# Reuse the active SparkSession if there is one; otherwise start a new application.
spark = (
    SparkSession.builder
    .appName("CustomerProfiling")
    .getOrCreate()
)

In [4]:
# Load the raw transfer history. The first line supplies the column names, and
# inferSchema makes Spark scan the file to choose column types.
historic_data = spark.read.csv(
    'transfer_history.csv',
    header=True,
    inferSchema=True,
)

In [7]:
historic_data.show(n=20)  # preview the first 20 rows

+---+-----------+---------+--------+-----------+----------+
|_c0|TRANSFER_ID| PARTY_ID|CURRENCY|     AMOUNT|VALUE_DATE|
+---+-----------+---------+--------+-----------+----------+
|  0|  369832601|240924299|     HUF|   500000.0|2018-08-22|
|  1|  369832704|323871147|     HUF|   190000.0|2018-08-22|
|  2|  369832705|349707008|     HUF|  1454874.0|2018-08-22|
|  3|  369832706|349707008|     HUF|  1502821.0|2018-08-22|
|  4|  369832784|240285235|     HUF|   369000.0|2018-08-22|
|  5|  369832962|234958234|     HUF|   522821.0|2018-08-22|
|  6|  369832963|234958234|     HUF|  3000000.0|2018-08-22|
|  7|  369832964|240253860|     HUF|   275844.0|2018-08-22|
|  8|  369832987|240253860|     HUF|     2273.0|2018-08-22|
|  9|  369834619|254379184|     HUF|     6749.0|2018-08-23|
| 10|  369834658|254379184|     HUF|     6749.0|2018-08-23|
| 11|  369834659|254379184|     HUF|    27229.0|2018-08-23|
| 12|  369834744|240695709|     HUF|   142364.0|2018-08-23|
| 13|  369836864|327135911|     HUF|  45

In [6]:
# Inspect the column types chosen by inferSchema (VALUE_DATE is still a string here).
historic_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- TRANSFER_ID: integer (nullable = true)
 |-- PARTY_ID: integer (nullable = true)
 |-- CURRENCY: string (nullable = true)
 |-- AMOUNT: double (nullable = true)
 |-- VALUE_DATE: string (nullable = true)



## Convert VALUE_DATE to a timestamp


In [8]:
import pyspark.sql.functions as F

# VALUE_DATE was inferred as a string; parse it with an explicit yyyy-MM-dd pattern
# into a new timestamp column, keeping the original string column alongside it.
dt1 = F.to_timestamp("VALUE_DATE", format='yyyy-MM-dd')
historic_data = historic_data.withColumn("VALUE_DATE_dt", dt1)

historic_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- TRANSFER_ID: integer (nullable = true)
 |-- PARTY_ID: integer (nullable = true)
 |-- CURRENCY: string (nullable = true)
 |-- AMOUNT: double (nullable = true)
 |-- VALUE_DATE: string (nullable = true)
 |-- VALUE_DATE_dt: timestamp (nullable = true)



## Currency count vector per PARTY_ID (CountVectorizer on CURRENCY)


In [9]:
# Number of transfers per currency, smallest first.
customers_1 = (
    historic_data
    .groupBy('CURRENCY')
    .count()
    .orderBy('count')
)
customers_1.show()

+--------+------+
|CURRENCY| count|
+--------+------+
|     HUF|307402|
+--------+------+



In [10]:
# One row per PARTY_ID holding the currency of every one of its transfers.
grouped_curr = (
    historic_data
    .groupBy('PARTY_ID')
    .agg(F.collect_list('CURRENCY').alias('transfer_curr'))
)

In [11]:
grouped_curr.show(n=20)  # preview the per-party currency lists

+---------+--------------------+
| PARTY_ID|       transfer_curr|
+---------+--------------------+
|235379892|[HUF, HUF, HUF, H...|
|251660351|               [HUF]|
|356882768|          [HUF, HUF]|
|362100367|[HUF, HUF, HUF, H...|
|399979245|[HUF, HUF, HUF, H...|
|414669076|     [HUF, HUF, HUF]|
|420642369|          [HUF, HUF]|
|364296518|[HUF, HUF, HUF, HUF]|
|570509789|[HUF, HUF, HUF, H...|
|250704129|[HUF, HUF, HUF, H...|
|251660375|[HUF, HUF, HUF, H...|
|258509472|          [HUF, HUF]|
|341219830|[HUF, HUF, HUF, H...|
|350315114|[HUF, HUF, HUF, H...|
|380989746|     [HUF, HUF, HUF]|
|430208176|[HUF, HUF, HUF, H...|
|564302036|[HUF, HUF, HUF, H...|
|583029222|[HUF, HUF, HUF, H...|
|353440239|[HUF, HUF, HUF, H...|
|240557364|[HUF, HUF, HUF, H...|
+---------+--------------------+
only showing top 20 rows



In [12]:
from pyspark.ml.feature import CountVectorizer

# Encode each party's currency list as a term-count vector.
# NOTE: CountVectorizer yields counts, not 0/1 one-hot flags: (1,[0],[17.0]) means
# 17 HUF transfers. Use binary=True if presence/absence is wanted instead.
# Only HUF occurs in this data, so the vocabulary has size 1 and each vector simply
# holds the party's transfer count.
cv = CountVectorizer(inputCol='transfer_curr', outputCol='TRANSFER_CURRENCY_VEC')
cv_model = cv.fit(grouped_curr)
transformed_df = cv_model.transform(grouped_curr)

transformed_df.show()

+---------+--------------------+---------------------+
| PARTY_ID|       transfer_curr|TRANSFER_CURRENCY_VEC|
+---------+--------------------+---------------------+
|235379892|[HUF, HUF, HUF, H...|       (1,[0],[17.0])|
|251660351|               [HUF]|        (1,[0],[1.0])|
|356882768|          [HUF, HUF]|        (1,[0],[2.0])|
|362100367|[HUF, HUF, HUF, H...|       (1,[0],[20.0])|
|399979245|[HUF, HUF, HUF, H...|      (1,[0],[491.0])|
|414669076|     [HUF, HUF, HUF]|        (1,[0],[3.0])|
|420642369|          [HUF, HUF]|        (1,[0],[2.0])|
|364296518|[HUF, HUF, HUF, HUF]|        (1,[0],[4.0])|
|570509789|[HUF, HUF, HUF, H...|        (1,[0],[6.0])|
|250704129|[HUF, HUF, HUF, H...|       (1,[0],[23.0])|
|251660375|[HUF, HUF, HUF, H...|     (1,[0],[1485.0])|
|258509472|          [HUF, HUF]|        (1,[0],[2.0])|
|341219830|[HUF, HUF, HUF, H...|        (1,[0],[6.0])|
|350315114|[HUF, HUF, HUF, H...|       (1,[0],[22.0])|
|380989746|     [HUF, HUF, HUF]|        (1,[0],[3.0])|
|430208176

In [13]:
# Number of parties: transformed_df has one row per PARTY_ID (built from a groupBy).
transformed_df.count()

2174

## Per-party AMOUNT aggregates (sum, avg, min, max)

In [14]:
# Total transferred AMOUNT per party (column "sum(AMOUNT)").
sum_A = historic_data.groupBy('PARTY_ID').agg(F.sum('AMOUNT'))
sum_A.show()

+---------+------------+
| PARTY_ID| sum(AMOUNT)|
+---------+------------+
|362100367|     68987.0|
|235379892|    976021.0|
|399979245|1.30070542E8|
|251660351|      2273.0|
|420642369|      4546.0|
|356882768|     78000.0|
|414669076|     10629.0|
|570509789|    161819.0|
|364296518|      9092.0|
|251660375|1.52409733E8|
|350315114|     50006.0|
|258509472|      4546.0|
|564302036| 4.9792972E7|
|583029222| 4.8543471E7|
|250704129|     52279.0|
|380989746|   2280094.0|
|341219830|     79083.0|
|430208176|     67027.0|
|353440239|     13638.0|
|320518642| 4.6434395E7|
+---------+------------+
only showing top 20 rows



In [15]:
# Largest single transfer AMOUNT per party (column "max(AMOUNT)").
max_A = historic_data.groupBy('PARTY_ID').agg(F.max('AMOUNT'))
max_A.show()

+---------+-----------+
| PARTY_ID|max(AMOUNT)|
+---------+-----------+
|362100367|    25800.0|
|235379892|   219395.0|
|399979245|1.0114407E7|
|251660351|     2273.0|
|420642369|     2273.0|
|356882768|    40000.0|
|414669076|     3543.0|
|570509789|    75000.0|
|364296518|     2273.0|
|251660375|  4200000.0|
|350315114|     2273.0|
|258509472|     2273.0|
|564302036|  2822821.0|
|583029222|  5761218.0|
|250704129|     2273.0|
|380989746|  1543000.0|
|341219830|    29083.0|
|430208176|    25000.0|
|353440239|     2273.0|
|320518642|  4250000.0|
+---------+-----------+
only showing top 20 rows



In [16]:
# Smallest single transfer AMOUNT per party (column "min(AMOUNT)").
min_A = historic_data.groupBy('PARTY_ID').agg(F.min('AMOUNT'))
min_A.show()

+---------+-----------+
| PARTY_ID|min(AMOUNT)|
+---------+-----------+
|362100367|     2273.0|
|235379892|     2979.0|
|399979245|     1000.0|
|251660351|     2273.0|
|420642369|     2273.0|
|356882768|    38000.0|
|414669076|     3543.0|
|570509789|     2273.0|
|364296518|     2273.0|
|251660375|     1000.0|
|350315114|     2273.0|
|258509472|     2273.0|
|564302036|      809.0|
|583029222|       69.0|
|250704129|     2273.0|
|380989746|   295051.0|
|341219830|     2000.0|
|430208176|     2273.0|
|353440239|     2273.0|
|320518642|      986.0|
+---------+-----------+
only showing top 20 rows



In [17]:
# Mean transfer AMOUNT per party (column "avg(AMOUNT)").
avg_A = historic_data.groupBy('PARTY_ID').agg(F.avg('AMOUNT'))
avg_A.show()

+---------+------------------+
| PARTY_ID|       avg(AMOUNT)|
+---------+------------------+
|362100367|           3449.35|
|235379892|           57413.0|
|399979245|264909.45417515276|
|251660351|            2273.0|
|420642369|            2273.0|
|356882768|           39000.0|
|414669076|            3543.0|
|570509789|26969.833333333332|
|364296518|            2273.0|
|251660375|102632.81683501684|
|350315114|            2273.0|
|258509472|            2273.0|
|564302036|145169.01457725948|
|583029222| 201425.1908713693|
|250704129|            2273.0|
|380989746| 760031.3333333334|
|341219830|           13180.5|
|430208176|          8378.375|
|353440239|            2273.0|
|320518642| 138610.1343283582|
+---------+------------------+
only showing top 20 rows



In [18]:
# All four per-party AMOUNT aggregates in one grouped aggregation. Joining four
# separately aggregated frames costs four aggregations plus three joins; this needs
# a single aggregation. Column names and order match the join-based version:
# PARTY_ID, sum(AMOUNT), avg(AMOUNT), min(AMOUNT), max(AMOUNT).
# Inner equi-joins never match null keys, so the join version silently dropped any
# null-PARTY_ID group; filter it out explicitly to keep the same rows.
A_df = (
    historic_data
    .where(F.col('PARTY_ID').isNotNull())
    .groupBy('PARTY_ID')
    .agg(
        F.sum('AMOUNT'),
        F.avg('AMOUNT'),
        F.min('AMOUNT'),
        F.max('AMOUNT'),
    )
)

In [19]:
A_df.show(n=20)  # preview the combined AMOUNT aggregates

+---------+------------+------------------+-----------+-----------+
| PARTY_ID| sum(AMOUNT)|       avg(AMOUNT)|min(AMOUNT)|max(AMOUNT)|
+---------+------------+------------------+-----------+-----------+
|362100367|     68987.0|           3449.35|     2273.0|    25800.0|
|235379892|    976021.0|           57413.0|     2979.0|   219395.0|
|399979245|1.30070542E8|264909.45417515276|     1000.0|1.0114407E7|
|251660351|      2273.0|            2273.0|     2273.0|     2273.0|
|420642369|      4546.0|            2273.0|     2273.0|     2273.0|
|356882768|     78000.0|           39000.0|    38000.0|    40000.0|
|414669076|     10629.0|            3543.0|     3543.0|     3543.0|
|570509789|    161819.0|26969.833333333332|     2273.0|    75000.0|
|364296518|      9092.0|            2273.0|     2273.0|     2273.0|
|251660375|1.52409733E8|102632.81683501684|     1000.0|  4200000.0|
|350315114|     50006.0|            2273.0|     2273.0|     2273.0|
|258509472|      4546.0|            2273.0|     

## TRANSFER_COUNT feature (number of transfers per PARTY_ID)

In [21]:
# Number of transfer rows per party (column "count").
tranCount = (
    historic_data
    .groupBy('PARTY_ID')
    .count()
)
tranCount.show()

+---------+-----+
| PARTY_ID|count|
+---------+-----+
|362100367|   20|
|235379892|   17|
|399979245|  491|
|251660351|    1|
|420642369|    2|
|356882768|    2|
|414669076|    3|
|570509789|    6|
|364296518|    4|
|251660375| 1485|
|350315114|   22|
|258509472|    2|
|564302036|  343|
|583029222|  241|
|250704129|   23|
|380989746|    3|
|341219830|    6|
|430208176|    8|
|353440239|    6|
|320518642|  335|
+---------+-----+
only showing top 20 rows



## UNIQ_TRANSFER_ID_COUNT feature (distinct TRANSFER_IDs per PARTY_ID)


In [25]:
from pyspark.sql.functions import countDistinct

# Number of distinct TRANSFER_IDs per party.
# The alias pins the output column name. Spark 2.x auto-names it "count(TRANSFER_ID)",
# but Spark 3.x names it "count(DISTINCT TRANSFER_ID)". That difference would turn the
# later withColumnRenamed("count(TRANSFER_ID)", ...) into a silent no-op.
unique_trans = historic_data.groupBy('PARTY_ID').agg(
    countDistinct('TRANSFER_ID').alias('count(TRANSFER_ID)')
)
unique_trans.show()

+---------+------------------+
| PARTY_ID|count(TRANSFER_ID)|
+---------+------------------+
|235379892|                17|
|399979245|               491|
|570509789|                 6|
|414669076|                 3|
|362100367|                20|
|356882768|                 2|
|364296518|                 4|
|420642369|                 2|
|251660351|                 1|
|251660375|              1485|
|583029222|               241|
|564302036|               343|
|430208176|                 8|
|250704129|                23|
|258509472|                 2|
|380989746|                 3|
|350315114|                22|
|353440239|                 6|
|341219830|                 6|
|320518642|               335|
+---------+------------------+
only showing top 20 rows



## Merge all per-party features into one table


In [26]:
# Start the feature table: currency vector + AMOUNT aggregates, keyed by PARTY_ID.
final_df = transformed_df.join(A_df, ["PARTY_ID"], "inner")
final_df.printSchema()

root
 |-- PARTY_ID: integer (nullable = true)
 |-- transfer_curr: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- TRANSFER_CURRENCY_VEC: vector (nullable = true)
 |-- sum(AMOUNT): double (nullable = true)
 |-- avg(AMOUNT): double (nullable = true)
 |-- min(AMOUNT): double (nullable = true)
 |-- max(AMOUNT): double (nullable = true)



In [27]:
# Add the per-party transfer count.
final_df = final_df.join(tranCount, ["PARTY_ID"], "inner")
final_df.printSchema()

root
 |-- PARTY_ID: integer (nullable = true)
 |-- transfer_curr: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- TRANSFER_CURRENCY_VEC: vector (nullable = true)
 |-- sum(AMOUNT): double (nullable = true)
 |-- avg(AMOUNT): double (nullable = true)
 |-- min(AMOUNT): double (nullable = true)
 |-- max(AMOUNT): double (nullable = true)
 |-- count: long (nullable = false)



In [28]:
# Add the per-party distinct TRANSFER_ID count.
final_df = final_df.join(unique_trans, ["PARTY_ID"], "inner")
final_df.printSchema()

root
 |-- PARTY_ID: integer (nullable = true)
 |-- transfer_curr: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- TRANSFER_CURRENCY_VEC: vector (nullable = true)
 |-- sum(AMOUNT): double (nullable = true)
 |-- avg(AMOUNT): double (nullable = true)
 |-- min(AMOUNT): double (nullable = true)
 |-- max(AMOUNT): double (nullable = true)
 |-- count: long (nullable = false)
 |-- count(TRANSFER_ID): long (nullable = false)



In [31]:
# Give the distinct-count column a stable name. Spark 2.x names the countDistinct
# column "count(TRANSFER_ID)", while Spark 3.x names it "count(DISTINCT TRANSFER_ID)".
# withColumnRenamed silently does nothing when the source column is missing, so
# rename whichever variant is actually present.
for _distinct_col in ("count(TRANSFER_ID)", "count(DISTINCT TRANSFER_ID)"):
    if _distinct_col in final_df.columns:
        final_df = final_df.withColumnRenamed(_distinct_col, "UNIQ_TRANSFER_ID_COUNT")
final_df.printSchema()

root
 |-- PARTY_ID: integer (nullable = true)
 |-- transfer_curr: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- TRANSFER_CURRENCY_VEC: vector (nullable = true)
 |-- sum(AMOUNT): double (nullable = true)
 |-- avg(AMOUNT): double (nullable = true)
 |-- min(AMOUNT): double (nullable = true)
 |-- max(AMOUNT): double (nullable = true)
 |-- count: long (nullable = false)
 |-- UNIQ_TRANSFER_ID_COUNT: long (nullable = false)



In [32]:
final_df.show(n=20)  # preview the merged per-party feature table

+---------+--------------------+---------------------+------------+------------------+-----------+-----------+-----+----------------------+
| PARTY_ID|       transfer_curr|TRANSFER_CURRENCY_VEC| sum(AMOUNT)|       avg(AMOUNT)|min(AMOUNT)|max(AMOUNT)|count|UNIQ_TRANSFER_ID_COUNT|
+---------+--------------------+---------------------+------------+------------------+-----------+-----------+-----+----------------------+
|235379892|[HUF, HUF, HUF, H...|       (1,[0],[17.0])|    976021.0|           57413.0|     2979.0|   219395.0|   17|                    17|
|251660351|               [HUF]|        (1,[0],[1.0])|      2273.0|            2273.0|     2273.0|     2273.0|    1|                     1|
|356882768|          [HUF, HUF]|        (1,[0],[2.0])|     78000.0|           39000.0|    38000.0|    40000.0|    2|                     2|
|362100367|[HUF, HUF, HUF, H...|       (1,[0],[20.0])|     68987.0|           3449.35|     2273.0|    25800.0|   20|                    20|
|364296518|[HUF, HUF

In [33]:
# Collect the per-party table to the driver and write it as CSV.
# NOTE(review): the pandas index is written as an unnamed first column (it reads back
# as _c0), and the transfer_curr lists and sparse vectors are written as their text
# repr. Consider index=False and dropping transfer_curr before export.
final_df.toPandas().to_csv(path_or_buf='pyspark_transfers.csv')