### United States Census Bureau's 2017 Basic Monthly CPS, using Apache Spark

### **STEPS 1 - 2 :** Download the *DOS/Windows for Dec* zip file and extract it.
+ Visit https://www.census.gov/data/datasets/2017/demo/cps/cps-basic-2017.html to download the zip file.
+ The downloaded file, *dec17pub.zip*, is available in the project root folder.
+ If *dec17pub.dat* has not been extracted yet, it is extracted into the root folder.
+ Load the essential PySpark libraries and initialize the Spark session.
+ Display the Spark version to confirm that the session is active.

In [9]:
import pathlib
import zipfile

zip_file = pathlib.Path('dec17pub.zip')
file_path = pathlib.Path('dec17pub.dat')
file_name = 'dec17pub.dat'

if not file_path.is_file():
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        # Check if the file exists in the ZIP archive
        if file_name in zip_ref.namelist():
            # Extract the file to a current location
            zip_ref.extract(file_name)
            print(f'\n{file_name} has been extracted.')


dec17pub.dat has been extracted.
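
The extraction cell above prints nothing when the archive or the expected member is missing. As a minimal sketch of a stricter variant, the helper below raises an error in both cases. The name `extract_member` is hypothetical and not part of this notebook; it relies only on the standard-library `pathlib` and `zipfile` modules.

```python
import pathlib
import zipfile


def extract_member(zip_path, member, dest_dir='.'):
    """Extract one member from a zip archive, failing loudly if it is absent.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    zip_path = pathlib.Path(zip_path)
    target = pathlib.Path(dest_dir) / member

    # Skip the work when the file is already on disk
    if target.is_file():
        return target

    if not zip_path.is_file():
        raise FileNotFoundError(f'Archive not found: {zip_path}')

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        # Report a missing member instead of silently doing nothing
        if member not in zip_ref.namelist():
            raise KeyError(f'{member} is not in {zip_path.name}')
        zip_ref.extract(member, path=dest_dir)

    return target
```

In this notebook it could be called as `extract_member('dec17pub.zip', 'dec17pub.dat')`.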


In [10]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, format_number

spark = SparkSession.builder.appName('Data Engineer - Take Home Project').getOrCreate()
print(f'\nThe spark version is : {spark.version}')


The spark version is : 3.5.1


### **STEP 3 :** Show a sample of the *DOS/Windows for Dec* file
+ Open the file and map the fixed-width columns of each line to corresponding variables.
+ Add each mapped line to a list.
+ Create a dataframe *df_master* from the list.
+ Display a sample record from the file.

In [11]:
# Create list to hold records from file
rows = list()

# Open the file for read operation
with open('dec17pub.dat','r') as file:
    
    # Read each line and map the columns to variables
    for line in file:
        full_household_identifier = line[:15]
        time_of_interview = line[17:21] + '/' + line[15:17]
        final_outcome_of_survey = line[23:26]
        type_of_housing_unit = line[31:32]
        household_type = line[61:62]
        household_has_telephone = line[33:34]
        household_can_access_telephone = line[35:36]
        is_telephone_interview_acceptable = line[37:38]
        type_of_interview = line[65:66]
        family_income_range = line[39:40]
        division_location = line[90:91]
        race = line[138:140]

        # Create a record from the above variables and add it to the list
        item = (full_household_identifier,
                time_of_interview,
                final_outcome_of_survey,
                type_of_housing_unit,
                household_type,
                household_has_telephone,
                household_can_access_telephone,
                is_telephone_interview_acceptable,
                type_of_interview,
                family_income_range,    
                division_location,
                race)
        rows.append(item)

# Create a dataframe and display 1 sample record
from schema import master_schema
df_master = spark.createDataFrame(rows, master_schema)

print(f'\n\nA Sample Record from {file_name}')
df_master.show(1) 



A Sample Record from dec17pub.dat
+-------------------------+-----------------+----------------------------+-------------------------+-------------------+----------------------------+-----------------------------------+--------------------------------------+----------------------+------------------------+----------------------+---------+
|full_household_identifier|time_of_interview|final_outcome_of_survey_code|type_of_housing_unit_code|household_type_code|household_has_telephone_code|household_can_access_telephone_code|is_telephone_interview_acceptable_code|type_of_interview_code|family_income_range_code|division_location_code|race_code|
+-------------------------+-----------------+----------------------------+-------------------------+-------------------+----------------------------+-----------------------------------+--------------------------------------+----------------------+------------------------+----------------------+---------+
|          000004795110719|          2017/12| 
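
The slices in the cell above are written as 0-indexed Python ranges, while a record layout typically lists 1-indexed, inclusive column positions. As a minimal sketch, keeping the positions in one table makes each field's width explicit and easier to compare against the layout. The names `LAYOUT` and `parse_line` are hypothetical, and the positions only restate a few slices from the cell above; they are assumptions to be checked against the official data dictionary. Several slices in that cell (for example `line[39:40]`) read a single character, and it is worth confirming that this matches the documented width of those fields.

```python
# Hypothetical layout table: (field name, first column, last column),
# using 1-indexed, inclusive positions as a record layout would list them.
# These positions restate slices from the notebook cell and are assumptions.
LAYOUT = [
    ('full_household_identifier', 1, 15),   # line[:15]
    ('interview_month', 16, 17),            # line[15:17]
    ('interview_year', 18, 21),             # line[17:21]
    ('final_outcome_of_survey', 24, 26),    # line[23:26]
    ('division_location', 91, 91),          # line[90:91]
    ('race', 139, 140),                     # line[138:140]
]


def parse_line(line, layout=LAYOUT):
    """Slice one fixed-width record into a dict of raw string fields."""
    # A 1-indexed inclusive range (start, end) maps to line[start - 1:end]
    return {name: line[start - 1:end] for name, start, end in layout}
```

Each dict returned by `parse_line` holds raw strings, so any padding in a field is still present and visible at this stage.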

### **STEP 4 :** Answers to Questions 1 - 4


#### **Question 1 :** What is the count of responders per family income range (show all)?
+ Create dataframe **df_family_income** and use it to decode the Family Income Range
+ Select only the required fields from *df_master* for a faster runtime
+ Join the dataframes and generate the result
+ Please note - *the outcome of this left join contains null values (codes with no match in the lookup), based on findings from deeper analysis*

In [12]:
# Load schema and data for family income data structure
from schema import family_income_range_schema, family_income_range_data

df_family_income = spark.createDataFrame(family_income_range_data, family_income_range_schema)

# Create the join and run the analysis
question_1 = df_master.select('family_income_range_code')\
    .join(df_family_income, df_master.family_income_range_code == df_family_income.family_code, 'left')\
        .groupBy('family_income_range').count().orderBy('count', ascending = False)

# Format and display the result, include count of available records
question_1 = question_1.withColumn('count', format_number('count', 0))
num_of_records = question_1.count()

print(f'\n\nThe Count of Responders per Family Income Range')
question_1.toDF('FAMILY_INCOME_RANGE','COUNT_OF_RESPONDERS').show(num_of_records)



The Count of Responders per Family Income Range
+-------------------+-------------------+
|FAMILY_INCOME_RANGE|COUNT_OF_RESPONDERS|
+-------------------+-------------------+
|   LESS THAN $5,000|             33,315|
|    12,00 TO 14,999|             20,408|
|   15,000 TO 19,999|             20,222|
|   10,000 TO 12,999|             19,718|
|     7,500 TO 9,999|             15,719|
|     5,000 TO 7,499|             11,596|
|      30,000 TO 34,|              6,743|
|               NULL|              6,620|
|   20,000 TO 24,999|              6,312|
|   25,000 TO 29,999|              5,803|
+-------------------+-------------------+



#### **Question 2 :** What is the count of responders per geographical division/location and race (show top 10)?
+ Create dataframes **df_div_location** and **df_race** to decode Location and Race
+ Select only the required fields from *df_master* for a faster execution time
+ Join the dataframes and generate the result
+ Observation - *there are null values in the outcome of this join, caused by missing reference values discovered during analysis*

In [13]:
from schema import division_location_schema, race_schema, division_location_data, race_data

df_div_location = spark.createDataFrame(division_location_data, division_location_schema)
df_race = spark.createDataFrame(race_data, race_schema)

# Create the join and run the analysis
question_2 = df_master.select('division_location_code','race_code')\
    .join(df_div_location, df_master.division_location_code == df_div_location.div_loc_code, 'left')\
        .join(df_race, df_master.race_code == df_race.race_code, 'left')\
            .groupBy('division_location','race').count().orderBy('count', ascending = False)

# Format and display Top 10 only
question_2 = question_2.withColumn('count', format_number('count',0))

print(f'\n\nThe Count of Top 10 Responders per Geographical_Division_Location and Race')
question_2.toDF('DIVISION_LOCATION', 'RACE', 'COUNT_OF_RESPONDERS').show(10)



The Count of Top 10 Responders per Geographical_Division_Location and Race
+------------------+--------+-------------------+
| DIVISION_LOCATION|    RACE|COUNT_OF_RESPONDERS|
+------------------+--------+-------------------+
|    SOUTH ATLANTIC|    NULL|             27,609|
|           PACIFIC|    NULL|             20,659|
|          MOUNTAIN|    NULL|             18,470|
|WEST SOUTH CENTRAL|    NULL|             16,498|
|EAST NORTH CENTRAL|    NULL|             15,296|
|WEST NORTH CENTRAL|    NULL|             13,052|
|   MIDDLE ATLANTIC|    NULL|             12,756|
|       NEW ENGLAND|    NULL|             11,281|
|EAST SOUTH CENTRAL|    NULL|             10,345|
|           PACIFIC|Asian-HP|                 70|
+------------------+--------+-------------------+
only showing top 10 rows
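
Almost every race value in the output above is NULL, which means the left join found no matching `race_code` in the lookup. One common cause with fixed-width files is padding: a two-character field can hold `' 1'` or `'01'` while the lookup key is `'1'`. The contents of `race_data` are not shown here, so whether padding is the cause in this notebook is an assumption. A minimal sketch of a normalizer, with the hypothetical name `normalize_code`:

```python
def normalize_code(raw):
    """Return a code with padding removed, e.g. ' 1' and '01' both become '1'.

    A negative code such as '-1' keeps its sign, and an all-zero code
    such as '00' becomes '0'.
    """
    text = raw.strip()
    if not text:
        return ''
    sign = '-' if text.startswith('-') else ''
    # Drop the sign, then any leading zeros; keep a single '0' if nothing is left
    digits = text.lstrip('-').lstrip('0')
    return sign + (digits or '0')
```

The same rule would have to be applied to both sides of the join, that is, to the codes parsed from the file and to the lookup keys. On the Spark side, `pyspark.sql.functions.trim` removes surrounding spaces but not leading zeros.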



#### **Question 3 :** How many responders do not have a telephone in their house, but can access a telephone elsewhere and telephone interview is accepted?
+ Use *df_master* directly for fast execution; no decoding is required

In [14]:
question_3 = df_master.where(
    (col('household_has_telephone_code') == lit('2')) &
    (col('household_can_access_telephone_code')  == lit('1')) &
    (col('is_telephone_interview_acceptable_code') == lit('1'))
).count()

print(f'\nThe answer to Question (3) is : {question_3}')


The answer to Question (3) is : 635


#### **Question 4 :** How many responders can access a telephone, but telephone interview is not accepted?
+ Use *df_master* directly for fast execution; no decoding is required
+ Observation - the data for '*Is telephone interview acceptable*' shows values (0,1) instead of the expected (1,2).

In [15]:
question_4 = df_master.where(
        (col('household_can_access_telephone_code')  == lit('1')) &
        (col('is_telephone_interview_acceptable_code') == lit('2'))
    ).count()

print(f'\nThe answer to Question (4) is : {question_4}')


The answer to Question (4) is : 0
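
A count of 0 is what the filter on `'2'` returns when the field only ever holds 0 and 1, as noted in the observation above. Listing the distinct raw values of a field before choosing the filter literal makes this visible. A minimal sketch in plain Python follows; the name `value_counts` and the sample records are hypothetical and only illustrate the check.

```python
from collections import Counter


def value_counts(lines, start, end):
    """Count the distinct raw values found in line[start:end] (0-indexed slice)."""
    return Counter(line[start:end] for line in lines)


# Tiny synthetic records for illustration only; not taken from dec17pub.dat
sample = ['a0', 'a1', 'b1']
counts = value_counts(sample, 1, 2)
print(counts.most_common())  # prints [('1', 2), ('0', 1)]
```

Within the notebook, `df_master.groupBy('is_telephone_interview_acceptable_code').count().show()` would give the same kind of overview directly in Spark.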


#### Clean Project Folder
+ The unzipped file is large (128MB), so let us delete it since we still have its zipped version

In [16]:
file_path.unlink()
print(f'\n{file_name} has been deleted successfully')


dec17pub.dat has been deleted successfully
