# Indicators for Chronic Disease Surveillance

## Authors

Juan Luis Onieva Zafra

Jesús Gómez Sola

Paloma Domínguez Sánchez

## Abstract

In this project we present an analysis of the chronic disease indicator data published on the data.gov portal at <https://catalog.data.gov/dataset/u-s-chronic-disease-indicators-cdi>, from which we downloaded the CSV file.

<img src="MMWR.png" width="600">

The objective of the task is to use Spark to run several queries and present their results as tables and graphs. For this purpose, we work with two different APIs: RDDs and DataFrames.

Our work is divided into: 

- **Tests**:

    *__init__.py*

    *AnalysisTest.py*

    *ReadCSVTest.py*

- **CDI**:

    *ReadCSV.py*
    
    *Analysis.py*
        
    *cdi.py*
    
    
- LICENSE

- README.md

- requirements.txt

- setup.py


## CDI

#### ReadCSV.py

This module is in charge of reading the CSV file as an RDD or as a data frame. For this purpose it creates a Spark session (or Spark context) that reads the CSV file whose path is passed as a parameter.

It defines two functions:


*def read_csv_with_data_frame(file_csv: str) -> DataFrame*

*def read_csv_with_rdd(file_csv: str) -> SparkContext*





In [19]:
"""
This script contains the function to read the csv with different methods
"""


from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException
from py4j.protocol import Py4JError
from pyspark import SparkConf, SparkContext
import os


def read_csv_with_data_frame(file_csv: str) -> DataFrame:
    """
    Read a CSV file into a Spark data frame.

    The first line of the file is used as header and the column types are inferred by Spark.
    :param file_csv: path of the csv file to load
    :return: all the data of the file as data frame
    :raises AnalysisException: when the underlying py4j call fails while loading the file
    """
    spark_session = SparkSession \
        .builder \
        .getOrCreate()

    # Only show warnings and errors of the "org" loggers to keep the console output readable.
    logger = spark_session._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

    try:
        data_frame = spark_session\
            .read\
            .format("csv") \
            .options(header='true', inferschema='true')\
            .load(file_csv)
    except Py4JError as error:
        # Report the requested path so the caller can see which file failed, and keep the cause.
        raise AnalysisException('There is no csv file in: ' + str(file_csv)) from error

    return data_frame


def read_csv_with_rdd(file_csv: str) -> list:
    """
    Read a csv file as an RDD and count, for every value of the field at index 4, how many distinct
    values of the field at index 6 appear together with it.

    Every line equal to the first line (the header) is discarded. Lines are split on every comma, so
    quoted fields that contain commas are not supported.
    NOTE(review): the two fields are presumably TopicID and Question - confirm the indexes against the
    header of the csv file.
    :param file_csv: file name of csv
    :return: list of tuples (value of field 4, number of distinct values of field 6), sorted by the first element
    """
    spark_conf = SparkConf()
    spark_context = SparkContext(conf=spark_conf)
    try:
        # Only show warnings and errors of the "org" loggers to keep the console output readable.
        logger = spark_context._jvm.org.apache.log4j
        logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)
        rdd = spark_context \
            .textFile(file_csv)
        header = rdd.first()
        counts = rdd.filter(lambda row: row != header) \
            .map(lambda line: line.split(",")) \
            .map(lambda fields: (fields[4], fields[6])) \
            .distinct() \
            .map(lambda pair: (pair[0], 1)) \
            .reduceByKey(lambda x, y: x + y) \
            .sortBy(lambda pair: pair[0]) \
            .collect()
    finally:
        # Always release the context, even when reading or parsing fails; otherwise the next
        # attempt to create a SparkContext in this process would be rejected.
        spark_context.stop()
    return counts


#### Analysis.py
This module defines several functions, each responsible for running one of the queries:

" def get_data_frame_count_type_of_topic(data_frame: DataFrame) -> DataFrame " returns, for each disease topic, the number of distinct questions.

" def get_data_frame_count_male_gender_by_topic(data_frame: DataFrame) -> DataFrame " returns the number of men that have each disease.

" def get_data_frame_count_black_ethnicity_by_topic(data_frame: DataFrame) -> DataFrame " returns the number of people of black ethnicity that have each disease.


The last function is responsible for graphically representing the results of the previously defined functions:

def plot_type_of_topic(data_frame: DataFrame) -> None


In [22]:
"""
This script contains the necessary functions to deal with the data, obtain data frame and show some graphics
"""

import matplotlib.pyplot as plt
from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException
from py4j.protocol import Py4JError
import pandas as pb


def get_data_frame_count_type_of_topic(data_frame: DataFrame) -> pb.DataFrame:
    """
    Count, for every TopicID, how many different Question values belong to it.
    The resulting table is printed and then handed back as a pandas data frame.
    :param data_frame: Spark data frame that holds the columns TopicID and Question
    :return: pandas data frame with one row per TopicID and its count, sorted by TopicID
    """
    try:
        # Each (TopicID, Question) pair must be counted only once.
        unique_pairs = data_frame.select("TopicID", "Question").distinct()
        questions_per_topic = unique_pairs.groupBy("TopicID").count()
        sorted_counts = questions_per_topic.sort("TopicID")
    except Py4JError:
        raise AnalysisException('One columns is incorrect')
    print("The following table represent the number of the type of each topic")
    sorted_counts.show()
    return sorted_counts.toPandas()


def get_rdd_count_type_of_topy(rdd: list) -> pb.DataFrame:
    """
    Turn the list collected from a Spark rdd into a pandas data frame and print it.
    :param rdd: list of two-element tuples; the elements are labelled Topic and Question
    :return: pandas data frame with the columns Topic and Question
    """
    column_names = ['Topic', 'Question']
    table = pb.DataFrame(data=rdd, columns=column_names)
    print(table)
    return table


def get_data_frame_count_male_gender_by_topic(data_frame: DataFrame) -> pb.DataFrame:
    """
    Count, for every TopicID, the distinct rows whose Stratification1 value contains "Male".
    The resulting table is printed and returned as a pandas data frame.
    :param data_frame: generate with pyspark, and contain all the data from the csv file
    :return: pandas data frame with one row per TopicID and its count, sorted by TopicID
    """
    data_frame_topic = data_frame \
        .filter(data_frame["Stratification1"].contains("Male")) \
        .distinct() \
        .groupBy("TopicID") \
        .count() \
        .sort("TopicID")

    print("The following table represent the number of men group by the topic: ")
    data_frame_topic.show()
    # Convert the aggregated table, not the raw input, so the result matches the printed table.
    data_frame_pandas = data_frame_topic.toPandas()
    return data_frame_pandas


def get_data_frame_count_black_ethnicity_by_topic(data_frame: DataFrame) -> pb.DataFrame:
    """
    Count, for every TopicID, the distinct rows whose Stratification1 value contains
    "Black, non-Hispanic".
    The resulting table is printed and returned as a pandas data frame.
    :param data_frame: generate with pyspark, and contain all the data from the csv file
    :return: pandas data frame with one row per TopicID and its count, sorted by TopicID
    """
    data_frame_topic = data_frame \
        .filter(data_frame["Stratification1"].contains("Black, non-Hispanic")) \
        .distinct() \
        .groupBy("TopicID") \
        .count() \
        .sort("TopicID")

    print("The following table represent the number of black ethnicity people group by the topic: ")
    data_frame_topic.show()
    # Convert the aggregated table, not the raw input, so the result matches the printed table.
    data_frame_pandas = data_frame_topic.toPandas()
    return data_frame_pandas


def plot_type_of_topic(data_frame: pb.DataFrame) -> None:
    """
    Show a bar chart of a data frame, using its TopicID column for the x axis.
    :param data_frame: pandas data frame that contains a TopicID column
    :return: None, the chart is displayed with matplotlib
    """
    plt.interactive(False)
    # Draw on an explicitly created axes so that no additional empty figure is opened.
    _, axes = plt.subplots()
    # pandas expects the x argument to be a column label, not the values of the column.
    data_frame.plot(kind='bar', x='TopicID', ax=axes)
    plt.show()


#### cdi.py

This is the main module of the project, which is in charge of tying the other modules together.

We define a data_frame object for each query and for each of them we call the function "plot_type_of_topic" to represent the data.

In [None]:
# Load the CDI csv file into a Spark data frame.
file_csv = 'cdi/data/Chronic_Disease_Indicators_CDI.csv'
data_frame = read_csv_with_data_frame(file_csv)
# Count the distinct questions of every topic and show the result as a bar chart.
data_frame_count_type = get_data_frame_count_type_of_topic(data_frame)
plot_type_of_topic(data_frame_count_type)



This graph corresponds to all the types of diseases in the study. As we can see, the most abundant disease is NPAW and the least abundant ones are DIS and IMM.

In [None]:
# Run the male-gender query on the loaded data frame and plot what it returns.
data_frame_count_men = get_data_frame_count_male_gender_by_topic(data_frame)
plot_type_of_topic(data_frame_count_men)


The following table represent the number of men group by the topic: 
+-------+-----+
|TopicID|count|
+-------+-----+
|    ALC| 3405|
|    ART| 5280|
|    AST| 4857|
|    CAN|  440|
|    CKD| 1425|
|   COPD| 9792|
|    CVD| 9333|
|    DIA| 9732|
|    DIS|  371|
|    IMM|  660|
|    MTH|  660|
|   NPAW| 3630|
|    OLD| 1920|
|    ORH| 1424|
|    OVC| 4601|
|    TOB| 3300|
+-------+-----+



This graph corresponds to the number of men that have each disease. As we can see, the most abundant diseases in men are COPD and DIA. The least abundant are DIS and CAN.

In [None]:
# Run the black-ethnicity query on the loaded data frame and plot what it returns.
data_frame_count_black_ethnicity = get_data_frame_count_black_ethnicity_by_topic(data_frame)
plot_type_of_topic(data_frame_count_black_ethnicity)


## Tests

#### AnalysisTest.py

In [16]:
import unittest
from pyspark.sql.utils import AnalysisException


class MyTestCase(unittest.TestCase):
    """Tests for get_data_frame_count_type_of_topic."""

    def setUp(self):
        # Two fixtures: the regular test csv and one that, per its file name, has a wrong column.
        self.data_frame = read_csv_with_data_frame('data/pruebas.csv')
        self.data_frame_wrong = read_csv_with_data_frame('data/pruebas-wrong-column.csv')

    def test_when_count_subtopic_data_frame_should_have_at_least_columns_with_topic_and_subtopic(self):
        with self.assertRaises(AnalysisException):
            get_data_frame_count_type_of_topic(self.data_frame_wrong)

    def test_the_number_of_topic_must_be_correct(self):
        data_frame_topic = get_data_frame_count_type_of_topic(self.data_frame)
        # The function returns a pandas data frame: its number of rows is len(), while
        # count() would return a per-column Series instead of a single number.
        total = len(data_frame_topic)
        expected_value = 3
        self.assertEqual(expected_value, total)

    def test_the_total_number_must_correspond_with_size_of_csv(self):
        # The result is already a pandas data frame, so no toPandas() conversion is needed.
        data_frame_pandas = get_data_frame_count_type_of_topic(self.data_frame)
        total = sum(data_frame_pandas['count'])
        expected_value = 6
        self.assertEqual(expected_value, total)


#### ReadCSVTest.py

In [18]:
import unittest
from pyspark.sql.utils import AnalysisException


class MyTestCase(unittest.TestCase):
    """Tests for read_csv_with_data_frame."""

    def test_read_csv_from_data_frame_read_correctly(self):
        # The fixture is expected to contain 12 data rows.
        row_count = read_csv_with_data_frame('data/pruebas.csv').count()
        self.assertEqual(12, row_count)

    def test_raise_exception_when_the_file_is_not_csv(self):
        self.assertRaises(AnalysisException, read_csv_with_data_frame, 'data/pruebas.tsv')

    def test_raise_exception_when_the_file_not_exist(self):
        self.assertRaises(AnalysisException, read_csv_with_data_frame, 'data/no-file.tsv')
