In [None]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
import pyreadstat  ##conda install conda-forge::pyreadstat
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
#----------------------------------
# Mass appraisal functions
#----------------------------------

def weightedMean(pred, sp):
    '''
    Returns the weighted mean ratio
    
        Parameters:
            pred (pandas.Series): Series of predicted values
            sp   (pandas.Series): Series of sale prices
            
        Returns:
            weighted mean (numpy.float64): Weighted mean ratio
            
    '''
    return pred.sum() / sp.sum()

def averageDeviation(pred, sp):
    '''
    Returns the average deviation
    
        Parameters:
            pred (pandas.Series): Series of predicted values
            sp   (pandas.Series): Series of sale prices
            
        Returns:
            average deviation (numpy.float64): Average difference between each value
            
    '''
    medianRatio = (pred / sp).median()
    return ((pred / sp) - medianRatio).abs().sum() / len(sp) 

def COD(pred, sp):
    '''
    Returns the coefficient of dispersion
    
        Parameters:
            pred (pandas.Series): Series of predicted values
            sp   (pandas.Series): Series of sale prices
            
        Returns:
            coefficient of dispersion (numpy.float64): Average deviation as a percentage
            
    '''
    medianRatio = (pred / sp).median()
    return (100.00 * averageDeviation(pred, sp)) / medianRatio

def PRD(pred, sp):
    '''
    Returns the price related differential
    
        Parameters:
            pred (pandas.Series): Series of predicted values
            sp   (pandas.Series): Series of sale prices
            
        Returns:
            price related differential (numpy.float64): Statistic for measuring assessment regressivity
            
    '''
    meanRatio = (pred / sp).mean()
    return meanRatio / weightedMean(pred, sp)

def PRB(pred, sp, showGraph=False):
    '''
    Returns the price related bias
    
        Parameters:
            pred (pandas.Series): Series of predicted values
            sp   (pandas.Series): Series of sale prices
            
        Returns:
            price related bias results (dict): Dictionary containing the PRB statistic and it's significance
            
    '''
    RATIO = pred / sp
    medianRatio = (RATIO).median()
    VALUE = (0.50 * sp) + (0.50 * pred / pred.median())
    LN_VALUE = np.log(VALUE) / np.log(2)
    PCT_DIFF = (RATIO - medianRatio) / medianRatio
    modelData = sm.add_constant(LN_VALUE)
    model = sm.OLS(PCT_DIFF, modelData).fit()
    if showGraph:
        p = sns.lmplot(x='LN_VALUE', y='PCT_DIFF', data = pd.DataFrame.from_dict({"LN_VALUE" : LN_VALUE, "PCT_DIFF" : PCT_DIFF}), lowess = True, line_kws={'color': 'red'})
        p.fig.set_figwidth(10)
        p.fig.set_figheight(6)
        p.ax.ticklabel_format(useOffset=False)
    return {"PRB" : model.params[0], "Sig" : model.pvalues[0]}


In [None]:
def addContinuous(df, series, columnName):
    if (series.eq(0).any()):
        s = pd.Series(np.log(series + 1), name = columnName)
    else:
        s = pd.Series(np.log(series), name = columnName)
    return pd.concat([df, s], axis = 1)

def addBinaries(df, categorical, base, prefix, minSales = 0):
    cols = pd.get_dummies(categorical, prefix = prefix).astype(float)
    cols.drop(base, axis = 1, inplace = True)
    for x in cols:
        if len(cols.loc[cols[x] == 1, x]) < minSales:
            print("Insufficent Sales: " + x)
            cols.drop(x, axis = 1, inplace = True)
           
    return pd.concat([df, cols], axis = 1)

# This function dynamically builds a prediction statement
# from the model parameters.  A const param entry is expected.
def estimateParcelValue(predData, modelParams):
    s = "lambda x: " + str(modelParams['const'])
    
    for i,c in modelParams.drop('const').items():
        s += " + (" + str(c) + " * x['" + str(i) + "'])" 
    
    f = eval(s)
    return np.exp(predData.apply(f, axis = 1)).round(0)

# Calculates model coverage by parameters
# with optional grouping
def getParameterCoverage(df):
    results = pd.Series(name = 'ParameterCoverage')
    
    for col in df.columns:
        if len(df[col].value_counts()) == 2:
            #It's a Binary
            results[col] = len(df[df[col] == 1])                
        else:
            #It's Continuous
            results[col] = len(df[df[col] != 0])
                
    return results

In [None]:
data = pd.read_spss('..\\SPSS\\Data\\GISValTechSampleData.sav')
data['ParcelId'] = data['ParcelId'].astype(int)
data['SaleDate'] = pd.to_datetime(data['SaleDate'])
data['SalesPrice'] = data['SalesPrice'].astype(int)
data['Sqft'] = data['Sqft'].astype(int)
data['LandSize'] = data['LandSize'].astype(int)
data['Quality'] = pd.Categorical(data['Quality'], ordered = True, categories = [
    'Poor', 'BelowAverage', 'Average', 'AboveAverage', 'Superior'])
data['GarageSize'] = data['GarageSize'].astype(int)
data['EffAge'] = data['EffAge'].astype(int)
data['NBHD'] = pd.Categorical(data['NBHD'].astype(int))

data.set_index('ParcelId', inplace = True)
data.head()
data.dtypes

In [None]:
data['SPPSF'] = data['SalesPrice'] / data['Sqft']
data.groupby(['Quality'], observed = True)['SPPSF'].describe()
#data.groupby(['Quality'], observed = True)['SPPSF'].aggregate(['count', 'mean', 'median', 'std' ])

In [None]:
data.groupby(['NBHD'], observed = True)['SPPSF'].aggregate(['count', 'mean', 'median', 'std' ])

In [None]:
data['SYEAR'] = data['SaleDate'].dt.year
data['SMONTH'] = data['SaleDate'].dt.month
data['SDATE'] = data['SaleDate'].dt.to_period('M').dt.to_timestamp()
pd.crosstab(data['SYEAR'], data['SMONTH'])

In [None]:
startdate = pd.to_datetime('1/1/2023')
basedate = pd.to_datetime('12/31/2023')
timeperiod = (basedate.to_period('M') - startdate.to_period('M')).n
timeperiod

In [None]:
data['Months'] = [(x - startdate.to_period('M')).n for x in data['SaleDate'].dt.to_period('M')]
data['Month'] = timeperiod - data['Months']
data.head()


In [None]:
def calculatePctGood(effAge):
    if effAge < 0:
        effAge = 0
    elif effAge > 60:
        effAge = 60    
    return round(1 - (effAge / 100), 2)
        
    
data['Pct_Good'] = [calculatePctGood(x) for x in data['EffAge']]
data[['EffAge', 'Pct_Good']].head()
    

In [None]:
modelData = pd.DataFrame({"LN_SalesPrice" : np.log(data['SalesPrice']), "const" : 1})
modelData = pd.concat([modelData, data['Months']], axis = 1 )
modelData = addContinuous(modelData, data['Sqft'], 'LN_Sqft')
modelData = addContinuous(modelData, data['LandSize'], 'LN_LandSize')
modelData = addBinaries(modelData, data['Quality'], 'Quality_Average', 'Quality', minSales = 6)
modelData = addContinuous(modelData, data['Bathrooms'], 'LN_Bathrooms')
modelData = addContinuous(modelData, data['Pct_Good'], 'LN_Pct_Good')
modelData = addContinuous(modelData, data['GarageSize'], 'LN_GarageSize')
#modelData = addContinuous(modelData, data['GarageSize'] / 480, 'LN_GarageSize_Ratio')
modelData = addBinaries(modelData, data['NBHD'], 'NBHD_103', 'NBHD', minSales = 6)
modelData

In [None]:
# This assumes dependent variable is in the first column
while True:
    model = sm.OLS(
        modelData.iloc[:, 0],
        modelData.drop(modelData.columns[0], axis = 1)
    ).fit()
    ix = model.pvalues[model.pvalues.index != 'const'].idxmax()
    if(model.pvalues[ix] <= .05):
        break
    else:
        print("Dropping {0} p-value: {1} ".format(str(ix), model.pvalues[ix].round(3)))
        modelData.drop(columns = ix, inplace = True)

model.summary()

In [None]:
# show all methods and functions of an object
dir(model)

In [None]:
model.params

In [None]:
model.bse

In [None]:
getParameterCoverage(modelData)

In [None]:
data['ESP'] = estimateParcelValue(modelData, model.params ).astype(int)
data['Ratio'] = round((data['ESP'] / data['SalesPrice']), 2)
data.head()

In [None]:
PRB(data['ESP'], data['SalesPrice'], showGraph = True)

In [None]:
stats = pd.DataFrame(columns = ['Count', 'Mean', 'Median', 'WgtMean', 'Min', 'Max', 'PRD', 'COD'])
for name, gData in data.groupby(by='NBHD', observed = True):
    stats.loc[name] = [
        len(gData.index),
        '%0.2f' % gData['Ratio'].mean(),
        '%0.2f' % gData['Ratio'].median(),
        '%0.2f' % weightedMean(gData['ESP'], gData['SalesPrice']),
        gData['Ratio'].min(),
        gData['Ratio'].max(),
        '%0.2f' % PRD(gData['ESP'], gData['SalesPrice']),
        '%0.2f' % COD(gData['ESP'], gData['SalesPrice'])
    ]

stats

In [None]:
stats = pd.DataFrame(columns = ['Count', 'Mean', 'Median', 'WgtMean', 'Min', 'Max', 'PRD', 'COD'])
for name, gData in data.groupby(by='SDATE', observed = True):
    stats.loc[name.strftime('%b %Y')] = [
        len(gData.index),
        '%0.2f' % gData['Ratio'].mean(),
        '%0.2f' % gData['Ratio'].median(),
        '%0.2f' % weightedMean(gData['ESP'], gData['SalesPrice']),
        gData['Ratio'].min(),
        gData['Ratio'].max(),
        '%0.2f' % PRD(gData['ESP'], gData['SalesPrice']),
        '%0.2f' % COD(gData['ESP'], gData['SalesPrice'])
    ]

stats

In [None]:
p = sns.lmplot(x='Months', y='Ratio', data = data, lowess = True, line_kws={'color': 'red'})
p.fig.set_figwidth(8)
p.fig.set_figheight(4)
p.ax.set_title("Ratio by SaleDate")
p.ax.ticklabel_format(useOffset=False)
label = [x.strftime('%b %Y') for x in data.sort_values(by='SaleDate')["SDATE"].unique()]
p.ax.set_xticks(range(len(label)), labels=label)
p.ax.tick_params(axis="x", labelrotation=45)
plt.axhline(y=1.00, color = 'black')

In [None]:
p = sns.lmplot(x='Sqft', y='Ratio', data = data, lowess = True, line_kws={'color': 'red'})
p.fig.set_figwidth(8)
p.fig.set_figheight(4)
p.ax.set_title("Ratio by Sqft")
p.ax.ticklabel_format(useOffset=False)
plt.axhline(y=1.00, color = 'black')

In [None]:
p = sns.lmplot(x='LandSize', y='Ratio', data = data, lowess = True, line_kws={'color': 'red'})
p.fig.set_figwidth(8)
p.fig.set_figheight(4)
p.ax.set_title("Ratio by LandSize")
p.ax.ticklabel_format(useOffset=False)
plt.axhline(y=1.00, color = 'black')

In [None]:
p = sns.lmplot(x='EffAge', y='Ratio', data = data, lowess = True, line_kws={'color': 'red'})
p.fig.set_figwidth(8)
p.fig.set_figheight(4)
p.ax.set_title("Ratio by EffAge")
p.ax.ticklabel_format(useOffset=False)
plt.axhline(y=1.00, color = 'black')

In [None]:
p = sns.lmplot(x='Bathrooms', y='Ratio', data = data, lowess = True, line_kws={'color': 'red'})
p.fig.set_figwidth(8)
p.fig.set_figheight(4)
p.ax.set_title("Ratio by Bathrooms")
p.ax.ticklabel_format(useOffset=False)
plt.axhline(y=1.00, color = 'black')

In [None]:
p = sns.lmplot(x='GarageSize', y='Ratio', data = data, lowess = True, line_kws={'color': 'red'})
p.fig.set_figwidth(8)
p.fig.set_figheight(4)
p.ax.set_title("Ratio by GarageSize")
p.ax.ticklabel_format(useOffset=False)
plt.axhline(y=1.00, color = 'black')

In [None]:
data.groupby('Quality', observed=True)['Ratio'].describe()

In [None]:
data.loc[(data['Ratio'] >= 1.5) | (data['Ratio'] <= .65), ]

In [None]:
data.loc[(data['Ratio'] >= 1.5) | (data['Ratio'] <= .65), ].to_excel('.\\reports\\multiplicativebadratio.xlsx')