In [1]:
import pandas as pd
import numpy as np
from datetime import datetime
import pyspark as spark

from pyspark.sql import SparkSession, functions as F, SQLContext
# Build the SQLContext used by the CSV readers in the load cells below.
# `sc` is not defined anywhere in this notebook — presumably the kernel
# injects a live SparkContext under that name; confirm before a fresh run.
sqlContext = SQLContext(sc)
from pyspark.sql.window import Window

from pyspark.sql.functions import rank, when, col, lit, to_date, udf, unix_timestamp, from_unixtime, regexp_replace, pow, cos, sin
from pyspark.sql.functions import datediff

from pyspark.sql.types import DateType, StringType, StructField, IntegerType, FloatType, StructType


# Apply the three resource settings to the context's SparkConf in one chain.
# The chain is the cell's last expression, so the notebook still echoes the
# SparkConf object as the cell output.
# NOTE(review): `sc` already exists when this runs, so these values may not
# reach the already-started application — confirm they take effect, or move
# them to the configuration used when the context is launched.
(sc._conf
   .set('spark.executor.memory', '200g')
   .set('spark.cores.max', '100')
   .set('spark.driver.allowMultipleContexts', 'true'))

<pyspark.conf.SparkConf at 0x7f8da5f69f90>

In [2]:
# Disabled one-off preprocessing, kept for provenance: it reads the full
# checking-model extract with pandas, keeps the four location/date columns and
# writes mailed_prosp_loc.csv (the file the prospect-location cell loads).
#full=pd.read_csv('/user-home/1014/checking_models/response_rebuild/checking_model_data.txt', na_values=['.', 'null'])
#full.head()
#full=full[['Person_Seq_No_1_183','LAT','LNG','DATE_KEY']]
#full.to_csv('/user-home/1014/checking_models/response_rebuild/mailed_prosp_loc.csv', index=False)



<h3>Load prospect locations</h3>

In [3]:
%%time
# Explicit schema for the mailed-prospect file: an id, a latitude/longitude
# pair read as floats, and the mailing date kept as its raw string form.
prosp_loc_schema = StructType([
    StructField("Person_Seq_No_1_183", StringType(), True),
    StructField("LAT", FloatType(), True),
    StructField("LNG", FloatType(), True),
    StructField("DATE_KEY_STR", StringType(), True),
])

# Configure the CSV reader once, then load the prospect file with it.
# NOTE(review): the schema has no date-typed column, so the dateFormat option
# presumably has no effect on this read — confirm.
prospect_reader = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .options(header="true", delimiter=',', dateFormat="MM/dd/yyyy")
    .schema(prosp_loc_schema)
)
prospect_loc = prospect_reader.load('/user-home/1014/checking_models/response_rebuild/mailed_prosp_loc.csv')
prospect_loc.show(n=5)

# Date literals for the three mailing dates handled below.
mailed_may_2017 = F.lit('2017-05-31').cast(DateType())
mailed_jul_2017 = F.lit('2017-07-31').cast(DateType())
mailed_oct_2017 = F.lit('2017-10-31').cast(DateType())

# Translate the DDMONYYYY string into a real date with a nested CASE.
# NOTE(review): every DATE_KEY_STR other than the two listed values —
# including NULL or an unexpected string — falls through to 2017-10-31.
date_key = (
    when(col("DATE_KEY_STR").isin('31MAY2017'), mailed_may_2017)
    .when(col("DATE_KEY_STR").isin('31JUL2017'), mailed_jul_2017)
    .otherwise(mailed_oct_2017)
)

prospect_loc = prospect_loc.withColumn('date_key', date_key)

prospect_loc.show(n=5)
print(prospect_loc.count())

+--------------------+--------+---------+------------+
| Person_Seq_No_1_183|     LAT|      LNG|DATE_KEY_STR|
+--------------------+--------+---------+------------+
|0C008227031193001...|26.14206|-80.32132|   31OCT2017|
|0C0082270313E8007...|26.17931|-80.25072|   31OCT2017|
|3001324C0313E6007...|30.18591|-81.71905|   31OCT2017|
|0D00766A03118D000...|30.18591|-81.71905|   31OCT2017|
|0C00853A03138A005...|26.16278|-81.79096|   31OCT2017|
+--------------------+--------+---------+------------+
only showing top 5 rows

+--------------------+--------+---------+------------+----------+
| Person_Seq_No_1_183|     LAT|      LNG|DATE_KEY_STR|  date_key|
+--------------------+--------+---------+------------+----------+
|0C008227031193001...|26.14206|-80.32132|   31OCT2017|2017-10-31|
|0C0082270313E8007...|26.17931|-80.25072|   31OCT2017|2017-10-31|
|3001324C0313E6007...|30.18591|-81.71905|   31OCT2017|2017-10-31|
|0D00766A03118D000...|30.18591|-81.71905|   31OCT2017|2017-10-31|
|0C00853A03138A005

<h3>Load SNL locations</h3>

In [4]:
%%time
# Load the SNL branch file and keep only the columns used downstream.
# No schema is supplied to the reader, so the columns are cast explicitly
# in the "UPDATE COLUMN TYPES" step below.
snl_loc = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .options(header="true", delimiter=',', dateFormat="MM/dd/yyyy")
    .load('/user-home/1014/checking_models/response_rebuild/snl_2018.csv')
)
snl_loc = snl_loc.select(
    'SNL_Branch_Key',
    'Parent_Company_Name',
    'Latitude_degrees',
    'Longitude_degrees',
    'Branch_Opened_mmddyyyy',
    'Branch_Closed_mmddyyyy',
    'TYPE',
    'Branch_Tape_Deposits',
)


###UPDATE COLUMN TYPES
#https://stackoverflow.com/questions/38080748/convert-pyspark-string-to-date-format

# Four-letter year, matching the column names (mmddyyyy) and the reader's
# dateFormat; the previous pattern 'MM/dd/yyy' was a typo.
SNL_DATE_PATTERN = 'MM/dd/yyyy'


def _parse_snl_date(column_name):
    """Return a date column parsed from the MM/dd/yyyy string column `column_name`."""
    return to_date(from_unixtime(unix_timestamp(column_name, SNL_DATE_PATTERN)))


def _membership_flag(column, values):
    """Return an integer column: 1 where `column` is one of `values`, otherwise 0.

    A NULL in `column` does not match and therefore yields 0.
    """
    return when(column.isin(*values), 1).otherwise(0)


snl_loc = (
    snl_loc
    .withColumn('Latitude_degrees', col('Latitude_degrees').cast(FloatType()))
    .withColumn('Longitude_degrees', col('Longitude_degrees').cast(FloatType()))
    # Strip thousands separators before the numeric cast.
    .withColumn('Branch_Tape_Deposits', regexp_replace('Branch_Tape_Deposits', ',', '').cast(FloatType()))
    .withColumn('Branch_Opened_mmddyyyy', _parse_snl_date('Branch_Opened_mmddyyyy'))
    .withColumn('Branch_Closed_mmddyyyy', _parse_snl_date('Branch_Closed_mmddyyyy'))
)
# Missing or unparseable deposits become 0 rather than NULL.
snl_loc = snl_loc.fillna(0, subset=['Branch_Tape_Deposits'])


#create features for type of branches

# Parent-company groups behind each indicator column.
FIFTH_THIRD_PARENTS = ['Fifth Third Bancorp']
PEER_PARENTS = [
    'BB&T Corporation',
    'Capital One Financial Corporation',
    'Comerica Incorporated',
    'Huntington Bancshares Incorporated',
    'KeyCorp',
    'M&T Bank Corporation',
    'PNC Financial Services Group, Inc.',
    'Regions Financial Corporation',
    'SunTrust Banks, Inc.',
    'U.S. Bancorp',
    'Zions Bancorporation',
]
CORE_PARENTS = [
    'Wells Fargo & Company',
    'PNC Financial Services Group, Inc.',
    'Bank of America Corporation',
    'JPMorgan Chase & Co.',
    'BB&T Corporation',
    'SunTrust Banks, Inc.',
    'Regions Financial Corporation',
    'U.S. Bancorp',
    'Huntington Bancshares Incorporated',
]
TRIL_PARENTS = [
    'Wells Fargo & Company',
    'JPMorgan Chase & Co.',
    'Bank of America Corporation',
]

parent_company = col('Parent_Company_Name')
snl_loc = (
    snl_loc
    .withColumn('fifththird', _membership_flag(parent_company, FIFTH_THIRD_PARENTS))
    .withColumn('peer', _membership_flag(parent_company, PEER_PARENTS))
    .withColumn('core', _membership_flag(parent_company, CORE_PARENTS))
    .withColumn('tril', _membership_flag(parent_company, TRIL_PARENTS))
    .withColumn('cu', _membership_flag(col('TYPE'), ['Credit Union']))
)


snl_loc.show(10)
print(snl_loc.count())

+--------------+--------------------+----------------+-----------------+----------------------+----------------------+------------+--------------------+----------+----+----+----+---+
|SNL_Branch_Key| Parent_Company_Name|Latitude_degrees|Longitude_degrees|Branch_Opened_mmddyyyy|Branch_Closed_mmddyyyy|        TYPE|Branch_Tape_Deposits|fifththird|peer|core|tril| cu|
+--------------+--------------------+----------------+-----------------+----------------------+----------------------+------------+--------------------+----------+----+----+----+---+
|       1088811|@lantec Financial...|        36.92347|       -76.316376|            1952-01-01|            2004-06-30|Credit Union|                 0.0|         0|   0|   0|   0|  1|
|       1079711|1199 SEIU Federal...|         40.7581|        -73.99004|            1984-01-13|            2003-06-30|Credit Union|                 0.0|         0|   0|   0|   0|  1|
|       1079772|1199 SEIU Federal...|       40.758034|        -73.99006|            1

In [5]:
# Session handle used by the spark.sql(...) queries in the following cells.
# NOTE: this rebinds the name `spark`, which the import cell bound to the
# pyspark module (`import pyspark as spark`).
spark = SparkSession.builder.getOrCreate()

# Expose both DataFrames to Spark SQL: prospects as tbl_1, branches as tbl_2.
for _view_name, _frame in (("tbl_1", prospect_loc), ("tbl_2", snl_loc)):
    _frame.createOrReplaceTempView(_view_name)

In [8]:
%%time
# Per prospect and mailing date, count the branches of each category lying
# within 3 distance units of the prospect (haversine great-circle distance).
#
# A branch is counted only if it was open on the prospect's date_key:
# opened strictly before it (or open date unknown) and closed strictly after
# it (or never closed).
#
# Fix: latitude/longitude are stored in degrees, while sin/cos take radians,
# so every angle is now converted with radians() before the trig calls.
# The previous version fed degrees straight into sin/cos and produced
# meaningless distances.
#
# 7921.6623 is presumably 2 x Earth radius in miles, which would make the
# threshold 3 miles — confirm the unit.
# NOTE: this is an inner join, so prospects with no qualifying branch are
# absent from the result rather than present with zero counts.
sqlDf_3miles = spark.sql("""
    SELECT a.Person_SEQ_No_1_183,
           a.DATE_KEY_STR,
           SUM(b.fifththird) AS fifththird,
           SUM(b.peer)       AS peer,
           SUM(b.tril)       AS tril,
           SUM(b.cu)         AS cu
    FROM tbl_1 a
    INNER JOIN tbl_2 b
       ON (b.Branch_Closed_mmddyyyy IS NULL OR b.Branch_Closed_mmddyyyy > a.date_key)
      AND (b.Branch_Opened_mmddyyyy IS NULL OR b.Branch_Opened_mmddyyyy < a.date_key)
      AND 7921.6623 * asin(sqrt(
              pow(sin(radians(b.Latitude_degrees - a.LAT) / 2.0), 2.0)
              + cos(radians(a.LAT)) * cos(radians(b.Latitude_degrees))
                * pow(sin(radians(b.Longitude_degrees - a.LNG) / 2.0), 2.0)
          )) <= 3
    GROUP BY a.Person_SEQ_No_1_183, a.DATE_KEY_STR
""")

CPU times: user 521 µs, sys: 2.7 ms, total: 3.22 ms
Wall time: 85.4 ms


In [6]:
# Branch counts per prospect and mailing date within a given radius.
# The 5- and 7-unit queries were identical apart from the threshold, so they
# now share one template parameterised by {max_miles}.
#
# A branch is counted only if it was open on the prospect's date_key:
# opened strictly before it (or open date unknown) and closed strictly after
# it (or never closed).
#
# Fix: latitude/longitude are stored in degrees, while sin/cos take radians,
# so every angle is now converted with radians() before the trig calls.
#
# 7921.6623 is presumably 2 x Earth radius in miles, which would make the
# thresholds 5 and 7 miles — confirm the unit.
# NOTE: prospects with no qualifying branch are absent from the result
# rather than present with zero counts.
_BRANCH_COUNT_SQL = """
    SELECT a.Person_SEQ_No_1_183,
           a.DATE_KEY_STR,
           SUM(b.fifththird) AS fifththird,
           SUM(b.peer)       AS peer,
           SUM(b.tril)       AS tril,
           SUM(b.cu)         AS cu
    FROM tbl_1 a
    CROSS JOIN tbl_2 b
    WHERE (b.Branch_Closed_mmddyyyy IS NULL OR b.Branch_Closed_mmddyyyy > a.date_key)
      AND (b.Branch_Opened_mmddyyyy IS NULL OR b.Branch_Opened_mmddyyyy < a.date_key)
      AND 7921.6623 * asin(sqrt(
              pow(sin(radians(b.Latitude_degrees - a.LAT) / 2.0), 2.0)
              + cos(radians(a.LAT)) * cos(radians(b.Latitude_degrees))
                * pow(sin(radians(b.Longitude_degrees - a.LNG) / 2.0), 2.0)
          )) <= {max_miles}
    GROUP BY a.Person_SEQ_No_1_183, a.DATE_KEY_STR
"""

sqlDf_5miles = spark.sql(_BRANCH_COUNT_SQL.format(max_miles=5))

sqlDf_7miles = spark.sql(_BRANCH_COUNT_SQL.format(max_miles=7))

CPU times: user 1.21 ms, sys: 2.54 ms, total: 3.75 ms
Wall time: 300 ms


In [9]:
# Collect the 3-mile aggregate to the driver as a pandas DataFrame.
# This is the first action on sqlDf_3miles, so the join actually executes
# here; the recorded run of this cell ended in a KeyboardInterrupt.
sqlDf_3miles_pd = sqlDf_3miles.toPandas()

KeyboardInterrupt: 

In [None]:
# Haversine great-circle distance between prospect (a) and branch (b).
# Fix: latitude/longitude are stored in degrees, while sin/cos take radians,
# so every angle is converted with radians() before the trig calls.
# 7921.6623 is presumably 2 x Earth radius in miles — confirm the unit.
_HAVERSINE_DISTANCE_SQL = """7921.6623 * asin(sqrt(
              pow(sin(radians(b.Latitude_degrees - a.LAT) / 2.0), 2.0)
              + cos(radians(a.LAT)) * cos(radians(b.Latitude_degrees))
                * pow(sin(radians(b.Longitude_degrees - a.LNG) / 2.0), 2.0)
          ))"""

# One row per (prospect, mailing date, branch) pair whose distance is at most
# 30 units, restricted to branches open on the prospect's date_key: opened
# strictly before it (or open date unknown) and closed strictly after it
# (or never closed). The distance expression is written once and substituted
# into both the SELECT list and the filter.
sqlDf_distance = spark.sql("""
    SELECT a.Person_SEQ_No_1_183,
           a.DATE_KEY_STR,
           b.SNL_Branch_Key,
           {distance} AS distance
    FROM tbl_1 a
    CROSS JOIN tbl_2 b
    WHERE (b.Branch_Closed_mmddyyyy IS NULL OR b.Branch_Closed_mmddyyyy > a.date_key)
      AND (b.Branch_Opened_mmddyyyy IS NULL OR b.Branch_Opened_mmddyyyy < a.date_key)
      AND {distance} <= 30
""".format(distance=_HAVERSINE_DISTANCE_SQL))

In [None]:
# Rank each prospect/mailing-date's candidate branches from nearest to farthest.
window = (
    Window
    .partitionBy('Person_SEQ_No_1_183', 'DATE_KEY_STR')
    .orderBy(col('distance'))
)

sqlDf_ranked_dist = sqlDf_distance.withColumn("rank_based_on_distance", rank().over(window))

# Keep only the nearest branch per partition.
# NOTE: rank() gives tied rows the same rank, so several branches at exactly
# the same minimum distance all survive this filter.
df_w_least = sqlDf_ranked_dist.filter(col('rank_based_on_distance') == 1)