# <font color = brown> Cred Financials ETL from RDS to HBase (PySpark) </font>

#### Table of contents<a id='top'></a>
> 1. [Introduction](#intro)
> 2. [PySpark Setup](#iml)
> 3. [Importing the Data from HDFS](#rud)
> 4. [Calculating Upper Control Limit (UCL)](#ucl)
> 5. [Creation of Lookup Table](#cdt)

### 1. INTRODUCTION <a id='intro'></a>

- Credit card fraud is a form of identity theft in which an individual uses someone else’s credit card information to make purchases or to withdraw funds from the account. <br>
- As a big data engineer, architect and build a solution to cater to the following requirements:
1. **Fraud detection solution:** This feature detects fraudulent transactions: once a card member swipes their card for payment, the transaction is classified as fraudulent or authentic based on a set of predefined rules. If fraud is detected, the transaction must be declined. Note that incorrectly classifying a transaction as fraudulent incurs heavy losses for the company and provokes negative consumer sentiment.
2. **Customer information:** Relevant information about the customers needs to be continuously updated on a platform from which the customer support team can retrieve it in real time to resolve customer complaints and queries.

[go to top](#top)

#### 1.2. Objective 
1. Extract the transactional data from the given MySQL RDS server to HDFS (on an EC2 instance) using Sqoop.

2. Load the data into a NoSQL database:
> a. Load the given history data (card_transactions.csv) into the NoSQL database (HBase).<br>
> b. Transform the transactional data according to the given target schema (lookup table) using PySpark.<br>
> c. Load the transformed data into the NoSQL database (HBase).

 ###  2. PySpark Setup <a id='iml'></a>

In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('CCFD_Project').getOrCreate()
spark

In [4]:
sc = spark.sparkContext

In [5]:
sc

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [7]:
import pyspark.sql.functions as f

###  3. Importing the Credit Card Data from HDFS <a id='rud'></a>

**Strategy:**
 1. Load the files on HDFS into dataframes with a well-defined schema.
 > a. card_member <br>
 > b. member_score <br>
 > c. card_transactions <br>
 2. Verify the schema and the total count.
 3. Create staging dataframes to get the relevant dataframe for lookup table creation:
 > a. Join card_df, member_df and transaction_df to get the relevant card details.<br>
 > b. Calculate the Upper Control Limit (UCL), moving average and standard deviation.<br>
 4. Load the look_up_table dataframe with the relevant columns into HBase:
 > a. Set up the HBase connection. <br>
 > b. Create the table.<br>
 > c. Put the data into the HBase table (look_up_table).<br>
 > d. Count the number of records in the table.

[go to top](#top)

In [8]:
# Defining StructType for each column based on Schema

In [9]:
cardschema = StructType([StructField('card_id', StringType(),False),
                        StructField('member_id', StringType(),False),
                        StructField('member_joining_dt', StringType(),False),
                        StructField('card_purchase_dt', StringType(),False),
                        StructField('country', StringType(),False),
                        StructField('city', StringType(),False), 
                        ])

In [10]:
card_df = spark.read.csv("hdfs:/user/root/CCFD_Project/card_member", header = False, schema = cardschema)

In [11]:
#Checking number of records loaded from HDFS
card_df.count()

999

- **Validation**:
  As expected, there are **999** records in the card_member table retrieved from AWS RDS using Sqoop.

In [12]:
card_df.printSchema()

root
 |-- card_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- member_joining_dt: string (nullable = true)
 |-- card_purchase_dt: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)



In [13]:
card_df.columns

['card_id',
 'member_id',
 'member_joining_dt',
 'card_purchase_dt',
 'country',
 'city']

In [14]:
# Window is the only extra import the later cells need. A wildcard import of
# pyspark.sql.functions is avoided because it shadows Python builtins such as max and round.
from pyspark.sql import Window

In [15]:
card_df.show()

+---------------+---------------+--------------------+----------------+-------------+-----------------+
|        card_id|      member_id|   member_joining_dt|card_purchase_dt|      country|             city|
+---------------+---------------+--------------------+----------------+-------------+-----------------+
|340028465709212|009250698176266|2012-02-08 06:04:...|           05/13|United States|        Barberton|
|340054675199675|835873341185231|2017-03-10 09:24:...|           03/17|United States|       Fort Dodge|
|340082915339645|512969555857346|2014-02-15 06:30:...|           07/14|United States|           Graham|
|340134186926007|887711945571282|2012-02-05 01:21:...|           02/13|United States|        Dix Hills|
|340265728490548|680324265406190|2014-03-29 07:49:...|           11/14|United States| Rancho Cucamonga|
|340268219434811|929799084911715|2012-07-08 02:46:...|           08/12|United States|    San Francisco|
|340379737226464|089615510858348|2010-03-10 00:06:...|          

In [16]:
##Reading Member score file

In [17]:
memberschema = StructType([StructField('member_id', StringType(),False),
                        StructField('score', IntegerType(),False),
                        ])

In [18]:
member_df = spark.read.csv("hdfs:/user/root/CCFD_Project/member_score", header = False, schema = memberschema)

In [19]:
member_df.count()

999

- **Validation**:
  As expected, there are **999** records in the member_score table retrieved from AWS RDS using Sqoop.

In [20]:
member_df.show()

+---------------+-----+
|      member_id|score|
+---------------+-----+
|000037495066290|  339|
|000117826301530|  289|
|001147922084344|  393|
|001314074991813|  225|
|001739553947511|  642|
|003761426295463|  413|
|004494068832701|  217|
|006836124210484|  504|
|006991872634058|  697|
|007955566230397|  372|
|008732267588672|  213|
|008765307152821|  399|
|009136568025042|  308|
|009190444424572|  559|
|009250698176266|  233|
|009873334520465|  298|
|011716573646690|  249|
|011877954983420|  497|
|012390918683920|  407|
|012731668664932|  612|
+---------------+-----+
only showing top 20 rows



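- Rename `member_id` to `mem_id` in member_df. card_df also has a `member_id` column, so keeping the same name in both dataframes would make the column ambiguous when selecting it after the join. The member id used downstream is the one from the member_score file.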

In [21]:
member_df = member_df.withColumnRenamed('member_id','mem_id')

In [22]:
member_df.printSchema()

root
 |-- mem_id: string (nullable = true)
 |-- score: integer (nullable = true)



In [23]:
member_df.columns

['mem_id', 'score']

In [24]:
transaction_schema = StructType([StructField('card_id', StringType(),False),
                        StructField('member_id', StringType(),False),
                        StructField('amount', IntegerType(),False), 
                        StructField('postcode', StringType(),False),
                        StructField('pos_id', StringType(),False),
                        StructField('transaction_dt', StringType(),False), 
                        StructField('status', StringType(),False), 
                        ])

In [25]:
transaction_df = spark.read.csv("hdfs:/user/root/CCFD_Project/card_transactions/card_transactions.csv", header = True, schema = transaction_schema)

In [26]:
transaction_df.count()

53292

- **Validation**:
  As expected, there are **53292** records in card_transactions.csv, the history file provided for rule evaluation and updates.

In [27]:
# Keep only the non-fraudulent transactions
transaction_df_g = transaction_df.filter(transaction_df.status != 'FRAUD')

In [28]:
transaction_df_g.count()

53210

In [29]:
transaction_df_g.show()

+---------------+---------------+-------+--------+---------------+-------------------+-------+
|        card_id|      member_id| amount|postcode|         pos_id|     transaction_dt| status|
+---------------+---------------+-------+--------+---------------+-------------------+-------+
|348702330256514|000037495066290|9084849|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290| 330148|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290| 136052|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290|4310362|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290|9097094|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290|2291118|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290|4900011|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290| 633447|   33946|

In [30]:
score = member_df.join(card_df, member_df.mem_id == card_df.member_id,how='LEFT')  

In [31]:
score.count()

999

In [32]:
score.printSchema()

root
 |-- mem_id: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- card_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- member_joining_dt: string (nullable = true)
 |-- card_purchase_dt: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)



[go to top](#top)

In [33]:
score = score.select('mem_id', 'score', 'card_id')

In [34]:
score.show()

+---------------+-----+----------------+
|         mem_id|score|         card_id|
+---------------+-----+----------------+
|000037495066290|  339| 348702330256514|
|000117826301530|  289|5189563368503974|
|001147922084344|  393|5407073344486464|
|001314074991813|  225| 378303738095292|
|001739553947511|  642| 348413196172048|
|003761426295463|  413| 348536585266345|
|004494068832701|  217|5515987071565183|
|006836124210484|  504|5400251558458125|
|006991872634058|  697|4573337022888445|
|007955566230397|  372|4708912758619517|
|008732267588672|  213|5342400571435088|
|008765307152821|  399|4237648081700588|
|009136568025042|  308| 371814781663843|
|009190444424572|  559| 341363858179050|
|009250698176266|  233| 340028465709212|
|009873334520465|  298|5495445301620991|
|011716573646690|  249|4795844193055110|
|011877954983420|  497|5164771396791995|
|012390918683920|  407|5423921058459194|
|012731668664932|  612|5379610024035907|
+---------------+-----+----------------+
only showing top

In [35]:
member_df = member_df.withColumnRenamed('mem_id','member_id')

In [36]:
member_df.show()

+---------------+-----+
|      member_id|score|
+---------------+-----+
|000037495066290|  339|
|000117826301530|  289|
|001147922084344|  393|
|001314074991813|  225|
|001739553947511|  642|
|003761426295463|  413|
|004494068832701|  217|
|006836124210484|  504|
|006991872634058|  697|
|007955566230397|  372|
|008732267588672|  213|
|008765307152821|  399|
|009136568025042|  308|
|009190444424572|  559|
|009250698176266|  233|
|009873334520465|  298|
|011716573646690|  249|
|011877954983420|  497|
|012390918683920|  407|
|012731668664932|  612|
+---------------+-----+
only showing top 20 rows



In [37]:
member_df.count()

999

In [38]:
score = score.withColumnRenamed('card_id','cardid')

In [39]:
score.show()

+---------------+-----+----------------+
|         mem_id|score|          cardid|
+---------------+-----+----------------+
|000037495066290|  339| 348702330256514|
|000117826301530|  289|5189563368503974|
|001147922084344|  393|5407073344486464|
|001314074991813|  225| 378303738095292|
|001739553947511|  642| 348413196172048|
|003761426295463|  413| 348536585266345|
|004494068832701|  217|5515987071565183|
|006836124210484|  504|5400251558458125|
|006991872634058|  697|4573337022888445|
|007955566230397|  372|4708912758619517|
|008732267588672|  213|5342400571435088|
|008765307152821|  399|4237648081700588|
|009136568025042|  308| 371814781663843|
|009190444424572|  559| 341363858179050|
|009250698176266|  233| 340028465709212|
|009873334520465|  298|5495445301620991|
|011716573646690|  249|4795844193055110|
|011877954983420|  497|5164771396791995|
|012390918683920|  407|5423921058459194|
|012731668664932|  612|5379610024035907|
+---------------+-----+----------------+
only showing top

In [40]:
history = transaction_df_g.join(score, transaction_df_g.member_id == score.mem_id,how='outer')  

In [41]:
history.count()

53210

In [42]:
history.printSchema()

root
 |-- card_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- postcode: string (nullable = true)
 |-- pos_id: string (nullable = true)
 |-- transaction_dt: string (nullable = true)
 |-- status: string (nullable = true)
 |-- mem_id: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- cardid: string (nullable = true)



In [43]:
history = history.select('card_id', 'amount', 'postcode', 'pos_id','transaction_dt','status','score')

In [44]:
history.show()

+---------------+-------+--------+---------------+-------------------+-------+-----+
|        card_id| amount|postcode|         pos_id|     transaction_dt| status|score|
+---------------+-------+--------+---------------+-------------------+-------+-----+
|340379737226464|6126197|   46933|167473544283898|01-05-2016 08:10:50|GENUINE|  229|
|340379737226464|7949232|   61840|664980919335952|01-10-2016 10:38:52|GENUINE|  229|
|340379737226464| 943839|   91743|633038040069180|02-08-2016 00:31:25|GENUINE|  229|
|340379737226464|3764114|   91743|633038040069180|02-08-2016 21:35:27|GENUINE|  229|
|340379737226464|6221251|   98384|064948657945290|02-10-2016 14:44:14|GENUINE|  229|
|340379737226464|2868312|   26032|856772774421259|02-12-2016 21:55:43|GENUINE|  229|
|340379737226464|4418586|   20129|390339673634463|02-12-2017 17:05:51|GENUINE|  229|
|340379737226464|7439113|   91763|315067016872305|03-04-2017 11:43:59|GENUINE|  229|
|340379737226464|8217180|   16063|208378790148728|03-05-2017 16:4

In [45]:
timeform = 'dd-MM-yyyy HH:mm:ss'
historyy = history.withColumn('transaction_date',f.unix_timestamp(history['transaction_dt'],timeform).cast('timestamp'))

In [46]:
historyy.show()

+---------------+-------+--------+---------------+-------------------+-------+-----+-------------------+
|        card_id| amount|postcode|         pos_id|     transaction_dt| status|score|   transaction_date|
+---------------+-------+--------+---------------+-------------------+-------+-----+-------------------+
|340379737226464|6126197|   46933|167473544283898|01-05-2016 08:10:50|GENUINE|  229|2016-05-01 08:10:50|
|340379737226464|7949232|   61840|664980919335952|01-10-2016 10:38:52|GENUINE|  229|2016-10-01 10:38:52|
|340379737226464| 943839|   91743|633038040069180|02-08-2016 00:31:25|GENUINE|  229|2016-08-02 00:31:25|
|340379737226464|3764114|   91743|633038040069180|02-08-2016 21:35:27|GENUINE|  229|2016-08-02 21:35:27|
|340379737226464|6221251|   98384|064948657945290|02-10-2016 14:44:14|GENUINE|  229|2016-10-02 14:44:14|
|340379737226464|2868312|   26032|856772774421259|02-12-2016 21:55:43|GENUINE|  229|2016-12-02 21:55:43|
|340379737226464|4418586|   20129|390339673634463|02-12

- `transaction_dt` can be dropped, because the new `transaction_date` column with the proper timestamp datatype has been added to the dataframe.
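The conversion can be checked outside Spark with a minimal plain-Python sketch. It assumes that the `strptime` pattern `%d-%m-%Y %H:%M:%S` is the day-first equivalent of the Spark pattern `dd-MM-yyyy HH:mm:ss` used above. The names `PY_TIMEFORM` and `parse_transaction_dt` are illustrative and not part of the notebook.

```python
from datetime import datetime

# Assumed Python equivalent of the Spark pattern 'dd-MM-yyyy HH:mm:ss'.
PY_TIMEFORM = "%d-%m-%Y %H:%M:%S"

def parse_transaction_dt(text):
    """Parse a day-first transaction_dt string into a datetime."""
    return datetime.strptime(text, PY_TIMEFORM)

# First row of the history output shown above.
parsed = parse_transaction_dt("01-05-2016 08:10:50")
print(parsed)  # 2016-05-01 08:10:50
```

The string `01-05-2016` is read as 1 May 2016, which agrees with the `transaction_date` value Spark shows for the same row.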

In [47]:
historyy = historyy.drop('transaction_dt')

In [48]:
historyy.show()

+---------------+-------+--------+---------------+-------+-----+-------------------+
|        card_id| amount|postcode|         pos_id| status|score|   transaction_date|
+---------------+-------+--------+---------------+-------+-----+-------------------+
|340379737226464|6126197|   46933|167473544283898|GENUINE|  229|2016-05-01 08:10:50|
|340379737226464|7949232|   61840|664980919335952|GENUINE|  229|2016-10-01 10:38:52|
|340379737226464| 943839|   91743|633038040069180|GENUINE|  229|2016-08-02 00:31:25|
|340379737226464|3764114|   91743|633038040069180|GENUINE|  229|2016-08-02 21:35:27|
|340379737226464|6221251|   98384|064948657945290|GENUINE|  229|2016-10-02 14:44:14|
|340379737226464|2868312|   26032|856772774421259|GENUINE|  229|2016-12-02 21:55:43|
|340379737226464|4418586|   20129|390339673634463|GENUINE|  229|2017-12-02 17:05:51|
|340379737226464|7439113|   91763|315067016872305|GENUINE|  229|2017-04-03 11:43:59|
|340379737226464|8217180|   16063|208378790148728|GENUINE|  229|2

[go to top](#top)

## Collecting Transaction Date & Post Code of Last Transaction

In [49]:
look_up_table = historyy.groupBy('card_id').agg(f.max("transaction_date").alias('transaction_date'))

In [50]:
look_up_table.show()

+----------------+-------------------+
|         card_id|   transaction_date|
+----------------+-------------------+
| 345406224887566|2017-12-25 04:03:58|
|6562510549485881|2018-01-17 08:35:27|
| 377201318164757|2017-11-28 16:32:22|
|4389973676463558|2018-01-26 13:47:46|
|5508842242491554|2018-01-31 14:55:58|
| 340379737226464|2018-01-27 00:19:47|
| 348962542187595|2018-01-29 17:17:14|
|5403923427969691|2018-01-22 23:46:19|
|4407230633003235|2018-01-27 07:21:08|
| 379321864695232|2018-01-03 00:29:37|
| 340028465709212|2018-01-02 03:25:35|
|4126356979547079|2018-01-24 16:09:03|
|5543219113990484|2018-01-13 18:34:00|
|4818950814628962|2018-01-31 00:53:15|
|5573293264792992|2018-01-31 14:55:57|
|6011985140563103|2018-01-30 02:03:54|
| 349143706735646|2018-01-29 22:33:14|
|5464688416792307|2018-01-26 19:03:47|
|6011273561157733|2018-02-01 01:27:58|
|4484950467600170|2018-01-10 08:03:13|
+----------------+-------------------+
only showing top 20 rows



In [51]:
look_up_table.count()

999

In [52]:
look_up_table = look_up_table.join(score, look_up_table.card_id == score.cardid,how='INNER')

In [53]:
look_up_table.count()

999

In [54]:
look_up_table.show()

+----------------+-------------------+---------------+-----+----------------+
|         card_id|   transaction_date|         mem_id|score|          cardid|
+----------------+-------------------+---------------+-----+----------------+
| 340379737226464|2018-01-27 00:19:47|089615510858348|  229| 340379737226464|
| 345406224887566|2017-12-25 04:03:58|296206661780881|  349| 345406224887566|
| 348962542187595|2018-01-29 17:17:14|366246487993992|  522| 348962542187595|
| 377201318164757|2017-11-28 16:32:22|924475891017022|  432| 377201318164757|
| 379321864695232|2018-01-03 00:29:37|082567374418739|  297| 379321864695232|
|4389973676463558|2018-01-26 13:47:46|295554828848966|  400|4389973676463558|
|4407230633003235|2018-01-27 07:21:08|761335698364860|  567|4407230633003235|
|5403923427969691|2018-01-22 23:46:19|922077754605834|  324|5403923427969691|
|5508842242491554|2018-01-31 14:55:58|634200295989311|  585|5508842242491554|
|6562510549485881|2018-01-17 08:35:27|659982919406634|  518|6562

In [55]:
look_up_table = look_up_table.select('card_id','transaction_date')

In [56]:
look_up_table.show(20)

+----------------+-------------------+
|         card_id|   transaction_date|
+----------------+-------------------+
| 340379737226464|2018-01-27 00:19:47|
| 345406224887566|2017-12-25 04:03:58|
| 348962542187595|2018-01-29 17:17:14|
| 377201318164757|2017-11-28 16:32:22|
| 379321864695232|2018-01-03 00:29:37|
|4389973676463558|2018-01-26 13:47:46|
|4407230633003235|2018-01-27 07:21:08|
|5403923427969691|2018-01-22 23:46:19|
|5508842242491554|2018-01-31 14:55:58|
|6562510549485881|2018-01-17 08:35:27|
| 340028465709212|2018-01-02 03:25:35|
| 349143706735646|2018-01-29 22:33:14|
|4126356979547079|2018-01-24 16:09:03|
|4484950467600170|2018-01-10 08:03:13|
|4818950814628962|2018-01-31 00:53:15|
|5464688416792307|2018-01-26 19:03:47|
|5543219113990484|2018-01-13 18:34:00|
|5573293264792992|2018-01-31 14:55:57|
|6011273561157733|2018-02-01 01:27:58|
|6011985140563103|2018-01-30 02:03:54|
+----------------+-------------------+
only showing top 20 rows



In [57]:
look_up_table = look_up_table.join(historyy,on=['card_id','transaction_date'],how='left')
look_up_table.count()

1476
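The count rises from 999 cards to 1476 rows because the join key is (`card_id`, `transaction_date`): whenever a card has several history rows that share its latest timestamp, all of them match. The plain-Python sketch below reproduces this effect on made-up rows. The card ids and values are hypothetical, and the two steps only imitate the `groupBy`/`max` and join cells above.

```python
from collections import defaultdict

# Toy history rows: (card_id, transaction_date, postcode). Values are made up.
history_rows = [
    ("card_A", "2018-01-27 00:19:47", "26656"),
    ("card_A", "2018-01-18 14:26:09", "61334"),
    ("card_B", "2018-01-31 14:55:58", "12986"),
    ("card_B", "2018-01-31 14:55:58", "12986"),  # same card, same latest timestamp
]

# Step 1: latest timestamp per card (imitates groupBy + max).
# ISO-formatted timestamps sort correctly as plain strings.
latest = defaultdict(str)
for card_id, ts, _ in history_rows:
    latest[card_id] = max(latest[card_id], ts)

# Step 2: join back on (card_id, transaction_date); every tied row matches.
joined = [row for row in history_rows if latest[row[0]] == row[1]]

print(len(latest))  # 2
print(len(joined))  # 3
```

Two cards produce three joined rows here, which is why a de-duplication step is needed later before the lookup table is written.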

In [58]:
look_up_table.show()

+----------------+-------------------+-------+--------+---------------+-------+-----+
|         card_id|   transaction_date| amount|postcode|         pos_id| status|score|
+----------------+-------------------+-------+--------+---------------+-------+-----+
| 378586484293754|2017-12-24 05:14:37|3859271|   24363|753115024049849|GENUINE|  337|
|4356201405998945|2018-01-24 14:23:42|4553231|   43791|339439168301190|GENUINE|  600|
|4418227862530505|2018-01-25 16:43:45|4085014|   14544|028630406062180|GENUINE|  318|
|5400249950855567|2018-01-28 06:10:31|1062269|   24966|757227694469394|GENUINE|  523|
| 373748808330229|2018-01-29 13:46:32|2446006|   25260|459926365561014|GENUINE|  685|
|4353614029446427|2018-01-10 23:51:13|2713094|   15311|791335648163958|GENUINE|  219|
|4598225659063187|2018-01-25 21:59:45| 421272|   50531|657401894365206|GENUINE|  355|
|4689314809377828|2018-01-25 21:59:45|1151530|   29550|365821079545471|GENUINE|  632|
|5447036761675606|2017-11-16 23:38:38| 566003|   32970

In [59]:
look_up_table = look_up_table.select('card_id','transaction_date','score','postcode')

In [60]:
look_up_table.show()

+----------------+-------------------+-----+--------+
|         card_id|   transaction_date|score|postcode|
+----------------+-------------------+-----+--------+
| 378586484293754|2017-12-24 05:14:37|  337|   24363|
|4356201405998945|2018-01-24 14:23:42|  600|   43791|
|4418227862530505|2018-01-25 16:43:45|  318|   14544|
|5400249950855567|2018-01-28 06:10:31|  523|   24966|
| 373748808330229|2018-01-29 13:46:32|  685|   25260|
|4353614029446427|2018-01-10 23:51:13|  219|   15311|
|4598225659063187|2018-01-25 21:59:45|  355|   50531|
|4689314809377828|2018-01-25 21:59:45|  632|   29550|
|5447036761675606|2017-11-16 23:38:38|  690|   32970|
|5508842242491554|2018-01-31 14:55:58|  585|   12986|
|5572427538311236|2018-01-31 20:11:58|  303|   91040|
|6011654527329500|2018-01-31 00:53:16|  683|   58634|
| 347893423075811|2018-01-24 02:06:21|  429|   15532|
| 371085417506954|2018-01-28 14:57:11|  599|   19468|
|5316831626197194|2018-01-29 13:46:32|  227|   40488|
|6011027251671860|2018-01-28

[go to top](#top)

###  4. Calculating Upper Control Limit (UCL) <a id='ucl'></a>

- Using a window function with `rank()`, the 10 most recent transactions of every card can be selected, and the moving average and standard deviation of their amounts can then be calculated.
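As a rough illustration of the calculation for a single card, the sketch below computes UCL = moving average + 3 × standard deviation over the 10 most recent amounts. It uses `statistics.stdev`, the sample standard deviation, which is what Spark's `stddev` computes as well. The function name `upper_control_limit` and the amounts are hypothetical, and unlike the notebook cells the sketch does not round the intermediate values.

```python
import statistics

def upper_control_limit(amounts, window=10):
    """UCL = mean + 3 * sample standard deviation of the latest `window` amounts.

    `amounts` must be ordered from newest to oldest.
    """
    recent = amounts[:window]
    moving_avg = statistics.mean(recent)
    std_dev = statistics.stdev(recent)  # sample std dev (n - 1 denominator)
    return moving_avg + 3 * std_dev

# Made-up amounts, newest first; the 11th (oldest) value falls outside the window.
amounts = [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 5000]
ucl = upper_control_limit(amounts)
print(round(ucl, 2))
```

Because only the first 10 values are used, the old outlier of 5000 has no influence on the limit.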

In [61]:
window = Window.partitionBy(historyy['card_id']).orderBy(historyy['transaction_date'].desc())

history_df = historyy.select('*', f.rank().over(window).alias('rank')).filter(f.col('rank') <= 10)
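One detail of `rank()` is worth keeping in mind: tied rows receive the same rank, so a filter on `rank <= 10` can keep more than 10 rows when several transactions share a timestamp, whereas a row-number style numbering keeps exactly 10. The plain-Python sketch below imitates both numbering schemes on hypothetical timestamps, with a cutoff of 2 instead of 10 for brevity. `rank_desc` is an illustrative helper, not a Spark function.

```python
def rank_desc(values):
    """Return (value, rank) pairs with SQL-style rank() over a descending order."""
    ordered = sorted(values, reverse=True)
    ranks = []
    for position, value in enumerate(ordered, start=1):
        if position > 1 and value == ordered[position - 2]:
            ranks.append(ranks[-1])  # tie: reuse the previous rank
        else:
            ranks.append(position)   # otherwise the rank equals the row position
    return list(zip(ordered, ranks))

# Hypothetical timestamps for one card; the 2nd and 3rd are identical.
timestamps = ["2018-01-05", "2018-01-04", "2018-01-04", "2018-01-03"]
ranked = rank_desc(timestamps)

kept_by_rank = [ts for ts, r in ranked if r <= 2]   # rank-style filter
kept_by_row_number = [ts for ts, _ in ranked][:2]   # row-number-style filter

print(len(kept_by_rank))        # 3
print(len(kept_by_row_number))  # 2
```

Whether the extra tied rows should count toward the moving average is a design decision. The notebook keeps them by using `rank()`.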

In [62]:
history_df.show()

+---------------+-------+--------+---------------+-------+-----+-------------------+----+
|        card_id| amount|postcode|         pos_id| status|score|   transaction_date|rank|
+---------------+-------+--------+---------------+-------+-----+-------------------+----+
|340379737226464|1784098|   26656|000383013889790|GENUINE|  229|2018-01-27 00:19:47|   1|
|340379737226464|3759577|   61334|016312401940277|GENUINE|  229|2018-01-18 14:26:09|   2|
|340379737226464|4080612|   51338|562082278231631|GENUINE|  229|2018-01-14 20:54:02|   3|
|340379737226464|4242710|   96105|285501971776349|GENUINE|  229|2018-01-11 19:09:55|   4|
|340379737226464|9061517|   40932|232455833079472|GENUINE|  229|2018-01-10 20:20:33|   5|
|340379737226464| 102248|   40932|232455833079472|GENUINE|  229|2018-01-10 15:04:33|   6|
|340379737226464|7445128|   50455|915439934619047|GENUINE|  229|2018-01-07 23:52:27|   7|
|340379737226464|5706163|   50455|915439934619047|GENUINE|  229|2018-01-07 22:07:07|   8|
|340379737

In [63]:
history_df = history_df.groupBy("card_id").agg(f.round(f.avg('amount'), 2).alias('moving_avg'),
                                               f.round(f.stddev('amount'), 2).alias('Std_Dev'))
history_df.show()

+----------------+----------+----------+
|         card_id|moving_avg|   Std_Dev|
+----------------+----------+----------+
| 340379737226464| 5355453.1|3107063.55|
| 345406224887566| 5488456.5|3252527.52|
| 348962542187595| 5735629.0|3089916.54|
| 377201318164757| 5742377.7|2768545.84|
| 379321864695232| 4713319.1|3203114.94|
|4389973676463558| 4923904.7| 2306771.9|
|4407230633003235| 4348891.3|3274883.95|
|5403923427969691| 5375495.6|2913510.72|
|5508842242491554| 4570725.9|3229905.04|
|6562510549485881| 5551056.9|2501552.48|
| 340028465709212| 6863758.9|3326644.65|
| 349143706735646| 5453372.9|3424332.26|
|4126356979547079| 4286400.2|2909676.26|
|4484950467600170| 4550480.5|3171538.48|
|4818950814628962| 2210428.9| 958307.87|
|5464688416792307| 4985938.2|2379084.95|
|5543219113990484| 4033586.9|2969107.42|
|5573293264792992| 3929994.0|2589503.93|
|6011273561157733| 4634624.8|2801886.17|
|6011985140563103| 5302878.9| 3088988.7|
+----------------+----------+----------+
only showing top

In [64]:
history_df = history_df.withColumn('UCL',history_df.moving_avg+3*(history_df.Std_Dev))
history_df.show()

+----------------+----------+----------+--------------------+
|         card_id|moving_avg|   Std_Dev|                 UCL|
+----------------+----------+----------+--------------------+
| 340379737226464| 5355453.1|3107063.55|1.4676643749999998E7|
| 345406224887566| 5488456.5|3252527.52|       1.524603906E7|
| 348962542187595| 5735629.0|3089916.54|1.5005378620000001E7|
| 377201318164757| 5742377.7|2768545.84|1.4048015219999999E7|
| 379321864695232| 4713319.1|3203114.94|       1.432266392E7|
|4389973676463558| 4923904.7| 2306771.9|1.1844220399999999E7|
|4407230633003235| 4348891.3|3274883.95|1.4173543150000002E7|
|5403923427969691| 5375495.6|2913510.72|       1.411602776E7|
|5508842242491554| 4570725.9|3229905.04|1.4260441020000001E7|
|6562510549485881| 5551056.9|2501552.48|       1.305571434E7|
| 340028465709212| 6863758.9|3326644.65|       1.684369285E7|
| 349143706735646| 5453372.9|3424332.26|       1.572636968E7|
|4126356979547079| 4286400.2|2909676.26|       1.301542898E7|
|4484950

In [65]:
history_df = history_df.select('card_id','UCL')

In [66]:
look_up_table = look_up_table.join(history_df,on=['card_id'])

In [67]:
look_up_table.show()

+----------------+-------------------+-----+--------+--------------------+
|         card_id|   transaction_date|score|postcode|                 UCL|
+----------------+-------------------+-----+--------+--------------------+
| 340379737226464|2018-01-27 00:19:47|  229|   26656|1.4676643749999998E7|
| 345406224887566|2017-12-25 04:03:58|  349|   53034|       1.524603906E7|
| 348962542187595|2018-01-29 17:17:14|  522|   27830|1.5005378620000001E7|
| 377201318164757|2017-11-28 16:32:22|  432|   84302|1.4048015219999999E7|
| 379321864695232|2018-01-03 00:29:37|  297|   98837|       1.432266392E7|
|4389973676463558|2018-01-26 13:47:46|  400|   10985|1.1844220399999999E7|
|4407230633003235|2018-01-27 07:21:08|  567|   50167|1.4173543150000002E7|
|5403923427969691|2018-01-22 23:46:19|  324|   17350|       1.411602776E7|
|5508842242491554|2018-01-31 14:55:58|  585|   12986|1.4260441020000001E7|
|6562510549485881|2018-01-17 08:35:27|  518|   35440|       1.305571434E7|
| 340028465709212|2018-01

In [68]:
look_up_table = look_up_table.dropDuplicates(['card_id', 'transaction_date', 'postcode'])

In [69]:
look_up_table.count()

1000
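A count of 1000 rows for 999 cards means one card still has two rows, which share the latest timestamp but differ in postcode. Since the HBase table in the next section uses `card_id` as the row key, two such rows are written to the same key, and a plain read returns only the most recent write. The sketch below imitates this with a Python dict standing in for the HBase table. The rows are made up, and the dict only approximates HBase behaviour, because HBase can additionally retain older cell versions.

```python
# Toy lookup rows: (card_id, transaction_date, postcode). Values are made up.
lookup_rows = [
    ("card_A", "2018-01-27 00:19:47", "26656"),
    ("card_B", "2018-01-31 14:55:58", "12986"),
    ("card_B", "2018-01-31 14:55:58", "58634"),  # same card, different postcode
]

# A dict keyed by card_id stands in for a table keyed by card_id:
# a later write to an existing key replaces the value a plain read returns.
table = {}
for card_id, ts, postcode in lookup_rows:
    table[card_id] = {"transaction_date": ts, "postcode": postcode}

print(len(lookup_rows))  # 3
print(len(table))        # 2
```

If exactly one row per card is wanted in the dataframe as well, de-duplicating on `card_id` alone would be one option, at the cost of picking one of the tied postcodes arbitrarily.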

###  5. Creation of Lookup Table in HBase <a id='cdt'></a>

In [70]:
import happybase
#create connection
connection = happybase.Connection('localhost', port=9090 ,autoconnect=False)

In [71]:
# open the connection
def open_connection():
    connection.open()

# close the opened connection
def close_connection():
    connection.close()

# list all tables in HBase
def list_tables():
    print("fetching all table")
    open_connection()
    tables = connection.tables()
    close_connection()
    print("all tables fetched")
    return tables

In [72]:
#create the required table 
def create_table(name, cf):
    print("creating table " + name)
    tables = list_tables()
    # compare as bytes: happybase returns table names as bytes on Python 3
    if name.encode('utf-8') not in tables:
        open_connection()
        connection.create_table(name, cf)
        close_connection()
        print("table created")
    else:
        print("table already present")

# get the pointer to a table
def get_table(name):
    open_connection()
    table = connection.table(name)
    close_connection()
    return table

In [104]:
create_table('look_up_table', {'info': dict(max_versions=5)})

creating table look_up_table
fetching all table
all tables fetched
table created


In [105]:
#batch insert data in lookup table
def batch_insert_data(df, tableName):
    print("starting batch insert of events")
    table = get_table(tableName)
    open_connection()

    # encode a value as the bytes of its string form (Python 2 and Python 3)
    def to_bytes(value):
        return str(value).encode('utf-8')

    # The row key is the card_id, so a card can be looked up directly by its id.
    with table.batch(batch_size=4) as b:
        for row in df.collect():
            b.put(to_bytes(row.card_id), {'info:card_id': to_bytes(row.card_id),
                                          'info:transaction_date': to_bytes(row.transaction_date),
                                          'info:score': to_bytes(row.score),
                                          'info:postcode': to_bytes(row.postcode),
                                          'info:UCL': to_bytes(row.UCL)})

    print("batch insert done")
    close_connection()
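HBase stores raw bytes, so every value has to be converted before the put. The sketch below shows one way to build the row key and the column dictionary for a single lookup row without needing a running HBase. `LookupRow`, `to_bytes` and `to_hbase_put` are hypothetical helpers, the namedtuple is a stand-in for a Spark `Row`, and the values are sample data in the shape of the lookup table.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in for a Spark Row from look_up_table.
LookupRow = namedtuple("LookupRow", "card_id transaction_date score postcode UCL")

def to_bytes(value):
    """Encode any value as the UTF-8 bytes of its string form."""
    return str(value).encode("utf-8")

def to_hbase_put(row, column_family="info"):
    """Build the (row_key, columns) pair that a put(row_key, columns) call takes."""
    columns = {
        "{}:{}".format(column_family, name).encode("utf-8"): to_bytes(value)
        for name, value in row._asdict().items()
    }
    return to_bytes(row.card_id), columns

row = LookupRow("340379737226464", datetime(2018, 1, 27, 0, 19, 47), 229, "26656", 14676643.75)
row_key, columns = to_hbase_put(row)
print(row_key)                 # b'340379737226464'
print(columns[b"info:score"])  # b'229'
```

Storing everything as the bytes of its string form keeps the table easy to read from the HBase shell, but readers have to convert the score and the UCL back to numbers themselves.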

In [107]:
batch_insert_data(look_up_table,'look_up_table')

starting batch insert of events
batch insert done


In [74]:
create_table('card_transaction', {'info': dict(max_versions=5)})

creating table card_transaction
fetching all table
all tables fetched
table created


[go to top](#top)