In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, FloatType
from pyspark.sql.functions import col, create_map, lit, concat_ws, array, array_distinct, expr
from itertools import chain

# Create Spark session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

# Mount the Blob Storage container
storage_account_name = "SECRET"
storage_account_access_key = "SECRET"
blob_container = "project-data"
mount_point = "/mnt/6242ProjectData"

# Check if the mount point is already mounted
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print("Mount point already exists.")
else:
    # Mount the Blob Storage container
    dbutils.fs.mount(
        source=f"wasbs://{blob_container}@{storage_account_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={
            f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_access_key
        }
    )
    print("Mount point successfully created.")

schema = StructType([
    StructField("activity_year", StringType(), True),
    StructField("lei", StringType(), True),
    StructField("derived_msa_md", StringType(), True),
    StructField("state_code", StringType(), True),
    StructField("county_code", StringType(), True),
    StructField("census_tract", StringType(), True),
    StructField("conforming_loan_limit", StringType(), True),
    StructField("derived_loan_product_type", StringType(), True),
    StructField("derived_dwelling_category", StringType(), True),
    StructField("derived_ethnicity", StringType(), True),
    StructField("derived_race", StringType(), True),
    StructField("derived_sex", StringType(), True),
    StructField("action_taken", StringType(), True),
    StructField("purchaser_type", StringType(), True),
    StructField("preapproval", StringType(), True),
    StructField("loan_type", StringType(), True),
    StructField("loan_purpose", StringType(), True),
    StructField("lien_status", StringType(), True),
    StructField("reverse_mortgage", StringType(), True),
    StructField("open_end_line_of_credit", StringType(), True),
    StructField("business_or_commercial_purpose", StringType(), True),
    StructField("loan_amount", StringType(), True),
    StructField("combined_loan_to_value_ratio", StringType(), True), #String (replace "Exempt" with -99.000 then cast as decimal with high precision)
    StructField("interest_rate", StringType(), True), #String (replace "Exempt" with -99.000 then cast as decimal with high precision)
    StructField("rate_spread", StringType(), True), #String (replace "Exempt" with -99.000 then cast as decimal with high precision)
    StructField("hoepa_status", StringType(), True), #string
    StructField("total_loan_costs", StringType(), True), #String (replace "Exempt" with -99.000 then cast as decimal with .2f precision)
    StructField("total_points_and_fees", StringType(), True), #String (replace "Exempt" with -99.000 then cast as decimal with .2f precision)
    StructField("origination_charges", StringType(), True), #String (replace "Exempt" with -99.000 then cast as decimal with .2f precision)
    StructField("discount_points", StringType(), True), #String (replace "Exempt" with -99.000 then cast as decimal with .2f precision)
    StructField("lender_credits", StringType(), True), #String (replace "Exempt" with -99.000 then cast as decimal with high precision)
    StructField("loan_term", StringType(), True), #String (replace "Exempt" with -99.000 then cast as integer)
    StructField("prepayment_penalty_term", StringType(), True), #String (replace "Exempt" with -99.000 then cast as integer)
    StructField("intro_rate_period", StringType(), True), #String (replace "Exempt" with -99.000 then cast as integer)
    StructField("negative_amortization", StringType(), True), #String
    StructField("interest_only_payment", StringType(), True), #String
    StructField("balloon_payment", StringType(), True), #String
    StructField("other_nonamortizing_features", StringType(), True), #String
    StructField("property_value", StringType(), True), #String (replace "Exempt" with -99.000 then cast as integer)
    StructField("construction_method", StringType(), True), #String
    StructField("occupancy_type", StringType(), True), #String
    StructField("manufactured_home_secured_property_type", StringType(), True), #String
    StructField("manufactured_home_land_property_interest", StringType(), True), #String
    StructField("total_units", StringType(), True), #replace 5-24 with 15, Replace 25-49 with 37, replace 50-99 with 75, replace 100-149 with 125, replace 149 with 150, then cast as int
    StructField("multifamily_affordable_units", StringType(), True), #String (replace "Exempt" with -99.000 then cast as integer)
    StructField("income", StringType(), True), #String (replace "Exempt" with -inf then cast as integer)
    StructField("debt_to_income_ratio", StringType(), True), #Replace <20% with 20, replace 20%-<30% with 25, replace 30%-<36% with 33, replace 50-60% with 55, replace >60% with 60, replace Exempt with -99 then cast as col/100 with decimal precision of 0.00
    StructField("applicant_credit_score_type", StringType(), True), #String
    StructField("co_applicant_credit_score_type", StringType(), True), #String
    StructField("applicant_ethnicity_1", StringType(), True), #String
    StructField("applicant_ethnicity_2", StringType(), True), #String
    StructField("applicant_ethnicity_3", StringType(), True), #String
    StructField("applicant_ethnicity_4", StringType(), True), #String
    StructField("applicant_ethnicity_5", StringType(), True), #String
    StructField("co_applicant_ethnicity_1", StringType(), True), #String
    StructField("co_applicant_ethnicity_2", StringType(), True), #String
    StructField("co_applicant_ethnicity_3", StringType(), True), #String
    StructField("co_applicant_ethnicity_4", StringType(), True), #String
    StructField("co_applicant_ethnicity_5", StringType(), True), #String
    StructField("applicant_ethnicity_observed", StringType(), True), #String
    StructField("co_applicant_ethnicity_observed", StringType(), True), #String
    StructField("applicant_race_1", StringType(), True), #String
    StructField("applicant_race_2", StringType(), True), #String
    StructField("applicant_race_3", StringType(), True), #String
    StructField("applicant_race_4", StringType(), True), #String
    StructField("applicant_race_5", StringType(), True), #String
    StructField("co_applicant_race_1", StringType(), True), #String
    StructField("co_applicant_race_2", StringType(), True), #String
    StructField("co_applicant_race_3", StringType(), True), #String
    StructField("co_applicant_race_4", StringType(), True), #String
    StructField("co_applicant_race_5", StringType(), True), #String
    StructField("applicant_race_observed", StringType(), True), #String
    StructField("co_applicant_race_observed", StringType(), True), #String
    StructField("applicant_sex", StringType(), True), #String
    StructField("co_applicant_sex", StringType(), True), #String
    StructField("applicant_sex_observed", StringType(), True), #String
    StructField("co_applicant_sex_observed", StringType(), True), #String
    StructField("applicant_age", StringType(), True), #String (replace 8888 with '-99')
    StructField("co_applicant_age", StringType(), True), #String (replace 8888 and 9999 with '-99')
    StructField("applicant_age_above_62", StringType(), True), #String
    StructField("co_applicant_age_above_62", StringType(), True), #String
    StructField("submission_of_application", StringType(), True), #String
    StructField("initially_payable_to_institution", StringType(), True), #String
    StructField("aus_1", StringType(), True), #String
    StructField("aus_2", StringType(), True), #String
    StructField("aus_3", StringType(), True), #String
    StructField("aus_4", StringType(), True), #String
    StructField("aus_5", StringType(), True), #String
    StructField("denial_reason_1", StringType(), True), #String
    StructField("denial_reason_2", StringType(), True), #String
    StructField("denial_reason_3", StringType(), True), #String
    StructField("denial_reason_4", StringType(), True), #String
    StructField("tract_population", StringType(), True), #	Int
    StructField("tract_minority_population_percent", StringType(), True), # then cast as col/100 then high precision
    StructField("ffiec_msa_md_median_family_income", StringType(), True),
    StructField("tract_to_msa_income_percentage", StringType(), True),	# cast as col/100 then high precision
    StructField("tract_owner_occupied_units", StringType(), True), #	Int
    StructField("tract_one_to_four_family_homes", StringType(), True), #	Int
    StructField("tract_median_age_of_housing_units", StringType(), True) #	Int
])

allYearsAndStates = spark.read.format("parquet").schema(schema).load("/mnt/6242ProjectData/allYearsAllStates.parquet")

# Create a temporary view from the DataFrame
allYearsAndStates.createOrReplaceTempView("allYearsAndStates")


Mount point already exists.


In [None]:
from pyspark.sql.functions import when, col
analyticsDF = allYearsAndStates.\
    withColumn("combined_loan_to_value_ratio", when(col("combined_loan_to_value_ratio") == "Exempt", "-99.000").otherwise(col("combined_loan_to_value_ratio")).cast("decimal(10, 5)")).\
    withColumn("interest_rate", when(col("interest_rate") == "Exempt", "-99.000").otherwise(col("interest_rate")).cast("decimal(10, 5)")).\
    withColumn("rate_spread", when(col("rate_spread") == "Exempt", "-99.000").otherwise(col("rate_spread")).cast("decimal(10, 5)")).\
    withColumn("total_loan_costs", when(col("total_loan_costs") == "Exempt", "-99.000").otherwise(col("total_loan_costs")).cast("decimal(10, 5)")).\
    withColumn("total_points_and_fees", when(col("total_points_and_fees") == "Exempt", "-99.000").otherwise(col("total_points_and_fees")).cast("decimal(10, 5)")).\
    withColumn("origination_charges", when(col("origination_charges") == "Exempt", "-99.000").otherwise(col("origination_charges")).cast("decimal(10, 5)")).\
    withColumn("discount_points", when(col("discount_points") == "Exempt", "-99.000").otherwise(col("discount_points")).cast("decimal(10, 5)")).\
    withColumn("lender_credits", when(col("lender_credits") == "Exempt", "-99.000").otherwise(col("lender_credits")).cast("decimal(10, 5)")).\
    withColumn("loan_term", when(col("loan_term") == "Exempt", "-99.000").otherwise(col("loan_term").cast("integer")).cast("integer")).\
    withColumn("prepayment_penalty_term", when(col("prepayment_penalty_term") == "Exempt", "-99.000").otherwise(col("prepayment_penalty_term").cast("integer")).cast("integer")).\
    withColumn("intro_rate_period", when(col("intro_rate_period") == "Exempt", "-99.000").otherwise(col("intro_rate_period").cast("integer")).cast("integer")).\
    withColumn("property_value", when(col("property_value") == "Exempt", "-99.000").otherwise(col("property_value").cast("integer")).cast("integer")).\
    withColumn("total_units_val", when(col("total_units") == "5-24", "15")
                             .when(col("total_units") == "25-49", "37")
                             .when(col("total_units") == "50-99", "75")
                             .when(col("total_units") == "100-149", "125")
                             .when(col("total_units") == ">149", "150")
                             .otherwise(col("total_units")).cast("integer")).\
    withColumn("multifamily_affordable_units", when(col("multifamily_affordable_units") == "Exempt", "-99.000").otherwise(col("multifamily_affordable_units").cast("integer")).cast("integer")).\
    withColumn("income", when(col("income") == "Exempt", "-inf").otherwise(col("income").cast("integer")).cast("integer")).\
    withColumn("debt_to_income_ratio_val", when(col("debt_to_income_ratio") == "<20%", "20")
                                        .when(col("debt_to_income_ratio") == "20%-<30%", "25")
                                        .when(col("debt_to_income_ratio") == "30%-<36%", "33")
                                        .when(col("debt_to_income_ratio") == "50-60%", "55")
                                        .when(col("debt_to_income_ratio") == ">60%", "60")
                                        .when(col("debt_to_income_ratio") == "NA", "-99")
                                        .when(col("debt_to_income_ratio") == "Exempt", "-99")
                                        .otherwise(col("debt_to_income_ratio")).cast("decimal(10, 5)")).\
    withColumn("applicant_age", when(col("applicant_age") == "8888", "-99").otherwise(col("applicant_age"))).\
    withColumn("co_applicant_age", when(col("co_applicant_age").isin("8888", "9999"), "-99").otherwise(col("co_applicant_age"))).\
    withColumn("tract_minority_population_percent", (col("tract_minority_population_percent") / 100).cast("decimal(10,5)")).\
    withColumn("ffiec_msa_md_median_family_income", col("ffiec_msa_md_median_family_income").cast("integer")).\
    withColumn("tract_to_msa_income_percentage", (col("tract_to_msa_income_percentage") / 100).cast("decimal(10,5)")).\
    withColumn("tract_owner_occupied_units", col("tract_owner_occupied_units").cast("integer")).\
    withColumn("tract_one_to_four_family_homes", col("tract_one_to_four_family_homes").cast("integer")).\
    withColumn("tract_median_age_of_housing_units", col("tract_median_age_of_housing_units").cast("integer"))

In [None]:


# purchaser type
purchaser_typeMap = {
    "0": "Not applicable",
    "1": "GSE", # Fannie Mae
    "2": "HUD", # Ginnie Mae
    "3": "GSE", # Freddie Mac
    "4": "GSE", # Farmer Mac
    "5": "Private securitizer",
    "6": "Commercial bank, savings bank, or savings association",
    "71": "Credit union, mortgage company, or finance company",
    "72": "Life insurance company",
    "8": "Affiliate institution",
    "9": "Other type of purchaser"
}

mapping_expr = create_map([lit(x) for x in chain(*purchaser_typeMap.items())])

analyticsDF = analyticsDF.withColumn("purchaser_type", mapping_expr.getItem(col("purchaser_type")))

preapprovalMap = {
    "1": True,
    "2": False
}
# preapproval
analyticsDF = analyticsDF.withColumn("preapproval", (when(col('preapproval')=="1", True).otherwise(False)).cast("boolean"))

loan_typeMap = {
    "1": "Conventional",
    "2": "FHA",
    "3": "VA",
    "4": "USDA"
}

mapping_expr = create_map([lit(x) for x in chain(*loan_typeMap.items())])

analyticsDF = analyticsDF.withColumn("loan_type", mapping_expr.getItem(col("loan_type")))

hoepa_statusMap = {
    "1": "High-Cost",
    "2": "Not High-Cost",
    "3": "Not HOEPA"
}
mapping_expr = create_map([lit(x) for x in chain(*hoepa_statusMap.items())])

analyticsDF = analyticsDF.withColumn("hoepa_status", mapping_expr.getItem(col("hoepa_status")))
construction_methodMap = {
    "1": "Site-Built",
    "2": "Manufactured"
}
analyticsDF = analyticsDF.withColumn("construction_method", when(col('construction_method')=="1", "Site-Built").otherwise("Manufactured"))
occupancy_typeMap = {
    "1": "Principal Residence",
    "2": "Second Residence",
    "3": "Investment"
}
mapping_expr = create_map([lit(x) for x in chain(*occupancy_typeMap.items())])

analyticsDF = analyticsDF.withColumn("occupancy_type", mapping_expr.getItem(col("occupancy_type")))
ethnicityMap = {
    "1": "Hispanic or Latino",
    "11": "Hispanic or Latino", # Mexican
    "12": "Hispanic or Latino", # Puerto Rican
    "13": "Hispanic or Latino", # Cuban
    "14": "Hispanic or Latino", # Other Hispanic or Latino
    "2": "Not Hispanic or Latino",
    "3": "Missing", # Not Provided
    "4": "Missing", # Not applicable
    "5": "Missing" # No co-applicant
}
mapping_expr = create_map([lit(x) for x in chain(*ethnicityMap.items())])

for column in ["applicant_ethnicity_1","applicant_ethnicity_2","applicant_ethnicity_3","applicant_ethnicity_4","applicant_ethnicity_5","co_applicant_ethnicity_1","co_applicant_ethnicity_2","co_applicant_ethnicity_3","co_applicant_ethnicity_4","co_applicant_ethnicity_5"]:
    analyticsDF = analyticsDF.withColumn(column, mapping_expr.getItem(col(column)))

raceMap = {
    "1": "American Indian or Alaska Native",
    "2": "Asian",
    "21": "Asian", # "Asian Indian", 
    "22": "Asian", #"Chinese",
    "23": "Asian", #"Filipino",
    "24": "Asian", #"Japanese",
    "25": "Asian", #"Korean",
    "26": "Asian", #"Vietnamese",
    "27": "Asian", #"Other Asian",
    "3": "Black",
    "4": "Native Hawaiian or Pacific Islander", #
    "41": "Native Hawaiian or Pacific Islander", #"Native Hawaiian",
    "42": "Native Hawaiian or Pacific Islander", #"Guamanian or Chamorro",
    "43": "Native Hawaiian or Pacific Islander", #"Samoan",
    "44": "Native Hawaiian or Pacific Islander", #"Other Pacific Islander",
    "5": "White",
    "6": "Missing",
    "7": "Missing", #NA
    "8": "Missing" # No co-applicant
}
mapping_expr = create_map([lit(x) for x in chain(*raceMap.items())])

for column in ["applicant_race_1","applicant_race_2","applicant_race_3","applicant_race_4","applicant_race_5","co_applicant_race_1","co_applicant_race_2","co_applicant_race_3","co_applicant_race_4","co_applicant_race_5"]:
    analyticsDF = analyticsDF.withColumn(column, mapping_expr.getItem(col(column)))

sexMap = {
    "1": "Male",
    "2": "Female",
    "3": "Missing",
    "4": "Missing", # NA
    "5": "Missing", # No co-applicant
    "6": "Both Male and Female" # should recode to unknown or missing?
}
mapping_expr = create_map([lit(x) for x in chain(*sexMap.items())])
analyticsDF = analyticsDF.withColumn("applicant_sex", mapping_expr.getItem(col("applicant_sex")))
analyticsDF = analyticsDF.withColumn("co_applicant_sex", mapping_expr.getItem(col("co_applicant_sex")))

denial_reasonMap = {
    "1": "Debt-to-income ratio",
    "2": "Employment history",
    "3": "Credit history",
    "4": "Collateral",
    "5": "Insufficient cash (downpayment, closing costs)",
    "6": "Unverifiable information",
    "7": "Credit application incomplete",
    "8": "Mortgage insurance denied",
    "9": "Other",
    "10": "Not applicable",
}
mapping_expr = create_map([lit(x) for x in chain(*denial_reasonMap.items())])

for column in ["denial_reason_1", "denial_reason_2", "denial_reason_3", "denial_reason_4"]:
    analyticsDF = analyticsDF.withColumn(column, mapping_expr.getItem(col(column)))

paymentMaps = {
    "2": False,
    "1": True,
    "1111": False
} # for use with negative_amortization, interest_only_payment, balloon_payment, other_nonamortizing_features, #business_or_commercial_purpose
mapping_expr = create_map([lit(x) for x in chain(*paymentMaps.items())])

for column in ["negative_amortization", "interest_only_payment", "balloon_payment", "other_nonamortizing_features", "business_or_commercial_purpose"]:
    analyticsDF = analyticsDF.withColumn(column, (mapping_expr.getItem(col(column))).cast("boolean"))

    # create joined version of multi-columns:
    # applicant_ethnicitys = join applicant_ethnicity_n | after deduplicating and eliminating blanks
    # co_applicant_ethnicitys = join co_applicant_ethnicity_n | after deduplicating and eliminating blanks

# Join and deduplicate ethnicity columns for applicants
analyticsDF = analyticsDF.withColumn("applicant_ethnicitys", 
                                      concat_ws("|", 
                                                array_distinct(
                                                    array(*[expr(f"trim({x})") for x in ["applicant_ethnicity_1", "applicant_ethnicity_2", "applicant_ethnicity_3", "applicant_ethnicity_4", "applicant_ethnicity_5"] if x is not None])
                                                )
                                      ))
analyticsDF = analyticsDF.withColumn("co_applicant_ethnicitys", 
                                      concat_ws("|", 
                                                array_distinct(
                                                    array(*[expr(f"trim({x})") for x in ["co_applicant_ethnicity_1", "co_applicant_ethnicity_2", "co_applicant_ethnicity_3", "co_applicant_ethnicity_4", "co_applicant_ethnicity_5"] if x is not None])
                                                )
                                      ))

# Join and deduplicate race columns for applicants
analyticsDF = analyticsDF.withColumn("applicant_races", 
                                      concat_ws("|", 
                                                array_distinct(
                                                    array(*[expr(f"trim({x})") for x in ["applicant_race_1", "applicant_race_2", "applicant_race_3", "applicant_race_4", "applicant_race_5"] if x is not None])
                                                )
                                      )
                                    )
analyticsDF = analyticsDF.withColumn("co_applicant_races", 
                                      concat_ws("|", 
                                                array_distinct(
                                                    array(*[expr(f"trim({x})") for x in ["co_applicant_race_1", "co_applicant_race_2", "co_applicant_race_3", "co_applicant_race_4", "co_applicant_race_5"] if x is not None])
                                                )
                                      )
                                    )
analyticsDF = analyticsDF.withColumn("denial_reasons", 
                                      concat_ws("|", 
                                                array_distinct(
                                                    array(*[expr(f"trim({x})") for x in ["denial_reason_1", "denial_reason_2", "denial_reason_3", "denial_reason_4"] if x is not None])
                                                )
                                      )
                                    )

colsToDrop = [
    "submission_of_application","initially_payable_to_institution","applicant_sex_observed","co_applicant_sex_observed","applicant_race_observed","co_applicant_race_observed","applicant_ethnicity_observed","co_applicant_ethnicity_observed","applicant_ethnicity_1","applicant_ethnicity_2","applicant_ethnicity_3","applicant_ethnicity_4","applicant_ethnicity_5","co_applicant_ethnicity_1","co_applicant_ethnicity_2","co_applicant_ethnicity_3","co_applicant_ethnicity_4","co_applicant_ethnicity_5","applicant_race_1","applicant_race_2","applicant_race_3","applicant_race_4","applicant_race_5","co_applicant_race_1","co_applicant_race_2","co_applicant_race_3","co_applicant_race_4","co_applicant_race_5","applicant_sex","co_applicant_sex","denial_reason_1","denial_reason_2","denial_reason_3","denial_reason_4", "manufactured_home_secured_property_type", "manufactured_home_land_property_interest"
]

colsToKeep = [col for col in analyticsDF.columns if col not in colsToDrop]

# filtering and keeping rows
# select colsToKeep
# drop rows where loan_purpose != "1"
# drop rows where reverse_mortgage == "1"
# drop rows where open_end_line_of_credit == "1"

# If you explicitly need to select columns to keep (though not necessary for filtering), you can use:
# Number of rows
numRows = analyticsDF.count()

# Number of columns
numCols = len(analyticsDF.columns)

print("Shape of DataFrame: (Rows, Columns) = ({}, {})".format(numRows, numCols))

analyticsDF = analyticsDF.select(*colsToKeep)

# Apply the row filters again if colsToKeep is used after filtering
analyticsDF = analyticsDF.filter(
    (col("loan_purpose") == "1") & # Home Purchase - should we include refinancing? do two separate models?
    (col("reverse_mortgage") != "1") & # Not a reverse mortgage
    (col("open_end_line_of_credit") != "1") # Not an open-end line of credit
)
# Number of rows
numRows = analyticsDF.count()

# Number of columns
numCols = len(analyticsDF.columns)

print("After dropping cols and rows, shape of DataFrame: (Rows, Columns) = ({}, {})".format(numRows, numCols))


Shape of DataFrame: (Rows, Columns) = (100763685, 106)
After dropping cols and rows, shape of DataFrame: (Rows, Columns) = (40764739, 70)


In [None]:
analyticsDF.coalesce(1).write.format("parquet").save("/mnt/6242ProjectData/fullAnalyticsDF.parquet")
analyticsDFSmall = analyticsDF.filter(col("state_code").isin(["TX", "NY", "CA", "WA"])).coalesce(1)
analyticsDFSmall.write.format("parquet").save("/mnt/6242ProjectData/fullAnalyticsDFSmall.parquet")
analyticsDFSmall.write.format("csv").save("/mnt/6242ProjectData/fullAnalyticsDFSmall.csv")

In [None]:


from pyspark.sql.functions import udf

# Define UDF to try casting to float
def try_cast_to_float(value):
    try:
        return float(value)
    except ValueError:
        return None

# Register UDF
udf_cast_to_float = udf(try_cast_to_float, FloatType())

# List of columns to check
columns_to_check = [
    "loan_amount",
    "tract_population",
    "tract_minority_population_percent",
    "ffiec_msa_md_median_family_income",
    "tract_to_msa_income_percentage",
    "tract_owner_occupied_units",
    "tract_one_to_four_family_homes",
    "tract_median_age_of_housing_units",
]

# Assuming df is your DataFrame
for column in columns_to_check:
    df_with_cast = allYearsAndStates.withColumn(column + "_cast", udf_cast_to_float(allYearsAndStates[column]))
    invalid_rows = df_with_cast.filter(df_with_cast[column + "_cast"].isNull())
    print(f"Invalid values for {column}:")
    invalid_rows.select(column).show()

Invalid values for loan_amount:
+-----------+
|loan_amount|
+-----------+
+-----------+

Invalid values for tract_population:
+----------------+
|tract_population|
+----------------+
+----------------+

Invalid values for tract_minority_population_percent:
+---------------------------------+
|tract_minority_population_percent|
+---------------------------------+
+---------------------------------+

Invalid values for ffiec_msa_md_median_family_income:
+---------------------------------+
|ffiec_msa_md_median_family_income|
+---------------------------------+
+---------------------------------+

Invalid values for tract_to_msa_income_percentage:
+------------------------------+
|tract_to_msa_income_percentage|
+------------------------------+
+------------------------------+

Invalid values for tract_owner_occupied_units:
+--------------------------+
|tract_owner_occupied_units|
+--------------------------+
+--------------------------+

Invalid values for tract_one_to_four_family_homes:
