# **Pyspark for Credit Card Fraud Detection Project**

### Tasks performed in this sheet

* After the data has been imported into HDFS, read the files and build the target dimensional model in PySpark
* Set up the environment variables, initiate the SparkSession and obtain the SparkContext
* Define custom schemas to avoid data-type issues when reading the files
* Perform data analysis on the DataFrames
* Read the past transactions data into a DataFrame
* Perform the necessary joins and rename columns for readability
* Create lookup_table with the transaction date and postcode of each card's last transaction
* Calculate the UCL (upper control limit) of each card's transaction amount
* Load the resulting DataFrame into a NoSQL database (HBase)


* Configure the system dependencies (Python, Java, Spark) for CDH
* Create the Spark session and context
* Import the necessary libraries

In [1]:
import os
import sys

# Point PySpark at the CDH Anaconda interpreter, the Cloudera JDK and the Spark2 parcel.
os.environ.update({
    "PYSPARK_PYTHON": "/opt/cloudera/parcels/Anaconda/bin/python",
    "JAVA_HOME": "/usr/java/jdk1.8.0_232-cloudera/jre",
    "SPARK_HOME": "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/",
})
# SPARK_HOME already ends in "/", so PYLIB contains a harmless "//".
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Make the bundled py4j and pyspark archives importable; pyspark.zip ends up first on sys.path.
for _archive in ("py4j-0.10.6-src.zip", "pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + "/" + _archive)

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('CreditCardTxn').master("local").getOrCreate()
spark

In [4]:
sc = spark.sparkContext

In [5]:
sc

In [6]:
#import libraries
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, DoubleType, LongType

In [7]:
import pyspark.sql.functions as f

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window
from pyspark.sql import functions as sf

* Define a StructType schema for the card member details, with one StructField per column

In [9]:
# Schema of the card_member files: every column is read as a string.
# nullable=False is not enforced by the CSV reader (printSchema reports nullable = true).
cardschema = StructType([
    StructField(column_name, StringType(), False)
    for column_name in ('card_id', 'member_id', 'member_joining_dt',
                        'card_purchase_dt', 'country', 'city')
])

In [11]:
#read the data
card_df = spark.read.csv("hdfs:/user/root/capstone_project/card_member", header = False, schema = cardschema)

In [12]:
#count the records in card member
card_df.count()  # 999 as expected

999

In [13]:
card_df.printSchema()

root
 |-- card_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- member_joining_dt: string (nullable = true)
 |-- card_purchase_dt: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)



In [14]:
card_df.columns

['card_id',
 'member_id',
 'member_joining_dt',
 'card_purchase_dt',
 'country',
 'city']

In [15]:
card_df.show()

+---------------+---------------+--------------------+----------------+-------------+-----------------+
|        card_id|      member_id|   member_joining_dt|card_purchase_dt|      country|             city|
+---------------+---------------+--------------------+----------------+-------------+-----------------+
|340028465709212|009250698176266|2012-02-08 06:04:...|           05/13|United States|        Barberton|
|340054675199675|835873341185231|2017-03-10 09:24:...|           03/17|United States|       Fort Dodge|
|340082915339645|512969555857346|2014-02-15 06:30:...|           07/14|United States|           Graham|
|340134186926007|887711945571282|2012-02-05 01:21:...|           02/13|United States|        Dix Hills|
|340265728490548|680324265406190|2014-03-29 07:49:...|           11/14|United States| Rancho Cucamonga|
|340268219434811|929799084911715|2012-07-08 02:46:...|           08/12|United States|    San Francisco|
|340379737226464|089615510858348|2010-03-10 00:06:...|          

In [16]:
memberschema = StructType([StructField('member_id', StringType(),False),
                        StructField('score', IntegerType(),False),
                        ])

In [17]:
#read the data
mem_df = spark.read.csv("hdfs:/user/root/capstone_project/member_score", header = False, schema = memberschema)

In [18]:
#count the records in member score
mem_df.count() #as expected 999

999

In [19]:
mem_df.show()

+---------------+-----+
|      member_id|score|
+---------------+-----+
|000037495066290|  339|
|000117826301530|  289|
|001147922084344|  393|
|001314074991813|  225|
|001739553947511|  642|
|003761426295463|  413|
|004494068832701|  217|
|006836124210484|  504|
|006991872634058|  697|
|007955566230397|  372|
|008732267588672|  213|
|008765307152821|  399|
|009136568025042|  308|
|009190444424572|  559|
|009250698176266|  233|
|009873334520465|  298|
|011716573646690|  249|
|011877954983420|  497|
|012390918683920|  407|
|012731668664932|  612|
+---------------+-----+
only showing top 20 rows



In [20]:
mem_df = mem_df.withColumnRenamed('member_id','mem_id')

In [21]:
mem_df.printSchema()

root
 |-- mem_id: string (nullable = true)
 |-- score: integer (nullable = true)



In [22]:
mem_df.columns

['mem_id', 'score']

In [23]:
# Schema of card_transactions.csv: every field is a string except amount.
# (Variable name kept as-is because later cells reference it.)
transasction = StructType([
    StructField('card_id', StringType(), False),
    StructField('member_id', StringType(), False),
    StructField('amount', IntegerType(), False),
    StructField('postcode', StringType(), False),
    StructField('pos_id', StringType(), False),
    StructField('transaction_dt', StringType(), False),
    StructField('status', StringType(), False),
])

Read the past transactions data (CSV source, with a header row).

In [25]:
trans_df = spark.read.csv("hdfs:/user/root/capstone_project/card_transactions.csv", header = True, schema = transasction)

In [26]:
#check the record count
trans_df.count()  #53292

53292

In [27]:
#Update to non-fraud transactions
trans_df= trans_df.filter(trans_df.status!='FRAUD')

In [28]:
trans_df.count()

53210

In [29]:
trans_df.show()

+---------------+---------------+-------+--------+---------------+-------------------+-------+
|        card_id|      member_id| amount|postcode|         pos_id|     transaction_dt| status|
+---------------+---------------+-------+--------+---------------+-------------------+-------+
|348702330256514|000037495066290|9084849|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290| 330148|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290| 136052|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290|4310362|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290|9097094|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290|2291118|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290|4900011|   33946|614677375609919|11-02-2018 00:00:00|GENUINE|
|348702330256514|000037495066290| 633447|   33946|

In [30]:
score = mem_df.join(card_df, mem_df.mem_id == card_df.member_id,how='LEFT') 

In [31]:
score.count()

999

In [32]:
score.printSchema()

root
 |-- mem_id: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- card_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- member_joining_dt: string (nullable = true)
 |-- card_purchase_dt: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)



In [33]:
score = score.select('mem_id', 'score', 'card_id')  #selecting required columns

In [34]:
score.show()

+---------------+-----+----------------+
|         mem_id|score|         card_id|
+---------------+-----+----------------+
|000037495066290|  339| 348702330256514|
|000117826301530|  289|5189563368503974|
|001147922084344|  393|5407073344486464|
|001314074991813|  225| 378303738095292|
|001739553947511|  642| 348413196172048|
|003761426295463|  413| 348536585266345|
|004494068832701|  217|5515987071565183|
|006836124210484|  504|5400251558458125|
|006991872634058|  697|4573337022888445|
|007955566230397|  372|4708912758619517|
|008732267588672|  213|5342400571435088|
|008765307152821|  399|4237648081700588|
|009136568025042|  308| 371814781663843|
|009190444424572|  559| 341363858179050|
|009250698176266|  233| 340028465709212|
|009873334520465|  298|5495445301620991|
|011716573646690|  249|4795844193055110|
|011877954983420|  497|5164771396791995|
|012390918683920|  407|5423921058459194|
|012731668664932|  612|5379610024035907|
+---------------+-----+----------------+
only showing top

In [35]:
mem_df = mem_df.withColumnRenamed('mem_id','member_id')

In [36]:
mem_df.show()

+---------------+-----+
|      member_id|score|
+---------------+-----+
|000037495066290|  339|
|000117826301530|  289|
|001147922084344|  393|
|001314074991813|  225|
|001739553947511|  642|
|003761426295463|  413|
|004494068832701|  217|
|006836124210484|  504|
|006991872634058|  697|
|007955566230397|  372|
|008732267588672|  213|
|008765307152821|  399|
|009136568025042|  308|
|009190444424572|  559|
|009250698176266|  233|
|009873334520465|  298|
|011716573646690|  249|
|011877954983420|  497|
|012390918683920|  407|
|012731668664932|  612|
+---------------+-----+
only showing top 20 rows



In [37]:
mem_df.count()

999

In [38]:
score = score.withColumnRenamed('card_id','cardid')

In [39]:
score.show()

+---------------+-----+----------------+
|         mem_id|score|          cardid|
+---------------+-----+----------------+
|000037495066290|  339| 348702330256514|
|000117826301530|  289|5189563368503974|
|001147922084344|  393|5407073344486464|
|001314074991813|  225| 378303738095292|
|001739553947511|  642| 348413196172048|
|003761426295463|  413| 348536585266345|
|004494068832701|  217|5515987071565183|
|006836124210484|  504|5400251558458125|
|006991872634058|  697|4573337022888445|
|007955566230397|  372|4708912758619517|
|008732267588672|  213|5342400571435088|
|008765307152821|  399|4237648081700588|
|009136568025042|  308| 371814781663843|
|009190444424572|  559| 341363858179050|
|009250698176266|  233| 340028465709212|
|009873334520465|  298|5495445301620991|
|011716573646690|  249|4795844193055110|
|011877954983420|  497|5164771396791995|
|012390918683920|  407|5423921058459194|
|012731668664932|  612|5379610024035907|
+---------------+-----+----------------+
only showing top

In [40]:
hist = trans_df.join(score, trans_df.member_id == score.mem_id,how='outer') #outer join on member_id between score and transactions table

In [41]:
hist.count() #53210

53210

In [42]:
hist.printSchema()

root
 |-- card_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- postcode: string (nullable = true)
 |-- pos_id: string (nullable = true)
 |-- transaction_dt: string (nullable = true)
 |-- status: string (nullable = true)
 |-- mem_id: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- cardid: string (nullable = true)



In [43]:
hist = hist.select('card_id', 'amount', 'postcode', 'pos_id','transaction_dt','status','score')

In [44]:
hist.show()

+---------------+-------+--------+---------------+-------------------+-------+-----+
|        card_id| amount|postcode|         pos_id|     transaction_dt| status|score|
+---------------+-------+--------+---------------+-------------------+-------+-----+
|340379737226464|6126197|   46933|167473544283898|01-05-2016 08:10:50|GENUINE|  229|
|340379737226464|7949232|   61840|664980919335952|01-10-2016 10:38:52|GENUINE|  229|
|340379737226464| 943839|   91743|633038040069180|02-08-2016 00:31:25|GENUINE|  229|
|340379737226464|3764114|   91743|633038040069180|02-08-2016 21:35:27|GENUINE|  229|
|340379737226464|6221251|   98384|064948657945290|02-10-2016 14:44:14|GENUINE|  229|
|340379737226464|2868312|   26032|856772774421259|02-12-2016 21:55:43|GENUINE|  229|
|340379737226464|4418586|   20129|390339673634463|02-12-2017 17:05:51|GENUINE|  229|
|340379737226464|7439113|   91763|315067016872305|03-04-2017 11:43:59|GENUINE|  229|
|340379737226464|8217180|   16063|208378790148728|03-05-2017 16:4

In [45]:
timeform = 'dd-MM-yyyy HH:mm:ss'
history = hist.withColumn('transaction_date',f.unix_timestamp(hist['transaction_dt'],timeform).cast('timestamp'))

In [46]:
history.show()

+---------------+-------+--------+---------------+-------------------+-------+-----+-------------------+
|        card_id| amount|postcode|         pos_id|     transaction_dt| status|score|   transaction_date|
+---------------+-------+--------+---------------+-------------------+-------+-----+-------------------+
|340379737226464|6126197|   46933|167473544283898|01-05-2016 08:10:50|GENUINE|  229|2016-05-01 08:10:50|
|340379737226464|7949232|   61840|664980919335952|01-10-2016 10:38:52|GENUINE|  229|2016-10-01 10:38:52|
|340379737226464| 943839|   91743|633038040069180|02-08-2016 00:31:25|GENUINE|  229|2016-08-02 00:31:25|
|340379737226464|3764114|   91743|633038040069180|02-08-2016 21:35:27|GENUINE|  229|2016-08-02 21:35:27|
|340379737226464|6221251|   98384|064948657945290|02-10-2016 14:44:14|GENUINE|  229|2016-10-02 14:44:14|
|340379737226464|2868312|   26032|856772774421259|02-12-2016 21:55:43|GENUINE|  229|2016-12-02 21:55:43|
|340379737226464|4418586|   20129|390339673634463|02-12

In [47]:
history = history.drop('transaction_dt')

In [48]:
history.show()

+---------------+-------+--------+---------------+-------+-----+-------------------+
|        card_id| amount|postcode|         pos_id| status|score|   transaction_date|
+---------------+-------+--------+---------------+-------+-----+-------------------+
|340379737226464|6126197|   46933|167473544283898|GENUINE|  229|2016-05-01 08:10:50|
|340379737226464|7949232|   61840|664980919335952|GENUINE|  229|2016-10-01 10:38:52|
|340379737226464| 943839|   91743|633038040069180|GENUINE|  229|2016-08-02 00:31:25|
|340379737226464|3764114|   91743|633038040069180|GENUINE|  229|2016-08-02 21:35:27|
|340379737226464|6221251|   98384|064948657945290|GENUINE|  229|2016-10-02 14:44:14|
|340379737226464|2868312|   26032|856772774421259|GENUINE|  229|2016-12-02 21:55:43|
|340379737226464|4418586|   20129|390339673634463|GENUINE|  229|2017-12-02 17:05:51|
|340379737226464|7439113|   91763|315067016872305|GENUINE|  229|2017-04-03 11:43:59|
|340379737226464|8217180|   16063|208378790148728|GENUINE|  229|2

### Collecting Transaction Date & Post Code of Last Transaction

* Build the lookup table
* Calculate the latest (maximum) transaction_date for each card_id


In [49]:
lookup_table = history.groupBy('card_id').agg(f.max("transaction_date").alias('transaction_date'))

In [50]:
lookup_table.show() #card_id and transaction_date

+----------------+-------------------+
|         card_id|   transaction_date|
+----------------+-------------------+
| 340379737226464|2018-01-27 00:19:47|
| 377201318164757|2017-11-28 16:32:22|
| 348962542187595|2018-01-29 17:17:14|
|4389973676463558|2018-01-26 13:47:46|
|5403923427969691|2018-01-22 23:46:19|
| 345406224887566|2017-12-25 04:03:58|
|6562510549485881|2018-01-17 08:35:27|
|5508842242491554|2018-01-31 14:55:58|
|4407230633003235|2018-01-27 07:21:08|
| 379321864695232|2018-01-03 00:29:37|
| 340028465709212|2018-01-02 03:25:35|
| 349143706735646|2018-01-29 22:33:14|
|4126356979547079|2018-01-24 16:09:03|
|5543219113990484|2018-01-13 18:34:00|
|5464688416792307|2018-01-26 19:03:47|
|6011273561157733|2018-02-01 01:27:58|
|4484950467600170|2018-01-10 08:03:13|
|4818950814628962|2018-01-31 00:53:15|
|5573293264792992|2018-01-31 14:55:57|
|6011985140563103|2018-01-30 02:03:54|
+----------------+-------------------+
only showing top 20 rows



In [51]:
lookup_table.count()  #999

999

In [52]:
score.show()

+---------------+-----+----------------+
|         mem_id|score|          cardid|
+---------------+-----+----------------+
|000037495066290|  339| 348702330256514|
|000117826301530|  289|5189563368503974|
|001147922084344|  393|5407073344486464|
|001314074991813|  225| 378303738095292|
|001739553947511|  642| 348413196172048|
|003761426295463|  413| 348536585266345|
|004494068832701|  217|5515987071565183|
|006836124210484|  504|5400251558458125|
|006991872634058|  697|4573337022888445|
|007955566230397|  372|4708912758619517|
|008732267588672|  213|5342400571435088|
|008765307152821|  399|4237648081700588|
|009136568025042|  308| 371814781663843|
|009190444424572|  559| 341363858179050|
|009250698176266|  233| 340028465709212|
|009873334520465|  298|5495445301620991|
|011716573646690|  249|4795844193055110|
|011877954983420|  497|5164771396791995|
|012390918683920|  407|5423921058459194|
|012731668664932|  612|5379610024035907|
+---------------+-----+----------------+
only showing top

* Join `score` with `lookup_table` on the card ID

In [53]:
lookup_table = lookup_table.join(score, lookup_table.card_id == score.cardid,how='INNER')

In [54]:
lookup_table.count()  #check the count (999)

999

In [55]:
lookup_table.show()

+----------------+-------------------+---------------+-----+----------------+
|         card_id|   transaction_date|         mem_id|score|          cardid|
+----------------+-------------------+---------------+-----+----------------+
| 340379737226464|2018-01-27 00:19:47|089615510858348|  229| 340379737226464|
| 345406224887566|2017-12-25 04:03:58|296206661780881|  349| 345406224887566|
| 348962542187595|2018-01-29 17:17:14|366246487993992|  522| 348962542187595|
| 377201318164757|2017-11-28 16:32:22|924475891017022|  432| 377201318164757|
| 379321864695232|2018-01-03 00:29:37|082567374418739|  297| 379321864695232|
|4389973676463558|2018-01-26 13:47:46|295554828848966|  400|4389973676463558|
|4407230633003235|2018-01-27 07:21:08|761335698364860|  567|4407230633003235|
|5403923427969691|2018-01-22 23:46:19|922077754605834|  324|5403923427969691|
|5508842242491554|2018-01-31 14:55:58|634200295989311|  585|5508842242491554|
|6562510549485881|2018-01-17 08:35:27|659982919406634|  518|6562

In [56]:
lookup_table = lookup_table.select('card_id','transaction_date')

In [57]:
lookup_table.show()

+----------------+-------------------+
|         card_id|   transaction_date|
+----------------+-------------------+
| 340379737226464|2018-01-27 00:19:47|
| 345406224887566|2017-12-25 04:03:58|
| 348962542187595|2018-01-29 17:17:14|
| 377201318164757|2017-11-28 16:32:22|
| 379321864695232|2018-01-03 00:29:37|
|4389973676463558|2018-01-26 13:47:46|
|4407230633003235|2018-01-27 07:21:08|
|5403923427969691|2018-01-22 23:46:19|
|5508842242491554|2018-01-31 14:55:58|
|6562510549485881|2018-01-17 08:35:27|
| 340028465709212|2018-01-02 03:25:35|
| 349143706735646|2018-01-29 22:33:14|
|4126356979547079|2018-01-24 16:09:03|
|4484950467600170|2018-01-10 08:03:13|
|4818950814628962|2018-01-31 00:53:15|
|5464688416792307|2018-01-26 19:03:47|
|5543219113990484|2018-01-13 18:34:00|
|5573293264792992|2018-01-31 14:55:57|
|6011273561157733|2018-02-01 01:27:58|
|6011985140563103|2018-01-30 02:03:54|
+----------------+-------------------+
only showing top 20 rows



In [58]:
 #To arrive at derived columns like latest_transaction date, group the combined dataframe 
#on card_id such that all transactions on same card id collate and get max(transaction 
#date).
lookup_table = lookup_table.join(history,on=['card_id','transaction_date'],how='left')  #1476
lookup_table.count() #1476

1476

In [59]:

lookup_table.show()  

+----------------+-------------------+-------+--------+---------------+-------+-----+
|         card_id|   transaction_date| amount|postcode|         pos_id| status|score|
+----------------+-------------------+-------+--------+---------------+-------+-----+
| 378586484293754|2017-12-24 05:14:37|3859271|   24363|753115024049849|GENUINE|  337|
|4356201405998945|2018-01-24 14:23:42|4553231|   43791|339439168301190|GENUINE|  600|
|4418227862530505|2018-01-25 16:43:45|4085014|   14544|028630406062180|GENUINE|  318|
|5400249950855567|2018-01-28 06:10:31|1062269|   24966|757227694469394|GENUINE|  523|
| 373748808330229|2018-01-29 13:46:32|2446006|   25260|459926365561014|GENUINE|  685|
|4353614029446427|2018-01-10 23:51:13|2713094|   15311|791335648163958|GENUINE|  219|
|4598225659063187|2018-01-25 21:59:45| 421272|   50531|657401894365206|GENUINE|  355|
|4689314809377828|2018-01-25 21:59:45|1151530|   29550|365821079545471|GENUINE|  632|
|5447036761675606|2017-11-16 23:38:38| 566003|   32970

## Calculating UCL

* Calculate the moving average and standard deviation of the last 10 transactions for each card_id, using the data present in Hadoop and the NoSQL database
* With the fresh DataFrame, use the member ID once again as the common key and join with card_transactions.csv to load the postcode, pos_id, status, amount and transaction date fields from the historical transactions
* Open a window that partitions the input rows by card_id and orders them by transaction date, most recent first, so the latest 10 transactions of each card can be selected


In [60]:
window = Window.partitionBy(history['card_id']).orderBy(history['transaction_date'].desc())

history_df = history.select('*', f.rank().over(window).alias('rank')).filter(f.col('rank') <= 10)

In [61]:
history_df.show()

+---------------+-------+--------+---------------+-------+-----+-------------------+----+
|        card_id| amount|postcode|         pos_id| status|score|   transaction_date|rank|
+---------------+-------+--------+---------------+-------+-----+-------------------+----+
|340379737226464|1784098|   26656|000383013889790|GENUINE|  229|2018-01-27 00:19:47|   1|
|340379737226464|3759577|   61334|016312401940277|GENUINE|  229|2018-01-18 14:26:09|   2|
|340379737226464|4080612|   51338|562082278231631|GENUINE|  229|2018-01-14 20:54:02|   3|
|340379737226464|4242710|   96105|285501971776349|GENUINE|  229|2018-01-11 19:09:55|   4|
|340379737226464|9061517|   40932|232455833079472|GENUINE|  229|2018-01-10 20:20:33|   5|
|340379737226464| 102248|   40932|232455833079472|GENUINE|  229|2018-01-10 15:04:33|   6|
|340379737226464|7445128|   50455|915439934619047|GENUINE|  229|2018-01-07 23:52:27|   7|
|340379737226464|5706163|   50455|915439934619047|GENUINE|  229|2018-01-07 22:07:07|   8|
|340379737

In [62]:
# Per-card mean and (sample) standard deviation of the retained amounts, rounded to 2 decimals.
history_df = (
    history_df
    .groupBy("card_id")
    .agg(
        f.round(f.avg('amount'), 2).alias('moving_avg'),
        f.round(f.stddev('amount'), 2).alias('Std_Dev'),
    )
)
history_df.show(20)

+----------------+----------+----------+
|         card_id|moving_avg|   Std_Dev|
+----------------+----------+----------+
| 340379737226464| 5355453.1|3107063.55|
| 345406224887566| 5488456.5|3252527.52|
| 348962542187595| 5735629.0|3089916.54|
| 377201318164757| 5742377.7|2768545.84|
| 379321864695232| 4713319.1|3203114.94|
|4389973676463558| 4923904.7| 2306771.9|
|4407230633003235| 4348891.3|3274883.95|
|5403923427969691| 5375495.6|2913510.72|
|5508842242491554| 4570725.9|3229905.04|
|6562510549485881| 5551056.9|2501552.48|
| 340028465709212| 6863758.9|3326644.65|
| 349143706735646| 5453372.9|3424332.26|
|4126356979547079| 4286400.2|2909676.26|
|4484950467600170| 4550480.5|3171538.48|
|4818950814628962| 2210428.9| 958307.87|
|5464688416792307| 4985938.2|2379084.95|
|5543219113990484| 4033586.9|2969107.42|
|5573293264792992| 3929994.0|2589503.93|
|6011273561157733| 4634624.8|2801886.17|
|6011985140563103| 5302878.9| 3088988.7|
+----------------+----------+----------+
only showing top

In [63]:
# Upper control limit: mean plus three standard deviations of the recent amounts.
history_df = history_df.withColumn('UCL', f.col('moving_avg') + f.col('Std_Dev') * 3)
history_df.show(20)

+----------------+----------+----------+--------------------+
|         card_id|moving_avg|   Std_Dev|                 UCL|
+----------------+----------+----------+--------------------+
| 340379737226464| 5355453.1|3107063.55|1.4676643749999998E7|
| 345406224887566| 5488456.5|3252527.52|       1.524603906E7|
| 348962542187595| 5735629.0|3089916.54|1.5005378620000001E7|
| 377201318164757| 5742377.7|2768545.84|1.4048015219999999E7|
| 379321864695232| 4713319.1|3203114.94|       1.432266392E7|
|4389973676463558| 4923904.7| 2306771.9|1.1844220399999999E7|
|4407230633003235| 4348891.3|3274883.95|1.4173543150000002E7|
|5403923427969691| 5375495.6|2913510.72|       1.411602776E7|
|5508842242491554| 4570725.9|3229905.04|1.4260441020000001E7|
|6562510549485881| 5551056.9|2501552.48|       1.305571434E7|
| 340028465709212| 6863758.9|3326644.65|       1.684369285E7|
| 349143706735646| 5453372.9|3424332.26|       1.572636968E7|
|4126356979547079| 4286400.2|2909676.26|       1.301542898E7|
|4484950

In [64]:
history_df = history_df.select('card_id','UCL')

In [65]:
lookup_table = lookup_table.join(history_df,on=['card_id'])

In [66]:
lookup_table.show()  #Final data set look as below

+----------------+-------------------+-------+--------+---------------+-------+-----+--------------------+
|         card_id|   transaction_date| amount|postcode|         pos_id| status|score|                 UCL|
+----------------+-------------------+-------+--------+---------------+-------+-----+--------------------+
| 340379737226464|2018-01-27 00:19:47|1784098|   26656|000383013889790|GENUINE|  229|1.4676643749999998E7|
| 345406224887566|2017-12-25 04:03:58|1135534|   53034|146838238062262|GENUINE|  349|       1.524603906E7|
| 348962542187595|2018-01-29 17:17:14|7408949|   27830|453850044027107|GENUINE|  522|1.5005378620000001E7|
| 377201318164757|2017-11-28 16:32:22|4799826|   84302|287431794718846|GENUINE|  432|1.4048015219999999E7|
| 379321864695232|2018-01-03 00:29:37|5702120|   98837|638380208258390|GENUINE|  297|       1.432266392E7|
|4389973676463558|2018-01-26 13:47:46|7196505|   10985|588476547410852|GENUINE|  400|1.1844220399999999E7|
|4407230633003235|2018-01-27 07:21:08

In [67]:
lookup_table = lookup_table.dropDuplicates((['card_id','transaction_date','postcode']))

In [68]:
lookup_table.count()  #1000

1000

### Load this DataFrame into the NoSQL database (HBase)

* Create a connection to HBase
* Check whether the table to be created already exists, and create it if it doesn't
* Batch-load the data from the DataFrame into the created table


In [73]:
import happybase
# Thrift connection to the local HBase server. With autoconnect=False the
# transport is only opened when connection.open() is called explicitly.
connection = happybase.Connection(host='localhost', port=9090, autoconnect=False)

In [74]:
def open_connection():
    """Open the shared HBase Thrift connection."""
    connection.open()


#close the opened connection
def close_connection():
    """Close the shared HBase Thrift connection."""
    connection.close()


#list all tables in Hbase
def list_tables():
    """Return the names of all HBase tables.

    The shared connection is opened for the call and always closed again,
    even when fetching the table list raises.
    """
    print("fetching all table")
    open_connection()
    try:
        tables = connection.tables()
    finally:
        close_connection()
    print("all tables fetched")
    return tables

In [75]:
#create the required table
def create_table(name, cf):
    """Create HBase table ``name`` with column families ``cf`` unless it already exists.

    Args:
        name: table name.
        cf: column-family specification passed to ``connection.create_table``,
            e.g. ``{'info': dict(max_versions=5)}``.
    """
    print("creating table " + name)
    tables = list_tables()
    if name not in tables:
        open_connection()
        try:
            connection.create_table(name, cf)
        finally:
            # Release the connection even if the create call fails.
            close_connection()
        print("table created")
    else:
        print("table already present")


#get the pointer to a table
def get_table(name):
    """Return a happybase table handle for ``name``.

    NOTE(review): the shared connection is closed again before returning, so
    callers presumably have to call open_connection() before using the handle.
    """
    open_connection()
    try:
        table = connection.table(name)
    finally:
        close_connection()
    return table

In [76]:
create_table(name='lookup_table', cf={'info': {'max_versions': 5}})

creating table lookup_table
fetching all table
all tables fetched
table created


In [77]:
#batch insert data in lookup table
def batch_insert_data(df, tableName, batch_size=4):
    """Write every row of ``df`` into the HBase table ``tableName``.

    The row key is the card_id; card_id, transaction_date, score, postcode and
    UCL are stored under the ``info`` column family. Rows are streamed to the
    driver partition by partition (``toLocalIterator``) instead of collecting
    the whole DataFrame at once, and the connection is closed even if a write
    fails.

    Args:
        df: Spark DataFrame with card_id, transaction_date, score, postcode and UCL columns.
        tableName: name of an existing HBase table.
        batch_size: number of puts buffered before each flush (default 4, as before).
    """
    print("starting batch insert of events")
    table = get_table(tableName)
    open_connection()
    try:
        # Row key is the card_id, so rows sharing a card_id write to the same HBase row.
        # bytes(...) assumes Python 2, where bytes is an alias of str.
        with table.batch(batch_size=batch_size) as b:
            for row in df.toLocalIterator():
                b.put(bytes(row.card_id), {
                    'info:card_id': bytes(row.card_id),
                    'info:transaction_date': bytes(row.transaction_date),
                    'info:score': bytes(row.score),
                    'info:postcode': bytes(row.postcode),
                    'info:UCL': bytes(row.UCL),
                })
        print("batch insert done")
    finally:
        close_connection()

In [79]:
batch_insert_data(df=lookup_table, tableName='lookup_table')

starting batch insert of events
batch insert done


In [80]:
# HBase table that will hold the raw rows of card_transactions.csv.
create_table(name='card_transactions', cf={'info': {'max_versions': 5}})


creating table card_transactions
fetching all table
all tables fetched
table created


In [81]:
def batch_insert_csvdata(filename, tableName, batch_size=1000):
    """Load a card-transactions CSV file into the HBase table ``tableName``.

    The row key is the 0-based line number in the file (the header occupies
    line 0 and is skipped), as before. Columns go to the ``info`` family.
    Puts are now sent in batches of ``batch_size`` rather than one RPC per line,
    blank lines are skipped instead of raising IndexError, and both the file and
    the HBase connection are closed even if an error occurs.

    Args:
        filename: local path of the CSV file (simple comma split, no quoting).
        tableName: name of an existing HBase table.
        batch_size: number of puts buffered before each flush.
    """
    print("starting batch insert of events")
    with open(filename, "r") as csv_file:
        table = get_table(tableName)
        open_connection()
        try:
            with table.batch(batch_size=batch_size) as batch:
                for line_no, line in enumerate(csv_file):
                    stripped = line.strip()
                    temp = stripped.split(",")
                    # Skip the header row and any blank lines.
                    if not stripped or temp[0] == 'card_id':
                        continue
                    # bytes(...) assumes Python 2, where bytes is an alias of str.
                    batch.put(bytes(line_no), {'info:card_id': bytes(temp[0]),
                                               'info:member_id': bytes(temp[1]),
                                               'info:amount': bytes(temp[2]),
                                               'info:postcode': bytes(temp[3]),
                                               'info:pos_id': bytes(temp[4]),
                                               'info:transaction_dt': bytes(temp[5]),
                                               'info:status': bytes(temp[6])})
            print("batch insert done")
        finally:
            close_connection()


In [82]:
# Load the raw card_transactions.csv rows into the card_transactions HBase table.
batch_insert_csvdata(filename='card_transactions.csv', tableName='card_transactions')



starting batch insert of events
batch insert done
