In [8]:
import logging
import pandas as pd
from sodapy import Socrata
from datetime import datetime
import pymongo
import expiringdict
import utils

# download data
# Open a Socrata client against data.lacity.org; `None` is passed as the second
# argument (presumably the app token, i.e. an unauthenticated client — TODO confirm).
client0 = Socrata("data.lacity.org", None)
# Fetch rows of dataset `yru6-6re4` whose `arst_date` is after 2019-11-30.
results = client0.get("yru6-6re4",where="arst_date > '2019-11-30T00:00:00.000'") # TODO: cutoff date still to be confirmed
df = pd.DataFrame.from_records(results)
# Convert `arst_date` to datetime values and `age` to numeric values.
df['arst_date'] = pd.to_datetime(df['arst_date'])
#df['temp'] = df['time'].apply(lambda x: datetime.strptime(x, '%H%M').time())
df['age'] = pd.to_numeric(df['age'])

# connect with mongodb
# MongoClient() is called without arguments, so the driver's default connection
# settings are used.
client = pymongo.MongoClient()
# NOTE(review): the logger is instantiated directly instead of being obtained via
# logging.getLogger(), so it is not registered in the logging hierarchy; its
# handlers/level are whatever utils.setup_logger configures — confirm this is intended.
logger = logging.Logger(__name__)
utils.setup_logger(logger, 'db.log')
RESULT_CACHE_EXPIRATION = 15  # seconds; used as `max_age_seconds` of the DataFrame result cache

def upsert_crime(df):
    """
    Upsert every row of `df` into collection `crime` of MongoDB database `crime`.

    Each row is converted to a dict and written with `replace_one(..., upsert=True)`,
    using the row's `rpt_id` value as the lookup key: a document with the same
    `rpt_id` is replaced by the row, otherwise the row is inserted as a new document.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to store. Every row must provide an `rpt_id` value; a missing
        `rpt_id` column raises `KeyError` on the first row.

    Returns
    -------
    None
        The number of rows, replaced documents and inserted documents is written
        to the module logger at INFO level.
    """
    collection = client.get_database("crime").get_collection("crime")
    update_count = 0
    for record in df.to_dict('records'):
        result = collection.replace_one(
            filter={'rpt_id': record['rpt_id']},  # locate the document if it exists
            replacement=record,                   # latest version of the document
            upsert=True)
        # matched_count > 0 means an existing document was replaced; otherwise
        # the upsert created a new one.
        if result.matched_count > 0:
            update_count += 1
    row_count = len(df)
    # Lazy %-style arguments: the message is only rendered if the record is emitted.
    logger.info("rows=%d, update=%d, insert=%d",
                row_count, update_count, row_count - update_count)



In [19]:
def fetch_all_crime():
    """Read every document from collection `crime` of database `crime`.

    Returns the documents as a list and logs how many were read.
    """
    crime_collection = client.get_database("crime").get_collection("crime")
    documents = [doc for doc in crime_collection.find()]
    logger.info('{} documents read from the db'.format(len(documents)))
    return documents


# Single-slot cache for the DataFrame built by `fetch_all_crime_as_df`; the
# stored entry is kept for at most RESULT_CACHE_EXPIRATION seconds.
_fetch_all_crime_as_df_cache = expiringdict.ExpiringDict(
    max_len=1,
    max_age_seconds=RESULT_CACHE_EXPIRATION,
)


In [22]:
def fetch_all_crime_as_df(allow_cached=False):
    """Return the documents from `fetch_all_crime` as a DataFrame without the `_id` column.

    The conversion is done by the nested `_work` helper. When `allow_cached` is
    true, the value stored under key ``'cache'`` in `_fetch_all_crime_as_df_cache`
    is returned if the lookup succeeds. If the lookup raises `KeyError` (no
    entry, e.g. because it expired), or if `allow_cached` is false, `_work` is
    called and its result replaces the cached value.

    Parameters
    ----------
    allow_cached : bool, default False
        Whether a previously cached result may be returned instead of reading
        the database again.

    Returns
    -------
    pandas.DataFrame or None
        One row per document, `_id` column dropped; `None` when
        `fetch_all_crime` returns no documents. A cached `None` is returned
        as-is when `allow_cached` is true.
    """
    def _work():
        data = fetch_all_crime()
        if len(data) == 0:
            return None
        # Non-mutating drop: build the frame and strip MongoDB's `_id` in one step.
        return pd.DataFrame.from_records(data).drop(columns='_id')

    if allow_cached:
        try:
            return _fetch_all_crime_as_df_cache['cache']
        except KeyError:
            # Nothing usable in the cache; fall through and rebuild it.
            pass
    ret = _work()
    _fetch_all_crime_as_df_cache['cache'] = ret
    return ret


if __name__ == "__main__":
    # Smoke check: read the collection straight from the database (no cache)
    # and print the resulting DataFrame.
    print(fetch_all_crime_as_df(allow_cached=False))

2019-12-13 03:36:17,539 [fetch_all_crime]: 990 documents read from the db
        rpt_id  arst_date  time area    area_desc    rd  age sex_cd  \
0    190819078 2019-12-01  1300   08      West LA  0817   30      M   
1      5812713 2019-12-02  0930   15  N Hollywood  1523   54      M   
2    191120321 2019-12-03  1450   11    Northeast  1105   59      M   
3      5814534 2019-12-04  1140   03    Southwest  0311   21      M   
4      5792472 2019-12-05  1700   12  77th Street  1258   46      M   
..         ...        ...   ...  ...          ...   ...  ...    ...   
985    5816831 2019-12-07  0257   14      Pacific  1401   23      M   
986    5816592 2019-12-07  0230   01      Central  0191   24      M   
987    5816959 2019-12-07  1250   01      Central  0127   23      F   
988    5816911 2019-12-07  0830   04   Hollenbeck  0411   45      F   
989    5816871 2019-12-07  0650   01      Central  0119   31      M   

    descent_cd chrg_grp_cd          grp_description arst_typ_cd       cha