In [140]:
import os
import sys
from shapely.geometry import mapping
from polygon_utils import may_intersect, intersection_area_bool
from utils import to_poly, cleaned_polygon
from past.builtins import xrange
from functools import reduce

# NOTE: please change the path to current setup in your system
# windows
if sys.platform.startswith('win'):
    # location where all the programs will be stored this project location
    os.chdir(r"C:\Users\Diwakar Sharma\Pictures\PySparkV2")
    
    # path of spark installation
    os.environ['SPARK_HOME'] = r"C:\spark"

# do same  if there are other platforms
os.curdir

# Create variable for root path
SPARK_HOME = os.environ['SPARK_HOME']
os.environ['PYSPARK_PYTHON'] = r"C:\Users\Diwakar Sharma\Pictures\anaconda3\python.exe"

# Add following paths
sys.path.insert(0,os.path.join(SPARK_HOME, "python"))
sys.path.insert(0,os.path.join(SPARK_HOME, "python", "lib"))
sys.path.insert(0,os.path.join(SPARK_HOME, "python", "lib", "pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.7-src.zip"))

# Initialize spark session and spark context
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
from functools import reduce
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql import DataFrame
from pyspark.sql.types import BooleanType

# create Spark Session
spSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Pyspark Tuts") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.cores.max", "4") \
    .config("spark.driver.maxResultSize", "8g") \
    .config("spark.sql.broadcastTimeout", 3600) \
    .config("spark.network.timeout", 10000000) \
    .config("spark.executor.heartbeatInterval", 10000000) \
    .config("spark.sql.crossJoin.enabled", "true") \
    .config("spark.worker.timeout", 3600) \
    .config("spark.sql.warehouse.dir", "file:///C:/temp/spark-warehouse") \
    .getOrCreate()

# get the spark context from Spark Session
sparkContext = spSession.sparkContext
# -*- coding: utf-8 -*-
spSession.sparkContext.setLogLevel('INFO')



In [141]:

########################################
# udf to map suburb to forest area
########################################
may_intersect_udf = F.udf(lambda x, y: may_intersect(x, y), BooleanType())

########################################
# load urbanForest datasets
# for testing purpose only a subset is  
# loaded due to computing infrastructure
########################################
urban_forest_rdd_part_zero = sparkContext.textFile("file:///C:/Users/Diwakar Sharma/Pictures/PySparkV2/melb_urban_forest_2016.txt/test1").repartition(5)
print(urban_forest_rdd_part_zero.count())
urban_forest_data_zero = urban_forest_rdd_part_zero.map(lambda l: l.split("\""))
urban_forest_map_zero = urban_forest_data_zero.map(lambda x: ((x[1], x[3])))
urban_forest_map_zero = urban_forest_map_zero.map(lambda x: (x[0], 
                                                  cleaned_polygon(x[1])))
urban_forest_map_zero = urban_forest_map_zero.map(lambda x: (float(x[0]), 
                                                  to_poly(x[1])))
urban_forest_cleaned_zero = urban_forest_map_zero.toDF(['ForestArea', 'Coords'])

urban_forest_cleaned_zero.show(10)

242
+----------------+--------------------+
|      ForestArea|              Coords|
+----------------+--------------------+
|3.98043559067884|[[144.94191893846...|
|9.17814705681699|[[144.94190895719...|
|  3.007072111269|[[144.94187567637...|
|3.86014890925862|[[144.94190080796...|
|4.14718749660807|[[144.94188538484...|
|1.46492895511503|[[144.97152597558...|
|3.69308797904325|[[144.94186984499...|
|3.75804691422453|[[144.94184858513...|
|5.49937282149683|[[144.94125470898...|
|5.70700410008907|[[144.94125593388...|
+----------------+--------------------+
only showing top 10 rows



In [142]:
########################################
# read json for inner city as DataFrame
########################################
inner_city_df = spSession.read.json("file:///C:/Users/Diwakar Sharma/Pictures/PySparkV2/melb_inner_2016.json")
drop_cols = ["gcc_code16", "gcc_name16", "sa1_7dig16", "sa1_main16", 
             "sa2_5dig16", "sa2_main16", "gcc_code16", "gcc_name16",
             "sa1_7dig16", "sa1_main16", "sa2_5dig16", "sa2_main16",
             "sa3_code16", "sa3_name16", "sa4_code16", "sa4_name16",
             "ste_code16", "ste_name16", "type"]
inner_city_df = inner_city_df.drop(*drop_cols)
oldColumns = inner_city_df.schema.names
newColumns = ["Area", "geometry", "Suburb"]
cleaned_city_df = reduce(lambda inner_city_df, idx: inner_city_df.
                         withColumnRenamed(oldColumns[idx], newColumns[idx]),
                         xrange(len(oldColumns)), inner_city_df)
cleaned_city_df = cleaned_city_df.select("Area", "geometry.coordinates", "Suburb")
melbourne_stats_df = cleaned_city_df.rdd.map(lambda x: 
                            (float(x[0]), to_poly(x[1]), x[2])).\
                             toDF(["Area", "Coordinates", "Suburb"])

# find total area by suburb
total_area_df = melbourne_stats_df.groupBy("Suburb").agg({"Area": "sum"}).\
                withColumnRenamed("sum(Area)", "TotalArea")

total_area_df.show(5)

+---------------+------------------+
|         Suburb|         TotalArea|
+---------------+------------------+
| Brunswick West| 3.179500000000001|
|South Melbourne|            2.4948|
|      Brunswick| 5.142499999999999|
|     Ascot Vale|            3.8361|
|  St Kilda East|2.4135000000000004|
+---------------+------------------+
only showing top 5 rows



In [143]:
# cross join to get suburb mappings with ForestArea/TreeCanopy

joined_urban_forest_df = urban_forest_cleaned_zero.crossJoin(melbourne_stats_df).where                              (may_intersect_udf(urban_forest_cleaned_zero.Coords,                                                          melbourne_stats_df.Coordinates))
joined_urban_forest_df = joined_urban_forest_df.distinct().groupBy("Suburb").agg(                                    {"ForestArea": "sum"}).withColumnRenamed(
                            "sum(ForestArea)", "ForestArea")
joined_urban_forest_df = joined_urban_forest_df.repartition("Suburb")
joined_urban_forest_df.show(5)


+--------------------+------------------+
|              Suburb|        ForestArea|
+--------------------+------------------+
|           Parkville| 5525.531878288416|
|Flemington Raceco...|161.57590240909337|
|           Docklands| 4713.978738002249|
|Carlton North - P...|3039.1032570743387|
|   Kensington (Vic.)|223.64138858032229|
+--------------------+------------------+
only showing top 5 rows



In [144]:

polygon_list_df = melbourne_stats_df.select("Coordinates")

#self join
joined_cross_geometry = melbourne_stats_df.crossJoin(polygon_list_df).where(
                        intersection_udf(melbourne_stats_df.Coordinates,                                                polygon_list_df.Coordinates))

joined_cross_geometry = joined_cross_geometry.withColumn("IArea", lit(0)).\
                         repartition("Suburb")
joined_intersection = joined_cross_geometry.rdd.map(lambda x: (x.Suburb,
                      x.IArea + intersection_area(x.Coordinates, x.Coordinates)
                    )).toDF(["Suburb", "IArea"])
intersection_area_df = joined_intersection.groupBy("Suburb").agg({"IArea": "sum"})                                .withColumnRenamed("sum(IArea)", "IntersectionArea")
print("Intersection Area By Surbu")
intersection_area_df.show(5)


Intersection Area By Surbu
+---------------+-------------------+
|         Suburb|   IntersectionArea|
+---------------+-------------------+
| Brunswick West|0.40106951704206234|
|South Melbourne| 0.3150696804359202|
|      Brunswick|  0.648886622257908|
|     Ascot Vale|0.48410477340804864|
|  St Kilda East|0.30491464608960345|
+---------------+-------------------+
only showing top 5 rows



In [145]:
# calculate actual area from intersection area
actual_area_df = total_area_df.join(intersection_area_df, ["Suburb"])
actual_area_df = actual_area_df.rdd.map(lambda x: (x.Suburb, (x.TotalArea - x.IntersectionArea))).toDF(["Suburb", "ActualArea"])

In [146]:
# join urban forest DF having Suburb and ForestArea to melbourne stats having Actual Area and Suburb
forest_area_by_suburb_joined_df = actual_area_df.join(joined_urban_forest_df, ["Suburb"])
print(forest_area_by_suburb_joined_df.count())
forest_area_by_suburb_joined_df.show(5)

10
+--------------------+------------------+------------------+
|              Suburb|        ActualArea|        ForestArea|
+--------------------+------------------+------------------+
|           Parkville|3.5381872704130544| 5525.531878288416|
|Flemington Raceco...|1.4935617358779143|161.57590240909337|
|           Docklands|2.1354409067835927| 4713.978738002249|
|Carlton North - P...|2.0133882063585213|3039.1032570743387|
|   Kensington (Vic.)|1.8759950691214007|223.64138858032229|
+--------------------+------------------+------------------+
only showing top 5 rows



In [147]:
forest_area_by_suburb_joined_df = forest_area_by_suburb_joined_df.groupBy(
                                    "Suburb").sum("ForestArea", "ActualArea")


In [151]:
forest_area_by_suburb_joined_df = forest_area_by_suburb_joined_df.withColumnRenamed("sum(ForestArea)", "ForestArea").withColumnRenamed("sum(ActualArea)", "ActualArea")


In [152]:
forest_area_by_suburb_joined_df.schema.names

['Suburb', 'ForestArea', 'ActualArea']

In [153]:
# find greenery ratio by Suburb
greenest_suburb = forest_area_by_suburb_joined_df.rdd.map(lambda x: (x.Suburb, 
                     (x.ForestArea/x.ActualArea)*0.0001)).\
                     toDF(["Suburb", "GreenRatio"])

In [154]:
# print greenest Suburb
greenest_suburb.orderBy(F.col("GreenRatio").desc()).first()

Row(Suburb='Docklands', GreenRatio=0.22074966921479733)