__Description__: Insight analysis of the United States Census Bureau's 2017 Basic Monthly CPS for December 2017.

__Setting up__

In [None]:
#installing dependencies

#!pip install pyspark

In [None]:
# Importing dependencies

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, substring, trim, lit, when, date_format, concat_ws
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import DataFrame
from typing import Dict
import re
import pandas as pd
import numpy as np

In [None]:

var_names = ["HRHHID", "HRMONTH", "HRYEAR4", "HRHHID2", "HUFINAL", "HEHOUSUT", "HRHTYPE", "HETELHHD", "HETELAVL", "HEPHONEO", "HUINTTYP", "HEFAMINC", "GEDIV", "PTDTRACE"]



In [None]:
# Creating the Spark session
spark = SparkSession.builder.appName("CPSdec2017Analysis").getOrCreate()

In [None]:
# Loading in the data- a .dat file- which is in a fixed-width format, using read.text
file_path = "/content/drive/MyDrive/my assessments/Interswitch assessment/dec17pub/dec17pub.dat"
lines = spark.read.text(file_path).select("value")

In [None]:
# Extracting needed fields
# first, defining needed field names and their starting and ending positions
fields = [
    ("HRHHID", 0, 15),            #household identifier1
    ("HRMONTH", 15, 17),          # month
    ("HRYEAR4", 17, 21),          # Year
    ("HRHHID2", 70, 75),          # Household identifier2
    ("HUFINAL", 23, 26),          # Final outcome
    ("HEHOUSUT", 30, 32),         # Type of housing unit
    ("HRHTYPE", 60, 62),          # Household type
    ("HETELHHD", 32, 34),         # Telephone in household?
    ("HETELAVL", 34, 36),         # Telephone accessible elsewhere?
    ("HEPHONEO", 36, 38),         # Telephone interview acceptable?
    ("HUINTTYP", 64, 66),         # Type of interview
    ("HEFAMINC", 38, 40),         # Family income
    ("GEDIV", 90, 91),            # Division - geographical
    ("PTDTRACE", 138, 140)        # Race
]


# collecting needed fields using substring and trim
cps_data = []
for field, start, end in fields:
    cps_data.append(substring(trim(col("value")), start + 1, end - start).alias(field))

# Now, creating the dataframe
cps_2017_df = lines.select(*cps_data)

# Combining the Year and month and applying dat_format to convert it into YYYY/MMM format
cps_2017_df = cps_2017_df.withColumn("YEAR_MONTH", date_format(concat_ws("-", col("HRYEAR4"), col("HRMONTH"), lit("01")), "yyyy/MMM"))

# Combining "HRHHID" and "HRHHID2" to get full_household_id
cps_2017_df = cps_2017_df.withColumn("FULL_HOUSEHOLD_ID", concat_ws("", col("HRHHID"), col("HRHHID2")))

# dropping individual columns used in the concatenation
columns_to_drop = ["HRYEAR4", "HRMONTH", "HRHHID", "HRHHID2"]
cps_2017_df = cps_2017_df.drop(*columns_to_drop)

# Re-arranging columns
new_column_order = ["FULL_HOUSEHOLD_ID", "YEAR_MONTH"] + [col_name for col_name in cps_2017_df.columns if col_name not in ("FULL_HOUSEHOLD_ID", "YEAR_MONTH")]
cps_2017_df = cps_2017_df.select(*new_column_order)

# finally, renaming the columns for clarity
mapping_for_renaming = {
    "FULL_HOUSEHOLD_ID":"household_identifier",
    "YEAR_MONTH":"time_of_interview",
    "HUFINAL":"survey_final_outcome",
    "HEHOUSUT":"type_housing_unit",
    "HRHTYPE":"household type",
    "HETELHHD":"telephone_in_household",
    "HETELAVL":"telephone_accessible_elsewhere",
    "HEPHONEO":"telephone_interview_acceptable",
    "HUINTTYP":"interview_type",
    "HEFAMINC":"family_income_range",
    "GEDIV":"geographical_division",
    "PTDTRACE":"race"
    }

for old_col_name, new_col_name in mapping_for_renaming.items():
    cps_2017_df = cps_2017_df.withColumnRenamed(old_col_name, new_col_name)

# Showing the first 10 rows
cps_2017_df.show(10)

#filtered_df = cps_2017_df.filter(col("race") == 26)

#filtered_df.show(10)

+--------------------+-----------------+--------------------+-----------------+--------------+----------------------+------------------------------+------------------------------+--------------+-------------------+---------------------+----+
|household_identifier|time_of_interview|survey_final_outcome|type_housing_unit|household type|telephone_in_household|telephone_accessible_elsewhere|telephone_interview_acceptable|interview_type|family_income_range|geographical_division|race|
+--------------------+-----------------+--------------------+-----------------+--------------+----------------------+------------------------------+------------------------------+--------------+-------------------+---------------------+----+
|00000479511071906011|         2017/Dec|                 201|                1|             1|                     1|                            -1|                             1|             2|                  9|                    6|   1|
|00000479511071906011|         2

In [None]:
# Looking at the number of rows in the data

cps_2017_df.count()

146456

__Before answering the questions__

what to note:


* First, from the data dictionary, it can be deduced that any value outside those stated to be valid is an "invalid entry".

 So, in answering the questions, where necessary, invalid entries will be filtered out and only valid entries will be considered.


*   Second, decoding will be done based on fields required in the questions.





The questions:

1. What is the count of responders per family income range (show all)?
2. What is the count of responders per geographical division/location and race
3. How many responders do not have telephone in their house, but can access (a telephone elsewhere and telephone interview is accepted? show top 10)?
4. How many responders can access a telephone, but telephone interview is not accepted?

 From the questions listed above, the fields required are:


*   family_income_range
*   geographical_division
*   race
*   telephone_in_household
*   telephone_accessible_elsewhere
*   telephone_interview_acceptable


So, some more data preprocessing before answering the questions- __let's decode these fields__




In [None]:
# family_income_range dictionary

income_range_mapping = {
    "1": "LESS THAN $5,000",
    "2": "$5,000 TO $7,499",
    "3": "$7,500 TO $9,999",
    "4": "$10,000 TO $12,499",
    "5": "$12,500 TO $14,999",
    "6": "$15,000 TO $19,999",
    "7": "$20,000 TO $24,999",
    "8": "$25,000 TO $29,999",
    "9": "$30,000 TO $34,999",
    "10": "$35,000 TO $39,999",
    "11": "$40,000 TO $49,999",
    "12": "$50,000 TO $59,999",
    "13": "$60,000 TO $74,999",
    "14": "$75,000 TO $99,999",
    "15": "$100,000 TO $149,999",
    "16": "$150,000 OR MORE"
    }

# geographical_division

division_mapping = {
    "1": "NEW ENGLAND",
    "2": "MIDDLE ATLANTIC",
    "3": "EAST NORTH CENTRAL",
    "4": "WEST NORTH CENTRAL",
    "5": "SOUTH ATLANTIC",
    "6": "EAST SOUTH CENTRAL",
    "7": "WEST SOUTH CENTRAL",
    "8": "MOUNTAIN",
    "9": "PACIFIC"
}

# race

race_mapping = {
    "1": "White Only",
    "2": "Black Only",
    "3": "American Indian, Alaskan Native Only",
    "4": "Asian Only",
    "5": "Hawaiian/Pacific Islander Only",
    "6": "White-Black",
    "7": "White-AI",
    "8": "White-Asian",
    "9": "White-HP",
    "10": "Black-AI",
    "11": "Black-Asian",
    "12": "Black-HP",
    "13": "AI-Asian",
    "14": "AI-HP",
    "15": "Asian-HP",
    "16": "W-B-AI",
    "17": "W-B-A",
    "18": "W-B-HP",
    "19": "W-AI-A",
    "20": "W-AI-HP",
    "21": "W-A-HP",
    "22": "B-AI-A",
    "23": "W-B-AI-A",
    "24": "W-AI-A-HP",
    "25": "Other 3 Race Combinations",
    "26": "Other 4 and 5 Race Combinations"
}

# Single "YES", "No" mapping for the telephone_in_household and telephone_accessible questions- for all telephone-related questions

telephone_mapping = {"1": "YES", "2": "NO"}


# One decoding function
def decode_encoded_values(df: DataFrame, column_name: str, mapping: Dict[str, str]) -> DataFrame:
    category_udf = udf(lambda x: mapping.get(x.strip(), "INVALID ENTRY"), StringType())   # If the value is not in the mapping, tag as "invalid entry"
    return df.withColumn(column_name, category_udf(trim(col(column_name))))

In [None]:
cps_2017_df.printSchema

<bound method DataFrame.printSchema of DataFrame[household_identifier: string, time_of_interview: string, survey_final_outcome: string, type_housing_unit: string, household type: string, telephone_in_household: string, telephone_accessible_elsewhere: string, telephone_interview_acceptable: string, interview_type: string, family_income_range: string, geographical_division: string, race: string]>

In [None]:
# Applying the function for the mappings

# family_income_range
cps_2017_df = decode_encoded_values(cps_2017_df, "family_income_range", income_range_mapping)

# geographical_division
cps_2017_df = decode_encoded_values(cps_2017_df, "geographical_division", division_mapping)

# geographical_division
cps_2017_df = decode_encoded_values(cps_2017_df, "race", race_mapping)

# telephone_in_household
cps_2017_df = decode_encoded_values(cps_2017_df, "telephone_in_household", telephone_mapping)

# telephone_accessible_elsewhere
cps_2017_df = decode_encoded_values(cps_2017_df, "telephone_accessible_elsewhere", telephone_mapping)

# telephone_interview_acceptable
cps_2017_df = decode_encoded_values(cps_2017_df, "telephone_interview_acceptable", telephone_mapping)

# Show the DataFrame
cps_2017_df.show(10)


+--------------------+-----------------+--------------------+-----------------+--------------+----------------------+------------------------------+------------------------------+--------------+-------------------+---------------------+-------------+
|household_identifier|time_of_interview|survey_final_outcome|type_housing_unit|household type|telephone_in_household|telephone_accessible_elsewhere|telephone_interview_acceptable|interview_type|family_income_range|geographical_division|         race|
+--------------------+-----------------+--------------------+-----------------+--------------+----------------------+------------------------------+------------------------------+--------------+-------------------+---------------------+-------------+
|00000479511071906011|         2017/Dec|                 201|                1|             1|                   YES|                 INVALID ENTRY|                           YES|             2| $30,000 TO $34,999|   EAST SOUTH CENTRAL|   White On

__Insight analysis__: Answering the questions

__Question 1: What is the count of responders per family income range (show all)?__

To do this, we group by family_income_range and take the count of responders in each income range. First, let's exclude invaliid entries

In [None]:
cps_incomerange_valid = cps_2017_df.filter(col("family_income_range") != "INVALID ENTRY")

family_income_range_count = cps_incomerange_valid.groupBy("family_income_range").count()

family_income_range_count.show(truncate=False) # To show all, truncate=False


+--------------------+-----+
|family_income_range |count|
+--------------------+-----+
|$35,000 TO $39,999  |6620 |
|$5,000 TO $7,499    |1625 |
|$30,000 TO $34,999  |6743 |
|$7,500 TO $9,999    |2277 |
|$25,000 TO $29,999  |5803 |
|$20,000 TO $24,999  |6312 |
|$10,000 TO $12,499  |3161 |
|$50,000 TO $59,999  |9971 |
|$40,000 TO $49,999  |9788 |
|LESS THAN $5,000    |3136 |
|$12,500 TO $14,999  |2614 |
|$75,000 TO $99,999  |16557|
|$60,000 TO $74,999  |13442|
|$100,000 TO $149,999|17794|
|$150,000 OR MORE    |15704|
|$15,000 TO $19,999  |4518 |
+--------------------+-----+



__Question 2__: What is the count of responders per geographical division/location and race

Here, we group by both geographical division/location and race and then we count. We also first filter out invalid entries.

In [None]:
from pyspark.sql import functions as F

division_race_valid = cps_2017_df.filter((col("geographical_division") != "INVALID ENTRY") & (col("race") != "INVALID ENTRY"))

division_race_count = division_race_valid.groupBy("geographical_division", "race").count()

division_race_count_top_10 = division_race_count.orderBy(F.desc("count")).limit(10)

division_race_count_top_10.show()

+---------------------+----------+-----+
|geographical_division|      race|count|
+---------------------+----------+-----+
|       SOUTH ATLANTIC|White Only|16999|
|             MOUNTAIN|White Only|14343|
|              PACIFIC|White Only|13214|
|   EAST NORTH CENTRAL|White Only|11325|
|   WEST SOUTH CENTRAL|White Only|11248|
|   WEST NORTH CENTRAL|White Only| 9884|
|      MIDDLE ATLANTIC|White Only| 8487|
|          NEW ENGLAND|White Only| 8410|
|   EAST SOUTH CENTRAL|White Only| 6580|
|       SOUTH ATLANTIC|Black Only| 4899|
+---------------------+----------+-----+



__Question 3: How many responders do not have telephone in their house, but can access
a telephone elsewhere and telephone interview is accepted?__

In [None]:
df_filtered = cps_2017_df.filter(         # Not forgeting to filter out invalid entries
    (col("telephone_in_household") != "INVALID ENTRY") &
    (col("telephone_accessible_elsewhere") != "INVALID ENTRY") &
    (col("telephone_interview_acceptable") != "INVALID ENTRY")
)


telephone_responders_filtered = df_filtered.filter(
    (col("telephone_in_household") == "NO") &
    (col("telephone_accessible_elsewhere") == "YES") &
    (col("telephone_interview_acceptable") == "YES")
)

# Computing the count
responders_count = telephone_responders_filtered.count()

# Printing result
print(responders_count, "responders do not have a telephone in their house but can access elsewhere and telephone interview is accepted:")


633 responders do not have a telephone in their house but can access elsewhere and telephone interview is accepted:


__Question 4__: How many responders can access a telephone, but telephone interview is not accepted?__

In [None]:
filtered_df = cps_2017_df.filter(
    (col("telephone_in_household") != "INVALID ENTRY") &
    (col("telephone_accessible_elsewhere") != "INVALID ENTRY") &
    (col("telephone_interview_acceptable") != "INVALID ENTRY")
)

filtered_responders = filtered_df.filter(
    (col("telephone_in_household") == "YES") |
    (col("telephone_accessible_elsewhere") == "YES") &
    (col("telephone_interview_acceptable") == "NO")
)

num_of_responders = filtered_responders.count()

print(num_of_responders, "responders can access a telephone but telephone interview is not accepted:")


4231 responders can access a telephone but telephone interview is not accepted:
