## 高德

In [None]:
import os
import json
import requests
import logging
# Named logger handle for this notebook, with its threshold set to INFO.
logger = logging.getLogger("root")
logger.setLevel(logging.INFO)

# Locations of the JSON cache files; all of them live under GLOBAL_DIR.
GLOBAL_DIR = "global"
dict_title2coords_fp = os.path.join(GLOBAL_DIR, "TITLE2COORDS_DICT.json")
dict_coords2name_fp = os.path.join(GLOBAL_DIR, "COORDS2NAME_DICT.json")
dict_coordspair2duration_fp = os.path.join(GLOBAL_DIR, "COORDSPAIR2DURATION_DICT.json")
print(f"global dict of title to coords path: {dict_title2coords_fp}")
print(f"global dict of coords to name path : {dict_coords2name_fp}")
print(f"global dict of coords pair to distance path: {dict_coordspair2duration_fp}")

# API key is taken from the environment; a missing GAODE_KEY raises KeyError here.
KEY = os.environ["GAODE_KEY"]
# Restricts API searches to this city; without it the whole country is searched.
CITY = "北京"
# Destination address used by the commute calculations in the cells below.
TARGET_ADDRESS = "源创空间大厦"

# In-memory caches of API answers. Each name gets its OWN dict object: a chained
# `a = b = c = {}` would bind all three names to a single shared dict, so the
# keys of every cache would end up mixed together (and dumped into every JSON
# file) whenever load_dict() does not replace them from disk.
TITLE2COORDS_DICT = {}  # address text -> location string from the geocode API (or None)
COORDS2NAME_DICT = {}  # location string -> formatted address from the regeo API
COORDSPAIR2DURATION_DICT = {}  # "<from>-<to>" location pair -> duration in minutes

def load_dict():
    """Replace the in-memory caches with the contents of their JSON files.

    A cache whose file does not exist is left untouched. The files are read as
    UTF-8, matching what dump_dict() writes. After loading, the three caches
    are printed.

    NOTE(review): cache files written earlier by a version that used a
    non-UTF-8 platform default encoding would need to be re-saved as UTF-8.
    """
    global TITLE2COORDS_DICT, COORDS2NAME_DICT, COORDSPAIR2DURATION_DICT
    if os.path.exists(dict_title2coords_fp):
        with open(dict_title2coords_fp, encoding="utf-8") as f:
            TITLE2COORDS_DICT = json.load(f)
    if os.path.exists(dict_coords2name_fp):
        with open(dict_coords2name_fp, encoding="utf-8") as f:
            COORDS2NAME_DICT = json.load(f)
    if os.path.exists(dict_coordspair2duration_fp):
        with open(dict_coordspair2duration_fp, encoding="utf-8") as f:
            COORDSPAIR2DURATION_DICT = json.load(f)
    print({"TITLE2COORDS_DICT": TITLE2COORDS_DICT, "COORDS2NAME_DICT": COORDS2NAME_DICT, "COORDSPAIR2DURATION_DICT": COORDSPAIR2DURATION_DICT})


def dump_dict():
    """Write the three in-memory caches to their JSON files.

    Files are written as UTF-8 with non-ASCII characters kept readable
    (``ensure_ascii=False``). The parent directory of each file is created
    when missing, so a first run does not fail after the API work is done.
    """
    targets = (
        (dict_title2coords_fp, TITLE2COORDS_DICT),
        (dict_coords2name_fp, COORDS2NAME_DICT),
        (dict_coordspair2duration_fp, COORDSPAIR2DURATION_DICT),
    )
    for path, cache in targets:
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        # The context manager guarantees the file is flushed and closed.
        with open(path, "w", encoding="utf-8") as f:
            json.dump(cache, f, ensure_ascii=False, indent=2)


# Geocoding: address text -> coordinates
def get_addr_loc(address: str):
    """Look up the location of ``address`` through the Gaode geocode API.

    Answers are memoised in ``TITLE2COORDS_DICT``; an address that is already
    cached (including a cached ``None``) does not trigger a request.

    Args:
        address: Free-text address to geocode; the search is limited to ``CITY``.

    Returns:
        The ``location`` string of the first geocode match, or ``None`` when
        the API reports zero matches or rejects the request.

    Raises:
        requests.HTTPError: If the HTTP response carries an error status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    if address in TITLE2COORDS_DICT:
        return TITLE2COORDS_DICT[address]

    logging.debug("fetching loc of %s from gaode api", address)
    res = requests.get(
        'https://restapi.amap.com/v3/geocode/geo',
        params={
            "key": KEY,
            "city": CITY,
            "address": address,
        },
        timeout=10,  # without a timeout a stalled connection blocks forever
    )
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    res.raise_for_status()
    result = res.json()

    # The API marks a rejected request with status "0" (see the Amap docs).
    # Such a failure says nothing about the address, so it is NOT cached and a
    # later call can retry.
    if str(result.get("status", "1")) == "0":
        logging.error("gaode geocode request failed")
        logging.error(result)
        return None

    count = int(result.get('count', 0))
    if count == 0:
        logging.error("should have result")
        logging.error(result)
        # Remember the miss so the same address is not requested again.
        TITLE2COORDS_DICT[address] = None
    else:
        if count > 1:
            logging.warning("more than 1 result, select the first")
        TITLE2COORDS_DICT[address] = result["geocodes"][0]["location"]

    return TITLE2COORDS_DICT[address]

def get_pos_name_from_loc(loc: str):
    """Resolve a location string to a formatted address via the Gaode regeo API.

    Answers are memoised in ``COORDS2NAME_DICT``.

    Args:
        loc: Location string as returned by ``get_addr_loc``; a falsy value
            (``None`` or ``""``) short-circuits to ``None``.

    Returns:
        The ``formatted_address`` from the API response, or ``None`` when
        ``loc`` is falsy or the response carries no formatted address.

    Raises:
        requests.HTTPError: If the HTTP response carries an error status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    if not loc:
        return None
    if loc in COORDS2NAME_DICT:
        return COORDS2NAME_DICT[loc]

    logging.debug("fetching address name of %s from gaode api", loc)
    res = requests.get(
        'https://restapi.amap.com/v3/geocode/regeo',
        params={
            "key": KEY,
            "location": loc,
        },
        timeout=10,  # without a timeout a stalled connection blocks forever
    )
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    res.raise_for_status()
    result = res.json()

    regeocode = result.get("regeocode") or {}
    # A rejected request (status "0") or a payload without the expected field
    # is logged and not cached, instead of failing with a KeyError.
    if str(result.get("status", "1")) == "0" or "formatted_address" not in regeocode:
        logging.error("gaode regeo request returned no formatted address")
        logging.error(result)
        return None

    COORDS2NAME_DICT[loc] = regeocode["formatted_address"]
    return COORDS2NAME_DICT[loc]

# Walking
def calc_walking_dis(from_loc, to_loc):
    """Return the walking duration between two locations, in whole minutes.

    Args:
        from_loc: Origin location string passed to the API as ``origin``.
        to_loc: Destination location string passed as ``destination``.

    Returns:
        The ``duration`` of the first returned path divided by 60 and truncated
        to an int (presumably the API reports seconds), or ``-1`` when the
        response reports zero routes. Callers must treat ``-1`` as "unknown",
        not as a real duration.

    Raises:
        requests.HTTPError: If the HTTP response carries an error status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    res = requests.get(
        'https://restapi.amap.com/v3/direction/walking',
        params={
            "key": KEY,
            "origin": from_loc,
            "destination": to_loc,
            "output": "json",
        },
        timeout=10,  # without a timeout a stalled connection blocks forever
    )
    # Surface HTTP errors directly rather than as a JSON decoding failure.
    res.raise_for_status()
    result = res.json()
    count = int(result.get("count", 0))
    if count == 0:
        logging.warning("not found any walking solution")
        logging.warning(result)
        return -1
    return int(float(result["route"]["paths"][0]["duration"]) / 60)  # minutes

# Commute calculation (between coordinates)
def calc_transit_dis_between_coords(from_coords, to_coords):
    """Return the public-transit duration between two locations, in minutes.

    Results are memoised in ``COORDSPAIR2DURATION_DICT`` under the key
    ``"<from_coords>-<to_coords>"``. When the transit API reports zero
    routes, the walking duration from ``calc_walking_dis`` is used (and
    cached) instead, which may be ``-1`` for "no route found".

    Args:
        from_coords: Origin location string; a falsy value yields ``None``.
        to_coords: Destination location string; must be truthy.

    Returns:
        Duration in whole minutes, ``-1`` when neither a transit nor a walking
        route was found, or ``None`` when ``from_coords`` is falsy.

    Raises:
        ValueError: If ``to_coords`` is falsy.
        requests.HTTPError: If the HTTP response carries an error status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    # The destination is checked first: it is a caller error even when the
    # origin is missing too.
    if not to_coords:
        raise ValueError("to loc must exist")
    if not from_coords:
        return None

    key = from_coords + "-" + to_coords
    if key in COORDSPAIR2DURATION_DICT:
        return COORDSPAIR2DURATION_DICT[key]

    res = requests.get(
        'https://restapi.amap.com/v3/direction/transit/integrated',
        params={
            "origin": from_coords,
            "destination": to_coords,
            "output": "json",
            "key": KEY,
            # 0: fastest, 1: cheapest, 2: fewest transfers,
            # 3: least walking, 5: no subway
            "strategy": 3,
            "extensions": "base",
            "city": CITY,
        },
        timeout=10,  # without a timeout a stalled connection blocks forever
    )
    # Surface HTTP errors directly rather than as a JSON decoding failure.
    res.raise_for_status()
    result = res.json()
    count = int(result.get("count", 0))
    if count == 0:
        logging.info("not found any transit solution, trying walking ones")
        logging.info(result)
        COORDSPAIR2DURATION_DICT[key] = calc_walking_dis(from_coords, to_coords)
    else:
        COORDSPAIR2DURATION_DICT[key] = int(float(result["route"]["transits"][0]["duration"]) / 60)  # minutes
    return COORDSPAIR2DURATION_DICT[key]
        

# Commute calculation (between address texts)
def calc_transit_dis_from_poses(from_pos, to_pos):
    """Geocode both addresses, then return the transit duration between them.

    Each address is resolved with ``get_addr_loc`` (origin first, destination
    second) and the pair is handed to ``calc_transit_dis_between_coords``,
    whose result is returned unchanged.
    """
    return calc_transit_dis_between_coords(
        get_addr_loc(from_pos),
        get_addr_loc(to_pos),
    )

# Fill the in-memory caches from the JSON cache files that exist on disk,
# so lookups already stored there are not requested from the API again.
load_dict()

## Analyze rentals from Douban

In [None]:
import os
import pandas as pd

# Input: one CSV export located under DATA_FROM_DOUBAN_DIR.
DATA_FROM_DOUBAN_DIR = "data_from_douban"
filename = '2022-03-02-zhufang.csv'
filepath = os.path.join(DATA_FROM_DOUBAN_DIR, filename)
print(f"reading file from: {filepath}")
df = pd.read_csv(filepath)

# Parse the latest-response column into datetimes so it can be compared.
df["response_latest_time"] = pd.to_datetime(df["response_latest_time"])

# Keep only rows whose latest response is after 2022-02-25.
df = df.query("'2022-02-25' < response_latest_time")

# filter personal
def filter_personal(s):
    """Tell whether a post title passes the personal-listing keyword filter.

    A title is kept when it does not contain "女生" and contains at least one
    of "个人", "直租" or "转租". Non-string values (for example the NaN that
    pandas uses for a missing title) are rejected instead of raising TypeError.
    """
    if not isinstance(s, str):
        return False
    if "女生" in s:
        return False
    return any(keyword in s for keyword in ("个人", "直租", "转租"))
# .copy() gives an independent frame, so the column assignments below do not
# write into a slice of the previously filtered DataFrame.
df = df[df.post_title.apply(filter_personal)].copy()
print("shape: ", df.shape)

# get loc from title
print("getting loc from title")
df['pos_loc'] = df.post_title.apply(get_addr_loc)
dump_dict()

# get pos from loc
print("getting pos name from loc")
df["pos_name"] = df.pos_loc.apply(get_pos_name_from_loc)
dump_dict()

# get dis from loc
print("getting distance from loc")
# The destination is the same for every row: resolve it once, not per row.
target_loc = get_addr_loc(TARGET_ADDRESS)
df["transit_minutes"] = df.pos_loc.apply(
    lambda x: calc_transit_dis_between_coords(x, target_loc))
dump_dict()

# filter distance
print("filter distance")
# The lower bound drops the -1 "no route found" sentinel, which would
# otherwise pass `< 60` and sort to the top as the shortest commute.
df = df.query("0 <= transit_minutes < 60")

# sort
print('sort')
df = df.sort_values(by=["transit_minutes"], ascending=True)

# dump
print("dump")
df.to_csv(filename.replace(".csv", "_filter.csv"), encoding="utf-8")
# Bare expression: displays the frame when this runs as a notebook cell.
df

## Analyze rentals from a WeChat group

In [None]:
import os
import re
import pandas as pd

DATA_FROM_WECHAT_DIR = "data_from_wechat"

# One entry per spreadsheet: its file name plus the header text of the
# contact, budget and area columns in that particular sheet.
files = [
    {
        "filename": "./北漂租房登记（表一）.xlsx",
        "column_contact": "联系方式",
        "column_budget": "价位",
        "column_area": "       区域",
    },
    {
        "filename": "./北漂租房登记（表二）.xlsx",
        "column_contact": "联系方式",
        "column_budget": "预算",
        "column_area": "居室.1",
    },
]

# Spreadsheet processed by the rest of this cell.
file = files[0]

### read excel
filepath = os.path.join(DATA_FROM_WECHAT_DIR, file["filename"])
print(f"reading file from: {filepath}")
df_raw = pd.read_excel(filepath)

### drop specific nan
# Rows missing any of the three required fields are discarded.
df = df_raw.dropna(
    subset=[file[field] for field in ("column_contact", "column_budget", "column_area")]
)
# Keep only the columns whose header does not start with "Unnamed".
columns = [name for name in df.columns if not name.startswith("Unnamed")]
df = df[columns]

### filter price

def find_price(s: str) -> int:
    """Return the first run of digits in ``str(s)`` as an int, or 0 if none.

    The argument is converted with ``str`` first, so non-string cell values
    are accepted as well.
    """
    first_number = re.search(r'\d+', str(s))
    if first_number is None:
        return 0
    return int(first_number.group())

# Independent copy, so adding a column does not write into a slice of the
# frame read from the spreadsheet.
df = df.copy()
df['price_base'] = df[file["column_budget"]].apply(find_price)
df = df.query("2000 <= price_base <= 3000")

### call gaode api and update distances

df2 = df.copy()
df2["work_minutes"] = df2[file["column_area"]].apply(
    lambda x: calc_transit_dis_from_poses(x, TARGET_ADDRESS))
dump_dict() # update global dict

### drop distance too far

# The lower bound drops the -1 "no route found" sentinel, which would
# otherwise pass `< 60` and produce a negative (best-ranked) score below.
df2 = df2.query("0 <= work_minutes < 60").copy()

### rank

# Lower is better: cheaper and closer rows get a smaller product.
df2['score'] = df2['price_base'] * df2['work_minutes']
# Re-bind instead of sorting in place through an alias; df2 and df3 still
# refer to the same sorted frame afterwards.
df2 = df2.sort_values(by=["score"], ascending=True)
df3 = df2

### render
# Bare expression: renders the styled table when run as a notebook cell.
df3.style.background_gradient(cmap="Reds")

