## Data Assessment Using PySpark

I was given a technical assessment by **Interswitch** to analyse the United States Census Bureau's 2017 Basic Monthly CPS using Apache Spark (preferably with Python or Scala).

The instructions are as follows:    
Step 1: Visit the United States Census Bureau website and access the 2017 Basic Monthly CPS page.    
Step 2: Download the DOS/Windows zip file for December and extract the .dat file. It contains data captured per respondent.    
Step 3: Using the data dictionary file, extract the following information, and show a sample of 10 records only:
1.	Full household identifier.   
2.	Time of interview in YYYY/MMM format.
3.	Final outcome of the survey.
4.	Type of housing unit.
5.	Household type.
6.	Apartment/Household has a telephone.
7.	Apartment/Household can access a telephone elsewhere.
8.	Is telephone interview acceptable for the responder.
9.	Type of interview.
10.	Family income range.
11.	Geographical division/location.
12.	Race.     


Step 4: Using the custom data extracted in step 3, answer the below questions:
1.	What is the count of responders per family income range (show top 10)?
2.	What is the count of responders per geographical division/location and race (show top 10)?
3.	How many responders do not have a telephone in their house, but can access a telephone elsewhere and telephone interview is accepted (show top 10)?
4.	How many responders can access a telephone, but telephone interview is not accepted (show top 10)?     


**Note:** Where a value is encoded, always return the actual (decoded) value.


#### Column names and their locations in the data dictionary

1.	Full household identifier: columns 1-15 + 71-75
2.	Time of interview in YYYY/MMM format: columns 18-21 (year) / 16-17 (month)
3.	Final outcome of the survey: columns 24-26
4.	Type of housing unit: columns 31-32
5.	Household type: columns 61-62
6.	Apartment/Household has a telephone: columns 33-34
7.	Apartment/Household can access a telephone elsewhere: columns 35-36
8.	Is telephone interview acceptable for the responder: columns 37-38
9.	Type of interview: columns 65-66
10.	Family income range: columns 39-40
11.	Geographical division/location: columns 91-91
12.	Race: columns 139-140
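
The positions above are 1-based and inclusive. Spark's `substring(col, pos, len)` takes a 1-based start position and a length, while Python string slices are 0-based and end-exclusive, so each range needs a small conversion. A minimal sketch of that conversion, assuming two hypothetical helpers (`to_start_length`, `slice_field`) and a made-up sample record that is not real CPS data:

```python
# Hypothetical helper: convert a 1-based inclusive column range from the
# data dictionary into the (start, length) pair that substring() expects.
def to_start_length(first_col, last_col):
    return first_col, last_col - first_col + 1

# Hypothetical helper: slice the same 1-based inclusive range out of a Python string.
def slice_field(record, first_col, last_col):
    return record[first_col - 1:last_col]

# Made-up record, for illustration only (not real CPS data):
# columns 1-15 identifier, 16-17 month, 18-21 year, then padding.
record = "000110116792163" + "12" + "2017" + " " * 19

print(to_start_length(1, 15))       # household identifier, part 1
print(to_start_length(71, 75))      # household identifier, part 2
print(slice_field(record, 16, 17))  # interview month
print(slice_field(record, 18, 21))  # interview year
```

For example, columns 71-75 become a start of 71 and a length of 5, which matches the `substring(data.value, 71, 5)` call used in the extraction step.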


In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, substring
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

In [None]:
# Create Spark Session
spark = SparkSession.builder.appName("US_Census").getOrCreate()

In [None]:
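# Define the target schema based on the data dictionary
# (kept for reference: it is not passed to the reader below, which loads each record as a single text column)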
schema = StructType([
    StructField("household_identifier_1", StringType(), True),
    StructField("household_identifier_2", StringType(), True),
    StructField("Time_of_interview_mm", StringType(), True),
    StructField("Time_of_interview_yy", StringType(), True),
    StructField("Final_outcome_of_the_survey", StringType(), True),
    StructField("Type_of_housing_unit", StringType(), True),
    StructField("Household_type", StringType(), True),
    StructField("Apartment_Household_has_a_telephone", StringType(), True),
    StructField("Apartment_Household_can_access_a_telephone_elsewhere", StringType(), True),
    StructField("Is_telephone_interview_acceptable_for_the_responder", StringType(), True),
    StructField("Type_of_interview", StringType(), True),
    StructField("Family_income_range", StringType(), True),
    StructField("Geographical_division_location", StringType(), True),
    StructField("race", StringType(), True)
])

In [None]:
# Read the .dat file as a text file
data = spark.read.text("dec17pub.dat")

In [None]:
# Use withColumn to create new columns based on substring ranges
extracted_df = data.withColumn("household_identifier_1", substring(data.value, 1, 15)) \
    .withColumn("household_identifier_2", substring(data.value, 71, 5)) \
    .withColumn("Time_of_interview_mm", substring(data.value, 16, 2)) \
    .withColumn("Time_of_interview_yy", substring(data.value, 18, 4)) \
    .withColumn("Final_outcome_of_the_survey", substring(data.value, 24, 3)) \
    .withColumn("Type_of_housing_unit", substring(data.value, 31, 2)) \
    .withColumn("Household_type", substring(data.value, 61, 2)) \
    .withColumn("Apartment_Household_has_a_telephone", substring(data.value, 33, 2)) \
    .withColumn("Apartment_Household_can_access_a_telephone_elsewhere", substring(data.value, 35, 2)) \
    .withColumn("Is_telephone_interview_acceptable_for_the_responder", substring(data.value, 37, 2)) \
    .withColumn("Type_of_interview", substring(data.value, 65, 2)) \
    .withColumn("Family_income_range", substring(data.value, 39, 2)) \
    .withColumn("Geographical_division_location", substring(data.value, 91, 1)) \
    .withColumn("race", substring(data.value, 139, 2))

In [None]:
# Show a sample of 10 records
extracted_df.show(10, truncate=False)

### Converting Spark DataFrame to Pandas DataFrame

In [None]:
import pandas as pd

# Convert Spark DataFrame to Pandas DataFrame
pandas_df = extracted_df.toPandas()

# Show a sample of 10 records
pandas_df.head(10)

In [None]:
# Drop the 'value' column
pandas_df = pandas_df.drop('value', axis=1)
pandas_df

In [None]:
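# Trim whitespace from all values in all columns of the DataFrame, but keep the sign:
# the CPS data dictionary uses negative codes (e.g. -1) for blank / not-in-universe entries,
# so removing '-' would turn them into valid-looking codes such as 1 (YES)
pandas_df = pandas_df.replace(r'\s+', '', regex=True)
pandas_df.head()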
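
Negative values in the extracted fields deserve care. As far as I can tell from the data dictionary, negative codes (for example -1) mark blank or not-in-universe entries, so the way the sign is handled changes what a later `== 1` filter matches. A small sketch with made-up values (the variable names are illustrative only):

```python
# Made-up raw field values, as they might come out of a 2-character slice.
raw_values = [" 1", " 2", "-1", " 1", "-1"]

# Variant A: remove every '-' before converting (negative codes lose their sign).
stripped = [int(v.replace("-", "")) for v in raw_values]

# Variant B: keep the sign and only trim whitespace.
signed = [int(v.strip()) for v in raw_values]

print(stripped)  # [1, 2, 1, 1, 1]
print(signed)    # [1, 2, -1, 1, -1]

# Number of values that a filter on code 1 (YES) would match in each variant.
print(stripped.count(1), signed.count(1))  # 4 2
```

In variant A the two -1 entries are counted as YES, which would inflate any count based on code 1.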

In [None]:
# Concatenate household_identifier_1 and household_identifier_2
# into a new column named full_household_identifier
pandas_df['full_household_identifier'] = pandas_df['household_identifier_1'] + pandas_df['household_identifier_2']
pandas_df.head()

In [None]:
# Combine Time_of_interview_yy and Time_of_interview_mm into the required YYYY/MMM format (e.g. 2017/Dec)
year_month = pandas_df['Time_of_interview_yy'].astype(str).str.strip() + '-' + pandas_df['Time_of_interview_mm'].astype(str).str.strip()
pandas_df['time_of_interview'] = pd.to_datetime(year_month, format='%Y-%m').dt.strftime('%Y/%b')
pandas_df.head()
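
The assignment asks for the interview time in YYYY/MMM format. A locale-independent sketch of that formatting, assuming MMM means the three-letter English month abbreviation and using a hypothetical helper `format_interview_time`:

```python
# Three-letter English month abbreviations, indexed by month number - 1.
MONTH_ABBR = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
              "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

def format_interview_time(year, month):
    """Return 'YYYY/MMM' from year and month strings such as '2017' and '12'."""
    month_number = int(month)  # int() tolerates surrounding whitespace, e.g. ' 1'
    if not 1 <= month_number <= 12:
        raise ValueError(f"month out of range: {month!r}")
    return f"{int(year):04d}/{MONTH_ABBR[month_number - 1]}"

print(format_interview_time("2017", "12"))  # 2017/Dec
print(format_interview_time("2017", " 1"))  # 2017/Jan
```

An explicit lookup list avoids depending on the machine's locale settings for the month name.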

In [None]:
# Drop unnecessary columns
columns_to_drop = ['household_identifier_1', 'household_identifier_2', 'Time_of_interview_mm', 'Time_of_interview_yy']
pandas_df = pandas_df.drop(columns=columns_to_drop)
pandas_df.head()

In [None]:
# Define the desired column order
desired_column_order = [
    'full_household_identifier',
    'time_of_interview',
    'Final_outcome_of_the_survey',
    'Type_of_housing_unit',
    'Household_type',
    'Apartment_Household_has_a_telephone',
    'Apartment_Household_can_access_a_telephone_elsewhere',
    'Is_telephone_interview_acceptable_for_the_responder',
    'Type_of_interview',
    'Family_income_range',
    'Geographical_division_location',
    'race'
]

# Reindex the DataFrame with the desired column order
pandas_df = pandas_df.reindex(columns=desired_column_order)
pandas_df.head()

In [None]:
# Convert columns to integers
columns_to_convert = [
    'Final_outcome_of_the_survey',
    'Type_of_housing_unit',
    'Household_type',
    'Apartment_Household_has_a_telephone',
    'Apartment_Household_can_access_a_telephone_elsewhere',
    'Is_telephone_interview_acceptable_for_the_responder',
    'Type_of_interview',
    'Family_income_range',
    'Geographical_division_location',
    'race'
]

# Convert values in the specified columns to integers
pandas_df[columns_to_convert] = pandas_df[columns_to_convert].astype(int)

pandas_df.head()
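
`astype(int)` raises an error if any value is not a valid integer string. A more forgiving sketch, assuming it is acceptable to turn unparseable values into missing values, uses `pd.to_numeric` with `errors='coerce'` and the nullable `Int64` dtype (the sample frame below is made up):

```python
import pandas as pd

# Made-up sample: one clean column and one column with a blank entry.
sample = pd.DataFrame({
    "Family_income_range": ["16", " 3", "-1"],
    "race": [" 1", "  ", " 4"],
})

# Coerce unparseable strings to missing values, then use the nullable integer dtype.
converted = sample.apply(
    lambda column: pd.to_numeric(column.str.strip(), errors="coerce").astype("Int64")
)

print(converted)
print(converted["race"].isna().sum())  # number of values that could not be parsed
```

Whether silently coercing bad values is appropriate depends on the data; failing loudly, as `astype(int)` does, can be the safer default.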

### Using the custom data extracted to answer the following questions:
1.	What is the count of responders per family income range (show top 10)?
2.	What is the count of responders per geographical division/location and race (show top 10)?
3.	How many responders do not have a telephone in their house, but can access a telephone elsewhere and telephone interview is accepted?
4.	How many responders can access a telephone, but telephone interview is not accepted?     


**Note:** Where a value is encoded, always return the actual (decoded) value.


### Question 1

In [None]:
# 1. Count of responders per family income range (show top 10)
# Decode family income range
income_range_decode = {
    1: 'LESS THAN $5,000',
    2: '$5,000 TO 7,499',
    3: '$7,500 TO 9,999',
    4: '$10,000 TO 12,499',
    5: '$12,500 TO 14,999',
    6: '$15,000 TO 19,999',
    7: '$20,000 TO 24,999',
    8: '$25,000 TO 29,999',
    9: '$30,000 TO 34,999',
    10: '$35,000 TO 39,999',
    11: '$40,000 TO 49,999',
    12: '$50,000 TO 59,999',
    13: '$60,000 TO 74,999',
    14: '$75,000 TO 99,999',
    15: '$100,000 TO 149,999',
    16: '150,000 OR MORE'
}

# Count responders per family income range
responder_counts = pandas_df.groupby('Family_income_range').size().reset_index(name='Responder_Count')

# Rename family income range with decoded values
responder_counts['Family_income_range'] = responder_counts['Family_income_range'].map(income_range_decode)

# Sort values in descending order and show top 10
responder_counts_top10 = responder_counts.sort_values(by='Responder_Count', ascending=False).head(10)

responder_counts_top10
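
The count-then-decode pattern used here can be wrapped in a small helper. A minimal sketch, assuming a hypothetical function `decoded_counts` and a hypothetical fallback label `'NOT DECODED'` for codes that are missing from the dictionary (the sample data is made up):

```python
import pandas as pd

def decoded_counts(df, column, decode_map, top=10, fallback="NOT DECODED"):
    """Count rows per code in `column`, decode the codes, and return the top rows."""
    counts = df.groupby(column).size().reset_index(name="Responder_Count")
    # Codes absent from decode_map would become NaN; label them explicitly instead.
    counts[column] = counts[column].map(decode_map).fillna(fallback)
    return counts.sort_values(by="Responder_Count", ascending=False).head(top)

# Made-up sample codes, for illustration only.
sample = pd.DataFrame({"Family_income_range": [16, 16, 16, 1, 1, -1]})
sample_decode = {1: "LESS THAN $5,000", 16: "150,000 OR MORE"}

result = decoded_counts(sample, "Family_income_range", sample_decode)
print(result)
```

The explicit fallback keeps undecoded codes visible in the output instead of showing them as `NaN`.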

### Question 2

In [None]:
# 2. Count of responders per geographical division/location and race (show top 10)
# Decode values
decode_geographical_division = {
    1: 'NEW ENGLAND',
    2: 'MIDDLE ATLANTIC',
    3: 'EAST NORTH CENTRAL',
    4: 'WEST NORTH CENTRAL',
    5: 'SOUTH ATLANTIC',
    6: 'EAST SOUTH CENTRAL',
    7: 'WEST SOUTH CENTRAL',
    8: 'MOUNTAIN',
    9: 'PACIFIC'
}

decode_race = {
    1: 'White Only',
    2: 'Black Only',
    3: 'American Indian, Alaskan Native Only',
    4: 'Asian Only',
    5: 'Hawaiian/Pacific Islander Only',
    6: 'White-Black',
    7: 'White-AI',
    8: 'White-Asian',
    9: 'White-HP',
    10: 'Black-AI',
    11: 'Black-Asian',
    12: 'Black-HP',
    13: 'AI-Asian',
    14: 'AI-HP',
    15: 'Asian-HP',
    16: 'W-B-AI',
    17: 'W-B-A',
    18: 'W-B-HP',
    19: 'W-AI-A',
    20: 'W-AI-HP',
    21: 'W-A-HP',
    22: 'B-AI-A',
    23: 'W-B-AI-A',
    24: 'W-AI-A-HP',
    25: 'Other 3 Race Combinations',
    26: 'Other 4 and 5 Race Combinations'
}

# Count responders per geographical division/location and race
responder_counts = pandas_df.groupby(['Geographical_division_location', 'race']).size().reset_index(name='Responder_Count')

# Rename geographical division/location and race with decoded values
responder_counts['Geographical_division_location'] = responder_counts['Geographical_division_location'].map(decode_geographical_division)
responder_counts['race'] = responder_counts['race'].map(decode_race)

# Sort values in descending order and show top 10
responder_counts_top10 = responder_counts.sort_values(by='Responder_Count', ascending=False).head(10)

responder_counts_top10

### Question 3

In [None]:
# 3. Responders without a telephone in their house, but can access a telephone elsewhere and telephone interview is accepted (show top 10)
# Decode values
decode_telephone = {1: 'YES', 2: 'NO'}

# Filter responders who do not have a telephone in their house but can access a telephone elsewhere and accept telephone interview
filtered_responders = pandas_df[(pandas_df['Apartment_Household_has_a_telephone'] == 2) & 
                                (pandas_df['Apartment_Household_can_access_a_telephone_elsewhere'] == 1) & 
                                (pandas_df['Is_telephone_interview_acceptable_for_the_responder'] == 1)]

# Count responders
responder_count = len(filtered_responders)

# Create a new DataFrame to display the result
output_data = pd.DataFrame({'Count_of_Responders': [responder_count]})

# Decode the values in the new DataFrame
output_data['Apartment_has_A_TELEPHONE'] = decode_telephone[2]
output_data['Apartment_has_A_TELEPHONE_ELSEWHERE'] = decode_telephone[1]
output_data['IS_A_TELEPHONE_INTERVIEW_ACCEPTABLE'] = decode_telephone[1]

output_data
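
Since this kind of question asks for a single number, the boolean mask can also be summed directly without building an intermediate DataFrame. A sketch on made-up codes (1 = YES, 2 = NO, as in `decode_telephone`; the short column names are illustrative stand-ins for the real ones):

```python
import pandas as pd

# Made-up sample; short illustrative column names stand in for the real ones.
sample = pd.DataFrame({
    "has_phone":          [2, 2, 1, 2, -1],
    "phone_elsewhere":    [1, 1, -1, 2, -1],
    "phone_interview_ok": [1, 2, 1, 1, -1],
})

# True where: no telephone at home, telephone available elsewhere, interview accepted.
mask = (
    (sample["has_phone"] == 2)
    & (sample["phone_elsewhere"] == 1)
    & (sample["phone_interview_ok"] == 1)
)

# Summing a boolean Series counts the True values.
print(int(mask.sum()))  # 1
```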

### Question 4

In [None]:
# 4. Responders who can access a telephone, but telephone interview is not accepted (show top 10)
# "Can access a telephone" is read here as: has a telephone in the house OR can access one elsewhere
can_access_telephone = ((pandas_df['Apartment_Household_has_a_telephone'] == 1) | 
                        (pandas_df['Apartment_Household_can_access_a_telephone_elsewhere'] == 1))

# Filter responders who can access a telephone but do not accept telephone interviews
filtered_responders = pandas_df[can_access_telephone & 
                                (pandas_df['Is_telephone_interview_acceptable_for_the_responder'] == 2)]

# Count responders
responder_count = len(filtered_responders)

# Create a new DataFrame to display the result
output_data = pd.DataFrame({'Count_of_Responders': [responder_count]})

# Decode the values in the new DataFrame
output_data['CAN_ACCESS_A_TELEPHONE'] = decode_telephone[1]
output_data['IS_A_TELEPHONE_INTERVIEW_ACCEPTABLE'] = decode_telephone[2]

output_data