# Identify Insights

When data is available, the ultimate goal of analyzing it is often to answer business questions so that the data generates business value. Even a single dataset can address some business questions and reveal useful insights.

The quick look in the previous section gave us a better sense of the information in the dataset and of its quality. With that, we can start more detailed and complex analyses.

* <a href=#bookmark1>Preprocess Input Data</a>
* <a href=#bookmark2>Quick Query</a>
* <a href=#bookmark3>Ad-hoc Profiling</a>

In [1]:
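# Import necessary packages
import smv
import sys
from pandas import *
from pyspark.sql import HiveContext, DataFrame
from pyspark.sql.window import Window
# Imported after pandas so that Spark column functions (col, avg, desc, when, sum, instr, ...) take precedence
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt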

In [2]:
# Read the input data from a csv file
raw = openCsv("data/input/employment/CB1200CZ11.csv")

In [3]:
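# Custom DataFrame helpers (both collect the whole DataFrame to the driver via toPandas):
#   smvPdHead(n)         -> first n rows as a pandas DataFrame
#   smvPdFreqHist(col)   -> bar chart of the value counts of one column
DataFrame.smvPdHead = lambda df, n: df.toPandas().head(n)
DataFrame.smvPdFreqHist = lambda df, col: df.toPandas()[col].value_counts().plot(kind='bar')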

<a name='bookmark1'/>
## Preprocess Input Data

Assume we have only one dataset for the analysis. Before moving forward, we preprocess the input data, guided by the input data profile from the previous section, to make the more complex analyses that follow easier.

The input data profile shows that some employment and payroll values are suppressed. We can handle them in one of two ways:
1. Filter out the suppressed data if we are sure we won't need them later
2. Generate clean indicators for data suppression, which keeps the flexibility to apply different filters in later analyses
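The following is a plain-Python illustration of the two options, not SMV or Spark code. It applies both options to a few hypothetical rows. The flag values `"a"`, `"b"` and `"D"` are made-up placeholders for whatever a suppressed row carries. Only the null/non-null distinction matters, as in the `isNull()` checks used in the cells below.

```python
# Hypothetical toy rows mimicking the EMP_F / PAYANN_F flag columns:
# None means "not suppressed"; any other value marks a suppressed figure.
rows = [
    {"ZIPCODE": "00001", "EMP": 120, "EMP_F": None, "PAYANN": 4800, "PAYANN_F": None},
    {"ZIPCODE": "00002", "EMP": 0, "EMP_F": "a", "PAYANN": 0, "PAYANN_F": "D"},
    {"ZIPCODE": "00003", "EMP": 0, "EMP_F": "b", "PAYANN": 900, "PAYANN_F": None},
]

# Option 1: drop every row whose employment figure is suppressed.
clean = [r for r in rows if r["EMP_F"] is None]


# Option 2: keep every row and add string indicators.
# "true" means the value is NOT suppressed, like isNull() cast to a string.
def add_indicators(r):
    out = dict(r)
    out["EMP_F_IND"] = str(r["EMP_F"] is None).lower()
    out["PAYANN_F_IND"] = str(r["PAYANN_F"] is None).lower()
    return out


flagged = [add_indicators(r) for r in rows]
```

Option 2 keeps every flag combination visible. This is what a concatenated histogram of the two indicator columns summarizes.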

In [4]:
# 1. Filter out the suppressed data if we are sure we don't need them later
raw_clean = raw.where(col("EMP_F").isNull()).cache()

# check output
raw_clean.select("EMP_F").smvEdd()

EMP_F                Non-Null Count         0
EMP_F                Min Length             null
EMP_F                Max Length             null
EMP_F                Approx Distinct Count  0


In [5]:
# 2. Generate clean indicators for data suppression
raw_clean2 = raw.smvSelectPlus(
    col("EMP_F").isNull().cast(StringType()).alias("EMP_F_IND"),
    col("PAYANN_F").isNull().cast(StringType()).alias("PAYANN_F_IND")
).cache()

# check output
raw_clean2.smvConcatHist(["EMP_F_IND","PAYANN_F_IND"])

Histogram of EMP_F_IND_PAYANN_F_IND: String sort by Key
key                      count      Pct    cumCount   cumPct
false_false               7089   18.26%        7089   18.26%
false_true                2090    5.38%        9179   23.65%
true_true                29639   76.35%       38818  100.00%
-------------------------------------------------


<a name='bookmark2'/>
## Quick Query

Simple queries can quickly answer straightforward business questions.

***Example:*** Suppose we would like to know how many zip codes are recorded for the state of California.

However, we are not sure which value of `ST` represents California. The first few rows of the data suggest that the 2-letter code inside the parentheses of column `GEO_TTL` is the state abbreviation. We can validate this and look up California's `ST` value as follows:

In [6]:
raw_clean.where(instr("GEO_TTL", ",CA")>0).select("ST").distinct().show()

+---+
| ST|
+---+
| 06|
+---+



In [7]:
raw_clean.filter(col("ST") == "06").count()

1884


----
***Example:*** Quickly look at the average number of paid employees per establishment in CA

In [8]:
raw_clean.filter(col("ST") == "06").select((col("EMP")/col("ESTAB")).alias("EMP_per_ESTAB")).smvEdd()

EMP_per_ESTAB        Non-Null Count         1884
EMP_per_ESTAB        Average                12.521105763266984
EMP_per_ESTAB        Standard Deviation     14.609728051190217
EMP_per_ESTAB        Min                    0.4
EMP_per_ESTAB        Max                    310.9


Remarks: Summary statistics for the number of paid employees per establishment in a given state are easy to obtain. The average here is taken over zip codes, with one ratio per row.
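The plain-Python sketch below makes explicit what the average above measures. It uses hypothetical `(EMP, ESTAB)` pairs, computes one ratio per zip code, and then summarizes those ratios. We have not checked whether `smvEdd` reports the sample or the population standard deviation. The sketch assumes the sample version.

```python
from statistics import mean, stdev

# Hypothetical (EMP, ESTAB) pairs for a few zip codes of one state.
zips = [(120, 10), (45, 9), (300, 12), (8, 4)]

# One employees-per-establishment ratio per zip code.
ratios = [emp / estab for emp, estab in zips]

# Summary statistics over the per-zip ratios.
summary = {
    "count": len(ratios),
    "average": mean(ratios),
    "std": stdev(ratios),  # sample standard deviation (an assumption, see above)
    "min": min(ratios),
    "max": max(ratios),
}
```

Each zip code counts equally here, whatever its size. Dividing total employees by total establishments would usually give a different number.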

To ease later analyses and make the results more readable, we can derive a new state-code column, `ST_CD`, that holds the 2-letter abbreviation taken from `GEO_TTL`.

In [9]:
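# Derive the 2-letter state code from GEO_TTL, e.g. "39558(LAKESHORE,MS)" -> "MS":
# keep the text after the last comma, then the text before the first ")".
# Rows whose extracted code is 3 or more characters long are dropped.
df1 = raw_clean. \
smvSelectPlus(substring_index(substring_index(col("GEO_TTL"), ",", -1),")", 1).alias("ST_CD")). \
where(length(col("ST_CD"))<3).cache()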

# check results
df1.smvHist("ST_CD")

Histogram of ST_CD: String sort by Key
key                      count      Pct    cumCount   cumPct
AK                         153    0.52%         153    0.52%
AL                         590    1.99%         743    2.51%
AR                         473    1.60%        1216    4.11%
AZ                         371    1.25%        1587    5.36%
CA                        1883    6.36%        3470   11.72%
CO                         472    1.59%        3942   13.32%
CT                         297    1.00%        4239   14.32%
DC                          69    0.23%        4308   14.55%
DE                          65    0.22%        4373   14.77%
FL                        1151    3.89%        5524   18.66%
GA                         740    2.50%        6264   21.16%
HI                         102    0.34%        6366   21.50%
IA                         738    2.49%        7104   24.00%
ID                         207    0.70%        7311   24.70%
IL                        1161    3.92%       
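The sketch below reproduces the nested `substring_index` extraction from the `ST_CD` cell in plain Python. The helper `substring_index` here is our own stand-in that follows the documented Spark SQL semantics. It is not the PySpark function itself. The last title is hypothetical and has no comma-separated state suffix. It illustrates the kind of value the length filter drops.

```python
def substring_index(s, delim, count):
    """Plain-Python stand-in for Spark SQL's substring_index(str, delim, count).

    count > 0: text before the count-th delimiter, counting from the left.
    count < 0: text after the |count|-th delimiter, counting from the right.
    If the delimiter is absent, the whole string is returned unchanged.
    """
    if count == 0:
        return ""
    parts = s.split(delim)
    return delim.join(parts[:count] if count > 0 else parts[count:])


def state_code(geo_ttl):
    # Same nesting as the notebook cell: after the last comma, then before the first ")".
    return substring_index(substring_index(geo_ttl, ",", -1), ")", 1)


# Two titles in the format seen in this section, plus one hypothetical title.
titles = ["39558(LAKESHORE,MS)", "59103(BILLINGS,MT)", "99999(NO STATE SUFFIX)"]
codes = [state_code(t) for t in titles]
kept = [c for c in codes if len(c) < 3]  # mirrors where(length(ST_CD) < 3)
```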


----
***Example:*** For each state, how many zip codes have more than 30,000 paid employees?
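The table that `smvFreqHist` prints lists each key with its count, percentage, cumulative count and cumulative percentage, sorted by frequency. The plain-Python sketch below performs the same computation on hypothetical `(ST_CD, EMP)` rows. Ties are broken alphabetically by key. That rule is our own choice and may differ from SMV's.

```python
from collections import Counter


def freq_hist(values):
    """Return (key, count, pct, cum_count, cum_pct) rows sorted by descending count."""
    counts = Counter(values)
    total = sum(counts.values())
    rows, cum = [], 0
    # Sort by count descending, then by key so that ties are deterministic.
    for key, n in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0])):
        cum += n
        rows.append((key, n, 100.0 * n / total, cum, 100.0 * cum / total))
    return rows


# Hypothetical (ST_CD, EMP) rows, one per zip code.
data = [("CA", 45000), ("CA", 31000), ("TX", 52000), ("NY", 12000), ("TX", 30500), ("CA", 29000)]

# Filter first (EMP > 30000), then build the frequency histogram by state.
hist = freq_hist(st for st, emp in data if emp > 30000)
```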

In [10]:
df1.filter(col("EMP") > 30000).smvFreqHist("ST_CD")

Histogram of ST_CD: String sorted by Frequency
key                      count      Pct    cumCount   cumPct
CA                          68   16.59%          68   16.59%
TX                          45   10.98%         113   27.56%
NY                          32    7.80%         145   35.37%
IL                          24    5.85%         169   41.22%
FL                          19    4.63%         188   45.85%
GA                          15    3.66%         203   49.51%
NJ                          14    3.41%         217   52.93%
OH                          14    3.41%         231   56.34%
VA                          14    3.41%         245   59.76%
MA                          13    3.17%         258   62.93%
PA                          13    3.17%         271   66.10%
MI                          12    2.93%         283   69.02%
AZ                          11    2.68%         294   71.71%
MD                          10    2.44%         304   74.15%
IN                           9    2.20


----
***Example:*** Find out the top 10 states with the highest average payroll per paid employee  

This kind of question can be answered by grouping the data to the desired level (here, the state level) and applying the relevant aggregation functions (average, sum, count, max, min, etc.).
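The group-then-aggregate pattern can be sketched in plain Python on hypothetical `(ST_CD, PAYANN, EMP)` rows. Like `avg(col("PAYANN")/col("EMP"))` in the cell below, the sketch averages one ratio per row within each state. For comparison, it also computes the ratio of state totals.

```python
from collections import defaultdict

# Hypothetical (ST_CD, PAYANN, EMP) rows, one per zip code.
rows = [("DC", 600, 10), ("DC", 300, 5), ("CT", 400, 10), ("CT", 100, 20), ("NJ", 450, 10)]

# Group to the state level, collecting one payroll-per-employee ratio per row.
ratios = defaultdict(list)
for st, payann, emp in rows:
    ratios[st].append(payann / emp)

# Aggregate each group with an average, then rank the states (top 10).
avg_by_state = {st: sum(v) / len(v) for st, v in ratios.items()}
top = sorted(avg_by_state.items(), key=lambda kv: kv[1], reverse=True)[:10]

# Alternative aggregation: total payroll divided by total employees per state.
totals = defaultdict(lambda: [0, 0])
for st, payann, emp in rows:
    totals[st][0] += payann
    totals[st][1] += emp
ratio_of_sums = {st: pay / emp for st, (pay, emp) in totals.items()}
```

The two aggregations answer slightly different questions. The mean of ratios weights every zip code equally, which gives 22.5 for CT in this toy data. The ratio of sums weights by employment, which gives about 16.67 for CT. `avg(col("PAYANN")/col("EMP"))` corresponds to the former.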

In [11]:
df1.groupBy("ST_CD").\
agg(avg(col("PAYANN")/col("EMP")).alias("avg_PAYANN_per_EMP")).\
orderBy(desc("avg_PAYANN_per_EMP")).smvPdHead(10)

| | ST_CD | avg_PAYANN_per_EMP |
|---|---|---|
| 0 | DC | 60.449275 |
| 1 | CT | 46.757576 |
| 2 | NJ | 45.247649 |
| 3 | MA | 43.566607 |
| 4 | AK | 42.084967 |
| 5 | CA | 41.005311 |
| 6 | WY | 40.40625 |
| 7 | NY | 39.767498 |
| 8 | CO | 38.40678 |
| 9 | ND | 38.149813 |



----
***Example:*** For each state, find the top 5 zip codes with the highest payroll per employee

SMV also provides convenient operations on grouped data, including relatively complex ones. To answer this question, we:
1. calculate payroll per employee for each zip code;
2. use `smvGroupBy` to group the data by state;
3. use `smvTopNRecs` to keep the 5 zip codes with the highest payroll per employee in each state.
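The group-wise top-N selection can be sketched in plain Python as below. `top_n_per_group` is a hypothetical helper written for illustration. It is not SMV's `smvTopNRecs`, and the rows are made up. The sketch does not reproduce how SMV handles ties.

```python
import heapq
from collections import defaultdict


def top_n_per_group(rows, key, score, n):
    """Keep the n highest-scoring rows within each group (illustrative stand-in)."""
    groups = defaultdict(list)
    for r in rows:
        groups[key(r)].append(r)
    return {g: heapq.nlargest(n, rs, key=score) for g, rs in groups.items()}


# Hypothetical (ST_CD, ZIPCODE, PAYANN, EMP) rows.
rows = [
    ("MS", "00001", 1205, 10),
    ("MS", "00002", 300, 10),
    ("MS", "00003", 700, 10),
    ("MT", "00004", 1080, 10),
    ("MT", "00005", 200, 10),
]

# Group by state and keep the top 2 zip codes by payroll per employee.
top2 = top_n_per_group(rows, key=lambda r: r[0], score=lambda r: r[2] / r[3], n=2)
```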

In [12]:
df1.select(
    "ST_CD",
    "ZIPCODE",
    "GEO_TTL",
    (col("PAYANN")/col("EMP")).alias("PAYANN_per_EMP"),
  ).\
  smvGroupBy("ST_CD").smvTopNRecs(5, col("PAYANN_per_EMP").desc()).smvPdHead(10)

| | ST_CD | ZIPCODE | GEO_TTL | PAYANN_per_EMP |
|---|---|---|---|---|
| 0 | MS | 39558 | 39558(LAKESHORE,MS) | 120.5 |
| 1 | MS | 38723 | 38723(AVON,MS) | 79.085714 |
| 2 | MS | 39522 | 39522(STENNIS SPACE CENTER,MS) | 76.52809 |
| 3 | MS | 39146 | 39146(PICKENS,MS) | 75.519824 |
| 4 | MS | 39173 | 39173(TINSLEY,MS) | 69.588235 |
| 5 | MT | 59053 | 59053(MARTINSDALE,MT) | 108.0 |
| 6 | MT | 59256 | 59256(RAYMOND,MT) | 81.5 |
| 7 | MT | 59103 | 59103(BILLINGS,MT) | 74.634825 |
| 8 | MT | 59936 | 59936(WEST GLACIER,MT) | 72.730769 |
| 9 | MT | 59323 | 59323(COLSTRIP,MT) | 69.915695 |


For each state, the query returns 5 rows: the top 5 zip codes by payroll per employee, each with its payroll-per-employee value. Only the first 10 rows (states MS and MT) are displayed above.

<a name='bookmark3'/>
## Ad-hoc Profiling

Some business questions cannot be answered by a single simple query. When we want to understand the "profile" of something, we can do ad-hoc profiling, which is essentially a series of more involved analyses.

***Example:*** Suppose that, for each state, we want to know the percentage of zip codes whose payroll per employee is above both the state average and the country average.

#### Method 1
Before calculating the percentage of zip codes that meet both criteria, the conventional approach appends the overall and per-state averages to each observation in three steps:
1. calculate the country-wide average;
2. group by state and calculate the average per state;  
3. merge the averages to the original data and calculate the differences
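Here are the three steps in plain Python, run on hypothetical `(ST_CD, ZIPCODE, PAYANN_per_EMP)` rows. The dictionary lookup in step 3 plays the role of the join on `ST_CD` in the cells below.

```python
from collections import defaultdict

# Hypothetical (ST_CD, ZIPCODE, PAYANN_per_EMP) rows.
rows = [("MS", "A", 30.0), ("MS", "B", 40.0), ("MT", "C", 50.0), ("MT", "D", 80.0)]

# 1. Country-wide average over all rows.
ctry_avg = sum(v for _, _, v in rows) / len(rows)

# 2. Average per state.
by_state = defaultdict(list)
for st, _, v in rows:
    by_state[st].append(v)
state_avg = {st: sum(vs) / len(vs) for st, vs in by_state.items()}

# 3. Attach the state average to each row by its key and compute both differences.
diffs = [(st, z, v - ctry_avg, v - state_avg[st]) for st, z, v in rows]
```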

In [13]:
# 1. Get the country-wide average
df1_ctry_avg = df1.select(col("PAYANN")/col("EMP")).groupBy().avg().collect()[0][0]
df1_ctry_avg

35.80894619816936

In [14]:
# 2. Get the average per state
df1_state_avg = df1.smvSelectPlus((col("PAYANN")/col("EMP")).alias("PAYANN_per_EMP")).\
                groupBy("ST_CD").agg(avg(col("PAYANN_per_EMP")).alias("avg_state_PAYANN_per_EMP")).cache()

In [15]:
# 3. Append the state average to each input row
df1_join = df1.smvJoinByKey(df1_state_avg, ["ST_CD"], "inner").\
           smvSelectPlus((col("PAYANN")/col("EMP")).alias("PAYANN_per_EMP")).cache()

In [16]:
# 4. Calculate the differences from the country and state averages
df2 = df1_join.select(
    "ST_CD",
    "ZIPCODE",
    "PAYANN_per_EMP",
    (col("PAYANN_per_EMP") - df1_ctry_avg).alias("diff_avg_ctry_PAYANN_per_EMP"),
    (col("PAYANN_per_EMP") - col("avg_state_PAYANN_per_EMP")).alias("diff_avg_state_PAYANN_per_EMP")
)

In [17]:
# check results
df2.smvPdHead(5)

| | ST_CD | ZIPCODE | PAYANN_per_EMP | diff_avg_ctry_PAYANN_per_EMP | diff_avg_state_PAYANN_per_EMP |
|---|---|---|---|---|---|
| 0 | MS | 38601 | 28.836364 | -6.972583 | -1.833267 |
| 1 | MS | 38603 | 30.991031 | -4.817915 | 0.3214 |
| 2 | MS | 38606 | 31.249766 | -4.559181 | 0.580135 |
| 3 | MS | 38610 | 25.640777 | -10.168169 | -5.028854 |
| 4 | MS | 38611 | 33.876659 | -1.932287 | 3.207028 |


#### Method 2

A simpler approach uses Spark's **window functions**. A window function computes a value for each input row from a group of related rows, and a window specification defines that group. Here we define 2 window specifications: one covering all rows in the data and one partitioned by state.

In [18]:
# 1. Define two window specifications: one for overall and one for by-state operations

# No partitioning: the window spans all rows of data (Spark moves all rows to a single partition for this)
ctry_window = Window.partitionBy()

# Partition by "ST_CD": with no ordering, the window spans all rows of the same state
state_window = Window.partitionBy("ST_CD")

In [19]:
# 2. Get the overall and state averages and calculate the differences in one step
df3 = df1.smvSelectPlus((col("PAYANN")/col("EMP")).alias("PAYANN_per_EMP")).select(
    "ST",
    "ST_CD",
    "ZIPCODE",
    "PAYANN_per_EMP",
    (col("PAYANN_per_EMP") - avg(col("PAYANN_per_EMP")).over(ctry_window)).alias("diff_avg_ctry_PAYANN_per_EMP"),
    (col("PAYANN_per_EMP") - avg(col("PAYANN_per_EMP")).over(state_window)).alias("diff_avg_state_PAYANN_per_EMP")
)

In [20]:
# check results
df3.filter(col("ST_CD")=="MS").smvPdHead(5)

| | ST | ST_CD | ZIPCODE | PAYANN_per_EMP | diff_avg_ctry_PAYANN_per_EMP | diff_avg_state_PAYANN_per_EMP |
|---|---|---|---|---|---|---|
| 0 | 28 | MS | 38601 | 28.836364 | -6.972583 | -1.833267 |
| 1 | 28 | MS | 38603 | 30.991031 | -4.817915 | 0.3214 |
| 2 | 28 | MS | 38606 | 31.249766 | -4.559181 | 0.580135 |
| 3 | 28 | MS | 38610 | 25.640777 | -10.168169 | -5.028854 |
| 4 | 28 | MS | 38611 | 33.876659 | -1.932287 | 3.207028 |


With a window specification, the average function returns a value on every input row, so it can be combined directly with the row's other raw columns. Because each expression can use its own window, values over different groupings of rows can also be computed in one step. The results match those of Method 1.
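The behavior of an unordered window average can be mimicked in plain Python. Every row receives the average of its own partition, so no separate aggregate table or join is needed. `avg_over` is a hypothetical helper, and the values are the same kind of toy data used to illustrate Method 1. The sketch relies on one rule: without an `orderBy`, a Spark window frame covers the whole partition.

```python
from collections import defaultdict


def avg_over(values, partition_keys):
    """For every row, return the average of `values` within that row's partition.

    Mimics avg(...).over(Window.partitionBy(...)) with no ordering,
    where the frame is the entire partition.
    """
    sums, counts = defaultdict(float), defaultdict(int)
    for k, v in zip(partition_keys, values):
        sums[k] += v
        counts[k] += 1
    return [sums[k] / counts[k] for k in partition_keys]


states = ["MS", "MS", "MT", "MT"]
pay_per_emp = [30.0, 40.0, 50.0, 80.0]

ctry = avg_over(pay_per_emp, [None] * len(states))  # one partition, like Window.partitionBy()
state = avg_over(pay_per_emp, states)               # like Window.partitionBy("ST_CD")
diff_ctry = [v - a for v, a in zip(pay_per_emp, ctry)]
diff_state = [v - a for v, a in zip(pay_per_emp, state)]
```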

In [21]:
# For each state, count the zip codes whose payroll per employee exceeds both the state and the country average, and compute their percentage
df4 = df3.groupBy("ST_CD").agg(
    count("ST_CD").alias("cnt_counties"),
    sum(when((col("diff_avg_ctry_PAYANN_per_EMP")>0)&(col("diff_avg_state_PAYANN_per_EMP")>0),1).otherwise(0))
       .alias("cnt_counties_avg_ge_state_ctry")
).smvSelectPlus(
    (col("cnt_counties_avg_ge_state_ctry")/col("cnt_counties")*100).alias("pct_counties_avg_ge_state_ctry")
).cache()

df4.orderBy(desc("pct_counties_avg_ge_state_ctry")).show(5)

+-----+------------+------------------------------+------------------------------+
|ST_CD|cnt_counties|cnt_counties_avg_ge_state_ctry|pct_counties_avg_ge_state_ctry|
+-----+------------+------------------------------+------------------------------+
|   RI|          77|                            38|             49.35064935064935|
|   DC|          69|                            33|             47.82608695652174|
|   AK|         153|                            68|             44.44444444444444|
|   NH|         224|                            98|                         43.75|
|   WY|         128|                            54|                       42.1875|
+-----+------------+------------------------------+------------------------------+
only showing top 5 rows
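The last step counts the rows that are above both averages and turns that count into a percentage per state. The plain-Python sketch below does this on hypothetical difference values. The condition in the cell above is a strict `> 0`, even though the column alias contains `ge`. The sketch uses the same strict comparison.

```python
from collections import defaultdict

# Hypothetical per-row differences: (ST_CD, diff_ctry, diff_state).
rows = [("MS", -20.0, -5.0), ("MS", 3.0, 5.0), ("MT", 0.0, -15.0), ("MT", 30.0, 15.0), ("MT", 12.0, 1.0)]

cnt = defaultdict(int)
cnt_both = defaultdict(int)
for st, d_ctry, d_state in rows:
    cnt[st] += 1
    # Same rule as when(...).otherwise(0): strictly above both averages.
    if d_ctry > 0 and d_state > 0:
        cnt_both[st] += 1

pct = {st: 100.0 * cnt_both[st] / cnt[st] for st in cnt}
ranked = sorted(pct.items(), key=lambda kv: kv[1], reverse=True)
```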

