In [21]:
from pyspark.sql.functions import sum, mean, udf, col, round, max, greatest, count, when
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import rank, col
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
%matplotlib inline

In [22]:
# ! curl -O https://raw.githubusercontent.com/NYCPlanning/db-pluto/master/pluto_build/data/dcp_zoning_maxfar.csv
# ! mv dcp_zoning_maxfar.csv data/dcp_zoning_maxfar.csv

In [23]:
df = spark.read.csv('data/pluto.csv', header=True)
new_far = spark.read.csv('data/dcp_zoning_maxfar.csv', header=True)
df = df.select([col(A).alias(A.lower()) for A in df.columns])

#type conversion, '-' --> null
for A in ['residfar','commfar']:
    new_far = new_far.withColumn(A, col(A).cast(DoubleType()))
    df = df.withColumn(A, col(A).cast(DoubleType()))

#create two copies
new_far1 = new_far.select([col(A).alias(A.lower()+'_1') for A in ['zonedist','residfar','commfar']])
new_far2 = new_far.select([col(A).alias(A.lower()+'_2') for A in ['zonedist','residfar','commfar']])

In [24]:
@udf
def pick_value(A,B):
    if not A: 
        return B
    else: 
        return A

In [25]:
df = df.join(new_far1, df['zonedist1'] == new_far1['zonedist_1'], how='left')\
       .join(new_far2, df['zonedist2'] == new_far2['zonedist_2'], how='left')\
       .withColumn('residfar_new', pick_value(col('residfar_1'), col('residfar_2')))\
       .withColumn('commfar_new', pick_value(col('commfar_1'), col('commfar_2')))\
       .withColumn('maxfar', greatest(col('residfar'), col('commfar')))\
       .withColumn('pctunbuilt', ((col('maxfar') - col('builtfar'))/col('maxfar')))\
       .withColumn('maxfar_new', greatest(col('residfar_new'), col('commfar_new')))\
       .withColumn('pctunbuilt_new', ((col('maxfar_new') - col('builtfar'))/col('maxfar_new')))\
       .filter(col('unitsres').cast(DoubleType()) <= 6)\
       .filter(col('landmark').isNull())\
       .filter(col('irrlotcode') != 'Y')\
       .filter(~col ('bldgclass').like('M%'))\
       .filter(col('landuse') != '08')\
       .filter(col('easements').cast(DoubleType()) <= 0)\

In [26]:
df.agg(sum(when(col('pctunbuilt').isNull(), 1)).alias('pctunbuilt_null'), 
      sum(when(col('pctunbuilt_new').isNull(), 1)).alias('pctunbuilt_new_null')).show()

+---------------+-------------------+
|pctunbuilt_null|pctunbuilt_new_null|
+---------------+-------------------+
|           3522|               3453|
+---------------+-------------------+



In [27]:
df.filter(col('maxfar') < col('maxfar_new'))\
    .select('zonedist1', 'maxfar', 'maxfar_new').distinct().show(df.count(), False)

+---------+------+----------+
|zonedist1|maxfar|maxfar_new|
+---------+------+----------+
|R3-2     |0.6   |1.0       |
|M1-6/R9  |0.0   |7.52      |
|R7-1     |3.44  |4.0       |
|R3-2     |0.6   |2.0       |
|R5       |1.25  |3.0       |
|R8B      |4.0   |6.0       |
|C8-3     |2.0   |3.44      |
|R3A      |0.6   |1.0       |
|C8-2     |2.0   |3.0       |
|R7-1     |3.44  |5.0       |
|R5D      |2.0   |3.4       |
|R2A      |0.5   |2.0       |
|PARK     |0.0   |3.0       |
|PARK     |0.0   |0.75      |
|R5B      |1.35  |4.0       |
|M1-2     |2.0   |6.02      |
|R3X      |0.6   |2.0       |
|R6B      |2.0   |3.4       |
|M1-1     |1.0   |1.35      |
|R6B      |2.0   |4.2       |
|M1-2     |2.0   |2.43      |
|R3-1     |0.6   |1.0       |
|C8-1     |1.0   |2.0       |
|R5       |1.25  |2.0       |
|R3A      |0.6   |3.4       |
|M1-4     |2.0   |4.0       |
|R6       |2.43  |3.4       |
|R5B      |1.35  |3.4       |
|R2       |0.5   |1.0       |
|R3-2     |0.6   |3.0       |
|M2-1     

In [28]:
df.filter(col('maxfar') < col('maxfar_new'))\
    .groupBy('zonedist1')\
    .agg(count('zonedist1').alias('counts'))\
    .sort(col('counts').desc()).show(df.count(), False)

+---------+------+
|zonedist1|counts|
+---------+------+
|R8A      |591   |
|M1-1     |133   |
|R3-2     |79    |
|R6B      |71    |
|R3A      |66    |
|R5       |59    |
|PARK     |52    |
|R4       |51    |
|C8-1     |51    |
|R6       |43    |
|R3X      |36    |
|C8-2     |36    |
|R4A      |20    |
|M1-4/R8A |20    |
|R4B      |19    |
|R5B      |18    |
|M1-6/R9  |18    |
|R2       |13    |
|R7-1     |11    |
|C8-3     |11    |
|R6A      |10    |
|M1-2     |10    |
|R3-1     |8     |
|M1-4/R7X |7     |
|R4-1     |7     |
|M1-4/R7D |5     |
|M1-5     |4     |
|R2A      |4     |
|M1-4     |3     |
|C8-4     |2     |
|R8B      |2     |
|M1-1/R6A |2     |
|R7-2     |2     |
|M3-1     |2     |
|R8       |1     |
|R10      |1     |
|M1-4D    |1     |
|R5D      |1     |
|M2-1     |1     |
|R7A      |1     |
+---------+------+



In [30]:
df.filter(col('zonedist1').like('R%'))\
   .select('maxfar_new', 'maxfar', 'builtfar')\
   .agg(mean('maxfar_new'),mean('maxfar'),mean('builtfar')).show()

+------------------+------------------+------------------+
|   avg(maxfar_new)|       avg(maxfar)|     avg(builtfar)|
+------------------+------------------+------------------+
|1.1085407715997209|1.1666834392287233|0.8391015326831988|
+------------------+------------------+------------------+



In [31]:
# By Borough Comparison
df.groupBy('borough')\
    .agg(sum(when(col('pctunbuilt_new') >= 0.5, 1)).alias('softsites_new'),
         sum(when(col('pctunbuilt') >= 0.5, 1)).alias('softsites'))\
    .withColumn('diff', col('softsites_new')-col('softsites'))\
    .sort(col('diff').desc()).show()

+-------+-------------+---------+-----+
|borough|softsites_new|softsites| diff|
+-------+-------------+---------+-----+
|     MN|         5575|     6106| -531|
|     BX|        20241|    21727|-1486|
|     BK|        48361|    50568|-2207|
|     SI|        18810|    23264|-4454|
|     QN|        33537|    41283|-7746|
+-------+-------------+---------+-----+



In [32]:
df.groupBy('cd')\
    .agg(sum(when(col('pctunbuilt_new') >= 0.5, 1)).alias('softsites_new'),
         sum(when(col('pctunbuilt') >= 0.5, 1)).alias('softsites'))\
    .withColumn('diff', col('softsites_new')-col('softsites'))\
    .sort(col('diff').desc()).show()

+---+-------------+---------+----+
| cd|softsites_new|softsites|diff|
+---+-------------+---------+----+
|302|         1018|     1008|  10|
|301|         2927|     2917|  10|
|206|         1670|     1666|   4|
|308|         1777|     1773|   4|
|309|         2402|     2399|   3|
|102|          262|      259|   3|
|303|         2916|     2913|   3|
|201|         1568|     1566|   2|
|202|         1459|     1457|   2|
|484|            9|        7|   2|
|316|         3911|     3909|   2|
|304|         2599|     2597|   2|
|110|          810|      808|   2|
|205|         1252|     1250|   2|
|595|           18|       16|   2|
|204|         1299|     1298|   1|
|306|         1740|     1739|   1|
|203|         1471|     1470|   1|
|207|         1223|     1223|   0|
|101|           45|       45|   0|
+---+-------------+---------+----+
only showing top 20 rows



In [40]:
df.groupBy('cd')\
    .agg(sum(when(col('pctunbuilt_new') >= 0.5, 1)).alias('softsites_new'),
         sum(when(col('pctunbuilt') >= 0.5, 1)).alias('softsites'))\
    .withColumn('diff', col('softsites_new')-col('softsites'))\
    .filter((~col('diff').isNull())).sort(col('diff')).show()

+---+-------------+---------+-----+
| cd|softsites_new|softsites| diff|
+---+-------------+---------+-----+
|412|         4827|     6697|-1870|
|503|         6591|     8251|-1660|
|501|         6623|     8234|-1611|
|502|         5577|     6762|-1185|
|413|         4771|     5918|-1147|
|410|         1881|     2919|-1038|
|318|         2686|     3721|-1035|
|407|         3080|     3906| -826|
|405|         2518|     3169| -651|
|212|         3408|     4003| -595|
|315|         2686|     3205| -519|
|409|         1861|     2309| -448|
|408|         1501|     1925| -424|
|414|         2463|     2876| -413|
|210|         2016|     2428| -412|
|211|         1779|     2180| -401|
|411|         2136|     2524| -388|
|108|          687|      972| -285|
|317|         4578|     4842| -264|
|106|          347|      536| -189|
+---+-------------+---------+-----+
only showing top 20 rows



In [33]:
df.agg(sum(when(col('pctunbuilt_new') > col('pctunbuilt'), 1)).alias('pctunbuilt_increase'), 
      sum(when(col('pctunbuilt_new') == col('pctunbuilt'), 1)).alias('pctunbuilt_unchanged'),
      sum(when(col('pctunbuilt_new') < col('pctunbuilt'), 1)).alias('pctunbuilt_decrease'),).show()

+-------------------+--------------------+-------------------+
|pctunbuilt_increase|pctunbuilt_unchanged|pctunbuilt_decrease|
+-------------------+--------------------+-------------------+
|               1192|              365409|             304494|
+-------------------+--------------------+-------------------+



In [34]:
df.groupBy().agg(sum(when(col('maxfar_new') > col('maxfar'), 1)).alias('increase'), 
      sum(when(col('maxfar_new') == col('maxfar'), 1)).alias('unchanged'),
      sum(when(col('maxfar_new') < col('maxfar'), 1)).alias('decrease'),).show()

+--------+---------+--------+
|increase|unchanged|decrease|
+--------+---------+--------+
|    1472|   356009|  315182|
+--------+---------+--------+



In [38]:
df.agg(sum(when(col('pctunbuilt') > 0.5, 1)).alias('old>0.5'),
       sum(when(col('pctunbuilt') == 0.5, 1)).alias('old=0.5'),
       sum(when(col('pctunbuilt') < 0.5, 1)).alias('old<0.5'),
       sum(when(col('pctunbuilt_new') > 0.5, 1)).alias('new>0.5'),
       sum(when(col('pctunbuilt_new') == 0.5, 1)).alias('new=0.5'),
       sum(when(col('pctunbuilt_new') < 0.5, 1)).alias('new<0.5')).show()

+-------+-------+-------+-------+-------+-------+
|old>0.5|old=0.5|old<0.5|new>0.5|new=0.5|new<0.5|
+-------+-------+-------+-------+-------+-------+
| 136442|   6506| 528148| 121180|   5344| 544641|
+-------+-------+-------+-------+-------+-------+

