# Apache Spark Scala API

## 1. Spark Cluster

### 1.1. Get Spark

Fetch Apache Spark from Maven.

In [1]:
import $ivy.`org.apache.spark::spark-sql:3.0.0`;

In [2]:
import org.apache.log4j.{Level, Logger};
Logger.getLogger("org").setLevel(Level.OFF);

### 1.2. Connection

To use Spark, create a SparkSession with the following parameters:

+ **appName:** the application name shown in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** the Spark Master URL, the same one the Spark Workers connect to;
+ **spark.executor.memory:** the amount of RAM per executor; it must not exceed SPARK_WORKER_MEMORY in the config.

In [None]:
import org.apache.spark.sql._

val spark = SparkSession.
            builder().
            appName("scala-spark-notebook").
            master("spark://spark-master:7077").
            config("spark.executor.memory", "512m").
            getOrCreate()

SparkSession accepts many more configuration options, which can be set through the **config** methods. They are all described in the [documentation](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SparkSession.html).
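As a minimal sketch of chaining extra settings through `config`, the snippet below sets one more option and reads it back. The `local[*]` master, the application name, the name `configSketch`, and the value `8` are illustrative assumptions and not part of the original setup; `spark.sql.shuffle.partitions` is a standard Spark SQL setting.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local master, so the sketch runs without a cluster.
// In this notebook the master would be "spark://spark-master:7077".
val configSketch = SparkSession.
            builder().
            appName("scala-spark-config-sketch").
            master("local[*]").
            config("spark.executor.memory", "512m").
            config("spark.sql.shuffle.partitions", "8"). // illustrative value
            getOrCreate()

// Read the setting back from the session's runtime configuration
val shufflePartitions = configSketch.conf.get("spark.sql.shuffle.partitions")
```

If a session already exists in the notebook, `getOrCreate()` returns that session, so settings fixed at startup (such as executor memory) would most likely keep their original values.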

## 2. Data

The workflow mirrors the PySpark notebook.

### 2.1. Read

Read the data from the local directory ([data from Kaggle](https://www.kaggle.com/bank-of-england/a-millennium-of-macroeconomic-data)).

In [4]:
val data = spark.read.format("csv").option("sep", ",").option("header", "true").load("data/uk-macroeconomic-data.csv")

data: DataFrame = [Description: string, Real GDP of England at market prices: string ... 75 more fields]

In [5]:
data.count

res4: Long = 841L

In [6]:
data.columns.size

res5: Int = 77

In [None]:
data.printSchema

In [8]:
var unemployment = data.select("Description", "Population (GB+NI)", "Unemployment rate")

In [9]:
unemployment.show(10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows



In [10]:
val cols_description = unemployment.filter(unemployment("Description") === "Units")

cols_description: Dataset[Row] = [Description: string, Population (GB+NI): string ... 1 more field]

In [11]:
cols_description.show()

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
+-----------+------------------+-----------------+



In [12]:
unemployment = unemployment.join(cols_description, unemployment("Description") === cols_description("Description"), "left_anti")

In [13]:
unemployment.show(10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
|       1218|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows
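The `left_anti` join above removes the single `Units` row. A plain filter with `=!=` is a possible alternative, sketched here on a hypothetical three-row miniature of the DataFrame. The names `filterSketch`, `unitsSample`, and `withoutUnits`, the local master, and the sample rows are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local session; in the notebook getOrCreate() reuses the existing one
val filterSketch = SparkSession.builder().
            appName("filter-sketch").
            master("local[*]").
            getOrCreate()
import filterSketch.implicits._

// Hypothetical miniature of the `unemployment` DataFrame (all columns are strings)
val unitsSample = Seq(
    ("Units", "000s", "%"),
    ("1855", "23241", "3.73"),
    ("1856", "23466", "3.52")
).toDF("Description", "Population (GB+NI)", "Unemployment rate")

// Keep every row whose Description differs from "Units"
val withoutUnits = unitsSample.filter(unitsSample("Description") =!= "Units")
withoutUnits.show()
```

One difference to keep in mind: the filter also drops rows whose `Description` is null, because the comparison evaluates to null, while the anti join keeps them. Since the next step drops rows containing nulls anyway, the two approaches should lead to the same result here.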



In [14]:
unemployment = unemployment.na.drop()
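Called without arguments, `na.drop()` removes every row that contains at least one null. The sketch below compares it with two other overloads of `DataFrameNaFunctions.drop` on hypothetical data. The names `naSketch`, `naRows`, `naSample`, and the sample values are assumptions, not data from the original file.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local session for a self-contained run
val naSketch = SparkSession.builder().
            appName("na-drop-sketch").
            master("local[*]").
            getOrCreate()
import naSketch.implicits._

// Hypothetical rows: None becomes null in the DataFrame
val naRows: Seq[(String, Option[String], Option[String])] = Seq(
    ("1209", None, None),
    ("1855", Some("23241"), Some("3.73")),
    ("1856", Some("23466"), None)
)
val naSample = naRows.toDF("year", "population", "unemployment_rate")

// Drop rows with a null in any column (what the cell above does)
val dropAny = naSample.na.drop()
// Drop rows only when every column is null
val dropAll = naSample.na.drop("all")
// Drop rows with a null in the listed columns only
val dropByPopulation = naSample.na.drop(Seq("population"))
```

On this sample, `dropAny` keeps only the 1855 row, `dropAll` keeps all three rows because `year` is never null, and `dropByPopulation` keeps the 1855 and 1856 rows.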

In [15]:
unemployment = unemployment.
                withColumnRenamed("Description", "year").
                withColumnRenamed("Population (GB+NI)", "population").
                withColumnRenamed("Unemployment rate", "unemployment_rate")

In [16]:
unemployment.show(10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows
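Because the CSV was read without schema inference, all three columns are still strings. A possible follow-up step, not part of the original notebook, is to cast them to numeric types. The sketch below does this on hypothetical rows; the names `castSketch`, `stringSample`, `typedSample`, and the chosen target types are assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, IntegerType}

// Assumption: local session for a self-contained run
val castSketch = SparkSession.builder().
            appName("cast-sketch").
            master("local[*]").
            getOrCreate()
import castSketch.implicits._

// Hypothetical rows shaped like the renamed `unemployment` DataFrame
val stringSample = Seq(
    ("1855", "23241", "3.73"),
    ("1856", "23466", "3.52")
).toDF("year", "population", "unemployment_rate")

// Replace each string column with a typed version of itself
val typedSample = stringSample.
            withColumn("year", col("year").cast(IntegerType)).
            withColumn("population", col("population").cast(IntegerType)).
            withColumn("unemployment_rate", col("unemployment_rate").cast(DoubleType))

typedSample.printSchema()
```

Values that cannot be parsed as the target type become null after the cast, so it may be worth checking for new nulls afterwards.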



### 2.2. Write

In [17]:
unemployment.repartition(1).write.format("csv").mode("overwrite").option("sep", ",").option("header", "true").save("data/uk-macroeconomic-unemployment-data.csv")
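Note that `save` treats the given path as a directory and places part files inside it, and `repartition(1)` results in a single part file. The sketch below illustrates this on hypothetical data written to a temporary local directory and then read back. The names `writeSketch`, `writeSample`, `outDir`, the temporary path, and the local master are assumptions; on a cluster the path would be resolved against the configured file system instead.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Assumption: local session and local file system
val writeSketch = SparkSession.builder().
            appName("write-sketch").
            master("local[*]").
            getOrCreate()
import writeSketch.implicits._

// Hypothetical rows shaped like the final `unemployment` DataFrame
val writeSample = Seq(
    ("1855", "23241", "3.73"),
    ("1856", "23466", "3.52")
).toDF("year", "population", "unemployment_rate")

// Hypothetical output location inside a fresh temporary directory
val outDir = Files.createTempDirectory("spark-csv-sketch").resolve("out.csv").toString

writeSample.repartition(1).write.format("csv").mode("overwrite").
            option("sep", ",").option("header", "true").save(outDir)

// The output path is a directory; list the data files Spark created in it
val partFiles = new File(outDir).listFiles.map(_.getName).filter(_.startsWith("part-"))

// Reading the directory back restores the rows and the header
val readBack = writeSketch.read.format("csv").
            option("sep", ",").option("header", "true").load(outDir)
readBack.show()
```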