In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=27e5d2b2f51ec3d4a405f847c48ec709282f324614489935f86fccf42e8426f9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

# Get (or create) a Spark session running in local mode under the app name "Colab".
session_builder = SparkSession.builder.master("local").appName("Colab")
spark = session_builder.getOrCreate()

# Load housing.csv with the first row used as the header and column types
# inferred from the data.
csv_reader = spark.read.format("csv").options(header=True, inferSchema=True)
df = csv_reader.load("housing.csv")


In [None]:
# Split the rows roughly 70/30 into training and test sets. The fixed seed
# keeps the split repeatable, so results can be compared between runs.
train, test = df.randomSplit([0.7, 0.3], seed=42)


In [None]:
from pyspark.ml.feature import Imputer

# Impute every column except the regression target and the string-valued
# `ocean_proximity` column.
empties = list(train.columns)
for excluded_col in ('median_house_value', 'ocean_proximity'):
    empties.remove(excluded_col)

# Fit on the training split only; the output columns are the input columns,
# so the imputed values replace the originals in both splits.
imputer = Imputer(inputCols=empties, outputCols=empties).fit(train)

train = imputer.transform(train)
test = imputer.transform(test)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Pack the columns listed in `empties` into a single vector column
# named 'numeric_vector'.
vector_asse = VectorAssembler(outputCol='numeric_vector', inputCols=empties)

train, test = vector_asse.transform(train), vector_asse.transform(test)

In [None]:
from pyspark.ml.feature import StandardScaler

# Standardise 'numeric_vector' (mean-centring and std-scaling both enabled).
# The scaler is fitted on the training split only and then applied to both
# splits, writing the result to 'scaled'.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol='numeric_vector',
    outputCol='scaled',
).fit(train)

train = scaler.transform(train)
test = scaler.transform(test)

In [None]:
from pyspark.ml.feature import StringIndexer

# Map each `ocean_proximity` string to a numeric index, learned from the
# training split and written to 'ocean_index'.
# NOTE(review): a category that occurs only in `test` would be unknown to the
# fitted indexer — confirm how that case should be handled.
indexer = StringIndexer(
    outputCol='ocean_index',
    inputCol='ocean_proximity',
).fit(train)

train = indexer.transform(train)
test = indexer.transform(test)


In [None]:
from pyspark.ml.feature import OneHotEncoder

# Encode 'ocean_index' as a one-hot vector in 'ocean_onehot'; the encoder is
# fitted on the training split and applied to both splits.
one_hot_encoder = OneHotEncoder(
    outputCol='ocean_onehot',
    inputCol='ocean_index',
).fit(train)

train, test = one_hot_encoder.transform(train), one_hot_encoder.transform(test)


In [None]:
# Preview the training split after the transformations applied so far
# (the default row count and truncation are spelled out explicitly).
train.show(n=20, truncate=True)

In [None]:
# Concatenate the scaled numeric vector and the one-hot vector into the
# single 'final' column that the models below use as their features.
feature_parts = ['scaled', 'ocean_onehot']
assembler = VectorAssembler(outputCol='final', inputCols=feature_parts)

train, test = assembler.transform(train), assembler.transform(test)

In [None]:
# Show the assembled feature vectors in full (no truncation).
train.select("final").show(n=20, truncate=False)

In [None]:
from pyspark.ml.regression import LinearRegression

# Fit a linear regression of 'median_house_value' on the 'final' features.
# `lr` ends up bound to the fitted model, which is what the prediction cell uses.
lr = LinearRegression(
    labelCol='median_house_value',
    featuresCol='final',
).fit(train)

In [None]:
# Score the test split with the fitted linear-regression model.
pred = lr.transform(test)

# Keep only the prediction and the true value, in that order, and expose the
# rows as an RDD for the metrics cell.
sonuc = pred.select('prediction', 'median_house_value')
sonuc_rdd = sonuc.rdd

In [None]:
from pyspark.mllib.evaluation import RegressionMetrics

# Build regression metrics from the (prediction, observed value) rows.
metrics = RegressionMetrics(sonuc_rdd)

# Print each metric next to its label, one per line.
for metric_label, metric_attr in (
    ("Mean Squared Error:", "meanSquaredError"),
    ("Root Mean Squared Error:", "rootMeanSquaredError"),
    ("Mean Absolute Error:", "meanAbsoluteError"),
    ("R**2:", "r2"),
):
    print(metric_label, getattr(metrics, metric_attr))


Mean Squared Error: 4396705654.4360695
Root Mean Squared Error: 66307.65909332095
Mean Absolute Error: 48623.43026733317
R**2: 0.6696226253072244


In [None]:
from pyspark.ml.regression import RandomForestRegressor

# Fit a random-forest regressor of 'median_house_value' on the 'final'
# features. Training is randomised, so the seed is fixed to keep the fitted
# model (and the metrics computed from it) repeatable between runs.
rfr = RandomForestRegressor(featuresCol='final',
                            labelCol='median_house_value',
                            seed=42)
model = rfr.fit(train)

In [None]:
# Score the test split with the fitted random-forest model.
pred = model.transform(test)

# Keep only the prediction and the true value, in that order, and expose the
# rows as an RDD for the metrics cell.
sonuc = pred.select('prediction', 'median_house_value')
sonuc_rdd = sonuc.rdd

In [None]:
from pyspark.mllib.evaluation import RegressionMetrics

# Build regression metrics from the (prediction, observed value) rows.
metrics = RegressionMetrics(sonuc_rdd)

# Print each metric next to its label, one per line.
report_lines = (
    ("Mean Squared Error:", "meanSquaredError"),
    ("Root Mean Squared Error:", "rootMeanSquaredError"),
    ("Mean Absolute Error:", "meanAbsoluteError"),
    ("R**2:", "r2"),
)
for metric_label, metric_attr in report_lines:
    print(metric_label, getattr(metrics, metric_attr))

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import functions as F

# LogisticRegression is a classifier, so it cannot be fitted directly on the
# continuous 'median_house_value' target: every distinct price would be
# treated as its own class. Instead, derive a binary label that marks houses
# priced above the training-set median and classify on that.
# NOTE(review): the "above median" definition is an assumption about the
# intended classification task — confirm or replace the threshold.
price_threshold = train.approxQuantile('median_house_value', [0.5], 0.0)[0]
train_cls = train.withColumn(
    'high_value',
    F.when(F.col('median_house_value') > price_threshold, 1.0).otherwise(0.0),
)

logic = LogisticRegression(featuresCol='final',
                           labelCol='high_value')
model2 = logic.fit(train_cls)