In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Load the DeepSolar Data and Fields List

In [2]:
solar=pd.read_csv('../../deepsolar_tract.csv',encoding = "ISO-8859-1")
solar.head()
solar_fields=pd.read_csv('../deepsolar fields.csv')
solar_fields.head()

Unnamed: 0,Field,Description,Unit,Data Type,Formula,Possible Values,Observed Max,Observed Min,Theoretical Min,Theoretical Max,Relevant Feature
0,Unnamed: 0,Index,,Numeric,,,72537.0,0.0,,,0
1,tile_count,total number of tiles in census tract,,Numeric,,,4468.0,0.0,0.0,,0
2,solar_system_count,Total number of solar systems in census tract,,Numeric,,,1535.0,0.0,0.0,,0
3,total_panel_area,,,Numeric,,,592031.075,0.0,0.0,,0
4,fips,FIPS identifier for the census tract,,String,,,,,,,0


# Load/Test PySpark

In [3]:
from pyspark import SparkContext
sc = SparkContext()

In [4]:
import numpy as np

TOTAL = 100
dots = sc.parallelize([2.0 * np.random.random(2) - 1.0 for i in range(TOTAL)]).cache()
print("Number of random points:", dots.count())

stats = dots.stats()
print('Mean:', stats.mean())
print('stdev:', stats.stdev())

Number of random points: 100
Mean: [ 0.00671292  0.0173606 ]
stdev: [ 0.52734147  0.57312687]


# Train Model Using Spark ML

In [5]:
from numpy import allclose
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.sql.session import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
spark = SparkSession(sc)
from pyspark.ml.linalg import DenseVector
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

## Pre-process for Spark ML

In [None]:
features=solar_fields.loc[(solar_fields['Relevant Feature']==1)]['Field'].tolist()
#all_variables=['number_of_solar_system_per_household']features+

#Set infinity and blank spaces to NaN in independent variables, set infinite to 0 in dependent variable
solar2=solar[features].replace([np.inf,' '],np.nan)
solar2['number_of_solar_system_per_household']=solar['number_of_solar_system_per_household'].replace([np.inf,np.nan],0)

#Create binary version of number_of_solar_system_per_household for RF classifier

solar2['solar_flag']=solar2['number_of_solar_system_per_household'].apply(lambda x: int(x>0))


#convert states to indicator indices
solar2['state']=solar2['state'].astype('category').cat.codes

#create binary version of vote dem win variables

solar2['voting_2016_dem_win']=solar2['voting_2016_dem_win'].apply(lambda x: int(x))
solar2['voting_2012_dem_win']=solar2['voting_2012_dem_win'].apply(lambda x: int(x))

## Write solar2 to csv - cleaner than trying to convert pandas df to pyspark df

In [65]:
solar2.to_csv('ml_frame.csv',index=False)

## Read csv into pyspark df

In [6]:
df = spark.read.format("csv").option("header", True).option("inferSchema", True).load("ml_frame.csv")

In [7]:
df.printSchema()

root
 |-- average_household_income: double (nullable = true)
 |-- gini_index: double (nullable = true)
 |-- per_capita_income: double (nullable = true)
 |-- population_density: double (nullable = true)
 |-- state: integer (nullable = true)
 |-- education_less_than_high_school_rate: double (nullable = true)
 |-- education_high_school_graduate_rate: double (nullable = true)
 |-- education_college_rate: double (nullable = true)
 |-- education_bachelor_rate: double (nullable = true)
 |-- education_master_rate: double (nullable = true)
 |-- education_professional_school_rate: double (nullable = true)
 |-- education_doctoral_rate: double (nullable = true)
 |-- race_white_rate: double (nullable = true)
 |-- race_black_africa_rate: double (nullable = true)
 |-- race_indian_alaska_rate: double (nullable = true)
 |-- race_asian_rate: double (nullable = true)
 |-- race_islander_rate: double (nullable = true)
 |-- race_other_rate: double (nullable = true)
 |-- race_two_more_rate: double (nullable 

## Process Spark Df for Random Forest Classifier

In [9]:
cols=df.columns
indep_vars=[i for i in cols if i not in ['solar_flag','number_of_solar_system_per_household']]
classifier_cols=['solar_flag']+indep_vars
regressor_cols=['number_of_solar_system_per_household']+indep_vars

In [10]:
classifier_data=df.select(classifier_cols).rdd.map(lambda x: (x[0], DenseVector(x[1:])))
regressor_data=df.select(regressor_cols).rdd.map(lambda x: (x[0], DenseVector(x[1:])))

In [11]:
classifier_df = spark.createDataFrame(classifier_data, ["label", "features"])
regressor_df=spark.createDataFrame(regressor_data, ["label", "features"])

## Create Model Pipeline - Classifier

In [13]:
#index the labels and the features for the random forest model
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(classifier_df)
featureIndexer =VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=50).fit(classifier_df)

#split the data into training and test sets
(trainingData, testData) = classifier_df.randomSplit([0.8, 0.2],seed=1234)

#define the model and the label converter
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=100,maxBins=50)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

#put all the steps together into a pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

#fit the training data
model = pipeline.fit(trainingData)

#generate predictions on the test data
predictions = model.transform(testData)

## Evaluate Model Accuracy

In [18]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print('test set accuracy: ',accuracy)

test set accuracy:  0.7919685365348789


## Get Feature Importances

In [25]:
rfModel = model.stages[2]
feature_importances=rfModel.featureImportances
#print(len(indep_vars))
fia=feature_importances.toArray()
#print(fia)
fi_list=[]
for i in range(fia.shape[0]):
    fi_list.append((indep_vars[i],fia[i]))
sorted_list=sorted(fi_list,reverse=True,key=lambda k: k[1])
sorted_list[0:20]

[('population_density', 0.11299866881886732),
 ('heating_fuel_coal_coke_rate', 0.098176844853940903),
 ('occupancy_vacant_rate', 0.049762576548371677),
 ('race_asian_rate', 0.046163656759442014),
 ('occupation_agriculture_rate', 0.034832859441115825),
 ('electricity_price_commercial', 0.033415698065753757),
 ('housing_unit_median_gross_rent', 0.026326667549875869),
 ('property_tax', 0.023731684609069541),
 ('electricity_price_overall', 0.023721467351555704),
 ('electricity_consume_industrial', 0.022019324924780013),
 ('housing_unit_median_value', 0.021936595356244731),
 ('travel_time_average', 0.021796981588708852),
 ('state', 0.021055579803029127),
 ('education_high_school_graduate_rate', 0.020809441498531023),
 ('lon', 0.018780965379348695),
 ('number_of_years_of_education', 0.017303545051805938),
 ('race_white_rate', 0.016746424223133879),
 ('electricity_consume_total', 0.016123029513001503),
 ('transportation_public_rate', 0.015691885592182733),
 ('incentive_residential_state_level