<a href="https://colab.research.google.com/github/Rompil/hands-on/blob/master/PySpark%20Hands_on.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache-mirror.rbc.ru/pub/apache/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xvzf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
!wget http://devdemo.competentum.com/gt_sales_by_pos_month.csv

# Практическое введение в PySpark для Data Scientists (Часть 1)

---

by Roman Pilyugin

## Содержание

---


1. [Немного теории. RDD, Transformation, Action](#theory)
2. [Если я уже умею в Python и Pandas](#forSkilledPeople)
3. [Сравнение операций Pandas и PySpark](#PandasVsPySpark)
    1. [Чтение данных](#csvRead)
    3. [Некоторые основные примеры манипуляций с данными](#moreOps)
    4. [Визуализация данным](#pics)
    5. [Сохранение данных](#storeData)
4. [Погружаемся в Machine Learning](#ML)

##  Немного теории. RDD, Transformation, Action <a name =theory></a>

**RDD** (Resilient Distributed Dataset) - Это фундаментальная абстракция структуры данных в Spark. Ее можно представить в виде неизменяемой коллекции объектов. Сама коллекция может быть размещена по нескольким нодам (отдельным машинам) для параллельной обработки.

**Transformation** - операция, результатом которой является новый RDD. Таким образром, transformation задает связь ( последовательность) между RDD в виде последовательности преобразований. 

*Надо отметить, что все операции Transformation выполняются в **Lazy mode**, т.е. пока не вызвать одну из оперций Action, то преобразования не выполняются.*

### Основные операции Transformation

* map(function) — *применяет функцию function к каждому элементу датасета*

* .filter(function) — *возвращает все элементы датасета, на которых функция function вернула истинное значение*

* .distinct([numTasks]) — *возвращает датасет, который содержит уникальные элементы исходного датасета*

**Также стоит отметить об операциях над множествами, смысл которых понятен из названий:**

* .union(otherDataset)

* .intersection(otherDataset)

* .cartesian(otherDataset) — *новый датасет содержит в себе всевозможные пары (A,B), где первый элемент принадлежит исходному датасету, а второй — датасету-аргументу*





**Action** - операции, результат которых появляется немедленно, в отличии от Transformation. Это могут быть операции для вывода результата вычислений, сохранения в отдельный файл или выгрузка во внешнее хранилище.
*Можно сказать, что Action операция опзволяет извлечь данные из RDD.*

### Основные операции Action


* .saveAsTextFile(path) — *сохраняет данные в текстовый файл (в hdfs, на локальную машину или в любую другую поддерживаемую файловую систему — полный список можно посмотреть в документации)*

* .collect() — *возвращает элементы датасета в виде массива. Как правило, это применяется в случаях, когда данных в датасете уже мало (применены различные фильтры и преобразования) — и необходима визуализация, либо дополнительный анализ данных, например средствами пакета Pandas*

* .take(n) — *возвращает в виде массива первые n элементов датасета*

* .count() — *возвращает количество элементов в датасете*

* .reduce(function) — *знакомая операция для тех, кто знаком с MapReduce. Из механизма этой операции следует, что функция function (которая принимает на вход 2 аргумента возвращает одно значение) должна быть обязательно коммутативной и ассоциативной*



## Если я уже умею в Python и Pandas <a name=forSkilledPeople></a>

<p>Для практикующего специалиста по анализу данных PySpark может быть полезен когда: </p>


*   Уже знакомы c Python (Scala, Java, R) и Pandas DataFrame
*   Есть возможность развернуть кластер для обработки данных
*   Нужно обрабатывать больше объемы данных [^1].

[^1]:   Под большими данными понимаются те, что не помещаются в оперативной памяти одного компьютера



## Сравнение операций Pandas и PySpark <a name=PandasVsPySpark></a>

### Чтение данных<a name=csvRead></a>
В PySpark можно читать данные непосредственно из notebook`а

```python
simpleData = [5,7,1,12,10,25]
newRDD = sc.parallelize(localData)
```
Но чаще всего приходится иметь дело с внешними источниками данных, которые поддерживает Hadoop - HDFS, HBase и т.д.
```python
alsoRDD = sc.textFile("path_to_your_data_file")
```
Замечу, что в обоих случаях мы получаем RDD, о котором мы говорили раньше и с ними уже доступны операции Transformation и Action. 

Чтобы получить что-то более привычное, типа DataFrame можно выполнить:


In [0]:
instant_df = spark.sparkContext.parallelize([(1, 2, 3, 'a b c'),\
                                     (4, 5, 6, 'd e f'),\
                                     (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])
instant_df.show() # Only now all transformations are executed. .show() - Action method

Или даже так:

In [0]:
Employee = spark.createDataFrame([('1', 'Joe', '70000', '1'),\
                                  ('2', 'Henry', '80000', '2'),\
                                  ('3', 'Sam', '60000', '2'),\
                                  ('4', 'Max', '90000', '1')],\
                                 ['Id', 'Name', 'Sallary','DepartmentId'])
Employee.show()


После первого знакомства с DataFrame в PySpark мы мможем сравнить с DataFrame в Pandas.

Ну и для первого примера рассмотрим чтение данных из CSV-файла.

__Как это в Pandas__
```python
import pandas as pd
df = pd.read_csv('/FileStore/tables/gt_sales_by_pos_month.csv')
```
Конечно, есть дополнительные параметры для детальнойго указания параметров CSV-файла и я их опустил.

__В PySpark__

In [0]:
df = spark.read.options(header=True).options(inferSchema=True).csv('gt_sales_by_pos_month.csv')
# the first line of files are used to name columns and are not included in data.
# automatically infer column types. It requires one extra pass over the data and is false by default.

Подробнее про чение файлов и возможные опции в [документации](https://docs.databricks.com/spark/latest/data-sources/read-csv.html#reading-files) 

Более того, DataFrame во многом схожа с таблицей в базе данных и можно посмотреть её схему:

In [0]:
df.printSchema()

### Манипуляции с данными <a name=manipulation></a>

Взглянем на результат предыдущей операции.
Для этого можем посмотреть на первые строки нашего DataFrame.

__Как это в Pandas__
```python
df.head(5)
```

__В PySpark__

In [0]:
df.show()

In [0]:
df.show(5)

В остальном, некоторые операции практически идентичны.

In [0]:
df.columns # узнаем про столбцы в таблице

In [0]:
df.dtypes # узнаем про типы данных в таблице

In [0]:
df.describe().show() # .show() needs to be call to show results

**Примечание 1:** В pandas и PySpark метод .describe() по-разному считает  стандартное отклонение. В первом случае несмещенная , а во втором смещенная оценка стандартного отклонения.

**Выбор данных.**

При работы с данными в основном манипулируют столбцами.

Посмотреть отдельный столбец или даже несколько можно так:

In [0]:
df.select('sales_date').distinct().show()

In [0]:
df.select(['brand_manufacturer', 'brand_variant']).distinct().count()

**Примечание 2:** *.distinct()* - возвращает неупорядоченную коллекцию уникальных элементов.

**Примечание 3:** *.count()* - поведение похоже на метов .count() в pandas, но отличается. Если в pandas результат равен количеству не NA/null элементов, то в PySpark оно равно просто количеству строк в заданном DataFrame.

__Фильтрация__

В PySpark DataFrame нет индексов для строк и выбор строк осуществляется по заданию определенных условий- фильтров. Это работает аналогично тому, как в pandas.Dataframe.

In [0]:
df[df.meg_mrsp > 200].distinct().count()

Таким образом можно формировать сложные уловия выбора:

In [0]:
df[(df.meg_mrsp > 85)&(df.brand_variant=='JADE LA ROSE 100 SSL')].count() # to know how many items satisfy the condition

**Операции со столбцами. **

__Удаление столбцов__

Может понадобиться убрать некоторые столбцы из таблицы т.к. они нам не пригодятся в дальнейшем.
*order_number* - отличный кандидат для этого.

__В Pandas__
```python
df.drop('order_number', axis=1)
```

__В PySpark__

In [0]:
df.drop('order_number').show(5)

В PySpark нет понятия индекса и axis. Вам не надо указывать явную ось.

__Добавление столбцов__.

DataFrame неизменяемы т.е. нельзя изменить содержимое столбца, но можно создать новый DataFrame.

**В  Pandas:**
```python
df['new_column'] = 'default value'
```
Конечно, в реальной задаче будет не так просто, это просто пример.

**В PySpark:**

In [0]:
df.withColumn('Dummy_Column', df.sales_volume_sticks * 2 ).show(5)

Возможна ситуация, когда применить изменения нужно "на месте" (in-place) - неизменяя заголовок столбца, хотя df неизменяемый объект.

In [0]:
df.withColumn('Dummy_Column', df.sales_volume_sticks * 2 ).drop('sales_volume_sticks').withColumnRenamed('Dummy_Column','sales_volume_sticks').show(5)

**Перегруппировка данных.**

Аналогична методу в pandas, отличий особых нет. 

**В Pandas**
```python
df.groupby('brand_manufacturer').agg({'sales_volume_sticks':'sum','meg_mrsp':'count'})
```
**В PySpark:**

In [0]:
df.groupby('brand_manufacturer').agg({'sales_volume_sticks':'sum','meg_mrsp':'count'}).show()

Стоит отдельно сказать про метод .agg() - для стандартных операций аггрегации проблем нет, но в PySpark нет пока возможности для использования кастомных (пользоветельских) функций и приходится использовать втроенный набор: 'sum', 'min', 'max', 'count' и 'mean'.

In [0]:
df.groupby('brand_manufacturer').min('meg_mrsp').show(5)

Конечно, тем небольшим набором функций возможности PySpark не ограничиваются. Есть отдлельный модуль pyspark.sql.functions c наиболее частовстречающимися функциями. ПОдробнее об этих функция в [документации](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$).

In [0]:
from pyspark.sql import functions as F
print(dir(F))

Теперь, зная о таком модуле мы можем переписать предыдущие примеры по другому.

In [0]:
df.groupby('brand_manufacturer').agg(F.min('meg_mrsp').alias('min_meg_mrsp'),\
                                    F.max('meg_mrsp').alias('max_meg_mrsp'),\
                                    F.min('sales_volume_sticks').alias('min_sales_volume_sticks'),\
                                    F.max('sales_volume_sticks').alias('max_sales_volume_sticks')).show(5)

Основной причиной почему стоит пользоваться именно встроенными функциями вместо самописных состоит в том,  что они написаны не на python, в на scala и выполняются нативным кодом на JVM с распределенными данными на нескольких нодах. Безусловно, такой код производительнее по сравнению с обычными питоновскими функциями.

Но может возникнуть ситуация, что стандартными функциями не обойтись. В этом случае можно воспользоваться User Defined Function (UDF)

Определим функцию, которую потом применим к столбцу.

In [0]:
def remove_negative_values(number):
  '''
  This function remove negative values with zeros
  
  '''
  return number if number > 0 else 0

В Pandas могло быть сделано так:
```python
pandasF = lambda row: remove_negative_values(row.sales_volume_sticks)
df.apply(pandasF, axis=1)
```

In [0]:
sparkF = F.udf(remove_negative_values, 'float')
df.select(sparkF(df.sales_volume_sticks).alias('result')).show(5)

Отчистка данных является одним и необходимых этапов в процессе приготовления данных перед непосредсвенных их анализом. Удаление строк с пропущенными значениями или замена значений в них на другие  довольно стандартные операции и не могло быть так, что для них не реализовали соответствующие методы.


In [0]:
df.fillna({'sales_volume_sticks':0.0, 'meg_mrsp':0.0 })
df.dropna()

Можно выбросить еще все дублирующие друг друга строки:

In [0]:
df.dropDuplicates()

Но может так случиться, что вам понадобится добавлять данные в DataFrame. На этот случай есть специальный метод .unionAll(). Схожий по назначению метод в pandas .concat(), но результат у нах может быть разный. 

Пример, когда два DataFrame  имеют отлиную друг от друга scheme, но все равно происходит слияние, даже  не возникает ошибки. Но если будет разное число столбцов, то ошибка возникнет, что отличается от поведения .concat() в pandas, где нераспознаные столбцы будут добавлены в итоговый DataFrame, а недостающие значения будут заменены на NaN.

In [0]:
new_instant_df = spark.sparkContext.parallelize([(1, 2, 3, 'a b c'),\
                                     (4, 5, 6, 'd e f'),\
                                     (42, 8, 9, 1)]).toDF(['col1', 'col2', 'col3','col5'])
new_instant_df.show()
instant_df.unionAll(new_instant_df).show()

**Приведение типов данных**

После того, как представление о структуре данных были получены можно посмотреть какой тип данных был выбран для каждого из столбцов в DataFrame, если ни один не подходит, то тип по-умолчанию - string. Иногда приходится явным образов преобразовывать содержимое одной ячейки к другому типу.

In [0]:
#from pyspark.sql.types import Integertype
df.withColumn('int_meg_mrsp', F.col('meg_mrsp').cast('int')).drop('meg_mrsp').withColumnRenamed('int_meg_mrsp','meg_mrsp').show()

Так же часто может понадобиться упорядочивать DataFrame по возрастанию или убыванию вдоль заданной колонки.
**В Pandas:**
```python
df.sort_value('meg_mrsp', ascending=True) # sorting from low to high
```
**В PySpark:**

In [0]:
df.sort('meg_mrsp',ascending=True).select('meg_mrsp').distinct().show()

In [0]:
df.orderBy(['brand_manufacturer','brand_variant'], ascending=[0,1]).distinct().show(20)

Если вам все же привычнее работать с pandas.DataFrame, то для этих случаев есть замечательный метод .toPandas().
Тут нужно еще упомянуть такой проект как [Apache Arrow](https://arrow.apache.org/), который позводяет оптимизировать работу  с pandas и numpy в PySpark. При этом, эта функция  по-умолчанию отключена. Чтобы включить ее надо просто установить флаг в True. Хотя это пока не вышло за рамки эксперимента.
```python
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
```
Еще одна важная вещь, о которой нужно помнить при преобразовании в pandas.DataFrame, это то, почему приходится использовать PySpark - большие объемы данных. Неосмотрительный вызов  .toPandas() может значительно снизить, а то и просто повесить, компьютер из-за слишком большого объема данных в оперативной памяти.
Хорошей практикой считается предварительно взглянуть на данные, что планируются преобразовать в pandas.DataFrame

In [0]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
df.show(5)
pdf = df.select('*').toPandas()
pdf.info()

На это возможности для оптимизации не заканчиваются. В основном оптимизация касается работой с память, ведь как говорилось ранее, что все методы Transformation сами по себе не выполняются, а лишь строят граф преобразований исходных данных, который выполняется при вызове одного из методов Action. Может так случиться, что вызывая несколько раз методы Action, мы затставляем раз за разом выполнять лдни и теже довольно трудоемкие задачи. Этого можно избежать, единажды закэшировав результат. Наприемр сразу после чтения, после всех промежуточных преобразований или перед записью данных в файл. 

Для освобождения памяти можно вызвать метод .unpersist()

In [0]:
%timeit df.sort('meg_mrsp',ascending=True).select('meg_mrsp').distinct().count()

In [0]:

df = df.cache()

In [0]:
%timeit df.sort('meg_mrsp',ascending=True).select('meg_mrsp').distinct().count()

 Когда уже все манипуляции с данными совершины, то самое время записать их в файл или другое внешнее хранилище.
 Это не создает обычно никаких сложностей:

In [0]:
df.write.csv('as_original_file.csv')

.csv() - это Action метод, который вызывает цепочку преобразований.
Можно последовательно записать все преобразования в строчку - это называется pipeline, хоя это термин нам еще встретится дальше.

In [0]:
spark.read.options(header=True)\
.options(inferSchema=True)\
.csv('gt_sales_by_pos_month.csv')\
.dropDuplicates()\
.fillna({'sales_volume_sticks':0.0, 'meg_mrsp':0.0 })\
.withColumn('int_meg_mrsp', F.col('meg_mrsp').cast('int')).drop('meg_mrsp').withColumnRenamed('int_meg_mrsp','meg_mrsp')\
.orderBy(['brand_manufacturer','brand_variant'], ascending=[0,1])\
.coalesce(1).write.format("com.databricks.spark.csv")\
.option("header", "true").csv('modified_example1.csv')

Но такой способ хранение результатов не самый эффективный, все данные для записи в один файл передаются в один едиственный worker, который и осуществляет запись.

#### Визуализация.

Для отображения результатов обработки и представления данных удобно использовать различные графики, но в самом PySpark встроенных инструментов для этого нет и приходится прибегать к сторонним модулям. Самый простой способ - это конвертировать DataFrame в pandas.DataFrame и уже использовтаь стандартные средства отображения (Matplotlib, seaborn или Bokeh).

Предполагается, что вы работаете с PySpark в Databricks и для отображения данных использовать можно функцию display(), которую можно настроить под конкретные требования к графику.



In [0]:
display(df)


In [0]:
display(df.take(5))