In [101]:
import requests
import json
import math
import datetime
import time
from pprint import pprint

from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


def getAdsData(params: str, timeout: float = 30.0):
    """POST a search query to the DoneDeal search API and return the decoded JSON.

    Args:
        params: JSON-encoded request body describing the search.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    # Search URL
    url = "https://www.donedeal.ie/search/api/v4/find/"

    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }

    # NOTE(review): verify=False disables TLS certificate verification for this
    # request - confirm this is still required.
    response = requests.post(
        url,
        data=params,
        headers=headers,
        verify=False,
        allow_redirects=False,
        timeout=timeout,
    )
    # Surface HTTP errors explicitly instead of failing later while decoding
    # an error page as JSON.
    response.raise_for_status()
    return response.json()


# Display available ad keys
# for key in ads_data[0].keys():
#     print(f"'{key}':{ads_data[0][key]}")

# def write_available_optionsfile(fout: str= "optional_keys.txt") -> None:
#     keys = {key:"False" for key in ads_data[0].keys()}
#     with open(fout, "w") as f:
#         f.write(json.dumps(keys))
# write_available_optionsfile("optional_keys2.txt")

# def load_optionsfile(fname: str= "optional_keys.txt") -> dict:
#     with open(fname) as f_in:
#         options = json.load(f_in)
#     options = {key:eval(value) for key,value in options.items()} #eval str s to True or False
#     return options

# options = load_optionsfile("optional_keys.txt")


# Ad fields and whether each one is retained (True) or discarded (False) when
# an ad is cleaned.
options = dict(
    id=True,
    userId=False,
    dealerId=False,
    state=True,
    age=True,
    publishDate=True,
    header=True,
    currency=True,
    price=True,
    county=True,
    section=False,
    emailResponse=False,
    phoneResponse=False,
    wanted=False,
    forSale=False,
    c2b=False,
    mediaCount=True,
    friendlyUrl=True,
    displayAttributes=True,
    photos=False,
    oldPriceView=False,
    keyInfo=False,
    imageAlt=False,
    financeSummary=False,
    deliveryAvailable=False,
    herdwatchVerified=False,
    bumpable=False,
    spotlightable=False,
    spotlight=False,
    userSaved=False,
    adAnalytics=False,
    priceOnRequest=False,
    dealer=True,
    meritsV2=False,
    greenlightVerified=False,
    greenlightBadgeUrl=False,
)

# Names of the enabled fields, in declaration order.
selected_keys = [name for name in options if options[name]]


def simplify_display_attributes(att: list) -> dict:
    """Flatten a list of display-attribute records into one dict.

    Each record contributes its "displayName" as the key and its "value" as
    the value; a later record with the same name overwrites an earlier one.
    """
    flattened = {}
    for record in att:
        label = record["displayName"]
        flattened[label] = record["value"]
    return flattened


# dist_attr = ads_data[0]['displayAttributes']
# pprint(dist_attr)
# pprint(simplify_display_attributes(dist_attr))


def simplify_dealer_attributes(att: dict) -> dict:
    """Keep a fixed subset of dealer fields, prefixing each key with "dealer_"."""
    retained_fields = (
        "establishedYear",
        "franchiseCount",
        "franchiseType",
        "franchisesDisplay",
        "latitude",
        "longitude",
        "name",
        "totalAds",
    )
    trimmed = {}
    for field, value in att.items():
        if field in retained_fields:
            trimmed["dealer_" + field] = value
    return trimmed


# dealer_attr = ads_data[0]['dealer']
# pprint(dealer_attr)
# pprint(simplify_dealer_attributes(dealer_attr))


def convert_price_toEUR(
    price: str, currency: str, GBP_ex_rate: float = 1.1311222
) -> float:
    """Parse a price string and express it as a whole number of euro.

    Thousands separators (",") are stripped before parsing. EUR prices are
    returned unchanged; GBP prices are multiplied by ``GBP_ex_rate`` and
    rounded up. Any other currency raises ``ValueError``.
    """
    amount = int(price.replace(",", ""))

    if currency == "EUR":
        return amount

    if currency != "GBP":
        raise ValueError(f"Unknown Currency detected: {currency}")

    return math.ceil(amount * GBP_ex_rate)


# currencies = set([ad['currency'] for ad in clean_ad_data])
# print(currencies)
#
# ad = ads_data[0]
# print(f"Currency: {ad['currency']}")
# print(f"Price: {ad['price']}")
# ad['price_EUR'] = convert_price_toEUR(ad['price'], ad['currency'])
# print(f"Price: €{ad['price_EUR']}")


def convert_mileage_tokm(mileage_str: str) -> int:
    """Convert a mileage string such as "120,000 km" or "75,000 mi" to kilometres.

    Args:
        mileage_str: Odometer reading with an optional trailing "km" or "mi"
            unit and optional "," thousands separators.

    Returns:
        The reading in kilometres, rounded up to the next multiple of 100, or
        -1 when ``mileage_str`` is empty or blank.

    Raises:
        ValueError: If the numeric part cannot be parsed as an integer.
    """
    if not mileage_str or not mileage_str.strip():
        return -1

    reading = mileage_str.strip()
    # Inspect the *suffix* for the unit; a prefix slice here would never equal
    # "mi" and mile readings would be silently stored as kilometres.
    if reading.endswith("mi"):
        multiplier = 1.60934
        reading = reading[:-2]
    elif reading.endswith("km"):
        multiplier = 1
        reading = reading[:-2]
    else:
        # No recognised unit: treat the whole string as a kilometre figure.
        multiplier = 1

    distance = int(reading.replace(",", "").strip())

    # If the user has given a value of 120 miles, they mean 120,000 miles
    # If mileage is over 1 million, divide by 10
    if distance < 1000:
        distance = distance * 1000
    elif distance > 1000000:
        distance = distance // 10

    return math.ceil((distance * multiplier) / 100) * 100


# mileage = clean_ad_data[0]['Mileage']
# pprint(mileage)
# print(convert_mileage_tokm(mileage))


def standardise_wsuffix(
    value_str: str, suffix_dict: dict, ret_type: type
) -> float | int:
    """Take a str, recognise the suffix, removing it and applying correct linear multiplier to scale to SI unit"""
    for key, scalar in suffix_dict.items():
        if value_str == "":
            return ret_type(0)
        if value_str.endswith(key):
            return ret_type(float(value_str[: -1 * len(key)]) * scalar)
            # if int(v) == v: return int(v) #check if whole number, 10.0 == 10
            # else: return v #else return float, 10.2 != 10
    raise ValueError(
        f"Suffix not recognised: value_str: '{value_str}', suffix_dict: {suffix_dict}, ret_type: {ret_type}"
    )


def clean_ad(ad: dict) -> dict:
    """Reduce a raw ad record to the selected fields and normalise their units.

    Steps, in order:
      1. Keep only the keys listed in ``selected_keys``.
      2. Flatten "displayAttributes" into top-level keys.
      3. Flatten the dealer record into "dealer_*" keys and replace "dealer"
         with a boolean saying whether a dealer record was present.
      4. Convert the price to euro ("Price [EUR]", -1 if price or currency is
         missing) and the mileage to kilometres ("Mileage [km]", -1 if
         missing).
      5. Strip unit suffixes from the remaining numeric fields and rename
         those fields so the unit appears in the key.

    Args:
        ad: One ad record as returned by the search API.

    Returns:
        A new dict with the cleaned fields; ``ad`` itself is not modified.

    Raises:
        ValueError: If a price currency or unit suffix is not recognised.
    """
    ad = {k: v for k, v in ad.items() if k in selected_keys}

    # Tolerate ads without display attributes instead of raising KeyError.
    ad = ad | simplify_display_attributes(ad.pop("displayAttributes", []))

    try:
        dealer_info = ad["dealer"]
    except KeyError:
        ad["dealer"] = False
    else:
        ad = ad | simplify_dealer_attributes(dealer_info)
        ad["dealer"] = True

    try:
        ad["Price [EUR]"] = convert_price_toEUR(ad["price"], ad["currency"])
    except KeyError:
        ad["Price [EUR]"] = -1
    else:
        ad.pop("price", None)

    # A missing mileage is passed as "" and so yields the -1 sentinel, the
    # same way a missing price does.
    ad["Mileage [km]"] = convert_mileage_tokm(ad.pop("Mileage", ""))

    suffix_dicts = {
        "Power": {"suffix_dict": {"hp": 1}, "ret_type": int},
        # Engine sizes are fractional ("1.6 litre"); int would truncate to 1.
        "Engine Size (Litres)": {"suffix_dict": {" litre": 1}, "ret_type": float},
        "Acceleration (0-100 km/h)": {
            "suffix_dict": {" sec": 1},
            "ret_type": float,
        },
        # Ad age is expressed in days: 24 hours and 24 * 60 = 1440 minutes
        # per day.
        "age": {
            "suffix_dict": {
                " day": 1,
                " days": 1,
                " hours": 1.0 / 24,
                " hour": 1.0 / 24,
                " mins": 1.0 / 1440,
                " min": 1.0 / 1440,
            },
            "ret_type": lambda x: round(x, 6),
        },
        "Battery Capacity": {"suffix_dict": {" kWh": 1}, "ret_type": float},
        "Battery Range": {"suffix_dict": {" km": 1}, "ret_type": int},
        "Battery Range (NEDC)": {"suffix_dict": {" km": 1}, "ret_type": int},
        "Battery Range (WLTP)": {"suffix_dict": {" km": 1}, "ret_type": int},
    }

    # Only fields present on this ad are converted; the rest are skipped.
    for val_name, rule in suffix_dicts.items():
        if val_name in ad:
            ad[val_name] = standardise_wsuffix(ad[val_name], **rule)

    rename_dict = {
        "Power": "Power [hp]",
        "Engine Size (Litres)": "Engine Size [Litres]",
        "age": "Ad Age [days]",
        "Battery Capacity": "Battery Capacity [kWh]",
        "Battery Range (NEDC)": "Battery Range (NEDC) [km]",
        "Battery Range (WLTP)": "Battery Range (WLTP) [km]",
        "Battery Range": "Battery Range [km]",
    }

    for key, new_key in rename_dict.items():
        if key in ad:
            ad[new_key] = ad.pop(key)

    return ad


# params = gen_params_str()
# ads_data = getAdsData(params)["ads"]
# clean_ad_data = [clean_ad(ad) for ad in ads_data]


def remove_repeated_ids(ad_data):
    """Remove ads whose "id" occurs more than once, keeping the last occurrence.

    The list is modified in place and also returned. The relative order of the
    surviving ads is preserved.

    Args:
        ad_data: List of ad dicts, each with a hashable "id" entry.

    Returns:
        The same list object, now containing one ad per id.
    """
    seen_ids = set()
    kept_reversed = []
    # Walk backwards so that the last occurrence of each id is the one kept.
    for ad in reversed(ad_data):
        ad_id = ad["id"]
        if ad_id not in seen_ids:
            seen_ids.add(ad_id)
            kept_reversed.append(ad)
    kept_reversed.reverse()
    # Slice assignment keeps the caller's list object while replacing its
    # contents in a single pass (no per-element deletion).
    ad_data[:] = kept_reversed
    return ad_data


def robust_unique_id_check(ad_data, VERBOSE=True):
    """Ensure every ad id appears once, removing duplicates when necessary.

    When duplicates exist they are removed with ``remove_repeated_ids`` and the
    result is checked again. Progress messages are printed when ``VERBOSE`` is
    true; a warning about ids that are still duplicated after cleaning is
    printed regardless of ``VERBOSE``. The (possibly cleaned) data is returned.
    """
    duplicate_count = len(ad_data) - len({entry["id"] for entry in ad_data})

    if not duplicate_count:
        if VERBOSE:
            print("No repeated entries found")
        return ad_data

    if VERBOSE:
        print(
            f"{duplicate_count} repeated entries found of {len(ad_data)} entries pulled"
        )
    ad_data = remove_repeated_ids(ad_data)

    leftover = len(ad_data) - len({entry["id"] for entry in ad_data})
    if leftover:
        print(f"{leftover} repeated ids remaining after cleaning")
    elif VERBOSE:
        print("Repeated ids removed. Remaining ids confirmed to be unique")
    return ad_data


def save_to_timestamped_json(data, fname: str) -> str:
    """Write ``data`` as JSON to "<fname>_<yymmdd-HHMMSS>.json" and return that path.

    The timestamp is taken from the local clock at the time of the call.
    """
    stamp = datetime.datetime.now().strftime("%y%m%d-%H%M%S")
    target = "{}_{}.json".format(fname, stamp)
    with open(target, "w") as handle:
        json.dump(data, handle)
    return target


In [97]:
def gen_params_str(
    params: dict,
    start_index: int = 0,
    num_results: int = 30,
) -> str:
    """Build the JSON request body for a DoneDeal car search.

    Args:
        params: Optional overrides for the built-in search settings, shaped as
            ``{"mandatory": {...}, "optional": {...}}``. Both keys may be
            omitted; an empty dict (or None) gives the default Audi search.
            "mandatory" overrides the make/model filter ("parentName",
            "parentValue", "childName", "childValues"); "optional" overrides
            the price/year filters ("price_from", "price_to", "year_from",
            "year_to").
        start_index: Value sent as "start".
        num_results: Added to ``start_index`` to form the value sent as "max".

    Returns:
        A compact JSON string (no whitespace between tokens).
    """
    overrides = params or {}

    payload = {
        "adType": "forsale",
        "max": start_index + num_results,
        "start": start_index,
        "section": "cars",
    }

    mandatory_params = {
        "parentName": "make",
        "parentValue": "Audi",
        "childName": "model",
        "childValues": [""],
    }
    mandatory_params.update(overrides.get("mandatory", {}))
    # The request carries "childValues" as a list; accept a bare string too.
    if isinstance(mandatory_params["childValues"], str):
        mandatory_params["childValues"] = [mandatory_params["childValues"]]

    optional_params = {
        "price_from": "",
        "price_to": "",
        "year_from": "2023",
        "year_to": "",
    }
    optional_params.update(overrides.get("optional", {}))

    payload["dependant"] = [mandatory_params]
    payload.update(optional_params)

    # Serialise with the json module rather than editing str(dict) output, so
    # values containing spaces or quotes are encoded correctly.
    return json.dumps(payload, separators=(",", ":"))


# "{"adType":"forsale","max":3,"section":"cars","dependant":[{"parentName":"make","parentValue":"Audi","childName":"model","childValues":[""]}]}"


In [98]:
def collect_DoneDeal_Data():
    """Page through the search results and return the cleaned, de-duplicated ads.

    Pages are requested ``num_results`` at a time until either a page repeats
    the previous one or a page comes back with fewer than ``num_results`` ads.
    Every ad is passed through ``clean_ad`` and the combined list is then
    de-duplicated by id with ``robust_unique_id_check``.

    Returns:
        List of cleaned ad dicts with unique ids.
    """
    start_t = time.time()
    num_results = 30
    start_index = 0
    ad_data = []
    # Only the previous page is needed to detect a repeated response.
    previous_page = None

    while True:
        params = gen_params_str(
            dict(), start_index=start_index, num_results=num_results
        )
        page = getAdsData(params)["ads"]
        if previous_page is not None and page == previous_page:
            print("EXIT: NewData is same as previous data")
            break

        ad_data.extend(clean_ad(ad) for ad in page)
        # Compare against the requested page size rather than a literal 30.
        if len(page) < num_results:
            print(
                f"EXIT: Less than {num_results} items remaining indicating this is the last page"
            )
            break

        elapsed_secs = round(time.time() - start_t, 1)
        # Elapsed time can round to 0.0 on a very fast first page; avoid
        # dividing by zero in the progress line.
        rate = int(round(len(ad_data) / elapsed_secs, 0)) if elapsed_secs else 0
        print(
            f"{len(ad_data)} search results pulled. {elapsed_secs}s elapsed. {rate} results per second",
            end="\r",
            flush=True,
        )
        start_index += num_results
        previous_page = page

    unique_ad_data = robust_unique_id_check(ad_data, VERBOSE=True)
    return unique_ad_data


In [4]:
# TODO Generalise parameter generation, extract search settings from URL?
# TODO Proper handling of request response codes from DoneDeal Query
# TODO Develop update capability, i.e. when run, check for the latest ad in the db and find any new ads since then, ensuring that older duplicates are removed
# TODO VAT + VRT Calculation for non IE registered Cars
# TODO NCT expiry to NCT months remaining calculation
# TODO Calculate model year from reg
# TODO Drop EURO NCAP Safety Rating column
# TODO Join dealer, and franchise type to become Private /Independent /Franchise
# TODO Remove zero entries from dealer established year
# TODO Add invalid data column to eliminate irregular information.


# DONE Clean up of Power entry
# DONE Clean up of Age entry
# DONE Clean up of Engine size
# DONE Drop old price entry
# DONE Clean up of Acceleration entry
# DONE Battery info cleanup
# DONE Consolidate conversions into function
# DONE Add units to each entry, days, road tax, power,


In [102]:
def main() -> None:
    """Collect the ads, save them to a timestamped JSON file and verify the file.

    After writing, the file is read back and the number of entries and whether
    all ids are unique are printed.
    """
    collected = collect_DoneDeal_Data()
    print(f"Completed collecting data. {len(collected)} entries found")

    saved_path = save_to_timestamped_json(collected, "ad_data")
    print(f"Search results written to {saved_path}")

    # Read the file back to confirm that what was written is loadable.
    with open(saved_path, "r") as fp:
        reloaded = json.load(fp)

    ids_unique = len({ad["id"] for ad in reloaded}) == len(reloaded)
    print(
        f"Completed loading data. {len(reloaded)} entries found. Unique: {ids_unique}"
    )


if __name__ == "__main__":
    main()


EXIT: Less than 30 items remaining indicating this is the last page
150 repeated entries found of 647 entries pulled
Repeated ids removed. Remaining ids confirmed to be unique
Completed collecting data. 497 entries found
Search results written to ad_data_230409-194857.json
Completed loading data. 497 entries found. Unique: True
