In [1]:
# Install the exact Shapely and PySpark releases this notebook imports below.
!pip install Shapely==1.6.4.post2
!pip install pyspark==2.3.1

Collecting Shapely==1.6.4.post2
[?25l  Downloading https://files.pythonhosted.org/packages/38/b6/b53f19062afd49bb5abd049aeed36f13bf8d57ef8f3fa07a5203531a0252/Shapely-1.6.4.post2-cp36-cp36m-manylinux1_x86_64.whl (1.5MB)
[K    100% |████████████████████████████████| 1.5MB 17.3MB/s ta 0:00:01
[?25hInstalling collected packages: Shapely
Successfully installed Shapely-1.6.4.post2
[33mYou are using pip version 10.0.1, however version 19.3.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m
Collecting pyspark==2.3.1
[?25l  Downloading https://files.pythonhosted.org/packages/ee/2f/709df6e8dc00624689aa0a11c7a4c06061a7d00037e370584b9f011df44c/pyspark-2.3.1.tar.gz (211.9MB)
[K    100% |████████████████████████████████| 211.9MB 225kB/s eta 0:00:01
Building wheels for collected packages: pyspark
  Running setup.py bdist_wheel for pyspark ... [?25ldone
[?25h  Stored in directory: /home/ec2-user/.cache/pip/wheels/37/48/54/f1b63f0dbb729e20c92f1bbcf1c5

In [2]:
import json
import csv
import re
from pyspark import SparkContext
from pyspark.sql import SQLContext, functions
from pyspark.sql.types import StructType, StructField, FloatType, StringType, BooleanType, ArrayType
from polygon_utils import *
from shapely.geometry import shape, Point, mapping
from shapely.wkt import loads, dumps

In [3]:
# Roll up the per-feature polygons into one geometry per suburb.
# The input is read line by line, one JSON feature per line.
suburb_parts = {}
with open('./melb_inner_2016.json') as json_file:
    for line in json_file:
        feat = json.loads(line)
        suburb_parts.setdefault(feat['sa2_name16'], []).append(shape(feat['geometry']))

# Union each suburb's parts once, instead of re-unioning the growing result for
# every additional feature. A suburb with a single part keeps that geometry
# untouched, exactly as before.
suburbs = {
    suburb: parts[0] if len(parts) == 1 else unary_union(parts)
    for suburb, parts in suburb_parts.items()
}

# Store each suburb geometry as WKT text so the records are JSON-serialisable.
melb_suburb_2016 = [
    {'suburb_name': suburb, 'poly': dumps(geom)}
    for suburb, geom in suburbs.items()
]

In [23]:
# Peek at the first three suburb records (name plus WKT polygon).
melb_suburb_2016[:3]

[{'suburb_name': 'Brunswick',
  'poly': 'POLYGON ((144.9497404700000516 -37.7627689749999718, 144.9500285280000753 -37.7610530269999458, 144.9501800760000378 -37.7601691149999397, 144.9503262220000579 -37.7593159509999623, 144.9500848470000847 -37.7592873859999827, 144.9501560590000508 -37.7589317609999213, 144.9503060370000185 -37.7580953759999787, 144.9504391530000476 -37.7573778719999495, 144.9506517650000887 -37.7562328519999824, 144.9506612920000634 -37.7561964989999410, 144.9506739580000385 -37.7561607389999310, 144.9506896890001144 -37.7561257369999410, 144.9506987050000362 -37.7561086059999411, 144.9507189620001100 -37.7560751199999345, 144.9507302040000241 -37.7560588219999431, 144.9507421030000387 -37.7560428379999848, 144.9507680180000762 -37.7560119059999693, 144.9507965610000610 -37.7559824539999411, 144.9508117820000734 -37.7559683569999720, 144.9508275860000595 -37.7559546669999690, 144.9508609830000978 -37.7559286559999805, 144.9508785400000761 -37.7559163529999751, 144

In [5]:
# Persist the rolled-up suburb records as one JSON document.
with open('./melb_suburb_2016.json', 'w') as out_file:
    out_file.write(json.dumps(melb_suburb_2016))

In [6]:
# Reuse the already-running SparkContext when this cell is executed again;
# a bare SparkContext() raises if a context already exists in the kernel.
sc = SparkContext.getOrCreate()
# Make polygon_utils.py available to tasks run on this context.
sc.addPyFile('polygon_utils.py')
sqlc = SQLContext(sc)

In [7]:
# Read the suburb polygons (name + WKT) into a Spark DataFrame.
suburb_json_path = './melb_suburb_2016.json'
sa2 = sqlc.read.json(suburb_json_path)

In [8]:
# Urban-forest records: a float `area` column followed by the geometry text,
# read with a single space as the field separator.
csv_schema = (
    StructType()
    .add('area', FloatType(), False)
    .add('the_geom', StringType(), False)
)
urban_forest = sqlc.read.csv(
    './melb_urban_forest_2016.txt/part-0000[0-5]',
    schema=csv_schema,
    sep=' ',
)

In [9]:
# Fix up the_geom as proper WKT.
# Matches "<whitespace><token><whitespace>". Written as a raw string so that
# "\s" is the regex whitespace class rather than an invalid string escape,
# which Python 3.6+ warns about.
insert_comma = re.compile(r'(\s[^\s]*)\s')


def insert_commas(the_geom):
    """Insert the commas WKT expects between coordinate pairs.

    The text is split at its first space into a geometry type and the
    coordinate text. Within the coordinate text, every second whitespace gap
    (the one following the second number of a pair, for input shaped like
    ``((x y x y ...))``) is rewritten as ``", "``.

    Args:
        the_geom: geometry text such as ``'POLYGON ((1 2 3 4 1 2))'``, or
            ``None``.

    Returns:
        The geometry text with commas inserted, e.g.
        ``'POLYGON ((1 2, 3 4, 1 2))'``; ``None`` when ``the_geom`` is ``None``.

    Raises:
        ValueError: if ``the_geom`` contains no space to split on.
    """
    # A null column value reaches a Python UDF as None; pass it through rather
    # than failing the whole job with an AttributeError.
    if the_geom is None:
        return None
    gtype, lonlat = the_geom.split(' ', 1)
    gtype = gtype.strip()
    lonlat = insert_comma.sub(r'\1, ', lonlat.strip())
    return f'{gtype} {lonlat}'

# Swap the raw geometry text for its comma-fixed WKT and discard the area column.
insert_commas_udf = functions.udf(insert_commas, StringType())
urban_forest_fixed = (
    urban_forest
    .withColumn('new_geom', insert_commas_udf('the_geom'))
    .drop('the_geom')
    .withColumnRenamed('new_geom', 'the_geom')
    .drop('area')
)

In [10]:
def get_rect(poly):
    """Parse the WKT string ``poly`` and return the geometry's ``bounds``."""
    geom = loads(poly)
    return geom.bounds

# Declare the bounds as doubles. 32-bit floats round coordinates such as
# 144.9497404700000516 (spacing near 145 is about 1.5e-5), which can shrink a
# box enough to drop a genuine overlap from the bounding-box pre-filter.
get_rect_udf = functions.udf(get_rect, 'array<double>')

# Attach the bounding rectangle of every forest polygon and every suburb.
urban_forest_fixed = urban_forest_fixed.withColumn('urban_forest_rect', get_rect_udf('the_geom'))
sa2 = sa2.withColumn('sa2_rect', get_rect_udf('poly'))

In [11]:
def get_area(poly):
    """Parse the WKT string ``poly`` and return the geometry's ``area``."""
    geom = loads(poly)
    return geom.area

get_area_udf = functions.udf(f=get_area, returnType=FloatType())

# Area of each suburb polygon, used later as the denominator of the vegetation rate.
sa2 = sa2.withColumn('sa2_area', get_area_udf(functions.col('poly')))

In [12]:
def may_intersect(rect1, rect2):
    """Tell whether two axis-aligned rectangles overlap or touch.

    Each rectangle is a 4-item sequence ``(min_lon, min_lat, max_lon, max_lat)``.
    Returns ``True`` when the rectangles share at least a boundary point and
    ``False`` as soon as one of the four separation checks fails.
    """
    lon_lo_a, lat_lo_a, lon_hi_a, lat_hi_a = rect1
    lon_lo_b, lat_lo_b, lon_hi_b, lat_hi_b = rect2
    # Guard clauses, checked in a fixed order; each one rules out an overlap.
    if not lat_lo_a <= lat_hi_b:
        return False
    if not lon_hi_a >= lon_lo_b:
        return False
    if not lat_hi_a >= lat_lo_b:
        return False
    return lon_lo_a <= lon_hi_b

# Pair every suburb with every forest polygon, keeping only the pairs whose
# bounding rectangles may intersect.
sa2_p1 = sa2.repartition(1)
urban_forest_p10 = urban_forest_fixed.repartition(10)
may_intersect_udf = functions.udf(may_intersect, BooleanType())
urban_forest_in_sa2 = (
    sa2_p1
    .crossJoin(urban_forest_p10)
    .filter(may_intersect_udf(sa2_p1.sa2_rect, urban_forest_p10.urban_forest_rect))
)

In [13]:
# The rectangles were only needed for the pre-filter; drop both columns.
urban_forest_in_sa2 = urban_forest_in_sa2.drop('sa2_rect', 'urban_forest_rect')

In [25]:
def get_veg_rate(sa2_geom, urban_forest_geom, sa2_area):
    """Return one forest polygon's share of a suburb's area, as a percentage.

    Both geometries are WKT strings. The result is the area of their
    intersection multiplied by 100 and divided by ``sa2_area``.
    """
    suburb_shape = loads(sa2_geom)
    forest_shape = loads(urban_forest_geom)
    overlap = suburb_shape.intersection(forest_shape)
    return overlap.area * 100 / sa2_area

# Percentage contribution of each candidate forest polygon to its suburb; the
# geometry and area inputs are dropped once the rate has been computed.
get_veg_rate_udf = functions.udf(get_veg_rate, FloatType())
suburban_vegetation_rate = (
    urban_forest_in_sa2
    .withColumn('urban_forest_%_contribution_to_sa2',
                get_veg_rate_udf('poly', 'the_geom', 'sa2_area'))
    .drop('poly')
    .drop('the_geom')
    .drop('sa2_area')
)

In [24]:
# Spot check: the largest single forest contribution within Parkville.
parkville_rates = suburban_vegetation_rate.filter(suburban_vegetation_rate.suburb_name == 'Parkville')
parkville_rates.orderBy('urban_forest_%_contribution_to_sa2', ascending=False).head()

Row(suburb_name='Parkville', urban_forest_%_contribution_to_sa2=0.010033028200268745)

In [26]:
# Total vegetation percentage per suburb: sum the per-polygon contributions.
rates_by_suburb = suburban_vegetation_rate.groupBy('suburb_name')
answer = rates_by_suburb.sum('urban_forest_%_contribution_to_sa2')

In [27]:
# Rank suburbs from highest to lowest total vegetation percentage.
answer = answer.sort('sum(urban_forest_%_contribution_to_sa2)', ascending=False)

In [28]:
# Print the top rows of the ranking (the default 20 rows, long values truncated).
answer.show(n=20, truncate=True)

+--------------------+---------------------------------------+
|         suburb_name|sum(urban_forest_%_contribution_to_sa2)|
+--------------------+---------------------------------------+
|           Parkville|                     21.555475844945175|
|  South Yarra - West|                     21.039264185434604|
|           Southbank|                     20.513264408737314|
|      East Melbourne|                     18.803033386359402|
|             Carlton|                     15.577596197850198|
|   Kensington (Vic.)|                     13.427357967894181|
|     North Melbourne|                     13.090631428828715|
|           Melbourne|                      8.802424025178425|
|Flemington Raceco...|                      6.956917156325289|
|Carlton North - P...|                     6.8811071720233485|
|Port Melbourne In...|                     2.8305038956353883|
|           Docklands|                      2.672835074405217|
|      West Melbourne|                      2.075007471