In [128]:
import quandl
import pgeocode
import re
import numpy as np
import pandas as pd
import time
import threading

# logging warnings and errors
import logging
import sys
from io import StringIO

import numpy as np

In [49]:
logging.basicConfig(filename='log_file.log',
                    level=logging.DEBUG,
                    format='%(asctime)s %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p',
                    filemode='a')
logger = logging.getLogger()

In [50]:
states = ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA", 
          "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
          "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
          "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
          "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]

In [116]:
def check_state(search_str):
    search_str_list = [x.strip() for x in search_str.split(';')]
    for x in search_str_list:
        if x in states:
            return x
        
def check_county(search_str):
    search_str_list = [x.strip() for x in search_str.split(';')]
    for x in search_str_list:
        if 'county' in x.lower():
            return x       
        
def check_city(search_str):
    search_str_list = [x.strip() for x in search_str.split(';')]
    if len(search_str_list) == 1:
        return np.nan
    if 'county' not in search_str_list[-1].lower():
        return search_str_list[-1]
    
def check_metro(search_str):
    search_str_list = [x.strip() for x in search_str.split(';')]
    if len(search_str_list) <= 3:
        return np.nan
    if 'county' not in search_str_list[2].lower():
        return search_str_list[2]    
    
def quandl_data(indicator_id, region_id=None, api_key=None):
    try:
        quandl.ApiConfig.api_key = api_key
        return quandl.get_table('ZILLOW/DATA', indicator_id=indicator_id, region_id=region_id, paginate=True)
    except Exception as e:
        logger.error(f"Failed to retrieve data using API key '{key}': {e}")         

In [227]:
def track_sleep_sec(calls_made, calls_last_10s, calls_last_10m,last_10s_time,last_10m_time):
    if calls_made >= max_calls_per_day:
        logger.warning('API call limit reached for the day')
        return

    if calls_last_10s >= max_calls_per_10_seconds:
        elapsed_time = time.time() - last_10s_time
        if elapsed_time < 10:
            logger.warning('Number of API calls per 10 Seconds limit reached')
            return(10 - elapsed_time)
        calls_last_10s = 0

    if calls_last_10m >= max_calls_per_10_minutes:
        elapsed_time = time.time() - last_10m_time
        if elapsed_time < 600:
            logger.warning('Number of API calls per 10 Minutes limit reached')
            return(600 - elapsed_time)
        calls_last_10m = 0
    return 0

SyntaxError: duplicate argument 'calls_last_10m' in function definition (1096591641.py, line 1)

In [224]:
def get_data(tracked_indicators, api_key):
    calls_made = 0
    calls_last_10s = 0
    calls_last_10m = 0
    lock = threading.Semaphore(1)
    
    loops_made = 0
    max_no_loop = (len(tracked_indicators)+1)*(len(region_ids)+1)
    

    
    # create empty dataframe 
    df_columns = quandl_data(tracked_indicators.iloc[0,0],tracked_indicators.iloc[0,1],api_key).columns
    df_data = pd.DataFrame(columns=df_columns,index=None)
    
    for index, row in tracked_indicators.iterrows():
        ind = row[tracked_indicators.columns[0]]
        region = row[tracked_indicators.columns[1]]
        
        lock.acquire()

        try:
            sleep_sec = track_sleep_sec(calls_made, calls_last_10s, calls_last_10m,last_10s_time,last_10m_time)
            time.sleep(sleep_sec)
            print(f'Indicator: {ind} - Region: {region} - Ingesting.... calls made: {calls_made} sleep_sec: {sleep_sec}')

            data = quandl_data(ind, region,api_key)
            print(data.empty)
            if data.empty is False:
                data.reset_index(inplace=True)
                data = data[['indicator_id', 'region_id','date','value']]
                df_data = pd.concat([df_data, data], axis=0, ignore_index=True)
                logger.info(f'Indicator: {ind} - Region: {region} - Successfully Ingested')

            calls_made += 1
            calls_last_10s += 1
            calls_last_10m += 1
            loops_made += 1

            last_10s_time = time.time()
            last_10m_time = time.time()

            if loops_made == max_no_loop:
                return df_data

        except Exception as e:
            logger.error(f'Region: {region} - Indicator: {ind} - {e}')

        finally:
            lock.release()


In [209]:
indicators_file = 'zillow_indicators.csv'
inicators_track = 'zillow_indicators_ingest.csv'
api_keys = ['xuisyPUDscg1rq-HiMz7']

# api_keys = ['xuisyPUDscg1rq-HiMz7','prdogxsj8me2Mma8Xu5h']


In [210]:
max_calls_per_day = 50000
max_calls_per_10_seconds = 300
max_calls_per_10_minutes = 2000
concurrency_limit = 1

In [140]:
# get indicators
df_ind = quandl.get_table("ZILLOW/INDICATORS",paginate=True)

# get regions by zip
df_regions = quandl.get_table("ZILLOW/REGIONS",paginate=True)
df_regions_zip = df_regions[df_regions['region_type']=='zip']

df_regions_zip = df_regions_zip.copy()
df_regions_zip['region_str_len'] = df_regions_zip.apply(lambda x: len(x['region'].split(';')),axis=1)
df_regions_zip['zip'] = df_regions_zip.apply(lambda x: re.search('(\d{5})',x['region']).group(),axis=1)
df_regions_zip['state'] = df_regions_zip.apply(lambda x: check_state(x['region']),axis=1)
df_regions_zip['county'] = df_regions_zip.apply(lambda x: check_county(x['region']),axis=1)
df_regions_zip['city'] = df_regions_zip.apply(lambda x: check_city(x['region']),axis=1)
df_regions_zip['metro'] = df_regions_zip.apply(lambda x: check_metro(x['region']),axis=1)

In [141]:
region_ids = df_regions_zip['region_id']
indicator_ids = df_ind['indicator_id']

In [142]:
# save indicator
df_ind.to_csv(indicators_file)

In [143]:
# select tracked indicator 
indicators = pd.read_csv(inicators_track)
tracked_ind = indicators[indicators['ingest']=='Y']['indicator_id']

In [87]:
# get the number of splits 
n_splits = len(api_keys)

# split the dataframe into n smaller dataframes
region_split = np.array_split(region_ids, n_splits)

# print the resulting dataframes
for i, split_df in enumerate(region_split):
    print(f"Split {i+1}:")
    print(region_split)
    
    # repeat the values in column 'tracked_ind' to match the length of region_ids
    tracked_ind_repeated = tracked_ind.loc[df_1.index.repeat(len(df_2))]
    tracked_ind_df = tracked_ind_repeated.reset_index(drop=True)
    

8.84604

In [134]:
# get the number of splits 
n_splits = len(api_keys)

# split the dataframe into n smaller dataframes
region_split = np.array_split(region_ids, n_splits)


  sub_arys.append(_nx.swapaxes(sary[st:end], axis, 0))


None
24894     78996
24895     78995
24896     78993
24897     78991
24898     78990
          ...  
89296    100004
89297    100003
89298    100002
89299    100001
89300    100000
Name: region_id, Length: 15796, dtype: object

In [211]:
# repeat the values in column 'region_id' to match the length of tracked_ind
region_ids_repeated = region_ids.loc[df_2.index.repeat(len(df_1))]
region_ids_df = region_ids_repeated.reset_index(drop=True)

# concatenate the two dataframes
tracked_indicators = pd.concat([tracked_ind_df, region_ids_df], axis=1)

In [225]:
get_data(tracked_indicators,api_keys[0])

Indicator: ZSFH - Region: 99999 - Ingesting.... calls made: 0 sleep_sec: 0
False
  indicator_id region_id       date          value
0         ZSFH     99999 2023-01-31  442972.823112
1         ZSFH     99999 2022-12-31  447792.833388
2         ZSFH     99999 2022-11-30  452608.490554
3         ZSFH     99999 2022-10-31  552233.000000
4         ZSFH     99999 2022-09-30  569345.000000
Indicator: ZSFH - Region: 99999 - Successfully Ingested
Indicator: ZSFH - Region: 99999 - Ingesting.... calls made: 1 sleep_sec: 0
False
  indicator_id region_id       date          value
0         ZSFH     99999 2023-01-31  442972.823112
1         ZSFH     99999 2022-12-31  447792.833388
2         ZSFH     99999 2022-11-30  452608.490554
3         ZSFH     99999 2022-10-31  552233.000000
4         ZSFH     99999 2022-09-30  569345.000000
Indicator: ZSFH - Region: 99999 - Successfully Ingested
Indicator: ZSFH - Region: 99999 - Ingesting.... calls made: 2 sleep_sec: 0
False
  indicator_id region_id       da

False
  indicator_id region_id       date          value
0         ZSFH     99999 2023-01-31  442972.823112
1         ZSFH     99999 2022-12-31  447792.833388
2         ZSFH     99999 2022-11-30  452608.490554
3         ZSFH     99999 2022-10-31  552233.000000
4         ZSFH     99999 2022-09-30  569345.000000
Indicator: ZSFH - Region: 99998 - Successfully Ingested
Indicator: ZSFH - Region: 99998 - Ingesting.... calls made: 20 sleep_sec: 0
False
  indicator_id region_id       date          value
0         ZSFH     99999 2023-01-31  442972.823112
1         ZSFH     99999 2022-12-31  447792.833388
2         ZSFH     99999 2022-11-30  452608.490554
3         ZSFH     99999 2022-10-31  552233.000000
4         ZSFH     99999 2022-09-30  569345.000000
Indicator: ZSFH - Region: 99998 - Successfully Ingested
Indicator: ZSFH - Region: 99998 - Ingesting.... calls made: 21 sleep_sec: 0
False
  indicator_id region_id       date          value
0         ZSFH     99999 2023-01-31  442972.823112
1   

KeyboardInterrupt: 