In [91]:
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, arrays_zip, array
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.types import *

import random

import socket

import pandas as pd
import numpy as np
import json

import utilities

In [2]:
driver_ip = socket.gethostbyname(socket.gethostname())

In [3]:
conf = pyspark.SparkConf().setAll([('spark.kubernetes.authenticate.caCertFile', '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'), \
                                   ('spark.kubernetes.authenticate.oauthTokenFile','/var/run/secrets/kubernetes.io/serviceaccount/token'), \
                                   ('spark.kubernetes.authenticate.driver.serviceAccountName','spark-driver-sa'), \
                                   ('spark.kubernetes.namespace','spark'), \
                                   ('spark.driver.pod.name','spark-driver'), \
                                   ('spark.executor.instances','3'), \
                                   ('spark.kubernetes.container.image','gcr.io/sarcasm-3wx3ce6drvftuy/spark-v2.4.4-worker:latest'), \
                                   ('spark.driver.host','spark-driver.spark.svc.cluster.local'), \
                                   ('spark.driver.port','29413'), \
                                   ('spark.driver.bindAddress',driver_ip), \
                                   ('spark.executor.memory','6500m'), \
                                   ('spark.executor.cores','1'), \
                                   ('spark.kubernetes.driverEnv.GCS_PROJECT_ID', 'sarcasm-3wx3ce6drvftuy'), \
                                   ('spark.kubernetes.driverEnv.GOOGLE_APPLICATION_CREDENTIALS', '/mnt/secrets/sarc-bucket-sa.json'), \
                                   ('spark.kubernetes.driver.secrets.sarc-bucket-sa','/mnt/secrets'), \
                                   ('spark.kubernetes.executor.secrets.sarc-bucket-sa','/mnt/secrets'), \
                                   ('spark.executorEnv.GCS_PROJECT_ID','sarcasm-3wx3ce6drvftuy'), \
                                   ('spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS','/mnt/secrets/sarc-bucket-sa.json'), \
                                   ('spark.hadoop.google.cloud.auth.service.account.enable','true'), \
                                   ('spark.hadoop.google.cloud.auth.service.account.json.keyfile','/mnt/secrets/sarc-bucket-sa.json'), \
                                   ('spark.hadoop.fs.gs.project.id','sarcasm-3wx3ce6drvftuy'), \
                                   ('spark.hadoop.fs.gs.system.bucket','sarc-bucket-3wx3ce6drvftuy')])

In [4]:
spark = SparkSession.builder.master("k8s://https://kubernetes.default.svc.cluster.local:443").appName("sarc").config(conf=conf).getOrCreate()
sc = spark.sparkContext

In [8]:
%time addresses = spark.read.csv("gs://sarc-bucket-3wx3ce6drvftuy/address_book.csv", inferSchema=True, sep = ',', header=True)


CPU times: user 3.26 ms, sys: 1.28 ms, total: 4.54 ms
Wall time: 3.94 s


In [9]:
addresses.show(10)

+---------+---------+---------+---------+----------------+------+-------+--------------------+------------------+------------------+
|ADDRNOCOM|STNAMEPRD|   STNAME|STNAMEPOT|       PLACENAME|USPSST|   ZIP5|           ADDRDELIV|         LONGITUDE|          LATITUDE|
+---------+---------+---------+---------+----------------+------+-------+--------------------+------------------+------------------+
|     4589|     WEST|  HIGGINS|     ROAD| Hoffman Estates|    IL|60192.0|4589 WEST HIGGINS...|       -88.1904381|        42.0741614|
|      125|    SOUTH|    HOUGH|   STREET|      Barrington|    IL|60010.0|125 SOUTH HOUGH S...|        -88.135805|        42.1534528|
|      104|     null|   QUEENS|     COVE|      Barrington|    IL|60010.0|     104 QUEENS COVE|       -88.1226772|42.147437200000006|
|     1732|     WEST|ALGONQUIN|     ROAD| Hoffman Estates|    IL|60120.0|1732 WEST ALGONQU...|       -88.1229055|        42.0928782|
|     1736|     WEST|ALGONQUIN|     ROAD| Hoffman Estates|    IL|6012

In [12]:
chi_addresses = addresses.filter(addresses["PLACENAME"]=="Chicago")

In [13]:
chi_addresses = chi_addresses.drop("ADDRNOCOM")
chi_addresses = chi_addresses.drop("STNAMEPRD")
chi_addresses = chi_addresses.drop("STNAME")
chi_addresses = chi_addresses.drop("STNAMEPOT")
chi_addresses = chi_addresses.drop("PLACENAME")
chi_addresses = chi_addresses.drop("USPSST")
chi_addresses = chi_addresses.drop("ZIP5")
chi_addresses.show(10)



+--------------------+------------------+------------------+
|           ADDRDELIV|         LONGITUDE|          LATITUDE|
+--------------------+------------------+------------------+
|7042 NORTH OZARK ...|       -87.8203129|        42.0090399|
|6908 NORTH OWEN A...|       -87.8192777|        42.0057771|
|6947 NORTH OLCOTT...|       -87.8133717|         42.007494|
|7420 NORTH ORIOLE...|       -87.8166382|42.015759700000004|
|7401 NORTH OTTAWA...|       -87.8172162|42.015235700000005|
|7121 NORTH OZANAM...|       -87.8209495|        42.0099599|
|7115 NORTH OTTAWA...|       -87.8173057|        42.0100246|
|7405 NORTH OTTAWA...|-87.81725899999999|        42.0153586|
|7508 NORTH OSCEOL...|-87.81294270000001|42.017315999999994|
|6813 NORTH OZARK ...|        -87.820401|          42.00452|
+--------------------+------------------+------------------+
only showing top 10 rows



In [63]:
chi_addresses = chi_addresses.filter(chi_addresses["LATITUDE"].isNotNull())
chi_addresses = chi_addresses.filter(chi_addresses["LONGITUDE"].isNotNull())

In [64]:
with open('dicts/zoning_polys.json','r') as f:
    zoning = json.load(f)

In [23]:
"""
Allowable zones:

B1 - Neighborhood Shopping District, **Low-Traffic Street, More Storefront-Like**
B2 - Neighborhood Mixed-Use District, "" ""
B3 - Community Shopping District 
C1 - Neighborhood Commercial District
C2 - Motor Vehicle-Related Commercial District
DC - Downtown Core District
DR - Downtown Residential District
DS - Downtown Service District
DX - Downtown Mixed-Use District
PD - Planned Development

"""

'\nAllowable zones:\n\nB1 - Neighborhood Shopping District, **Low-Traffic Street, More Storefront-Like**\nB2 - Neighborhood Mixed-Use District, "" ""\nB3 - Community Shopping District \nC1 - Neighborhood Commercial District\nC2 - Motor Vehicle-Related Commercial District\nDC - Downtown Core District\nDR - Downtown Residential District\nDS - Downtown Service District\nDX - Downtown Mixed-Use District\nPD - Planned Development\n\n'

In [15]:
allowable_zones = ["B1","B2","B3","C1","C2","DC","DR","DS","DX","PD"]

In [16]:
potential_zones = []
for i in range((len(list(zoning.keys())))):
    
    if zoning[list(zoning.keys())[i]][1][:2] in allowable_zones:
        potential_zones.append(list(zoning.keys())[i])
        
    
    

In [17]:
vals = [zoning[potential_zones[i]] for i in range(len(potential_zones))]


In [18]:
potential_zones = {i:j for (i,j) in zip(potential_zones, vals)}

In [19]:
potential_zones

{'row-jdxc_7s8u.uim2': [[[-87.71024494329944, 41.87679179024591],
   [-87.71017533609232, 41.87680982098899],
   [-87.71007717906045, 41.876837032688094],
   [-87.70995840840602, 41.87686940137096],
   [-87.70986776753541, 41.876893936607445],
   [-87.70979892451263, 41.87691227890078],
   [-87.70974032377826, 41.876927081527725],
   [-87.70965984049518, 41.876947750497],
   [-87.70965637988034, 41.87694864004993],
   [-87.7096571731452, 41.8769417154028],
   [-87.70965722690596, 41.87694124670255],
   [-87.70965430150487, 41.87694128323064],
   [-87.70956796248369, 41.87694236399029],
   [-87.70956750328092, 41.876921844390445],
   [-87.70956732460937, 41.87691386149636],
   [-87.7095670409318, 41.87690119524487],
   [-87.7095655140866, 41.87683164767299],
   [-87.70956406067798, 41.87676210049915],
   [-87.7095625338375, 41.87669255292559],
   [-87.70956100699941, 41.87662300535121],
   [-87.70955955359766, 41.876553458174854],
   [-87.70955802729796, 41.876483855716685],
   [-87.709

In [65]:
df = chi_addresses.toPandas()

In [66]:
df['LONGITUDE'].isna().value_counts()

False    582675
Name: LONGITUDE, dtype: int64

In [74]:
array(chi_addresses["LATITUDE"])

Column<b'array(LATITUDE)'>

In [100]:
coordinates = chi_addresses.select(arrays_zip(array(chi_addresses["LATITUDE"]), array(chi_addresses["LATITUDE"])).alias('COORDINATES'))


In [80]:
coordinates.show(5)

+--------------------+
|         COORDINATES|
+--------------------+
|[[42.0090399, 42....|
|[[42.0057771, 42....|
|[[42.007494, 42.0...|
|[[42.015759700000...|
|[[42.015235700000...|
+--------------------+
only showing top 5 rows



In [87]:
# Function to get rows at `rownums`
def getrows(df, rownums=None):
    return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])

# Get rows at positions 0 and 2.
a = getrows(coordinates, rownums=[0, 2]).collect()

In [99]:
(a[0][0][0][0],a[0][0][0][1])

(42.0090399, 42.0090399)

In [72]:
type(chi_addresses.LATITUDE)

pyspark.sql.column.Column

In [35]:
spark.udf.register("pointLookup", utilities.point_lookup)


<function utilities.point_lookup(polygon_dict, point)>

In [37]:
point_lookup_table = F.udf(utilities.point_lookup, ShortType())

In [102]:
point_lookup_table = udf(lambda x,y: utilities.point_lookup(x,y), StringType())

In [None]:
coordinates.select('COORDINATES', point_lookup_table(F.lit(potential_zones), 'COORDINATES'))

In [40]:
chi_addresses.select("LATITUDE","LONGITUDE").show(5)

+------------------+-----------+
|          LATITUDE|  LONGITUDE|
+------------------+-----------+
|        42.0090399|-87.8203129|
|        42.0057771|-87.8192777|
|         42.007494|-87.8133717|
|42.015759700000004|-87.8166382|
|42.015235700000005|-87.8172162|
+------------------+-----------+
only showing top 5 rows



In [50]:
chi_addresses.select(concat(col('LATITUDE'), lit(" , "), col("LONGITUDE"))).show(5)


+--------------------------------+
|concat(LATITUDE,  , , LONGITUDE)|
+--------------------------------+
|            42.0090399 , -87....|
|            42.0057771 , -87....|
|            42.007494 , -87.8...|
|            42.01575970000000...|
|            42.01523570000000...|
+--------------------------------+
only showing top 5 rows



In [None]:
%time chi_addresses.select("comment", n_comment_words_table("comment").alias("n_words")).show()

In [29]:

possible_coords = []

for i in range(len(address_coords)):

    result = utilities.point_lookup(potential_zones,chi_addresses)
    
    if result:
        possible_coords.append(address_coords[i])

KeyboardInterrupt: 