### Análisis de Elecciones generales España
No hace ni un mes que fueron las elecciones generales y en nada nos tocan las municipales… Y encima vamos a trabajar con datos del INE y del Ministerio de Interior para analizar los resultados de las elecciones generales de 2016.

1. Carga de ficheros

In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("ejercicios").setMaster("local")
sc = SparkContext(conf=conf)

In [2]:
elec = sc.textFile("hdfs://localhost:9000/user/bigdata/spark/PECElecciones.csv")
elec

hdfs://localhost:9000/user/bigdata/spark/PECElecciones.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [3]:
mun = sc.textFile("hdfs://localhost:9000/user/bigdata/spark/PECMunicipios.csv")
mun

hdfs://localhost:9000/user/bigdata/spark/PECMunicipios.csv MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

2. Cargar el fichero de municipios con Spark Core quitando la cabecera.

In [4]:
def f(idx, iter):
    output = []
    for sublist in iter:
        output.append(sublist)
    if idx>0:
        return(output)
    else:
        return(output[1:])
def p(x) : print(x)

In [5]:
mun_df = mun.map(lambda x: x.replace("\"","").split(";")).mapPartitionsWithIndex(f)
mun_df

PythonRDD[4] at RDD at PythonRDD.scala:53

3. Cargar el fichero de elecciones con Spark Core quitando la cabecera.

In [6]:
elec_df = elec.map(lambda x: x.replace("\"","").split(";")).mapPartitionsWithIndex(f)
elec_df

PythonRDD[5] at RDD at PythonRDD.scala:53

4. Generar un dataframe a partir del RDD de municipios. Ajustar los tipos, borrar espacios, etc.

In [7]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, FloatType

sparkSQL = SparkSession \
    .builder \
    .appName("SparkSQL") \
    .master("local") \
    .getOrCreate()

In [8]:
schema_mun = StructType([
    StructField('codigo', StringType(), True),
    StructField('comunidad', StringType(), True),
    StructField('provincia', StringType(), True),
    StructField('municipio', StringType(), True),
    StructField('poblacion', StringType(), True),
])

In [9]:
mun2 = sparkSQL.createDataFrame(mun_df, schema_mun)

In [10]:
from pyspark.sql.functions import rtrim
mun2 = mun2.withColumn("comunidad", rtrim(mun2.comunidad))
mun2 = mun2.withColumn("provincia", rtrim(mun2.provincia))
mun2 = mun2.withColumn("municipio", rtrim(mun2.municipio))
#mun2 = mun2.withColumn("poblacion", mun2["poblacion"].cast(IntegerType()))
#mun2 = mun2.withColumn("codigo", mun2["codigo"].cast(IntegerType()))

In [11]:
mun2.show()

+------+---------+---------+--------------------+---------+
|codigo|comunidad|provincia|           municipio|poblacion|
+------+---------+---------+--------------------+---------+
|  4001|Andaluc�a|  Almer�a|                Abla|     1342|
|  4002|Andaluc�a|  Almer�a|            Abrucena|     1279|
|  4003|Andaluc�a|  Almer�a|                Adra|    24670|
|  4004|Andaluc�a|  Almer�a|           Alb�nchez|      805|
|  4005|Andaluc�a|  Almer�a|           Alboloduy|      653|
|  4006|Andaluc�a|  Almer�a|               Albox|    11429|
|  4007|Andaluc�a|  Almer�a|             Alcolea|      812|
|  4008|Andaluc�a|  Almer�a|            Alc�ntar|      570|
|  4009|Andaluc�a|  Almer�a|Alcudia de Monteagud|      168|
|  4010|Andaluc�a|  Almer�a|             Alhabia|      700|
|  4011|Andaluc�a|  Almer�a|   Alhama de Almer�a|     3763|
|  4012|Andaluc�a|  Almer�a|              Alic�n|      220|
|  4013|Andaluc�a|  Almer�a|             Almer�a|   194203|
|  4014|Andaluc�a|  Almer�a|            

5. Generar un dataframe a partir del RDD de elecciones. Ajustar los tipos, borrar espacios, etc.

In [12]:
from pyspark.sql.types import IntegerType

In [13]:
schema_elec = StructType([
    StructField('codigo', StringType(), True),
    StructField('mesas', StringType(), True),
    StructField('censo', StringType(), True),
    StructField('votantes', StringType(), True),
    StructField('validos', StringType(), True),
    StructField('blanco', StringType(), True),
    StructField('nulos', StringType(), True),
    StructField('pp', StringType(), True),
    StructField('psoe', StringType(), True),
    StructField('podemosiuequo', StringType(), True),
    StructField('ciudadanos', StringType(), True),
    StructField('ecp', StringType(), True),
    StructField('podemoscompromiseupv', StringType(), True),
    StructField('erccasti', StringType(), True),
    StructField('cdc', StringType(), True),
    StructField('podemosenmareaanovaeu', StringType(), True),
    StructField('eajpnv', StringType(), True),
    StructField('ehbildu', StringType(), True),
    StructField('ccapnc', StringType(), True),
    StructField('pacma', StringType(), True),
    StructField('recortescerogrupoverde', StringType(), True),
    StructField('upyd', StringType(), True),
    StructField('vox', StringType(), True),
    StructField('bngnos', StringType(), True),
    StructField('pcpe', StringType(), True),
    StructField('gbai', StringType(), True),
    StructField('eb', StringType(), True),
    StructField('fedelasjons', StringType(), True),
    StructField('si', StringType(), True),
    StructField('somval', StringType(), True),
    StructField('ccd', StringType(), True),
    StructField('sain', StringType(), True),
    StructField('ph', StringType(), True),
    StructField('centromoderado', StringType(), True),
    StructField('plib', StringType(), True),
    StructField('ccdci', StringType(), True),
    StructField('upl', StringType(), True),
    StructField('pcoe', StringType(), True),
    StructField('and', StringType(), True),
    StructField('jxc', StringType(), True),
    StructField('pfyv', StringType(), True),
    StructField('cilus', StringType(), True),
    StructField('pxc', StringType(), True),
    StructField('mas', StringType(), True),
    StructField('izar', StringType(), True),
    StructField('unidaddelpueblo', StringType(), True),
    StructField('prepal', StringType(), True),
    StructField('ln', StringType(), True),
    StructField('repo', StringType(), True),
    StructField('independientesfia', StringType(), True),
    StructField('entaban', StringType(), True),
    StructField('imc', StringType(), True),
    StructField('puede', StringType(), True),
    StructField('fe', StringType(), True),
    StructField('alcd', StringType(), True),
    StructField('fme', StringType(), True),
    StructField('hrtsln', StringType(), True),
    StructField('udt', StringType(), True)
    
])

In [14]:
elec2 = sparkSQL.createDataFrame(elec_df, schema_elec)

In [15]:
elec2 = elec2.withColumn("codigo", elec2["codigo"].cast(IntegerType()))
elec2 = elec2.withColumn("mesas", elec2["mesas"].cast(IntegerType()))
elec2 = elec2.withColumn("censo", elec2["censo"].cast(IntegerType()))
elec2 = elec2.withColumn("votantes", elec2["votantes"].cast(IntegerType()))
elec2 = elec2.withColumn("validos", elec2["validos"].cast(IntegerType()))
elec2 = elec2.withColumn("blanco", elec2["blanco"].cast(IntegerType()))
elec2 = elec2.withColumn("nulos", elec2["nulos"].cast(IntegerType()))
elec2 = elec2.withColumn("pp", elec2["pp"].cast(IntegerType()))
elec2 = elec2.withColumn("psoe", elec2["psoe"].cast(IntegerType()))
elec2 = elec2.withColumn("podemosiuequo", elec2["podemosiuequo"].cast(IntegerType()))
elec2 = elec2.withColumn("ciudadanos", elec2["ciudadanos"].cast(IntegerType()))
elec2 = elec2.withColumn("ecp", elec2["ecp"].cast(IntegerType()))
elec2 = elec2.withColumn("podemoscompromiseupv", elec2["podemoscompromiseupv"].cast(IntegerType()))
elec2 = elec2.withColumn("erccasti", elec2["erccasti"].cast(IntegerType()))
elec2 = elec2.withColumn("cdc", elec2["cdc"].cast(IntegerType()))
elec2 = elec2.withColumn("podemosenmareaanovaeu", elec2["podemosenmareaanovaeu"].cast(IntegerType()))
elec2 = elec2.withColumn("eajpnv", elec2["eajpnv"].cast(IntegerType()))
elec2 = elec2.withColumn("ehbildu", elec2["ehbildu"].cast(IntegerType()))
elec2 = elec2.withColumn("ccapnc", elec2["ccapnc"].cast(IntegerType()))
elec2 = elec2.withColumn("pacma", elec2["pacma"].cast(IntegerType()))
elec2 = elec2.withColumn("recortescerogrupoverde", elec2["recortescerogrupoverde"].cast(IntegerType()))
elec2 = elec2.withColumn("upyd", elec2["upyd"].cast(IntegerType()))
elec2 = elec2.withColumn("vox", elec2["vox"].cast(IntegerType()))
elec2 = elec2.withColumn("bngnos", elec2["bngnos"].cast(IntegerType()))
elec2 = elec2.withColumn("pcpe", elec2["pcpe"].cast(IntegerType()))
elec2 = elec2.withColumn("gbai", elec2["gbai"].cast(IntegerType()))
elec2 = elec2.withColumn("eb", elec2["eb"].cast(IntegerType()))
elec2 = elec2.withColumn("fedelasjons", elec2["fedelasjons"].cast(IntegerType()))
elec2 = elec2.withColumn("si", elec2["si"].cast(IntegerType()))
elec2 = elec2.withColumn("somval", elec2["somval"].cast(IntegerType()))
elec2 = elec2.withColumn("ccd", elec2["ccd"].cast(IntegerType()))
elec2 = elec2.withColumn("sain", elec2["sain"].cast(IntegerType()))
elec2 = elec2.withColumn("ph", elec2["ph"].cast(IntegerType()))
elec2 = elec2.withColumn("centromoderado", elec2["centromoderado"].cast(IntegerType()))
elec2 = elec2.withColumn("plib", elec2["plib"].cast(IntegerType()))
elec2 = elec2.withColumn("ccdci", elec2["ccdci"].cast(IntegerType()))
elec2 = elec2.withColumn("upl", elec2["upl"].cast(IntegerType()))
elec2 = elec2.withColumn("pcoe", elec2["pcoe"].cast(IntegerType()))
elec2 = elec2.withColumn("and", elec2["and"].cast(IntegerType()))
elec2 = elec2.withColumn("jxc", elec2["jxc"].cast(IntegerType()))
elec2 = elec2.withColumn("pfyv", elec2["pfyv"].cast(IntegerType()))
elec2 = elec2.withColumn("cilus", elec2["cilus"].cast(IntegerType()))
elec2 = elec2.withColumn("pxc", elec2["pxc"].cast(IntegerType()))
elec2 = elec2.withColumn("mas", elec2["mas"].cast(IntegerType()))
elec2 = elec2.withColumn("izar", elec2["izar"].cast(IntegerType()))
elec2 = elec2.withColumn("unidaddelpueblo", elec2["unidaddelpueblo"].cast(IntegerType()))
elec2 = elec2.withColumn("prepal", elec2["prepal"].cast(IntegerType()))
elec2 = elec2.withColumn("ln", elec2["ln"].cast(IntegerType()))
elec2 = elec2.withColumn("repo", elec2["repo"].cast(IntegerType()))
elec2 = elec2.withColumn("independientesfia", elec2["independientesfia"].cast(IntegerType()))
elec2 = elec2.withColumn("entaban", elec2["entaban"].cast(IntegerType()))
elec2 = elec2.withColumn("imc", elec2["imc"].cast(IntegerType()))
elec2 = elec2.withColumn("puede", elec2["puede"].cast(IntegerType()))
elec2 = elec2.withColumn("fe", elec2["fe"].cast(IntegerType()))
elec2 = elec2.withColumn("alcd", elec2["alcd"].cast(IntegerType()))
elec2 = elec2.withColumn("fme", elec2["fme"].cast(IntegerType()))
elec2 = elec2.withColumn("hrtsln", elec2["hrtsln"].cast(IntegerType()))
elec2 = elec2.withColumn("udt", elec2["udt"].cast(IntegerType()))


Creamos el join de los ficheros de elecciones y municipios

In [16]:
df=elec2.join(mun2, ['codigo'], how='full')


6.Generar un dataframe a partir del dataframe de elecciones con el código de municipio y normalizando los votos a partidos. Deberían quedar tres campos (municipio, partido, votos).

In [17]:
df2=df.drop("mesas","censo","votantes","validos","blanco","nulos","comunidad","provincia").drop("codigo")

In [18]:
dfs= df2.selectExpr("municipio","stack(51,'pp',pp,'psoe',psoe,'podemosiuequo',podemosiuequo,'ciudadanos',ciudadanos,'ecp',ecp,'podemoscompromiseupv',podemoscompromiseupv,'erccasti',erccasti,'cdc',cdc,'podemosenmareaanovaeu',podemosenmareaanovaeu,'eajpnv',eajpnv,'ehbildu',ehbildu,'ccapnc',ccapnc,'pacma',pacma,'recortescerogrupoverde',recortescerogrupoverde,'upyd',upyd,'vox',vox,'bngnos',bngnos,'pcpe',pcpe,'gbai',gbai,'eb',eb,'fedelasjons',fedelasjons,'si',si,'somval',somval,'ccd',ccd,'sain',sain,'ph',ph,'centromoderado',centromoderado,'plib',plib,'ccdci',ccdci,'upl',upl,'pcoe',pcoe,'and',and,'jxc',jxc,'pfyv',pfyv,'cilus',cilus,'pxc',pxc,'mas',mas,'izar',izar,'unidaddelpueblo',unidaddelpueblo,'prepal',prepal,'ln',ln,'repo',repo,'independientesfia',independientesfia,'entaban',entaban,'imc',imc,'puede',puede,'fe',fe,'alcd',alcd,'fme',fme,'hrtsln',hrtsln,'udt',udt) as (partidos, votos)").where("votos is not null").show()

+---------+--------------------+-----+
|municipio|            partidos|votos|
+---------+--------------------+-----+
|   Viator|                  pp| 1017|
|   Viator|                psoe| 1116|
|   Viator|       podemosiuequo|  272|
|   Viator|          ciudadanos|  408|
|   Viator|                 ecp|    0|
|   Viator|podemoscompromiseupv|    0|
|   Viator|            erccasti|    0|
|   Viator|                 cdc|    0|
|   Viator|podemosenmareaano...|    0|
|   Viator|              eajpnv|    0|
|   Viator|             ehbildu|    0|
|   Viator|              ccapnc|    0|
|   Viator|               pacma|   18|
|   Viator|recortescerogrupo...|    2|
|   Viator|                upyd|    2|
|   Viator|                 vox|    3|
|   Viator|              bngnos|    0|
|   Viator|                pcpe|    3|
|   Viator|                gbai|    0|
|   Viator|                  eb|    5|
+---------+--------------------+-----+
only showing top 20 rows



7. Generar un fichero con el top 10 de población de municipios de España y otro con el bottom 10 de población. Necesitamos el nombre de los municipios y su población sólo. (1 punto extra) Sacar además del nombre la autonomía y provincia.

In [19]:
#mun2 = mun2.withColumn("poblacion", mun2["poblacion"].cast(IntegerType()))
import pyspark.sql.functions as psf
mun2.groupBy("comunidad","provincia","municipio")\
    .agg(psf.sum("poblacion").alias("top10_poblacion"))\
    .sort(desc("top10_poblacion"))\
    .show(10)

mun2.groupBy("comunidad","provincia","municipio")\
    .agg(psf.sum("poblacion").alias("bot10_poblacion"))\
    .sort("bot10_poblacion")\
    .show(10)

+--------------------+-------------------+--------------------+---------------+
|           comunidad|          provincia|           municipio|top10_poblacion|
+--------------------+-------------------+--------------------+---------------+
| Comunidad de Madrid|             Madrid|              Madrid|      3141991.0|
|           Catalunya|          Barcelona|           Barcelona|      1604555.0|
|Comunitat Valenciana|Valencia / Val�ncia|            Valencia|       786189.0|
|           Andaluc�a|            Sevilla|             Sevilla|       693878.0|
|              Arag�n|           Zaragoza|            Zaragoza|       664953.0|
|           Andaluc�a|             M�laga|              M�laga|       569130.0|
|    Regi�n de Murcia|             Murcia|              Murcia|       439889.0|
|       Illes Balears|      Illes Balears|   Palma de Mallorca|       400578.0|
|            Canarias|         Las Palmas|Las Palmas de Gra...|       379766.0|
|          Pa�s Vasco|            Bizkai

8. Queremos saber los 10 municipios donde ha habido más participación (el porcentaje de votos respecto el censo es más alto) y donde ha habido más abstención. Generaremos un fichero del top y uno del bottom con los datos del municipio y el % de participación. se divide votantes/censo y se ordena el porcentaje 

In [21]:
df_partic=df.select("municipio","censo","votantes")
df_partic = df_partic.withColumn("porc_participac", (df_partic["votantes"]/df_partic["censo"]))

Se observa que los municipios con mayor participación son ...

In [22]:
df_partic.sort(desc("porc_participac")).show(10)

+--------------------+-----+--------+---------------+
|           municipio|censo|votantes|porc_participac|
+--------------------+-----+--------+---------------+
|  Estepa de San Juan|    8|       8|            1.0|
|  Belmonte de Campos|   30|      30|            1.0|
|             La Zoma|   25|      25|            1.0|
|             Cidam�n|   22|      22|            1.0|
|Alcolea de las Pe�as|    8|       8|            1.0|
|          Salcedillo|   10|      10|            1.0|
|  Arevalillo de Cega|   27|      27|            1.0|
|         Castilnuevo|    8|       8|            1.0|
|       Torremochuela|   10|      10|            1.0|
|            Almohaja|   19|      19|            1.0|
+--------------------+-----+--------+---------------+
only showing top 10 rows



In [23]:
df_partic.sort("porc_participac").show(10)

+-------------------+-----+--------+-------------------+
|          municipio|censo|votantes|    porc_participac|
+-------------------+-----+--------+-------------------+
|              Arano|   99|      42|0.42424242424242425|
|Les Valls de Valira|  590|     258|0.43728813559322033|
|         Baliarrain|   95|      44| 0.4631578947368421|
|             Bele�a|  180|      84| 0.4666666666666667|
|            Llobera|  175|      83| 0.4742857142857143|
|             Balboa|  322|     153| 0.4751552795031056|
|           Albiztur|  252|     121| 0.4801587301587302|
|          Hernialde|  255|     123| 0.4823529411764706|
|            Ezkurra|  143|      69| 0.4825174825174825|
|           Purujosa|   35|      17| 0.4857142857142857|
+-------------------+-----+--------+-------------------+
only showing top 10 rows



In [29]:
df2=df.drop("mesas","censo","votantes","blanco","nulos","comunidad","provincia").drop("codigo")
df_stack= df2.selectExpr("municipio","validos","stack(51,'pp',pp,'psoe',psoe,'podemosiuequo',podemosiuequo,'ciudadanos',ciudadanos,'ecp',ecp,'podemoscompromiseupv',podemoscompromiseupv,'erccasti',erccasti,'cdc',cdc,'podemosenmareaanovaeu',podemosenmareaanovaeu,'eajpnv',eajpnv,'ehbildu',ehbildu,'ccapnc',ccapnc,'pacma',pacma,'recortescerogrupoverde',recortescerogrupoverde,'upyd',upyd,'vox',vox,'bngnos',bngnos,'pcpe',pcpe,'gbai',gbai,'eb',eb,'fedelasjons',fedelasjons,'si',si,'somval',somval,'ccd',ccd,'sain',sain,'ph',ph,'centromoderado',centromoderado,'plib',plib,'ccdci',ccdci,'upl',upl,'pcoe',pcoe,'and',and,'jxc',jxc,'pfyv',pfyv,'cilus',cilus,'pxc',pxc,'mas',mas,'izar',izar,'unidaddelpueblo',unidaddelpueblo,'prepal',prepal,'ln',ln,'repo',repo,'independientesfia',independientesfia,'entaban',entaban,'imc',imc,'puede',puede,'fe',fe,'alcd',alcd,'fme',fme,'hrtsln',hrtsln,'udt',udt) as (partidos, votos)").where("votos is not null")

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
df_stack.filter(col('validos')==col('votos')).show()

+--------------------+-------+--------+-----+
|           municipio|validos|partidos|votos|
+--------------------+-------+--------+-----+
|         Castilnuevo|      7|      pp|    7|
|         Congostrina|     10|      pp|   10|
|   Portillo de Soria|     12|      pp|   12|
|         Valdemadera|      7|      pp|    7|
|    La Vid de Bureba|     11|      pp|   11|
|Rebollosa de Jadr...|      9|      pp|    9|
+--------------------+-------+--------+-----+

