# Loading Data

## Local csv file

In [None]:
region_mapping_df = spark.read.csv('../data/state_region_map.csv', header=True)

## Local parquet file

In [None]:
disease_df = spark.read.parquet('../data/chronic_disease_indicators')

## [Parquet file from s3]

We first we need to add hadoop-aws and aws-java-sdk jars

1. Download for proper version of hadoop and java you have installed
2. Link them to venv_spark_workshop/lib/python3.7/site-packages/pyspark/jars
3. Set up your aws credentials by exporting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
4. Debug... this can be a challenge and might be easier if you setup pyspark yourself rather than installing with pip

In [None]:
# public_finance_data = spark.read.csv('s3a://deutsche-boerse-xetra-pds/2019-02-24', header=True)

# Summarizing Data

__If our data doesn't fit in memory, how do we get a sense for what's in it?__

Spark data frame documentation: https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html

In [None]:
disease_df.printSchema()

In [None]:
region_mapping_df.rdd.getNumPartitions()

In [None]:
disease_df.rdd.getNumPartitions()

### Let's look more into [partitions](1_Partitioning.ipynb)

## Looking at subsets of the data

Get the first(ish) row as a dictionary

In [None]:
disease_df.head().asDict()

Show first 20 rows with formatting (only use with few columns)

In [None]:
region_mapping_df.show()

Calling sample allows for creating a sample data frame for prototyping

In [None]:
sampled_df = disease_df.sample(withReplacement=False, fraction=0.001, seed=1199)

In [None]:
sampled_df.count()

## Looking at simple aggregates of the data

In [None]:
import pyspark.sql.functions as F

In [None]:
disease_df.count()

In [None]:
locations = disease_df.select("LocationAbbr").distinct().toPandas()

In [None]:
locations.head()

Lets look at frequency of topics by aggregating and collecting to Pandas

In [None]:
%%time
questions_per_topic = disease_df.groupby(
    'Topic'
).agg(
    F.countDistinct("QuestionID").alias('n_questions')
)

In [None]:
%%time
pd_questions_per_topic = questions_per_topic.toPandas()

In [None]:
len(pd_questions_per_topic)

In [None]:
pd_questions_per_topic.set_index('Topic').sort_values('n_questions', ascending=False)

### Let's look more into [lazy execution](2_Execution.ipynb)

## Joining data frames

In [None]:
region_mapping_df.columns

In [None]:
disease_region_df = disease_df.join(
    region_mapping_df,
    on=disease_df['LocationAbbr']==region_mapping_df['State Code'],
    how='left_outer'
).select(
    disease_df['*'], region_mapping_df['Region'], region_mapping_df['Division']
)

In [None]:
disease_region_df.head().asDict()

In [None]:
adjusted_binge_drinking_reponses = disease_region_df.filter(
    "QuestionID='ALC2_2' and StratificationCategoryID1='OVERALL' and DataValueTypeID='AGEADJPREV'"
)

In [None]:
adjusted_binge_drinking_reponses.select('Question').head().asDict()

In [None]:
division_label = 'Region'

In [None]:
binge_by_region = adjusted_binge_drinking_reponses.groupby(
    [division_label, 'YearStart']
).agg(
    F.mean('DataValue').alias('average_binge_drinking'),
    F.stddev_samp('DataValue').alias('std_binge_drinking'),
).toPandas()

In [None]:
binge_by_region.head()

Get ready to plot by setting inline plotting!

In [None]:
%matplotlib inline

In [None]:
binge_by_region.set_index('YearStart').sort_index()\
    .groupby(division_label)\
    .average_binge_drinking.plot(legend=True);