In [40]:
!pip install pyspark



In [42]:
from pyspark.sql import SparkSession, Row
from pyspark.sql import types as T
from pyspark.sql import window as W
from pyspark.sql import functions as F

spark = SparkSession.builder \
        .master("local") \
        .appName("Colab") \
        .getOrCreate()

In [41]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# 1. Linear Regression

In [6]:
import pandas as pd
import numpy as np
df = spark.read.csv('/content/sample_data/california_housing_train.csv', header=True)


DataFrame[longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

In [7]:
df.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|      73400.000000|
|-114.570000|33.570000|         20.000000|1454.000000|    326.000000| 624.000000| 262.000000|     1.925000|    

In [8]:
df.count(), len(df.columns)

(17000, 9)

In [11]:
df.columns[:-1]

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [21]:
# # pdf = df.toPandas()

# # pdf.to_parquet('/content/sample_data/california_housing_train.parquet')

# sdf = spark.read.parquet('/content/sample_data/california_housing_train.parquet')
# sdf.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



In [None]:
# df = df.withColumn('longitude',  F.col('longitude').cast(T.DoubleType()))
# df = df.withColumn('latitude',  F.col('latitude').cast(T.DoubleType()))
# df = df.withColumn('housing_median_age',  F.col('housing_median_age').cast(T.DoubleType()))
# df = df.withColumn('total_rooms',  F.col('total_rooms').cast(T.DoubleType()))
# df = df.withColumn('total_bedrooms',  F.col('total_bedrooms').cast(T.DoubleType()))
# df = df.withColumn('population',  F.col('population').cast(T.DoubleType()))
# df = df.withColumn('households',  F.col('households').cast(T.DoubleType()))
# df = df.withColumn('median_income',  F.col('median_income').cast(T.DoubleType()))
# df = df.withColumn('median_house_value',  F.col('median_house_value').cast(T.DoubleType()))

In [22]:
for col in df.columns:
  df = df.withColumn(col,  F.col(col).cast(T.DoubleType()))
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [23]:
from pyspark.ml.feature import VectorAssembler

featureassembler = VectorAssembler(inputCols=df.columns[:-1], outputCol='features')

In [27]:
feature_df = featureassembler.transform(df)

In [29]:
model_df = feature_df.select('median_house_value', 'features')

model_df.show(5, truncate=False)

+------------------+------------------------------------------------------+
|median_house_value|features                                              |
+------------------+------------------------------------------------------+
|66900.0           |[-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936]|
|80100.0           |[-114.47,34.4,19.0,7650.0,1901.0,1129.0,463.0,1.82]   |
|85700.0           |[-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509]   |
|73400.0           |[-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917]  |
|65500.0           |[-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.925]   |
+------------------+------------------------------------------------------+
only showing top 5 rows



In [31]:
train_df, test_df = model_df.randomSplit(weights=[0.75, 0.25], seed=42)

train_df.count(), test_df.count()

(12804, 4196)

In [35]:
from pyspark.ml.regression import LinearRegression

lg_model = LinearRegression(featuresCol = 'features', labelCol='median_house_value')

In [36]:
lg_fit = lg_model.fit(train_df)

lg_fit.coefficients

DenseVector([-43141.8894, -42785.3442, 1158.5097, -8.0177, 109.8986, -37.344, 48.3053, 40416.8488])

In [37]:
pred_train = lg_fit.evaluate(train_df)



pred_train.predictions.show()

+------------------+--------------------+------------------+
|median_house_value|            features|        prediction|
+------------------+--------------------+------------------+
|           14999.0|[-123.17,40.31,36...|28888.394730238244|
|           14999.0|[-122.74,39.71,16...|60928.992368592415|
|           14999.0|[-117.02,36.4,19....|-16705.60729798721|
|           17500.0|[-118.33,34.15,39...| 170604.6205744804|
|           22500.0|[-122.32,37.93,33...| 175643.6282556099|
|           22500.0|[-116.57,35.43,8....|-67638.03387087304|
|           26900.0|[-119.46,35.13,46...|174920.23057235684|
|           27500.0|[-114.67,33.92,17...|-57447.09097884316|
|           28300.0|[-120.59,34.7,29....|108921.64240872767|
|           30000.0|[-115.73,33.35,23...| 48690.50679440377|
|           32500.0|[-121.52,38.58,24...| 76468.16213495238|
|           32500.0|[-121.36,38.56,20...| 72996.99865406519|
|           32500.0|[-115.88,32.93,15...|147429.02650986845|
|           32900.0|[-11

In [38]:
pred_test = lg_fit.evaluate(test_df)
pred_test.predictions.show()

+------------------+--------------------+------------------+
|median_house_value|            features|        prediction|
+------------------+--------------------+------------------+
|           14999.0|[-117.86,34.24,52...|234176.34721590858|
|           22500.0|[-121.29,37.95,52...| 79767.94892306905|
|           25000.0|[-114.65,32.79,21...|-21152.08489229856|
|           26600.0|[-119.45,35.13,34...| 163494.7945826659|
|           30000.0|[-119.45,35.07,45...|188197.52765391069|
|           34200.0|[-121.76,41.5,31....|-59129.58339740988|
|           36600.0|[-118.99,35.32,26...| 67873.69127641711|
|           38800.0|[-120.51,36.55,20...| 94679.51552490517|
|           38800.0|[-118.28,34.02,52...| 156371.2685364713|
|           39400.0|[-119.01,35.33,42...|112181.70837089606|
|           40000.0|[-120.38,40.98,27...|-78424.46896877838|
|           40000.0|[-115.53,34.91,12...|-7385.651015340351|
|           41700.0|[-119.44,36.48,27...|33801.152133275755|
|           42100.0|[-11

In [39]:
print("R2 for train data: " ,pred_train.r2)
print("R2 for test data: " ,pred_test.r2)



R2 for train data:  0.6394471875054302
R2 for test data:  0.6465212172511926


## 2. 분류 모델(Rf)

In [44]:
iris_df = spark.read.csv('/content/drive/MyDrive/Python Spark/iris.csv', header=True)
iris_df.show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|         .2| Setosa|
|         4.9|          3|         1.4|         .2| Setosa|
|         4.7|        3.2|         1.3|         .2| Setosa|
|         4.6|        3.1|         1.5|         .2| Setosa|
|           5|        3.6|         1.4|         .2| Setosa|
|         5.4|        3.9|         1.7|         .4| Setosa|
|         4.6|        3.4|         1.4|         .3| Setosa|
|           5|        3.4|         1.5|         .2| Setosa|
|         4.4|        2.9|         1.4|         .2| Setosa|
|         4.9|        3.1|         1.5|         .1| Setosa|
|         5.4|        3.7|         1.5|         .2| Setosa|
|         4.8|        3.4|         1.6|         .2| Setosa|
|         4.8|          3|         1.4|         .1| Setosa|
|         4.3|          3|         1.1| 

In [46]:
iris_df.printSchema()

root
 |-- sepal_length: string (nullable = true)
 |-- sepal_width: string (nullable = true)
 |-- petal_length: string (nullable = true)
 |-- petal_width: string (nullable = true)
 |-- species: string (nullable = true)



In [47]:
for i in iris_df.columns[:-1]:
  iris_df = iris_df.withColumn(i, F.col(i).cast(T.DoubleType()))

iris_df.printSchema()


root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [91]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=iris_df.columns[:-1], outputCol='features')





In [92]:
iris_assembler = assembler.transform(iris_df)

In [93]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = 'species', outputCol='label_index')

In [94]:
iris_index = indexer.fit(iris_assembler).transform(iris_assembler)


In [95]:
iris_index.dropDuplicates(subset = ['species', 'label_index']).show()

+------------+-----------+------------+-----------+----------+-----------------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|         features|label_index|
+------------+-----------+------------+-----------+----------+-----------------+-----------+
|         5.1|        3.5|         1.4|        0.2|    Setosa|[5.1,3.5,1.4,0.2]|        0.0|
|         7.0|        3.2|         4.7|        1.4|Versicolor|[7.0,3.2,4.7,1.4]|        1.0|
|         6.3|        3.3|         6.0|        2.5| Virginica|[6.3,3.3,6.0,2.5]|        2.0|
+------------+-----------+------------+-----------+----------+-----------------+-----------+



In [96]:
iris_train, iris_test = iris_index.randomSplit([0.75, 0.25], seed=42)

iris_train.count(), iris_test.count()

(116, 34)

In [97]:
from pyspark.ml.classification import RandomForestClassifier



rf = RandomForestClassifier(featuresCol= 'features', labelCol='label_index')





In [98]:
rf_model = rf.fit(iris_train)

pred_test = rf_model.transform(iris_train)

In [99]:
iris_prediction = pred_test.select('label_index', 'prediction')

In [100]:
iris_prediction.columns

['label_index', 'prediction']

In [101]:
iris_prediction.filter(F.col('label_index') !=F.col('prediction'))

DataFrame[label_index: double, prediction: double]

In [102]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol = 'label_index', predictionCol='prediction')

evaluator.evaluate(iris_prediction)

0.9913740417797212

In [103]:
from pyspark.mllib.evaluation import MulticlassMetrics

metrix = MulticlassMetrics(iris_prediction.rdd.map(tuple))



In [104]:
metrix.confusionMatrix().toArray()

array([[35.,  0.,  0.],
       [ 0., 38.,  0.],
       [ 0.,  1., 42.]])