In [1]:
from pyspark.sql import SparkSession
# Single SparkSession shared by every cell in this notebook (reused if one already exists).
spark = (
    SparkSession.builder
    .appName('ABN-assignment')
    .getOrCreate()
)

In [2]:
# dataset_one: one row per client (id, names, email, country); first line is the header.
clients = spark.read.option("header", True).csv("../data/input/dataset_one.csv")

In [3]:
clients.show(n=20, truncate=True)

+---+----------+----------+--------------------+--------------+
| id|first_name| last_name|               email|       country|
+---+----------+----------+--------------------+--------------+
|  1|    Feliza|    Eusden|  feusden0@ameblo.jp|        France|
|  2| Priscilla|   Le Pine|plepine1@biglobe....|        France|
|  3|    Jaimie|    Sandes|jsandes2@reuters.com|        France|
|  4|      Nari|   Dolphin|ndolphin3@cbsloca...|        France|
|  5|     Garik|     Farre|gfarre4@economist...|        France|
|  6|   Kordula|   Broodes| kbroodes5@amazon.de|        France|
|  7|     Rakel|   Ingliby|    ringliby6@ft.com| United States|
|  8|      Derk| Mattielli|dmattielli7@slide...| United States|
|  9|    Karrah|   Boshard|   kboshard8@ihg.com|        France|
| 10| Friedrich|  Kreutzer|fkreutzer9@busine...|        France|
| 11|      Conn|   Claiden| cclaidena@vimeo.com|        France|
| 12|     Karel|   Crippin| kcrippinb@google.pl|        France|
| 13| Millisent|     Joint| mjointc@stat

In [4]:
# dataset_two: per-client bitcoin address (btc_a), card type (cc_t) and card number (cc_n).
client_data = spark.read.option("header", True).csv("../data/input/dataset_two.csv")

In [5]:
client_data.show(n=20, truncate=True)

+---+--------------------+--------------------+------------------+
| id|               btc_a|                cc_t|              cc_n|
+---+--------------------+--------------------+------------------+
|  1|1wjtPamAZeGhRnZfh...|       visa-electron|  4175006996999270|
|  2|1Js9BA1rV31hJFmN2...|                 jcb|  3587679584356527|
|  3|1CoG9ciLQVQCnia5o...| diners-club-enroute|   201876885481838|
|  4|1GNvinVKGzPBVNZSc...|              switch|564182038040530730|
|  5|1DHTzZ7ypu3EzWtLB...|                 jcb|  3555559025151828|
|  6|1LWktvit3XBCJNrsj...|                 jcb|  3580083825272493|
|  7|1J71SRGqUjhqPuHaZ...|              switch|491193585665108260|
|  8|1Q5FAwgXbhRxP1uYp...|          mastercard|  5100174550682620|
|  9|1QKy8RoeWR48nrwkn...|diners-club-carte...|    30343863921001|
| 10|1NRDQBCtuDqm8Qomr...|diners-club-carte...|    30559612937267|
| 11|1HcqQ5Ys77sJm3ZJv...|                visa|     4937793997478|
| 12|1EncEr6Vd5ywk96un...|                 jcb|  3569513122126

In [7]:
from transformers import ColumnRemover, CountryFilter, RenameColumns
from pyspark.ml import Pipeline

In [17]:
# Stages for dataset_one (clients): drop the name columns, then keep NL/UK clients only.
remove_pii_clients = ColumnRemover(columns_to_remove=["first_name", "last_name"])
country_filter = CountryFilter(countries_to_filter=['Netherlands', 'United Kingdom'])
client_pipeline = Pipeline(stages=[remove_pii_clients, country_filter])

# Stages for dataset_two (client_data): drop the card-number column.
remove_credit_card = ColumnRemover(columns_to_remove=["cc_n"])
client_data_pipeline = Pipeline(stages=[remove_credit_card])

# Applied after the two pipeline outputs are joined on "id".
column_renamer = RenameColumns(column_dict={
    "id": "client_identifier",
    "btc_a": "bitcoin_address",
    "cc_t": "credit_card_type",
})

In [18]:
# Every stage is a plain transformer, so fitting just assembles them into a PipelineModel.
client_fit = client_pipeline.fit(dataset=clients)
client_output = client_fit.transform(dataset=clients)

In [19]:
# Same pattern as the client pipeline, applied to dataset_two.
client_data_fit = client_data_pipeline.fit(dataset=client_data)
client_data_output = client_data_fit.transform(dataset=client_data)

In [20]:
# Inner-join the cleaned datasets on the shared "id" key, then apply the output column names.
final_data = column_renamer.transform(
    client_output.join(client_data_output, on="id")
)

In [21]:
final_data.show(n=20, truncate=True)

+-----------------+--------------------+--------------+--------------------+--------------------+
|client_identifier|               email|       country|     bitcoin_address|    credit_card_type|
+-----------------+--------------------+--------------+--------------------+--------------------+
|               18|rdrinanh@odnoklas...|United Kingdom|1ErM8yuF3ytzzxLy1...|      china-unionpay|
|               32|wbamfordv@t-onlin...|United Kingdom|12sxmYnPcADAXw1Yk...|             maestro|
|               33|swestallw@blinkli...|United Kingdom|1GZ7QB7GUFSWnkBHm...|          mastercard|
|               34|erosengrenx@usato...|United Kingdom|12o8zrHx6snCPbtko...|       visa-electron|
|               36|dbuckthorpz@tmall...|   Netherlands|15X53Z9B9jUNrvFpb...|diners-club-inter...|
|               62|  bbarham1p@wisc.edu|   Netherlands|16qpYVt6YAAx4JYjz...|                 jcb|
|               67|lbeavors1u@techno...|United Kingdom|12ya1ED93ApPBQRSC...|            bankcard|
|               70|f

In [17]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class ColumnRemover(Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that drops a list of columns from a DataFrame.

    The columns to drop come from the ``columns_to_remove`` param, which defaults to
    ``['first_name', 'last_name']``. A single column name may also be given as a plain
    string. Names that are not columns of the input are skipped by ``DataFrame.drop``.
    """

    # NOTE(review): HasOutputCols is inherited but its ``outputCols`` param is never read
    # here; kept so the param set of existing/saved instances does not change.

    columns_to_remove = Param(
        Params._dummy(),
        "columns_to_remove",
        "Column to remove",
    )

    @keyword_only
    def __init__(self, columns_to_remove=None):
        """
        __init__(self, columns_to_remove=None)

        :param columns_to_remove: column name(s) to drop. When omitted, the default
            ``['first_name', 'last_name']`` is used.
        """
        super(ColumnRemover, self).__init__()
        self._setDefault(columns_to_remove=['first_name', 'last_name'])
        # @keyword_only records only the arguments the caller actually passed, so the
        # signature default (None) is never applied; the real default is set just above.
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, columns_to_remove=None):
        """
        setParams(self, columns_to_remove=None)
        Sets params for this ColumnRemover.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, columns_to_remove):
        """Sets the value of :py:attr:`columns_to_remove`."""
        return self._set(columns_to_remove=columns_to_remove)

    def setColumnsToRemove(self, columns_to_remove):
        """Sets the value of :py:attr:`columns_to_remove` (descriptive alias of ``setValue``)."""
        return self.setValue(columns_to_remove)

    def getColumnsToRemove(self):
        """Gets the value of :py:attr:`columns_to_remove` or its default value."""
        return self.getOrDefault(self.columns_to_remove)

    def getColumnToRemove(self):
        """Gets the value of :py:attr:`columns_to_remove` or its default value.

        Kept for backward compatibility; identical to ``getColumnsToRemove``.
        """
        return self.getColumnsToRemove()

    def _transform(self, dataset):
        columns = self.getColumnsToRemove()
        if isinstance(columns, str):
            # Without this, drop(*"cc_n") would try to drop columns "c", "c", "_", "n"
            # and silently keep the real column.
            columns = [columns]
        return dataset.drop(*columns)

In [22]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import pyspark.sql.functions as F

class CountryFilter(Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that keeps only the rows whose country is in a given list.

    Params:
      * ``countries_to_filter`` -- the country values to KEEP. Required: it has no
        default, and ``transform`` raises ``ValueError`` if it was never set (or set to
        None). A single country may be given as a plain string.
      * ``country_column`` -- name of the column compared against that list
        (default ``"country"``).

    Rows with a null country are dropped, because ``isin`` evaluates to null for them.
    """

    # NOTE(review): HasOutputCols is inherited but its ``outputCols`` param is never read
    # here; kept so the param set of existing/saved instances does not change.

    countries_to_filter = Param(
        Params._dummy(),
        "countries_to_filter",
        "What countries to keep.",
    )

    country_column = Param(
        Params._dummy(),
        "country_column",
        "Name of the column holding the country.",
    )

    @keyword_only
    def __init__(self, countries_to_filter=None, country_column="country"):
        """
        __init__(self, countries_to_filter=None, country_column="country")
        """
        super(CountryFilter, self).__init__()
        # Deliberately no default for countries_to_filter: which countries to keep must be
        # an explicit choice, otherwise the filter would silently return nothing.
        self._setDefault(country_column="country")
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, countries_to_filter=None, country_column="country"):
        """
        setParams(self, countries_to_filter=None, country_column="country")
        Sets params for this CountryFilter.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, countries_to_filter):
        """Sets the value of :py:attr:`countries_to_filter`."""
        return self._set(countries_to_filter=countries_to_filter)

    def getCountriesToFilter(self):
        """Gets the value of :py:attr:`countries_to_filter`.

        Raises ``KeyError`` (from ``getOrDefault``) if it was never set.
        """
        return self.getOrDefault(self.countries_to_filter)

    def setCountryColumn(self, value):
        """Sets the value of :py:attr:`country_column`."""
        return self._set(country_column=value)

    def getCountryColumn(self):
        """Gets the value of :py:attr:`country_column` or its default (``"country"``)."""
        return self.getOrDefault(self.country_column)

    def _transform(self, dataset):
        countries = (
            self.getOrDefault(self.countries_to_filter)
            if self.isDefined(self.countries_to_filter)
            else None
        )
        if countries is None:
            raise ValueError(
                "CountryFilter: set 'countries_to_filter' to the list of countries to keep."
            )
        if isinstance(countries, str):
            countries = [countries]
        else:
            # Column.isin only unpacks a single list/set argument; copy other iterables
            # (e.g. tuples) into a list so they are not treated as one literal.
            countries = list(countries)
        return dataset.filter(F.col(self.getCountryColumn()).isin(countries))

In [25]:
# Quick check of CountryFilter on its own (names are still present at this point).
cs = CountryFilter(countries_to_filter=['Netherlands', 'United Kingdom'])
transformed_df = cs.transform(dataset=clients)
transformed_df.show(n=20, truncate=True)

+---+----------+---------+--------------------+--------------+
| id|first_name|last_name|               email|       country|
+---+----------+---------+--------------------+--------------+
| 18|   Richard|   Drinan|rdrinanh@odnoklas...|United Kingdom|
| 32|    Wallis|  Bamford|wbamfordv@t-onlin...|United Kingdom|
| 33|   Saundra|  Westall|swestallw@blinkli...|United Kingdom|
| 34|     Ewart|Rosengren|erosengrenx@usato...|United Kingdom|
| 36|    Daniel|Buckthorp|dbuckthorpz@tmall...|   Netherlands|
| 62|      Boyd|   Barham|  bbarham1p@wisc.edu|   Netherlands|
| 67|     Lorry|  Beavors|lbeavors1u@techno...|United Kingdom|
| 70|  Freedman|   Dresse|fdresse1x@bloglin...|United Kingdom|
| 91|      Audy|   Eplate|aeplate2i@webeden...|United Kingdom|
|105|     Lacie|  Infante|linfante2w@telegr...|   Netherlands|
|108|   Rhianna|  Partkya|  rpartkya2z@cdc.gov|   Netherlands|
|109|    Marnia|     Dory|    mdory30@uiuc.edu|   Netherlands|
|110|     Rhody|  Swindle|rswindle31@tmall.com|   Nethe

In [29]:
# Manual equivalent of client_pipeline: filter on country, then drop the name columns.
country_filter = CountryFilter(countries_to_filter=['Netherlands', 'United Kingdom'])
remove_pii_clients = ColumnRemover(columns_to_remove=["first_name", "last_name"])
transformed_clients = remove_pii_clients.transform(country_filter.transform(clients))

In [30]:
transformed_clients.show(n=20, truncate=True)

+---+--------------------+--------------+
| id|               email|       country|
+---+--------------------+--------------+
| 18|rdrinanh@odnoklas...|United Kingdom|
| 32|wbamfordv@t-onlin...|United Kingdom|
| 33|swestallw@blinkli...|United Kingdom|
| 34|erosengrenx@usato...|United Kingdom|
| 36|dbuckthorpz@tmall...|   Netherlands|
| 62|  bbarham1p@wisc.edu|   Netherlands|
| 67|lbeavors1u@techno...|United Kingdom|
| 70|fdresse1x@bloglin...|United Kingdom|
| 91|aeplate2i@webeden...|United Kingdom|
|105|linfante2w@telegr...|   Netherlands|
|108|  rpartkya2z@cdc.gov|   Netherlands|
|109|    mdory30@uiuc.edu|   Netherlands|
|110|rswindle31@tmall.com|   Netherlands|
|124|wdarco3f@geocitie...|   Netherlands|
|128|vnapthine3j@ning.com|   Netherlands|
|165|hheinecke4k@alter...|   Netherlands|
|177|awissbey4w@geocit...|United Kingdom|
|189|     etsar58@ovh.net|   Netherlands|
|194| igreated5d@live.com|   Netherlands|
|197|  pvolette5g@ask.com|United Kingdom|
+---+--------------------+--------

In [31]:
# Drop the card-number column (cc_n) from dataset_two.
remove_credit_card = ColumnRemover(columns_to_remove=["cc_n"])
# Use the public Transformer.transform API; _transform is the internal hook it dispatches to.
transformed_client_data = remove_credit_card.transform(client_data)

In [32]:
transformed_client_data.show(n=20, truncate=True)

+---+--------------------+--------------------+
| id|               btc_a|                cc_t|
+---+--------------------+--------------------+
|  1|1wjtPamAZeGhRnZfh...|       visa-electron|
|  2|1Js9BA1rV31hJFmN2...|                 jcb|
|  3|1CoG9ciLQVQCnia5o...| diners-club-enroute|
|  4|1GNvinVKGzPBVNZSc...|              switch|
|  5|1DHTzZ7ypu3EzWtLB...|                 jcb|
|  6|1LWktvit3XBCJNrsj...|                 jcb|
|  7|1J71SRGqUjhqPuHaZ...|              switch|
|  8|1Q5FAwgXbhRxP1uYp...|          mastercard|
|  9|1QKy8RoeWR48nrwkn...|diners-club-carte...|
| 10|1NRDQBCtuDqm8Qomr...|diners-club-carte...|
| 11|1HcqQ5Ys77sJm3ZJv...|                visa|
| 12|1EncEr6Vd5ywk96un...|                 jcb|
| 13|14bMXV3h1S6KxGHde...|                 jcb|
| 14|1Gi1ZJsBDqCztVjtc...|            bankcard|
| 15|1GnNjsnbBTw6w9WHn...|                 jcb|
| 16|17y4HG6vY9wDZmeu5...|                 jcb|
| 17|14reD6Z1kUjg8QC5Y...|                 jcb|
| 18|1ErM8yuF3ytzzxLy1...|      china-un

In [36]:
# NOTE(review): `trasf` looks like a typo for a descriptive name (e.g. joined_clients);
# kept as-is because the rename cell further down reads it.
trasf = transformed_clients.join(transformed_client_data, on=["id"], how="inner")

In [44]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import pyspark.sql.functions as F

class RenameColumns(Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
    """Transformer that renames columns using an ``{old_name: new_name}`` mapping.

    Columns not present in ``column_dict`` keep their name and mapping keys that are not
    columns of the input are ignored. Column order is preserved. The default mapping is
    empty, i.e. nothing is renamed.
    """

    # NOTE(review): HasOutputCols is inherited but its ``outputCols`` param is never read
    # here; kept so the param set of existing/saved instances does not change.

    column_dict = Param(
        Params._dummy(),
        "column_dict",
        "Mapping from existing column names to new column names.",
    )

    @keyword_only
    def __init__(self, column_dict=None):
        """
        __init__(self, column_dict=None)

        :param column_dict: dict mapping existing column names to new names.
        """
        super(RenameColumns, self).__init__()
        # The default must be a dict: _transform looks new names up by column name.
        self._setDefault(column_dict={})
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, column_dict=None):
        """
        setParams(self, column_dict=None)
        Sets params for this RenameColumns.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setColumnDict(self, column_dict):
        """Sets the value of :py:attr:`column_dict`."""
        return self._set(column_dict=column_dict)

    def getColumnDict(self):
        """Gets the value of :py:attr:`column_dict` or its default value."""
        return self.getOrDefault(self.column_dict)

    def _transform(self, dataset):
        # Treat an explicitly passed None like the empty default mapping.
        column_mapping = self.getColumnDict() or {}
        renamed_columns = [
            F.col(name).alias(column_mapping[name]) if name in column_mapping else F.col(name)
            for name in dataset.columns
        ]
        return dataset.select(renamed_columns)

In [45]:
# NOTE(review): this exploratory mapping renames btc_a to "bitcoin" and leaves cc_t as-is,
# unlike the pipeline cell ("bitcoin_address", "credit_card_type") — confirm which is intended.
column_renamer = RenameColumns(column_dict={
    "id": "client_identifier",
    "btc_a": "bitcoin",
})
output = column_renamer.transform(dataset=trasf)

In [46]:
output.show(n=20, truncate=True)

+-----------------+--------------------+--------------+--------------------+--------------------+
|client_identifier|               email|       country|             bitcoin|                cc_t|
+-----------------+--------------------+--------------+--------------------+--------------------+
|               18|rdrinanh@odnoklas...|United Kingdom|1ErM8yuF3ytzzxLy1...|      china-unionpay|
|               32|wbamfordv@t-onlin...|United Kingdom|12sxmYnPcADAXw1Yk...|             maestro|
|               33|swestallw@blinkli...|United Kingdom|1GZ7QB7GUFSWnkBHm...|          mastercard|
|               34|erosengrenx@usato...|United Kingdom|12o8zrHx6snCPbtko...|       visa-electron|
|               36|dbuckthorpz@tmall...|   Netherlands|15X53Z9B9jUNrvFpb...|diners-club-inter...|
|               62|  bbarham1p@wisc.edu|   Netherlands|16qpYVt6YAAx4JYjz...|                 jcb|
|               67|lbeavors1u@techno...|United Kingdom|12ya1ED93ApPBQRSC...|            bankcard|
|               70|f