#Tarea 8. Análisis de datos de indicadores de enfermedades crónicas
Autores:   
      Javier Moreno Venegas  
      María del Rocío Cabello Toscano  
      Juan Antonio Jiménez Guerrero  
Fecha: 05/06/2018    



Esta tarea consiste en analizar los datos de indicadores de enfermedades crónicas que se proporcionan en el portal data.gov en la dirección https://catalog.data.gov/dataset/u-s-chronic-disease-indicators-cdi. El objetivo de la tarea es usar Spark para obtener, a partir del fichero csv que contiene los datos, una tabla similar a la tabla I del informe que se aporta en la página, que es el año 2013, que incluya dos columnas: Indicator Group e Individual Measures.
La tarea se realizará en dos versiones: usando la API de RDDs y usando la de datasets.

A continuación exponemos el código del programa utilizando la API de RDDs. Para ello usamos el módulo pySpark que nos permite el análisis de gran cantidad de datos de manera concurrente. El filtro principal usado corresponde a los datos del año 2013 y escogemos las columnas correspondientes a Indicator Group e Individual Measures.

In [2]:
import sys

from pyspark import SparkConf, SparkContext
import time

def main(file_name: str) -> None:

    spark_conf = SparkConf()
    spark_context = SparkContext(conf=spark_conf)

    logger = spark_context._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

    start_computing_time = time.time()

    topics = \
    spark_context.textFile(file_name) \
    .map(lambda line: line.split(",")) \
    .filter(lambda list: list[0] == '2013') \
    .map(lambda list: (list[5].strip(" \""), list[6].strip(" \""))) \
    .filter(lambda list: list[0] != 'Topic' and list[1] != 'Question') \
    .distinct() \
    .map(lambda list: (list[0],1)) \
    .reduceByKey(lambda x, y: x + y) \
    .sortBy(lambda pair: pair[0]) 

    result = topics.collect()

    for pair in result:
        print(pair)

    total_computing_time = time.time() - start_computing_time
    print("Computing time: ", str(total_computing_time))

    spark_context.stop()


if __name__ == "__main__":
    """
    Python program that uses Apache Spark to find 
    """
    main("datos.csv")

('Alcohol', 9)
('Arthritis', 10)
('Asthma', 8)
('Cancer', 3)
('Cardiovascular Disease', 17)
('Chronic Kidney Disease', 4)
('Chronic Obstructive Pulmonary Disease', 14)
('Diabetes', 17)
('Disability', 1)
('Immunization', 1)
('Mental Health', 2)
('Nutrition', 1)
('Older Adults', 2)
('Overarching Conditions', 13)
('Reproductive Health', 1)
('SID', 2)
('Tobacco', 13)
('UDS', 1)
('USDA', 1)
Computing time:  6.409118175506592


In [None]:
Repetimos el mismo procedimiento,pero esta vez con la API de dataframe:

In [8]:
import time, sys

from pyspark.sql import SparkSession

def main(file_name) -> None:

    spark_session = SparkSession \
        .builder \
        .getOrCreate()

    logger = spark_session._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

    start_computing_time = time.time()

    data_frame = spark_session \
        .read \
        .format("csv") \
        .options(header='true', inferschema='true') \
        .load(file_name)

    #data_frame.printSchema()
    #data_frame.show()

    data_frame \
        .filter(data_frame["YearStart"] == 2013) \
        .select("Topic", "Question") \
        .distinct() \
        .orderBy("Topic") \
        .groupBy("Topic") \
        .count() \
        .show()

    total_computing_time = time.time() - start_computing_time
    print("Computing time: ", str(total_computing_time))


if __name__ == '__main__':
    """
    Python program that uses Apache Spark to read a .csv file and shows a table using the dataframe API.
    """

    main("datos.csv")

+--------------------+-----+
|               Topic|count|
+--------------------+-----+
|                 SID|    2|
|                USDA|    1|
|             Alcohol|    9|
|           Arthritis|   10|
|              Asthma|    8|
|              Cancer|    3|
|Cardiovascular Di...|   17|
|Chronic Kidney Di...|    4|
|Chronic Obstructi...|   14|
|            Diabetes|   17|
|          Disability|    1|
|        Immunization|    1|
|       Mental Health|    2|
|Nutrition, Physic...|   25|
|        Older Adults|    2|
|         Oral Health|    1|
|Overarching Condi...|   13|
| Reproductive Health|    1|
|             Tobacco|   13|
+--------------------+-----+

Computing time:  4.116872549057007


Como podemos observar,los datos obtenidos son idénticos, sin embargo, el formato de salida es diferente.