# Задача 3

### Часть 2
Нужно, используя `Spark`:
- найти явку (%) по всем регионам, результат отсортировать по убыванию
- выбрать любимого кандидата и найти тот избирательный участок, на котором он получил наибольший результат (учитывать участки на которых проголосовало больше 300 человек)
- найти регион, где разница между ТИК с наибольшей явкой и наименьшей максимальна
- посчитать дисперсию по явке для каждого региона (учитывать УИК)
- для каждого кандидата посчитать таблицу: результат (%, округленный до целого) - количество УИК, на которых кандидат получил данный результат


Результаты принимаются в виде `Jupyter Notebook`, `Spark Notebook` или исходных файлов на `Scala`.

In [35]:
##### import pyspark
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType

In [47]:
os.environ["PYSPARK_PYTHON"] = "python3"

VOTING_FILE = "vibory.csv"

### Попытки RDD

In [262]:
# sc = SparkContext(master='local[*]')

# rdd = sc.textFile(VOTING_FILE) \
    #.map(lambda line: line.split(',')) 

# rdd.takeSample(num=10, withReplacement=False) 

# результат - Py4JavaError

## Spark DataFrames

In [None]:
spark = SparkSession \
    .builder \
    .appName("PySpark") \
    .getOrCreate()

In [None]:
# изначальные названия столбцов

schema = ['','Регион','ТИК','УИК','Число включенных в список',\
          'Число бюллетеней, полученных участковой избирательной комиссией',\
          'Число бюллетеней, выданных избирателям, проголосовавшим досрочно', \
          'Число бюллетеней, выданных в помещении для голосования в день голосования', \
          'Число бюллетеней, выданных вне помещения для голосования в день голосования', \
          'Число погашенных бюллетеней', \
          'Число бюллетеней в переносных ящиках для голосования',\
          'Число бюллетеней в стационарных ящиках для голосования', \
          'Число недействительных бюллетеней', \
          'Число действительных бюллетеней', \
          'Число утраченных бюллетеней', \
          'Число бюллетеней, не учтенных при получении ', \
          'Бабурин С. Н.', 'Грудинин П. Н.', 'Жириновский В. В.','Путин В. В.', \
          'Собчак К. А.', 'Сурайкин М. А.', 'Титов Б. Ю.','Явлинский Г. А.']

In [404]:
# сократим значения столбцов и сделаем символические обозначения на английском для удобства

df = spark.read.csv(VOTING_FILE, inferSchema=True, header=True).toDF('Num','Region','TIK','UIK', \
          'all people', 'recieved papers', \
          'papers issued early', 'papers issued inside', 'papers issued out', 'cancelled papers', \
          'papers in portable boxes', 'papers in stacionary boxes', \
          'bad papers', 'good papers', 'lost papers', 'not counted papers', \
          'Baburin', 'Grudinin', 'Zhirinovskyi', 'Putin', \
          'Sobchak', 'Suraikin', 'Titov', 'Yavlisky')

## Найти явку (%) по всем регионам, результат отсортировать по убыванию

Предполагается, что явка = сумме всех выданных бюллетеней / число всех избирателей в списке  (по всем УИК и всем ТИК)

In [416]:
df_presence = df.groupBy('Region') \
  .agg(F.sum('all people').alias('sum all people'), \
      F.sum('papers issued early').alias('sum papers issued early'), \
      F.sum('papers issued inside').alias('sum papers issued inside'), \
      F.sum('papers issued out').alias('sum papers issued out')) \
  .sort('sum all people', ascending=False) 

df_presence.show(10)

+--------------------+--------------+-----------------------+------------------------+---------------------+
|              Region|sum all people|sum papers issued early|sum papers issued inside|sum papers issued out|
+--------------------+--------------+-----------------------+------------------------+---------------------+
|        город Москва|       7543682|                      0|                 4348576|               172779|
|  Московская область|       5829578|                      0|                 3540550|               167006|
|  Краснодарский край|       4049575|                    229|                 2825178|               327900|
| город Санкт-Пете...|       3632398|                   1795|                 2248088|                70251|
| Свердловская обл...|       3345221|                   3210|                 2003456|                79001|
|  Ростовская область|       3210796|                      0|                 1920477|               159301|
| Республика Башко.

In [379]:
from functools import reduce
from operator import add
from pyspark.sql.functions import col

# подсчет суммых всех выданных бюллетеней

df_presence = df_presence \
  .withColumn('sum papers', reduce(add, [col(x) for x in ['sum papers issued early','sum papers issued inside', 'sum papers issued out']])) \
  .select('Region', 'sum all people', 'sum papers') \
  .sort('sum all people', ascending=False) 

df_presence.show(10)

+--------------------+--------------+----------+
|              Region|sum all people|sum papers|
+--------------------+--------------+----------+
|        город Москва|       7543682|   4521355|
|  Московская область|       5829578|   3707556|
|  Краснодарский край|       4049575|   3153307|
| город Санкт-Пете...|       3632398|   2320134|
| Свердловская обл...|       3345221|   2085667|
|  Ростовская область|       3210796|   2079778|
| Республика Башко...|       3045698|   2297834|
| Республика Татар...|       2919482|   2260309|
| Челябинская область|       2632596|   1748424|
| Нижегородская об...|       2618865|   1728008|
+--------------------+--------------+----------+
only showing top 10 rows



In [380]:
# расчет явки

df_presence_perc = df_presence.withColumn('% presence', col('sum papers') / col('sum all people')) \
  .select('Region', '% presence') \
  .sort('% presence', ascending=False)
    
df_presence_perc.show(10)

+--------------------+------------------+
|              Region|        % presence|
+--------------------+------------------+
|Территория за пре...|0.9806986984380843|
|     Республика Тыва|0.9366369316170003|
| Ямало-Ненецкий а...|0.9190125747324195|
| Кабардино-Балкар...|0.9180157863562131|
| Чеченская Респуб...|0.9154001705101918|
| Республика Север...|0.8998810071492768|
| Республика Дагестан|0.8747795414462081|
| Карачаево-Черкес...|0.8740544668203033|
| Кемеровская область|0.8322733516003962|
| Чукотский автоно...|0.8228138695924391|
+--------------------+------------------+
only showing top 10 rows



In [417]:
# итоговая функция расчета и сохранения результата явки

df.groupBy('Region') \
  .agg(F.sum('all people').alias('sum all people'), \
      F.sum('papers issued early').alias('sum papers issued early'), \
      F.sum('papers issued inside').alias('sum papers issued inside'), \
      F.sum('papers issued out').alias('sum papers issued out')) \
  .withColumn('sum papers', reduce(add, [col(x) for x in ['sum papers issued early','sum papers issued inside', 'sum papers issued out']])) \
  .withColumn('% presence', col('sum papers') / col('sum all people')) \
  .select('Region', '% presence') \
  .sort('% presence', ascending=False) \
  .toPandas().to_csv("President elections % presence.csv")

## Выбрать любимого кандидата и найти тот избиратльный участок, на котором он получил наибольший результат (учитывать участки на которых проголосовало больше 300 человек)

Фильтруем результаты на участках по условию больше 300 голосов, из оставшихся выбираем максимум и выводим местоположение

In [381]:
favorites = ['Baburin', 'Grudinin', 'Zhirinovskyi', 'Putin', \
          'Sobchak', 'Suraikin', 'Titov', 'Yavlisky']

# для проверки по всем
favorite = favorites[1]

In [419]:
df_favorite = df.filter(col(favorite) >= 300) \
      .select('Region','TIK','UIK', favorite) \
      .groupBy('Region','TIK','UIK') \
      .agg(F.max(favorite).alias('Max')) \
      .sort('Max', ascending=False)

df_favorite.show(1)

+-------------------+---------+----+----+
|             Region|      TIK| UIK| Max|
+-------------------+---------+----+----+
| Московская область|Ленинская|1306|1141|
+-------------------+---------+----+----+
only showing top 1 row



## Найти регион, где разница между ТИК с наибольшей явкой и наименьшей максимальна

Рассчитать явку для ТИК, затем рассчитать разницу, выбрать максимум

In [388]:
# Явка по ТИК

df_TIK = df.groupBy('Region','TIK') \
  .agg(F.sum('all people').alias('sum all TIK people'), \
      F.sum('papers issued early').alias('sum TIK papers issued early'), \
      F.sum('papers issued inside').alias('sum TIK papers issued inside'), \
      F.sum('papers issued out').alias('sum TIK papers issued out')) \
  .withColumn('sum papers', reduce(add, [col(x) for x in ['sum TIK papers issued early','sum TIK papers issued inside', 'sum TIK papers issued out']])) \
  .withColumn('% presence', col('sum papers') / col('sum all TIK people')) \
  .select('Region', 'TIK', '% presence') \
  .sort('Region')

df_TIK.show()

+---------------+--------------------+------------------+
|         Region|                 TIK|        % presence|
+---------------+--------------------+------------------+
| Алтайский край|          Калманская|0.6171454219030521|
| Алтайский край|           Заринская|0.7782430083741507|
| Алтайский край| Сибирская городская|0.7997578692493946|
| Алтайский край|  Барнаул, Ленинская|0.6390487043367891|
| Алтайский край|            Троицкая| 0.679664570230608|
| Алтайский край|        Тюменцевская|0.7187410586552218|
| Алтайский край|Барнаул, Индустри...|0.6548344857415406|
| Алтайский край|           Чарышская|0.6825721671693236|
| Алтайский край|            Целинная|0.7428841756682173|
| Алтайский край|          Тогульская|0.7332916926920675|
| Алтайский край|     Панкрушихинская|0.6834268230494646|
| Алтайский край|           Хабарская|0.6735224586288416|
| Алтайский край|     Шелаболихинская|0.7076369996113486|
| Алтайский край|          Рубцовская|0.7446634783623003|
| Алтайский кр

In [390]:
# расчет разницы, сортировка по убыванию

df_TIK.groupBy('Region') \
  .agg((F.max('% presence') - F.min('% presence')).alias('Diff')) \
  .sort('Diff', ascending=False) \
  .show(10)

# конкретно максимум
# df_TIK.show(1)

+--------------------+-------------------+
|              Region|               Diff|
+--------------------+-------------------+
| Архангельская об...|0.49859164840684894|
| Сахалинская область| 0.4842508035844767|
| Республика Дагестан|0.46389878363882775|
|   Самарская область|0.41698130506775954|
|   Красноярский край| 0.4159372590331167|
| Саратовская область| 0.4003895056872615|
| Республика Татар...|  0.394714905703235|
|  Краснодарский край| 0.3725799222313191|
| Республика Адыге...|0.36020071603446446|
| Республика Калмыкия|0.35800470471564194|
+--------------------+-------------------+
only showing top 10 rows



## Посчитать дисперсию по явке для каждого региона (учитывать УИК)

In [420]:
# создаем столбец с количеством посетивших УИК людей, затем группировка по региону рассчитывает дисперсию

df_presence_var = df.withColumn('sum papers',\
    reduce(add, [col(x) for x in ['papers issued early','papers issued inside', 'papers issued out']])) \
  .select('Region','TIK', 'UIK','sum papers') \
  .groupBy('Region') \
  .agg(F.stddev('sum papers').alias('Std')) \
  .sort('Std', ascending=False) 

df_presence_var.toPandas().to_csv("Presence Std.csv")

df_presence_var.show(10)

+--------------------+-----------------+
|              Region|              Std|
+--------------------+-----------------+
|Территория за пре...|1332.153361037134|
| Чеченская Респуб...|779.5717067527235|
| Ямало-Ненецкий а...|739.3290449083053|
| Республика Север...|677.9089014204244|
| Республика Дагестан|657.2845481650252|
|     Республика Тыва|641.6682099770799|
| Кемеровская область|621.0871610694816|
| Магаданская область|614.8118916285797|
| Кабардино-Балкар...|614.0460846716691|
|    Брянская область|592.4564291304055|
+--------------------+-----------------+
only showing top 10 rows



## Для каждого кандидата посчитать таблицу: результат (%, округленный до целого) - количество УИК, на которых кандидат получил данный результат

Процент, полученный кандидатом = число голосов за него / число выданных на участке бюллетеней (всех пришедших голосовать людей)

In [422]:
candidates = ['Baburin', 'Grudinin', 'Zhirinovskyi', 'Putin', \
          'Sobchak', 'Suraikin', 'Titov', 'Yavlisky']

papers = ['papers issued early', 'papers issued inside', 'papers issued out']

In [392]:
# группируем результаты по регионам

df_groupBy_Region = df_goupBy_TIK \
      .groupBy('Region') \
      .agg(F.sum('Baburin').alias('Baburin'), \
          F.sum('Grudinin').alias('Grudinin'), \
          F.sum('Zhirinovskyi').alias('Zhirinovskyi'), \
          F.sum('Putin').alias('Putin'), \
          F.sum('Sobchak').alias('Sobchak'), \
          F.sum('Suraikin').alias('Suraikin'), \
          F.sum('Titov').alias('Titov'), \
          F.sum('Yavlisky').alias('Yavlisky')) \
      .sort('Region', ascending=True) 

df_groupBy_Region.show(10)

+--------------------+-------+--------+------------+------+-------+--------+-----+--------+
|              Region|Baburin|Grudinin|Zhirinovskyi| Putin|Sobchak|Suraikin|Titov|Yavlisky|
+--------------------+-------+--------+------------+------+-------+--------+-----+--------+
|      Алтайский край|   7581|  281978|       84785|770278|  11788|    7855| 5532|    7259|
|    Амурская область|   2358|   73485|       37909|264493|   4428|    2466| 2080|    1951|
| Архангельская об...|   4448|   51868|       46925|407190|  10588|    3842| 4982|    6239|
| Астраханская обл...|   2185|   64047|       19339|342195|   5060|    2823| 2233|    2504|
| Белгородская обл...|   5218|   93102|       49685|711392|   8474|    6534| 4835|    4445|
|    Брянская область|   4472|   68375|       43940|636087|   7463|    4265| 4175|    3524|
| Владимирская обл...|   5440|   93649|       58822|546042|  10777|    5075| 6098|    6147|
| Волгоградская об...|   8040|  140708|       69909|929541|  14403|    8116| 685

In [423]:
from pyspark.sql.functions import lit
# итоговое число голосов за каждого кандидата
# добавляем новый столбец с единым по всей таблице значением
# чтобы по нему можно было сделать группировку и получить общую сумму голосов по всем кандидатам

df_groupBy_Russia = df_groupBy_Region.withColumn("Fin", lit('Russia')) \
      .groupBy('Fin') \
      .agg(F.sum('Baburin').alias('Baburin'), \
          F.sum('Grudinin').alias('Grudinin'), \
          F.sum('Zhirinovskyi').alias('Zhirinovskyi'), \
          F.sum('Putin').alias('Putin'), \
          F.sum('Sobchak').alias('Sobchak'), \
          F.sum('Suraikin').alias('Suraikin'), \
          F.sum('Titov').alias('Titov'), \
          F.sum('Yavlisky').alias('Yavlisky')) 

df_groupBy_Russia.show(10)

+------+-------+--------+------------+--------+-------+--------+------+--------+
|   Fin|Baburin|Grudinin|Zhirinovskyi|   Putin|Sobchak|Suraikin| Titov|Yavlisky|
+------+-------+--------+------------+--------+-------+--------+------+--------+
|Russia| 475599| 8608210|     4124987|56048674|1225330|  496001|552007|  762304|
+------+-------+--------+------------+--------+-------+--------+------+--------+



In [395]:
# расчет количества людей, посетивших выборы

df_papers = df_presence.withColumn('Fin', lit('Russia')) \
    .groupBy('Fin') \
    .agg(F.sum('sum papers').alias('papers'))

df_papers.show()

# Число избирателей, пришедших на выборы
sum_papers = df_papers.collect()[0]['papers']

+------+--------+
|   Fin|  papers|
+------+--------+
|Russia|73128855|
+------+--------+



In [396]:
# расчет процента голосов за каждого кандидата с округлением

num_voting = [df_groupBy_Russia.collect()[0][c] for c in candidates]
num_voting = [round(num / sum_papers * 100) for num in num_voting]
num_voting

[1, 12, 6, 77, 2, 1, 1, 1]

In [397]:
# функции округления и численные операции со столбцами в SparkDataFrame осуществляются сложно
# я не нашла хорошие варианты для работы, поэтому перешла на SparkSQL
# при переходе на SQL необходимо, чтобы все названия столбцов были единой строкой без пробелов

df = spark.read.csv(VOTING_FILE, inferSchema=True, header=True).toDF('Num','Region','TIK','UIK', \
          'all_people', 'recieved_papers', \
          'papers_issued_early', 'papers_issued_inside', 'papers_issued_out', 'cancelled_papers', \
          'papers_in_portable_boxes', 'papers_in_stacionary_boxes', \
          'bad_papers', 'good_papers', 'lost_papers', 'not_counted_papers', \
          'Baburin', 'Grudinin', 'Zhirinovskyi', 'Putin', \
          'Sobchak', 'Suraikin', 'Titov', 'Yavlisky')

# добавляем снова столбец с единым значением по всей таблице для расчета общей суммы
df = df.withColumn('Fin', lit('Russia'))
# создаем View 
df.createOrReplaceTempView('voting')

In [399]:
# общая таблица с добавленным столбцов числа посетивших выборы людей
df_UIK_vot_every = spark.sql("""SELECT Fin, Region, TIK, UIK,
Baburin, Grudinin, Zhirinovskyi, Putin, Sobchak, Suraikin, Titov, Yavlisky,
(papers_issued_early + papers_issued_inside + papers_issued_out) AS sum_papers 
FROM voting""")

df_UIK_vot_every.show(10)

# создание View
df_UIK_vot_every.createOrReplaceTempView('uik_voting')

+------+--------------------+----------+---+-------+--------+------------+-----+-------+--------+-----+--------+----------+
|   Fin|              Region|       TIK|UIK|Baburin|Grudinin|Zhirinovskyi|Putin|Sobchak|Suraikin|Titov|Yavlisky|sum_papers|
+------+--------------------+----------+---+-------+--------+------------+-----+-------+--------+-----+--------+----------+
|Russia| Республика Адыге...|Адыгейская|  1|      0|     137|          32| 1977|     14|       0|    1|       5|      2169|
|Russia| Республика Адыге...|Адыгейская|  2|     15|      86|          65| 2389|     13|       5|    6|      15|      2616|
|Russia| Республика Адыге...|Адыгейская|  3|      1|      62|          13| 2645|      6|       3|    4|       0|      2739|
|Russia| Республика Адыге...|Адыгейская|  4|      5|     288|          12| 1642|     21|       6|    2|       2|      1999|
|Russia| Республика Адыге...|Адыгейская|  5|      2|      44|           6|  624|      6|       0|    0|       2|       687|
|Russia|

In [401]:
# Расчет процента проголосовавших за кандидата по каждому УИК

df_UIK_percenrage = spark.sql("""SELECT Fin, Region, TIK, UIK,
round(Baburin / sum_papers * 100) as Baburin, 
round(Grudinin / sum_papers * 100) as Grudinin,
round(Zhirinovskyi / sum_papers * 100) as Zhirinovskyi, 
round(Putin / sum_papers * 100) as Putin, 
round(Sobchak / sum_papers * 100) as Sobchak, 
round(Suraikin / sum_papers * 100) as Suraikin, 
round(Titov / sum_papers * 100) as Titov, 
round(Yavlisky / sum_papers * 100) as Yavlisky
FROM uik_voting""")

df_UIK_percenrage.show(10)

# создание View
df_UIK_percenrage.createOrReplaceTempView('uik_percentage')

+------+--------------------+----------+---+-------+--------+------------+-----+-------+--------+-----+--------+
|   Fin|              Region|       TIK|UIK|Baburin|Grudinin|Zhirinovskyi|Putin|Sobchak|Suraikin|Titov|Yavlisky|
+------+--------------------+----------+---+-------+--------+------------+-----+-------+--------+-----+--------+
|Russia| Республика Адыге...|Адыгейская|  1|    0.0|     6.0|         1.0| 91.0|    1.0|     0.0|  0.0|     0.0|
|Russia| Республика Адыге...|Адыгейская|  2|    1.0|     3.0|         2.0| 91.0|    0.0|     0.0|  0.0|     1.0|
|Russia| Республика Адыге...|Адыгейская|  3|    0.0|     2.0|         0.0| 97.0|    0.0|     0.0|  0.0|     0.0|
|Russia| Республика Адыге...|Адыгейская|  4|    0.0|    14.0|         1.0| 82.0|    1.0|     0.0|  0.0|     0.0|
|Russia| Республика Адыге...|Адыгейская|  5|    0.0|     6.0|         1.0| 91.0|    1.0|     0.0|  0.0|     0.0|
|Russia| Республика Адыге...|Адыгейская|  6|    0.0|     4.0|         2.0| 93.0|    0.0|     0.0

In [352]:
num_voting

[1, 12, 6, 77, 2, 1, 1, 1]

In [402]:
# Итоговая таблица с проверкой по рассчитанным ранее процентам проголосовавших

spark.sql("""SELECT Fin, sum(case Baburin when 1 then 1 else 0 end) as Baburin,
sum(case Grudinin when 12 then 1 else 0 end) as Grudinin,
sum(case Zhirinovskyi when 6 then 1 else 0 end) as Zhirinovskyi,
sum(case Putin when 77 then 1 else 0 end) as Putin,
sum(case Sobchak when 2 then 1 else 0 end) as Sobchak,
sum(case Suraikin when 1 then 1 else 0 end) as Suraikin,
sum(case Titov when 1 then 1 else 0 end) as Titov,
sum(case Yavlisky when 1 then 1 else 0 end) as Yavlisky
FROM uik_percentage 
GROUP BY(Fin) """).show()

+------+-------+--------+------------+-----+-------+--------+-----+--------+
|   Fin|Baburin|Grudinin|Zhirinovskyi|Putin|Sobchak|Suraikin|Titov|Yavlisky|
+------+-------+--------+------------+-----+-------+--------+-----+--------+
|Russia|  47609|    7816|       13605| 4652|  18358|   48504|39719|   30605|
+------+-------+--------+------------+-----+-------+--------+-----+--------+

