In [9]:
from pyspark.sql import SparkSession, Row
import requests
import json
import time
from tqdm import tqdm
from pyspark.sql.types import DoubleType
from urllib3.util.retry import Retry
from pyspark.sql.functions import col, substring_index
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut, GeocoderServiceError
from requests.adapters import HTTPAdapter

In [10]:
def slice_dataframe(df, size: int):
    """Return a DataFrame holding at most ``size`` rows of ``df``.

    This is a thin wrapper around ``df.limit``; which rows are kept is
    decided by that method, not by this helper.
    """
    limited_df = df.limit(size)
    return limited_df

In [11]:
def extract_and_load_data(INPUT_JSON: str, OVERPASS_URL: str, overpass_query: str):
    """Load Overpass shop elements (cached on disk) into a Spark DataFrame.

    The JSON file at ``INPUT_JSON`` is used when it exists. Otherwise the
    query is sent to ``OVERPASS_URL`` and the response is written to
    ``INPUT_JSON`` before being used.

    Args:
        INPUT_JSON: Path of the JSON cache file.
        OVERPASS_URL: Overpass API interpreter endpoint.
        overpass_query: Overpass QL text, sent as the ``data`` parameter.

    Returns:
        A DataFrame created by the module-level ``spark`` session with the
        columns ``id``, ``lat``, ``lon``, ``name`` and ``shop``. Values that
        are absent from an element are ``None``.

    Raises:
        requests.HTTPError: If the Overpass request returns an error status.
        KeyError: If the payload has no ``"elements"`` entry.
    """
    try:
        with open(INPUT_JSON, encoding="utf-8") as f:
            data = json.load(f)
        print("Loaded cached data")
    except FileNotFoundError:
        print("Fetching fresh data...")
        # (connect, read) timeouts: never hang forever on a dead connection,
        # but leave plenty of time for a country-sized result.
        response = requests.get(
            OVERPASS_URL,
            params={"data": overpass_query},
            timeout=(10, 600),
        )
        # Fail before parsing/caching so an error page is never stored as data.
        response.raise_for_status()
        data = response.json()
        with open(INPUT_JSON, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=4)
        print("Saved new data")

    shops_data = []
    for element in data["elements"]:
        tags = element.get("tags", {})
        # With "out center", ways and relations carry their coordinates in a
        # nested "center" object instead of top-level lat/lon.
        center = element.get("center", {})
        lat = element.get("lat", None)
        lon = element.get("lon", None)
        if lat is None:
            lat = center.get("lat", None)
        if lon is None:
            lon = center.get("lon", None)
        shops_data.append({
            "id": element.get("id", None),
            "lat": lat,
            "lon": lon,
            "name": tags.get("name", None),
            "shop": tags.get("shop", None),
        })
    return spark.createDataFrame(shops_data)

In [12]:
def clean_data(df):
    """Keep only named shops with valid coordinates.

    Rows are dropped when ``name`` or ``shop`` is null, when ``lat`` or
    ``lon`` is null, or when the coordinates fall outside [-90, 90] /
    [-180, 180]. The surviving ``lat`` and ``lon`` columns are cast to
    double.
    """
    lat, lon = col("lat"), col("lon")
    has_coordinates = lat.isNotNull() & lon.isNotNull()
    within_bounds = lat.between(-90, 90) & lon.between(-180, 180)

    named_shops = df.dropna(subset=["name", "shop"])
    located_shops = named_shops.filter(has_coordinates & within_bounds)
    return (
        located_shops
        .withColumn("lat", lat.cast("double"))
        .withColumn("lon", lon.cast("double"))
    )

In [13]:
def clean_data_locations(df):
    """Drop every row whose ``city`` or ``state`` value is null."""
    required_columns = ["city", "state"]
    return df.dropna(subset=required_columns)

In [14]:
def reverse_geocode(lat, lon, cache, last_req):
    """Resolve a coordinate to ``(city, state)`` via Nominatim, with caching.

    Args:
        lat: Latitude; formatted to 5 decimals for the cache key.
        lon: Longitude; formatted to 5 decimals for the cache key.
        cache: Dict mapping ``"<lat>, <lon>"`` keys to earlier results.
            Updated in place, but only for lookups that succeeded.
        last_req: One-element list holding the ``time.time()`` value of the
            previous request. Updated in place after every request attempt.

    Returns:
        A ``(city, state)`` tuple. Either item is ``None`` when the response
        does not contain it, and ``(None, None)`` is returned when the lookup
        raised, so failed lookups look like missing locations instead of a
        city literally named "error".
    """
    import logging

    key = f"{lat:.5f}, {lon:.5f}"
    if key in cache:
        return cache[key]

    # Keep at least 1.1 s between requests. The clock is read once so the
    # value handed to sleep() can never turn negative between two reads.
    wait = 1.1 - (time.time() - last_req[0])
    if wait > 0:
        time.sleep(wait)

    try:
        geolocator = Nominatim(user_agent = "retail-analysis/2.0", timeout = 15)
        location = geolocator.reverse((lat, lon), zoom = 13, exactly_one = True)
        address = location.raw.get("address", {}) if location else {}
        city = address.get("city") or address.get("town") or address.get("village") or None
        state = address.get("state", None)
        # NOTE(review): names are cut to 15 characters only when both are
        # present; kept unchanged to preserve the existing output.
        result = (city[:15], state[:15]) if city and state else (city, state)
    except Exception as e:
        # Best effort: report the failure and do not cache it, so a transient
        # error does not permanently poison this coordinate.
        logging.getLogger(__name__).warning("Reverse geocoding failed for %s: %s", key, e)
        result = (None, None)
    else:
        cache[key] = result
    last_req[0] = time.time()
    return result

In [15]:
def extract_location(df):
    """Add ``city`` and ``state`` to every row by reverse-geocoding it.

    All rows of ``df`` are collected and passed one by one to
    ``reverse_geocode``, sharing a single cache and a single last-request
    timestamp. Progress is reported through a tqdm bar. The result is a new
    DataFrame built by the module-level ``spark`` session with the fields
    id, lat, lon, name, shop, city and state.
    """
    geocode_cache = {}
    last_request_time = [time.time()]
    collected = df.collect()
    bar_layout = "{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]"

    with tqdm(total = len(collected), desc = "Geocoding", bar_format = bar_layout) as progress:
        enriched = []
        for record in collected:
            city, state = reverse_geocode(record.lat, record.lon, geocode_cache, last_request_time)
            fields = dict(
                id = record.id,
                lat = record.lat,
                lon = record.lon,
                name = record.name,
                shop = record.shop,
                city = city,
                state = state,
            )
            enriched.append(Row(**fields))
            progress.update(1)
            progress.set_postfix_str(f"Cache: {len(geocode_cache)} Last: {city}")
        located_df = spark.createDataFrame(enriched)
        return located_df

In [None]:
if __name__ == "__main__":
    # `spark` is deliberately a module-level name: extract_and_load_data and
    # extract_location read it as a global.
    # NOTE(review): a single shuffle partition presumably keeps each CSV
    # output to one part file - confirm before changing.
    spark = SparkSession.builder.appName("RetailDensityAnalysis").config("spark.sql.shuffle.partitions", "1").config("spark.default.parallelism", "1").getOrCreate()

    INPUT_JSON = "../dataset/overpass_data.json"
    OUTPUT_CSV = "../dataset/retail_density_results"
    AGG_OUTPUT_CSV = "../dataset/retail_density_agg_results"
    # OUTPUT_CSV_UTM = "./retail_density_results"
    # AGG_OUTPUT_CSV_UTM = "./retail_density_agg_results"
    OVERPASS_URL = "http://overpass-api.de/api/interpreter"
    DATASET_SIZE = 30000

    overpass_query = """
    [out:json];
    area["name"="India"]->.searchArea;
    (
        node["shop"](area.searchArea);
        way["shop"](area.searchArea);
        relation["shop"](area.searchArea);
    );
    out center;"""

    try:
        # load and extract data
        print("loading data...")
        df = extract_and_load_data(INPUT_JSON, OVERPASS_URL, overpass_query)

        # clean data
        print("Started cleaning processes...")
        clean_df = clean_data(df)
        clean_df = slice_dataframe(clean_df, DATASET_SIZE)

        # add location data
        print("Extracting Geolocation...")
        print(f"Processing {clean_df.count()} locations...")
        start = time.time()
        df_with_location = extract_location(clean_df)
        # "\n" (not "/n") so the summary starts on its own line after the bar.
        print(f"\nCompleted in {time.time() - start:.1f}s")

        print("Cleaning DataFrame of null locations...")
        clean_df_with_location = clean_data_locations(df_with_location).sort('city', 'name')

        print("aggregating data for analysis...")
        agg_df = clean_df_with_location.groupBy('city').count().sort('city')

        # write out the csv files
        print("Writing csv files...")

        print(f"Writing {OUTPUT_CSV}")
        clean_df_with_location.write.mode("overwrite").option("header",True).csv(OUTPUT_CSV)

        # print(f"Writing {OUTPUT_CSV_UTM}")
        # clean_df_with_location.write.mode("overwrite").option("header",True).csv(OUTPUT_CSV_UTM)

        print(f"Writing {AGG_OUTPUT_CSV}")
        agg_df.write.mode("overwrite").option("header",True).csv(AGG_OUTPUT_CSV)

        # print(f"Writing {AGG_OUTPUT_CSV_UTM}")
        # agg_df.write.mode("overwrite").option("header",True).csv(AGG_OUTPUT_CSV_UTM)
    finally:
        # Always release the Spark session, even when a step above raised.
        spark.stop()

loading data...
Loaded cached data
Started cleaning processes...


25/03/05 20:27:58 WARN TaskSetManager: Stage 0 contains a task of very large size (6854 KiB). The maximum recommended task size is 1000 KiB.
25/03/05 20:28:03 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

+---------+----------+----------+--------------------+-----------+
|       id|       lat|       lon|                name|       shop|
+---------+----------+----------+--------------------+-----------+
|171171134|30.8806773|75.8474324|Jassal Department...|convenience|
|171277546|30.8982473|75.8194336|    Vishal Mega Mart|supermarket|
|171313618|30.8974153| 75.820184|  Ansal Plaza;Basant|supermarket|
|181316289|30.8687898|75.8838422| Ashok Karyana Store|convenience|
|181316582|30.8590486|75.8827218|       Grewal Market|       hifi|
|243970408|30.8358801|75.9703788|               Jhajj|      dairy|
|245606842| 12.444432|80.1089115|CINDURA'S Beauty ...|     beauty|
|245748968|26.4998975|83.7829787|              Deoria|video_games|
|246912870|30.8911208|75.8391201|     Satpal Di Hatti|convenience|
|246914014|30.8924466|75.8434116|     Satpal Di Hatti|convenience|
|252417084| 8.5265181|77.0364515|          Nava Yatra|     bakery|
|263575322|30.8656351|75.8723119|Robert Clinical L...|convenie