In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


In [2]:
FILENAME = './cik.csv'

In [3]:
spark = SparkSession \
    .builder \
    .appName('pyspark') \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/20 15:56:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
schema = StructType([
    StructField('region', StringType(), False),
    StructField('tik', StringType(), False),
    StructField('uik', StringType(), False),
    StructField('registered_voters', IntegerType(), False),
    StructField('total_ballots', IntegerType(), False),
    StructField('early_voters', IntegerType(), False),
    StructField('tik_voters', IntegerType(), False),
    StructField('home_voters', IntegerType(), False),
    StructField('empty_ballots', IntegerType(), False),
    StructField('home_ballots', IntegerType(), False),
    StructField('tik_ballots', IntegerType(), False),
    StructField('bad_ballots', IntegerType(), False),
    StructField('normal_ballots', IntegerType(), False),
    StructField('missing_ballots', IntegerType(), False),
    StructField('extra_ballots', IntegerType(), False),
    StructField('baburin', IntegerType(), False),
    StructField('grudinin', IntegerType(), False),
    StructField('zhirinovsky', IntegerType(), False),
    StructField('putin', IntegerType(), False),
    StructField('sobchak', IntegerType(), False),
    StructField('suraykin', IntegerType(), False),
    StructField('titov', IntegerType(), False),
    StructField('yavlinsky', IntegerType(), False),
])

In [5]:
df = spark.read.csv(FILENAME, schema=schema)

найти явку (%) по всем регионам, результат отсортировать по убыванию

In [6]:
appearance = df.groupBy(['region']) \
    .agg(F.sum('early_voters').alias('ev_sum'),
        F.sum('tik_voters').alias('tv_sum'),
        F.sum('home_voters').alias('hv_sum'),
        F.sum('registered_voters').alias('total')) \
    .dropna()

col_list = appearance.columns[1:-1]
appearance = appearance.withColumn('appearance', sum([F.col(c) for c in col_list]) / appearance.total) \
    .sort('appearance', ascending=False) \
    .select(['region', 'appearance']) \
    .show()



+--------------------+------------------+
|              region|        appearance|
+--------------------+------------------+
|Территория за пре...|0.9806986984380843|
|     Республика Тыва|0.9366369316170003|
|Ямало-Ненецкий ав...|0.9190125747324195|
|Кабардино-Балкарс...|0.9180157863562131|
|Республика Северн...|0.8998810071492768|
| Республика Дагестан|0.8747795414462081|
|Карачаево-Черкесс...|0.8740544668203033|
| Кемеровская область|0.8322733516003962|
|Чукотский автоном...|0.8228138695924391|
|Республика Ингушетия|0.8195937165135314|
|    Брянская область| 0.797078003370434|
|   Тюменская область|0.7892544508025854|
|  Краснодарский край|0.7786760339047925|
|Республика Татарс...|0.7742157684137118|
|Чувашская Республ...|0.7622296238501433|
|Республика Башкор...|0.7544523455707033|
|  Республика Бурятия|0.7519978782127258|
|Республика Адыгея...|0.7431189118555477|
| Ставропольский край|0.7384811380095883|
|  Пензенская область|0.7374614571776765|
+--------------------+------------

                                                                                

 выбрать  произвольного кандидата и найти тот избирательный участок, на котором он получил наибольший результат (учитывать участки на которых проголосовало больше 300 человек)

In [12]:
cand = 'sobchak'
cols = df.columns[5:8]
temp = df.withColumn('overall_voted', sum([F.col(c) for c in cols]))
candidat_votes = temp.filter(temp.overall_voted > 300) \
    .select(
        df.region,
        df.tik,
        df.uik,
    (F.col(cand)/df.registered_voters).alias('share')
).dropna() \
.sort('share', ascending=False) \
.show(1)

+------------+---------------+---------+-------------------+
|      region|            tik|      uik|              share|
+------------+---------------+---------+-------------------+
|город Москва|район Печатники|УИК №3709|0.18028846153846154|
+------------+---------------+---------+-------------------+
only showing top 1 row



найти регион, где разница между ТИК с наибольшей явкой и наименьшей максимальна (учитывать %)

In [29]:
appearance = df.groupBy(['region', 'tik']) \
    .agg(F.sum('early_voters').alias('ev_sum'),
        F.sum('tik_voters').alias('tv_sum'),
        F.sum('home_voters').alias('hv_sum'),
        F.sum('registered_voters').alias('total')) \
    .dropna()

col_list = appearance.columns[2:-1]
appearance = appearance.withColumn('appearance', sum([F.col(c) for c in col_list]) / appearance.total) \
    .drop('ev_sum', 'tv_sum', 'hv_sum', 'total')

appearance = appearance.groupBy(['region']) \
    .agg(F.max('appearance').alias('max_ap'),
         F.min('appearance').alias('min_ap'))

appearance = appearance.withColumn('diff', appearance.max_ap - appearance.min_ap) \
.sort('diff', ascending=False) \
.show(1)

+--------------------+----------------+-------------------+-------------------+
|              region|          max_ap|             min_ap|               diff|
+--------------------+----------------+-------------------+-------------------+
|Архангельская обл...|0.99795605518651|0.49936440677966104|0.49859164840684894|
+--------------------+----------------+-------------------+-------------------+
only showing top 1 row



посчитать дисперсию по явке для каждого региона (по УИК)

In [36]:
temp = df.groupBy('region') \
    .agg(F.stddev((df.early_voters + df.home_voters + df.tik_voters) / df.registered_voters).alias('std')).show()

+--------------------+-------------------+
|              region|                std|
+--------------------+-------------------+
|Республика Саха (...|0.11374120492897338|
|  Республика Хакасия|0.11719818576021894|
|Республика Ингушетия| 0.0697747654830594|
|Удмуртская Респуб...|0.09658085675037424|
|     Камчатский край|0.16861471602822437|
|Республика Башкор...|0.12284393684931046|
|Карачаево-Черкесс...|0.12319562690839514|
| Республика Дагестан|0.12935039014984046|
|Республика Татарс...|0.14150150293888705|
| Ставропольский край|0.13401966931050247|
|Республика Северн...|0.07930140237807017|
|    Хабаровский край|0.13813899925138123|
| Республика Марий Эл| 0.0818500080733599|
|Республика Адыгея...|0.16330124345063626|
|  Республика Бурятия|0.13306357237654534|
| Республика Калмыкия|0.15349925960543603|
|       Пермский край|0.10105338052282166|
|  Краснодарский край|0.13940623710253583|
|     Приморский край|0.16641672216989184|
|Кабардино-Балкарс...|0.03760708051294459|
+----------

для каждого кандидата посчитать таблицу: результат (%, округленный до целого) - количество УИК, на которых кандидат получил данный результат

In [74]:
def show_share_counts(cand, result):
    temp = df.withColumn(f'{cand}_share', F.ceil(100 * F.col(cand) / df.registered_voters))
    temp = temp.filter(F.col(f'{cand}_share') >=  result). \
        groupBy(F.col(f'{cand}_share')) \
        .agg(F.count(F.col(f'{cand}_share')).alias('count')) \
        .sort(F.col('count'), ascending=False) \
        .show()

result = 30
cands_list = df.columns[-8:]
for c in cands_list:
    print(c)
    show_share_counts(c, result)

baburin
+-------------+-----+
|baburin_share|count|
+-------------+-----+
|           45|    1|
+-------------+-----+

grudinin
+--------------+-----+
|grudinin_share|count|
+--------------+-----+
|            30|   36|
|            31|   34|
|            32|   34|
|            34|   32|
|            33|   21|
|            35|   20|
|            37|   20|
|            36|   17|
|            39|   11|
|            38|    9|
|            40|    9|
|            50|    8|
|            43|    7|
|            42|    6|
|            46|    5|
|            41|    4|
|            45|    4|
|            44|    3|
|            47|    3|
|            52|    2|
+--------------+-----+
only showing top 20 rows

zhirinovsky
+-----------------+-----+
|zhirinovsky_share|count|
+-----------------+-----+
|               30|   16|
|               34|   14|
|               40|    8|
|               32|    6|
|               42|    6|
|               36|    6|
|               37|    5|
|               38|   