# The prediction task is to determine whether a person makes over $50K a year.

This data was extracted from the 1994 Census bureau database. (Data Mining and Visualization, Silicon Graphics). 

#!wget https://raw.githubusercontent.com/lemire/RealisticTabularDataSets/master/census-income/census-income_srt.csv.gz # Source

# Description of fnlwgt (final weight)

The weights on the Current Population Survey (CPS) files are controlled to independent estimates of the civilian 
noninstitutional population of the US. These are prepared monthly for us by Population Division here at the Census Bureau. 
We use 3 sets of controls. These are:

1.A single cell estimate of the population 16+ for each state.

2.Controls for Hispanic Origin by age and sex.

3.Controls by Race, age and sex.

We use all three sets of controls in our weighting program and "rake" through them 6 times so that by the end 
we come back to all the controls we used. The term estimate refers to population totals derived from CPS by 
creating "weighted tallies" of any specified socio-economic characteristics of the population. 
People with similar demographic characteristics should have similar weights. There is one important caveat to 
remember about this statement. That is that since the CPS sample is actually a collection of 51 state samples, 
each with its own probability of selection, the statement only applies within state.

In [1]:
# Required packages and environment varibles to import before running the code.

import os
import time
import findspark
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3.6'
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'
#/opt/cloudera/parcels/SPARK2/lib/spark2
findspark.init('/opt/cloudera/parcels/SPARK2/lib/spark2')

In [2]:
# path of the dataset = /home/nuveprotech/work/census-income_srt.csv.gz.
# copy this dataset to hadoop file system (hdfs).
# hadoop fs -copyFromLocal census-income_srt.csv.gz /user/nuveprotech/

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark-df-overview").getOrCreate()

In [3]:
df = spark.read.csv("hdfs://ip-10-0-1-20.ec2.internal:8020/user/nuveprotech/census-income_srt.csv.gz", header=True)

In [4]:
df.printSchema()

root
 |-- 22: string (nullable = true)
 |--  Private: string (nullable = true)
 |--  33: string (nullable = true)
 |--  19: string (nullable = true)
 |--  Some college but no degree: string (nullable = true)
 |--  575: string (nullable = true)
 |--  College or university: string (nullable = true)
 |--  Divorced: string (nullable = true)
 |--  Retail trade: string (nullable = true)
 |--  Sales: string (nullable = true)
 |--  Asian or Pacific Islander: string (nullable = true)
 |--  All other: string (nullable = true)
 |--  Female: string (nullable = true)
 |--  No13: string (nullable = true)
 |--  Not in universe14: string (nullable = true)
 |--  Children or Armed Forces: string (nullable = true)
 |--  016: string (nullable = true)
 |--  017: string (nullable = true)
 |--  018: string (nullable = true)
 |--  Head of household: string (nullable = true)
 |--  Midwest: string (nullable = true)
 |--  Minnesota: string (nullable = true)
 |--  Child 18+ ever marr RP of subfamily: string (null

In [5]:
import pyspark.sql.types as t
census_schema = t.StructType([
      t.StructField('age', t.IntegerType(), True)
    , t.StructField('workclass', t.StringType(), True)
    , t.StructField('fnlwgt', t.IntegerType(), True)
    , t.StructField('education', t.StringType(), True)
    , t.StructField('education-num', t.IntegerType(), True)
    , t.StructField('marital-status', t.StringType(), True)
    , t.StructField('occupation', t.StringType(), True)
    , t.StructField('relationship', t.StringType(), True)
    , t.StructField('race', t.StringType(), True)
    , t.StructField('sex', t.StringType(), True)
    , t.StructField('capital-gain', t.DoubleType(), True)
    , t.StructField('capital-loss', t.DoubleType(), True)
    , t.StructField('hours-per-week', t.IntegerType(), True)
    , t.StructField('native-country', t.StringType(), True)
    , t.StructField('label', t.StringType(), True)
])

In [6]:
df = spark.read.csv("hdfs://ip-10-0-1-20.ec2.internal:8020/user/nuveprotech/census-income_srt.csv.gz", header=True, schema=census_schema)

In [7]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- label: string (nullable = true)



In [8]:
df.count()

199522

In [9]:
df = df.drop('fnlwgt')

In [10]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- label: string (nullable = true)



In [11]:
from pyspark.sql.functions import count, avg, desc

In [12]:
df.groupBy(['education']). \
agg(
    count('*').alias('qty'), 
    avg('age').alias('avg_age')
).orderBy(desc('qty')). \
show()

+---------+------+------------------+
|education|   qty|           avg_age|
+---------+------+------------------+
|        0|100684|  30.4666481268126|
|        2|  8756| 42.35929648241206|
|       26|  7887| 36.99987320907823|
|       19|  5412| 33.11326681448632|
|       29|  5105| 31.13379040156709|
|       36|  4145| 38.13148371531966|
|       34|  4025| 37.75453416149068|
|       10|  3683| 40.27694814010318|
|       16|  3445| 40.43570391872279|
|       23|  3392| 39.56367924528302|
|       12|  3340| 40.41017964071856|
|       33|  3325| 39.17834586466165|
|        3|  3195|40.583098591549295|
|       35|  3168|40.339962121212125|
|       38|  3003| 40.14085914085914|
|       31|  2699|  39.9284920340867|
|       32|  2398| 36.92577147623019|
|       37|  2234|37.978961504028646|
|        8|  2151|40.201301720130175|
|       42|  1918| 34.06308654848801|
+---------+------+------------------+
only showing top 20 rows



In [13]:
df.createOrReplaceTempView("census")

In [14]:
s = spark.sql("""
SELECT 
    education, 
    COUNT(*) AS qty, 
    AVG(age) AS avg_age
FROM census
GROUP BY education
""")
s.show()

+---------+----+------------------+
|education| qty|           avg_age|
+---------+----+------------------+
|       26|7887| 36.99987320907823|
|       28|1661| 39.00842865743528|
|        6| 441| 40.01133786848072|
|       23|3392| 39.56367924528302|
|       12|3340| 40.41017964071856|
|       11| 637| 42.24960753532182|
|       16|3445| 40.43570391872279|
|        3|3195|40.583098591549295|
|        4|1364|40.894428152492665|
|        7| 731| 43.83036935704514|
|       14| 932|37.832618025751074|
|       44|1592| 34.88693467336683|
|        9| 738|  42.0609756097561|
|       43|1382|48.490593342981185|
|       36|4145| 38.13148371531966|
|       29|5105| 31.13379040156709|
|       46|  36|31.305555555555557|
|       15| 815| 36.32515337423313|
|       22| 411|36.695863746958636|
|       32|2398| 36.92577147623019|
+---------+----+------------------+
only showing top 20 rows



In [15]:
def my_query(field):
    return df.groupBy([field]). \
    agg(
        count('*').alias('qty'), 
        avg('age').alias('avg_age')
    ).orderBy(desc('qty'))

In [16]:
print(my_query('workclass').show())

+--------------------+------+------------------+
|           workclass|   qty|           avg_age|
+--------------------+------+------------------+
|     Not in universe|100245|30.512973215621727|
|             Private| 72027| 36.93916170325017|
| Self-employed-no...|  8445| 44.92161042036708|
|    Local government|  7784| 41.90506166495375|
|    State government|  4227|39.911757747811684|
| Self-employed-in...|  3265|45.864931087289435|
|  Federal government|  2925|42.342564102564104|
|        Never worked|   439|19.888382687927106|
|         Without pay|   165| 38.68484848484849|
+--------------------+------+------------------+

None


In [17]:
df.select('age', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week').describe().show()

+-------+----+-------------+------------+------------+--------------+
|summary| age|education-num|capital-gain|capital-loss|hours-per-week|
+-------+----+-------------+------------+------------+--------------+
|  count|   0|            0|           0|           0|             0|
|   mean|null|         null|        null|        null|          null|
| stddev|null|         null|        null|        null|          null|
|    min|null|         null|        null|        null|          null|
|    max|null|         null|        null|        null|          null|
+-------+----+-------------+------------+------------+--------------+



In [18]:
df.select('workclass', 'education', 'marital-status').describe().show()

+-------+-------------------+------------------+------------------+
|summary|          workclass|         education|    marital-status|
+-------+-------------------+------------------+------------------+
|  count|             199522|            199522|            199522|
|   mean|               null|11.306517577009052| 55.42430408676737|
| stddev|               null| 14.45422987661719|274.89468181014723|
|    min| Federal government|                 0|                 0|
|    max|        Without pay|                 9|              9999|
+-------+-------------------+------------------+------------------+



In [19]:
df.freqItems(['marital-status']).show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|marital-status_freqItems                                                                                                                                                                                                                                                                                                                                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------------

In [20]:
from pyspark.sql.functions import isnan, when, count, col

In [21]:
# All columns
# cols = df.columns
# Selected columns
cols = ['workclass', 'education-num', 'occupation', 'hours-per-week', 'native-country']

# https://stackoverflow.com/a/44631639/570393
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in cols]).show()

+---------+-------------+----------+--------------+--------------+
|workclass|education-num|occupation|hours-per-week|native-country|
+---------+-------------+----------+--------------+--------------+
|   199522|       199522|    199522|        199522|        199522|
+---------+-------------+----------+--------------+--------------+



In [22]:
# Total rows
print('total rows: %s' % df.count())

# After droping NA records
print('only complete rows: %s' % df.dropna().count())

total rows: 199522
only complete rows: 0


In [23]:
def show_df(df, field='occupation'):
    df.groupBy(field).count().show()

In [24]:
show_df(df)

+--------------------+------+
|          occupation| count|
+--------------------+------+
| College or unive...|  5687|
|         High school|  6892|
|     Not in universe|186943|
+--------------------+------+



In [25]:
# Fill with a fixed value
new_df = df.fillna({'occupation': 'Other-service'})

# Count 
show_df(new_df)

+--------------------+------+
|          occupation| count|
+--------------------+------+
| College or unive...|  5687|
|         High school|  6892|
|     Not in universe|186943|
+--------------------+------+



In [26]:
# This is distributed
df_spark = df.groupBy('workclass').agg(count('*').alias('counts')).orderBy('counts')
# df_spark.show()

# This is running on driver
df_wk = df_spark.toPandas()

In [27]:
df_wk.to_csv('output.csv')