<h1> Set Up </h1>

In [1]:
import os
import math

import numpy as np
import pandas as pd
import matplotlib.pylab as plt

In [2]:
from pyspark import SparkContext
from pyspark import SparkConf

In [3]:
# getOrCreate returns the already-active SparkContext when one exists. This cell
# can therefore be re-run in a live kernel instead of failing because a context
# is already running. The conf below only takes effect when a new context is
# actually created.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local[*]').setAppName('PySpark'))

In [4]:
# spark related
# NOTE(review): DataFrameWriter is not used in any of the visible cells.
from pyspark.sql import DataFrameWriter
from pyspark.sql import SQLContext
# SQL entry point bound to the SparkContext `sc`; `sql` is a shortcut so that
# queries can be written as sql("SELECT ...").
sqlContext = SQLContext(sc)
sql = sqlContext.sql

In [5]:
# Show the master URL the SparkContext is running against.
sc.master

u'local[*]'

<h1> Read Files </h1>

In [7]:
# Glob pattern for the profile CSV files under ../dataset/profile/, resolved
# against the current working directory and passed to sc.textFile below.
profile_path = os.path.abspath(os.path.join('..', 'dataset', 'profile', '**', '*.csv'))

In [8]:
# Read every file matching profile_path as raw text lines (one RDD element per
# line), then preview the first five lines.
profile_txtRDD = sc.textFile(profile_path)
profile_txtRDD.take(5)

[u'1,1871111837,F,D,181903,F,373933,3320438400,1217.1769900503969,528.7309192220683,Y,Y,N,N,3696969600',
 u'2,1208855837,M,C,68383,F,72631,3688502400,1717.8806412199986,3.3682296721720717,Y,N,Y,N,3696969600',
 u'3,2880263837,M,E,457653,F,93331,3692304000,1216.7107394786008,307.4304767295353,N,N,N,Y,3696969600',
 u'4,1334999837,M,L,74257,I,248503,3647980800,1414.86660231857,888.3791238333672,Y,N,N,Y,3696969600',
 u'5,2817191837,F,A,45583,I,895863,3622406400,1388.8557936778625,642.1109125464978,Y,Y,N,Y,3696969600']

In [9]:
# Number of raw profile lines read from disk.
profile_txtRDD.count()

100

In [10]:
# NOTE(review): wildcard import. The ATM schema cell further down relies on the
# StructField / StructType / StringType / LongType names it brings in, so
# narrowing it must keep those names available.
from pyspark.sql.types import *

# Explicit schema for the profile CSV, one StructField per column in file order.
# Only customer_id is declared non-nullable. The LongType / DoubleType columns
# must receive int / float values; process_profile_line performs those casts.
# NOTE: the name `fields` is reassigned by the ATM schema cell; `schema` is the
# value used by toDF.
fields = [
    StructField('customer_id', StringType(), False),
    StructField('birth_time', LongType(), True),
    StructField('gender', StringType(), True),
    StructField('contact_loc', StringType(), True),
    StructField('contact_code', StringType(), True),
    StructField('register_loc', StringType(), True),
    StructField('register_code', StringType(), True),
    StructField('start_time', LongType(), True),
    StructField('aum', DoubleType(), True),
    StructField('net_profit', DoubleType(), True),
    StructField('credit_card_flag', StringType(), True),
    StructField('loan_flag', StringType(), True),
    StructField('deposit_flag', StringType(), True),
    StructField('wealth_flag', StringType(), True),
    StructField('partition_time', LongType(), True)
]

schema = StructType(fields)

def process_profile_line(l):
    """Split one raw profile CSV line and cast its numeric columns.

    Columns 1, 7 and 14 (birth_time, start_time, partition_time) become ints for
    the LongType fields of ``schema``. Columns 8 and 9 (aum, net_profit) become
    floats for the DoubleType fields. All other columns stay as text.

    Args:
        l: one line of text as produced by ``sc.textFile``.

    Returns:
        The list of column values in file order.
    """
    columns = l.split(',')
    # LongType columns are converted first, then the DoubleType ones.
    for cast, positions in ((int, (1, 7, 14)), (float, (8, 9))):
        for pos in positions:
            columns[pos] = cast(columns[pos])
    return columns

# Parse each raw line into typed values, then attach the explicit profile schema.
profile_df = (
    profile_txtRDD
    .map(process_profile_line)
    .toDF(schema=schema)
)

profile_df.show(5)

+-----------+----------+------+-----------+------------+------------+-------------+----------+------------------+------------------+----------------+---------+------------+-----------+--------------+
|customer_id|birth_time|gender|contact_loc|contact_code|register_loc|register_code|start_time|               aum|        net_profit|credit_card_flag|loan_flag|deposit_flag|wealth_flag|partition_time|
+-----------+----------+------+-----------+------------+------------+-------------+----------+------------------+------------------+----------------+---------+------------+-----------+--------------+
|          1|1871111837|     F|          D|      181903|           F|       373933|3320438400|1217.1769900503969| 528.7309192220683|               Y|        Y|           N|          N|    3696969600|
|          2|1208855837|     M|          C|       68383|           F|        72631|3688502400|1717.8806412199986|3.3682296721720717|               Y|        N|           Y|          N|    3696969600|


In [11]:
# Row count after parsing; compare with profile_txtRDD.count() to confirm no lines were lost.
profile_df.count()

100

In [12]:
# Debug check: confirm toDF produced a Spark SQL DataFrame.
type(profile_df)

pyspark.sql.dataframe.DataFrame

In [13]:
# Distinct contact locations. Collect them once and count locally instead of
# running two separate distinct() jobs over the same data.
contact_loclist = profile_df.select('contact_loc').rdd.distinct().collect()
contact_locnum = len(contact_loclist)
# print(...) with a single argument behaves the same on Python 2 and 3.
print(contact_locnum)

11


In [14]:
# Display the distinct contact_loc Rows collected above.
contact_loclist

[Row(contact_loc=u'B'),
 Row(contact_loc=u'H'),
 Row(contact_loc=u'D'),
 Row(contact_loc=u'J'),
 Row(contact_loc=u'F'),
 Row(contact_loc=u'L'),
 Row(contact_loc=u'K'),
 Row(contact_loc=u'C'),
 Row(contact_loc=u'A'),
 Row(contact_loc=u'G'),
 Row(contact_loc=u'E')]

In [15]:
# contact_loc -> number of profile rows with that location, ordered by location.
clRDD = (
    profile_df.select('contact_loc').rdd
    .map(lambda row: (row.contact_loc, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortByKey()
)

In [16]:
# Bring the sorted (contact_loc, count) pairs back to the driver.
cllist = clRDD.collect()

In [17]:
# Display the per-location counts.
cllist

[(u'A', 4),
 (u'B', 6),
 (u'C', 8),
 (u'D', 8),
 (u'E', 8),
 (u'F', 4),
 (u'G', 12),
 (u'H', 10),
 (u'J', 16),
 (u'K', 8),
 (u'L', 16)]

In [18]:
# Sanity check: the per-location counts in clRDD must add up to the number of
# profile rows. Compare against profile_df.count() instead of a hard-coded 100,
# so the check stays valid when the dataset changes. (clRDDnum holds a bool.)
clRDDnum = clRDD.values().sum() == profile_df.count()

In [19]:
# clRDDnum is the result of a comparison (a bool), not a number.
type(clRDDnum)

bool

In [20]:
# True when the per-location counts add up to the total checked in the previous cell.
clRDDnum

True

In [21]:
# contact_loc -> list of customer_id values seen with that location (duplicates kept).
clidRDD = (
    profile_df.select('contact_loc', 'customer_id').rdd
    # wrap each id in a one-element list so reduceByKey can concatenate them
    .map(lambda row: (row.contact_loc, [row.customer_id]))
    .reduceByKey(lambda acc, more: acc + more)
)

In [22]:
# Number of contact_loc keys (reduceByKey leaves one element per location).
clidRDD.count()

11

In [23]:
# Peek at one (contact_loc, [customer_id, ...]) pair.
clidRDD.first()

(u'A', [u'5', u'17', u'30', u'42'])

In [24]:
# Bring all (contact_loc, [customer_id, ...]) pairs to the driver for local inspection.
clidlist = clidRDD.collect()

In [25]:
# Show each contact_loc with the distinct customer ids seen there.
# Unpack the pairs directly instead of indexing via xrange(len(...)), and use
# print(...), which behaves the same on Python 2 and 3 for a single argument.
for loc, customer_ids in clidlist:
    print(loc)
    print(set(customer_ids))

A
set([u'42', u'30', u'5', u'17'])
C
set([u'39', u'27', u'15', u'14', u'28', u'40', u'3', u'2'])
E
set([u'15', u'19', u'32', u'44', u'28', u'40', u'3', u'7'])
K
set([u'11', u'20', u'48', u'23', u'33', u'45', u'36', u'8'])
G
set([u'10', u'30', u'20', u'22', u'17', u'33', u'47', u'45', u'42', u'35', u'5', u'8'])
B
set([u'25', u'18', u'31', u'50', u'43', u'6'])
D
set([u'1', u'13', u'38', u'26'])
F
set([u'24', u'12', u'37', u'49'])
H
set([u'25', u'21', u'46', u'18', u'31', u'43', u'34', u'6', u'9', u'50'])
J
set([u'24', u'10', u'39', u'12', u'14', u'22', u'49', u'47', u'16', u'37', u'29', u'35', u'41', u'27', u'2', u'4'])
L
set([u'11', u'21', u'48', u'16', u'19', u'32', u'44', u'23', u'29', u'41', u'46', u'36', u'4', u'7', u'9', u'34'])


In [26]:
# Total number of distinct (contact_loc, customer_id) pairs. set() already
# removes duplicates, so the extra list() conversion is unnecessary. A total
# below profile_df.count() means some (contact_loc, customer_id) pairs repeat.
tol = sum(len(set(customer_ids)) for loc, customer_ids in clidlist)

print(tol)

96


In [71]:
# contact_loc -> list of the distinct contact_code values seen with it.
# mapValues replaces the tuple-unpacking lambda `lambda (x, y): ...`, which is
# Python-2-only syntax (removed by PEP 3113); it also keeps the key partitioning.
clccRDD = (
    profile_df.select('contact_loc', 'contact_code').rdd
    .map(lambda row: (row.contact_loc, [row.contact_code]))
    .reduceByKey(lambda acc, more: acc + more)
    .mapValues(lambda codes: list(set(codes)))
)

In [72]:
# Peek at one (contact_loc, [distinct contact_code, ...]) pair.
clccRDD.first()

(u'A', [u'45583'])

In [73]:
# Number of contact_loc keys in clccRDD.
clccRDD.count()

11

In [81]:
# Total number of (contact_loc, distinct contact_code) pairs across all locations.
clccRDD.map(lambda pair: len(pair[1])).sum()

25

In [77]:
# Distinct contact_code values.
# NOTE: despite the name, ccRDD is a Python list of Rows returned by collect(), not an RDD.
ccRDD = profile_df.select('contact_code').distinct().collect()

In [78]:
# Display the distinct contact_code Rows.
ccRDD

[Row(contact_code=u'530713'),
 Row(contact_code=u'922561'),
 Row(contact_code=u'468541'),
 Row(contact_code=u'437583'),
 Row(contact_code=u'109893'),
 Row(contact_code=u'354621'),
 Row(contact_code=u'74257'),
 Row(contact_code=u'230881'),
 Row(contact_code=u'53593'),
 Row(contact_code=u'124963'),
 Row(contact_code=u'159601'),
 Row(contact_code=u'698061'),
 Row(contact_code=u'843643'),
 Row(contact_code=u'647221'),
 Row(contact_code=u'45583'),
 Row(contact_code=u'180201'),
 Row(contact_code=u'32581'),
 Row(contact_code=u'68383'),
 Row(contact_code=u'215761'),
 Row(contact_code=u'115261'),
 Row(contact_code=u'358203'),
 Row(contact_code=u'457653'),
 Row(contact_code=u'181903'),
 Row(contact_code=u'48181')]

In [79]:
# Number of distinct contact_code values. Compare with the per-location total
# computed from clccRDD: a smaller value here means some code appears under
# more than one location.
len(ccRDD)

24

In [80]:
# Show every (contact_loc, [distinct contact_code, ...]) pair.
clccRDD.collect()

[(u'A', [u'45583']),
 (u'C', [u'68383', u'437583']),
 (u'E', [u'468541', u'457653']),
 (u'K', [u'843643', u'109893']),
 (u'G', [u'230881', u'180201', u'215761']),
 (u'B', [u'115261', u'698061']),
 (u'D', [u'181903']),
 (u'F', [u'647221']),
 (u'H', [u'159601', u'922561', u'530713']),
 (u'J', [u'32581', u'48181', u'124963', u'457653']),
 (u'L', [u'74257', u'53593', u'358203', u'354621'])]

In [83]:
# contact_code -> distinct contact_loc values, keeping only codes that appear
# under more than one location. Uses mapValues and an indexed filter instead of
# tuple-unpacking lambdas, which are Python-2-only syntax (PEP 3113).
clccfRDD = (
    profile_df.select('contact_loc', 'contact_code').rdd
    .map(lambda row: (row.contact_code, [row.contact_loc]))
    .reduceByKey(lambda acc, more: acc + more)
    .mapValues(lambda locs: list(set(locs)))
    .filter(lambda pair: len(pair[1]) > 1)
)

In [84]:
# Number of contact_code values shared by more than one contact_loc.
clccfRDD.count()

1

In [85]:
# Show one shared contact_code with the locations it appears under.
clccfRDD.first()

(u'457653', [u'J', u'E'])

<h1> ATM </h1>

In [27]:
import json

# Explicit schema for the ATM CSV, one StructField per column in file order.
# Only actor_type is declared non-nullable. action_time and partition_time are
# LongType, so the parser must hand them over as ints. attrs is StringType: it
# holds JSON text that is decoded later with json.loads.
# NOTE: this reassigns the module-level name `fields` used by the profile schema cell.
fields = [
    StructField('actor_type', StringType(), False),
    StructField('actor_id', StringType(), True),
    StructField('action_type', StringType(), True),
    StructField('action_time', LongType(), True),
    StructField('object_type', StringType(), True),
    StructField('object_id', StringType(), True),
    StructField('channel_type', StringType(), True),
    StructField('channel_id', StringType(), True),
    StructField('attrs', StringType(), True),
    StructField('theme', StringType(), True),
    StructField('partition_time', LongType(), True)
]

atmschema = StructType(fields)

In [28]:
def process_profile_line(l):
    """Parse one raw ATM CSV line into a list matching ``atmschema``.

    NOTE(review): this reuses the name of the profile-line parser defined in an
    earlier cell and replaces it from here on. A distinct name (e.g.
    ``process_atm_line``, with the call in the next cell updated) would avoid
    the shadowing.

    The attrs column is JSON that itself contains commas. The line is therefore
    split naively, and attrs is re-assembled from everything between the first
    8 and the last 2 pieces.

    Args:
        l: one line of text as produced by ``sc.textFile``.

    Returns:
        A list of 11 values in atmschema order:
        - the 8 leading columns as text, except action_time (index 3) as an int;
        - the attrs JSON text;
        - theme (text) and partition_time (int).

    Raises:
        ValueError: if action_time / partition_time are not integers, or the
            attrs text is not valid JSON.
    """
    parts = l.split(',')
    # Leading fixed columns: actor_type .. channel_id.
    pre_fields = parts[:8]
    pre_fields[3] = int(pre_fields[3])  # action_time -> LongType
    # Trailing fixed columns: theme, partition_time.
    post_fields = parts[-2:]
    post_fields[-1] = int(post_fields[-1])  # partition_time -> LongType
    # attrs may contain commas, so re-join the middle pieces.
    attrs = ','.join(parts[8:-2])
    # atmschema declares attrs as StringType and later cells json.loads() it, so
    # the row must carry the JSON *text*, not a decoded dict. Parse it here only
    # to fail fast on malformed rows.
    json.loads(attrs)
    return pre_fields + [attrs] + post_fields

In [29]:
# Read every ATM CSV matching the glob, parse each line, and attach atmschema.
atm_df = (
    sc.textFile(os.path.abspath('../dataset/atm/**/*.csv'))
    .map(process_profile_line)
    .toDF(schema=atmschema)
)

atm_df.show(5)

+-----------+--------+-----------+-----------+-----------+--------------------+------------+--------------------+--------------------+-----+--------------+
| actor_type|actor_id|action_type|action_time|object_type|           object_id|channel_type|          channel_id|               attrs|theme|partition_time|
+-----------+--------+-----------+-----------+-----------+--------------------+------------+--------------------+--------------------+-----+--------------+
|customer_id|      17|    inquire| 3696970704|saving_acct|MJQXLDJMQQOYAPFBBPBS|         ATM|NCZHYDSTABCUAXLYJLQZ|{"action": {"txn_...|  atm|    3696969600|
|customer_id|      42|    inquire| 3696970704|saving_acct|MJQXLDJMQQOYAPFBBPBS|         ATM|NCZHYDSTABCUAXLYJLQZ|{"action": {"txn_...|  atm|    3696969600|
|customer_id|       2|    inquire| 3696973691|saving_acct|NQVSLAHSTIYMZFTFVPBH|         ATM|HFJXJQTXPAMHUNCEMXBC|{"action": {"txn_...|  atm|    3696969600|
|customer_id|      27|    inquire| 3696973691|saving_acct|NQVSLA

In [30]:
import json

# attrs is JSON text in atm_df (StringType). Drop back to an RDD and decode
# each value into a nested dict, since its inner structure is not part of the
# DataFrame schema.
atm_attrs = atm_df.select("attrs").rdd.map(lambda row: json.loads(row.attrs))
atm_attrs.take(10)

[{u'action': {u'trans_type': u'trans_out',
   u'txn_amt': 0.0,
   u'txn_fee_amt': 0.0},
  u'channel': {u'address_zipcode': 601401, u'machine_bank_code': u'C_bank'},
  u'object': {u'target_acct_nbr': u'UMOTEPKACILAPBIKOYHZ',
   u'target_bank_code': u'B_bank'}},
 {u'action': {u'trans_type': u'trans_out',
   u'txn_amt': 0.0,
   u'txn_fee_amt': 0.0},
  u'channel': {u'address_zipcode': 601401, u'machine_bank_code': u'C_bank'},
  u'object': {u'target_acct_nbr': u'UMOTEPKACILAPBIKOYHZ',
   u'target_bank_code': u'B_bank'}},
 {u'action': {u'trans_type': u'trans_out',
   u'txn_amt': 0.0,
   u'txn_fee_amt': 0.0},
  u'channel': {u'address_zipcode': 97657, u'machine_bank_code': u'Cathay'},
  u'object': {u'target_acct_nbr': u'INYTRNQEOMRLSGZWQPOD',
   u'target_bank_code': u'A_bank'}},
 {u'action': {u'trans_type': u'trans_out',
   u'txn_amt': 0.0,
   u'txn_fee_amt': 0.0},
  u'channel': {u'address_zipcode': 97657, u'machine_bank_code': u'Cathay'},
  u'object': {u'target_acct_nbr': u'INYTRNQEOMRLSGZWQP

In [31]:
# Debug check: atm_attrs is a plain RDD of decoded attrs dicts.
type(atm_attrs)

pyspark.rdd.PipelinedRDD

In [32]:
# Distinct values of attrs['action']['trans_type'].
atm_Transtyp = atm_attrs.map(lambda atm: atm['action']['trans_type']).distinct()

In [33]:
# Number of distinct trans_type values.
atm_Transtyp.count()

2

In [35]:
# atm action: trans_type
# List the distinct trans_type values (take(5) shows all of them when there are at most five).
atm_Transtyp.take(5)

[u'trans_in', u'trans_out']

In [56]:
# address_zipcode -> distinct target_acct_nbr values, keeping only zipcodes that
# appear with more than one target account. Uses mapValues and an indexed filter
# instead of tuple-unpacking lambdas, which are Python-2-only syntax (PEP 3113).
atm_checkoneone = (
    atm_attrs
    .map(lambda atm: (atm['channel']['address_zipcode'], [atm['object']['target_acct_nbr']]))
    .reduceByKey(lambda acc, more: acc + more)
    .mapValues(lambda accts: list(set(accts)))
    .filter(lambda pair: len(pair[1]) > 1)
)

In [57]:
# Number of address_zipcode values linked to more than one distinct target_acct_nbr.
atm_checkoneone.count()

263

In [38]:
# Total number of ATM records (one decoded attrs dict per record in atm_attrs).
atm_df.count()

1934

In [62]:
# Preview zipcodes that appear with several target accounts.
atm_checkoneone.take(10)

[(639201,
  [u'HSLPVHYXNGOVNEDGKQHU',
   u'CJVKXSVWOPNWNNWQFTTM',
   u'MMYNHASAFFRGVQSGRJQC',
   u'AHBPWQJTJNZVHDJSRKCK']),
 (1001001,
  [u'SJOEXYDEFCZFIDFMTMKH', u'SVJXDAQDCKJGKVUXFOXN', u'AKNCRIQOBFDUWOLWYQMA']),
 (951601, [u'UHEROBCXPSWOXNNVTVMB', u'SSRDJUNVPHIQLSVZKPCU']),
 (30801, [u'KWUISBDPUQFRPTKAVBUR', u'IJJBSECHUCRVFCFWGSHL']),
 (359401,
  [u'NOTHPMUBNXABPPLHJGWJ', u'SOYMOYMHFXNXCVHGJCFO', u'HLQGQZEYGSQMYOFDPMWM']),
 (679801,
  [u'QYFXTYSIPPCASAKROGAO',
   u'RHLFXECKYSZJCMGXLDRI',
   u'GWOGQWWHEDFGHWLYTJMT',
   u'GEYKRRODOELOYADBCRXX']),
 (601401,
  [u'UMOTEPKACILAPBIKOYHZ', u'SUUAFAUJZQEZMGPBMOEC', u'SEXNPUJUBCFCBHFGJIIN']),
 (40201, [u'YLJXSOWXSFUSSVECDKIJ', u'NMAQGJPFDUEDMBQKFLZP']),
 (858403, [u'SWAHICHUUWSEXUMRDZAW', u'XXUPPPYKAADBXDYQJYMT']),
 (106603, [u'BXEZFXOVEXEQKYYKAVAH', u'DBHLOMGFRHOOYZEDGVVD'])]

In [60]:
# Validation: every target_acct_nbr should appear with only one channel
# address_zipcode; any account kept by the filter below violates that.
# NOTE(review): the key here is object.target_acct_nbr, not an ATM/channel
# identifier; confirm this is the intended key for an "each ATM is in one
# place" check.
# Uses mapValues and an indexed filter instead of tuple-unpacking lambdas,
# which are Python-2-only syntax (PEP 3113).
atm_checkoneonea = (
    atm_attrs
    .map(lambda atm: (atm['object']['target_acct_nbr'], [atm['channel']['address_zipcode']]))
    .reduceByKey(lambda acc, more: acc + more)
    .mapValues(lambda zipcodes: list(set(zipcodes)))
    .filter(lambda pair: len(pair[1]) > 1)
)

In [61]:
# Number of target accounts seen with more than one zipcode; 0 means the check passes.
atm_checkoneonea.count()

0

In [86]:
# Distinct channel address_zipcode values across all ATM records.
atm_loc = atm_attrs.map(lambda atm: atm['channel']['address_zipcode']).distinct()

In [87]:
# Number of distinct zipcodes.
atm_loc.count()

588

In [88]:
# Preview 20 of the distinct zipcodes.
atm_loc.take(20)

[639201,
 160401,
 1001001,
 951601,
 39801,
 30801,
 331201,
 359401,
 679801,
 360601,
 601401,
 40201,
 159601,
 158803,
 858403,
 997003,
 763003,
 106603,
 161203,
 224203]

In [89]:
# Check whether 437583 (one of the profile contact_code values listed in ccRDD)
# also occurs as an ATM address_zipcode.
# NOTE(review): hard-coded value; consider deriving it from the profile data.
atm_loc.filter(lambda loc: loc == 437583).count()

1

In [90]:
# address_zipcode -> number of ATM records carrying that zipcode.
re_atm_loc = (
    atm_attrs
    .map(lambda atm: (atm['channel']['address_zipcode'], 1))
    .reduceByKey(lambda left, right: left + right)
)

In [91]:
# Preview (zipcode, record count) pairs.
re_atm_loc.take(10)

[(639201, 8),
 (160401, 2),
 (1001001, 6),
 (951601, 4),
 (39801, 2),
 (30801, 4),
 (331201, 2),
 (359401, 6),
 (679801, 8),
 (360601, 2)]

In [92]:
# Sum of the per-zipcode counts; every ATM record contributed exactly 1, so
# this should equal atm_df.count().
re_atm_loc.values().sum()

1934

In [95]:
# Look up the aggregated record count for zipcode 639201. Indexes into the
# (zipcode, count) pair instead of a tuple-unpacking lambda, which is
# Python-2-only syntax (removed by PEP 3113).
re_atm_loc.filter(lambda pair: pair[0] == 639201).take(2)

[(639201, 8)]