In [1]:
import pandas as pd
import numpy as np
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from sklearn import preprocessing

from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.tree import RandomForest
from pyspark.ml.feature import HashingTF , Tokenizer
from pyspark.ml import Pipeline

In [2]:
import findspark
findspark.init('/opt/spark-3.2.1-bin-hadoop3.2/')

In [3]:
# yarn mode
spark = SparkSession\
        .builder\
        .master("yarn")\
        .config('spark.executor.instances','18')\
        .config('spark.executor.memory','8G')\
        .appName("dc")\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-25 15:22:41,050 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

In [5]:
spark.conf.set("spark.sql.execution.arrow.pyscpark.enabled", True)

In [6]:
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [7]:
import pyspark.pandas as ps

In [8]:
ps.set_option("compute.default_index_type", "distributed")

---

## RandomForestClassifier

In [87]:
df = spark.read.csv('2.2.2_ML_financial.csv', header = True, inferSchema = True)
df.printSchema()



root
 |-- address: string (nullable = true)
 |-- style: string (nullable = true)
 |-- percent: double (nullable = true)
 |-- district: string (nullable = true)
 |-- parking_price: double (nullable = true)
 |-- date: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- management: integer (nullable = true)
 |-- area: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- room: double (nullable = true)
 |-- living: double (nullable = true)
 |-- bath: double (nullable = true)
 |-- avg: double (nullable = true)
 |-- interest rate: double (nullable = true)
 |-- CCI: double (nullable = true)
 |-- CCI_rate: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- CPI_rate: double (nullable = true)
 |-- M1A: integer (nullable = true)
 |-- M1A_rate: double (nullable = true)
 |-- M1B: integer (nullable = true)
 |-- M1B_rate: double (nullabl

                                                                                

In [88]:
print((df.count(), len(df.columns)))

(514029, 40)


In [89]:
df = df.drop('district','avg','style','address')

In [90]:
df.printSchema()

root
 |-- percent: double (nullable = true)
 |-- parking_price: double (nullable = true)
 |-- date: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- management: integer (nullable = true)
 |-- area: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- room: double (nullable = true)
 |-- living: double (nullable = true)
 |-- bath: double (nullable = true)
 |-- interest rate: double (nullable = true)
 |-- CCI: double (nullable = true)
 |-- CCI_rate: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- CPI_rate: double (nullable = true)
 |-- M1A: integer (nullable = true)
 |-- M1A_rate: double (nullable = true)
 |-- M1B: integer (nullable = true)
 |-- M1B_rate: double (nullable = true)
 |-- M2: integer (nullable = true)
 |-- M2_rate: double (nullable = true)
 |-- load_house: integer (nullable = true)
 |-- load_house_rate: do

In [91]:
numeric_features = [t[0] for t in df.dtypes if t[1] == 'double']
df.select(numeric_features).describe().toPandas().transpose()

2022-05-25 16:14:53,507 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
percent,514029,63.497010869036565,19.933697010946993,0.0,100.0
parking_price,514029,44.120543782549234,105.1606664078682,0.0,19720.0
age,514029,15.361862852095893,14.69609011985436,0.0,88.0
latitude,514029,25.04894192582704,0.08069287031535521,0.0,25.29294802135113
longitude,514029,121.49020366138929,0.2502124849505386,0.0,121.92797110266169
area,514029,38.63148983812194,27.426338157860933,0.02,10738.54
total_price,514029,1432.471660353794,1219.2875629338164,0.1,186978.1
room,514029,2.663554001817018,1.5687786209757524,0.0,784.0
living,514029,1.698281614461441,0.6046313729459096,0.0,80.0


In [92]:
from pyspark.ml.feature import VectorAssembler

numericCols = ['percent', 'parking_price', 'age', 'latitude','longitude','area','room','living','bath','interest rate','CCI','CCI_rate','CPI','CPI_rate','M1A_rate','M1B_rate','M2_rate','load_house_rate','NonPerforming_Loans_Ratio','TWSE','TWSE_rate','unemployment_rate','income_rate','GDP','GDP_rate']
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
df = assembler.transform(df)
df.show()

+-------+-------------+-----+-----+----+------------------+------------------+----------+-----+-----------+----+------+----+-------------+------+--------+------+--------+-------+--------+--------+--------+--------+-------+----------+---------------+----------+---------------+-------------------------+--------+---------+-----------------+--------+-----------+---------+--------+--------------------+
|percent|parking_price| date|floor| age|          latitude|         longitude|management| area|total_price|room|living|bath|interest rate|   CCI|CCI_rate|   CPI|CPI_rate|    M1A|M1A_rate|     M1B|M1B_rate|      M2|M2_rate|load_house|load_house_rate|load_archi|load_archi_rate|NonPerforming_Loans_Ratio|    TWSE|TWSE_rate|unemployment_rate|  income|income_rate|      GDP|GDP_rate|            features|
+-------+-------------+-----+-----+----+------------------+------------------+----------+-----+-----------+----+------+----+-------------+------+--------+------+--------+-------+--------+--------+--

In [None]:
from pyspark.ml.feature import StringIndexer

label_stringIdx = StringIndexer(inputCol = 'total_price', outputCol = 'label')
df = label_stringIdx.fit(df).transform(df)
df.show()

In [None]:
train, test = df.randomSplit([0.8, 0.2], seed = 1)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

---

## RandomForestRegressor

In [11]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [24]:
df = spark.read.csv('2.2.2_ML_financial.csv', header = True, inferSchema = True)

                                                                                

In [25]:
df = df.drop('district','avg','style','address')
df.printSchema()

root
 |-- percent: double (nullable = true)
 |-- parking_price: double (nullable = true)
 |-- date: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- management: integer (nullable = true)
 |-- area: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- room: double (nullable = true)
 |-- living: double (nullable = true)
 |-- bath: double (nullable = true)
 |-- interest rate: double (nullable = true)
 |-- CCI: double (nullable = true)
 |-- CCI_rate: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- CPI_rate: double (nullable = true)
 |-- M1A: integer (nullable = true)
 |-- M1A_rate: double (nullable = true)
 |-- M1B: integer (nullable = true)
 |-- M1B_rate: double (nullable = true)
 |-- M2: integer (nullable = true)
 |-- M2_rate: double (nullable = true)
 |-- load_house: integer (nullable = true)
 |-- load_house_rate: do

In [26]:
(trainingData, testData) = df.randomSplit([0.8, 0.2])

In [42]:
# 移除total_price特徵
feature_list = []
for col in df.columns:
    if col == 'total_price':
        continue
    else:
        feature_list.append(col)
        
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")

In [53]:
rf = RandomForestRegressor(labelCol="total_price", featuresCol="features", numTrees=100)

In [54]:
pipeline = Pipeline(stages=[assembler, rf])

In [55]:
rfevaluator = RegressionEvaluator(labelCol="total_price", predictionCol="prediction")

rfparamGrid = (ParamGridBuilder()             
               #.addGrid(rf.maxDepth, [10])             
               #.addGrid(rf.maxBins, [20])             
               .addGrid(rf.numTrees, [100])
               .build())

rfcv = CrossValidator(estimator = pipeline,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = rfevaluator,
                      numFolds = 5)

In [29]:
rfcvModel = rfcv.fit(trainingData)

                                                                                

In [30]:
rfpredictions = rfcvModel.transform(testData)

print('r2:', rfevaluator.evaluate(rfpredictions,{rfevaluator.metricName: "r2"}))
print('mae:', rfevaluator.evaluate(rfpredictions,{rfevaluator.metricName: "mae"}))
print('mse:', rfevaluator.evaluate(rfpredictions,{rfevaluator.metricName: "mse"}))



r2: 0.5796537060794487


                                                                                

mae: 405.51044425984963




mse: 712732.6345372349


                                                                                

In [35]:
bestPipeline = rfcvModel.bestModel
bestModel = bestPipeline.stages[1]
importances = bestModel.featureImportances

---

## Run one model to check time

In [74]:
df = spark.read.csv('2.2.2_ML_financial.csv', header = True, inferSchema = True)

                                                                                

In [75]:
df = df.drop('district','avg','style','address')
df.printSchema()

root
 |-- percent: double (nullable = true)
 |-- parking_price: double (nullable = true)
 |-- date: integer (nullable = true)
 |-- floor: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- management: integer (nullable = true)
 |-- area: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- room: double (nullable = true)
 |-- living: double (nullable = true)
 |-- bath: double (nullable = true)
 |-- interest rate: double (nullable = true)
 |-- CCI: double (nullable = true)
 |-- CCI_rate: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- CPI_rate: double (nullable = true)
 |-- M1A: integer (nullable = true)
 |-- M1A_rate: double (nullable = true)
 |-- M1B: integer (nullable = true)
 |-- M1B_rate: double (nullable = true)
 |-- M2: integer (nullable = true)
 |-- M2_rate: double (nullable = true)
 |-- load_house: integer (nullable = true)
 |-- load_house_rate: do

In [79]:
# 特徵全拿
feature_list = []
for col in df.columns:
    feature_list.append(col)
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")

In [80]:
(trainingData, testData) = df.randomSplit([0.8, 0.2], seed=1)

In [81]:
pipeline = Pipeline(stages=[assembler, rf])

In [85]:
%%timeit
rf = RandomForestRegressor(labelCol="total_price", featuresCol="features", numTrees=100)

pipeline = Pipeline(stages=[assembler, rf])

rfModel = pipeline.fit(trainingData)

rfevaluator = RegressionEvaluator(labelCol="total_price", predictionCol="prediction")

rfpredictions = rfModel.transform(testData)


print('mse:', rfevaluator.evaluate(rfpredictions,{rfevaluator.metricName: "mse"}))
print('r2:', rfevaluator.evaluate(rfpredictions,{rfevaluator.metricName: "r2"}))

                                                                                

mse: 99279.43450531649


                                                                                

r2: 0.9163941391818047


                                                                                

mse: 99279.43450531649


                                                                                

r2: 0.9163941391818047


                                                                                

mse: 99188.29129187236


                                                                                

r2: 0.9164708933137727


                                                                                

mse: 99279.43450531649


                                                                                

r2: 0.9163941391818047


                                                                                

mse: 99279.43450531649


                                                                                

r2: 0.9163941391818047


                                                                                

mse: 99279.43450531649


                                                                                

r2: 0.9163941391818047


                                                                                

mse: 99188.29129187236


                                                                                

r2: 0.9164708933137727




mse: 99279.43450531649




r2: 0.9163941391818047
24.9 s ± 193 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


                                                                                

----

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn import linear_model
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.ensemble import RandomForestRegressor
import matplotlib.pyplot as plt 
plt.style.use('seaborn-whitegrid')
%matplotlib inline 

In [None]:
train = train.toPandas()
test = test.toPandas()

In [None]:
df_pd = pd.concat([train,test],axis=0)

In [None]:
X = df_pd.drop(['features'], axis=1)
y = df_pd[['label']]

In [None]:
from sklearn.ensemble import RandomForestRegressor
X_train,X_test,y_train,y_test=train_test_split(X, y, test_size=0.2, random_state=1) #random_state 種子值

scaler = preprocessing.StandardScaler().fit(X_train)

X_train = scaler.transform(X_train)

model = RandomForestRegressor(n_estimators=100, criterion = 'mse',) # 使用訓練資料訓練模型

model.fit(X_train, y_train)

X_test = scaler.transform(X_test)

y_pred = model.predict(X_test)

print("Mean squared error: {}".format(mean_squared_error(y_test, y_pred)))

print('R2 score: {}'.format(r2_score(y_test, y_pred)))


---