In [1]:
#importing the required libraries and initializing Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

In [2]:
# Path to your .dat file
file_path = "dec17pub.dat"

# Read the .dat file as a DataFrame
raw_data = spark.read.text(file_path)

# Split the lines using a delimiter (modify as needed)
delimiter = "\t"  # Example delimiter
split_columns = split(raw_data.value, delimiter).alias("columns")

# Select and process the columns from the split data
processed_data = raw_data.select(split_columns)

# Show the processed data (you can perform further transformations)
processed_data.show()

+--------------------+
|             columns|
+--------------------+
|[0000047951107191...|
|[0000047951107191...|
|[0000716910049411...|
|[0000716910049411...|
|[0000716910049411...|
|[0001101779879861...|
|[0001101779879861...|
|[0001102065933811...|
|[0001102848156801...|
|[0001103278564691...|
|[0001103399354531...|
|[0001103399354531...|
|[0001103399354531...|
|[0001103399354531...|
|[0001103438685671...|
|[0001103438685671...|
|[0001103594243391...|
|[0001103594243391...|
|[0001104154004291...|
|[0001104154004291...|
+--------------------+
only showing top 20 rows



In [3]:
#checking number of records

num_rows = processed_data.count()
print("Number of records:", num_rows)

Number of records: 146456


There are 146,456 records in this dataset.

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import when, col

# Define the schema based on the updated column descriptions
schema = StructType([
    StructField("hs_ID", StringType(), False),
    StructField("month", IntegerType(), False),
    StructField("year", IntegerType(), False),
    StructField("fin_out", StringType(), False),
    StructField("hs_unt", IntegerType(), False),
    StructField("hs_tel", IntegerType(), False),
    StructField("else_tel", IntegerType(), False),
    StructField("tel_intv", IntegerType(), False),
    StructField("fam_inc", IntegerType(), False),
    StructField("hs_type", IntegerType(), False),
    StructField("intv_type", IntegerType(), False), 
    StructField("REG", IntegerType(), False),
    StructField("DIV", StringType(), False),
    StructField("race", StringType(), False)
    # Add more fields according to your actual schema
])

# Define a UDF to extract values from text rows
def extract_values(text_row):
    values = [
        text_row[0][0:15],  # HRHHID
        int(text_row[0][15:17]),  # HRMONTH
        int(text_row[0][17:21]),  # HRYEAR4
        text_row[0][23:26],  # HUFINAL
        int(text_row[0][30:32]),  # HEHOUSUT
        int(text_row[0][33:34]),  # HETELHHD
        int(text_row[0][35:36]),  # HETELAVL
        int(text_row[0][37:38]),  # HEPHONEO
        int(text_row[0][38:40]),  # HEFAMINC
        int(text_row[0][60:62]),  # HRHTYPE 
        int(text_row[0][65:66]),  # HUINTTYP
        int(text_row[0][88:90]),  #REG
        text_row[0][90:91],       #DIV
        text_row[0][138:140]  # PTDTRACE
        # Add more value extractions based on your actual schema
    ]
    
    return values

# Register the UDF
extract_udf = udf(extract_values, schema)
# Assuming your preprocessed_data DataFrame has a column named "text_row"
processed_data = processed_data.withColumn("extracted_values", extract_udf(col("columns")))

# Define column names
# Define column names
column_names = ["hs_ID", "month", "year", "fin_out", "hs_unt",
                "hs_tel", "else_tel", "tel_intv", "fam_inc", "hs_type", "intv_type", "REG", "DIV",
                "race"]

# Select the extracted values and alias the columns
selected_df = processed_data.select(*[col("extracted_values")[col_name].alias(col_name) for col_name in column_names])
# Create a new column "house_tel" using values from the dictionary


print(selected_df)
# Show the resulting DataFrame
selected_df.show(10)


DataFrame[hs_ID: string, month: int, year: int, fin_out: string, hs_unt: int, hs_tel: int, else_tel: int, tel_intv: int, fam_inc: int, hs_type: int, intv_type: int, REG: int, DIV: string, race: string]
+---------------+-----+----+-------+------+------+--------+--------+-------+-------+---------+---+---+----+
|          hs_ID|month|year|fin_out|hs_unt|hs_tel|else_tel|tel_intv|fam_inc|hs_type|intv_type|REG|DIV|race|
+---------------+-----+----+-------+------+------+--------+--------+-------+-------+---------+---+---+----+
|000004795110719|   12|2017|    201|     1|     1|       1|       1|      9|      1|        2|  3|  6|   1|
|000004795110719|   12|2017|    201|     1|     1|       1|       1|      9|      1|        2|  3|  6|   1|
|000071691004941|   12|2017|    201|     1|     1|       1|       1|     11|      1|        1|  3|  6|   1|
|000071691004941|   12|2017|    201|     1|     1|       1|       1|     11|      1|        1|  3|  6|   1|
|000071691004941|   12|2017|    201|     1

In [5]:
from pyspark.sql.functions import col, when, create_map, lit

# Define mappings
hs_unt_mapping = {0: "OTHER UNIT", 1: "HOUSE, APARTMENT, FLAT"}
hs_tel_mapping = {1: "YES", 2: "NO"}
else_tel_mapping = {1: "YES", 2: "NO"}
tel_intv_mapping = {1: "YES", 0: "NO"}

intv_type_mapping = {1: "YES", 2: "NO"}

# Apply mappings using the when function
mapped_df = selected_df.select(
    "hs_ID",
    "month",
    "year",
    "fin_out",
    when(col("hs_unt") == 0, "OTHER UNIT").otherwise("HOUSE, APARTMENT, FLAT").alias("hs_unt"),
    when(col("hs_tel") == 1, "YES").otherwise("NO").alias("hs_tel"),
    when(col("else_tel") == 1, "YES").otherwise("NO").alias("else_tel"),
    when(col("tel_intv") == 1, "YES").otherwise("NO").alias("tel_intv"),
    #fam_income_map[col("fam_income")].alias("fam_income"),
    #hs_type_map[col("hs_type")].alias("hs_type"),
    when(col("intv_type") == 1, "YES").otherwise("NO").alias("intv_type"),
    #region_map[col("REG")].alias("REG"),
    #division_map[col("DIV")].alias("DIV"),
    "fam_inc",
    "hs_type",
    "REG",
    "DIV",
    "race"
)

# Show the mapped DataFrame
mapped_df.show()

+---------------+-----+----+-------+--------------------+------+--------+--------+---------+-------+-------+---+---+----+
|          hs_ID|month|year|fin_out|              hs_unt|hs_tel|else_tel|tel_intv|intv_type|fam_inc|hs_type|REG|DIV|race|
+---------------+-----+----+-------+--------------------+------+--------+--------+---------+-------+-------+---+---+----+
|000004795110719|   12|2017|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|       NO|      9|      1|  3|  6|   1|
|000004795110719|   12|2017|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|       NO|      9|      1|  3|  6|   1|
|000071691004941|   12|2017|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|      YES|     11|      1|  3|  6|   1|
|000071691004941|   12|2017|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|      YES|     11|      1|  3|  6|   1|
|000071691004941|   12|2017|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|      YES|     11|      1|  3|  6|   1|
|000110177987986|   12|2

In [6]:
from pyspark.sql.functions import col, concat, expr, lit
import calendar

# Add a new column with concatenated year and month in "YYYY/MMM" format
intv_time = mapped_df.withColumn(
    "intv_date",
    concat(
        col("year"), lit("/"), 
        expr("substring('JanFebMarAprMayJunJulAugSepOctNovDec', (month - 1) * 3 + 1, 3)")
    )
)

new_df = intv_time.withColumn("reg/div", concat("REG", lit("/"), "DIV"))

# Drop year and month columns
new_df = new_df.drop("year", "month", "REG", "DIV")
# Show the resulting DataFrame
new_df.show(10)

# Write the DataFrame to a CSV file with overwrite mode
new_df.write.mode("overwrite").csv("path/to/output/folder")


+---------------+-------+--------------------+------+--------+--------+---------+-------+-------+----+---------+-------+
|          hs_ID|fin_out|              hs_unt|hs_tel|else_tel|tel_intv|intv_type|fam_inc|hs_type|race|intv_date|reg/div|
+---------------+-------+--------------------+------+--------+--------+---------+-------+-------+----+---------+-------+
|000004795110719|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|       NO|      9|      1|   1| 2017/Dec|    3/6|
|000004795110719|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|       NO|      9|      1|   1| 2017/Dec|    3/6|
|000071691004941|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|      YES|     11|      1|   1| 2017/Dec|    3/6|
|000071691004941|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|      YES|     11|      1|   1| 2017/Dec|    3/6|
|000071691004941|    201|HOUSE, APARTMENT,...|   YES|     YES|     YES|      YES|     11|      1|   1| 2017/Dec|    3/6|
|000110177987986|    201|HOUSE, 

## QUESTION 1

In [7]:
# Group by "fam_income" and calculate the count of responders
family_income_counts = new_df.groupBy("fam_inc").count()
# Show the resulting DataFrame
family_income_counts.show()

+-------+-----+
|fam_inc|count|
+-------+-----+
|     -1|20391|
|     12| 9971|
|      1| 3136|
|     13|13442|
|      6| 4518|
|     16|15704|
|      3| 2277|
|      5| 2614|
|     15|17794|
|      9| 6743|
|      4| 3161|
|      8| 5803|
|      7| 6312|
|     10| 6620|
|     11| 9788|
|     14|16557|
|      2| 1625|
+-------+-----+



In [8]:
unique_values = new_df.select("fam_inc").distinct()
# Show the unique values
unique_values.show()

+-------+
|fam_inc|
+-------+
|     -1|
|     12|
|      1|
|     13|
|      6|
|     16|
|      3|
|      5|
|     15|
|      9|
|      4|
|      8|
|      7|
|     10|
|     11|
|     14|
|      2|
+-------+



Where;
-1 = Outlier

1 = LESS THAN $5,000

2 = 5,000 TO 7,499

3 = 7,500 TO 9,999

4 = 10,000 TO 12,499

5 = 12,500 TO 14,999

6 = 15,000 TO 19,999

7 = 20,000 TO 24,999

8 = 25,000 TO 29,999

9 = 30,000 TO 34,999

10 = 35,000 TO 39,999

11 = 40,000 TO 49,999

12  = 50,000 TO 59,999

13 = 60,000 TO 74,999

14 = 75,000 TO 99,999

15 = 100,000 TO 149,999

16 = 150,000 OR MORE

## Question 2

In [11]:
from pyspark.sql import functions as F

# Group by "reg_div_concat" and "race" and calculate the count of responders
grouped_df = new_df.groupBy("reg/div", "race").agg(F.count("*").alias("responder_count"))

# Order the DataFrame by "responder_count" in descending order
ordered_df = grouped_df.orderBy(F.desc("responder_count"))

# Select the top 10 rows
top_10_responder_counts = ordered_df.limit(10)

# Show the top 10 rows
top_10_responder_counts.show()

+-------+----+---------------+
|reg/div|race|responder_count|
+-------+----+---------------+
|    3/5|   1|          16999|
|    4/8|   1|          14343|
|    4/9|   1|          13214|
|    2/3|   1|          11325|
|    3/7|   1|          11248|
|    2/4|   1|           9884|
|    1/2|   1|           8487|
|    1/1|   1|           8410|
|    3/6|   1|           6580|
|    3/5|   2|           4899|
+-------+----+---------------+



In [12]:
unique_location = new_df.select("reg/div").distinct()
# Show the unique values
unique_location.show()

+-------+
|reg/div|
+-------+
|    3/7|
|    2/4|
|    3/6|
|    3/5|
|    2/3|
|    4/9|
|    1/2|
|    4/8|
|    1/1|
+-------+



In [13]:
unique_race = new_df.select("race").distinct().sort("race")
# Show the unique values
unique_race.show(30)

+----+
|race|
+----+
|   1|
|   2|
|   3|
|   4|
|   5|
|   6|
|   7|
|   8|
|   9|
|  -1|
|  10|
|  11|
|  12|
|  13|
|  14|
|  15|
|  16|
|  17|
|  18|
|  19|
|  20|
|  21|
|  23|
|  24|
|  25|
|  26|
+----+



## Question 3

In [16]:
# Assuming "df" is your DataFrame
filtered_df = new_df.filter(
    (new_df.hs_tel == "NO")  # No telephone access in the house
    & (new_df.else_tel == "YES")  # Can access telephone elsewhere
    & (new_df.tel_intv == "YES")  # Telephone interview accepted
)

# Count the responders matching the criteria
count = filtered_df.count()

# Show the result
print("Number of responders:", count)

Number of responders: 636


## Question 4

In [18]:
# Assuming "df" is your DataFrame
new_filtered_df = new_df.filter(
    ((new_df.hs_tel == "YES") | (new_df.else_tel == "YES"))  # Telephone access in the house or elsewhere
    & (new_df.tel_intv == "NO")  # Telephone interview not accepted
)

# Count the responders matching the criteria
count = new_filtered_df.count()

# Show the result
print("Number of responders:", count)

Number of responders: 25795


In [21]:
unique_tel_intv = new_df.select("tel_intv").distinct()
# Show the unique values
unique_tel_intv.show()

+--------+
|tel_intv|
+--------+
|     YES|
|      NO|
+--------+

