In [38]:
from pyspark.sql.functions import mean, udf, col, round
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
%matplotlib inline

In [39]:
spark_session  = SparkSession.builder\
                        .enableHiveSupport()\
                        .getOrCreate()

spark_session.conf.set("spark.executor.memory", '8g')
spark_session.conf.set('spark.executor.cores', '13')
spark_session.conf.set('spark.cores.max', '13')
spark_session.conf.set("spark.driver.memory",'8g')
sc = spark_session.sparkContext

## import csv files into spark dataframes
Note: both files contain records from all 5 boroughs

In [40]:
df1 = spark.read.csv('data/pluto.csv', header=True)
df2 = spark.read.csv('data/pluto_18v1.csv', header=True)

In [41]:
df1 = df1.select([col(A).alias(A.lower()) for A in df1.schema.names])
df2 = df2.select([col(A).alias(A.lower()) for A in df2.schema.names])

In [42]:
double_columns = ['bldgarea', 'facilfar',
                  'residfar', 'commfar', 'numbldgs', 'numfloors', 'bldgdepth', 
                  'bldgfront', 'lotdepth', 'lotfront', 
                  'exempttot', 'exemptland',  'assessland', 'assesstot', 'builtfar']

In [43]:
cols = df2.columns

In [44]:
df1 = df1.select(cols)
df2 = df2.select(cols)

## Type Conversion

In [45]:
for A in double_columns: 
    df1 = df1.withColumn(A, round(col(A).cast(DoubleType()), 2))
    df2 = df2.withColumn(A, round(col(A).cast(DoubleType()), 2))

## Change column names and joining dataframes
* since we are doing column comparison, we need to the column names of one df to avoid confusion
* in this case, we add "_1"_ to the name, e.g. "BBL" ==> "BBL_1"
* here we are doing an inner join because we only care about the BBL's that apeared in both dataframes

In [46]:
df1 = df1.select([col(A).alias(A+'_1') for A in df1.schema.names])

In [47]:
df1_names = df1.schema.names
df2_names = df2.schema.names
colnames = zip(df1_names, df2_names)

In [48]:
df = df2.join(df1, df2['bbl'] == df1['bbl_1'])
df0 = df

## Inspect top differences

In [12]:
@udf
def diff(col1,col2):
    try:
        return abs(col2-col1)
    except: 
        pass

In [13]:
spark.sql('set spark.sql.caseSensitive=true')
targets = ['unitsres','lotarea','bldgarea',
             'comarea','resarea',
             'officearea','retailarea',
             'garagearea','strgearea',
             'factryarea','otherarea']
for A in targets:
    df0 = df0.withColumn(A+'_diff', diff(col(A).cast(DoubleType()),col(A+'_1').cast(DoubleType())))

In [14]:
df0.select('bbl', col('unitsres_diff').cast(DoubleType())).orderBy('unitsres_diff', ascending=False).show(5)

+----------+-------------+
|       bbl|unitsres_diff|
+----------+-------------+
|2051410120|      10914.0|
|1013730001|       8018.0|
|2051350051|       4458.0|
|1008317502|       2916.0|
|3044520200|       2229.0|
+----------+-------------+
only showing top 5 rows



In [15]:
df0.select('bbl', col('comarea_diff').cast(DoubleType())).orderBy('comarea_diff', ascending=False).show(5)

+----------+------------+
|       bbl|comarea_diff|
+----------+------------+
|4070210001|   4168578.0|
|1007830070|   2689635.0|
|1007390001|   2161994.0|
|1009950005|   1642675.0|
|2027810500|   1598812.0|
+----------+------------+
only showing top 5 rows



In [16]:
df0.select('bbl', col('resarea_diff').cast(DoubleType())).orderBy('resarea_diff', ascending=False).show(5)

+----------+------------+
|       bbl|resarea_diff|
+----------+------------+
|1021060003|   2508580.0|
|3044520085|   1688760.0|
|3044520001|   1493793.0|
|3044520020|   1485000.0|
|3000860011|    878265.0|
+----------+------------+
only showing top 5 rows



In [17]:
df0.select('bbl', col('officearea_diff').cast(DoubleType())).orderBy('officearea_diff', ascending=False).show(5)

+----------+---------------+
|       bbl|officearea_diff|
+----------+---------------+
|1007830070|      2319046.0|
|1007390001|      2161994.0|
|1009950005|      1597675.0|
|1013010001|      1528689.0|
|1013000001|      1177646.0|
+----------+---------------+
only showing top 5 rows



In [18]:
df0.select('bbl', col('retailarea_diff').cast(DoubleType()))\
    .orderBy('retailarea_diff', ascending=False).show(5)

+----------+---------------+
|       bbl|retailarea_diff|
+----------+---------------+
|5024000180|       317994.0|
|1007830070|       216912.0|
|1008080040|       156078.0|
|3006990001|       143900.0|
|1021760017|       141600.0|
+----------+---------------+
only showing top 5 rows



In [19]:
df0.select('bbl', col('garagearea_diff').cast(DoubleType()))\
    .orderBy('garagearea_diff', ascending=False).show(5)

+----------+---------------+
|       bbl|garagearea_diff|
+----------+---------------+
|4009260001|       368000.0|
|2051410440|       311000.0|
|2051350210|       296715.0|
|1009910060|       294656.0|
|1012640005|       255714.0|
+----------+---------------+
only showing top 5 rows



In [20]:
df0.select('bbl', col('factryarea_diff').cast(DoubleType()))\
    .orderBy('factryarea_diff', ascending=False).show(5)

+----------+---------------+
|       bbl|factryarea_diff|
+----------+---------------+
|5017600035|       855000.0|
|2026040252|       213776.0|
|2027810500|       192933.0|
|5070900001|       140830.0|
|3053010001|       124954.0|
+----------+---------------+
only showing top 5 rows



In [21]:
df0.select('bbl', col('otherarea_diff').cast(DoubleType()))\
    .orderBy('otherarea_diff', ascending=False).show(5)

+----------+--------------+
|       bbl|otherarea_diff|
+----------+--------------+
|4070210001|     4009138.0|
|1007559040|     1378125.0|
|1010870005|      749133.0|
|3007150001|      679585.0|
|5001020001|      510819.0|
+----------+--------------+
only showing top 5 rows

