In [5]:
# Disable warnings, set Matplotlib inline plotting and load Pandas package
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline
import pandas as pd



In [8]:
#!python -m pip install findspark

Collecting findspark
  Using cached findspark-1.1.0-py2.py3-none-any.whl
Installing collected packages: findspark
Successfully installed findspark-1.1.0


You are using pip version 9.0.1, however version 9.0.3 is available.
You should consider upgrading via the 'python -m pip install --upgrade pip' command.


In [9]:
import findspark

findspark.init()

In [10]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


spark = SparkSession\
        .builder\
        .appName("Transaction_Classification")\
        .config('spark.sql.warehouse.dir','file:///home/sudipto21048867/Pyspark/spark-warehouse')\
        .getOrCreate()

In [12]:
TransactionDf = spark.read.csv('F:/Llyods/Phase_1_codes/Phase_1_codes/TransactionSummaryLabelled.csv',                            
                          header='true', 
                          inferSchema='true', encoding='ISO-8859-1')

In [13]:
TransactionDf.show()

+---------+----+--------------------+-------+--------+---------+--------+--------------------+
|     Date|Type|         Description|Paid in|Paid out|  Balance|category|   Detailed Category|
+---------+----+--------------------+-------+--------+---------+--------+--------------------+
|25-Aug-17| POS|3818 23AUG17 , PA...|      -|   365.3|   £89.02|  Travel|            Agencies|
|25-Aug-17| POS|3818 23AUG17 , PA...|      -|   365.3|   £89.02|  Travel|            Agencies|
|22-Nov-17| POS|3818 21NOV17 , PA...|      -|   14.75|  £184.37|  Retail|            Clothing|
|18-Apr-17| POS|1177 13APR17 CD ,...|      -|    24.5|  £159.76|  Retail|            Clothing|
| 6-Feb-17| POS|3818 02FEB17 C , ...|      -|       2|  £737.19|  Retail|            Clothing|
| 7-Dec-16| POS|3818 05DEC16 , PR...|      -|      62|  £525.51|  Retail|            Clothing|
| 8-Feb-17| POS|3818 06FEB17 , PR...|      -|    32.3|    £9.41|  Retail|            Clothing|
|10-Jul-17| POS|3818 07JUL17 , PR...|      -|   11

In [14]:
TransactionDf.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Paid in: string (nullable = true)
 |-- Paid out: string (nullable = true)
 |-- Balance: string (nullable = true)
 |-- category: string (nullable = true)
 |-- Detailed Category: string (nullable = true)



In [15]:
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction as udf
from pyspark.sql.functions import expr

In [16]:
paid_in_cleaned = udf(lambda y: y.replace(',', '') if y!='-' else '0' , StringType())
paid_out_cleaned = udf(lambda y: y.replace(',', '') if y!='-' else '0', StringType())
#paid_in_out_cleaned = udf(lambda k: paid_in_out_clean[k,l], FloatType(),FloatType())
#paid_paid_cleaned = udf(lambda k: paid_fxinal_clean[k,l], FloatType())

In [17]:
TransactionDf = TransactionDf.withColumn('Paid in',paid_in_cleaned(TransactionDf['Paid in']))\
                              .withColumn('Paid out',paid_out_cleaned(TransactionDf['Paid out']))

In [18]:
#TransactionDf.show()
def string_to_float(x):
    return float(x)

udfstring_to_float = udf(string_to_float, FloatType())


TransactionDf = TransactionDf.withColumn('Paid in',udfstring_to_float(TransactionDf['Paid in']))\
                              .withColumn('Paid out',udfstring_to_float(TransactionDf['Paid out']))

In [19]:
TransactionDf.show()

+---------+----+--------------------+-------+--------+---------+--------+--------------------+
|     Date|Type|         Description|Paid in|Paid out|  Balance|category|   Detailed Category|
+---------+----+--------------------+-------+--------+---------+--------+--------------------+
|25-Aug-17| POS|3818 23AUG17 , PA...|    0.0|   365.3|   £89.02|  Travel|            Agencies|
|25-Aug-17| POS|3818 23AUG17 , PA...|    0.0|   365.3|   £89.02|  Travel|            Agencies|
|22-Nov-17| POS|3818 21NOV17 , PA...|    0.0|   14.75|  £184.37|  Retail|            Clothing|
|18-Apr-17| POS|1177 13APR17 CD ,...|    0.0|    24.5|  £159.76|  Retail|            Clothing|
| 6-Feb-17| POS|3818 02FEB17 C , ...|    0.0|     2.0|  £737.19|  Retail|            Clothing|
| 7-Dec-16| POS|3818 05DEC16 , PR...|    0.0|    62.0|  £525.51|  Retail|            Clothing|
| 8-Feb-17| POS|3818 06FEB17 , PR...|    0.0|    32.3|    £9.41|  Retail|            Clothing|
|10-Jul-17| POS|3818 07JUL17 , PR...|    0.0|   11

In [20]:
from  pyspark.sql.functions import abs
def final_paid(x,y):
    return float(x-y)

final_paid = udf(final_paid, FloatType())

TransactionDf = TransactionDf.withColumn('final_paid',final_paid(TransactionDf['Paid in'],TransactionDf['Paid out']))

TransactionDf = TransactionDf.withColumn('final_paid',abs(TransactionDf['final_paid']))
                              

In [21]:
TransactionDf.show()

+---------+----+--------------------+-------+--------+---------+--------+--------------------+----------+
|     Date|Type|         Description|Paid in|Paid out|  Balance|category|   Detailed Category|final_paid|
+---------+----+--------------------+-------+--------+---------+--------+--------------------+----------+
|25-Aug-17| POS|3818 23AUG17 , PA...|    0.0|   365.3|   £89.02|  Travel|            Agencies|     365.3|
|25-Aug-17| POS|3818 23AUG17 , PA...|    0.0|   365.3|   £89.02|  Travel|            Agencies|     365.3|
|22-Nov-17| POS|3818 21NOV17 , PA...|    0.0|   14.75|  £184.37|  Retail|            Clothing|     14.75|
|18-Apr-17| POS|1177 13APR17 CD ,...|    0.0|    24.5|  £159.76|  Retail|            Clothing|      24.5|
| 6-Feb-17| POS|3818 02FEB17 C , ...|    0.0|     2.0|  £737.19|  Retail|            Clothing|       2.0|
| 7-Dec-16| POS|3818 05DEC16 , PR...|    0.0|    62.0|  £525.51|  Retail|            Clothing|      62.0|
| 8-Feb-17| POS|3818 06FEB17 , PR...|    0.0| 

In [22]:
TransactionDf.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Paid in: float (nullable = true)
 |-- Paid out: float (nullable = true)
 |-- Balance: string (nullable = true)
 |-- category: string (nullable = true)
 |-- Detailed Category: string (nullable = true)
 |-- final_paid: float (nullable = true)



In [23]:
import numpy as np



def BinningData(TransactionDf,bins):
    max_val =TransactionDf.select(TransactionDf.final_paid.cast('float')).rdd.max()[0] 
    min_val =TransactionDf.select(TransactionDf.final_paid.cast('float')).rdd.min()[0] -1  ##??
    custom_bucket_array = np.linspace(min_val, max_val, 4)
    cut_points = list(custom_bucket_array)
    return cut_points


#col('house name'), rawdata.price.cast('float')

In [24]:
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=BinningData(TransactionDf,4),inputCol="final_paid", outputCol="final_paid_bin")
TransactionDf = bucketizer.transform(TransactionDf)

TransactionDf.show()

+---------+----+--------------------+-------+--------+---------+--------+--------------------+----------+--------------+
|     Date|Type|         Description|Paid in|Paid out|  Balance|category|   Detailed Category|final_paid|final_paid_bin|
+---------+----+--------------------+-------+--------+---------+--------+--------------------+----------+--------------+
|25-Aug-17| POS|3818 23AUG17 , PA...|    0.0|   365.3|   £89.02|  Travel|            Agencies|     365.3|           0.0|
|25-Aug-17| POS|3818 23AUG17 , PA...|    0.0|   365.3|   £89.02|  Travel|            Agencies|     365.3|           0.0|
|22-Nov-17| POS|3818 21NOV17 , PA...|    0.0|   14.75|  £184.37|  Retail|            Clothing|     14.75|           0.0|
|18-Apr-17| POS|1177 13APR17 CD ,...|    0.0|    24.5|  £159.76|  Retail|            Clothing|      24.5|           0.0|
| 6-Feb-17| POS|3818 02FEB17 C , ...|    0.0|     2.0|  £737.19|  Retail|            Clothing|       2.0|           0.0|
| 7-Dec-16| POS|3818 05DEC16 , P

In [25]:
pd.DataFrame(TransactionDf.take(5),columns=TransactionDf.columns).transpose()

Unnamed: 0,0,1,2,3,4
Date,25-Aug-17,25-Aug-17,22-Nov-17,18-Apr-17,6-Feb-17
Type,POS,POS,POS,POS,POS
Description,"3818 23AUG17 , PAYPAL *AIRBNB , HMRNB4 , 35314...","3818 23AUG17 , PAYPAL *AIRBNB , HMRNB4 , 35314...","3818 21NOV17 , PAYPAL , *DECATHLONUK , 3531436...","1177 13APR17 CD , PRIMARK 674 , LONDON GB","3818 02FEB17 C , PRIMARK 671 , STRATFORD GB"
Paid in,0,0,0,0,0
Paid out,365.3,365.3,14.75,24.5,2
Balance,£89.02,£89.02,£184.37,£159.76,£737.19
category,Travel,Travel,Retail,Retail,Retail
Detailed Category,Agencies,Agencies,Clothing,Clothing,Clothing
final_paid,365.3,365.3,14.75,24.5,2
final_paid_bin,0,0,0,0,0


In [None]:
def BinningData(TransactionDf, bins):
    min_val = min((TransactionDf['final_paid'])-1)
    max_val = max(TransactionDf['final_paid'])
    print(min_val, max_val)
    custom_bucket_array = np.linspace(min_val, max_val, bins)
    cut_points = list(custom_bucket_array)
    group_name = ["low","medium","high"]
    TransactionDf["final_paid_bin"] = pd.cut(TransactionDf["final_paid"], cut_points, labels=group_name)
    
    return TransactionDf