<a href="https://colab.research.google.com/github/IrynaTkachenko/ai-experience/blob/main/2021_05_25_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Oснови Spark

Для запуску в середовищі Google Colab запустимо наступний код

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://apache.volia.net/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

--2021-05-25 11:57:40--  https://apache.volia.net/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
Resolving apache.volia.net (apache.volia.net)... 82.144.192.7
Connecting to apache.volia.net (apache.volia.net)|82.144.192.7|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 224374704 (214M) [application/x-gzip]
Saving to: ‘spark-3.1.1-bin-hadoop2.7.tgz.1’


2021-05-25 11:57:44 (59.2 MB/s) - ‘spark-3.1.1-bin-hadoop2.7.tgz.1’ saved [224374704/224374704]



In [2]:
!tar xf '/content/spark-3.1.1-bin-hadoop2.7.tgz'
!pip install -q findspark

Spark та Java у Colab встановлено.
Встановимо шляхи до середовища, які дозволятимуть запускати Pyspark. 


In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"


Запустимо локальну сесію Spark.

In [4]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Вітаємо! Colab вже працює з Pyspark. 
Побудуємо просту модель лінійної регресії.

# Модель лінійної регресії


Модель лінійної регресії є одним із найдавніших та широко застосовуваних підходів до машинного навчання, який передбачає взаємозв'язок між залежними та незалежними змінними. 

Лінійна регресія складається з лінії, що найкраще підходить, через розрізнені точки на графіку, а лінія, що найкраще підходить, відома як лінія регресії.

Мета цієї вправи прогнозувати ціни на житло за заданими ознаками. Давайте спрогнозуємо ціни набору даних про житло в Бостоні, розглядаючи MEDV як вихідну змінну, а всі інші змінні як вхідні.

In [5]:
!wget https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv


--2021-05-25 11:57:56--  https://raw.githubusercontent.com/asifahmed90/pyspark-ML-in-Colab/master/BostonHousing.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35735 (35K) [text/plain]
Saving to: ‘BostonHousing.csv.2’


2021-05-25 11:57:56 (24.7 MB/s) - ‘BostonHousing.csv.2’ saved [35735/35735]



Перевіряємо наявність файлів

In [6]:
!ls

BostonHousing.csv	   spark-3.1.1-bin-hadoop2.7.tgz
BostonHousing.csv.1	   spark-3.1.1-bin-hadoop2.7.tgz.1
BostonHousing.csv.2	   sparkjob.py
sample_data		   testsparkjob.py
spark-3.1.1-bin-hadoop2.7


Тепер, коли ми завантажили набір даних, ми можемо розпочати аналіз.
Для нашої моделі лінійної регресії нам потрібно імпортувати два модулі з Pyspark, тобто Vector Assembler та Linear Regression. Vector Assembler - це трансформатор, який збирає всі функції в один вектор із декількох стовпців, що містять тип float/double. Ми могли б використовувати StringIndexer, якщо будь-який із наших стовпців містить значення рядків для перетворення його в числові значення. На щастя, набір даних BostonHousing містить лише подвійні значення, тому наразі нам не потрібно турбуватися про StringIndexer.

In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StructType, IntegerType, DateType, StringType, FloatType

#'crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'
schema = StructType([
    StructField("crim", FloatType()),
    StructField("zn", FloatType()),
    StructField("indus", FloatType()),
    StructField("chas", FloatType()),
    StructField("nox", FloatType()),
    StructField("rm", FloatType()),
    StructField("age", FloatType()),
    StructField("dis", FloatType()),
    StructField("rad", FloatType()),
    StructField("tax", FloatType()),
    StructField("ptratio", FloatType()),
    StructField("b", FloatType()),
    StructField("lstat", FloatType()),
    StructField("medv", FloatType()),
])

dataset = spark.read.csv('BostonHousing.csv', schema=schema, header =True, 
                         ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)

In [8]:
# Або
dataset = spark.read.csv('BostonHousing.csv', inferSchema=True, header =True, 
                         ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True)

In [9]:
dataset.head(10)

[Row(crim=0.00632, zn=18.0, indus=2.31, chas=0, nox=0.538, rm=6.575, age=65.2, dis=4.09, rad=1, tax=296, ptratio=15.3, b=396.9, lstat=4.98, medv=24.0),
 Row(crim=0.02731, zn=0.0, indus=7.07, chas=0, nox=0.469, rm=6.421, age=78.9, dis=4.9671, rad=2, tax=242, ptratio=17.8, b=396.9, lstat=9.14, medv=21.6),
 Row(crim=0.02729, zn=0.0, indus=7.07, chas=0, nox=0.469, rm=7.185, age=61.1, dis=4.9671, rad=2, tax=242, ptratio=17.8, b=392.83, lstat=4.03, medv=34.7),
 Row(crim=0.03237, zn=0.0, indus=2.18, chas=0, nox=0.458, rm=6.998, age=45.8, dis=6.0622, rad=3, tax=222, ptratio=18.7, b=394.63, lstat=2.94, medv=33.4),
 Row(crim=0.06905, zn=0.0, indus=2.18, chas=0, nox=0.458, rm=7.147, age=54.2, dis=6.0622, rad=3, tax=222, ptratio=18.7, b=396.9, lstat=5.33, medv=36.2),
 Row(crim=0.02985, zn=0.0, indus=2.18, chas=0, nox=0.458, rm=6.43, age=58.7, dis=6.0622, rad=3, tax=222, ptratio=18.7, b=394.12, lstat=5.21, medv=28.7),
 Row(crim=0.08829, zn=12.5, indus=7.87, chas=0, nox=0.524, rm=6.012, age=66.6, di

Зверніть увагу, що ми використовували InferSchema всередині модуля read.csv. InferSchema дозволяє нам автоматично виводити різні типи даних для кожного стовпця.

Давайте роздрукуємося в наборі даних, щоб побачити типи даних кожного стовпця:

In [10]:
dataset.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



Наступним кроком є перетворення всіх функцій з різних стовпців в один стовпець і давайте назвемо цей новий векторний стовпець як 'Attributes' у outputCol.

In [11]:
#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'b', 'lstat'], outputCol = 'Attributes')
output = assembler.transform(dataset)
#Input vs Output
finalized_data = output.select("Attributes","medv")

#print(finalized_data)
finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
|[0.03237,0.0,2.18...|33.4|
|[0.06905,0.0,2.18...|36.2|
|[0.02985,0.0,2.18...|28.7|
|[0.08829,12.5,7.8...|22.9|
|[0.14455,12.5,7.8...|27.1|
|[0.21124,12.5,7.8...|16.5|
|[0.17004,12.5,7.8...|18.9|
|[0.22489,12.5,7.8...|15.0|
|[0.11747,12.5,7.8...|18.9|
|[0.09378,12.5,7.8...|21.7|
|[0.62976,0.0,8.14...|20.4|
|[0.63796,0.0,8.14...|18.2|
|[0.62739,0.0,8.14...|19.9|
|[1.05393,0.0,8.14...|23.1|
|[0.7842,0.0,8.14,...|17.5|
|[0.80271,0.0,8.14...|20.2|
|[0.7258,0.0,8.14,...|18.2|
+--------------------+----+
only showing top 20 rows



Тут "Attributes" містяться у вхідних функціях з усіх стовпців, а "medv" - цільовий стовпець.
Далі слід розділити дані навчання та тестування відповідно до нашого набору даних (у цьому випадку 0,8 та 0,2).

In [12]:
#Split training and testing data
train_data,test_data = finalized_data.randomSplit([0.8,0.2])


regressor = LinearRegression(featuresCol = 'Attributes', labelCol = 'medv')

#Learn to fit the model from training set
regressor = regressor.fit(train_data)

#To predict the prices on testing set
pred = regressor.evaluate(test_data)

#Predict the model
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[0.01965,80.0,1.7...|20.1|21.119097258075737|
|[0.02177,82.5,2.0...|42.3|36.236488681553844|
|[0.02899,40.0,1.2...|26.6|21.995687844596937|
|[0.02985,0.0,2.18...|28.7|24.979849071675442|
|[0.03427,0.0,5.19...|19.5|20.262686886665314|
|[0.03551,25.0,4.8...|22.9| 25.34422557691682|
|[0.04337,21.0,5.6...|20.5|24.286238516525565|
|[0.04379,80.0,3.3...|19.4|26.602795965572252|
|[0.04462,25.0,4.8...|23.9| 27.01555398994781|
|[0.04741,0.0,11.9...|11.9|22.478464711398303|
|[0.05479,33.0,2.1...|28.4|30.443380234699443|
|[0.05515,33.0,2.1...|36.1| 32.23127950179518|
|[0.05602,0.0,2.46...|50.0| 33.98917462577596|
|[0.05644,40.0,6.4...|32.4| 36.07040547764153|
|[0.05646,0.0,12.8...|21.2| 21.37970698136602|
|[0.0566,0.0,3.41,...|23.6|29.818463777893434|
|[0.0578,0.0,2.46,...|37.2| 31.81980072869767|
|[0.06127,40.0,6.4...|33.1| 34.75369118453489|
|[0.06129,20.

Ми також можемо надрукувати коефіцієнт і перехоплення моделі регресії, використовуючи таку команду:

In [13]:
#coefficient of the regression model
coeff = regressor.coefficients

#X and Y intercept
intr = regressor.intercept

print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)


The coefficient of the model is : DenseVector([-0.1025, 0.0575, 0.041, 3.0304, -20.1239, 2.882, 0.0076, -1.4193, 0.2981, -0.012, -0.8926, 0.0075, -0.5264])
The Intercept of the model is : 41.984725


# Iнше

In [14]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)



RMSE: 5.325
MSE: 28.359
MAE: 3.850
r2: 0.777


#Створення Spark служби

Згенеруємо службу та запустимо її

In [15]:
sparkjob =  '''
import sys
 
from pyspark import SparkContext, SparkConf
 
if __name__ == "__main__":
 
  # create Spark context with Spark configuration
  conf = SparkConf().setAppName("Read Text to RDD - Python")
  sc = SparkContext(conf=conf)
 
  # read input text file to RDD
  numbers = sc.parallelize([1,7,8,9,5,77,48])
 
  # aggregate RDD elements using addition function
  sum = numbers.reduce(lambda a,b:a+b)
 
  print ("sum is : " + str(sum))
'''

f = open('testsparkjob.py', 'w')
f.write(sparkjob)
f.close()

In [16]:
!/content/spark-3.1.1-bin-hadoop2.7/bin/spark-submit /content/testsparkjob.py

21/05/25 11:58:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/05/25 11:58:11 INFO SparkContext: Running Spark version 3.1.1
21/05/25 11:58:11 INFO ResourceUtils: No custom resources configured for spark.driver.
21/05/25 11:58:11 INFO SparkContext: Submitted application: Read Text to RDD - Python
21/05/25 11:58:11 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/05/25 11:58:11 INFO ResourceProfile: Limiting resource is cpu
21/05/25 11:58:11 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/05/25 11:58:11 INFO SecurityManager: Changing view acls to: root
21/05/25 11: