In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("GD data prep") \
    .getOrCreate()
spark

In [3]:
df = spark.read.parquet('gs://greendot-events/je/transdtl_with_pt_for_bq_training')

In [4]:
df.printSchema()

root
 |-- postedtransactionv2key: string (nullable = true)
 |-- transdtlkey: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- cardkey: long (nullable = true)
 |-- trancode: string (nullable = true)
 |-- postdate: timestamp (nullable = true)
 |-- transdate: timestamp (nullable = true)
 |-- transamt: decimal(19,4) (nullable = true)
 |-- merch_name: string (nullable = true)
 |-- merch_trunc: string (nullable = true)
 |-- mcc: string (nullable = true)
 |-- mcc_description: string (nullable = true)
 |-- mcc_category: string (nullable = true)
 |-- merch_city: string (nullable = true)
 |-- merch_state: string (nullable = true)
 |-- merch_zip: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- online_retail_product: string (nullable = true)
 |-- bin: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_desc: string (nullable = true)
 |-- card_type_desc: string (nullable = true)
 

In [5]:
# Data Shape

#print((df.count(),len(df.columns)))

In [6]:
# Length of SSN Tokens

df.withColumn('ssnlength',F.length('ssntoken')).groupby('ssnlength').count().show()

+---------+---------+
|ssnlength|    count|
+---------+---------+
|     null|  2397472|
|       11|167443158|
+---------+---------+



The SSN tokens are mostly 11 characters in length, the remaining seem to be null.

In [7]:
from pyspark.sql.functions import col, length

# Filter the Data for SSN tokens length 11
df_11= df.where(length(col('ssntoken')) == 11)

#Check Shape
#print((df_11.count(),len(df_11.columns)))

In [8]:
#Most occuring SSN Token

df_11.groupby('ssntoken').count().orderBy("count",ascending=False).first()

Row(ssntoken=u'FNSSX0DAX3I', count=41084)

In [9]:
#Top 10 most occuring SSN tokens

df_11.registerTempTable('df_11')
sqlContext.sql("SELECT ssntoken, COUNT(*) AS cnt FROM df_11 GROUP BY ssntoken ORDER BY cnt DESC LIMIT 10").collect()

[Row(ssntoken=u'FNSSX0DAX3I', cnt=41084),
 Row(ssntoken=u'5PZROF234LI', cnt=14541),
 Row(ssntoken=u'P7V9RXSUAOI', cnt=8340),
 Row(ssntoken=u'BSYMYINTLII', cnt=8151),
 Row(ssntoken=u'V16JFRWZ0WI', cnt=8129),
 Row(ssntoken=u'SHSWGI6S7AI', cnt=7894),
 Row(ssntoken=u'5OCX6C70SAI', cnt=7695),
 Row(ssntoken=u'BYA6O06Z8VI', cnt=7646),
 Row(ssntoken=u'9ELQGOF901I', cnt=7142),
 Row(ssntoken=u'4UI3OGEKBQI', cnt=6680)]

These are the top 10 most occuring SSN Tokens. While there doesn't appear to be anything odd going on with the token values, it is worth nothing the most occuring SSN token has 41,000 different rows, which is much larger than any other. Might be worth looking into this specific customer to see what's going on.

In [10]:
# Exclude negative Dispute amounts

df_11_a = df_11.where((df.dispute_amount >=0) | (df.dispute_amount.isNull()))
print('Excluded {} negative dispute amounts.'.format(df_11.where(df.dispute_amount<0).count()))

Excluded 21 negative dispute amounts.


In [11]:
#Read in product key data

df_prod = spark.read.parquet('gs://greendot-events/je/product_key_segment')
df_prod.count()

3471

In [12]:
#Determine when the transaction was posted in relation to customer create date
from pyspark.sql.functions import unix_timestamp
df_11_b = df_11_a.withColumn('date_from_creation', (F.unix_timestamp('postdate') - F.unix_timestamp('customer_createdate')))



In [13]:
df_11_b.registerTempTable('df_11_b')
sqlContext.sql("SELECT date_from_creation, COUNT(*) AS cnt FROM df_11_b WHERE date_from_creation >(180*86400) GROUP BY date_from_creation ORDER BY cnt DESC LIMIT 10").collect()


[Row(date_from_creation=17376120, cnt=4532),
 Row(date_from_creation=17116920, cnt=2073),
 Row(date_from_creation=17462520, cnt=1855),
 Row(date_from_creation=16983060, cnt=1527),
 Row(date_from_creation=17548920, cnt=1422),
 Row(date_from_creation=19795320, cnt=1375),
 Row(date_from_creation=17226120, cnt=1172),
 Row(date_from_creation=17203320, cnt=1110),
 Row(date_from_creation=17658240, cnt=1087),
 Row(date_from_creation=15759780, cnt=943)]

In [14]:
df_11_c = df_11_b.where((df_11_b.date_from_creation <=180*86400))

In [15]:
'Removed {} values from the dataset due to the date_from_creation being longer than 180 days'.format(df_11_b.count() - df_11_c.count())

'Removed 16682519 values from the dataset due to the date_from_creation being longer than 180 days'

In [16]:
#Create Observation Period column (obs_period) and flag (obs_90) that equals 1 
#if the Observation period is greater than 90 days, otherwise 0
#df_11_d = df_11_c.withColumn('obs_period', (unix_timestamp('postdate') - unix_timestamp('customer_createdate')))
#df_11_d = df_11_d.where((df_11_d.obs_period <=90*86400))
df_11_e = df_11_c.withColumn('obs_90', \
                            F.when(df_11_c.date_from_creation >=90*86400,1).otherwise(0))



In [17]:
df_11_e.registerTempTable('df_11_e')
sqlContext.sql("SELECT obs_90, COUNT(*) AS cnt FROM df_11_e GROUP BY obs_90 ORDER BY cnt DESC LIMIT 10").collect()


[Row(obs_90=0, cnt=99018382), Row(obs_90=1, cnt=51742236)]

In [18]:
#Create Target Variable based on following criteria:
# dispute_amount >= 100, 91<= date_from_creation >= 180
# date_from_creation is how long the transaction has been posted relative to the customer's creation date
df_11_f = df_11_e.withColumn('target', \
                             F.when((df_11_e.date_from_creation>=91*86400)\
                                    &(df_11_e.date_from_creation<=180*86400)\
                                    &(df_11_e.dispute_amount>=100),1)\
            .otherwise(0))

df_11_f.registerTempTable('df_11_f')
sqlContext.sql("SELECT target, COUNT(DISTINCT ssntoken) AS unique_tokens,COUNT(*) AS total_tokens FROM df_11_f GROUP BY target ORDER BY target DESC LIMIT 10").collect()

[Row(target=1, unique_tokens=3228, total_tokens=14763),
 Row(target=0, unique_tokens=1270489, total_tokens=150745855)]

In [19]:
#Summary statistics for the transaction amount 
#for customers that were posted 0-90 days after customer creation
df_11_f.filter((df_11_f.date_from_creation<90*86400)).select('transamt').describe().show()


+-------+------------------+
|summary|          transamt|
+-------+------------------+
|  count|          99018382|
|   mean|      101.47246789|
| stddev|422.87141474508417|
|    min|            0.0000|
|    max|       205000.0000|
+-------+------------------+



In [20]:
#Most frequent transaction codes
#Note this is not filtered by date_from_creation
df_11_f.registerTempTable('df_11_f')
#sqlContext.sql("SELECT trancode, COUNT(*) AS cnt FROM df_11_f GROUP BY trancode ORDER BY cnt DESC LIMIT 10").collect()

[Row(trancode=u'1001', cnt=56586256),
 Row(trancode=u'7301', cnt=35942566),
 Row(trancode=u'7017', cnt=9212716),
 Row(trancode=u'0886', cnt=7563363),
 Row(trancode=u'7302', cnt=7516222),
 Row(trancode=u'7008', cnt=5959386),
 Row(trancode=u'3001', cnt=5203399),
 Row(trancode=u'7027', cnt=4466490),
 Row(trancode=u'7166', cnt=4109278),
 Row(trancode=u'1018', cnt=3452160)]

In [21]:
# See if customers are using different cardkeys
from pyspark.sql.functions import countDistinct

df_11_f.groupBy('ssntoken').agg(countDistinct(col("cardkey")).alias("count")).orderBy("count",ascending=False).show()





+-----------+-----+
|   ssntoken|count|
+-----------+-----+
|FNSSX0DAX3I| 4536|
|DH2YHI13FLI|  626|
|E20FVDKRZXI|  516|
|34HRSC5R37I|  191|
|8N7T3G0S5UI|  141|
|IWXMEBPIM6I|  141|
|3JM52MS2QVI|  129|
|D4LHR0BX45I|  105|
|JXNAG6QDLPI|   89|
|Q7SDZUI5ZCI|   86|
|XU6KKNFYE5I|   79|
|YVATVDWAEQI|   78|
|POW1HH4ATGI|   68|
|N88E4GJR6YI|   67|
|BYMMKKJGNLI|   64|
|IGCD60HR8YI|   63|
|X8XHDJIYFLI|   56|
|TZJ5VUFAJWI|   50|
|P8F6HMT6EGI|   49|
|RF1K0O6UT0I|   49|
+-----------+-----+
only showing top 20 rows



In [22]:
#df_11_f.withColumn('targetcnt',F.length('target')).groupby('targetcnt').count().show()

+---------+---------+
|targetcnt|    count|
+---------+---------+
|        1|150760618|
+---------+---------+



In [23]:
#sqlContext.sql("SELECT obs_90, COUNT(*) AS cnt FROM df_11_f GROUP BY obs_90 ORDER BY cnt ASC LIMIT 10").collect()

[Row(obs_90=1, cnt=51742236), Row(obs_90=0, cnt=99018382)]

In [24]:
df_prod = spark.read.parquet('gs://greendot-events/je/product_key_segment')

prod = df_prod.alias('prod') 
data = df_11_f.alias('data')

left_join = prod.join(data,"product_key", how='left')


In [25]:
left_join.select('product_segment').distinct().collect()




[Row(product_segment=u'GoBank Online'),
 Row(product_segment=u'FSC Retail'),
 Row(product_segment=u'Green Dot Retail'),
 Row(product_segment=u'GoBank @ Walmart'),
 Row(product_segment=u'GDN Retail'),
 Row(product_segment=u'Walmart Retail'),
 Row(product_segment=u'Walmart Gift'),
 Row(product_segment=u'- Retail'),
 Row(product_segment=u'Green Dot @ Walmart'),
 Row(product_segment=u'Walmart Online'),
 Row(product_segment=u'Green Dot Online'),
 Row(product_segment=u'Green Dot Partner'),
 Row(product_segment=u'GoBank Retail'),
 Row(product_segment=u'GoBank Partner')]

In [26]:
sqlContext.sql("SELECT target, COUNT(DISTINCT ssntoken) AS unique_tokens,COUNT(*) AS total_tokens FROM df_11_f WHERE dispute_resolution='WriteOff' GROUP BY target ORDER BY target DESC LIMIT 10").collect()

[Row(target=1, unique_tokens=544, total_tokens=2116),
 Row(target=0, unique_tokens=14660, total_tokens=139825)]

In [27]:
df_new = left_join
df_new.printSchema()


root
 |-- product_key: integer (nullable = true)
 |-- product_segment: string (nullable = true)
 |-- postedtransactionv2key: string (nullable = true)
 |-- transdtlkey: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- cardkey: long (nullable = true)
 |-- trancode: string (nullable = true)
 |-- postdate: timestamp (nullable = true)
 |-- transdate: timestamp (nullable = true)
 |-- transamt: decimal(19,4) (nullable = true)
 |-- merch_name: string (nullable = true)
 |-- merch_trunc: string (nullable = true)
 |-- mcc: string (nullable = true)
 |-- mcc_description: string (nullable = true)
 |-- mcc_category: string (nullable = true)
 |-- merch_city: string (nullable = true)
 |-- merch_state: string (nullable = true)
 |-- merch_zip: string (nullable = true)
 |-- online_retail_product: string (nullable = true)
 |-- bin: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_desc: string (nullable = true)


In [28]:

df_trancode = df_new.groupBy('ssntoken').agg(countDistinct(col("trancode")).alias("unique_trancodes")\
                                             ,countDistinct(col("cardkey")).alias("unique_cardkeys")\
                                            ,).orderBy("unique_trancodes",ascending=False)


In [29]:
df_new_1 = df_new.join(df_trancode,"ssntoken", how='left')


In [30]:
df_new_1.printSchema()

root
 |-- ssntoken: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- product_segment: string (nullable = true)
 |-- postedtransactionv2key: string (nullable = true)
 |-- transdtlkey: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- cardkey: long (nullable = true)
 |-- trancode: string (nullable = true)
 |-- postdate: timestamp (nullable = true)
 |-- transdate: timestamp (nullable = true)
 |-- transamt: decimal(19,4) (nullable = true)
 |-- merch_name: string (nullable = true)
 |-- merch_trunc: string (nullable = true)
 |-- mcc: string (nullable = true)
 |-- mcc_description: string (nullable = true)
 |-- mcc_category: string (nullable = true)
 |-- merch_city: string (nullable = true)
 |-- merch_state: string (nullable = true)
 |-- merch_zip: string (nullable = true)
 |-- online_retail_product: string (nullable = true)
 |-- bin: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |--

In [31]:
from pyspark.sql import Window
df_trancode = (
    df_new
    .groupBy('ssntoken')
    .agg(countDistinct(col("trancode")).alias("unique_trancode_count")
))
ssn_trancode_window = F.row_number().over(Window.partitionBy("ssntoken").orderBy(col("transcode_count").desc()))
most_freq_trancode_df = (
    df_new
    .groupBy("ssntoken", "trancode")
    .agg(
        F.count(col("trancode")).alias("transcode_count")
    )
    .withColumn("rn", ssn_trancode_window)
    .filter("rn = 1")
    .select(
        col("ssntoken"), col("trancode").alias("most_frequent_trancode")
    )
)
joined_tranc_df = df_trancode.join(most_freq_trancode_df, 'ssntoken', 'inner')
df_new_2 = df_new_1.join(joined_tranc_df,"ssntoken", how='left')


In [32]:
df_new_2.printSchema()

root
 |-- ssntoken: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- product_segment: string (nullable = true)
 |-- postedtransactionv2key: string (nullable = true)
 |-- transdtlkey: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- cardkey: long (nullable = true)
 |-- trancode: string (nullable = true)
 |-- postdate: timestamp (nullable = true)
 |-- transdate: timestamp (nullable = true)
 |-- transamt: decimal(19,4) (nullable = true)
 |-- merch_name: string (nullable = true)
 |-- merch_trunc: string (nullable = true)
 |-- mcc: string (nullable = true)
 |-- mcc_description: string (nullable = true)
 |-- mcc_category: string (nullable = true)
 |-- merch_city: string (nullable = true)
 |-- merch_state: string (nullable = true)
 |-- merch_zip: string (nullable = true)
 |-- online_retail_product: string (nullable = true)
 |-- bin: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |--

In [33]:
df_merch = df_new.groupBy('ssntoken').agg(countDistinct(col("mcc_category")).alias("unique_merch")\
                                            ,).orderBy("unique_merch",ascending=False)

In [34]:
df_new_3 = df_new_2.join(df_merch,"ssntoken", how='left')
df_new_3.printSchema()

root
 |-- ssntoken: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- product_segment: string (nullable = true)
 |-- postedtransactionv2key: string (nullable = true)
 |-- transdtlkey: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- cardkey: long (nullable = true)
 |-- trancode: string (nullable = true)
 |-- postdate: timestamp (nullable = true)
 |-- transdate: timestamp (nullable = true)
 |-- transamt: decimal(19,4) (nullable = true)
 |-- merch_name: string (nullable = true)
 |-- merch_trunc: string (nullable = true)
 |-- mcc: string (nullable = true)
 |-- mcc_description: string (nullable = true)
 |-- mcc_category: string (nullable = true)
 |-- merch_city: string (nullable = true)
 |-- merch_state: string (nullable = true)
 |-- merch_zip: string (nullable = true)
 |-- online_retail_product: string (nullable = true)
 |-- bin: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |--

In [35]:
df_merch = (
    df_new
    .groupBy('ssntoken')
    .agg(countDistinct(col("mcc_category")).alias("unique_mcc_count")
))
ssn_mcc_window = F.row_number().over(Window.partitionBy("ssntoken").orderBy(col("mcc_count").desc()))
most_freq_mcc_df = (
    df_new
    .groupBy("ssntoken", "mcc_category")
    .agg(
        F.count(col("mcc_category")).alias("mcc_count")
    )
    .withColumn("rn", ssn_mcc_window)
    .filter("rn = 1")
    .select(
        col("ssntoken"), col("mcc_category").alias("most_frequent_mcc")
    )
)
joined_mcc_df = df_merch.join(most_freq_mcc_df, 'ssntoken', 'inner')
df_new_3 = df_new_2.join(joined_mcc_df,"ssntoken", how='left')
df_new_3.printSchema()

root
 |-- ssntoken: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- product_segment: string (nullable = true)
 |-- postedtransactionv2key: string (nullable = true)
 |-- transdtlkey: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- cardkey: long (nullable = true)
 |-- trancode: string (nullable = true)
 |-- postdate: timestamp (nullable = true)
 |-- transdate: timestamp (nullable = true)
 |-- transamt: decimal(19,4) (nullable = true)
 |-- merch_name: string (nullable = true)
 |-- merch_trunc: string (nullable = true)
 |-- mcc: string (nullable = true)
 |-- mcc_description: string (nullable = true)
 |-- mcc_category: string (nullable = true)
 |-- merch_city: string (nullable = true)
 |-- merch_state: string (nullable = true)
 |-- merch_zip: string (nullable = true)
 |-- online_retail_product: string (nullable = true)
 |-- bin: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |--

In [36]:
df_merch = df_new.groupBy('ssntoken').agg(countDistinct(col("merch_city")).alias("unique_cities")\
                                          ,countDistinct(col("merch_zip")).alias("unique_zip")
                                          ,countDistinct(col("merch_state")).alias("unique_states")
                                            ,).orderBy("unique_cities",ascending=False)

df_new_4 = df_new_3.join(df_merch,"ssntoken", how='left')
df_new_4.printSchema()

root
 |-- ssntoken: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- product_segment: string (nullable = true)
 |-- postedtransactionv2key: string (nullable = true)
 |-- transdtlkey: long (nullable = true)
 |-- customerkey: long (nullable = true)
 |-- cardkey: long (nullable = true)
 |-- trancode: string (nullable = true)
 |-- postdate: timestamp (nullable = true)
 |-- transdate: timestamp (nullable = true)
 |-- transamt: decimal(19,4) (nullable = true)
 |-- merch_name: string (nullable = true)
 |-- merch_trunc: string (nullable = true)
 |-- mcc: string (nullable = true)
 |-- mcc_description: string (nullable = true)
 |-- mcc_category: string (nullable = true)
 |-- merch_city: string (nullable = true)
 |-- merch_state: string (nullable = true)
 |-- merch_zip: string (nullable = true)
 |-- online_retail_product: string (nullable = true)
 |-- bin: string (nullable = true)
 |-- product_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |--

In [37]:
(
    df_new_4.write
     .format('parquet')
     .mode('overwrite')
     .option('compression', 'snappy')
     .save('gs://greendot-events/je/sample_data')
)

Py4JJavaError: An error occurred while calling o379.save.
: java.io.IOException: Error getting Bucket greendot-events:
{"code":403,"errors":[{"domain":"global","message":"865475175237-compute@developer.gserviceaccount.com does not have storage.buckets.get access to the Google Cloud Storage bucket.","reason":"forbidden"}],"message":"865475175237-compute@developer.gserviceaccount.com does not have storage.buckets.get access to the Google Cloud Storage bucket."}
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl$8.onFailure(GoogleCloudStorageImpl.java:1676)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.execute(BatchHelper.java:182)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.BatchHelper.lambda$queue$0(BatchHelper.java:163)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
