# 06_merge_monthly.ipynb

In [1]:
import pandas as pd
import geopandas as gpd
from dask.distributed import Client
import dask.dataframe as dd
from dask_jobqueue import SLURMCluster
import matplotlib.pyplot as plt
import os
import contextily as cx
from build_utilities import generate_variable_names, aggregate_crime_to_case_month
import dask_geopandas
INPUT_DATA_EVICTIONS = "../../data/02_intermediate/evictions.csv"
INPUT_DATA_TRACTS = "../../data/02_intermediate/tracts.csv"
INPUT_DATA_TAX_PARCELS = "../../data/02_intermediate/tax_parcels.gpkg"
INPUT_DATA_ZESTIMATES = "../../data/02_intermediate/zestimates.csv"
INPUT_DATA_CRIME = "../../data/01_raw/crime_incidents"
OUTPUT_DATA_UNRESTRICTED = "../../data/03_cleaned/unrestricted_monthly.parquet"
OUTPUT_DATA_ZILLOW = "../../data/03_cleaned/zestimates_analysis.csv"
OUTPUT_DATA_CRIME = "../../data/03_cleaned/crime_analysis_monthly.parquet"
OUTPUT_TABLES = "../../output/summary_statistics/tables/"
VERBOSE = True
N_PARTITIONS = 1
value_vars_to_concat = []  # A list of DataFrames, where each DataFrame contains the panel data for a single outcome variable and has case_number as its index.


import os
os.environ['USE_PYGEOS'] = '0'
import geopandas

In a future release, GeoPandas will switch to using Shapely by default. If you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).
  import geopandas as gpd


## 1. Loading Evictions Data

In [None]:
# Load evictions data.
with open(INPUT_DATA_EVICTIONS, 'r') as file:
    all_column_names = set(file.readline().replace("\"", "").replace("\n", "").split(","))
to_drop = {'Accuracy Score', 'Accuracy Type', 'Number', 'Street', 'Unit Type', 'Unit Number',
           'State', 'Zip', 'Country', 'Source', 'Census Year', 'State FIPS', 'County FIPS',
           'Place Name', 'Place FIPS', 'Census Tract Code', 'Census Block Code', 'Census Block Group',
           'Metro/Micro Statistical Area Code', 'Metro/Micro Statistical Area Type',
           'Combined Statistical Area Code', 'Metropolitan Division Area Code', 'court_location',
           'defendant', 'defendant_atty', 'defendant_atty_address_apt',
           'defendant_atty_address_city', 'defendant_atty_address_name', 'defendant_atty_address_state',
           'defendant_atty_address_street', 'defendant_atty_address_zip', 'docket_history', 'judgment_for',
           'judgment_total', 'plaintiff', 'plaintiff_atty', 'plaintiff_atty_address_apt',
           'plaintiff_atty_address_city', 'plaintiff_atty_address_name', 'plaintiff_atty_address_state',
           'plaintiff_atty_address_street', 'plaintiff_atty_address_zip', 'Metropolitan Division Area Name',
           'property_address_city', 'property_address_state', 'property_address_street',
           'property_address_zip'}
evictions_df = pd.read_csv(INPUT_DATA_EVICTIONS, usecols=set(all_column_names) - set(to_drop))
sample_restriction_table_index = []
sample_restriction_table_values = []

# Restrict to cases in Boston.
boston_mask = ((evictions_df['County'] == "Suffolk County") & (~evictions_df['City'].isin(["Chelsea", "Revere", "Winthrop"])))
if VERBOSE:
    print(f"Restricting to {boston_mask.sum()} observations which are from Boston.")
evictions_df = evictions_df.loc[boston_mask, :]
original_N = len(evictions_df)
if VERBOSE:
    print(f"Beginning with {original_N} observations.")
sample_restriction_table_index.append("Case Filed in Boston")
sample_restriction_table_values.append(original_N)

# Drop cases missing latest_docket_date.
mask = evictions_df['latest_docket_date'].notna()
if VERBOSE:
    print(
        f"Dropping {evictions_df['latest_docket_date'].isna().sum()} observations where latest_docket_date is missing.")
evictions_df = evictions_df.loc[mask, :]
sample_restriction_table_index.append("Non-missing latest docket date")
sample_restriction_table_values.append(len(evictions_df))

# Drop malformed addresses.
if VERBOSE:
    print(f"Dropping {evictions_df['property_address_full'].str.contains('span, span span').sum()} observations which "
          f"have malformed addresses.")
evictions_df = evictions_df.loc[~evictions_df['property_address_full'].str.contains("span, span span"), :]

# Drop missing addresses.
no_address_info_mask = (evictions_df['property_address_full'].isna())
if VERBOSE:
    print(
        f"Dropping {no_address_info_mask.sum()} rows missing property_address_full")
evictions_df = evictions_df.loc[~no_address_info_mask, :]
sample_restriction_table_index.append("Non-missing property address")
sample_restriction_table_values.append(len(evictions_df))

# Add file month and year to dataset.
evictions_df.loc[:, 'file_month'] = pd.to_datetime(evictions_df['file_date']).dt.strftime('%Y-%m')
evictions_df.loc[:, 'file_year'] = pd.to_datetime(evictions_df['file_date']).dt.year

# Add latest docket month and year to dataset.
evictions_df.loc[:, 'latest_docket_month'] = pd.to_datetime(evictions_df['latest_docket_date']).dt.strftime('%Y-%m')
evictions_df.loc[:, 'latest_docket_year'] = pd.to_datetime(evictions_df['latest_docket_date']).dt.year

# Drop cases concluded in April 2020 or later.
pre_pandemic_months = ['2019-04',
 '2019-05',
 '2019-06',
 '2019-07',
 '2019-08',
 '2019-09',
 '2019-10',
 '2019-11',
 '2019-12',
 '2020-01',
 '2020-02',
 '2020-03']
pre_pandemic_mask = evictions_df['latest_docket_month'].isin(pre_pandemic_months)
evictions_df = evictions_df.loc[pre_pandemic_mask, :]
if VERBOSE:
    print(f"Dropping {(~pre_pandemic_mask).sum()} cases which concluded after pandemic began")
sample_restriction_table_index.append("Case concluded before April 2020")
sample_restriction_table_values.append(len(evictions_df))


# Drop cases resolved via mediation.
mediated_mask = evictions_df['disposition_found'] == "Mediated"
if VERBOSE:
    print(f"Dropping {mediated_mask.sum()} cases resolved through mediation.")
evictions_df = evictions_df.loc[~mediated_mask, :]
sample_restriction_table_index.append("Case not resolved through mediation")
sample_restriction_table_values.append(len(evictions_df))

# Drop cases where disposition found is other.
disposition_found_other_mask = evictions_df['disposition_found'] == "Other"
if VERBOSE:
    print(f"Dropping {disposition_found_other_mask.sum()} cases where disposition_found is \"Other\"")
evictions_df = evictions_df.loc[~disposition_found_other_mask, :]
# Drop rows which contain inconsistent values of disposition_found and judgment_for_pdu.
# First, we drop cases where disposition_found is "Defaulted" but judgment_for_pdu is "Defendant"
inconsistent_mask_1 = ((evictions_df['disposition_found'] == "Defaulted") & (evictions_df['judgment_for_pdu'] == "Defendant"))
if VERBOSE:
    print(f"Dropping {inconsistent_mask_1.sum()} observations where disposition_found is \"Defaulted\" but judgment_for_pdu is \"Defendant\".")
evictions_df = evictions_df.loc[~inconsistent_mask_1, :]
          
# Next, we drop cases where disposition_found is "Dismissed" yet judgment_for_pdu is "Plaintiff"
inconsistent_mask_2 = ((evictions_df['disposition_found'] == "Dismissed") & (evictions_df['judgment_for_pdu'] == "Plaintiff"))
if VERBOSE:
    print(f"Dropping {inconsistent_mask_2.sum()} observations where disposition_found is \"Dismissed\" but judgment_for_pdu is \"Plaintiff\".")
evictions_df = evictions_df.loc[~inconsistent_mask_2, :]
sample_restriction_table_index.append("Succesfully scraped judgment")
sample_restriction_table_values.append(len(evictions_df))

# Clean the values in the judgment_for_pdu variable.
judgment_for_pdu_replacement_dict = {"unknown": "Unknown",
                                     "plaintiff": "Plaintiff",
                                     "defendant": "Defendant"}
evictions_df.loc[:, "judgment_for_pdu"] = (evictions_df.loc[:, "judgment_for_pdu"]
                                           .replace(judgment_for_pdu_replacement_dict))

# Replace missing values in money judgment column with zeroes.
evictions_df.loc[:, 'judgment'] = evictions_df['judgment'].fillna(0)

# Rename duration to case_duration.
evictions_df = evictions_df.rename(columns={'duration': 'case_duration'})

Restricting to 6856 observations which are from Boston.
Beginning with 6856 observations.
Dropping 784 observations where latest_docket_date is missing.
Dropping 1 observations which have malformed addresses.
Dropping 0 rows missing property_address_full
Dropping 2719 cases which concluded after pandemic began
Dropping 1651 cases resolved through mediation.
Dropping 12 cases where disposition_found is "Other"
Dropping 0 observations where disposition_found is "Defaulted" but judgment_for_pdu is "Defendant".
Dropping 0 observations where disposition_found is "Dismissed" but judgment_for_pdu is "Plaintiff".


In [None]:
sample_restriction_table = pd.DataFrame()
sample_restriction_table["Restriction"] = sample_restriction_table_index
sample_restriction_table["Observations"] = sample_restriction_table_values                         
sample_restriction_table = sample_restriction_table.set_index("Restriction")

# Export to LaTeX.
filename = os.path.join(OUTPUT_TABLES, "sample_restriction.tex")
sample_restriction_table.style.format(formatter="{:,.0f}").to_latex(filename, hrules=True)
sample_restriction_table

Unnamed: 0_level_0,Observations
Restriction,Unnamed: 1_level_1
Case Filed in Boston,6856
Non-missing latest docket date,6072
Non-missing property address,6071
Case concluded before April 2020,3352
Case not resolved through mediation,1701
Succesfully scraped judgment,1689


## 2. Merging Evictions With Census Tract Characteristics

In [None]:
# Merge with census tract characteristics.
evictions_df = evictions_df.rename(columns={'Full FIPS (tract)': 'tract_geoid'})
evictions_tracts_df = evictions_df.merge(pd.read_csv(INPUT_DATA_TRACTS, dtype={'tract_geoid': float}),
                                  on='tract_geoid',
                                  how='left',
                                  validate='m:1').set_index('case_number')
if VERBOSE:
    print(f"Successfully merged {evictions_tracts_df['med_hhinc2016'].notna().sum()} observations with census tracts.")

Successfully merged 1689 observations with census tracts.


## 4. Merge Evictions with Crimes

In [None]:
# Request computing resources.
cluster = SLURMCluster(queue='batch',
                       cores=32,
                       memory='230 GB',
                       walltime='00:20:00',
                      scheduler_options={'dashboard_address': '8787'} )
cluster.scale(jobs=1)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 39453 instead


In [None]:
client = Client(cluster)

In [None]:
# Create a GeoSeries containing eviction Points as its geometry, case_number as its index, and no other columns.
evictions_gdf = gpd.GeoDataFrame(evictions_df, geometry=gpd.points_from_xy(evictions_df['Longitude'], evictions_df['Latitude']))[['geometry', 'case_number']]
evictions_gdf = evictions_gdf.set_crs("EPSG:4326", allow_override=True).to_crs('EPSG:26986')

# Read crime data as Dask DataFrame, then compute back to DataFrame.
crime_df = (dd.read_csv(INPUT_DATA_CRIME + "/*.csv", dtype={'REPORTING_AREA': 'object', 'SHOOTING': 'object'})
                .dropna(subset=['Long', 'Lat', 'OCCURRED_ON_DATE'])  # Drop crimes missing latitude, longitude, or date, as they cannot be merged with panel.
                .rename(columns={'OCCURRED_ON_DATE': 'month_of_crime_incident'})
                .drop(columns=['OFFENSE_CODE_GROUP', 'OFFENSE_DESCRIPTION', 'DISTRICT', 'REPORTING_AREA', 'SHOOTING', 'YEAR', 'MONTH',
                               'DAY_OF_WEEK', 'HOUR', 'UCR_PART', 'STREET', 'Location'])  # Drop unneeded columns
                .compute())
                # Must call compute here and then briefly convert back to in-memory dataset because dask_geopandas.points_from_xy not working.
crime_df.loc[:, 'INCIDENT_NUMBER'] = 1  # Replace column with 1s so we can count crimes using sum function.
# Keep track of the month of crime incident in YYYY-MM format.
crime_df.loc[:, 'month_of_crime_incident'] = pd.to_datetime(crime_df['month_of_crime_incident'].str[:10]).dt.to_period("M").astype(str)
# Convert DataFrame to GeoDataFrame.
crime_gdf = gpd.GeoDataFrame(crime_df, geometry=gpd.points_from_xy(crime_df['Long'], crime_df['Lat']))
crime_gdf = crime_gdf.set_crs("EPSG:4326", allow_override=True).to_crs("EPSG:26986")  # Convert to the correct CRS.
# Convert GeoDataFrame to Dask GeoDataFrame.
crime_dgdf = dask_geopandas.from_geopandas(crime_gdf, npartitions=N_PARTITIONS).repartition(partition_size='5 MB')


# Create lists containing each group of offense codes.
# Group 1
wealthy_area_crimes = [613, 3801, 3410]



# Group 2
poor_area_crimes = [2646, 1849, 1842]


columns_for_each_outcome = []
for offense_group_num, offense_group in enumerate(['all',
                                                   wealthy_area_crimes,
                                                   poor_area_crimes]):
    for radius in [500]:
        # Create a new GeoDataFrame with geometry equal to circles around each eviction with the current radius.
        current_evictions_gdf = evictions_gdf.copy()
        current_evictions_gdf.geometry = current_evictions_gdf.geometry.buffer(radius)
        current_evictions_dgdf = dask_geopandas.from_geopandas(current_evictions_gdf, npartitions=N_PARTITIONS).repartition(partition_size='5 MB')

        # Merge evictions with crimes that fall into radius.
        current_evictions_crime_dgdf = dask_geopandas.sjoin(crime_dgdf, current_evictions_dgdf, how='inner', predicate='within')
        current_evictions_crime_dgdf = current_evictions_crime_dgdf.drop(columns=['geometry','index_right'])
        
        
        if offense_group != 'all':  # If we are summing a specific subcategory of crimes
            mask = current_evictions_crime_dgdf['OFFENSE_CODE'].isin(offense_group)
            # Multiply mask by 'INCIDENT_NUMBER' to zero out crimes in different offense groups.
            current_evictions_crime_dgdf['INCIDENT_NUMBER'] = current_evictions_crime_dgdf['INCIDENT_NUMBER'] * mask

            
        # Aggregate crimes to case-month level.
        current_panel_long = (current_evictions_crime_dgdf
                              .groupby(['case_number', 'month_of_crime_incident'])
                              .aggregate({'INCIDENT_NUMBER': 'sum'})
                              .reset_index()
                              .rename(columns={'INCIDENT_NUMBER': 'crime_incidents'})
                              .compute())

        current_panel_wide = pd.pivot(current_panel_long, index=['case_number'], columns=['month_of_crime_incident'],
                                      values='crime_incidents').reset_index().set_index('case_number')
        current_panel_wide.columns = [f'{column}_group_{offense_group_num}_crimes_{radius}m' for column in current_panel_wide.columns]
        columns_for_each_outcome.append(current_panel_wide.columns)
        value_vars_to_concat.append(dd.from_pandas(current_panel_wide, npartitions=N_PARTITIONS))
    

## 5. Producing the Unrestricted Dataset

In [None]:
unrestricted_df = dd.multi.concat([dd.from_pandas(evictions_tracts_df, npartitions=N_PARTITIONS)] + value_vars_to_concat, axis=1).compute()
# For evictions not matched to any crimes, fill NA values with 0.
for group_of_columns in columns_for_each_outcome:
    unrestricted_df[group_of_columns] = unrestricted_df[group_of_columns].fillna(0)
unrestricted_df.to_parquet(OUTPUT_DATA_UNRESTRICTED)

## 6. Producing the Samples Used in Analysis

In [None]:
# # Drop cases resolved via volntary dismissal (dropped by plaintiff). 
# voluntary_dismissal_mask = unrestricted_df['disposition'].str.contains("R 41(a)(1) Voluntary Dismissal on", na=False, regex=False)
# if VERBOSE:
#     print(f"Droppping {voluntary_dismissal_mask.sum()} cases resolved through voluntary dismissal.")
# unrestricted_df = unrestricted_df.loc[~voluntary_dismissal_mask, :]

In [None]:
# Generate a variable indicating judgment in favor of defendant.
unrestricted_df.loc[:, 'judgment_for_defendant'] = 0
defendant_won_mask = ((unrestricted_df['disposition_found'] == "Dismissed") |
                      (unrestricted_df['judgment_for_pdu'] == "Defendant") |
                      (unrestricted_df['disposition'].str.contains('R 41(a)(1) Voluntary Dismissal', regex=False)))
unrestricted_df.loc[defendant_won_mask, 'judgment_for_defendant'] = 1

# Generate a variable indicating judgement in favor of plaintiff.
unrestricted_df.loc[:, 'judgment_for_plaintiff'] = 1 - unrestricted_df['judgment_for_defendant']


### 6b. Producing the Crime Sample

In [None]:
crime_df = unrestricted_df.copy()
crime_df.to_parquet(OUTPUT_DATA_CRIME)