In [101]:
import requests
from retrying import retry
from ediblepickle import checkpoint
from urllib.parse import quote
from typing import Union
from datetime import datetime
import pandas as pd
from keys import *
from sklearn.base import TransformerMixin, BaseEstimator

# Load the credentials mapping; the API helpers in this notebook read keys['USRealEstate'] from it.
# NOTE(review): getKeys is presumably supplied by `from keys import *` -- confirm, and prefer an explicit import.
keys = getKeys()
# The string literal below is disabled Lightrun setup code. As a bare string expression it does
# nothing except get echoed as this cell's output.
'''
try:
    import lightrun
    lightrun.enable(company_key=keys['LightRunCompanyKey'])
except ImportError as e:
    print("Error importing Lightrun: ", e)
'''

'\ntry:\n    import lightrun\n    lightrun.enable(company_key=keys[\'LightRunCompanyKey\'])\nexcept ImportError as e:\n    print("Error importing Lightrun: ", e)\n'

In [55]:
@retry(stop_max_attempt_number=5)
@checkpoint(
    # The cache key is computed BEFORE the function body runs, so the id must be
    # stringified here as well; it also has to work when the id is passed by keyword.
    key=lambda args, kwargs: quote(str(args[0] if args else kwargs['property_id'])) + '.pkl',
    work_dir='Saved Results/PropertyDetail/'
)
def get_PropertyDetail(property_id : str) -> dict:
    '''
    Fetch the detail record of a single property from the US Real Estate API.

    Results are checkpointed to 'Saved Results/PropertyDetail/<quoted id>.pkl', and the whole
    call is attempted up to 5 times.

    Parameters:
        property_id : identifier of the property; non-string values are converted with str().

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception : if property_id cannot be converted to a string.
        requests.HTTPError : if the API answers with a 4xx/5xx status. Raising here keeps
            error payloads from being pickled into the checkpoint cache.
    '''
    if not isinstance(property_id, str):
        try:
            property_id = str(property_id)
        except Exception as exc:
            raise Exception('Could not convert input to string.') from exc

    url = "https://us-real-estate.p.rapidapi.com/v2/property-detail"

    querystring = {
        "property_id": property_id
    }

    headers = {
        "X-RapidAPI-Key": keys['USRealEstate'],
        "X-RapidAPI-Host": "us-real-estate.p.rapidapi.com"
    }

    # Without a timeout a stalled connection would block the notebook indefinitely.
    response = requests.request("GET", url, headers=headers, params=querystring, timeout=30)
    response.raise_for_status()
    return response.json()

@retry(stop_max_attempt_number=5)
def get_PropertyForSaleByZipcode(zipcode : str,
        property_type : str = 'single_family',
        n_results : int = 100
    ) -> dict:
    '''
    Query the for-sale-by-zipcode endpoint of the US Real Estate API.

    Only a single page (offset 0) is requested, so at most 42 listings come back even when
    n_results is larger. The whole call is attempted up to 5 times.

    Parameters:
        zipcode : zip code to search in.
        property_type : multi_family|single_family|mobile|land|farm.
        n_results : number of listings wanted; capped at the page limit of 42.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError : if the API answers with a 4xx/5xx status.

    Other query string parameters (not sent by this function):
        sort = (default: relevant)|newest|lowest_price|highest_price|open_house_date|price_reduced_date|largest_sqft|lot_size|sold_date
        price_min/max = $ USD
        beds_min/max = #
        bath_min/max = #
    '''
    url = "https://us-real-estate.p.rapidapi.com/v2/for-sale-by-zipcode"

    # TODO: loop over the offset until we hit the end or n_results.
    # This is going to take a lot of API calls.

    # This can be increased to 200 once we get the paid plan.
    limit = min(42, n_results)

    querystring = {
        "zipcode":zipcode,
        "offset":"0",
        "limit":str(limit),
        "property_type":property_type
    }

    headers = {
        "X-RapidAPI-Key": keys['USRealEstate'],
        "X-RapidAPI-Host": "us-real-estate.p.rapidapi.com"
    }

    # Without a timeout a stalled connection would block the notebook indefinitely.
    response = requests.request("GET", url, headers=headers, params=querystring, timeout=30)
    # Raise on HTTP errors so the retry decorator gets a chance to try again.
    response.raise_for_status()
    return response.json()

@retry(stop_max_attempt_number=5)
def get_PropertySoldByZipcode(zipcode : str,
        n_results : int,
        property_type : str = 'single_family'
    ) -> dict:
    '''
    Query the sold-homes-by-zipcode endpoint of the US Real Estate API.

    The whole call is attempted up to 5 times.

    NOTE: This endpoint does not seem to have a limit argument. I do not know how this works
    with offset?

    Parameters:
        zipcode : zip code to search in.
        n_results : sent (capped at 42) as the 'offset' query parameter.
        property_type : multi_family|single_family|mobile|land|farm.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError : if the API answers with a 4xx/5xx status.

    Other query string parameters (not sent by this function):
        sort = (default: relevant)|newest|lowest_price|highest_price|open_house_date|price_reduced_date|largest_sqft|lot_size|sold_date
        price_min/max = $ USD
        beds_min/max = #
        bath_min/max = #
    '''
    url = "https://us-real-estate.p.rapidapi.com/v2/sold-homes-by-zipcode"

    # This can be increased to 200 once we get the paid plan.
    # NOTE(review): this looks copied from the `limit` logic of the for-sale helpers. If the API
    # treats 'offset' as "records to skip", the first results are being dropped -- confirm
    # against the API docs before relying on it.
    offset = min(42, n_results)

    querystring = {
        "zipcode":zipcode,
        "offset":offset,
        "property_type":property_type
    }

    headers = {
        "X-RapidAPI-Key": keys['USRealEstate'],
        "X-RapidAPI-Host": "us-real-estate.p.rapidapi.com"
    }

    # Without a timeout a stalled connection would block the notebook indefinitely.
    response = requests.request("GET", url, headers=headers, params=querystring, timeout=30)
    # Raise on HTTP errors so the retry decorator gets a chance to try again.
    response.raise_for_status()

    return response.json()

def get_LocationSuggest(search_keyword : str,
        return_all : bool = False
    ) -> dict:
    '''
    Ask the US Real Estate API for location suggestions matching a keyword.

    Parameters:
        search_keyword : free text sent as the 'input' query parameter.
        return_all : when True, return the full decoded response; otherwise only the first
            entry of its 'data' list.

    Returns:
        The full response dict, or the first suggestion. When return_all is False and the
        response has no suggestions, an empty dict is returned (this used to raise
        IndexError/KeyError).

    Raises:
        requests.HTTPError : if the API answers with a 4xx/5xx status.
    '''
    url = "https://us-real-estate.p.rapidapi.com/location/suggest"

    querystring = {"input":search_keyword}

    headers = {
        "X-RapidAPI-Key": keys['USRealEstate'],
        "X-RapidAPI-Host": "us-real-estate.p.rapidapi.com"
    }

    # Without a timeout a stalled connection would block the notebook indefinitely.
    response = requests.request("GET", url, headers=headers, params=querystring, timeout=30)
    response.raise_for_status()
    response_json = response.json()

    if return_all:
        return response_json

    # 'data' may be missing, null or empty when nothing matches the keyword.
    suggestions = response_json.get('data') or []
    return suggestions[0] if suggestions else {}


def get_PropertyForSaleByArea(
        city : str = '',
        state : str = '',
        n_results : int = 100 # How many houses do you want to get back.
        # Should make a method that if n_results is None, it pulls back everything.
    ) -> dict:
    '''
    Collect the newest for-sale listings of a city/state, paging through the API as needed.

    Pages of at most 42 listings are requested until n_results listings have been gathered,
    the API reports no more listings, or a page comes back empty.

    Parameters:
        city : city name sent as the 'city' query parameter.
        state : state code sent as the 'state_code' query parameter.
        n_results : maximum number of listings to return.

    Returns:
        A dict with two entries:
            'houses' : list of the raw listing dicts (['data']['home_search']['results']).
            'geo' : the ['data']['geo'] section of the first response.

    Raises:
        requests.HTTPError : if the API answers with a 4xx/5xx status.
    '''
    # This can be increased to 200 once we move to the paid version.
    limit = min(42, n_results)

    url = "https://us-real-estate.p.rapidapi.com/v2/for-sale"

    querystring = {
        "state_code":state,
        "city":city,
        "offset":"0",
        "limit":str(limit),
        "sort":"newest"
    }

    headers = {
        "X-RapidAPI-Key": keys['USRealEstate'],
        "X-RapidAPI-Host": "us-real-estate.p.rapidapi.com"
    }

    # Without a timeout a stalled connection would block the notebook indefinitely.
    response = requests.request("GET", url, headers=headers, params=querystring, timeout=30)
    response.raise_for_status()
    payload = response.json()

    home_search = payload['data']['home_search']
    total_houses_available = int(home_search['total'])
    houses_wanted = min(total_houses_available, n_results)

    print(f'Returning {houses_wanted} out of a possible {total_houses_available}.')

    # Are the returned 'geo' values the same, or do they change when there are offsets?
    # For now the 'geo' section of the first page is the one returned.
    geo_to_return = payload['data']['geo']
    houses_to_return = list(home_search['results'] or [])

    houses_remaining = houses_wanted - len(houses_to_return)

    while houses_remaining > 0:
        querystring['offset'] = str(int(querystring['offset']) + limit)
        querystring['limit'] = str(min(limit, houses_remaining))

        response = requests.request("GET", url, headers=headers, params=querystring, timeout=30)
        response.raise_for_status()
        page = response.json()['data']['home_search']['results'] or []

        # An empty page means the API has nothing more to give; without this guard the
        # counter would never decrease and the loop would keep spending API calls forever.
        if not page:
            break

        houses_to_return.extend(page)
        houses_remaining -= len(page)

    return {
        'houses' : houses_to_return,
        'geo' : geo_to_return
    }

We will be digesting two parts of the API response:

    - v['data']['geo']
    
    - v['data']['home_search']['results']

In [15]:
# This will be taking in the following: tt['data']['geo']
class geo_data():
    '''
    This is going to be used to organize the meta information about each query.
    I need to think where it is most appropriate to do this.

    Attributes:
        zip_info, city_info, county_info, neighborhood_info : dicts produced by _parse_areas
            from the matching 'recommended_*' section, or None when that section is missing.
        market_stats : dict produced by _parse_statistics from
            ['geo_statistics']['housing_market'], or None when it is missing.
    '''
    def __init__(self, stats : dict):
        # `or {}` (rather than a .get default) also covers keys that are present but null.
        stats = stats or {}
        self.zip_info = self._parse_areas((stats.get('recommended_zips') or {}).get('geos'))
        self.city_info = self._parse_areas((stats.get('recommended_cities') or {}).get('geos'))
        self.county_info = self._parse_areas((stats.get('recommended_counties') or {}).get('geos'))
        self.neighborhood_info = self._parse_areas((stats.get('recommended_neighborhoods') or {}).get('geos'))
        self.market_stats = self._parse_statistics((stats.get('geo_statistics') or {}).get('housing_market'))

    def __repr__(self) -> str:
        # __repr__ must return a string; returning None makes repr()/notebook display raise TypeError.
        def size(section):
            return None if section is None else len(section)

        return (
            f'geo_data(zips={size(self.zip_info)}, cities={size(self.city_info)}, '
            f'counties={size(self.county_info)}, neighborhoods={size(self.neighborhood_info)}, '
            f'has_market_stats={self.market_stats is not None})'
        )

    def _parse_areas(self, geos : list) -> dict:
        '''
        Index a list of area entries by name.

        The key of each entry is the value of the field named by its 'geo_type' (the
        'slug_id' field when 'geo_type' is absent), or '_parse_areas_FAILED' when that field
        is missing. Returns None when geos is None.
        '''
        if geos is None:
            return None

        parsed = {}
        for v in geos:
            name = v.get(v.get('geo_type', 'slug_id'), '_parse_areas_FAILED')
            housing_market = (v.get('geo_statistics') or {}).get('housing_market') or {}
            parsed[name] = {
                'slug_id' : v.get('slug_id'),
                'median_listing_price' : housing_market.get('median_listing_price'),
                'state_code' : v.get('state_code'),
                'city_code' : v.get('city'),
                'geo_type' : v.get('geo_type')
            }
        return parsed

    def _parse_statistics(self, geo_stats : dict) -> dict:
        '''
        Pick the market statistics of interest out of a 'housing_market' section.

        Returns None when geo_stats is None. 'by_prop_type' maps each property type to a
        copy of its attributes (an empty dict when the attributes are missing or null).
        '''
        if geo_stats is None:
            return None

        return {
            'median_days_on_market' : geo_stats.get('median_days_on_market'),
            'median_sold_price' : geo_stats.get('median_sold_price'),
            'median_price_per_sqft' : geo_stats.get('median_price_per_sqft'),
            'median_listing_price' : geo_stats.get('median_listing_price'),
            'month_to_month_metrics' : geo_stats.get('month_to_month'),
            'by_prop_type' : {
                ht.get('type') : dict(ht.get('attributes') or {})
                for ht in (geo_stats.get('by_prop_type') or [])
            }
        }

In [81]:
# This will be taking in the following: tt['data']['home_search']['results'][n]
class house():
    '''
    This is going to be the class that houses (hehe) all the house data. Each house will have its own instance.
    When we use the API, there is a lot of data returned nested in a number of dictionaries. This will take the 'juicy' bit.
    The idea for this class is that it will hold all the needed info for:
         1) the GUI, address, google street view, other photos. This will probably be a flask application to start, but we are
            far from even thinking about that.
         2) the MODEL, tags, list_prices, other flags. What if we created a word cloud and have the user select key words for
            their house until they have selected some flat number or % contribution to model from the tags TBD. There will be
            dates there, we will use days old (or something similar) for the model training, while the actual tool will use zero,
            as the user is entering 100% correct info. This may or may not be a good idea, as it might have unintended implications
            within the model.

    Interior functions:
        Date Cleaning
        Location Cleaning
        Description Cleaning

    Parameters:
        listing : one listing dict. It must contain the keys 'property_id', 'last_update_date',
            'list_date', 'tags', 'list_price', 'location' and 'description'. 'primary_photo',
            'photos' and 'flags' are optional, and nested sections may be null.
    '''
    def __init__(self, listing : dict):
        primary_photo = listing.get('primary_photo') or {}
        photo_urls = [primary_photo.get('href')] + [p.get('href') for p in (listing.get('photos') or [])]

        self.reference_info = { # This is stuff not going into the model
            'id' : listing['property_id'],
            # dict.fromkeys removes duplicates while keeping the primary photo first
            # (a set would scramble the order); missing urls are dropped.
            'photos' : list(dict.fromkeys(u for u in photo_urls if u))
        }

        self.raw_last_update = listing['last_update_date']
        self.raw_list_date = listing['list_date']
        self.tags = listing['tags']
        self.list_price = listing['list_price']
        self.new_construction = (listing.get('flags') or {}).get('is_new_construction', False) or False

        self.raw_location = listing['location']
        self.raw_description = listing['description']

        self._clean_dates()
        self._clean_location()
        self._clean_description()

    def __repr__(self) -> str:
        # __repr__ must return a string; returning None makes repr()/notebook display raise TypeError.
        return (
            f"house(id={self.reference_info.get('id')!r}, list_price={self.list_price!r}, "
            f"beds={self.beds!r}, sqft={self.sqft!r}, type={self.type!r})"
        )

    def _convert_date(self, date : str) -> datetime:
        '''Parse a 'YYYY-MM-DD' string into a datetime.'''
        return datetime.strptime(date, '%Y-%m-%d')

    def _parse_timestamp(self, raw) -> datetime:
        '''
        Return the date part of a '<date>T<time>' string as a datetime.
        Anything else (None, non-strings, strings without exactly one 'T') gives None.
        '''
        if not isinstance(raw, str):
            return None
        parts = raw.split('T')
        return self._convert_date(parts[0]) if len(parts) == 2 else None

    def _clean_dates(self) -> None:
        '''Set last_update / list_date and their age in whole days (never negative).'''
        # Sample the clock once so both deltas are measured against the same instant.
        now = datetime.now()
        self.last_update = self._parse_timestamp(self.raw_last_update)
        self.list_date = self._parse_timestamp(self.raw_list_date)
        self.last_update_delta = None if self.last_update is None else max((now - self.last_update).days, 0)
        self.list_date_delta = None if self.list_date is None else max((now - self.list_date).days, 0)

    def _clean_location(self) -> None:
        '''Copy the location fields into reference_info and set lat_long as a (lat, lon) tuple.'''
        # `or {}` also covers sections that are present but null.
        location = self.raw_location or {}
        address = location.get('address') or {}
        county = location.get('county') or {}

        self.reference_info.update({
            'zip_code' : address.get('postal_code'),
            'state' : address.get('state'),
            'google_map_street_view' : location.get('street_view_url'),
            'fips_code' : county.get('fips_code'),
            'county' : county.get('county')
        })

        lat_long = address.get('coordinate')
        self.lat_long = (None, None) if lat_long is None else (lat_long.get('lat'), lat_long.get('lon'))

    def _clean_description(self) -> None:
        '''Copy the description fields onto the instance; missing counts default to 0 (stories to 1).'''
        description = self.raw_description or {}
        self.baths_full = description.get('baths_full') or 0
        self.baths_3qtr = description.get('baths_3qtr') or 0
        self.baths_half = description.get('baths_half') or 0
        self.baths_1qtr = description.get('baths_1qtr') or 0
        self.year_built = description.get('year_built')
        self.lot_sqft = description.get('lot_sqft')
        self.sqft = description.get('sqft')
        self.garage = description.get('garage') or 0
        self.stories = description.get('stories') or 1
        self.beds = description.get('beds')
        self.type = description.get('type')

    def _validate(self):
        '''
        This will be used to flag anything that looks strange (missing values, etc).
        Not implemented yet.
        '''

In [None]:
def format_as_houses(data : dict) -> dict:
    '''
    Wrap a raw query result in the notebook's classes.

    I am going to try and make the outputs of all the API's to be the same.
    In my head they should have similar structure, maybe sold houses are differently structured, we will have to see.

    TODO: this should also transform the houses to a pandas dataframe.

    As I do this, I am reminded that we should be thinking again and planning on using pipes.
    If we are going to use pipes, we need to think about HOW we are going to implement this from the top down.
    Where does the pipeline start? After querying the data? I think that's a fair place to start, but it forces our hands to ensure
    the outputs of the functions to be the same.

    Parameters:
        data : dict with a 'houses' list of raw listings and a 'geo' section.

    Returns:
        A dict with:
            'houses' : list of house instances, one per listing.
            'geo' : a geo_data instance built from data['geo'].
    '''
    # The result used to be built and then discarded (the function ended in `pass`), so
    # callers always received None despite the declared dict return type.
    return {
        'houses' : [house(h) for h in data['houses']],
        'geo' : geo_data(data['geo'])
    }

In [102]:
class ToDataFrame(BaseEstimator, TransformerMixin):
    '''
    Pipeline step that turns house objects into a DataFrame with one row per house.
    '''
    def fit(self, X, y=None):
        '''Nothing to learn; returns self so the step can sit in a pipeline.'''
        return self

    def transform(self, X) -> pd.DataFrame:
        '''
        Build the model-input frame.

        Parameters:
            X : a single house, or a list/tuple of houses.

        Returns:
            A DataFrame with one row per house. 'tags' (a list) and 'lat_long' (a tuple) are
            each kept whole inside a single cell.
        '''
        houses = list(X) if isinstance(X, (list, tuple)) else [X]

        # (column name, house attribute) pairs, in output column order.
        # 'lat_long' was previously misspelled 'lot_long'.
        fields = [
            ('Days_listed', 'list_date_delta'),
            ('Days_updated', 'last_update_delta'),
            ('baths_full', 'baths_full'),
            ('baths_3qtr', 'baths_3qtr'),
            ('baths_half', 'baths_half'),
            ('baths_1qtr', 'baths_1qtr'),
            ('year_built', 'year_built'),
            ('lot_sqft', 'lot_sqft'),
            ('sqft', 'sqft'),
            ('garage', 'garage'),
            ('stories', 'stories'),
            ('beds', 'beds'),
            ('type', 'type'),
            ('tags', 'tags'),
            ('lat_long', 'lat_long'),
            ('new_construction', 'new_construction'),
        ]

        # Building from row records keeps list/tuple values in one cell. Passing them as
        # column values of a dict makes pandas treat them as whole columns, which either
        # raises on mismatched lengths or spreads one house over several rows.
        records = [{column : getattr(h, attr) for column, attr in fields} for h in houses]
        return pd.DataFrame(records, columns=[column for column, _ in fields])

In [91]:
# Build a house from the first listing of the area-search result held in `tt`.
# NOTE(review): `tt` is only assigned in cells further down, so this cell fails on a fresh top-to-bottom run.
test = house(tt['houses'][0])

In [109]:
# Scratch check: days since the listing's last update, floored at zero (same expression as in house._clean_dates).
max((datetime.now() - test.last_update).days, 0)

0

In [None]:
# Fetch up to 100 of the newest Seattle, WA listings; the result is a dict with 'houses' and 'geo' entries.
tt = get_PropertyForSaleByArea(city='seattle', state='WA', n_results=100)

In [None]:
# NOTE(review): this import sits mid-notebook instead of in the import cell at the top.
from geopy.geocoders import Nominatim
# Geocoder client used by the geocode experiment in the next cell.
geolocator = Nominatim(user_agent="geoapiExercises")

In [1]:
# NOTE(review): this rebinds `tt`, which another cell uses for the property-search result dict.
# The NameError recorded below shows the cell was run before `geolocator` was defined.
tt = geolocator.geocode('Chicago')

NameError: name 'geolocator' is not defined

In [97]:
# Toy input: two scalar entries around one nested dict, used to see how pd.DataFrame expands it.
a = {
    'bb': 5,
    'cc': {'dd': 4, 'ee': 7},
    'xxx': 1,
}

In [98]:
# The nested dict's keys become the row index, and the scalar entries are repeated on every row (see the output below).
pd.DataFrame(a)

Unnamed: 0,bb,cc,xxx
dd,5,4,1
ee,5,7,1
