# PySpark Dataframe Tutorial – PySpark Programming with Dataframes
Примеры взяты из [этого блога](https://www.edureka.co/blog/pyspark-dataframe-tutorial/)

In [1]:
from pyspark.sql import *

In [2]:
spark = SparkSession.builder.master("local").appName("Tutorial")\
.config("spark.some.config.option", "some-value") \
.getOrCreate()

## Создание DataFrame из списка Rows

Создадим два DataFreme'a:
 - работники (Employee)
 - отделы (Department)

### Работники
Создадим список объектов `Row` и определим поля с помощью функции `pyspark.sql.Row`:
 - Имя (firstName)
 - Фамилия (lastName)
 - Электронная почта (email)
 - Зарплата (salary)

In [3]:
Employee = Row('firstName', 'lastName', 'email', 'salary')

In [4]:
type(Employee)

pyspark.sql.types.Row

Теперь добавим в таблицу записи:

In [5]:
employee1 = Employee('Basher', 'armbrust', 'bash@edureka.co', 100000)
employee2 = Employee('Daniel', 'meng', 'daniel@stanford.edu', 120000 )
employee3 = Employee('Muriel', None, 'muriel@waterloo.edu', 140000 )
employee4 = Employee('Rachel', 'wendell', 'rach_3@edureka.co', 160000 )
employee5 = Employee('Zach', 'galifianakis', 'zach_g@edureka.co', 160000 )

Объект Employee итерируемый

In [6]:
Employee[1]

'lastName'

In [7]:
print(employee3)

Row(firstName='Muriel', lastName=None, email='muriel@waterloo.edu', salary=140000)


### Отделы
Для отделов определим следующие поля:
 - id (идентификатор отдела)
 - name (название отдела)

In [8]:
department1 = Row(id='123456', name='HR')
department2 = Row(id='789012', name='OPS')
department3 = Row(id='345678', name='FN')
department4 = Row(id='901234', name='DEV')

### Определим работников в отделы

In [9]:
departmentWithEmployees1 = Row(department=department1, 
                               employees=[employee1, employee2, employee5])

departmentWithEmployees2 = Row(department=department2, 
                               employees=[employee3, employee4])

departmentWithEmployees3 = Row(department=department3, 
                               employees=[employee1, employee4, employee3])

departmentWithEmployees4 = Row(department=department4, 
                               employees=[employee2, employee3])

## Создадим наконец DataFrame из списка Row

Список Row

In [10]:
departmentsWithEmployees_Seq = [departmentWithEmployees1, 
                                departmentWithEmployees2,
                                departmentWithEmployees3,
                                departmentWithEmployees4]

Теперь воспользуемся методом `createDataFrame`.

In [11]:
df = spark.createDataFrame(departmentsWithEmployees_Seq)

In [12]:
df.show(n=2)

+-------------+--------------------+
|   department|           employees|
+-------------+--------------------+
| [123456, HR]|[[Basher, armbrus...|
|[789012, OPS]|[[Muriel,, muriel...|
+-------------+--------------------+
only showing top 2 rows



# Чтение из csv файла в DataFrame

Данные взяты [здесь](https://www.kaggle.com/abecklas/fifa-world-cup/version/5#WorldCupPlayers.csv)

## FIFA World Cup

**Описание данных**
- RoundID - Unique ID of the round
- MatchID - Unique ID of the match
- Team Initials - Player's team initials
- Coach - Name Name and country of the team coach
- Line-up - S=Line-up, N=Substitute
- Shirt - Number Shirt number if available
- Player - Name Name of the player
- Position - C=Captain, GK=Goalkeeper
- Event:
G=Goal, OG=Own Goal, Y=Yellow Card, R=Red Card, SY = Red Card by second yellow, P=Penalty, MP=Missed Penalty, I = Substitution In, O=Substitute Out

In [13]:
fifa_df = spark.read.csv("WorldCupPlayers.csv", inferSchema=True, header=True)

inferSchema опредяет схему из данных для этого требуется один дополнительный проход

In [14]:
fifa_df.show(n=3)

+-------+-------+-------------+-------------------+-------+------------+----------------+--------+-----+
|RoundID|MatchID|Team Initials|         Coach Name|Line-up|Shirt Number|     Player Name|Position|Event|
+-------+-------+-------------+-------------------+-------+------------+----------------+--------+-----+
|    201|   1096|          FRA|CAUDRON Raoul (FRA)|      S|           0|     Alex THEPOT|      GK| null|
|    201|   1096|          MEX|   LUQUE Juan (MEX)|      S|           0| Oscar BONFIGLIO|      GK| null|
|    201|   1096|          FRA|CAUDRON Raoul (FRA)|      S|           0|Marcel LANGILLER|    null| G40'|
+-------+-------+-------------+-------------------+-------+------------+----------------+--------+-----+
only showing top 3 rows



Посмотрим структуру данных (Schema)

In [15]:
fifa_df.printSchema()

root
 |-- RoundID: integer (nullable = true)
 |-- MatchID: integer (nullable = true)
 |-- Team Initials: string (nullable = true)
 |-- Coach Name: string (nullable = true)
 |-- Line-up: string (nullable = true)
 |-- Shirt Number: integer (nullable = true)
 |-- Player Name: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Event: string (nullable = true)



### Размер DataFrame

In [16]:
print("\n".join(fifa_df.columns))
print()
print("Size : {} x {}\n".format(fifa_df.count(), len(fifa_df.columns)))

RoundID
MatchID
Team Initials
Coach Name
Line-up
Shirt Number
Player Name
Position
Event

Size : 37784 x 9



## Описание конкретного признака (столбца)

Описание это:
    - количество объектов
    - среднее значение
    - стандартное отклонение
    - минимум
    - максимум

In [17]:
fifa_df.describe('Coach Name').show()

+-------+-------------------+
|summary|         Coach Name|
+-------+-------------------+
|  count|              37784|
|   mean|               null|
| stddev|               null|
|    min|ACOSTA Nelson (URU)|
|    max|         ZICO (BRA)|
+-------+-------------------+



In [18]:
fifa_df.describe('Position').show()

+-------+--------+
|summary|Position|
+-------+--------+
|  count|    4143|
|   mean|    null|
| stddev|    null|
|    min|       C|
|    max|     GKC|
+-------+--------+



### Выбор (select) нескольких признаков (колонок)

In [19]:
fifa_df.select('Player Name','Coach Name').show(n=5)

+----------------+-------------------+
|     Player Name|         Coach Name|
+----------------+-------------------+
|     Alex THEPOT|CAUDRON Raoul (FRA)|
| Oscar BONFIGLIO|   LUQUE Juan (MEX)|
|Marcel LANGILLER|CAUDRON Raoul (FRA)|
|    Juan CARRENO|   LUQUE Juan (MEX)|
| Ernest LIBERATI|CAUDRON Raoul (FRA)|
+----------------+-------------------+
only showing top 5 rows



### Выбор (select) уникальных значений признаков (колонок)

In [20]:
fifa_df.select('Player Name','Coach Name').distinct().show(n=5)

+--------------------+--------------------+
|         Player Name|          Coach Name|
+--------------------+--------------------+
|    Arturo FERNANDEZ| BRU Francisco (ESP)|
|Cayetano CARRERAS...|DURAND LAGUNA Jos...|
|  Ernesto MASCHERONI|SUPPICI Alberto (...|
|          Aziz FAHMY|   McREA James (SCO)|
|        Gyula POLGAR|    NADAS Odon (HUN)|
+--------------------+--------------------+
only showing top 5 rows



### Фильтрация данных

Выберем только те данные для которых MatchID = '1096' и посчитаем их количество

In [39]:
fifa_df.filter(fifa_df['MatchID']=='1096').show(n=5)
print(fifa_df.filter(fifa_df.MatchID=='1096').count())

+-------+-------+-------------+-------------------+-------+------------+----------------+--------+-----+
|RoundID|MatchID|Team Initials|         Coach Name|Line-up|Shirt Number|     Player Name|Position|Event|
+-------+-------+-------------+-------------------+-------+------------+----------------+--------+-----+
|    201|   1096|          FRA|CAUDRON Raoul (FRA)|      S|           0|     Alex THEPOT|      GK| null|
|    201|   1096|          MEX|   LUQUE Juan (MEX)|      S|           0| Oscar BONFIGLIO|      GK| null|
|    201|   1096|          FRA|CAUDRON Raoul (FRA)|      S|           0|Marcel LANGILLER|    null| G40'|
|    201|   1096|          MEX|   LUQUE Juan (MEX)|      S|           0|    Juan CARRENO|    null| G70'|
|    201|   1096|          FRA|CAUDRON Raoul (FRA)|      S|           0| Ernest LIBERATI|    null| null|
+-------+-------+-------------+-------------------+-------+------------+----------------+--------+-----+
only showing top 5 rows

33


Можно задавать более сложные условия.
Выберем игроков - капитанов, забивших гол на 40 минуте.

In [22]:
fifa_df.filter((fifa_df.Position=='C') & (fifa_df.Event=="G40'")).show()

+-------+-------+-------------+--------------------+-------+------------+----------------+--------+-----+
|RoundID|MatchID|Team Initials|          Coach Name|Line-up|Shirt Number|     Player Name|Position|Event|
+-------+-------+-------------+--------------------+-------+------------+----------------+--------+-----+
|    201|   1089|          PAR|DURAND LAGUNA Jos...|      S|           0|Luis VARGAS PENA|       C| G40'|
|    429|   1175|          HUN|  DIETZ Karoly (HUN)|      S|           0|   Gyorgy SAROSI|       C| G40'|
+-------+-------+-------------+--------------------+-------+------------+----------------+--------+-----+



## Сортировка данных OrderBy

Отсортируем данные по убывынию MatchID

In [23]:
fifa_df.orderBy("MatchID", ascending=False).show(n=5)

+-------+---------+-------------+--------------------+-------+------------+-----------+--------+-----+
|RoundID|  MatchID|Team Initials|          Coach Name|Line-up|Shirt Number|Player Name|Position|Event|
+-------+---------+-------------+--------------------+-------+------------+-----------+--------+-----+
| 255931|300186515|          ECU|RUEDA Reinaldo (COL)|      S|          22|  DOMINGUEZ|      GK| null|
| 255931|300186515|          FRA|DESCHAMPS Didier ...|      S|          10|    BENZEMA|    null| null|
| 255931|300186515|          FRA|DESCHAMPS Didier ...|      S|           1|     LLORIS|     GKC| null|
| 255931|300186515|          ECU|RUEDA Reinaldo (COL)|      S|           2|     GUAGUA|    null| null|
| 255931|300186515|          FRA|DESCHAMPS Didier ...|      S|           5|      SAKHO|    null| O61'|
+-------+---------+-------------+--------------------+-------+------------+-----------+--------+-----+
only showing top 5 rows



Можно также сортировать по нескольким признаками задавать им порядом по возрастанию или по убыванию

In [24]:
fifa_df.orderBy(["MatchID", "Shirt Number"], ascending=[0, 1]).show(n=5)

+-------+---------+-------------+--------------------+-------+------------+-----------+--------+-----+
|RoundID|  MatchID|Team Initials|          Coach Name|Line-up|Shirt Number|Player Name|Position|Event|
+-------+---------+-------------+--------------------+-------+------------+-----------+--------+-----+
| 255931|300186515|          ECU|RUEDA Reinaldo (COL)|      N|           1|   BANGUERA|      GK| null|
| 255931|300186515|          FRA|DESCHAMPS Didier ...|      S|           1|     LLORIS|     GKC| null|
| 255931|300186515|          ECU|RUEDA Reinaldo (COL)|      S|           2|     GUAGUA|    null| null|
| 255931|300186515|          FRA|DESCHAMPS Didier ...|      N|           2|    DEBUCHY|    null| null|
| 255931|300186515|          ECU|RUEDA Reinaldo (COL)|      S|           3|      ERAZO|    null| Y83'|
+-------+---------+-------------+--------------------+-------+------------+-----------+--------+-----+
only showing top 5 rows



## Группировка данных

Посчитаем количество игроков из одной страны. И отсортируем данные по убыванию.

In [25]:
fifa_df.groupBy("Team Initials").count().orderBy("count", ascending=False).show(n=5)

+-------------+-----+
|Team Initials|count|
+-------------+-----+
|          BRA| 2403|
|          ITA| 1843|
|          ARG| 1807|
|          ENG| 1378|
|          FRG| 1364|
+-------------+-----+
only showing top 5 rows



## Создание собственных функций (UDF - user defined function)

PySpark UDF работает как `.map()` и `.apply()` в `pandas`. Особенность задания UDF заключается в том, что нам нужно определить тип выходного значения.

Предположим мы хотим сделать простую функцию, которая будет возводить значение в крвадрат.

In [26]:
def sqr(x):
    return x**2

In [27]:
sqr(5)

25

Чтобы использовать эту функцию в pyspark нужно ее зарегистрировать как UDF. Важно, чтобы входной тип совпадал с возвращаемым иначе можем полчить null.

In [28]:
from pyspark.sql.types import *
from pyspark.sql import functions

In [29]:
sqr_udf = functions.udf(lambda x: sqr(x), IntegerType())

Для примера возведем в квадрат номера игроков (это бесмысленно, но подойдет для демонстрации)

In [38]:
fifa_df.select('Shirt Number', sqr_udf('Shirt Number').alias('Squre Shirt Number'))\
.distinct()\
.orderBy('Shirt Number', ascending=False)\
.show(5)

+------------+------------------+
|Shirt Number|Squre Shirt Number|
+------------+------------------+
|          23|               529|
|          22|               484|
|          21|               441|
|          20|               400|
|          19|               361|
+------------+------------------+
only showing top 5 rows



## WithColumn

In [43]:
fifa_df.withColumn('Player Name', functions.lower(fifa_df['Player Name'])).show(n=2)

+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
|RoundID|MatchID|Team Initials|         Coach Name|Line-up|Shirt Number|    Player Name|Position|Event|
+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
|    201|   1096|          FRA|CAUDRON Raoul (FRA)|      S|           0|    alex thepot|      GK| null|
|    201|   1096|          MEX|   LUQUE Juan (MEX)|      S|           0|oscar bonfiglio|      GK| null|
+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
only showing top 2 rows



In [44]:
fifa_df.show(n=2)

+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
|RoundID|MatchID|Team Initials|         Coach Name|Line-up|Shirt Number|    Player Name|Position|Event|
+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
|    201|   1096|          FRA|CAUDRON Raoul (FRA)|      S|           0|    Alex THEPOT|      GK| null|
|    201|   1096|          MEX|   LUQUE Juan (MEX)|      S|           0|Oscar BONFIGLIO|      GK| null|
+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
only showing top 2 rows



## SQL запросы

Чтобы создавать SQL запросы необходимо сначала "зарегистрировать" таблицу и дать ей назание.

In [31]:
fifa_df.registerTempTable("fifa")

In [32]:
spark.sql('select * from fifa').show(n=2)

+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
|RoundID|MatchID|Team Initials|         Coach Name|Line-up|Shirt Number|    Player Name|Position|Event|
+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
|    201|   1096|          FRA|CAUDRON Raoul (FRA)|      S|           0|    Alex THEPOT|      GK| null|
|    201|   1096|          MEX|   LUQUE Juan (MEX)|      S|           0|Oscar BONFIGLIO|      GK| null|
+-------+-------+-------------+-------------------+-------+------------+---------------+--------+-----+
only showing top 2 rows



In [33]:
spark.sql("select Event from fifa where event like 'G%'").show(n=5)

+---------+
|    Event|
+---------+
|     G40'|
|     G70'|
|G43' G87'|
|     G19'|
|     G45'|
+---------+
only showing top 5 rows

