In [20]:
# import modules
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, substring, create_map, lit, udf, concat_ws
from pyspark.sql.types import StringType
from itertools import chain

#import created module from parent directory
import map as m

In [21]:
# create spark session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Interswitch') \
    .getOrCreate()

In [22]:
# read the dataset
df = spark.read \
    .text('dec17pub.dat')

In [35]:
# show the first 20 rows
df.show(10)

+--------------------+
|               value|
+--------------------+
|00000479511071912...|
|00000479511071912...|
|00007169100494112...|
|00007169100494112...|
|00007169100494112...|
|00011017798798612...|
|00011017798798612...|
|00011020659338112...|
|00011028481568012...|
|00011032785646912...|
+--------------------+
only showing top 10 rows



In [24]:
# select specified columns
df2 = df.select(
    (substring('value', 1, 15)).alias("household_identifier"),\
    (substring('value', 18, 4)).alias("year_of_interview"),\
    (substring('value', 16, 2)).alias("month_of_interview"),\
    (substring('value', 24, 3)).alias("final_outcome_code"),\
    (substring('value', 31, 2)).alias("type_of_housing_unit"),\
    (substring('value', 61, 2)).alias("household_type"),\
    (substring('value', 33, 2)).alias("apartment_has_telephone"),\
    (substring('value', 35, 2)).alias("access_telephone_elsewhere"),\
    (substring('value', 37, 2)).alias("telephone_interview_acceptable"),\
    (substring('value', 65, 2)).alias("type_of_interview"),\
    (substring('value', 39, 2)).alias("family_income"),\
    (substring('value', 89, 2)).alias("geographic_information"),\
    (substring('value', 139, 2)).alias("race")
)

In [25]:
# show selection
df2.show()

+--------------------+-----------------+------------------+------------------+--------------------+--------------+-----------------------+--------------------------+------------------------------+-----------------+-------------+----------------------+----+
|household_identifier|year_of_interview|month_of_interview|final_outcome_code|type_of_housing_unit|household_type|apartment_has_telephone|access_telephone_elsewhere|telephone_interview_acceptable|type_of_interview|family_income|geographic_information|race|
+--------------------+-----------------+------------------+------------------+--------------------+--------------+-----------------------+--------------------------+------------------------------+-----------------+-------------+----------------------+----+
|     000004795110719|             2017|                12|               201|                   1|             1|                      1|                        -1|                             1|                2|            9| 

In [26]:
# create expression of the values using created module
outcome_expr = create_map([lit(x) for x in chain(*m.outcome_map.items())])
housing_expr = create_map([lit(x) for x in chain(*m.housing_map.items())])
housing_t_expr = create_map([lit(x) for x in chain(*m.housing_t_map.items())])
yes_no_expr = create_map([lit(x) for x in chain(*m.yes_no_map.items())])
interview_expr = create_map([lit(x) for x in chain(*m.interview_map.items())])
income_expr = create_map([lit(x) for x in chain(*m.income_map.items())])
geography_expr = create_map([lit(x) for x in chain(*m.geography_map.items())])
race_expr = create_map([lit(x) for x in chain(*m.race_map.items())])

In [27]:
# decoding the data
df3 = df2.withColumn('time_of_interview', concat_ws('/',df2.year_of_interview,df2.month_of_interview))\
    .withColumn('final_outcome_code', outcome_expr[df2['final_outcome_code']])\
    .withColumn('type_of_housing_unit', housing_expr[df2['type_of_housing_unit']])\
    .withColumn('household_type', housing_t_expr[df2['household_type']])\
    .withColumn('apartment_has_telephone', yes_no_expr[df2['apartment_has_telephone']])\
    .withColumn('access_telephone_elsewhere', yes_no_expr[df2['access_telephone_elsewhere']])\
    .withColumn('telephone_interview_acceptable', yes_no_expr[df2['telephone_interview_acceptable']])\
    .withColumn('type_of_interview', interview_expr[df2['type_of_interview']])\
    .withColumn('family_income', income_expr[df2['family_income']])\
    .withColumn('geographic_information', geography_expr[df2['geographic_information']])\
    .withColumn('race', race_expr[df2['race']])\
    .select('household_identifier','time_of_interview','final_outcome_code','type_of_housing_unit','household_type',
            'apartment_has_telephone','access_telephone_elsewhere','telephone_interview_acceptable',
            'type_of_interview','family_income','geographic_information','race')

In [28]:
# showing the dataset
df3.show(10)

+--------------------+-----------------+------------------+--------------------+--------------------+-----------------------+--------------------------+------------------------------+-----------------+------------------+----------------------+----------+
|household_identifier|time_of_interview|final_outcome_code|type_of_housing_unit|      household_type|apartment_has_telephone|access_telephone_elsewhere|telephone_interview_acceptable|type_of_interview|     family_income|geographic_information|      race|
+--------------------+-----------------+------------------+--------------------+--------------------+-----------------------+--------------------------+------------------------------+-----------------+------------------+----------------------+----------+
|     000004795110719|          2017/12|     CAPI complete|House, apartment,...|Husband/wife prim...|                    Yes|                      null|                           Yes|        Telephone|$30,000 to $34,999|               

In [29]:
# creating spark sql table
df3.createOrReplaceTempView('dec_cps')

In [30]:
spark.sql("""
    SELECT COUNT(*) as number
    FROM dec_cps
""").show()

+------+
|number|
+------+
|146456|
+------+



1.	What is the count of responders per family income range (show top 10)?

In [31]:
spark.sql("""
    SELECT family_income, COUNT(*) as number
    FROM dec_cps
    WHERE family_income IS NOT NULL
    GROUP BY family_income
    ORDER BY number DESC
    LIMIT 10
""").show()

+--------------------+------+
|       family_income|number|
+--------------------+------+
|$100,000 to $149,999| 17794|
|  $75,000 to $99,999| 16557|
|    $150,000 or more| 15704|
|  $60,000 to $74,999| 13442|
|  $50,000 to $59,999|  9971|
|  $40,000 to $49,999|  9788|
|  $30,000 to $34,999|  6743|
|  $35,000 to $39,999|  6620|
|  $20,000 to $24,999|  6312|
|  $25,000 to $29,999|  5803|
+--------------------+------+



2.	What is the count of responders per geographical division/location and race (show top 10)?

In [32]:
spark.sql("""
    SELECT geographic_information, race, COUNT(*) as number
    FROM dec_cps
    WHERE race IS NOT NULL
    GROUP BY geographic_information, race
    ORDER BY number DESC
    LIMIT 10
""").show()

+----------------------+----------+------+
|geographic_information|      race|number|
+----------------------+----------+------+
|                 South|White Only| 34827|
|                  West|White Only| 27557|
|  Midwest (formerly...|White Only| 21209|
|             Northeast|White Only| 16897|
|                 South|Black Only|  8775|
|                  West|Asian Only|  3447|
|  Midwest (formerly...|Black Only|  2005|
|             Northeast|Black Only|  1712|
|                 South|Asian Only|  1483|
|             Northeast|Asian Only|  1131|
+----------------------+----------+------+



3.	How many responders do not have telephone in their house, but can access a telephone elsewhere and telephone interview is accepted (show top 10)?

In [33]:
spark.sql("""
    SELECT COUNT(*) as number
    FROM dec_cps
    WHERE apartment_has_telephone = "No"
    AND access_telephone_elsewhere = "Yes"
    AND telephone_interview_acceptable = "Yes"
""").show()

+------+
|number|
+------+
|   633|
+------+



4.	How many responders have access to a telephone, but telephone interview is not accepted (show top 10)?

In [34]:
spark.sql("""
    SELECT COUNT(*) as number
    FROM dec_cps
    WHERE apartment_has_telephone = "Yes"
    AND access_telephone_elsewhere = "Yes"
    AND telephone_interview_acceptable = "No"
""").show()

+------+
|number|
+------+
|     0|
+------+

