# Setup

In [1]:
%%bash
# Fetch (or refresh) the project-tycho-utilities checkout, then run its
# `make all` with DBNAME pointing one directory up (../tycho.db).
REPO_DIR=project-tycho-utilities
if [ ! -d "${REPO_DIR}" ]
then
  # No checkout yet: clone it, then step inside.
  git clone https://github.com/lgautier/project-tycho-utilities.git
  cd "${REPO_DIR}/"
else
  # Checkout already present: step inside and bring it up to date.
  cd "${REPO_DIR}/"
  git pull
fi
# DBNAME is handed to make through the environment of this one command.
DBNAME=../tycho.db make all

Already up-to-date.
wget -q --show-progress http://www.healthdata.gov/sites/default/files/ProjectTycho_Level2_v1.1.0_0.zip
unzip ProjectTycho_Level2_v1.1.0_0.zip && rm ProjectTycho_Level2_v1.1.0_0.zip
Makefile:9: recipe for target 'ProjectTycho_Level2_v1.1.0.csv.gz' failed
rm ProjectTycho_Level2_v1.1.0_0.zip



     0K .......... .......... .......... .......... ..........  0%  766K 28s
    50K .......... .......... .......... .......... ..........  0%  843K 27s
   100K .......... .......... .......... .......... ..........  0% 24.3M 18s
   150K .......... .......... .......... .......... ..........  0%  922K 19s
   200K .......... .......... .......... .......... ..........  1% 20.9M 16s
   250K .......... .......... .......... .......... ..........  1% 24.0M 13s
   300K .......... .......... .......... .......... ..........  1% 4.86M 12s
   350K .......... .......... .......... .......... ..........  1% 1.16M 13s
   400K .......... .......... .......... .......... ..........  2% 27.0M 11s
   450K .......... .......... .......... .......... ..........  2% 27.9M 10s
   500K .......... .......... .......... .......... ..........  2% 21.4M 9s
   550K .......... .......... .......... .......... ..........  2% 2.43M 9s
   600K .......... .......... .......... .......... ..........  3% 41.6M 9s
 

In [2]:
# findspark.init() is called before importing pyspark so that the import
# below can succeed (presumably by locating the Spark installation and
# extending sys.path -- see the findspark documentation).
import findspark
findspark.init()

import pyspark
from pyspark.sql import SQLContext, Row

# configuration: local master ('local[2]'), application name, executor memory
conf = pyspark.conf.SparkConf()
(conf.setMaster('local[2]')
 .setAppName('ipython-notebook')
 .set("spark.executor.memory", "2g"))

# Spark context
# Only one SparkContext may be active at a time; calling the constructor
# directly makes this cell fail when it is re-run in the same kernel.
# getOrCreate() returns the already-running context if there is one and
# otherwise creates a new one from `conf`, so the cell is safe to re-run.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# SQL context (entry point used below for createDataFrame() and sql())
sqlcontext = SQLContext(sc)

ModuleNotFoundError: No module named 'findspark'

---

<!-- label:spark_sql_create -->

In [None]:
import os
import sqlite3

# Path of the SQLite database, relative to the notebook's working directory.
dbfilename = "tycho.db"

# sqlite3.connect() silently creates a new, empty database when the file
# does not exist; the queries in the following cells would then fail with
# an unhelpful "no such table" error. Fail early with a clear message instead.
if not os.path.isfile(dbfilename):
    raise FileNotFoundError(
        "SQLite database '{}' not found in '{}'; "
        "run the setup cell and check that `make all` succeeded."
        .format(dbfilename, os.getcwd()))

# Connection and cursor shared by the cells that follow.
dbcon = sqlite3.connect(dbfilename)
cursor = dbcon.cursor()

In [None]:
def _query_to_spark_table(query, table_name):
    """Run `query` on the SQLite cursor and register the rows with Spark SQL.

    The cursor itself is handed to `sqlcontext.createDataFrame` as the row
    source, with column names taken from `cursor.description`. The resulting
    DataFrame is registered as the temporary table `table_name` and returned.
    """
    cursor.execute(query)
    column_names = tuple(col[0] for col in cursor.description)
    dataf = sqlcontext.createDataFrame(cursor, column_names)
    dataf.registerTempTable(table_name)
    return dataf


# Whole `location` table, as-is.
location = _query_to_spark_table("SELECT * FROM location", "location")

# Case counts restricted to the disease named 'PNEUMONIA'.
sql = """
SELECT * 
FROM (SELECT * FROM disease WHERE name='PNEUMONIA') AS disease
INNER JOIN casecount
ON disease.id=casecount.disease_id"""

casecount = _query_to_spark_table(sql, "casecount")

---

<!-- label:spark_sqlmapreduceggplot_1_2 -->

# SQL query

In [None]:
## --- SQL ---
# Rows of the Spark `casecount` table whose epiweek matches '1912%',
# joined with `location` on the location id.
sql = """
SELECT state, city, date_from, event, count AS ct
FROM (SELECT * FROM casecount WHERE epiweek LIKE '1912%') AS sub
INNER JOIN location
ON location.id=sub.location_id
"""

y_1912 = sqlcontext.sql(sql)

## --- Spark ---
def _keyed_count(rec):
    """Map a row to ((state, month, event), ct).

    The month is the second '-'-separated field of `date_from`, as an int
    (assumes `date_from` is a 'YYYY-MM-DD'-like string -- TODO confirm).
    """
    state = rec.state
    month = int(rec.date_from.split('-')[1])
    event = rec.event
    return ((state, month, event), rec.ct)


def _sum_counts(left, right):
    """Combine two counts that share the same key."""
    return left + right


counts_by_key = y_1912.rdd.map(_keyed_count).reduceByKey(_sum_counts)
# Bring the aggregated (key, total) pairs back to the driver as a list.
cases = counts_by_key.collect()

In [None]:
from rpy2.robjects.lib import dplyr
import rpy2.robjects.lib.ggplot2 as gg
from rpy2.robjects.packages import importr
from rpy2.robjects import baseenv

# Import the R package 'maps' before asking ggplot2 for the state map data.
maps = importr('maps')
states = dplyr.DataFrame(gg.map_data('state'))

# Lookup table built from R's `state.abb` and `state.name` vectors:
# column 'state' holds state.abb, column 'region' holds state.name.
state_lookup = dplyr.DataFrame({'state': baseenv.get('state.abb'),
                                'region': baseenv.get('state.name')})
# Lower-case the names (evaluated on the R side) so they can be joined
# with the 'region' column of the map data.
state_lookup = state_lookup.mutate(region='tolower(region)')
states_map = state_lookup.inner_join(states, by="region")


---

<!-- label:spark_sqlmapreduceggplot_2_2 -->
<!-- config:split-output -->

In [None]:
## --- R (from Python) ---
from rpy2.robjects import StrVector, IntVector, FactorVector, Formula
from rpy2.ipython.ggplot import image_png
from IPython.display import display

# Factor levels "1".."12" so that months keep calendar order in the facets.
months = StrVector([str(x) for x in range(1,13)])

# Each element of `cases` is ((state, month, event), count).
# The month values are Python ints while the factor levels are strings:
# convert them explicitly so that values and levels have the same type.
res = dplyr.DataFrame({'state': StrVector([x[0][0] for x in cases]),
                       'month': FactorVector(StrVector([str(x[0][1])
                                                        for x in cases]),
                                             levels = months),
                       'event': FactorVector([x[0][2] for x in cases]),
                       'count': IntVector([x[1] for x in cases])})

# Attach the counts to the map polygons; sorting on 'order' keeps the
# polygon vertices in drawing order after the join.
dataf_plot = (states_map
              .left_join(res, by="state")
              .arrange('order'))

jetcols = StrVector(("#00007F", "#007FFF", "#7FFF7F", "#FF7F00", "#7F0000"))
# The albers projection parameters are documented by mapproj as lat0 and
# lat1; the first one was misspelled 'at0'.
p = (gg.ggplot(dataf_plot.filter('event=="CASES"')) +
     gg.geom_polygon(gg.aes_string(x='long', y='lat',
                                   group='group', fill='count')) +
     gg.coord_map("albers",  lat0 = 45.5, lat1 = 29.5) +
     gg.scale_fill_gradientn(colors=jetcols, trans='sqrt') +
     gg.facet_wrap(facets=Formula("~month")) +
     gg.ggtitle("Cases of Pneumonia in 1912"))

display(image_png(p))

<!-- label:spark_sqlmapreduceggplot_3_2 -->
<!-- config:split-output -->

In [None]:
# Keep only the months 8 and 9 (evaluated on the R side by dplyr).
dataf_now = dataf_plot.filter('month %in% c(8,9)')
# The albers projection parameters are documented by mapproj as lat0 and
# lat1; the first one was misspelled 'at0'.
p = (gg.ggplot(dataf_now.filter('event=="CASES"')) +
     gg.geom_polygon(gg.aes_string(x='long', y='lat',
                                   group='group', fill='count')) +
     gg.coord_map("albers",  lat0 = 45.5, lat1 = 29.5) +
     gg.scale_fill_gradientn(colors=jetcols, trans='sqrt') +
     gg.facet_wrap(facets=Formula("~month")) +
     gg.ggtitle("Cases of Pneumonia in 1912"))

display(image_png(p))

<!-- label:spark_sqlmapreduceggplot_4_2 -->
<!-- config:split-output -->

In [None]:
# Keep only the months 8 and 9, and only a handful of north-eastern states.
# Vermont's postal abbreviation is "VT"; the previous "VM" matched no
# state, so Vermont was silently left out of the plot.
dataf_now = (dataf_plot
             .filter('month %in% c(8,9)',
                     'state %in% c("CT", "NY", "MA", "NJ", "NH", "VT")'))
# The albers projection parameters are documented by mapproj as lat0 and
# lat1; the first one was misspelled 'at0'.
p = (gg.ggplot(dataf_now.filter('event=="CASES"')) +
     gg.geom_polygon(gg.aes_string(x='long', y='lat',
                                   group='group', fill='count')) +
     gg.coord_map("albers",  lat0 = 45.5, lat1 = 29.5) +
     gg.scale_fill_gradientn(colors=jetcols, trans='sqrt') +
     gg.facet_wrap(facets=Formula("~month")) +
     gg.ggtitle("Cases of Pneumonia in 1912"))

display(image_png(p))