In [2]:
import os, threading, concurrent.futures, sys, pickle, time, gc
import pandas as pd, numpy as np
from sodapy          import Socrata
from tqdm.notebook   import tqdm
from dotenv          import load_dotenv
import plotly.express as px

# Read the local .env file so the SOCRATA_* variables looked up via os.getenv are available.
load_dotenv()

# Allow up to 200 rows to be rendered before pandas truncates the display.
pd.options.display.max_rows = 200

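Link to ACRIS download on NYC Open Data (note: this URL points to the Personal Property Master dataset; the Real Property datasets used in this notebook are listed in the table below):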
https://data.cityofnewyork.us/City-Government/ACRIS-Personal-Property-Master/sv7x-dduq

Link to the ACRIS documentation:
https://data.cityofnewyork.us/api/assets/F5C33BBE-4CF8-44EF-9160-BC3B83893C51?download=true

|Record Type|Dataset Title|Url|Description|
|:--|:--|:--|:--|
|Master record|ACRIS - Real Property Master|http://data.cityofnewyork.us/City-Government/ACRIS-Real-Property-Master/bnx9-e6tj |Document Details for Real Property Related Documents Recorded in ACRIS|
|Lot(property) record|ACRIS - Real Property Legals|http://data.cityofnewyork.us/City-Government/ACRIS-Real-Property-Legals/8h5j-fqxa |Property Details for Real Property Related Documents Recorded in ACRIS|
|Party record|ACRIS - Real Property Parties| http://data.cityofnewyork.us/City-Government/ACRIS-Real-Property-Parties/636b-3b5g |Party Names for Real Property Related Documents Recorded in ACRIS |
|Cross-reference record|ACRIS - Real Property References|http://data.cityofnewyork.us/City-Government/ACRIS-Real-Property-References/pwkr-dpni |Document Cross References for Real Property Related Documents Recorded in ACRIS |
|Remarks record|ACRIS - Real Property Remarks|http://data.cityofnewyork.us/City-Government/ACRIS-Real-Property-Remarks/9p4w-7npp |Document Remarks for Real Property Related Documents Recorded in ACRIS |
|Document control codes|ACRIS - Document Control Codes|https://data.cityofnewyork.us/City-Government/ACRIS-Document-Control-Codes/7isb-wh4c | ACRIS Document Type and Class Code mappings for Codes in the ACRIS Real and Personal Property Master Datasets|
|Energy Consumption|Local Law 84: Energy and Water Consumption|https://data.cityofnewyork.us/Environment/Energy-and-Water-Data-Disclosure-for-Local-Law-84-/qb3v-bbre | Data and metrics on water and energy consumption in buildings over 25,000 ft²|


For mortgage (`MTGE`) documents, `party_type` 1 is the borrower and `party_type` 2 is the lender.

# Load ACRIS Data

In [None]:
class ACRIS_EXTRACT:
    '''
    Extracts, checkpoints, caches and joins ACRIS data from NYC Open Data (Socrata).

    Tables handled:
    Master - Document Details for Real Property Related Documents Recorded in ACRIS
    Property - Property Details for Real Property Related Documents Recorded in ACRIS
    Party - Party Names for Real Property Related Documents Recorded in ACRIS
    Energy - Local Law 84 Data and metrics on water and energy consumption in buildings over 25,000 ft2.

    Details can be found here https://data.cityofnewyork.us/api/assets/F5C33BBE-4CF8-44EF-9160-BC3B83893C51?download=true
    or from the NYC Open Data Site, https://opendata.cityofnewyork.us/

    NOTE: LL84 data is being supplied by Alejandro @ NYU. Extraction from NYC Open Data has been commented out
    '''

    # Number of rows requested per API call.
    RECORDS_PER_REQUEST = 1000

    # How many times a single page is requested before the whole extraction is aborted.
    MAX_ATTEMPTS_PER_PAGE = 3

    def __init__(self, load_data=False):
        '''
        Build the dataset registry, the per-dataset query filters and the Socrata client.

        Credentials are read from the SOCRATA_KEY, SOCRATA_USER and SOCRATA_PASS
        environment variables.

        Params:
        :load_data: - Boolean, if True, will automatically begin extracting data from ACRIS on init.
        '''

        self.datasets = {
            # DB Name, ACRIS ID, approximate n_records for progress bar
            'master'  : {'acris_id': 'bnx9-e6tj', 'n_records': 15200000,  'data': None},
            'property': {'acris_id': '8h5j-fqxa', 'n_records': 20000000,  'data': None},
            'party'   : {'acris_id': '636b-3b5g', 'n_records': 40000000,  'data': None},
            # LL84 extraction is disabled (see class NOTE); uncomment to pull it from NYC Open Data.
#             'energy'  : {'acris_id': 'qb3v-bbre', 'n_records': 19404,     'data': None}
        }

        self.base_query_kwargs={
            # Keep system fields (such as updated date). Limit to records since 2010.
            'exclude_system_fields': False,
            'where': "good_through_date > '2010-01-01'"
        }

        self.dataset_query_kwargs = {
            # per dataset query filters
            'master': {
#                 'doc_type': 'MTGE',
                'select': ','.join(['document_id', 'doc_type', 'document_amt', 'recorded_datetime']),
                **self.base_query_kwargs
            },
            'property': {
#                 'property_type': 'AP',
                'select': ','.join(['document_id', 'street_number', 'street_name', 'borough', 'block', 'lot', 'property_type', 'good_through_date']),
                **self.base_query_kwargs
            },
            'party': {
                'select': ','.join(['document_id', 'party_type', 'name', 'address_1', 'address_2', 'city', 'state', 'zip', 'country', 'good_through_date']),
                **self.base_query_kwargs
            },
            'energy': {
                'largest_property_use_type': 'Multifamily Housing',
                'exclude_system_fields': False
            }
        }

        # Records fetched so far by the extraction in progress; saved locally if it is interrupted.
        self.checkpoint_data = None

        self.client = Socrata(
            'data.cityofnewyork.us',
            os.getenv("SOCRATA_KEY"),
            username=os.getenv("SOCRATA_USER"),
            password=os.getenv("SOCRATA_PASS")
        )
        self.client.timeout = 60

        if load_data:
            self.load_data()

    @property
    def checkpoint(self):
        '''Backward-compatible alias for checkpoint_data (older code used both names).'''
        return self.checkpoint_data

    @checkpoint.setter
    def checkpoint(self, records):
        self.checkpoint_data = records

    def retrieve_acris_records(self, acris_dataset_id, query_kwargs):
        '''
        Fetch one page of records. Called by the workers of batch_retrieve_acris_records.

        Params:
        :acris_dataset_id: - Socrata dataset id, e.g. 'bnx9-e6tj'.
        :query_kwargs: - keyword arguments forwarded to the Socrata client's get().
        '''
        return self.client.get(acris_dataset_id, **query_kwargs)

    def _submit_page(self, executor, pending, acris_id, query_kwargs, page_offset, attempt=1):
        '''Queue one page request and record which offset/attempt its future belongs to.'''
        # Each request gets its own dict: a shared dict could be changed by the main
        # thread before the worker unpacks it, which would send the wrong offset.
        page_kwargs = {
            # Stable ordering so that consecutive pages neither overlap nor skip rows;
            # a caller-supplied 'order' in query_kwargs takes precedence.
            'order': ':id',
            **query_kwargs,
            'limit': self.RECORDS_PER_REQUEST,
            'offset': page_offset,
        }
        future = executor.submit(self.retrieve_acris_records, acris_id, page_kwargs)
        pending[future] = (page_offset, attempt)

    def batch_retrieve_acris_records(self, dataset_dict, query_kwargs, n_threads=5, offset=0, checkpoint_after_rounds=10):
        '''
        Download a dataset page by page using parallel workers.

        Every fetched page is appended to self.checkpoint_data as it arrives, so the
        partial result is available for pickle_checkpoint_data if the run is interrupted.

        Params:
        :dataset_dict: - an entry of self.datasets ('acris_id' and 'n_records' are read).
        :query_kwargs: - query filters for the dataset; not modified by this method.
        :n_threads: - number of page requests kept in flight.
        :offset: - row offset of the first page (use to resume a partial extraction).
        :checkpoint_after_rounds: - unused; kept so existing callers keep working.
                                    Checkpointing now happens on every page.

        Returns a DataFrame of all retrieved records (in completion order).
        Raises whatever the last failed request raised once a page has failed
        MAX_ATTEMPTS_PER_PAGE times.
        '''
        records = []
        # Bind the checkpoint up front: an early failure must never see None or the
        # records of a previously extracted dataset.
        self.checkpoint_data = records

        acris_id = dataset_dict['acris_id']
        pending  = {}     # future -> (page offset, attempt number)
        finished = False  # set once a page comes back empty; stops new page requests

        p_bar = tqdm(total=dataset_dict['n_records'])
        try:
            with concurrent.futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
                # initial thread send
                for _ in range(n_threads):
                    self._submit_page(executor, pending, acris_id, query_kwargs, offset)
                    offset += self.RECORDS_PER_REQUEST

                # Block until at least one request completes instead of spinning.
                while pending:
                    done, _not_done = concurrent.futures.wait(
                        list(pending), return_when=concurrent.futures.FIRST_COMPLETED
                    )
                    for future in done:
                        page_offset, attempt = pending.pop(future)

                        try:
                            results = future.result()
                        except Exception as exc:
                            # Retry the same page rather than silently dropping its rows.
                            if attempt >= self.MAX_ATTEMPTS_PER_PAGE:
                                raise
                            print(f'Request at offset {page_offset} failed ({exc!r}); retrying.')
                            self._submit_page(executor, pending, acris_id, query_kwargs, page_offset, attempt + 1)
                            continue

                        if len(results) == 0:
                            finished = True
                            continue

                        records.extend(results)
                        p_bar.update(len(results))

                        # Offsets advance by a fixed stride so pages always tile the dataset.
                        if not finished:
                            self._submit_page(executor, pending, acris_id, query_kwargs, offset)
                            offset += self.RECORDS_PER_REQUEST
        finally:
            p_bar.close()

        # Combine all data into Dataframe
        return pd.DataFrame.from_records(records)

    def load_data(self):
        '''
        For each dataset in self.datasets, checks if there is a local file named acris_{dataset}.csv.
        If data not local, extracts each dataset from ACRIS. Uses batch_retrieve_acris_records for extraction.

        If an extraction fails or is interrupted, the records fetched so far are written
        to checkpoint pickle files and the original error is re-raised.
        '''
        for dataset, dataset_dict in self.datasets.items():

            start_time = time.time()
            print(f"Loading dataset {dataset}. Approx ~{dataset_dict['n_records']} records.")

            csv_path = f'./acris_{dataset}.csv'

            # Check if we already have data saved locally
            if os.path.isfile(csv_path):
                # document_id mixes numeric and 'FT_...' ids; read it as text so the join
                # key has one consistent type across the three tables.
                dataset_dict['data'] = pd.read_csv(csv_path, dtype={'document_id': str})

            else: # Download from ACRIS
                query_kwargs = self.dataset_query_kwargs[dataset]

                try:
                    dataset_dict['data'] = self.batch_retrieve_acris_records(dataset_dict, query_kwargs)
                except BaseException:
                    # BaseException so that a manual kernel interrupt is checkpointed too.
                    self.pickle_checkpoint_data()
                    raise

                # Save records as CSV (without the meaningless positional index)
                dataset_dict['data'].to_csv(csv_path, index=False)

            # Logging statistics
            memory_usage = round(dataset_dict['data'].memory_usage().sum() / 1000000, 2)
            time_elapsed = round(time.time() - start_time, 2)
            print(f'{dataset} loaded in {time_elapsed} seconds. {memory_usage}MB memory.')

    def combine_datasets(self):
        '''
        The data is extracted individually by dataset and stored independent of one another.
        This method joins the master, property and party datasets together into a unified table
        and writes it to ./acris_consolidated.csv.
        This method can take over an hour.
        It also creates a BBL column (borough + 5-digit block + 4-digit lot) on the property table.

        Raises ValueError if any of the three datasets has not been loaded.
        '''
        frames = {}
        for name in ('master', 'property', 'party'):
            frame = self.datasets[name]['data']
            if frame is None:
                raise ValueError(f"Dataset '{name}' is not loaded; call load_data() first.")
            frames[name] = frame

        master = frames['master']
        props  = frames['property']
        party  = frames['party']
        # Left out energy data per use case.

        # Create BBL Column
        # NOTE(review): assumes borough/block/lot contain no missing values; a float column
        # would stringify as e.g. '1.0' - confirm against the extracted data.
        props['BBL'] = props.borough.astype(str) + props.block.astype(str).str.zfill(5) + props.lot.astype(str).str.zfill(4)

        mp = master.merge(props, on='document_id', suffixes=('', '_prop'))
        mpp = mp.merge(party, on='document_id', suffixes=('', '_party'))

        # Save consolidated file locally
        mpp.to_csv('./acris_consolidated.csv', index=False)

    def pickle_checkpoint_data(self, batch_size=None):
        '''
        This method is mainly used in the event that there is an error or interruption in the ACRIS extraction
        It saves the data from memory (self.checkpoint_data) to a batch of pickle files named
        ./checkpoint_{i}.pickle, numbered from 0.

        Params:
        :batch_size: - records per pickle file. Defaults to a quarter of the records (at least 1).
        '''
        records = self.checkpoint_data
        if not records:
            print('No checkpoint data to save.')
            return

        if batch_size is None:
            batch_size = max(1, len(records) // 4)
        elif batch_size < 1:
            raise ValueError('batch_size must be a positive integer')

        batch_starts = range(0, len(records), batch_size)
        for batch_i, start in enumerate(tqdm(batch_starts)):
            with open(f'./checkpoint_{batch_i}.pickle', 'wb') as pkl:
                pickle.dump(records[start : start + batch_size], pkl)

    def load_pickled_data_into_checkpoint(self, n_files=None):
        '''
        This method is used when there was an error extracting ACRIS data and the checkpointed data was saved locally
        as pickle files. Use this method to loop through local pickle files and load records.

        Params:
        :n_files: - number of files to read (checkpoint_0 .. checkpoint_{n_files-1}). When None,
                    files are read from checkpoint_0 upwards until the first missing number.

        NOTE: files left over from an earlier, larger checkpoint are indistinguishable from
        current ones; clear old checkpoint files or pass n_files explicitly.
        Only load pickle files you created yourself - unpickling can execute arbitrary code.
        '''
        if n_files is None:
            n_files = 0
            while os.path.isfile(f'./checkpoint_{n_files}.pickle'):
                n_files += 1

        records = []
        for i in tqdm(range(n_files)):
            records.extend(pd.read_pickle(f'./checkpoint_{i}.pickle'))

        self.checkpoint_data = records

    def commit_checkpoint_data_into_data_record(self, dataset):
        '''
        After loading pickled data from load_pickled_data_into_checkpoint, use this method to
        commit that data to a dataset. For example:
        self.load_pickled_data_into_checkpoint()
        self.commit_checkpoint_data_into_data_record('master')

        Data saved to self.datasets['master']['data']; 'n_records' is updated to the row count.

        Raises ValueError if the dataset name is unknown or no checkpoint data is loaded.
        '''

        if dataset not in self.datasets:
            raise ValueError(f'Dataset not found: {dataset!r}')

        if self.checkpoint_data is None:
            raise ValueError('No checkpoint data loaded; call load_pickled_data_into_checkpoint() first.')

        self.datasets[dataset]['data']      = pd.DataFrame.from_records(self.checkpoint_data)
        self.datasets[dataset]['n_records'] = len(self.datasets[dataset]['data'])

In [None]:
# Build the extractor, load every dataset (from the local CSV cache when present,
# otherwise from the API), then join them into acris_consolidated.csv.
acris = ACRIS_EXTRACT()
acris.load_data()
acris.combine_datasets()

# Combine ACRIS with LL84 Data

In [None]:
# ACRIS data (supplied from ACRIS_EXTRACT.combine_datasets).
# The identifier columns are read as text: left to type inference, the all-digit BBL
# column becomes an integer and can no longer be matched against the string BBLs
# parsed out of the LL84 file below.
acris_data = pd.read_csv(
    'acris_consolidated.csv',
    dtype={'document_id': str, 'BBL': str},
)

# LL84 data (supplied by Alejandro @ NYU. This can also be supplied by ACRIS_EXTRACT).
# One row per property, indexed by 'Property Id'.
ll = (
    pd.read_excel('LL84_2020.xlsx')
    .drop_duplicates(subset=['Property Id'])
    .set_index('Property Id')
)

In [6]:
# Extract just the BBLs for matching with ACRIS data.
# Result: one row per distinct BBL with columns ['Property Id', 'BBL'].
ll_bbls = (
    ll['BBL - 10 digits']
    .replace('Not Available', '0'.zfill(10))
    # a property may list several BBLs separated by ';'
    .apply(lambda r: str(r).split(';'))
    .explode()
    # presumably entries can carry whitespace around the separator - harmless if not
    .str.strip()
    .drop_duplicates()
    .rename('BBL')
    .reset_index()
)

# Share of LL84 BBLs that also appear in ACRIS.
# Compare as strings so the result does not depend on how the ACRIS CSV was parsed.
acris_bbls = pd.Series(acris_data['BBL'].unique()).astype(str)
ll_bbls['BBL'].isin(acris_bbls).sum() / ll_bbls['BBL'].nunique()

In [None]:
# Filter out ACRIS records not found in the LL84 BBLs.
# Test against the BBL column: passing the whole ll_bbls frame to isin() would compare
# against its column labels instead of the BBL values.
# ~ 35 min (original author's timing)
acris_data = acris_data.loc[acris_data['BBL'].astype(str).isin(ll_bbls['BBL'].astype(str))]

In [None]:
# Merge ACRIS with LL84 BBLs (quicker to start here and then merge rest of LL84 Data).
# Both BBL keys are cast to str so the join cannot fail or silently miss on an
# int-vs-text mismatch; validate= asserts that each BBL maps to a single property.
data = (
    acris_data
    .assign(BBL=acris_data['BBL'].astype(str))
    .merge(ll_bbls.astype({'BBL': str}), on='BBL', validate='many_to_one')
)

# Merge with full LL84 file (ll is indexed by 'Property Id')
data = data.merge(ll, left_on='Property Id', right_index=True, validate='many_to_one')

In [None]:
# Persist the combined LL84 + ACRIS table next to the notebook.
consolidated_path = 'LL84 and ACRIS.csv'
data.to_csv(consolidated_path)

# Format Output

In [8]:
df = pd.read_csv('LL84 and ACRIS.csv', low_memory=False)

# Columns to discard: any name containing ':' or 'good_through', plus LL84's 'Order'.
# (Names with ':' are presumably Socrata system fields and pandas' 'Unnamed: N' index columns.)
has_colon        = df.columns.str.contains(':')
has_good_through = df.columns.str.contains('good_through')
cols_to_remove   = [*df.columns[has_colon | has_good_through], 'Order']

# Drop them and rename the ACRIS-side BBL so it is distinguishable from the LL84 BBL column.
df = (
    df
    .drop(columns=cols_to_remove)
    .rename(columns={'BBL': 'doc_bbl'})
)

In [9]:
# Select the requested columns, grouped by the table they come from.

# LL84: only the property key is kept. The other LL84 fields are listed for reference;
# uncomment a line to carry that field through.
ll84_cols = [
    'Property Id',
    # 'doc_bbl',
    # 'Property Name',
    # # 'Parent Property Id',
    # 'BBL - 10 digits',
    # 'NYC Building Identification Number (BIN)',
    # 'Parent Property Name',
    # 'City Building',
    # 'Email',
    # 'Address 1 (self-reported)',
    # 'Borough',
    # 'Postal Code',
    # 'DOF Gross Floor Area (ft²)',
    # 'Self-Reported Gross Floor Area (ft²)',
    # 'Primary Property Type - Self Selected',
    # 'List of All Property Use Types at Property',
    # 'Largest Property Use Type',
    # 'Largest Property Use Type - Gross Floor Area (ft²)',
    # '2nd Largest Property Use Type',
    # '2nd Largest Property Use - Gross Floor Area (ft²)',
    # '3rd Largest Property Use Type',
    # '3rd Largest Property Use Type - Gross Floor Area (ft²)',
    # 'Year Built',
    # 'Number of Buildings',
    # 'Occupancy',
    # 'Latitude',
    # 'Longitude',
    # 'Community Board',
    # 'Council District',
    # 'Census Tract',
    # 'NTA',
    # 'Fuel Oil #1 Use (kBtu)',
    # 'Fuel Oil #2 Use (kBtu)',
    # 'Fuel Oil #4 Use (kBtu)',
    # 'Fuel Oil #5 & 6 Use (kBtu)',
    # 'Diesel #2 Use (kBtu)',
    # # 'Kerosene Use (kBtu)',
    # 'Propane Use (kBtu)',
    # 'District Steam Use (kBtu)',
    # # 'District Hot Water Use (kBtu)',
    # # 'District Chilled Water Use (kBtu)',
    # 'Natural Gas Use (kBtu)',
    # # 'Natural Gas Use (therms)',
    # # 'Weather Normalized Site Natural Gas Use (therms)',
    # # 'Electricity Use - Grid Purchase (kBtu)',
    # 'Electricity Use - Grid Purchase (kWh)',
    # 'Total GHG Emissions (Metric Tons CO2e)',
    # 'Direct GHG Emissions (Metric Tons CO2e)',
    # 'Indirect GHG Emissions (Metric Tons CO2e)',
    # 'ENERGY STAR Score',
    # 'Site EUI (kBtu/ft²)',
]

# ACRIS document (master) fields
doc_cols = ['document_id', 'doc_type', 'document_amt', 'recorded_datetime']

# ACRIS party fields
party_cols = ['party_type', 'name', 'address_1', 'zip']

display_cols = ll84_cols + doc_cols + party_cols

docs = df[display_cols]
docs.head().T

Unnamed: 0,0,1,2,3,4
Property Id,10432710,10432710,10432710,10432710,10432710
document_id,2017010400912027,2017010400912027,2017010400912027,2017010400912026,2017010400912026
doc_type,MTGE,MTGE,MTGE,MTGE,MTGE
document_amt,8.8367e+06,8.8367e+06,8.8367e+06,811483,811483
recorded_datetime,2017-01-10T00:00:00.000,2017-01-10T00:00:00.000,2017-01-10T00:00:00.000,2017-01-10T00:00:00.000,2017-01-10T00:00:00.000
party_type,2,2,1,1,1
name,WOMEN'S HOUSING AND ECONOMIC DEVELOPMENT CORPO...,BFC BRONX COMMONS LLC,BRONX COMMONS LLC,BRONX COMMONS LIHTC LLC,BRONX COMMONS LLC
address_1,50 EAST 168TH STREET,"150 MYRTLE AVENUE, SUITE 2","150 MYRTLE AVENUE, SUITE 2","150 MYRTLE AVENUE, SUITE 2","150 MYRTLE AVENUE, SUITE 2"
zip,10452,11201,11201,11201,11201


In [10]:
# Keep mortgage ('MTGE') documents only, ordered newest-first within each property
# (both sort keys descending).
is_mortgage = docs['doc_type'] == 'MTGE'
filtered_docs = docs[is_mortgage].sort_values(
    by=['Property Id', 'recorded_datetime'],
    ascending=False,
)

In [11]:
# Keep the first N rows per (Property Id, party_type) in the current sort order, then
# pivot to one row per property with columns keyed by (party_type, doc_idx, field).
# Example: for each property, the 5 most recent rows for party 1 and 5 for party 2.
# NOTE(review): rows are document x party, so a document with several parties of the
# same type uses several of the N slots - confirm this is the intended meaning of "N docs".
n_docs = 5
group_keys = ['Property Id', 'party_type']

# 1-based position of each row within its (property, party type) group
filtered_docs = filtered_docs.assign(
    doc_idx=filtered_docs.groupby(group_keys).cumcount() + 1
)
filtered_docs = filtered_docs.loc[filtered_docs['doc_idx'].le(n_docs)]

filtered_docs = (
    filtered_docs
    .set_index([*group_keys, 'doc_idx'])
    .unstack(level=[0])   # Property Id -> columns
    .T                    # rows: (field, Property Id); columns: (party_type, doc_idx)
    .unstack(level=[0])   # field -> innermost column level; rows: Property Id
)
filtered_docs.head()

party_type,1,1,1,1,1,1,1,1,1,1,...,2,2,2,2,2,2,2,2,2,2
doc_idx,1,1,1,1,1,1,1,2,2,2,...,4,4,4,5,5,5,5,5,5,5
Unnamed: 0_level_2,document_id,doc_type,document_amt,recorded_datetime,name,address_1,zip,document_id,doc_type,document_amt,...,name,address_1,zip,document_id,doc_type,document_amt,recorded_datetime,name,address_1,zip
Property Id,Unnamed: 1_level_3,Unnamed: 2_level_3,Unnamed: 3_level_3,Unnamed: 4_level_3,Unnamed: 5_level_3,Unnamed: 6_level_3,Unnamed: 7_level_3,Unnamed: 8_level_3,Unnamed: 9_level_3,Unnamed: 10_level_3,Unnamed: 11_level_3,Unnamed: 12_level_3,Unnamed: 13_level_3,Unnamed: 14_level_3,Unnamed: 15_level_3,Unnamed: 16_level_3,Unnamed: 17_level_3,Unnamed: 18_level_3,Unnamed: 19_level_3,Unnamed: 20_level_3,Unnamed: 21_level_3
7365,2005111500571007,MTGE,51175900.0,2005-11-30T00:00:00.000,DOLP 1155 PROPERTIES II LLC,"C/O THE DURST ORGANIZATION INC.,",10036,FT_1080004897508,MTGE,85000000.0,...,BANK OF NEW YORK,,0.0,FT_1560000023756,MTGE,10000000.0,1983-02-14T00:00:00.000,BANK OF NEW YORK,,0.0
8139,2015040700622001,MTGE,75000000.0,2015-04-14T00:00:00.000,DOLP 655 PROPERTIES II LLC,C/O ROYAL REALTY CORP.,10036,2003012402183009,MTGE,6284310.0,...,AETNA LF INS CO,,,FT_1290008664329,MTGE,0.0,1966-09-13T00:00:00.000,AETNA LF INS CO,,
8604,2003090801097004,MTGE,11666700.0,2003-10-01T00:00:00.000,DOLP 114 PROPERTIES II LLC,"C/O THE DURST ORGANIZATION, INC.",10017,FT_1500000336050,MTGE,25000000.0,...,BANK OF NEW YORK,48 WALL STREET,10015.0,FT_1600000290660,MTGE,25000000.0,1989-02-17T00:00:00.000,BANK OF NEW YORK,48 WALL ST,10015.0
8841,2005111500571013,MTGE,34398100.0,2005-11-30T00:00:00.000,DOLP 733 PROPERTIES II LLC,"C/O THE DURST ORGANIZATION INC.,",10036,FT_1360000392936,MTGE,1647900.0,...,CHEMICAL BANK,277 PARK AVE,10172.0,FT_1320008681632,MTGE,0.0,1967-11-02T00:00:00.000,NY LF INS CO,,
11809,2006112900186009,MTGE,246384000.0,2007-01-29T00:00:00.000,4TS II LLC,"C/O THE DURST ORGANIZATION, INC.",10036,FT_1750006219875,MTGE,236500000.0,...,UBS PRINCIPAL FINANCE LLC,299 PARK AVENUE,10171.0,FT_1450007124845,MTGE,90000000.0,2000-04-20T00:00:00.000,NEW YORK STATE URBANDEVELOPMENT CORPORATION,633 THIRD AVENUE,10017.0


In [24]:
# Alejandro requested that the extracted docs simply be appended to the end of his LL84 XLS.
# Instead of adding cols back, we'll just reload his file.
ll = pd.read_excel('LL84_2020.xlsx')

# filtered_docs has 3-level columns (party_type, doc_idx, field) while the LL84 sheet has
# plain column names; current pandas refuses to merge frames whose column levels differ,
# so flatten to single names such as 'party1_doc1_document_id' first.
docs_wide = filtered_docs.copy()
docs_wide.columns = [
    f'party{party_type}_doc{doc_idx}_{field}'
    for party_type, doc_idx, field in docs_wide.columns
]

# Left join keeps every LL84 row, including properties without mortgage documents.
ll = ll.merge(docs_wide, how='left', left_on='Property Id', right_index=True)

# index=False: the positional index is not part of the requested sheet.
ll.to_csv('~/Desktop/merged_ll84_parties.csv', index=False)