In [1]:
from pyspark.sql.functions import col, sum, to_date, to_timestamp, regexp_replace
from pyspark.sql.functions import date_format
from pyspark.sql.types import IntegerType, DoubleType, FloatType

In [2]:
# HDFS directory with the raw Locatel 0311 CSV export (read below with a header row).
dir_hdfs = "hdfs://namenode:9000/tmp/amd/locatel0311"

In [3]:
# Load the raw CSV, using the first line as header and letting Spark infer types.
# (The printed schema shows every column still comes back as string.)
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(dir_hdfs)
)
df.printSchema()

root
 |-- id_folio: string (nullable = true)
 |-- fecha_solicitud: string (nullable = true)
 |-- hora_solicitud: string (nullable = true)
 |-- tipo_de_entrada: string (nullable = true)
 |-- tema_solicitud: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- edad: string (nullable = true)
 |-- estatus: string (nullable = true)
 |-- fecha_concluido: string (nullable = true)
 |-- alcaldia: string (nullable = true)
 |-- colonia_datos: string (nullable = true)
 |-- 0311_colonia_registro: string (nullable = true)
 |-- codigo_postal_solicitud: string (nullable = true)
 |-- latitud: string (nullable = true)
 |-- longitud: string (nullable = true)



In [4]:
# Keep only the columns used in the rest of the analysis.
datos = df.select(
    "fecha_solicitud",
    "hora_solicitud",
    "tema_solicitud",
    "sexo",
    "edad",
    "estatus",
    "alcaldia",
    "colonia_datos",
    "latitud",
    "longitud",
    "codigo_postal_solicitud",
)

In [5]:
# Cast the raw string columns to usable types.
# Spark datetime patterns are case-sensitive: "mm"/"ss" are minutes/seconds, while
# "MM" is month-of-year and "SS" is fraction-of-second. The previous "HH:MM:SS"
# therefore rendered hour:month:fraction instead of the request time.
datos = (
    datos
    .withColumn("fecha_solicitud", to_date(col("fecha_solicitud"), "yyyy-MM-dd"))
    .withColumn("hora_solicitud", date_format(col("hora_solicitud"), "HH:mm:ss"))
    .withColumn("edad", col("edad").cast(FloatType()))
    .withColumn("latitud", col("latitud").cast(DoubleType()))
    .withColumn("longitud", col("longitud").cast(DoubleType()))
)

In [6]:
from pyspark.sql.functions import avg, max, min, count
# NOTE: from here on, max/min (like sum from the first cell) refer to the PySpark
# column functions, shadowing the Python builtins.

# Age statistics and record count per alcaldía, as originally captured.
dfg = datos.groupBy("alcaldia").agg(
    avg("edad").alias("edad_promedio"),
    max("edad").alias("edad_maxima"),
    min("edad").alias("edad_minima"),
    count("*").alias("no_registros"),
)

In [7]:
# Per-alcaldía summary in reverse alphabetical order, up to 100 rows, full cell width.
(
    dfg
    .orderBy("alcaldia", ascending=False)
    .show(n=100, truncate=False)
)

+----------------------+------------------+-----------+-----------+------------+
|alcaldia              |edad_promedio     |edad_maxima|edad_minima|no_registros|
+----------------------+------------------+-----------+-----------+------------+
|XOCHIMILCO            |44.02038006058937 |100.0      |0.0        |50073       |
|VENUSTIANO CARRANZA   |34.12961033548656 |93.0       |0.0        |16713       |
|TLALPAN               |39.53971856559237 |98.0       |0.0        |54937       |
|TLAHUAC               |27.59472115793955 |88.0       |0.0        |17358       |
|Otro                  |NULL              |NULL       |NULL       |1           |
|NA                    |46.65257871998707 |100.0      |0.0        |856239      |
|MILPA ALTA            |33.32932330827067 |88.0       |0.0        |2879        |
|MIGUEL HIDALGO        |4.992536800110057 |96.0       |0.0        |97139       |
|LA MAGDALENA CONTRERAS|34.029847908745246|99.0       |0.0        |8593        |
|IZTAPALAPA            |34.5

In [8]:
from pyspark.sql.functions import when, trim
# Strip leading/trailing spaces from every alcaldía label, so variants such as
# "IZTACALCO " collapse into "IZTACALCO". Previously only that single literal was
# patched, and any other label with stray spaces stayed split into its own group.
# Null labels stay null, as before.
datos = datos.withColumn("alcaldia", trim(datos["alcaldia"]))

In [9]:
# Recompute the per-alcaldía age summary after the label clean-up and display it.
(
    datos
    .groupBy("alcaldia")
    .agg(
        avg("edad").alias("edad_promedio"),
        max("edad").alias("edad_maxima"),
        min("edad").alias("edad_minima"),
        count("*").alias("no_registros"),
    )
    .orderBy("alcaldia", ascending=False)
    .show(100, truncate=False)
)

+----------------------+------------------+-----------+-----------+------------+
|alcaldia              |edad_promedio     |edad_maxima|edad_minima|no_registros|
+----------------------+------------------+-----------+-----------+------------+
|XOCHIMILCO            |44.02038006058937 |100.0      |0.0        |50073       |
|VENUSTIANO CARRANZA   |34.12961033548656 |93.0       |0.0        |16713       |
|TLALPAN               |39.53971856559237 |98.0       |0.0        |54937       |
|TLAHUAC               |27.59472115793955 |88.0       |0.0        |17358       |
|Otro                  |NULL              |NULL       |NULL       |1           |
|NA                    |46.65257871998707 |100.0      |0.0        |856239      |
|MILPA ALTA            |33.32932330827067 |88.0       |0.0        |2879        |
|MIGUEL HIDALGO        |4.992536800110057 |96.0       |0.0        |97139       |
|LA MAGDALENA CONTRERAS|34.029847908745246|99.0       |0.0        |8593        |
|IZTAPALAPA            |34.5

In [10]:
# Exploratory read of the SEPOMEX postal-code catalog ("|"-delimited, with header).
# The previous "interSchema" option was a misspelling that Spark ignored, so every
# column was read as string (see the schema printed below). Reading as string is the
# desired result: real schema inference would turn zero-padded codes such as
# "01000" into integers and lose the leading zero. The option is therefore removed.
sepomex_df = (
    spark.read
    .option("header", "true")
    .option("delimiter", "|")
    .csv("hdfs://namenode:9000/tmp/amd/sepomex/")
)

In [11]:
# Inspect the column names/types of the exploratory SEPOMEX read.
sepomex_df.printSchema()

root
 |-- d_codigo: string (nullable = true)
 |-- d_asenta: string (nullable = true)
 |-- d_tipo_asenta: string (nullable = true)
 |-- D_mnpio: string (nullable = true)
 |-- d_estado: string (nullable = true)
 |-- d_ciudad: string (nullable = true)
 |-- d_CP: string (nullable = true)
 |-- c_estado: string (nullable = true)
 |-- c_oficina: string (nullable = true)
 |-- c_CP: string (nullable = true)
 |-- c_tipo_asenta: string (nullable = true)
 |-- c_mnpio: string (nullable = true)
 |-- id_asenta_cpcons: string (nullable = true)
 |-- d_zona: string (nullable = true)
 |-- c_cve_ciudad: string (nullable = true)



In [12]:
# Preview the first rows (default 20) without truncating cell contents.
sepomex_df.show(n=20, truncate=False)

+--------+------------------------------------+-------------+--------------+----------------+----------------+-----+--------+---------+----+-------------+-------+----------------+------+------------+
|d_codigo|d_asenta                            |d_tipo_asenta|D_mnpio       |d_estado        |d_ciudad        |d_CP |c_estado|c_oficina|c_CP|c_tipo_asenta|c_mnpio|id_asenta_cpcons|d_zona|c_cve_ciudad|
+--------+------------------------------------+-------------+--------------+----------------+----------------+-----+--------+---------+----+-------------+-------+----------------+------+------------+
|01000   |San Ángel                           |Colonia      |Álvaro Obregón|Ciudad de México|Ciudad de México|01001|09      |01001    |NULL|09           |010    |0001            |Urbano|01          |
|01010   |Los Alpes                           |Colonia      |Álvaro Obregón|Ciudad de México|Ciudad de México|01001|09      |01001    |NULL|09           |010    |0005            |Urbano|01          |


In [13]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
 
# Explicit schema for the SEPOMEX catalog: every column is a nullable string, so
# zero-padded codes (d_codigo, c_estado, c_mnpio, ...) keep their leading zeros.
sepomex_column_names = [
    "d_codigo",
    "d_asenta",
    "d_tipo_asenta",
    "D_mnpio",
    "d_estado",
    "d_ciudad",
    "d_CP",
    "c_estado",
    "c_oficina",
    "c_CP",
    "c_tipo_asenta",
    "c_mnpio",
    "id_asenta_cpcons",
    "d_zona",
    "c_cve_ciudad",
]
schema = StructType(
    [StructField(name, StringType(), True) for name in sepomex_column_names]
)


In [14]:
# Re-read the SEPOMEX catalog with the explicit all-string schema defined above.
# "skipRows" was removed: it is not among Apache Spark's CSV data source options and
# was silently ignored here (the first record, 01000 San Ángel, is still present).
# On runtimes that do honor it, skipping the first line would discard the header
# row and promote the first record to header.
sepomex_df = (
    spark.read
    .option("header", "true")
    .option("delimiter", "|")
    .option("encoding", "UTF-8")
    .schema(schema)
    .csv("hdfs://namenode:9000/tmp/amd/sepomex/")
)

In [15]:
# Preview the schema-typed catalog (default 20 rows, full cell width).
sepomex_df.show(n=20, truncate=False)

+--------+------------------------------------+-------------+--------------+----------------+----------------+-----+--------+---------+----+-------------+-------+----------------+------+------------+
|d_codigo|d_asenta                            |d_tipo_asenta|D_mnpio       |d_estado        |d_ciudad        |d_CP |c_estado|c_oficina|c_CP|c_tipo_asenta|c_mnpio|id_asenta_cpcons|d_zona|c_cve_ciudad|
+--------+------------------------------------+-------------+--------------+----------------+----------------+-----+--------+---------+----+-------------+-------+----------------+------+------------+
|01000   |San Ángel                           |Colonia      |Álvaro Obregón|Ciudad de México|Ciudad de México|01001|09      |01001    |NULL|09           |010    |0001            |Urbano|01          |
|01010   |Los Alpes                           |Colonia      |Álvaro Obregón|Ciudad de México|Ciudad de México|01001|09      |01001    |NULL|09           |010    |0005            |Urbano|01          |


In [16]:
# Keep only postal code, municipality and state from the catalog.
dfcp = sepomex_df.select("d_codigo", "D_mnpio", "d_estado")

In [17]:
# Row count of the catalog projection. Postal codes repeat here, presumably one row
# per settlement (d_asenta), as the duplicate check below shows.
dfcp.count()

155922

In [18]:
# Preview the first 20 catalog rows without truncation.
dfcp.show(20, False)

+--------+--------------+----------------+
|d_codigo|D_mnpio       |d_estado        |
+--------+--------------+----------------+
|01000   |Álvaro Obregón|Ciudad de México|
|01010   |Álvaro Obregón|Ciudad de México|
|01020   |Álvaro Obregón|Ciudad de México|
|01030   |Álvaro Obregón|Ciudad de México|
|01030   |Álvaro Obregón|Ciudad de México|
|01040   |Álvaro Obregón|Ciudad de México|
|01049   |Álvaro Obregón|Ciudad de México|
|01050   |Álvaro Obregón|Ciudad de México|
|01060   |Álvaro Obregón|Ciudad de México|
|01060   |Álvaro Obregón|Ciudad de México|
|01070   |Álvaro Obregón|Ciudad de México|
|01080   |Álvaro Obregón|Ciudad de México|
|01089   |Álvaro Obregón|Ciudad de México|
|01090   |Álvaro Obregón|Ciudad de México|
|01090   |Álvaro Obregón|Ciudad de México|
|01090   |Álvaro Obregón|Ciudad de México|
|01100   |Álvaro Obregón|Ciudad de México|
|01109   |Álvaro Obregón|Ciudad de México|
|01110   |Álvaro Obregón|Ciudad de México|
|01110   |Álvaro Obregón|Ciudad de México|
+--------+-

In [19]:
# Check for duplicates in dfcp: group by postal code and keep the codes that
# appear more than once.
dfcp_dup = (
    dfcp
    .groupBy("d_codigo")
    .agg(count("*").alias("count"))
    .where(col("count") > 1)
)

print(dfcp_dup.count())
dfcp_dup.orderBy("count", ascending=False).show(100, truncate=False)

21070
+--------+-----+
|d_codigo|count|
+--------+-----+
|85203   |310  |
|33970   |280  |
|85218   |275  |
|33123   |234  |
|33196   |225  |
|33637   |219  |
|83303   |218  |
|83300   |217  |
|85213   |176  |
|33195   |169  |
|33188   |166  |
|33194   |163  |
|83334   |160  |
|34985   |153  |
|33187   |152  |
|85200   |144  |
|33186   |141  |
|33198   |141  |
|85773   |133  |
|78439   |132  |
|33485   |125  |
|33127   |118  |
|34986   |118  |
|33197   |115  |
|30420   |115  |
|33190   |111  |
|83333   |111  |
|85209   |110  |
|29950   |107  |
|34983   |107  |
|32575   |106  |
|34570   |103  |
|34993   |103  |
|95835   |100  |
|85208   |99   |
|33484   |95   |
|37917   |95   |
|83324   |95   |
|85207   |94   |
|33573   |92   |
|29960   |92   |
|33633   |92   |
|30425   |91   |
|34994   |91   |
|36984   |90   |
|34997   |89   |
|85775   |87   |
|36210   |86   |
|84175   |86   |
|45200   |85   |
|83323   |85   |
|43000   |84   |
|37914   |83   |
|33193   |82   |
|34990   |81   |
|36985  

In [20]:
# Inspect the duplicated rows of the most repeated postal code.
# The literal is quoted so the comparison stays string-to-string (d_codigo is a
# StringType column) rather than relying on an implicit numeric cast, which would
# also match differently formatted strings such as "085203".
dfcp.filter("d_codigo = '85203'").show(truncate=False)

+--------+-------+--------+
|d_codigo|D_mnpio|d_estado|
+--------+-------+--------+
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
|85203   |Cajeme |Sonora  |
+--------+-------+--------+
only showing top 20 rows



In [21]:
# Remove duplicated postal codes, keeping a single row per d_codigo.
# NOTE(review): dropDuplicates keeps an arbitrary row per key. If a code ever maps
# to more than one municipality, which one survives is not deterministic.
dfcp = dfcp.dropDuplicates(subset=["d_codigo"])

In [22]:
# Confirm that no postal code is repeated any more (expect an empty table).
(
    dfcp
    .groupBy("d_codigo")
    .agg(count("*").alias("count"))
    .where(col("count") > 1)
    .show()
)

+--------+-----+
|d_codigo|count|
+--------+-----+
+--------+-----+



In [23]:
from pyspark.sql.functions import col, lpad, regexp_replace, trim, when, length
# Normalize the request postal code: keep only digits, then left-pad to 5 characters.
# The former trim() was redundant, because removing every non-digit already drops
# spaces. lpad also *truncates* inputs longer than the target width, which would
# turn a malformed 6+ digit value into a different, valid-looking code. Such values
# are now kept as-is, so they simply fail to match the catalog in the later join.
# Empty strings still become "00000" and nulls stay null, as before.
# NOTE(review): if the raw column ever holds float-formatted codes (e.g. "1000.0"),
# stripping non-digits would yield "10000". Confirm the raw format.
cp_digits = regexp_replace(col("codigo_postal_solicitud"), "[^0-9]", "")
datos = datos.withColumn(
    "codigo_postal_solicitud",
    when(length(cp_digits) > 5, cp_digits).otherwise(lpad(cp_digits, 5, "0")),
)

In [24]:
# Sample of the distinct normalized postal codes.
datos.select("codigo_postal_solicitud").distinct().show()

+-----------------------+
|codigo_postal_solicitud|
+-----------------------+
|                  02090|
|                  02070|
|                  07200|
|                  16320|
|                  07820|
|                  04319|
|                  13610|
|                  06660|
|                  09120|
|                  51550|
|                  09560|
|                  94462|
|                  57500|
|                  56490|
|                  05563|
|                  57180|
|                  55520|
|                  27081|
|                  13460|
|                  07810|
+-----------------------+
only showing top 20 rows



In [25]:
from pyspark.sql.functions import when, length
# Preview rows whose CP is the "00000" placeholder while the alcaldía field holds a
# 5-digit value, apparently the postal code entered in the wrong column.
# Requiring exactly five digits (rather than any 5-character string) keeps a short
# alphabetic label from being mistaken for a postal code.
(
    datos
    .select(["alcaldia", "codigo_postal_solicitud"])
    .filter(
        (col("codigo_postal_solicitud") == "00000")
        & col("alcaldia").rlike("^[0-9]{5}$")
    )
    .show()
)

+--------+-----------------------+
|alcaldia|codigo_postal_solicitud|
+--------+-----------------------+
|   11850|                  00000|
|   04300|                  00000|
|   01000|                  00000|
|   06400|                  00000|
|   13610|                  00000|
|   11700|                  00000|
|   03400|                  00000|
|   04909|                  00000|
|   15530|                  00000|
|   04600|                  00000|
|   01810|                  00000|
|   06200|                  00000|
|   16020|                  00000|
|   04909|                  00000|
|   03510|                  00000|
|   04870|                  00000|
|   06300|                  00000|
|   04600|                  00000|
|   09819|                  00000|
|   09140|                  00000|
+--------+-----------------------+
only showing top 20 rows



In [26]:
# Update the CP: when it is the "00000" placeholder and alcaldía holds a 5-digit
# postal code, move that code into codigo_postal_solicitud.
# Matching exactly five digits (instead of length == 5) keeps a 5-letter alcaldía
# label from being copied into the postal-code column. Null alcaldía values leave
# the CP unchanged, as before.
datos = datos.withColumn(
    "codigo_postal_solicitud",
    when(
        (col("codigo_postal_solicitud") == "00000")
        & col("alcaldia").rlike("^[0-9]{5}$"),
        col("alcaldia"),
    ).otherwise(col("codigo_postal_solicitud")),
)

In [27]:
# Verify the rows whose CP was copied from the alcaldía column.
(
    datos
    .select("alcaldia", "codigo_postal_solicitud")
    .where(col("codigo_postal_solicitud") == col("alcaldia"))
    .show(truncate=False)
)

+--------+-----------------------+
|alcaldia|codigo_postal_solicitud|
+--------+-----------------------+
|11850   |11850                  |
|04300   |04300                  |
|01000   |01000                  |
|06400   |06400                  |
|13610   |13610                  |
|11700   |11700                  |
|03400   |03400                  |
|04909   |04909                  |
|15530   |15530                  |
|04600   |04600                  |
|01810   |01810                  |
|06200   |06200                  |
|16020   |16020                  |
|04909   |04909                  |
|03510   |03510                  |
|04870   |04870                  |
|06300   |06300                  |
|04600   |04600                  |
|09819   |09819                  |
|09140   |09140                  |
+--------+-----------------------+
only showing top 20 rows



In [28]:
from pyspark.sql.functions import col, coalesce
# Left-join each request to the catalog by postal code, so unmatched requests are
# kept with null d_codigo / D_mnpio.
cp_catalogo = dfcp.select("d_codigo", "D_mnpio")
datos_mapeados = datos.join(
    cp_catalogo,
    on=datos["codigo_postal_solicitud"] == cp_catalogo["d_codigo"],
    how="left",
)

In [29]:
# Compare the request CP and original alcaldía against the catalog match.
(
    datos_mapeados
    .select("codigo_postal_solicitud", "alcaldia", "d_codigo", "D_mnpio")
    .show(100, truncate=False)
)

+-----------------------+----------+--------+---------------------+
|codigo_postal_solicitud|alcaldia  |d_codigo|D_mnpio              |
+-----------------------+----------+--------+---------------------+
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NULL                 |
|00000                  |NA        |NULL    |NUL

In [30]:
# Prefer the catalog municipality (D_mnpio) when the CP matched. Otherwise keep the
# original alcaldía label.
# NOTE(review): catalog names are title-case with accents ("Álvaro Obregón"), while
# raw labels are upper-case ("XOCHIMILCO"). Matched and unmatched rows of the same
# municipality therefore land in different groups. Municipalities in different
# states that share a name would also be merged. Consider normalizing case/accents
# and keying on state as well.
datos_mapeados = datos_mapeados.withColumn(
    "alcaldia",
    coalesce("D_mnpio", "alcaldia"),
)

In [31]:
# Spot-check the remapped alcaldía alongside age and postal-code columns.
(
    datos_mapeados
    .select("edad", "alcaldia", "codigo_postal_solicitud", "d_codigo")
    .show(n=100, truncate=False)
)

+----+---------------------+-----------------------+--------+
|edad|alcaldia             |codigo_postal_solicitud|d_codigo|
+----+---------------------+-----------------------+--------+
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA                   |00000                  |NULL    |
|NULL|NA

In [32]:
# Per-municipality age statistics after remapping alcaldía from the postal code.
dfg = (
    datos_mapeados
    .groupBy("alcaldia")
    .agg(
        avg("edad").alias("edad_promedio"),
        max("edad").alias("edad_maxima"),
        min("edad").alias("edad_minima"),
        count("*").alias("no_registros"),
    )
)

In [33]:
# Number of distinct alcaldía/municipality groups after remapping.
dfg.count()

959

In [34]:
# Show every municipality group in reverse alphabetical order. The row limit comes
# from the actual group count instead of a hard-coded 1549, so no group is cut off
# if the number of distinct values grows.
dfg.orderBy("alcaldia", ascending=False).show(dfg.count(), truncate=False)

+-------------------------------------------------+------------------+-----------+-----------+------------+
|alcaldia                                         |edad_promedio     |edad_maxima|edad_minima|no_registros|
+-------------------------------------------------+------------------+-----------+-----------+------------+
|Álvaro Obregón                                   |46.25045387592916 |97.0       |0.0        |164709      |
|Álamo Temapache                                  |50.0              |50.0       |50.0       |1           |
|Zumpango                                         |35.333333333333336|58.0       |20.0       |16          |
|Zumpahuacán                                      |24.0              |24.0       |24.0       |1           |
|Zongolica                                        |51.0              |51.0       |51.0       |1           |
|Zitácuaro                                        |48.0              |48.0       |48.0       |1           |
|Ziracuaretiro              

In [35]:
# Persist the cleaned dataset as CSV with a header row, replacing any previous output.
# The helper join columns (d_codigo, D_mnpio) are written as well.
(
    datos_mapeados.write
    .mode("overwrite")
    .option("header", "true")
    .csv("hdfs://namenode:9000/tmp/amd/locatellimpio")
)