In [666]:
from pyspark.sql import SparkSession #imports SparkSession

In [802]:
spark = SparkSession.builder.appName('House_Build_Prediction').getOrCreate() #Creates the appname 

In [671]:
df = spark.read.csv('/users/f/desktop/House_Build_Prediction5.csv',inferSchema = True, header = True) #Reads the data 

In [672]:
df.head(5)

[Row(Company_Name='A', Houses_Built=2, Duration_Months=10.0, Size_Each_House_m2=100),
 Row(Company_Name='A', Houses_Built=3, Duration_Months=30.0, Size_Each_House_m2=200),
 Row(Company_Name='A', Houses_Built=5, Duration_Months=50.0, Size_Each_House_m2=200),
 Row(Company_Name='A', Houses_Built=10, Duration_Months=60.0, Size_Each_House_m2=120),
 Row(Company_Name='A', Houses_Built=12, Duration_Months=48.0, Size_Each_House_m2=80)]

In [673]:
df.groupBy('Company_Name').count().show(5)#Shows the groups and counts. 

+------------+-----+
|Company_Name|count|
+------------+-----+
|           B|    9|
|           A|   10|
+------------+-----+



In [674]:
from pyspark.ml.feature import StringIndexer #imports string indexer - We should import this because we need to give index numbers to the string values to make it understandable. 

In [675]:
indexer = StringIndexer(inputCol = 'Company_Name', outputCol = 'Company_Name_Index')#indexing the first string feature "Job_title"

In [711]:
indexed3 = indexer.fit(df).transform(df)

In [712]:
indexed2.head(5)

[Row(Company_Name='A', Houses_Built=2, Duration_Months=10.0, Size_Each_House_m2=100, Company_Name_Index=0.0),
 Row(Company_Name='A', Houses_Built=3, Duration_Months=30.0, Size_Each_House_m2=200, Company_Name_Index=0.0),
 Row(Company_Name='A', Houses_Built=5, Duration_Months=50.0, Size_Each_House_m2=200, Company_Name_Index=0.0),
 Row(Company_Name='A', Houses_Built=10, Duration_Months=60.0, Size_Each_House_m2=120, Company_Name_Index=0.0),
 Row(Company_Name='A', Houses_Built=12, Duration_Months=48.0, Size_Each_House_m2=80, Company_Name_Index=0.0)]

In [713]:
indexed3.head(5)

[Row(Company_Name='A', Houses_Built=2, Duration_Months=10.0, Size_Each_House_m2=100, Company_Name_Index=0.0),
 Row(Company_Name='A', Houses_Built=3, Duration_Months=30.0, Size_Each_House_m2=200, Company_Name_Index=0.0),
 Row(Company_Name='A', Houses_Built=5, Duration_Months=50.0, Size_Each_House_m2=200, Company_Name_Index=0.0),
 Row(Company_Name='A', Houses_Built=10, Duration_Months=60.0, Size_Each_House_m2=120, Company_Name_Index=0.0),
 Row(Company_Name='A', Houses_Built=12, Duration_Months=48.0, Size_Each_House_m2=80, Company_Name_Index=0.0)]

In [679]:
from pyspark.ml.linalg import Vectors 



In [680]:
from pyspark.ml.feature import VectorAssembler 

In [714]:
indexed3.columns

['Company_Name',
 'Houses_Built',
 'Duration_Months',
 'Size_Each_House_m2',
 'Company_Name_Index']

In [719]:
assembler2  = VectorAssembler(inputCols = [
 'Houses_Built',
 'Size_Each_House_m2',
 'Company_Name_Index',
 ], outputCol = 'features')

In [720]:
assembler2

VectorAssembler_434ebb2bd685

In [721]:
output = assembler2.transform(indexed3)

In [722]:
output.show()

+------------+------------+---------------+------------------+------------------+----------------+
|Company_Name|Houses_Built|Duration_Months|Size_Each_House_m2|Company_Name_Index|        features|
+------------+------------+---------------+------------------+------------------+----------------+
|           A|           2|           10.0|               100|               0.0| [2.0,100.0,0.0]|
|           A|           3|           30.0|               200|               0.0| [3.0,200.0,0.0]|
|           A|           5|           50.0|               200|               0.0| [5.0,200.0,0.0]|
|           A|          10|           60.0|               120|               0.0|[10.0,120.0,0.0]|
|           A|          12|           48.0|                80|               0.0| [12.0,80.0,0.0]|
|           A|          15|           67.5|                90|               0.0| [15.0,90.0,0.0]|
|           A|          18|          108.0|               120|               0.0|[18.0,120.0,0.0]|
|         

In [723]:
output.select('features','Duration_Months').show()

+----------------+---------------+
|        features|Duration_Months|
+----------------+---------------+
| [2.0,100.0,0.0]|           10.0|
| [3.0,200.0,0.0]|           30.0|
| [5.0,200.0,0.0]|           50.0|
|[10.0,120.0,0.0]|           60.0|
| [12.0,80.0,0.0]|           48.0|
| [15.0,90.0,0.0]|           67.5|
|[18.0,120.0,0.0]|          108.0|
| [20.0,70.0,0.0]|           70.0|
|[24.0,110.0,0.0]|          132.0|
|[30.0,130.0,0.0]|          195.0|
| [4.0,100.0,1.0]|           10.0|
| [8.0,200.0,1.0]|           40.0|
|[20.0,130.0,1.0]|           65.0|
|[25.0,120.0,1.0]|           75.0|
|[28.0,180.0,1.0]|          126.0|
|[32.0,140.0,1.0]|          112.0|
| [34.0,80.0,1.0]|           68.0|
|[36.0,180.0,1.0]|          162.0|
|[40.0,140.0,1.0]|          140.0|
+----------------+---------------+



In [724]:
final_data = output.select(['features','Duration_Months'])

In [725]:
final_data.show()

+----------------+---------------+
|        features|Duration_Months|
+----------------+---------------+
| [2.0,100.0,0.0]|           10.0|
| [3.0,200.0,0.0]|           30.0|
| [5.0,200.0,0.0]|           50.0|
|[10.0,120.0,0.0]|           60.0|
| [12.0,80.0,0.0]|           48.0|
| [15.0,90.0,0.0]|           67.5|
|[18.0,120.0,0.0]|          108.0|
| [20.0,70.0,0.0]|           70.0|
|[24.0,110.0,0.0]|          132.0|
|[30.0,130.0,0.0]|          195.0|
| [4.0,100.0,1.0]|           10.0|
| [8.0,200.0,1.0]|           40.0|
|[20.0,130.0,1.0]|           65.0|
|[25.0,120.0,1.0]|           75.0|
|[28.0,180.0,1.0]|          126.0|
|[32.0,140.0,1.0]|          112.0|
| [34.0,80.0,1.0]|           68.0|
|[36.0,180.0,1.0]|          162.0|
|[40.0,140.0,1.0]|          140.0|
+----------------+---------------+



In [726]:
final_data.head()

Row(features=DenseVector([2.0, 100.0, 0.0]), Duration_Months=10.0)

In [727]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [728]:
train_data.describe().show()

+-------+-----------------+
|summary|  Duration_Months|
+-------+-----------------+
|  count|               11|
|   mean|75.45454545454545|
| stddev|58.19856293133643|
|    min|             10.0|
|    max|            195.0|
+-------+-----------------+



In [785]:
test_data.describe().show()

+-------+-----------------+
|summary|  Duration_Months|
+-------+-----------------+
|  count|                8|
|   mean|          92.3125|
| stddev|40.95375222648801|
|    min|             48.0|
|    max|            162.0|
+-------+-----------------+



In [786]:
from pyspark.ml.regression import LinearRegression #imports the lineerRegression

In [787]:
duration_linear = LinearRegression(labelCol = 'Duration_Months')

In [788]:
trained_duration_model = duration_linear.fit(train_data)

23/01/14 14:24:47 WARN Instrumentation: [ba9b035b] regParam is zero, which might cause numerical instability and overfitting.


In [789]:
duration_results = trained_duration_model.evaluate(test_data)

In [790]:
duration_results.rootMeanSquaredError

12.125916582029038

In [791]:
duration_results.r2

0.8998078485039522

In [792]:
data = ([[2,140,0]])

In [793]:
new_df = spark.createDataFrame(data)

In [794]:
new_df.show()

+---+---+---+
| _1| _2| _3|
+---+---+---+
|  2|140|  0|
+---+---+---+



In [795]:
new_assembler = VectorAssembler(inputCols = [
 '_1',
 '_2',
 '_3',
 ], outputCol = 'features')

In [796]:
new_data_output = new_assembler.transform(new_df)

In [797]:
new_data_output.show()

+---+---+---+---------------+
| _1| _2| _3|       features|
+---+---+---+---------------+
|  2|140|  0|[2.0,140.0,0.0]|
+---+---+---+---------------+



In [798]:
final_new_data_output = new_data_output.select('features')

In [799]:
final_new_data_output.show()

+---------------+
|       features|
+---------------+
|[2.0,140.0,0.0]|
+---------------+



In [800]:
prediction_new_data = trained_duration_model.transform(final_new_data_output)

In [801]:
prediction_new_data.show()

+---------------+-----------------+
|       features|       prediction|
+---------------+-----------------+
|[2.0,140.0,0.0]|34.36829586935157|
+---------------+-----------------+

