In [1]:
import pandas as pd
import matplotlib.pyplot as plt 
import numpy as np
from pyspark.sql.functions import desc
from pyspark.sql.functions import max
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import Bucketizer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

### Importing the data to dataframe df from the table created

In [3]:
df = spark.sql("select * from project_data_csv")

### How many null values in Each column

In [5]:
# reference: https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

##### Every column shows the same number of null values. The CSV file had '-' in its integer columns; when the datatype of those columns was changed from string to integer in Databricks while importing the data into the table, the rows containing those blanks were loaded with null values across the entire row.

### Calculating the number of records before  dropping the nulls

In [8]:
df.count()

#### Dropping the Null values from the df

In [10]:
df=df.dropna(how="any")

#### Calculating the number of records after dropping the nulls

In [12]:
df.count()

### Displaying sample records of the data

In [14]:
df1=spark.sql("select * from project_data_csv order by sale_price desc limit 10 ")
display(df1)

ID,BOROUGH,NEIGHBORHOOD,BUILDING_CLASS_CATEGORY,TAX_CLASS_AT_PRESENT,BLOCK,LOT,BUILDING_CLASS_AT_PRESENT,ADDRESS,APARTMENT_NUMBER,ZIP_CODE,RESIDENTIAL_UNITS,COMMERCIAL_UNITS,TOTAL_UNITS,LAND_SQUARE_FEET,GROSS_SQUARE_FEET,YEAR_BUILT,TAX_CLASS_AT_TIME_OF_SALE,BUILDING_CLASS_AT_TIME_OF_SALE,SALE_PRICE,SALE_DATE
2563,1,FINANCIAL,21 OFFICE BUILDINGS,4,40,3,O4,60 WALL STREET,,10005,0,1,1,53632,1617206,1987,4,O4,1040000000,1/24/2017 0:00
2561,1,FINANCIAL,21 OFFICE BUILDINGS,4,29,1,O4,85 BROAD STREET,,10004,0,1,1,42762,993569,1983,4,O4,652000000,5/24/2017 0:00
6336,1,KIPS BAY,08 RENTALS - ELEVATOR APARTMENTS,2,934,1,D6,460-520 2ND AVENUE,,10016,894,8,902,141836,829024,1975,2,D6,620000000,12/8/2016 0:00
2054,1,FASHION,21 OFFICE BUILDINGS,4,833,11,O4,1250 BROADWAY,,10001,0,55,55,30750,645977,1969,4,O4,565000000,11/1/2016 0:00
10038,3,DOWNTOWN-FULTON FERRY,29 COMMERCIAL GARAGES,4,54,1,G7,85 JAY STREET,,11201,0,0,0,134988,0,0,4,G7,345000000,12/20/2016 0:00
6321,1,JAVITS CENTER,21 OFFICE BUILDINGS,4,732,36,O3,433-447 NINTH AVENUE,,10001,0,2,2,39779,350000,1962,4,O3,330000000,12/6/2016 0:00
9598,1,MIDTOWN WEST,21 OFFICE BUILDINGS,4,1026,21,O5,1706-1720 BROADWAY,,10019,0,2,2,8848,52150,1918,4,O5,268124175,1/5/2017 0:00
17110,4,LONG ISLAND CITY,27 FACTORIES,4,281,1,F9,31-00 47TH AVENUE,,11101,0,51,51,120000,568000,1922,4,F9,257500000,12/22/2016 0:00
2094,1,FINANCIAL,08 RENTALS - ELEVATOR APARTMENTS,2,27,9,D5,63 WALL STREET,,10005,476,6,482,17623,400531,1929,2,D5,239114603,9/30/2016 0:00
2071,1,FASHION,25 LUXURY HOTELS,4,806,76,H1,371 7 AVENUE,,10001,529,9,538,19011,473391,1929,4,H1,212500000,12/20/2016 0:00


### Count of different types of Neighbourhood

In [16]:
df.select('NEIGHBORHOOD').distinct().count()

### Count of different types of BUILDING_CLASS_CATEGORY

In [18]:
df.select('BUILDING_CLASS_CATEGORY').distinct().count()

### Count of different types of BUILDING_CLASS_AT_TIME_OF_SALE

In [20]:
df.select('BUILDING_CLASS_AT_TIME_OF_SALE').distinct().count()

### We will create a temporary view for running spark sql queries to further analyze the data.

In [22]:
df.createOrReplaceTempView('ny_property_sale')

### Number of Properties in each Borough for Sale

In [24]:
%sql
-- Number of properties per borough: translate the numeric BOROUGH code into its
-- name first, then count the rows for each name.
with named_boroughs as (
  select
    case BOROUGH
      when 1 then 'Manhattan'
      when 2 then 'Bronx'
      when 3 then 'Brooklyn'
      when 4 then 'Queens'
      when 5 then 'Staten Island'
    end as Boroughname
  from ny_property_sale
)
select Boroughname, count(*) as number_of_properties
from named_boroughs
group by Boroughname

Boroughname,number_of_properties
Queens,11078
Brooklyn,24047
Staten Island,5064
Manhattan,1005
Bronx,7049


### Number of Properties in each neighbourhood

In [26]:
display(spark.sql("select NEIGHBORHOOD,count(ID) as number_of_properties from ny_property_sale group by NEIGHBORHOOD order by number_of_properties desc"))

NEIGHBORHOOD,number_of_properties
BEDFORD STUYVESANT,1436
BOROUGH PARK,1245
SHEEPSHEAD BAY,1013
EAST NEW YORK,982
FLATBUSH-EAST,846
BAY RIDGE,832
FLUSHING-NORTH,798
CROWN HEIGHTS,793
CANARSIE,783
BUSHWICK,769


### Number of Properties in each BUILDING_CLASS_CATEGORY

In [28]:
display(spark.sql("select BUILDING_CLASS_CATEGORY,count(ID) as number_of_properties from ny_property_sale group by BUILDING_CLASS_CATEGORY order by number_of_properties desc"))

BUILDING_CLASS_CATEGORY,number_of_properties
01 ONE FAMILY DWELLINGS,14394
02 TWO FAMILY DWELLINGS,13043
03 THREE FAMILY DWELLINGS,3663
13 CONDOS - ELEVATOR APARTMENTS,3257
10 COOPS - ELEVATOR APARTMENTS,3178
07 RENTALS - WALKUP APARTMENTS,2784
15 CONDOS - 2-10 UNIT RESIDENTIAL,956
44 CONDO PARKING,783
04 TAX CLASS 1 CONDOS,739
22 STORE BUILDINGS,723


### Number of Properties in each BUILDING_CLASS_AT_TIME_OF_SALE

In [30]:
display(spark.sql("select BUILDING_CLASS_AT_TIME_OF_SALE,count(ID) as number_of_properties from ny_property_sale group by BUILDING_CLASS_AT_TIME_OF_SALE order by number_of_properties desc"))

BUILDING_CLASS_AT_TIME_OF_SALE,number_of_properties
A1,5247
A5,4600
B1,4059
B2,3816
C0,3663
R4,3257
D4,3175
B3,3116
A2,2098
B9,1369


### Checking the most expensive properties sold and their details

In [32]:
spark.sql("select sale_price, borough, neighborhood from project_data_csv order by sale_price desc").show()

### Number of properties in each Neighbourhood grouped by Borough

In [34]:
# Count properties per (NEIGHBORHOOD, BOROUGH) pair, largest groups first.
# show() only prints and returns None, so its result is not assigned to a variable.
df.groupby("NEIGHBORHOOD","BOROUGH").count().sort(desc("count")).show(1000,truncate = False)

### Number of properties by each TAX_CLASS_AT_PRESENT

In [36]:
# Most of the sales belong to Class 1, which includes most residential property of up to three units
# (such as one-, two-, and three-family homes and small stores or offices with one or two attached
# apartments), vacant land that is zoned for residential use, and most condominiums that are not more
# than three stories.
# show() only prints and returns None, so its result is not assigned to a variable.
df.groupby("TAX_CLASS_AT_PRESENT").count().sort(desc("count")).show(1000,truncate = False)

### Statistics of each variable

In [38]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
ID,48243,10449.002653234666,7574.482564925197,4,26702
BOROUGH,48243,3.25178782414029,0.9030869090762457,1,5
NEIGHBORHOOD,48243,,,AIRPORT LA GUARDIA,WYCKOFF HEIGHTS
BUILDING_CLASS_CATEGORY,48243,,,01 ONE FAMILY DWELLINGS,49 CONDO WAREHOUSES/FACTORY/INDUS
TAX_CLASS_AT_PRESENT,48243,1.4326673069265186,0.8576226240753219,,4
BLOCK,48243,4984.608544244761,3427.41258158423,1,16319
LOT,48243,234.32949858010488,496.36552869491544,1,9009
BUILDING_CLASS_AT_PRESENT,48243,,,,Z9
ADDRESS,48243,,,1 12TH ST EXTENSION,ZEREGA AVENUE


### Correlation Matrix

In [40]:
# Columns that make up the correlation matrix, in order. The list is defined once and
# shared by the assembler, the result DataFrame and the plot labels so they cannot drift apart.
data_columns=['BOROUGH','NEIGHBORHOOD_index_corr','BUILDING_CLASS_CATEGORY_index_corr','RESIDENTIAL_UNITS','COMMERCIAL_UNITS','TOTAL_UNITS','LAND_SQUARE_FEET','GROSS_SQUARE_FEET','YEAR_BUILT','TAX_CLASS_AT_TIME_OF_SALE','SALE_PRICE']

# Index the two string columns so they can be placed in a numeric feature vector
NEIGHBORHOOD_indexer_corr = StringIndexer(inputCol='NEIGHBORHOOD',outputCol='NEIGHBORHOOD_index_corr',handleInvalid='keep')
BUILDING_CLASS_CATEGORY_indexer_corr = StringIndexer(inputCol='BUILDING_CLASS_CATEGORY',outputCol='BUILDING_CLASS_CATEGORY_index_corr',handleInvalid='keep')

# Create a vector assembler with all input features and indexed features
assembler_corr = VectorAssembler(inputCols=data_columns, outputCol="features")

# Create a pipeline with the stages of transformation
pipe = Pipeline(stages=[NEIGHBORHOOD_indexer_corr,BUILDING_CLASS_CATEGORY_indexer_corr,assembler_corr])

# Fit the pipeline to our spark dataframe df and create the transformed data frame
fit_pipe_corr = pipe.fit(df)
df_correlated = fit_pipe_corr.transform(df)

# Get the correlation dense matrix and convert it to a list of rows so that it can be plotted
corr = Correlation.corr(df_correlated, "features").collect()[0][0]
corrmatrix = corr.toArray().tolist()

# The plain list has no column headers, so re-attach them when building the DataFrame
corrdf = spark.createDataFrame(corrmatrix, data_columns)

# show() prints the correlation values; print(corrdf) would only print the column/type summary
corrdf.show(truncate=False)

# Plot the correlation matrix with matplotlib's matshow.
# reference: https://stackoverflow.com/questions/55546467/how-to-plot-correlation-heatmap-when-using-pysparkdatabricks
fig, ax = plt.subplots()
# Colour scale fixed to the full correlation range [-1, 1]
corr_ax = ax.matshow(corrmatrix, vmax=1, vmin=-1)
# Title, tick positions and tick labels
ax.set_title("Correlation Matrix")
ax.set_xticks(np.arange(len(data_columns)))
ax.set_yticks(np.arange(len(data_columns)))
ax.set_xticklabels(data_columns)
ax.set_yticklabels(data_columns)
ax.xaxis.tick_bottom()
plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
ax.tick_params(axis="both", which="both")
fig.colorbar(corr_ax)
# Write each correlation value into its cell of the matrix plot
for i, row in enumerate(corrmatrix):
    for j, value in enumerate(row):
        ax.text(j, i, "%.2f" % value,
                ha="center", va="center", color="white", fontsize="x-small")
fig.tight_layout()
display(fig)

### Visualization of Data

In [42]:
from pyspark.sql.types import *
def valueToCategory1(value):
    """Translate a numeric borough code into the borough's name.

    Codes 1 through 5 map to 'Manhattan', 'Bronx', 'Brooklyn', 'Queens' and
    'Staten Island' respectively; any other value yields 'n/a'.
    """
    borough_names = (
        (1, 'Manhattan'),
        (2, 'Bronx'),
        (3, 'Brooklyn'),
        (4, 'Queens'),
        (5, 'Staten Island'),
    )
    # Compare in the same order as the codes are listed; first match wins.
    for code, name in borough_names:
        if value == code:
            return name
    return 'n/a'
  

def valueToCategory2(value1):
    """Translate a numeric tax-class code into a descriptive label.

    Codes 1 through 4 map to 'ResidentialProperty-3', 'PrimarilyResidential',
    'OwnedByOthers' and 'Companies' respectively; any other value yields 'n/a'.
    """
    tax_class_names = (
        (1, 'ResidentialProperty-3'),
        (2, 'PrimarilyResidential'),
        (3, 'OwnedByOthers'),
        (4, 'Companies'),
    )
    # Compare in the same order as the codes are listed; first match wins.
    for code, name in tax_class_names:
        if value1 == code:
            return name
    return 'n/a'

# Import udf and StringType explicitly: udf is not imported anywhere else in the notebook,
# so without this the cell only runs where the environment happens to pre-define it.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap the two Python mapping functions as Spark UDFs that return strings
udfValueToCategory1 = udf(valueToCategory1, StringType())
udfValueToCategory2 = udf(valueToCategory2, StringType())

# Add a readable borough name and tax-class name next to their numeric codes
df1 = (
    df
    .withColumn("Boroughname", udfValueToCategory1("BOROUGH"))
    .withColumn("Tax_Class_Name", udfValueToCategory2("TAX_CLASS_AT_TIME_OF_SALE"))
)

df1.show()

In [43]:
df1.createOrReplaceTempView('NewYork_property_sale')


In [44]:
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

In [45]:
# Convert the spark dataframe to pandas dataframe for better visualization
pd_df1=df1.toPandas()
print(pd_df1.columns)


### We will see what are top ten Building Class with highest property count for Tax_Class = Company

In [47]:
# Top ten building classes (at time of sale) by property count for Tax_Class_Name == 'Companies'.
# The SQL has no ORDER BY because the rows are sorted explicitly after filtering.
df1_agg = spark.sql("select count(ID) as number_of_properties,BUILDING_CLASS_AT_TIME_OF_SALE, Tax_Class_Name from NewYork_property_sale  group by BUILDING_CLASS_AT_TIME_OF_SALE,Tax_Class_Name")
df1_agg = (df1_agg
           .filter(df1_agg.Tax_Class_Name == 'Companies')
           .sort(desc('number_of_properties'))
           .limit(10))
# Keep the pandas copy under its own name: assigning it to pd_df1 would replace the full
# property-level frame that later plotting cells read SALE_PRICE and other columns from.
top_classes_pd = df1_agg.toPandas()
g = sns.barplot(x='BUILDING_CLASS_AT_TIME_OF_SALE', y='number_of_properties', data=top_classes_pd)
g.set_xticklabels(g.get_xticklabels(), rotation=40, ha="right")
g.set_title('Companies')
plt.tight_layout()
plt.show()

### We will see what are top ten Building Class with highest property count for Tax_Class = ResidentialProperty-3

In [49]:
# Top ten building classes (at time of sale) by property count for
# Tax_Class_Name == 'ResidentialProperty-3'.
# The SQL has no ORDER BY because the rows are sorted explicitly after filtering.
df1_agg = spark.sql("select count(ID) as number_of_properties,BUILDING_CLASS_AT_TIME_OF_SALE, Tax_Class_Name from NewYork_property_sale  group by BUILDING_CLASS_AT_TIME_OF_SALE,Tax_Class_Name")
df1_agg = (df1_agg
           .filter(df1_agg.Tax_Class_Name == 'ResidentialProperty-3')
           .sort(desc('number_of_properties'))
           .limit(10))
# Keep the pandas copy under its own name: assigning it to pd_df1 would replace the full
# property-level frame that later plotting cells read SALE_PRICE and other columns from.
top_classes_pd = df1_agg.toPandas()
g = sns.barplot(x='BUILDING_CLASS_AT_TIME_OF_SALE', y='number_of_properties', data=top_classes_pd)
g.set_xticklabels(g.get_xticklabels(), rotation=40, ha="right")
g.set_title('ResidentialProperty-3')
plt.tight_layout()
plt.show()

### We will see what are top ten Building Class with highest property count for Tax_Class = PrimarilyResidential

In [51]:
# Top ten building classes (at time of sale) by property count for
# Tax_Class_Name == 'PrimarilyResidential'.
# The SQL has no ORDER BY because the rows are sorted explicitly after filtering.
df1_agg = spark.sql("select count(ID) as number_of_properties,BUILDING_CLASS_AT_TIME_OF_SALE, Tax_Class_Name from NewYork_property_sale  group by BUILDING_CLASS_AT_TIME_OF_SALE,Tax_Class_Name")
df1_agg = (df1_agg
           .filter(df1_agg.Tax_Class_Name == 'PrimarilyResidential')
           .sort(desc('number_of_properties'))
           .limit(10))
# Keep the pandas copy under its own name: assigning it to pd_df1 would replace the full
# property-level frame that later plotting cells read SALE_PRICE and other columns from.
top_classes_pd = df1_agg.toPandas()
g = sns.barplot(x='BUILDING_CLASS_AT_TIME_OF_SALE', y='number_of_properties', data=top_classes_pd)
g.set_xticklabels(g.get_xticklabels(), rotation=40, ha="right")
g.set_title('PrimarilyResidential')
plt.tight_layout()
plt.show()

### We will see what are top ten Building Class with highest property count for Boroughname = Manhattan

In [53]:
# Top ten building classes (at time of sale) by property count for Boroughname == 'Manhattan'.
# The SQL has no ORDER BY because the rows are sorted explicitly after filtering.
df1_agg = spark.sql("select count(id) as number_of_properties,BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname from NewYork_property_sale   group by BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname")
df1_agg = (df1_agg
           .filter(df1_agg.Boroughname == 'Manhattan')
           .sort(desc('number_of_properties'))
           .limit(10))
# Keep the pandas copy under its own name: assigning it to pd_df1 would replace the full
# property-level frame that later plotting cells read SALE_PRICE and other columns from.
top_classes_pd = df1_agg.toPandas()
g = sns.barplot(x='BUILDING_CLASS_AT_TIME_OF_SALE', y='number_of_properties', data=top_classes_pd)
g.set_xticklabels(g.get_xticklabels(), rotation=40, ha="right")
g.set_title('Manhattan')
plt.tight_layout()
plt.show()

### We will see what are top ten Building Class with highest property count for Boroughname = Bronx

In [55]:
# Top ten building classes (at time of sale) by property count for Boroughname == 'Bronx'.
# The SQL has no ORDER BY because the rows are sorted explicitly after filtering.
df1_agg = spark.sql("select count(id) as number_of_properties,BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname from NewYork_property_sale   group by BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname")
df1_agg = (df1_agg
           .filter(df1_agg.Boroughname == 'Bronx')
           .sort(desc('number_of_properties'))
           .limit(10))
# Keep the pandas copy under its own name: assigning it to pd_df1 would replace the full
# property-level frame that later plotting cells read SALE_PRICE and other columns from.
top_classes_pd = df1_agg.toPandas()
g = sns.barplot(x='BUILDING_CLASS_AT_TIME_OF_SALE', y='number_of_properties', data=top_classes_pd)
g.set_xticklabels(g.get_xticklabels(), rotation=40, ha="right")
g.set_title('Bronx')
plt.tight_layout()
plt.show()

### We will see what are top ten Building Class with highest property count for Boroughname = Brooklyn

In [57]:
# Top ten building classes (at time of sale) by property count for Boroughname == 'Brooklyn'.
# The SQL has no ORDER BY because the rows are sorted explicitly after filtering.
df1_agg = spark.sql("select count(id) as number_of_properties,BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname from NewYork_property_sale   group by BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname")
df1_agg = (df1_agg
           .filter(df1_agg.Boroughname == 'Brooklyn')
           .sort(desc('number_of_properties'))
           .limit(10))
# Keep the pandas copy under its own name: assigning it to pd_df1 would replace the full
# property-level frame that later plotting cells read SALE_PRICE and other columns from.
top_classes_pd = df1_agg.toPandas()
g = sns.barplot(x='BUILDING_CLASS_AT_TIME_OF_SALE', y='number_of_properties', data=top_classes_pd)
g.set_xticklabels(g.get_xticklabels(), rotation=40, ha="right")
g.set_title('Brooklyn')
plt.tight_layout()
plt.show()

### We will see what are top ten Building Class with highest property count for Boroughname = Queens

In [59]:
# Top ten building classes (at time of sale) by property count for Boroughname == 'Queens'.
# The SQL has no ORDER BY because the rows are sorted explicitly after filtering.
df1_agg = spark.sql("select count(id) as number_of_properties,BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname from NewYork_property_sale   group by BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname")
df1_agg = (df1_agg
           .filter(df1_agg.Boroughname == 'Queens')
           .sort(desc('number_of_properties'))
           .limit(10))
# Keep the pandas copy under its own name: assigning it to pd_df1 would replace the full
# property-level frame that later plotting cells read SALE_PRICE and other columns from.
top_classes_pd = df1_agg.toPandas()
g = sns.barplot(x='BUILDING_CLASS_AT_TIME_OF_SALE', y='number_of_properties', data=top_classes_pd)
g.set_xticklabels(g.get_xticklabels(), rotation=40, ha="right")
g.set_title('Queens')
plt.tight_layout()
plt.show()

### We will see what are top ten Building Class with highest property count for Boroughname = Staten Island

In [61]:
# Top ten building classes (at time of sale) by property count for Boroughname == 'Staten Island'.
# The SQL has no ORDER BY because the rows are sorted explicitly after filtering.
df1_agg = spark.sql("select count(id) as number_of_properties,BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname from NewYork_property_sale   group by BUILDING_CLASS_AT_TIME_OF_SALE,Boroughname")
df1_agg = (df1_agg
           .filter(df1_agg.Boroughname == 'Staten Island')
           .sort(desc('number_of_properties'))
           .limit(10))
# Keep the pandas copy under its own name: assigning it to pd_df1 would replace the full
# property-level frame that later plotting cells read SALE_PRICE and other columns from.
top_classes_pd = df1_agg.toPandas()
g = sns.barplot(x='BUILDING_CLASS_AT_TIME_OF_SALE', y='number_of_properties', data=top_classes_pd)
g.set_xticklabels(g.get_xticklabels(), rotation=40, ha="right")
g.set_title('Staten Island')
plt.tight_layout()
plt.show()

### Distribution of Sale Price

In [63]:
# Build the plotting frame directly from df1 instead of relying on pd_df1, which earlier
# cells may have reassigned to a small aggregate that has no SALE_PRICE column.
sale_price_pd = df1.select('SALE_PRICE').toPandas()

# plt.subplots returns (figure, axes); unpack both so the figure itself can be displayed
fig, ax = plt.subplots(figsize=(8,4))
sns.boxplot(x=sale_price_pd['SALE_PRICE'], ax=ax)
ax.set_title('Distribution of Sale Price')
fig.tight_layout()
display(fig)

### Linearity Plot of Gross_Square_Feet and Land_Square_Feet against Sale Price.

In [65]:
# Build the plotting frame directly from df1 instead of relying on pd_df1, which earlier
# cells may have reassigned to a small aggregate without these columns.
area_price_pd = df1.select('GROSS_SQUARE_FEET','LAND_SQUARE_FEET','SALE_PRICE').toPandas()

# x/y are passed by keyword: recent seaborn releases no longer accept them positionally
fig1, ax = plt.subplots(figsize=(8,4))
sns.scatterplot(x='GROSS_SQUARE_FEET', y='SALE_PRICE', data=area_price_pd, ax=ax)
fig1.tight_layout()
display(fig1)

fig, ax = plt.subplots(figsize=(8,4))
sns.scatterplot(x='LAND_SQUARE_FEET', y='SALE_PRICE', data=area_price_pd, ax=ax)
fig.tight_layout()
display(fig)

### Linearity Plot of Residential_Units and Commercial_Units on Sales Price.

In [67]:
# Build the plotting frame directly from df1 instead of relying on pd_df1, which earlier
# cells may have reassigned to a small aggregate without these columns.
units_price_pd = df1.select('RESIDENTIAL_UNITS','COMMERCIAL_UNITS','SALE_PRICE').toPandas()

# x/y are passed by keyword: recent seaborn releases no longer accept them positionally
fig1, ax = plt.subplots(figsize=(8,4))
sns.scatterplot(x='RESIDENTIAL_UNITS', y='SALE_PRICE', data=units_price_pd, ax=ax)
fig1.tight_layout()
display(fig1)

fig, ax = plt.subplots(figsize=(8,4))
sns.scatterplot(x='COMMERCIAL_UNITS', y='SALE_PRICE', data=units_price_pd, ax=ax)
fig.tight_layout()
display(fig)

### Checking the range of Sale Price by creating buckets

In [69]:
# Reference: Stack Overflow

# Bucket edges for the sale price; the final edge is +inf so there is no upper limit.
splits = [0, 80000.0, 8500000.0,15000000,1000000000.0,float("inf")]

# Only the price column is needed for bucketing
dffilter = df.select('sale_price')
bucketizer = Bucketizer(inputCol="sale_price", outputCol="bucketedFeatures", splits=splits)
bucketedData = bucketizer.transform(dffilter)

# Number of sales that fall into each price bucket
bucket_counts = bucketedData.groupBy("bucketedFeatures").count()
bucket_counts.show()

### Linear Regression Including all Variables

In [71]:
df = spark.sql("select * from project_data_csv")

In [72]:
df.count()

In [73]:
df = df.dropna(how="any")

In [74]:
df.count()

### Building the Linear Regression Model

In [76]:

# 70/30 train/test split with a fixed seed
train_data_lr,test_data_lr=df.randomSplit([0.7,0.3],seed=12345)


# Index the three string columns so they can be used as numeric model inputs
neighbourhood_indexer_lr = StringIndexer(inputCol='NEIGHBORHOOD',outputCol='neighbourhood_index_lr',handleInvalid='keep')
BUILDING_CLASS_CATEGORY_Indexer_lr = StringIndexer(inputCol='BUILDING_CLASS_CATEGORY',outputCol='BUILDING_CLASS_CATEGORY_Index_lr',handleInvalid='keep')
BUILDING_CLASS_AT_TIME_OF_SALE_indexer_lr = StringIndexer(inputCol='BUILDING_CLASS_AT_TIME_OF_SALE',outputCol='BUILDING_CLASS_index_lr',handleInvalid='keep')



# Vector assembler is used to create a vector of input features

assembler_lr=VectorAssembler(inputCols=['BOROUGH','ZIP_CODE','RESIDENTIAL_UNITS','COMMERCIAL_UNITS','LAND_SQUARE_FEET','GROSS_SQUARE_FEET','YEAR_BUILT','TAX_CLASS_AT_TIME_OF_SALE',
                                     'neighbourhood_index_lr',  'BUILDING_CLASS_index_lr','BUILDING_CLASS_CATEGORY_Index_lr'],
                            outputCol="features")

# The pipeline passes the data through the indexers and then the assembler, in that order.
# It is fitted on the train data only, and the same fitted pipeline is reused to pre-process the test data.

pipe = Pipeline(stages=[neighbourhood_indexer_lr,BUILDING_CLASS_CATEGORY_Indexer_lr,BUILDING_CLASS_AT_TIME_OF_SALE_indexer_lr,assembler_lr])

fitted_pipe_lr=pipe.fit(train_data_lr)

# train_data_lr is replaced by its transformed version (now carries the "features" column)
train_data_lr=fitted_pipe_lr.transform(train_data_lr)

# Create an object for the Linear Regression model (label: SALE_PRICE)

lr_model = LinearRegression(labelCol='SALE_PRICE')

# Fit the model on the train data

fit_model_lr = lr_model.fit(train_data_lr.select(['features','SALE_PRICE']))

# Pre-process the test data with the pipeline that was fitted on the train data

test_data_lr=fitted_pipe_lr.transform(test_data_lr)

# Predict the sale price for the test data and store the results in a dataframe

results_lr = fit_model_lr.transform(test_data_lr)

results_lr.select(['sale_price','prediction']).show()

In [77]:
test_results_lr = fit_model_lr.evaluate(test_data_lr)

In [78]:
test_results_lr.rootMeanSquaredError

In [79]:
test_results_lr.residuals.show()

In [80]:
# R2 of the linear regression model on the test data (test_results_lr is the evaluation summary
# created from fit_model_lr.evaluate; the name test_results is not defined in this notebook)
test_results_lr.r2

In [81]:
print("Coefficients: %s" % str(fit_model_lr.coefficients))
print("Intercept: %s" % str(fit_model_lr.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary_lr = fit_model_lr.summary
print("numIterations: %d" % trainingSummary_lr.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary_lr.objectiveHistory))
#trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary_lr.rootMeanSquaredError)
print("r2: %f" % trainingSummary_lr.r2)

In [82]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

# Bring only the two plotted columns to pandas (avoids collecting the feature vectors)
pd_df_lr = results_lr.select('prediction','SALE_PRICE').toPandas()
fig, ax = plt.subplots(1,1,figsize=(10,6))
# x/y are passed by keyword: recent seaborn releases no longer accept them positionally
sns.regplot(x='prediction', y='SALE_PRICE', data=pd_df_lr, ax=ax)
# Display the figure itself; plt.show() returns None, so display(plt.show()) shows nothing useful
display(fig)

#display(pd_df_lr)

### Function to know the bounds for each column based on inter quartile range ( Outliers detection)

In [84]:
# Reference
#https://blog.zhaytam.com/2019/07/15/outliers-detection-in-pyspark-2-interquartile-range/


def calculate_bounds(df):
  """Compute IQR-based outlier bounds for every column of type "int".

  For each column whose dtype string is exactly "int", the first and third
  quartiles are obtained with ``df.approxQuantile`` (relative error 0) and the
  bounds are derived as ``q1 - 1.5 * IQR`` and ``q3 + 1.5 * IQR``.

  Args:
    df: object exposing ``columns``, ``dtypes`` (pairs of name and type
      string) and ``approxQuantile(col, probabilities, relativeError)``.

  Returns:
    dict mapping each int column name to a dict with keys
    ``q1``, ``q3``, ``min`` and ``max``. Empty when there are no int columns.
  """
  bounds = {
    c: dict(
      zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
        )
    for c,d in zip(df.columns, df.dtypes) if d[1] == "int"
    }
  for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)

  # Return only after every column has been processed; returning inside the loop
  # would leave all but the first column without min/max.
  return bounds

### Calculating Bounds using created function

In [86]:
calculate_bounds(df)

In [87]:
# Keep sales priced between 80,000 and 50,000,000 (inclusive). The bounds are numeric
# literals so the comparison does not rely on implicit string-to-number coercion.
df1=df.filter(df.SALE_PRICE.between(80000, 50000000))

In [88]:
df1.count()

### LinearModel 2 with filtered rows as stated above and omitted few columns as per the Correlation matrix

In [90]:
# 80/20 train/test split of the price-filtered data with a fixed seed
train_data_lr2,test_data_lr2=df1.randomSplit([0.8,0.2],seed=59688144)


# Index the three string columns so they can be used as numeric model inputs
neighbourhood_indexer_lr2 = StringIndexer(inputCol='NEIGHBORHOOD',outputCol='neighbourhood_index_lr2',handleInvalid='keep')
BUILDING_CLASS_CATEGORY_Indexer_lr2 = StringIndexer(inputCol='BUILDING_CLASS_CATEGORY',outputCol='BUILDING_CLASS_CATEGORY_Index_lr2',handleInvalid='keep')
BUILDING_CLASS_AT_TIME_OF_SALE_indexer_lr2 = StringIndexer(inputCol='BUILDING_CLASS_AT_TIME_OF_SALE',outputCol='BUILDING_CLASS_index_lr2',handleInvalid='keep')



# Vector assembler is used to create a vector of input features
# (BUILDING_CLASS_CATEGORY_Index_lr2 is produced by the pipeline but is not one of the inputs here)

assembler_lr2=VectorAssembler(inputCols=['BOROUGH','RESIDENTIAL_UNITS','COMMERCIAL_UNITS','BUILDING_CLASS_index_lr2',
                                        'LAND_SQUARE_FEET','TAX_CLASS_AT_TIME_OF_SALE',
                                     'neighbourhood_index_lr2',],
                            outputCol="features")

# The pipeline passes the data through the indexers and then the assembler, in that order.
# It is fitted on the train data only, and the same fitted pipeline is reused to pre-process the test data.

pipe_lr2 = Pipeline(stages=[neighbourhood_indexer_lr2,BUILDING_CLASS_CATEGORY_Indexer_lr2,BUILDING_CLASS_AT_TIME_OF_SALE_indexer_lr2,assembler_lr2])

fitted_pipe_lr2=pipe_lr2.fit(train_data_lr2)

# train_data_lr2 is replaced by its transformed version (now carries the "features" column)
train_data_lr2=fitted_pipe_lr2.transform(train_data_lr2)

# Create an object for the Linear Regression model (label: sale_price)

lr_model_lr2 = LinearRegression(labelCol='sale_price')

# Fit the model on the train data

fit_model_lr2 = lr_model_lr2.fit(train_data_lr2.select(['features','sale_price']))

# Pre-process the test data with the pipeline that was fitted on the train data

test_data_lr2=fitted_pipe_lr2.transform(test_data_lr2)

# Predict the sale price for the test data and store the results in a dataframe

results_lr2 = fit_model_lr2.transform(test_data_lr2)

results_lr2.select(['sale_price','prediction']).show()

In [91]:
test_results_lr2 = fit_model_lr2.evaluate(test_data_lr2)

In [92]:
test_results_lr2.rootMeanSquaredError

In [93]:
test_results_lr2.r2 

In [94]:
# Bring only the two plotted columns to pandas (avoids collecting the feature vectors)
pd_df_lr2 = results_lr2.select('prediction','SALE_PRICE').toPandas()
fig, ax = plt.subplots(figsize=(10,6))
# x/y are passed by keyword: recent seaborn releases no longer accept them positionally
sns.regplot(x='prediction', y='SALE_PRICE', data=pd_df_lr2, ax=ax)
# Display the figure itself; plt.show() returns None, so display(plt.show()) shows nothing useful
display(fig)

### Decision Tree Model (Basic)

In [96]:
df = spark.sql("select * from project_data_csv")

In [97]:
df = df.dropna(how="any")

In [98]:
df.count()

In [99]:
from pyspark.ml.regression import DecisionTreeRegressor

# 70/30 train/test split with a fixed seed
train_data_dt,test_data_dt=df.randomSplit([0.7,0.3],seed = 50945803)
# Index the three string columns so they can be used as numeric model inputs
neighbourhood_indexer_dt = StringIndexer(inputCol='NEIGHBORHOOD',outputCol='neighbourhood_index_dt',handleInvalid='keep')
BUILDING_CLASS_CATEGORY_Indexer_dt = StringIndexer(inputCol='BUILDING_CLASS_CATEGORY',outputCol='BUILDING_CLASS_CATEGORY_Index_dt',handleInvalid='keep')
BUILDING_CLASS_AT_TIME_OF_SALE_indexer_dt = StringIndexer(inputCol='BUILDING_CLASS_AT_TIME_OF_SALE',outputCol='BUILDING_CLASS_index_dt',handleInvalid='keep')

# Assemble the numeric columns and the three indexed columns into one feature vector
assembler_dt=VectorAssembler(inputCols=['BOROUGH','ZIP_CODE','RESIDENTIAL_UNITS','COMMERCIAL_UNITS','LAND_SQUARE_FEET','GROSS_SQUARE_FEET','YEAR_BUILT','TAX_CLASS_AT_TIME_OF_SALE',
                                     'neighbourhood_index_dt', 'BUILDING_CLASS_index_dt','BUILDING_CLASS_CATEGORY_Index_dt'],
                            outputCol="features")


# The regressor is the last pipeline stage, so fitting the pipeline also trains the tree
dt_model = DecisionTreeRegressor(labelCol='SALE_PRICE',maxBins=5000)
pipe_dt = Pipeline(stages=[neighbourhood_indexer_dt,BUILDING_CLASS_CATEGORY_Indexer_dt,BUILDING_CLASS_AT_TIME_OF_SALE_indexer_dt,assembler_dt,dt_model])
fit_model_dt=pipe_dt.fit(train_data_dt)
# Pre-process and predict on the test data with the fitted pipeline
results_dt = fit_model_dt.transform(test_data_dt)
results_dt.select(['sale_price','Prediction']).show()

In [100]:
from pyspark.ml.evaluation import RegressionEvaluator
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="SALE_PRICE", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(results_dt)

print (rmse)

In [101]:
evaluator1 = RegressionEvaluator(labelCol="SALE_PRICE", predictionCol="prediction", metricName="r2")
accuracy = evaluator1.evaluate(results_dt)
print(accuracy)

### Decision Tree Model 2 filtered Sale Price and omitted columns based on Analysis

In [103]:
# Keep sales priced between 80,000 and 50,000,000 (inclusive). The bounds are numeric
# literals so the comparison does not rely on implicit string-to-number coercion.
df1=df.filter(df.SALE_PRICE.between(80000, 50000000))

In [104]:
# Create an 80-20 train test split of the price-filtered data

train_data_dt2,test_data_dt2=df1.randomSplit([0.8,0.2],seed = 50945803)


# Index the three string columns so they can be used as numeric model inputs
neighbourhood_indexer_dt2 = StringIndexer(inputCol='NEIGHBORHOOD',outputCol='neighbourhood_index_dt2',handleInvalid='keep')
BUILDING_CLASS_CATEGORY_Indexer_dt2 = StringIndexer(inputCol='BUILDING_CLASS_CATEGORY',outputCol='BUILDING_CLASS_CATEGORY_Index_dt2',handleInvalid='keep')
BUILDING_CLASS_AT_TIME_OF_SALE_indexer_dt2 = StringIndexer(inputCol='BUILDING_CLASS_AT_TIME_OF_SALE',outputCol='BUILDING_CLASS_index_dt2',handleInvalid='keep')

# Feature vector for the model
# (BUILDING_CLASS_index_dt2 is produced by the pipeline but is not one of the inputs here)
assembler_dt2=VectorAssembler(inputCols=['BOROUGH','RESIDENTIAL_UNITS','COMMERCIAL_UNITS','LAND_SQUARE_FEET','GROSS_SQUARE_FEET','TAX_CLASS_AT_TIME_OF_SALE',
                                     'neighbourhood_index_dt2', 'BUILDING_CLASS_CATEGORY_Index_dt2'],
                            outputCol="features")

dt_model_2 = DecisionTreeRegressor(labelCol='SALE_PRICE',maxBins=15000)

# The pipeline passes the data through the indexers, the assembler and finally the regressor, in that order.
# Fitting it on the train data trains the tree; the same fitted pipeline then pre-processes the test data.

pipe_dt2 = Pipeline(stages=[neighbourhood_indexer_dt2,BUILDING_CLASS_CATEGORY_Indexer_dt2,BUILDING_CLASS_AT_TIME_OF_SALE_indexer_dt2,assembler_dt2,dt_model_2])

fit_model_dt2=pipe_dt2.fit(train_data_dt2)

# Pre-process and predict on the test data with the fitted pipeline
results_dt2 = fit_model_dt2.transform(test_data_dt2)

results_dt2.select(['sale_price','Prediction']).show()

In [105]:
# Select (prediction, true label) and compute test error
evaluator2 = RegressionEvaluator(labelCol="SALE_PRICE", predictionCol="prediction", metricName="rmse")
rmse = evaluator2.evaluate(results_dt2)
print (rmse)

In [106]:
evaluator3 = RegressionEvaluator(labelCol="SALE_PRICE", predictionCol="prediction", metricName="r2")
accuracy1 = evaluator3.evaluate(results_dt2)
print(accuracy1)

### K Means Clustering

In [108]:
data=df.select(['BOROUGH','NEIGHBORHOOD','BUILDING_CLASS_CATEGORY','TAX_CLASS_AT_PRESENT','ZIP_CODE','RESIDENTIAL_UNITS','COMMERCIAL_UNITS',
                                 'TOTAL_UNITS','LAND_SQUARE_FEET','GROSS_SQUARE_FEET','YEAR_BUILT','TAX_CLASS_AT_TIME_OF_SALE','BUILDING_CLASS_AT_TIME_OF_SALE','SALE_PRICE'])

In [109]:
data=data.dropna()

In [110]:
data.count()

In [111]:
from pyspark.ml.clustering import KMeans

In [112]:
# Index the columns used for clustering. SALE_PRICE is indexed as well and stays in the
# pipeline, although its index is not part of the feature vector assembled below.
BOROUGH_KM_indexer = StringIndexer(inputCol='BOROUGH',outputCol='BOROUGH_KM_index',handleInvalid='keep')
NEIGHBORHOOD_KM_indexer = StringIndexer(inputCol='NEIGHBORHOOD',outputCol='NEIGHBORHOOD_KM_index',handleInvalid='keep')
ZIP_CODE_KM_indexer = StringIndexer(inputCol='ZIP_CODE',outputCol='ZIP_CODE_KM_index',handleInvalid='keep')
SALE_PRICE_KM_indexer = StringIndexer(inputCol='SALE_PRICE',outputCol='SALE_PRICE_KM_index',handleInvalid='keep')


# Vector assembler is used to create a vector of input features

assembler_km = VectorAssembler(inputCols=['BOROUGH_KM_index','NEIGHBORHOOD_KM_index','ZIP_CODE_KM_index'],outputCol="features")


# The pipeline passes the data through the indexers and then the assembler, in that order.

pipe_km = Pipeline(stages=[BOROUGH_KM_indexer,NEIGHBORHOOD_KM_indexer,ZIP_CODE_KM_indexer,SALE_PRICE_KM_indexer,assembler_km])


final_data_km=pipe_km.fit(data).transform(data)


# Fixed seed so the centroid initialisation, and therefore the clusters, are reproducible
kmeans_model = KMeans(k=4, seed=12345)


fit_model_km = kmeans_model.fit(final_data_km)


# Within-set sum of squared errors on the training data. KMeansModel.computeCost was
# deprecated in Spark 2.4 and removed in 3.0; the training summary exposes the same cost.
wssse = fit_model_km.summary.trainingCost
print("The within set sum of squared error of the mode is {}".format(wssse))


centers = fit_model_km.clusterCenters()


print("Cluster Centers")
for index, cluster in enumerate(centers, start=1):
    print("Centroid {}: {}".format(index,cluster))


# Assign every row to its cluster
results_km = fit_model_km.transform(final_data_km)


results_km.select(['BOROUGH_KM_index','NEIGHBORHOOD_KM_index','ZIP_CODE_KM_index','prediction']).show()


# Number of rows in each cluster
results_km.groupby('prediction').count().sort('prediction').show()

### Random Forest Regression Model

In [114]:
df = spark.sql("select * from project_data_csv")

In [115]:
df = df.dropna(how="any")
df.count()

In [116]:
# Keep sales priced between 80,000 and 50,000,000 (inclusive). The bounds are numeric
# literals so the comparison does not rely on implicit string-to-number coercion.
df=df.filter(df.SALE_PRICE.between(80000, 50000000))

In [117]:
# 70/30 train/test split with a fixed seed
train_data_rf,test_data_rf=df.randomSplit([0.7,0.3],seed=12345)


# Index the three string columns so they can be used as numeric model inputs
neighbourhood_indexer_rf= StringIndexer(inputCol='NEIGHBORHOOD',outputCol='neighbourhood_index_rf',handleInvalid='keep')
BUILDING_CLASS_CATEGORY_Indexer_rf = StringIndexer(inputCol='BUILDING_CLASS_CATEGORY',outputCol='BUILDING_CLASS_CATEGORY_Index_rf',handleInvalid='keep')
BUILDING_CLASS_AT_TIME_OF_SALE_indexer_rf = StringIndexer(inputCol='BUILDING_CLASS_AT_TIME_OF_SALE',outputCol='BUILDING_CLASS_index_rf',handleInvalid='keep')



# Vector assembler is used to create a vector of input features
# (the two building-class indexes are produced by the pipeline but are not inputs here)

assembler_rf=VectorAssembler(inputCols=['BOROUGH','ZIP_CODE','RESIDENTIAL_UNITS','COMMERCIAL_UNITS','LAND_SQUARE_FEET','GROSS_SQUARE_FEET','YEAR_BUILT','TAX_CLASS_AT_TIME_OF_SALE',
                                        'neighbourhood_index_rf'
                                     ],
                            outputCol="features")

# The pipeline passes the data through the indexers and then the assembler, in that order.
# It is fitted on the train data only, and the same fitted pipeline is reused to pre-process the test data.

pipe_rf = Pipeline(stages=[neighbourhood_indexer_rf,BUILDING_CLASS_CATEGORY_Indexer_rf,BUILDING_CLASS_AT_TIME_OF_SALE_indexer_rf,assembler_rf])

fitted_pipe_rf=pipe_rf.fit(train_data_rf)

# train_data_rf is replaced by its transformed version (now carries the "features" column)
train_data_rf=fitted_pipe_rf.transform(train_data_rf)

# Create an object for the Random Forest regression model (label: sale_price)



rf_model=RandomForestRegressor(featuresCol="features",labelCol='sale_price',maxBins=300)

# Fit the model on the train data

fit_model_rf= rf_model.fit(train_data_rf.select(['features','sale_price']))

# Pre-process the test data with the pipeline that was fitted on the train data

test_data_rf=fitted_pipe_rf.transform(test_data_rf)

# Predict the sale price for the test data and store the results in a dataframe

results_rf = fit_model_rf.transform(test_data_rf)

results_rf.select(['sale_price','prediction']).show()

In [118]:
evaluate_rf = RegressionEvaluator(
    labelCol="SALE_PRICE", predictionCol="prediction", metricName="r2")
r2 = evaluate_rf.evaluate(results_rf)
print("R Squared (R2) on test data = %g" % r2)

In [119]:
# Bring only the two plotted columns to pandas (avoids collecting the feature vectors).
# The frame gets its own name instead of reusing the linear-regression one.
pd_df_rf = results_rf.select('prediction','SALE_PRICE').toPandas()
fig, ax = plt.subplots(figsize=(10,6))
# x/y are passed by keyword: recent seaborn releases no longer accept them positionally
sns.regplot(x='prediction', y='SALE_PRICE', data=pd_df_rf, ax=ax)
# Display the figure itself; plt.show() returns None, so display(plt.show()) shows nothing useful
display(fig)


### Conclusions
1) The correlation matrix suggests the maximum correlation for sale price is with gross square feet column which is around 0.42.

2) To predict the price more accurately, we would need more features that have more influence on the price and are critical to price of the listings.

3) The New York sales dataset analysis provided us with finding interesting relations between various features which influence the buying of a property.

4) The sales feature having the most correlation with gross square feet makes sense because the sales will depend on the area of the flat.

5) From the models, we can observe that the R2 values for the two linear regression models are approximately 12% and 28% (0.12 and 0.28), which are low and indicate that their performance is not good.

6) For the random forest regression model, we can observe that the R2 value is around 51% (0.51), which leads us to conclude that this model is the best fit for this data set among the models tried.

7) The R2 for the decision tree is around 32% (0.32), which makes it a reasonably good model compared with the linear regression models.

### Summary
• There are 5 distinct boroughs in New York city

• There is a total of 251 neighborhoods in which the properties are listed.

• There are 47 different types of “Building Class Category”

• There are 160 different types of “Building class at the time of sale”

• We can observe that among all the boroughs “Brooklyn” has the highest number of properties with a count of 24047 and is the most valued borough

• “Bedford Stuyvesant” is the neighborhood with highest number of properties among the neighborhoods with 1436 properties which confirms it to be the most popular neighborhood

• There are 14394 properties in the “Building class category” of ‘One Family Dwellings’

• There are 5247 properties in the “Building class at the time of sale” of “A1”

• For the building class at the time of sales “RG” has the highest number of properties with a count close to 600

• For the Residential Property 3 there are about 800 properties in Bedford Stuyvesant neighborhood

• Riverdale is the neighborhood with highest number of primary residential properties with about 520

• In the Manhattan borough Harlem-central is the neighborhood with highest number of properties

• In the Bronx borough Riverdale is the neighborhood with highest number of properties

• In the Brooklyn borough Bedford Stuyvesant is the neighborhood with highest number of properties

• In the Queens borough Flushing-North is the neighborhood with highest number of properties

• In the borough of Staten Island Great Kills is the neighborhood with highest number of properties

• Most properties have a number of residential units in the range of 0-250

• Most commercial units fall in the initial (lower) price range, though with a few outliers