In [1]:
import pyspark
import os

from pyspark.sql import functions as F
from pyspark import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession \
    .builder \
    .appName("PySpark") \
    .getOrCreate()

In [3]:
df = spark.read.csv("data.csv", inferSchema=True, sep=';', header=True)

In [4]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- UIK: string (nullable = true)
 |-- region: string (nullable = true)
 |-- TIK: string (nullable = true)
 |-- 1: integer (nullable = true)
 |-- 2: integer (nullable = true)
 |-- 3: integer (nullable = true)
 |-- 4: integer (nullable = true)
 |-- 5: integer (nullable = true)
 |-- 6: integer (nullable = true)
 |-- 7: integer (nullable = true)
 |-- 8: integer (nullable = true)
 |-- 9: integer (nullable = true)
 |-- 10: integer (nullable = true)
 |-- 11: integer (nullable = true)
 |-- 12: integer (nullable = true)
 |-- 13: integer (nullable = true)
 |-- 14: integer (nullable = true)
 |-- 15: integer (nullable = true)
 |-- 16: integer (nullable = true)
 |-- 17: integer (nullable = true)
 |-- 18: integer (nullable = true)
 |-- 19: integer (nullable = true)
 |-- 20: integer (nullable = true)



In [5]:
df.show(5)

+---+------+--------------------+----------+----+----+---+----+---+---+---+----+---+----+---+---+---+---+---+----+---+---+---+---+
|_c0|   UIK|              region|       TIK|   1|   2|  3|   4|  5|  6|  7|   8|  9|  10| 11| 12| 13| 14| 15|  16| 17| 18| 19| 20|
+---+------+--------------------+----------+----+----+---+----+---+---+---+----+---+----+---+---+---+---+---+----+---+---+---+---+
|  0|УИК №1|Республика Адыгея...|Адыгейская|2256|2181|  0|2107| 62| 12| 62|2107|  3|2166|  0|  0|  0|137| 32|1977| 14|  0|  1|  5|
|  1|УИК №2|Республика Адыгея...|Адыгейская|2700|2633|  0|2575| 41| 17| 41|2575| 22|2594|  0|  0| 15| 86| 65|2389| 13|  5|  6| 15|
|  2|УИК №3|Республика Адыгея...|Адыгейская|2858|2752|  0|2664| 75| 13| 75|2664|  5|2734|  0|  0|  1| 62| 13|2645|  6|  3|  4|  0|
|  3|УИК №4|Республика Адыгея...|Адыгейская|2066|2034|  0|1857|142| 35|142|1857| 21|1978|  0|  0|  5|288| 12|1642| 21|  6|  2|  2|
|  4|УИК №5|Республика Адыгея...|Адыгейская| 700| 714|  0| 676| 11| 27| 11| 676|  3

## I. найти явку (%) по всем регионам, результат отсортировать по убыванию 

In [6]:
df = df.withColumn("prescence", df['9'] + df['10'])

In [7]:
newdf = df \
    .groupBy('region') \
    .agg(F.sum('1'), F.sum('prescence'))

In [8]:
newdf.withColumn('% prescence', 100 * newdf['sum(prescence)'] / newdf['sum(1)']) \
    .select('region', '% prescence') \
    .orderBy('% prescence', ascending=False) \
    .show(newdf.count(),truncate=False)

+----------------------------------------+------------------+
|region                                  |% prescence       |
+----------------------------------------+------------------+
|Территория за пределами РФ              |98.01821236184206 |
|Республика Тыва                         |93.62542974951742 |
|Ямало-Ненецкий автономный округ         |91.87213306617983 |
|Кабардино-Балкарская Республика         |91.6978754085207  |
|Чеченская Республика                    |91.50929703298175 |
|Республика Северная Осетия - Алания     |89.93704831730909 |
|Республика Дагестан                     |87.44351419820495 |
|Карачаево-Черкесская Республика         |87.343809390099   |
|Кемеровская область                     |83.07556958224816 |
|Чукотский автономный округ              |82.26051697921946 |
|Республика Ингушетия                    |81.95937165135314 |
|Брянская область                        |79.65976807571519 |
|Тюменская область                       |78.8829673763749  |
|Республ

## II. Выбрать любимого кандидата и найти тот избирательный участок, на котором он получил наибольший результат (учитывать участки на которых проголосовало больше 300 человек).
Собчак

In [9]:
df.filter(df['prescence'] > 300) \
    .withColumn('Sobchak %', df['17'] / df['prescence']) \
    .select('UIK', 'region', 'TIK', 'Sobchak %') \
    .orderBy('Sobchak %', ascending=False) \
    .show(1, truncate=False)
  

+---------+--------------------------+----+-------------------+
|UIK      |region                    |TIK |Sobchak %          |
+---------+--------------------------+----+-------------------+
|УИК №8239|Территория за пределами РФ|null|0.27004608294930876|
+---------+--------------------------+----+-------------------+
only showing top 1 row



## III. Найти регион, где разница между ТИК с наибольшей явкой и наименьшей максимальна

In [10]:
newdf = df.groupBy("TIK").sum('1','prescence')
newdf.show(3)

+--------------------+------+--------------+
|                 TIK|sum(1)|sum(prescence)|
+--------------------+------+--------------+
|          Гиагинская| 23049|         20707|
|Горно-Алтайская г...| 44831|         27836|
|        Мечетлинская| 17246|         14514|
+--------------------+------+--------------+
only showing top 3 rows



In [11]:
newdf = newdf \
    .withColumn('% prescence', 100 * newdf['sum(prescence)'] / newdf['sum(1)']) \
    .select('TIK', '% prescence')
newdf.show(3)

+--------------------+-----------------+
|                 TIK|      % prescence|
+--------------------+-----------------+
|          Гиагинская|89.83903857000304|
|Горно-Алтайская г...|62.09096384198434|
|        Мечетлинская|84.15864548301056|
+--------------------+-----------------+
only showing top 3 rows



In [12]:
newdf = df \
    .select('region', 'TIK') \
    .distinct() \
    .join(newdf, 'TIK')
newdf.show(3)

+-----------+-------------------+-----------------+
|        TIK|             region|      % prescence|
+-----------+-------------------+-----------------+
|    Чойская|   Республика Алтай|66.98490914690484|
| Акушинская|Республика Дагестан|89.71489377316259|
|Яшалтинская|Республика Калмыкия|65.23534269199008|
+-----------+-------------------+-----------------+
only showing top 3 rows



In [13]:
newdf = newdf \
    .groupBy('region') \
    .agg(F.max("% prescence") \
    .alias("max"), F.min("% prescence").alias("min"))
newdf.show(3)

+--------------------+-----------------+-----------------+
|              region|              max|              min|
+--------------------+-----------------+-----------------+
|Республика Саха (...|95.19961977186311|63.01959939694163|
|Калининградская о...| 87.9983712802416|52.39083750894775|
|Новосибирская обл...| 74.9002849002849|56.90181617672399|
+--------------------+-----------------+-----------------+
only showing top 3 rows



In [14]:
newdf.withColumn('delta', newdf['max'] - newdf['min']) \
    .select('region', 'delta') \
    .orderBy('delta', ascending=False) \
    .show(1, False)

+---------------------+----------------+
|region               |delta           |
+---------------------+----------------+
|Архангельская область|49.8591648406849|
+---------------------+----------------+
only showing top 1 row



## IV. Посчитать дисперсию по явке для каждого региона (учитывать УИК)

In [15]:
df.withColumn('% prescence', 100 * df['prescence'] / df['1']) \
    .groupBy('region') \
    .agg(F.stddev('% prescence')) \
    .show(df.count(), False)

+----------------------------------------+------------------------+
|region                                  |stddev_samp(% prescence)|
+----------------------------------------+------------------------+
|Республика Саха (Якутия)                |11.37482852142748       |
|Калининградская область                 |12.078053322613252      |
|Новосибирская область                   |9.988699795127882       |
|город Севастополь                       |6.772049161617466       |
|Свердловская область                    |8.446734659471256       |
|Республика Хакасия                      |11.725682708694196      |
|Магаданская область                     |14.387651017723126      |
|Республика Ингушетия                    |6.977476548305936       |
|Удмуртская Республика                   |9.659438665662844       |
|Камчатский край                         |16.87347577751214       |
|Саратовская область                     |15.426531267153354      |
|Республика Башкортостан                 |12.291

## V. Для каждого кандидата посчитать таблицу: результат (%, округленный до целого) - количество УИК, на которых кандидат получил данный результат

In [16]:
candidates =  df.columns[16:-1]

In [17]:
for cand in candidates:
    print("candidate", cand)
    df \
    .withColumn(cand + "cand %", F.round(100 * df[cand] / df['prescence'])) \
    .na.fill(0.0) \
    .groupBy(cand + "cand %") \
    .count() \
    .orderBy(cand + "cand %") \
    .show(100)

candidate 13
+--------+-----+
|13cand %|count|
+--------+-----+
|     0.0|45115|
|     1.0|47940|
|     2.0| 3704|
|     3.0|  556|
|     4.0|  179|
|     5.0|   89|
|     6.0|   35|
|     7.0|   20|
|     8.0|   22|
|     9.0|   10|
|    10.0|    7|
|    11.0|    5|
|    12.0|    1|
|    13.0|    1|
|    14.0|    3|
|    15.0|    1|
|    17.0|    1|
|    18.0|    2|
|    20.0|    3|
|    21.0|    1|
|    23.0|    1|
|    44.0|    1|
+--------+-----+

candidate 14
+--------+-----+
|14cand %|count|
+--------+-----+
|     0.0| 1181|
|     1.0| 2164|
|     2.0| 3073|
|     3.0| 3425|
|     4.0| 3655|
|     5.0| 3863|
|     6.0| 4001|
|     7.0| 4543|
|     8.0| 5278|
|     9.0| 6087|
|    10.0| 7060|
|    11.0| 7735|
|    12.0| 7891|
|    13.0| 7440|
|    14.0| 6116|
|    15.0| 4861|
|    16.0| 3718|
|    17.0| 2925|
|    18.0| 2352|
|    19.0| 1887|
|    20.0| 1519|
|    21.0| 1283|
|    22.0| 1057|
|    23.0|  885|
|    24.0|  765|
|    25.0|  573|
|    26.0|  440|
|    27.0|  358|
|   