In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField, NumericType, IntegerType, DoubleType, FloatType
from pyspark.sql.window import Window
from concurrent.futures import ThreadPoolExecutor

import time

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import calendar
from math import radians, sqrt, hypot, pi, sin, cos, asin, atan2
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from tqdm.notebook import tqdm
from collections import defaultdict

import tempfile, shutil, requests, warnings, logging, glob, json
from functools import reduce
from itertools import product

tqdm.pandas()

warnings.filterwarnings('ignore')

end = '2024-12-01'
end_date = datetime.strptime(end, '%Y-%m-%d')
start_date = end_date - relativedelta(years=2)
start_date = datetime(start_date.year, start_date.month, 1)
start = start_date.strftime('%Y-%m-%d')

state_names = [
    'NJ_NY_PA', 'CT_IL_TX', 'IN_MI_TN', 'AK_AL_AR_AZ_CA', 'CO_DE_DL_FL_GA_HI', 'IA_ID_KS_KY_LA_MA_MD', 
    'ME_MN_MO_MS_MT_NE_NV_NH_NM_NC_ND', 'OH_OK_OR_RI_SC', 'SD_UT_VT_VA_WA_WV_WI_WY'
]

states = [
    ['NJ', 'NY', 'PA'], ['CT', 'IL', 'TX'], ['IN', 'MI', 'TN'], ['AK', 'AL', 'AR', 'AZ', 'CA'], 
    ['CO', 'DE', 'DL', 'FL', 'GA', 'HI'], ['IA', 'ID', 'KS', 'KY', 'LA', 'MA', 'MD'], 
    ['ME', 'MN', 'MO', 'MS', 'MT', 'NE', 'NV', 'NH', 'NM', 'NC', 'ND'], 
    ['OH', 'OK', 'OR', 'RI', 'SC'], ['SD', 'UT', 'VT', 'VA', 'WA', 'WV', 'WI', 'WY']
]

################################################################################################
# azuremaps api for coordinates generation
################################################################################################
def azuremaps_api(location):
    endpoint = 'https://atlas.microsoft.com/search/address/json'
    api_key = '4t6tMJFedI3Dag3Ewu1oBYmpa3jqiKhlG12FUxe9Vns'
    params = {"subscription-key": api_key, "api-version": "1.0", "query": location, "countrySet": "US", "top": 1, "typeahead": True, "extendedPostalCodesFor": "Addr", "includeEntityTypes": "Address", "includeIso2": True}

    response = requests.get(endpoint, params)
    if response.status_code == 200 and len(response.json()["results"])>0:
        data = response.json()
        lat = data["results"][0]["position"]["lat"]
        lon = data["results"][0]["position"]["lon"]
        return dict(latitude=lat, longitude=lon)
    else:
        return dict(latitude=None, longitude=None)

################################################################################################
# haversine distance calculation
################################################################################################
def haversine_distance(lat1, lon1, lat2, lon2):
    lat1_rad, lon1_rad, lat2_rad, lon2_rad = map(radians, [lat1, lon1, lat2, lon2])
    d_lat = lat2_rad - lat1_rad
    d_lon = lon2_rad - lon1_rad
    a = sin(d_lat / 2) ** 2 + cos(lat1_rad) * cos(lat2_rad) * sin(d_lon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = 3958.8 * c  # Earth's radius in miles (approximate)
    return distance

################################################################################################
# Tomforth api
################################################################################################
def tomforth_api(lat, lon, radius):
    api_url = 'https://ringpopulationsapi.azurewebsites.net/api/globalringpopulations'
    params = {
        'latitude': lat,
        'longitude': lon,
        'distance_km': radius # integer only
    }
    response = requests.get(api_url, params)
    if response.status_code == 200:
        data = response.json()
        return pd.DataFrame(data)
    else:
        return None

################################################################################################
# Generate Coordinate for dealer registry
################################################################################################
def generate_dealer_coordinates(dealers, coordinates):
    dealers = pd.merge(dealers, coordinates, on=['DEALER_ADDRESS_FULL'], how='left')

    out=[]
    for i, row in dealers.iterrows():
        # print('6 -----   ',row['LATITUDE'])
        if str(row['LATITUDE'])=='nan':
            location_info = azuremaps_api(row['DEALER_ADDRESS_FULL'])
            # print('2 ------  ', location_info)
            if location_info['latitude'] is not None and location_info['longitude'] is not None:
                out.append({'DEALER_ADDRESS_FULL': row['DEALER_ADDRESS_FULL'], 'LATITUDE': location_info['latitude'], 'LONGITUDE': location_info['longitude']})
            else:
                out.append({'DEALER_ADDRESS_FULL': row['DEALER_ADDRESS_FULL'], 'LATITUDE': None, 'LONGITUDE': None})
        else:
            out.append({'DEALER_ADDRESS_FULL': row['DEALER_ADDRESS_FULL'], 'LATITUDE': row['LATITUDE'], 'LONGITUDE': row['LONGITUDE']})
    out = pd.DataFrame(out)
    print(f'{len(out)} - {len(coordinates)} = {len(out) - len(coordinates)} rows added. {len(out[out.isna().any(axis=1)])} null rows out of {len(out)}')

    return out

################################################################################################
# Generate Dealer Population
################################################################################################
def generate_dealer_population(dealers, coordinates, pops_df):
    # states = ['OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VA', 'VT', 'WA', 'WI', 'WV', 'WY']
    df = pd.merge(coordinates.dropna(), pops_df.drop_duplicates(subset='DEALER_ADDRESS_FULL'), on=['DEALER_ADDRESS_FULL'], how='left')
    df = pd.merge(df, dealers[['DEALER_ADDRESS_FULL', 'DEALER_STATE_ABBRV']], on=['DEALER_ADDRESS_FULL'], how='left')

    print(f'{len(df[df.isna().any(axis=1)])} null rows out of {len(df)} in population data')
    # df = df[df['DEALER_STATE_ABBRV'].isin(states)]
    print(f'{len(df[df.isna().any(axis=1)])} null rows out of {len(df)} in selected population data')

    new_pops_df = pd.DataFrame()
    for i, row in tqdm(df.iterrows(), total=len(df), desc='Processing'):
        if pd.isna(row['people']):
            try:
                pop_df = tomforth_api(lat=row['LATITUDE'], lon=row['LONGITUDE'], radius=161).drop(columns='name')
                pop_df['DEALER_ADDRESS_FULL'] = row['DEALER_ADDRESS_FULL']
                new_pops_df = pd.concat([new_pops_df, pop_df])
            except:
                continue
    upd_pops_df = pd.concat([pops_df, new_pops_df])
    print(f'{len(upd_pops_df)} - {len(pops_df)} = {len(upd_pops_df) - len(pops_df)} rows added. {len(upd_pops_df[upd_pops_df.isna().any(axis=1)])} null rows out of {len(upd_pops_df)}')
    return upd_pops_df

################################################################################################
# optimal radius calculation for a dealers
################################################################################################

def count_addresses_in_state_make(polk_df_n, selected_dealers):
    selected_dealers_spark = spark.createDataFrame(selected_dealers)
    # Join selected_dealers with polk_df_n on MAKE and DEALER_STATE_ABBRV
    joined_df = selected_dealers_spark.join(
        polk_df_n,
        (selected_dealers_spark['MAKE'] == polk_df_n['MAKE']) & 
        (selected_dealers_spark['DEALER_STATE_ABBRV'] == polk_df_n['DEALER_STATE_ABBRV']),
        how='inner'
    )
    # Group by the columns in selected_dealers and count the distinct DEALER_ADDRESS_FULL from polk_df_n
    count_df = joined_df.groupBy(
        selected_dealers_spark['DEALER_ADDRESS_FULL'],
        selected_dealers_spark['MAKE'],
        selected_dealers_spark['DEALER_STATE_ABBRV']
    ).agg(F.countDistinct(polk_df_n['DEALER_ADDRESS_FULL']).alias('distinct_count_in_polk_df'))

    # Convert to pandas and merge with the original selected_dealers DataFrame
    count_df_pd = count_df.toPandas()

    return count_df_pd

def dealer_to_dealer_distance(all_dealers, coordinates):
    # Create a DataFrame for dealers and join with coordinates
    base_df = spark.createDataFrame(all_dealers).select('DEALER_NAME', 'DEALER_ADDRESS_FULL', 'MAKE')
    base_df = base_df.join(coordinates.dropna(), on=['DEALER_ADDRESS_FULL'])
    
    # Create aliases for self join
    df1 = base_df.alias('df1')
    df2 = base_df.alias('df2')
    
    print('Applying Cross Join')
    # Cross join the dataframe with itself
    cross_df = df1.crossJoin(df2)
    
    # Filter out rows where addresses are the same or dealer names are the same,
    # and ensure that the MAKE is the same for both rows.
    filtered_df = cross_df.where(
        (F.col("df1.DEALER_NAME") != F.col("df2.DEALER_NAME")) &
        (F.col("df1.DEALER_ADDRESS_FULL") != F.col("df2.DEALER_ADDRESS_FULL")) &
        (F.col("df1.MAKE") == F.col("df2.MAKE"))
    )
    
    print('Calculating Distance')
    # Convert degrees to radians for both sets of coordinates
    distance_df = filtered_df.withColumn("lat1", F.radians(F.col("df1.LATITUDE"))) \
        .withColumn("lon1", F.radians(F.col("df1.LONGITUDE"))) \
        .withColumn("lat2", F.radians(F.col("df2.LATITUDE"))) \
        .withColumn("lon2", F.radians(F.col("df2.LONGITUDE")))
    
    # Earth's radius in kilometers (or use miles if preferred)
    # r = 6371.0

    # Earth's radius in miles (or use miles if preferred)
    r = 3963.1

    # Calculate haversine distance using Spark SQL functions
    distance_df = distance_df.withColumn(
        "DISTANCE",
        2 * F.lit(r) * F.asin(
            F.sqrt(
                F.pow(F.sin((F.col("lat2") - F.col("lat1")) / 2), 2) +
                F.cos(F.col("lat1")) * F.cos(F.col("lat2")) *
                F.pow(F.sin((F.col("lon2") - F.col("lon1")) / 2), 2)
            )
        )
    )
    
    # Select and rename the desired columns, ensuring DISTANCE is cast to double.
    result_df = distance_df.select(
        F.col("df1.DEALER_NAME").alias("DEALER_NAME"),
        F.col("df1.DEALER_ADDRESS_FULL").alias("DEALER_ADDRESS_FULL"),
        F.col("df1.MAKE").alias("MAKE"),
        F.col("df2.DEALER_NAME").alias("COMPARED_DEALER_NAME"),
        F.col("df2.DEALER_ADDRESS_FULL").alias("COMPARED_DEALER_ADDRESS_FULL"),
        F.col("df2.MAKE").alias("COMPARED_MAKE"),
        F.col("DISTANCE").cast("double")
    )
    
    # Optionally, join with the original dealer DataFrame to filter or enrich the data.
    final_df = result_df.join(
        base_df.select('DEALER_NAME', 'DEALER_ADDRESS_FULL', 'MAKE'),
        on=['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'MAKE']
    )
    
    return final_df

def dealer_optimal_radius(polk_df, coordinates, state_pop, highlighted_dealer, pops_df):
    # Storing the latitude and longitude values of the selected dealer
    dealer_radius_factor = dict()
    for key, value in coordinates.filter(coordinates['DEALER_ADDRESS_FULL'] == highlighted_dealer['DEALER_ADDRESS_FULL']).drop('DEALER_ADDRESS_FULL').first().asDict().items():
        dealer_radius_factor[key] = value

    # Getting the list of full addresses of all the dealers who sells the brand same as our target dealer
    # brand_dealers = list(polk_df.filter((F.col('REG_TYPE')=='N') & (F.col('MAKE')==highlighted_dealer['MAKE'])).select("DEALER_ADDRESS_FULL").distinct().rdd.map(lambda row: row[0]).collect())

    # Count of dealers in the same state as our target dealer
    # state_brand_dealer_count = polk_df.filter((F.col('REG_TYPE')=='N') & (F.col('MAKE')==highlighted_dealer['MAKE']) & (F.col('DEALER_STATE_ABBRV')==highlighted_dealer['DEALER_STATE_ABBRV'])).select('DEALER_ADDRESS_FULL').distinct().count()
    state_brand_dealer_count = count_df_pd[count_df_pd['DEALER_ADDRESS_FULL'] == highlighted_dealer['DEALER_ADDRESS_FULL']]['distinct_count_in_polk_df'].iloc[0]

    # print('------------>>>>>>>>',highlighted_dealer['DEALER_ADDRESS_FULL'], state_brand_dealer_count)

    # Getting the state population for the state of target dealer and appending into the 'dealer_radius_factor' dictionary
    dealer_radius_factor['state_pop'] = state_pop[state_pop['state_code']==highlighted_dealer['DEALER_STATE_ABBRV']]['pop2022'].values[0]
    
    # Dealer's optimal population = State population * Factor / Brand Dealers in State
    for factor in [15]:
        radius_factor = dealer_radius_factor['state_pop'] * factor / state_brand_dealer_count
        
        if highlighted_dealer['DEALER_ADDRESS_FULL'] in list(pops_df['DEALER_ADDRESS_FULL']):
            pop_df = pops_df[pops_df['DEALER_ADDRESS_FULL']==highlighted_dealer['DEALER_ADDRESS_FULL']]
        else:
            pop_df = tomforth_api(lat=dealer_radius_factor['LATITUDE'], lon=dealer_radius_factor['LONGITUDE'], radius=161).drop(columns='name')
        # converting km to miles using 0.62137119
        pop_df['distance_mi'] = round(pop_df['distance'] * 0.62137119, 0)

        # the difference between actual population and optimal population gives the best place to be in
        # the best difference will be zero means optimal population is equal to actual population
        pop_df['difference'] = abs(pop_df['people'] - radius_factor)

        # display(pop_df)
        # finding the row where difference column of pop_df is minimum
        highlight = pop_df.loc[pop_df['difference'].idxmin()]

        dealer_radius_factor[f'optimal_radius_{factor}'] = highlight['distance_mi']
        dealer_radius_factor[f'population_{factor}'] = highlight['people']
        # print(dealer_radius_factor)
        # print(type(dealer_radius_factor[[f'optimal_radius_{factor}', f'population_{factor}']]))

    return dealer_radius_factor[f'optimal_radius_{factor}'], dealer_radius_factor[f'population_{factor}']

################################################################################################
# optimal radius calculation for all new dealers
################################################################################################
def calculate_optimal_radius(selected_dealers):
    print('Nunmber of New rows - ', selected_dealers.shape)
    out=[]
    for i, row in tqdm(selected_dealers.iterrows(), total=len(selected_dealers), desc='Processing'):
        highlighted_dealer = row.to_dict()
        highlighted_dealer['State'] = state_codes[state_codes['state_code']==highlighted_dealer['DEALER_STATE_ABBRV']]['State'].iloc[0]
        # print(type(highlighted_dealer))
        radius, population = dealer_optimal_radius(polk_df=polk_df, state_pop=pd.merge(pop2022, state_codes), highlighted_dealer=highlighted_dealer, coordinates=dealer_coordinates, pops_df=pops_df)

        highlighted_dealer['RADIUS'] = radius
        highlighted_dealer['POPULATION'] = population
        out.append(highlighted_dealer)

    optimal_radius_df = pd.DataFrame(out)
    if not optimal_radius_df.empty:
        optimal_radius_df = optimal_radius_df[['DEALER_ADDRESS_FULL', 'RADIUS', 'POPULATION']]
    return optimal_radius_df
    # optimal_radius_df_final = pd.concat([optimal_radius_df, oprad_all], axis=0)
    # return optimal_radius_df_final

def calculate_optimal_radius_new(selected_dealers, state_codes, count_df_pd, pop2022, polk_df, dealer_coordinates, pops_df):

    state_pop = pd.merge(pop2022, state_codes).rename(columns={'state_code':'DEALER_STATE_ABBRV'}).drop('State', axis=1)
    selected_dealers = selected_dealers.merge(state_codes.rename(columns={'state_code':'DEALER_STATE_ABBRV'}), on='DEALER_STATE_ABBRV')

    # Storing the latitude and longitude values of the selected dealer
    dealer_radius_factor = dealer_coordinates.toPandas().merge(selected_dealers, on=['DEALER_ADDRESS_FULL'], how='inner')
    dealer_radius_factor = dealer_radius_factor.merge(count_df_pd.drop_duplicates(subset=['DEALER_ADDRESS_FULL']), on=['DEALER_ADDRESS_FULL', 'DEALER_STATE_ABBRV'], how='inner')

    # Getting the state population for the state of target dealer and appending into the 'dealer_radius_factor' dictionary
    dealer_radius_factor = dealer_radius_factor.merge(state_pop, on=['DEALER_STATE_ABBRV'], how='inner')
    
    # Dealer's optimal population = State population * Factor / Brand Dealers in State
    factor = 15
    dealer_radius_factor['radius_factor'] = dealer_radius_factor['pop2022'] * factor / dealer_radius_factor['distinct_count_in_polk_df']

    old_add = set(pops_df['DEALER_ADDRESS_FULL'])
    def generate_pop_df(row, old_add):
        if row['DEALER_ADDRESS_FULL'] in old_add:
            pop_df = pops_df[pops_df['DEALER_ADDRESS_FULL']==row['DEALER_ADDRESS_FULL']]
        else:
            pop_df = tomforth_api(lat=row['LATITUDE'], lon=row['LONGITUDE'], radius=161).drop(columns='name')

        # converting km to miles using 0.62137119
        pop_df['distance_mi'] = round(pop_df['distance'] * 0.62137119, 0)

        # the difference between actual population and optimal population gives the best place to be in
        # the best difference will be zero means optimal population is equal to actual population
        pop_df['difference'] = abs(pop_df['people'] - row['radius_factor'])

        # finding the row where difference column of pop_df is minimum
        highlight = pop_df.loc[pop_df['difference'].idxmin()]

        return highlight['distance_mi'], highlight['people']

    dealer_radius_factor[['RADIUS', 'POPULATION']] = dealer_radius_factor.progress_apply(
        lambda row: generate_pop_df(row, old_add), axis=1, result_type='expand')

    optimal_radius_df = dealer_radius_factor[['DEALER_ADDRESS_FULL', 'RADIUS', 'POPULATION']]
    return optimal_radius_df


################################################################################################
# Tuning Optimal Radius, Local Dealers and Population
################################################################################################

def merge_and_get_dealers_count(optimal_radius_df_final1, d2d):

    # Merge optimal_radius_df_final1 with d2d on DEALER_NAME and DEALER_ADDRESS_FULL
    merged_df = optimal_radius_df_final1.merge(
        d2d, 
        on=["DEALER_NAME", "DEALER_ADDRESS_FULL"], 
        how="left")

    # Filter rows where DISTANCE is within Local Radius
    filtered_df = merged_df[merged_df["DISTANCE"] <= merged_df["Local Radius"]]

    # Remove duplicate (COMPARED_DEALER_ADDRESS_FULL, COMPARED_DEALER_NAME) pairs
    filtered_df = filtered_df.drop_duplicates(
        subset=["DEALER_NAME", "DEALER_ADDRESS_FULL", "COMPARED_DEALER_ADDRESS_FULL", "COMPARED_DEALER_NAME"])

    # Count unique pairs per dealer
    n_local_dealers = (
        filtered_df.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"])
        .size()
        .reset_index(name="N Local Dealers"))

    # Merge the computed counts back to the original DataFrame
    optimal_radius_df_final1 = optimal_radius_df_final1.merge(
        n_local_dealers, 
        on=["DEALER_NAME", "DEALER_ADDRESS_FULL"], 
        how="left"
    ).fillna({"N Local Dealers": 0})  # Fill NaN values with 0 if no local dealers found

    return optimal_radius_df_final1

def fixing_more_than_30_local_dealers(hilo, d2d):
    hilo = optimal_radius_df_final1
    hilo_0 = hilo[(hilo['N Local Dealers']<=0) & (hilo['Local Radius']<=100)]
    hilo_30 = hilo[hilo['N Local Dealers']>30]
    # print(hilo_0.shape)
    # print(hilo_30.shape)

    d2d_30 = d2d.merge(hilo_30[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner')
    d2d_0 = d2d.merge(hilo_0[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner')
    # print(d2d_30.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape)
    # print(d2d_0.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape)


    # Sort values by DEALER_ADDRESS_FULL and DISTANCE
    df_sorted = d2d_30.sort_values(by=["DEALER_NAME", "DEALER_ADDRESS_FULL", "DISTANCE"])

    # Remove rows where DISTANCE > 100 before selecting the 30th row
    df_filtered = df_sorted[df_sorted["DISTANCE"] <= 100*1.60934]

    # Get the 30th row per group (nth index 29) if at least 30 rows exist, otherwise take the last available row
    df_30th = df_filtered.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"]).nth(29).reset_index()

    # If a group has fewer than 30 rows, take the last available row
    df_last = df_filtered.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"]).tail(1)  # Get the last row per group
    df_30th = pd.concat([df_30th, df_last]).drop_duplicates(["DEALER_NAME", "DEALER_ADDRESS_FULL"])
    print(df_30th.columns)

    # Count unique COMPARED_DEALER_ADDRESS_FULL within the selected DISTANCE ---------------------------

    # Merge df_30th with df_filtered on DEALER_NAME and DEALER_ADDRESS_FULL
    merged_df = df_30th[['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'MAKE', 'DISTANCE']].merge(
        df_filtered, 
        on=["DEALER_NAME", "DEALER_ADDRESS_FULL"], 
        how="left"
    )

    # Filter rows where DISTANCE is within the allowed range
    filtered_df = merged_df[merged_df["DISTANCE_y"] <= merged_df["DISTANCE_x"]]

    # Remove duplicate (COMPARED_DEALER_ADDRESS_FULL, COMPARED_DEALER_NAME) pairs per dealer
    filtered_df = filtered_df.drop_duplicates(
        subset=["DEALER_NAME", "DEALER_ADDRESS_FULL", "COMPARED_DEALER_ADDRESS_FULL", "COMPARED_DEALER_NAME"]
    )

    # Count unique dealer pairs per (DEALER_NAME, DEALER_ADDRESS_FULL)
    dealer_counts = (
        filtered_df.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"])
        .size()
        .reset_index(name="DEALER_COUNT")
    )

    # Merge the computed counts back to the original DataFrame
    df_30th = df_30th.merge(
        dealer_counts, 
        on=["DEALER_NAME", "DEALER_ADDRESS_FULL"], 
        how="left"
    ).fillna({"DEALER_COUNT": 0})  # Fill NaN values with 0 if no matches

    # ---------------------------------------------------------------------------------

    # Keep only relevant columns
    df_30th = df_30th[["DEALER_NAME", "DEALER_ADDRESS_FULL", "DISTANCE", "DEALER_COUNT"]]

    df_30th['DISTANCE'] = np.floor(df_30th['DISTANCE']).astype(int)


    # ---------------------------------------------------------------------------------

    # Sort values by DEALER_ADDRESS_FULL and DISTANCE
    df_sorted = d2d_0.sort_values(by=["DEALER_NAME", "DEALER_ADDRESS_FULL", "DISTANCE"])

    # Get the top 10 rows per group where DISTANCE ≤ 100 miles
    df_top10 = df_sorted[df_sorted["DISTANCE"] <= 100*1.60934].groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"]).head(10)

    # Compute the maximum distance within these top 10 rows
    df_0th = df_top10.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"])["DISTANCE"].max().reset_index()

    # If a dealer has fewer than 10 valid distances, set DISTANCE to 100 miles
    df_0th["DISTANCE"] = df_0th["DISTANCE"].fillna(100*1.60934)

    # Count unique COMPARED_DEALER_ADDRESS_FULL within the selected DISTANCE ---------------------------

    # Merge df_30th with df_filtered on DEALER_NAME and DEALER_ADDRESS_FULL
    merged_df = df_0th[['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'DISTANCE']].merge(
        df_sorted, 
        on=["DEALER_NAME", "DEALER_ADDRESS_FULL"], 
        how="left"
    )

    # Filter rows where DISTANCE is within the allowed range
    filtered_df = merged_df[merged_df["DISTANCE_y"] <= merged_df["DISTANCE_x"]]

    # Remove duplicate (COMPARED_DEALER_ADDRESS_FULL, COMPARED_DEALER_NAME) pairs per dealer
    filtered_df = filtered_df.drop_duplicates(
        subset=["DEALER_NAME", "DEALER_ADDRESS_FULL", "COMPARED_DEALER_ADDRESS_FULL", "COMPARED_DEALER_NAME"]
    )

    # Count unique dealer pairs per (DEALER_NAME, DEALER_ADDRESS_FULL)
    dealer_counts = (
        filtered_df.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"])
        .size()
        .reset_index(name="DEALER_COUNT")
    )

    # Merge the computed counts back to the original DataFrame
    df_0th = df_0th.merge(
        dealer_counts, 
        on=["DEALER_NAME", "DEALER_ADDRESS_FULL"], 
        how="left"
    ).fillna({"DEALER_COUNT": 0})  # Fill NaN values with 0 if no matches

    # Keep only relevant columns
    df_0th = df_0th[["DEALER_NAME", "DEALER_ADDRESS_FULL", "DISTANCE", "DEALER_COUNT"]]

    df_0th['DISTANCE'] = np.ceil(df_0th['DISTANCE']).astype(int)

    # ---------------------------------------------------------------------------------

    # Find rows in df1 that are NOT in df2 based on columns 'DEALER_NAME', 'DEALER_ADDRESS_FULL'
    df_0th_100_add = df_sorted.merge(df_top10[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='left', indicator=True)
    df_0th_100_add = df_0th_100_add[df_0th_100_add['_merge'] == 'left_only'].drop(columns=['_merge'])

    df_0th_100 = df_0th_100_add.drop_duplicates(subset=['DEALER_NAME', 'DEALER_ADDRESS_FULL'])
    df_0th_100 = df_0th_100[['DEALER_NAME', 'DEALER_ADDRESS_FULL',]]

    df_0th_100['DISTANCE'] = 100*1.60934
    df_0th_100['DISTANCE'] = np.ceil(df_0th_100['DISTANCE']).astype(int)
    df_0th_100['DEALER_COUNT'] = 0

    # print(df_0th_100.columns, df_0th_100.shape)

    # ---------------------------------------------------------------------------------

    df_30th_0th = pd.concat([df_30th, df_0th, df_0th_100], axis=0)
    # print(df_30th_0th.info())

    df_30th_0th['DISTANCE'] = df_30th_0th['DISTANCE'].where(df_30th_0th['DISTANCE'] <= 100*1.60934, 100*1.60934)

    # Distance in KM
    # df_30th_0th['DISTANCE'] = df_30th_0th['DISTANCE']*1.60934
    df_30th_0th['DISTANCE'] = df_30th_0th['DISTANCE'].astype(int)

    df_30th_0th = df_30th_0th.merge(pops_df[['distance', 'people', 'DEALER_NAME', 'DEALER_ADDRESS_FULL']], left_on=['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'DISTANCE'], right_on=['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'distance'], how='left').sort_values(by='distance').drop(columns='distance')

    df_30th_0th = df_30th_0th.drop_duplicates()

    return df_30th_0th

def update_original_df(optimal_radius_df_final1, df_30th_0th):

    hilo_final = optimal_radius_df_final1.copy()

    hilo_final_1 = hilo_final.merge(df_30th_0th[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='left', indicator=True)
    hilo_final_1 = hilo_final_1[hilo_final_1['_merge'] == 'left_only'].drop(columns=['_merge'])
    # hilo_final_1.DEALER_ADDRESS_FULL.nunique()


    difference_df = df_30th_0th.merge(hilo_final[[
    'DEALER_NAME', 'DEALER_ADDRESS', 'DEALER_ADDRESS_FULL', 'MAKE',
    'Local Radius', 'Local Population', 'N Local Dealers']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner').drop_duplicates()

    difference_df.loc[:, ['Local Radius', 'N Local Dealers', 'Local Population']] = difference_df[['DISTANCE', 'DEALER_COUNT', 'people']].values

    difference_df.drop(columns=['DISTANCE', 'DEALER_COUNT', 'people'], inplace=True)

    hilo_final = pd.concat([hilo_final_1, difference_df], axis=0)
    hilo_final.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape
    hilo_final = hilo_final.merge(dealer_registry[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner', indicator=True)
    hilo_final = hilo_final[hilo_final['_merge'] == 'both'].drop(columns=['_merge'])
    hilo_final.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape

    return hilo_final

def population_constraint_fix(population_constraint, hilo_final, d2d, pops_df):

    hilo_final_1_10mil = hilo_final[hilo_final['Local Population']>population_constraint]
    hilo_final_1_10mil = hilo_final_1_10mil[['DEALER_NAME', 'DEALER_ADDRESS', 'DEALER_ADDRESS_FULL', 'Local Radius', 'N Local Dealers', 'Local Population']]
    print(hilo_final_1_10mil.info())

    pops_df_10mil = pops_df.merge(hilo_final_1_10mil[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner')
    pops_df_10mil = pops_df_10mil[(pops_df_10mil['distance']<100*1.60934) & (pops_df_10mil['people']<10000000)]
    pops_df_10mil = (
        pops_df_10mil.loc[pops_df_10mil.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"])["people"].idxmax()]
        [["DEALER_NAME", "DEALER_ADDRESS_FULL", "distance", "people"]]
    )

    d2d_10mil = d2d.merge(pops_df_10mil[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner')

    # Merge the two DataFrames on DEALER_ADDRESS_FULL
    merged_df = pd.merge(d2d_10mil, pops_df_10mil, on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], suffixes=('_d2d', '_pops'))
    # Filter rows where the DISTANCE in d2d_10mil is less than or equal to the distance in pops_df_10mil
    filtered_df = merged_df[merged_df['DISTANCE'] <= merged_df['distance']]
    # Count the number of COMPARED_DEALER_ADDRESS_FULL for each DEALER_ADDRESS_FULL
    result = (
        filtered_df.groupby(['DEALER_NAME', 'DEALER_ADDRESS_FULL'])
        .agg({'COMPARED_DEALER_ADDRESS_FULL': 'count'})
        .rename(columns={'COMPARED_DEALER_ADDRESS_FULL': 'count_compared_dealers'})
    )
    result = result.reset_index()


    result2 = hilo_final_1_10mil.merge(result, on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='left', indicator=True)
    result2 = result2[result2['_merge'] == 'left_only'].drop(columns=['_merge'])
    result2 = result2[['DEALER_NAME', 'DEALER_ADDRESS_FULL']].drop_duplicates()
    result2['count_compared_dealers'] = 0

    result = pd.concat([result, result2], axis=0)
    result = result.sort_values(by='count_compared_dealers', ascending=False)
    result = result.drop_duplicates(subset=['DEALER_NAME', 'DEALER_ADDRESS_FULL'])


    pops_df_10mil = pops_df_10mil.merge(result, on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'])
    pops_df_10mil.rename(columns={'distance':'DISTANCE', 'count_compared_dealers':'DEALER_COUNT'}, inplace=True)

    hilo_final_1 = hilo_final.merge(pops_df_10mil[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='left', indicator=True)
    hilo_final_1 = hilo_final_1[hilo_final_1['_merge'] == 'left_only'].drop(columns=['_merge'])
    hilo_final_1.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape

    difference_df = pops_df_10mil.merge(hilo_final[['DEALER_NAME', 'DEALER_ADDRESS', 'DEALER_ADDRESS_FULL', 'MAKE',
    'Local Radius', 'Local Population', 'N Local Dealers']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner').drop_duplicates()

    difference_df.loc[:, ['Local Radius', 'N Local Dealers', 'Local Population']] = difference_df[['DISTANCE', 'DEALER_COUNT', 'people']].values
    difference_df.drop(columns=['DISTANCE', 'DEALER_COUNT', 'people'], inplace=True)

    hilo_final = pd.concat([hilo_final_1, difference_df], axis=0)
    hilo_final.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape

    return hilo_final

def low_values_fix(h4, d2d, hilo_final):

    # print(h4.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape)

    d2d_low = d2d.merge(h4[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner')
    d2d_low = d2d_low.drop_duplicates()
    # Sort values by DEALER_ADDRESS_FULL and DISTANCE
    df_sorted = d2d_low.sort_values(by=["DEALER_ADDRESS_FULL", "DISTANCE"])
    # Get the top 10 rows per group where DISTANCE ≤ 100 miles
    df_top10 = df_sorted[df_sorted["DISTANCE"] <= 100*1.60934].groupby(["DEALER_NAME","DEALER_ADDRESS_FULL"]).head(10)
    # Compute the maximum distance within these top 10 rows
    df_low = df_top10.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"])["DISTANCE"].max().reset_index()


    # Fill NaN DISTANCE with 100 miles converted to km
    df_low["DISTANCE"] = df_low["DISTANCE"].fillna(100 * 1.60934)
    # Drop duplicate dealer comparisons once, before filtering
    df_unique = df_sorted.drop_duplicates(subset=["DEALER_NAME", "DEALER_ADDRESS_FULL", "COMPARED_DEALER_ADDRESS_FULL", "COMPARED_DEALER_NAME"])
    # Filter df_sorted in advance for valid distances
    # df_filtered = df_unique[df_unique["DISTANCE"] <= df_low["DISTANCE"].max()]
    df_merged = df_unique.merge(df_low[['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'DISTANCE']], 
                                on=["DEALER_NAME", "DEALER_ADDRESS_FULL"], how="left")
    # Filter based on the dealer-specific DISTANCE condition
    df_filtered = df_merged[df_merged["DISTANCE_x"] <= df_merged["DISTANCE_y"]]
    # Count unique COMPARED_DEALERS per (DEALER_NAME, DEALER_ADDRESS_FULL)
    df_counts = df_filtered.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"]).size().reset_index(name="DEALER_COUNT")

    # Merge precomputed counts with df_low
    df_low = df_low.merge(df_counts, on=["DEALER_NAME", "DEALER_ADDRESS_FULL"], how="left").fillna({"DEALER_COUNT": 0})
    # Round up DISTANCE to the nearest integer
    df_low["DISTANCE"] = np.ceil(df_low["DISTANCE"]).astype(int)
    # Keep only relevant columns
    df_low = df_low[["DEALER_NAME", "DEALER_ADDRESS_FULL", "DISTANCE", "DEALER_COUNT"]]
    # assigning max distance = 100 miles and count of dealers = 0, to the dealers left out due to <100 miles condition

    df_sorted_1 = df_sorted[['DEALER_NAME', 'DEALER_ADDRESS_FULL']].drop_duplicates()
    df_top10_1 = df_top10[['DEALER_NAME', 'DEALER_ADDRESS_FULL']].drop_duplicates()

    df_low1 = df_sorted_1.merge(df_top10_1, on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='left', indicator=True)
    df_low1 = df_low1[df_low1['_merge']=='left_only'].drop(columns=['_merge'])
    df_low1['DISTANCE'] = 100*1.60934
    df_low1['DISTANCE'] = np.ceil(df_low1['DISTANCE']).astype(int)
    df_low1['DEALER_COUNT'] = 0

    # concating all
    df_low = pd.concat([df_low, df_low1], axis=0)

    df_low = df_low.merge(pops_df[['distance', 'people', 'DEALER_NAME', 'DEALER_ADDRESS_FULL']], left_on=['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'DISTANCE'], right_on=['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'distance'], how='left').sort_values(by='distance').drop(columns='distance')

    # ---------------------------------------------------------------------------------

    hilo_final = hilo_final

    hilo_final_1 = hilo_final.merge(df_low[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='left', indicator=True)

    hilo_final_1 = hilo_final_1[hilo_final_1['_merge'] == 'left_only'].drop(columns=['_merge'])

    hilo_final_1.DEALER_ADDRESS_FULL.nunique()

    difference_df = df_low.merge(hilo_final[['DEALER_NAME', 'DEALER_ADDRESS', 'DEALER_ADDRESS_FULL', 'MAKE',
    'Local Radius', 'Local Population', 'N Local Dealers']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner').drop_duplicates()

    difference_df.loc[:, ['Local Radius', 'N Local Dealers', 'Local Population']] = difference_df[['DISTANCE', 'DEALER_COUNT', 'people']].values
    difference_df.drop(columns=['DISTANCE', 'DEALER_COUNT', 'people'], inplace=True)

    hilo_final = pd.concat([hilo_final_1, difference_df], axis=0)
    hilo_final.DEALER_ADDRESS_FULL.nunique()
    hilo_final.describe()

    return hilo_final

def population_constraint_fix_2(population_constraint, hilo_final, d2d, pops_df):

    hilo_final_1_10mil = hilo_final[hilo_final['Local Population']>population_constraint]
    hilo_final_1_10mil = hilo_final_1_10mil[['DEALER_NAME', 'DEALER_ADDRESS', 'DEALER_ADDRESS_FULL', 'Local Radius', 'N Local Dealers', 'Local Population']]
    # print(hilo_final_1_10mil.info())

    pops_df_10mil = pops_df.merge(hilo_final_1_10mil[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner')
    pops_df_10mil = pops_df_10mil[(pops_df_10mil['distance']<100*1.60934) & (pops_df_10mil['people']<10000000)]
    pops_df_10mil = (
        pops_df_10mil.loc[pops_df_10mil.groupby(["DEALER_NAME", "DEALER_ADDRESS_FULL"])["people"].idxmax()]
        [["DEALER_NAME", "DEALER_ADDRESS_FULL", "distance", "people"]]
    )

    d2d_10mil = d2d.merge(pops_df_10mil[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner')
    # Merge the two DataFrames on DEALER_ADDRESS_FULL
    merged_df = pd.merge(d2d_10mil, pops_df_10mil, on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], suffixes=('_d2d', '_pops'))
    # Filter rows where the DISTANCE in d2d_10mil is less than or equal to the distance in pops_df_10mil
    filtered_df = merged_df[merged_df['DISTANCE'] <= merged_df['distance']]
    # Count the number of COMPARED_DEALER_ADDRESS_FULL for each DEALER_ADDRESS_FULL
    result = (
        filtered_df.groupby(['DEALER_NAME', 'DEALER_ADDRESS_FULL'])
        .agg({'COMPARED_DEALER_ADDRESS_FULL': 'count'})
        .rename(columns={'COMPARED_DEALER_ADDRESS_FULL': 'count_compared_dealers'})
    )
    result = result.reset_index()

    result2 = hilo_final_1_10mil.merge(result, on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='left', indicator=True)
    result2 = result2[result2['_merge'] == 'left_only'].drop(columns=['_merge'])
    result2 = result2[['DEALER_NAME', 'DEALER_ADDRESS_FULL']].drop_duplicates()
    result2['count_compared_dealers'] = 0
    result = pd.concat([result, result2], axis=0)
    result = result.sort_values(by='count_compared_dealers', ascending=False)
    result = result.drop_duplicates(subset=['DEALER_NAME', 'DEALER_ADDRESS_FULL'])
    result.info()

    pops_df_10mil = pops_df_10mil.merge(result, on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'])
    pops_df_10mil.rename(columns={'distance':'DISTANCE', 'count_compared_dealers':'DEALER_COUNT'}, inplace=True)

    hilo_final_1 = hilo_final.merge(pops_df_10mil[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='left', indicator=True)
    hilo_final_1 = hilo_final_1[hilo_final_1['_merge'] == 'left_only'].drop(columns=['_merge'])
    hilo_final_1.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape

    difference_df = pops_df_10mil.merge(hilo_final[['DEALER_NAME', 'DEALER_ADDRESS', 'DEALER_ADDRESS_FULL', 'MAKE',
    'Local Radius', 'Local Population', 'N Local Dealers']], on=['DEALER_NAME', 'DEALER_ADDRESS_FULL'], how='inner').drop_duplicates()

    difference_df.loc[:, ['Local Radius', 'N Local Dealers', 'Local Population']] = difference_df[['DISTANCE', 'DEALER_COUNT', 'people']].values
    difference_df.drop(columns=['DISTANCE', 'DEALER_COUNT', 'people'], inplace=True)

    hilo_final = pd.concat([hilo_final_1, difference_df], axis=0)
    hilo_final.drop_duplicates(['DEALER_NAME', 'DEALER_ADDRESS_FULL']).shape

    return hilo_final


################################################################################################
# Dealer Data Generation [Location Factors]
################################################################################################
def dealer_location_factors(all_dealers, dealers, coordinates, density, pops_df):
    testing = False # True | False
    test_address = '1001 S WHITE  ST, ATHENS, MC MINN, TN'
    test_make = 'RAM'
    
    df = spark.createDataFrame(all_dealers).select('DEALER_ADDRESS_FULL', 'MAKE').join(
        coordinates.dropna(), on=['DEALER_ADDRESS_FULL']
    )
    haversine_udf = F.udf(haversine_distance)

    print('Applying Cross Join')
    distance_df = df.alias('df1').crossJoin(df.alias('df2'))
    distance_df = distance_df.where("df1.DEALER_ADDRESS_FULL != df2.DEALER_ADDRESS_FULL")
    
    print('Calculating Distance')
    distance_df = distance_df.withColumn('DISTANCE', haversine_udf('df1.LATITUDE', 'df1.LONGITUDE', 'df2.LATITUDE', 'df2.LONGITUDE'))
    distance_df = distance_df.select(
        F.col('df1.DEALER_ADDRESS_FULL').alias('DEALER_ADDRESS_FULL'), 
        F.col('df1.MAKE').alias('MAKE'), 
        F.col('df2.DEALER_ADDRESS_FULL').alias('COMPARED_DEALER_ADDRESS_FULL'), 
        F.col('df2.MAKE').alias('COMPARED_MAKE'), 'DISTANCE'
    ).withColumn('DISTANCE', F.col('DISTANCE').cast('double'))
    df = distance_df.join(
        spark.createDataFrame(dealers).select('DEALER_ADDRESS_FULL', 'MAKE'), 
        on=['DEALER_ADDRESS_FULL', 'MAKE']
    )
    if testing:
        df = df.filter(F.col('DEALER_ADDRESS_FULL') == test_address)
        print(f'Dealer Data')
        display(df)
        print(f'Dealer Data - {test_make} Brand')
        display(df.filter(F.col('COMPARED_MAKE') == test_make))

    print('Calculating Distance Factor from population')
    pops_df = pops_df.join(
        spark.createDataFrame(dealers).select('DEALER_ADDRESS_FULL', 'MAKE', 'DEALER_STATE_ABBRV'), 
        on=['DEALER_ADDRESS_FULL']
    ).withColumnRenamed('DEALER_STATE_ABBRV', 'state_code')
    density = density.withColumn('Factor', F.col('pop2022') * 15 / F.col('n_dealers'))
    pops_df = pops_df.withColumn('distance_mi', F.round(F.col('distance') * 0.62137119, 0))

    pops_df = pops_df.select('MAKE', 'state_code', 'DEALER_ADDRESS_FULL', 'distance_mi', 'people').join(density.select('MAKE', 'state_code', 'Factor'), on=['MAKE', 'state_code'], how='left')
    
    local_pops_df = pops_df.filter(F.col('people') >= F.col('Factor')).orderBy(F.asc('distance_mi'))
    local_pops_df = local_pops_df.dropDuplicates(subset=['DEALER_ADDRESS_FULL']).select('DEALER_ADDRESS_FULL', 'MAKE', 'distance_mi', 'people')
    print('Done')
    if testing:
        print('Dealer Population Data - Local')
        display(local_pops_df.filter(F.col('DEALER_ADDRESS_FULL') == test_address))
        print('Dealer Population Data')
        display(pops_df.filter(F.col('DEALER_ADDRESS_FULL') == test_address))

    local_df = df.join(local_pops_df, on=['DEALER_ADDRESS_FULL', 'MAKE'])
    if testing:
        print(f'Dealer Data <-> Dealer Population Data - Local - {test_make} Brand')
        display(local_df.filter((F.col('COMPARED_MAKE') == test_make)).orderBy(F.asc('DISTANCE')))
        print(f'Dealer Data - {test_make} Brand')
        display(df.filter((F.col('COMPARED_MAKE') == test_make)))
        display(df.select('DEALER_ADDRESS_FULL').distinct())

    cols = []
    radius_range = [1, 2, 3, 4, 5, 7, 10, 15, 20, 30, 40, 50, 75, 100]
    
    # All Brands Calculations
    print('Calculating for All Brands')
    data = df
    out = df.select('DEALER_ADDRESS_FULL').distinct()
    print('out.count()', out.count())
    for radius in radius_range:
        col = f'ALL_BRAND_DEALERS_{radius}'
        out = out.join(
            data.filter(F.col('DISTANCE') <= radius).groupBy('DEALER_ADDRESS_FULL').agg(F.collect_list('COMPARED_DEALER_ADDRESS_FULL').alias(col)), 
            on=['DEALER_ADDRESS_FULL'], how='left'
        )
        cols.append(col)

    data = local_df
    out = out.join(
        data.filter(F.col('DISTANCE') <= F.col('distance_mi')).groupBy('DEALER_ADDRESS_FULL').agg(
            F.collect_list('COMPARED_DEALER_ADDRESS_FULL').alias('ALL_BRAND_DEALERS')
        ), on=['DEALER_ADDRESS_FULL'], how='left'
    )
    cols.append('ALL_BRAND_DEALERS')

    # Similar Brands Calculations
    print('Calculating for Similar Brands')
    sections = {
        'Luxury': ['PORSCHE', 'MERCEDES-BENZ', 'LEXUS', 'BMW', 'AUDI', 'JAGUAR', 'LAND ROVER', 'VOLVO', 'ACURA', 'CADILLAC', 'LINCOLN', 'INFINITI', 'GENESIS', 'ALFA ROMEO', 'MASERATI'], 
        'Domestic': ['CHEVROLET', 'FORD', 'BUICK', 'GMC', 'CHRYSLER', 'DODGE', 'JEEP', 'RAM'],
        'Import': ['TOYOTA', 'HONDA', 'SUBARU', 'HYUNDAI', 'KIA', 'VOLKSWAGEN', 'MAZDA', 'NISSAN', 'MITSUBISHI', 'MINI', 'FIAT' ,'SMARTCAR'],
    }
    data = local_df.filter(
        (F.col('MAKE').isin(sections['Luxury']) & F.col('COMPARED_MAKE').isin(sections['Luxury'])) | 
        (F.col('MAKE').isin(sections['Domestic']) & F.col('COMPARED_MAKE').isin(sections['Domestic'])) | 
        (F.col('MAKE').isin(sections['Import']) & F.col('COMPARED_MAKE').isin(sections['Import']))
    )

    out = out.join(
        data.filter(F.col('DISTANCE') <= F.col('distance_mi')).groupBy('DEALER_ADDRESS_FULL').agg(
            F.collect_list('COMPARED_DEALER_ADDRESS_FULL').alias('SIMILAR_BRAND_DEALERS')
        ), on=['DEALER_ADDRESS_FULL'], how='left'
    )
    cols.append('SIMILAR_BRAND_DEALERS')
        
    data = df.filter(
        (F.col('MAKE').isin(sections['Luxury']) & F.col('COMPARED_MAKE').isin(sections['Luxury'])) | 
        (F.col('MAKE').isin(sections['Domestic']) & F.col('COMPARED_MAKE').isin(sections['Domestic'])) | 
        (F.col('MAKE').isin(sections['Import']) & F.col('COMPARED_MAKE').isin(sections['Import']))
    )
    for radius in radius_range:
        col = f'SIMILAR_BRAND_DEALERS_{radius}'
        out = out.join(data.filter(F.col('DISTANCE') <= radius).groupBy('DEALER_ADDRESS_FULL').agg(F.collect_list('COMPARED_DEALER_ADDRESS_FULL').alias(col)), on=['DEALER_ADDRESS_FULL'], how='left')
        cols.append(col)

    # Same Brand Calculations
    print('Calculating for Same Brands')
    data = local_df.filter(F.col('MAKE') == F.col('COMPARED_MAKE'))
    
    out = out.join(data.filter(F.col('DISTANCE') <= F.col('distance_mi')).groupBy('DEALER_ADDRESS_FULL').agg(F.collect_list('COMPARED_DEALER_ADDRESS_FULL').alias('BRAND_DEALERS')), on=['DEALER_ADDRESS_FULL'], how='left')
    cols.append('BRAND_DEALERS')

    data = df.filter(F.col('MAKE') == F.col('COMPARED_MAKE'))
    if testing:
        display(data.filter(F.col('DISTANCE')<=7))
    for radius in radius_range:
        col = f'BRAND_DEALERS_{radius}'
        out = out.join(data.filter(F.col('DISTANCE') <= radius).groupBy('DEALER_ADDRESS_FULL').agg(F.collect_list('COMPARED_DEALER_ADDRESS_FULL').alias(col)), on=['DEALER_ADDRESS_FULL'], how='left')
        cols.append(col)

    out = out.join(
        local_pops_df.select('DEALER_ADDRESS_FULL', 'distance_mi', 'people'), on=['DEALER_ADDRESS_FULL'], how='left'
    ).withColumnRenamed('distance_mi', 'RADIUS').withColumnRenamed('people', 'POPULATION')
    
    stack_expr = ", ".join([f"'{col}', {col}" for col in cols])
    out = out.selectExpr(
        'DEALER_ADDRESS_FULL', 'RADIUS', 'POPULATION', 
        f"stack({len(cols)}, {stack_expr}) as (Factor, List)"
    )
    out = out.groupBy('DEALER_ADDRESS_FULL', 'RADIUS', 'POPULATION').agg(
        F.map_from_entries(F.collect_list(F.struct("Factor", "List"))).alias("Factors")
    ).select('DEALER_ADDRESS_FULL', 'RADIUS', 'POPULATION', 'Factors')
    return out


In [0]:
end_date = datetime.strptime(end, '%Y-%m-%d')
start_date = end_date - relativedelta(years=2)
start_date = datetime(start_date.year, start_date.month, 1)

dealer_registry = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/base_files/dealers_{start_date.strftime("%y%m")}_{end_date.strftime("%y%m")}.parquet')

# dealer_coordinates = pd.read_parquet('/dbfs/jump-datasets/files/polk/all_new_dealers_coordinates.parquet')
dealer_coordinates = pd.read_parquet('/dbfs/jump-datasets/pipeline_data/radius_calc/all_new_dealers_coordinates.parquet')
dealer_coordinates = dealer_coordinates.drop_duplicates()

out = generate_dealer_coordinates(dealer_registry, dealer_coordinates)
out = out.drop_duplicates()

savepath = f'/dbfs/jump-datasets/pipeline_data/radius_calc/all_new_dealers_coordinates.parquet'
out.to_parquet(savepath)
print(savepath)

In [0]:
end_date = datetime.strptime(end, '%Y-%m-%d')
start_date = end_date - relativedelta(years=2)
start_date = datetime(start_date.year, start_date.month, 1)

dealer_registry = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/base_files/dealers_{start_date.strftime("%y%m")}_{end_date.strftime("%y%m")}.parquet')
dealer_coordinates = pd.read_parquet('/dbfs/jump-datasets/pipeline_data/radius_calc/all_new_dealers_coordinates.parquet')

# pops_df = pd.read_parquet('/dbfs/jump-datasets/files/other/tomforth_data_2311.parquet')
pops_df = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/radius_calc/tomforth_data_{(end_date - relativedelta(months=1)).strftime("%y%m")}.parquet')

pops_df = generate_dealer_population(dealer_registry, dealer_coordinates, pops_df)

savepath = f'/dbfs/jump-datasets/pipeline_data/radius_calc/tomforth_data_{end_date.strftime("%y%m")}.parquet'
pops_df.to_parquet(savepath, index=False)
print(savepath)

In [0]:
end_date = datetime.strptime(end, '%Y-%m-%d')
start_date = end_date - relativedelta(years=2)
start_date = datetime(start_date.year, start_date.month, 1)

################################################################################################
state_codes = pd.read_excel('/dbfs/jump-datasets/files/other/US state codes.xlsx')
dealer_coordinates = spark.read.parquet('dbfs:/jump-datasets/pipeline_data/radius_calc/all_new_dealers_coordinates.parquet')
pop2022 = pd.read_excel("/dbfs/jump-datasets/files/other/POP2011-2023.xlsx")[['State', 'pop2022']]
pops_df = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/radius_calc/tomforth_data_{end_date.strftime("%y%m")}.parquet')
selected_dealers = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/base_files/dealers_{start_date.strftime("%y%m")}_{end_date.strftime("%y%m")}.parquet')
selected_dealers = selected_dealers[selected_dealers['DEALER_STATE_ABBRV']!='PR']

polk_df = spark.read.parquet(f'dbfs:/jump-datasets/datasets/polk_2019-01-01_{end}.parquet')
polk_df = polk_df.filter(F.col('REPORT_YEAR_MONTH')<=end)

################################################################################################
## Count of nearby dealers for same state and make
################################################################################################
polk_df_n = polk_df.filter(F.col('REG_TYPE')=='N')
count_df_pd = count_addresses_in_state_make(polk_df_n, selected_dealers)

# Record the start time
start_time = time.time()

optimal_radius_df_final = calculate_optimal_radius_new(selected_dealers, state_codes, count_df_pd, pop2022, polk_df, dealer_coordinates, pops_df)

optimal_radius_df_final1 = optimal_radius_df_final.copy()
print(optimal_radius_df_final1.info())

end_time = time.time()
time_spent = end_time - start_time

print(time_spent)

savepath = f'/dbfs/jump-datasets/pipeline_data/radius_calc/optimal_radius_df_{end_date.strftime("%y%m")}.parquet'

optimal_radius_df_final1.to_parquet(savepath)

In [0]:
all_dealers = pd.read_parquet(
    f'/dbfs/jump-datasets/pipeline_data/base_files/dealers_{start_date.strftime("%y%m")}_{end_date.strftime("%y%m")}.parquet'
).drop(columns='SALES')

coordinates = spark.read.parquet('dbfs:/jump-datasets/pipeline_data/radius_calc/all_new_dealers_coordinates.parquet')

out = dealer_to_dealer_distance(all_dealers, coordinates)
out = out.toPandas().drop_duplicates()

#save to Parquet:
out.to_parquet(f'/dbfs/FileStore/Manish Data/rough/dealer_to_dealer_distance_{end_date.strftime("%y%m")}.parquet')

In [0]:
dealer_registry = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/base_files/dealers_{start_date.strftime("%y%m")}_{end_date.strftime("%y%m")}_make_sales.parquet')

pops_df = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/radius_calc/tomforth_data_{end_date.strftime("%y%m")}.parquet')
pops_df = pops_df.merge(dealer_registry[['DEALER_NAME', 'DEALER_ADDRESS_FULL']], on='DEALER_ADDRESS_FULL', how='inner')

optimal_radius_df_final1 = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/radius_calc/optimal_radius_df_{end_date.strftime("%y%m")}.parquet')

optimal_radius_df_final1 = optimal_radius_df_final1.merge(dealer_registry[['DEALER_NAME', 'DEALER_ADDRESS', 'DEALER_ADDRESS_FULL', 'MAKE']], on = 'DEALER_ADDRESS_FULL', how='inner')
optimal_radius_df_final1.rename(columns={'RADIUS':'Local Radius', 'POPULATION':'Local Population'}, inplace=True)
optimal_radius_df_final1 = optimal_radius_df_final1.drop_duplicates()
optimal_radius_df_final1 = optimal_radius_df_final1[['DEALER_NAME', 'DEALER_ADDRESS' ,'DEALER_ADDRESS_FULL', 'MAKE', 'Local Radius', 'Local Population']]

d2d = pd.read_parquet(f'/dbfs/FileStore/Manish Data/rough/dealer_to_dealer_distance_{end_date.strftime("%y%m")}.parquet')
d2d['DISTANCE'] = d2d['DISTANCE']*1.60934
d2d = d2d.drop_duplicates()

optimal_radius_df_final1 = merge_and_get_dealers_count(optimal_radius_df_final1, d2d)

df_30th_0th = fixing_more_than_30_local_dealers(optimal_radius_df_final1, d2d)

hilo_final = update_original_df(optimal_radius_df_final1, df_30th_0th)

population_constraint = 10000000

hilo_final = population_constraint_fix(population_constraint, hilo_final, d2d, pops_df)

h4 = hilo_final[
    ((hilo_final['Local Radius'] <= 20 * 1.60934) & 
     (hilo_final['Local Population'] < 3000000) & 
     (hilo_final['N Local Dealers'] <= 9)) 
    |
    ((hilo_final['N Local Dealers'] < 9) & 
     (hilo_final['Local Population'] <= 7000000))]

hilo_final = low_values_fix(h4, d2d, hilo_final)

hilo_final = population_constraint_fix_2(population_constraint, hilo_final, d2d, pops_df)

hilo_final['Local Radius'] = hilo_final['Local Radius']*0.621371
hilo_final['Local Radius'] = hilo_final['Local Radius'].astype(int)

print(hilo_final.describe())
hilo_final[['Local Radius', 'N Local Dealers', 'Local Population']].hist(bins=30, figsize=(12, 6))
plt.show()

hilo_final = hilo_final[['DEALER_NAME', 'DEALER_ADDRESS_FULL', 'Local Radius', 'Local Population', 'N Local Dealers']].rename(columns={'Local Radius':'RADIUS', 'Local Population':'POPULATION'})

savepath = f'/dbfs/jump-datasets/pipeline_data/radius_calc/optimal_radius_df_{end_date.strftime("%y%m")}.parquet'
hilo_final.to_parquet(savepath)

In [0]:

calc_type = 'location factors'

date = next(item.name for item in dbutils.fs.ls('dbfs:/jump-datasets/pipeline_data/radius_calc/') if 'dealer_location_factors_' in item.name).split('_')[3] 
date = datetime.strptime(date, '%y%m').replace(day=1).strftime('%Y-%m-%d')
date = (pd.to_datetime(date) + pd.DateOffset(months=6)).strftime('%y%m')

if date == end_date.strftime("%y%m"):
    print(f'Calculating {calc_type} ...')
    all_dealers = pd.read_parquet(f'/dbfs/jump-datasets/pipeline_data/base_files/dealers_{start_date.strftime("%y%m")}_{end_date.strftime("%y%m")}.parquet').drop(columns='SALES')
    mapping = pd.read_parquet('/dbfs/jump-datasets/files/other/model_mapping_ram_added.parquet')                  
    all_dealers = all_dealers[all_dealers['MAKE'].isin(list(mapping['Brand'].unique()))]
    print(f'Dealers: {len(all_dealers):,}')

    if calc_type == 'location factors':
        pops_df = spark.read.parquet(f'dbfs:/jump-datasets/pipeline_data/radius_calc/tomforth_data_{end_date.strftime("%y%m")}.parquet')
        density = spark.read.parquet(f'dbfs:/jump-datasets/pipeline_data/base_files/state_brand_dealer_density_{end_date.strftime("%y%m")}.parquet').select('Brand', 'state_code', 'n_dealers', 'pop2022').withColumnRenamed('Brand', 'MAKE')
        coordinates = spark.read.parquet('dbfs:/jump-datasets/pipeline_data/radius_calc/all_new_dealers_coordinates.parquet')
        for state_id in range(0, 9):
            savepath = f'dbfs:/jump-datasets/pipeline_data/radius_calc/dealer_location_factors_{end_date.strftime("%y%m")}_{state_names[state_id]}.parquet'
            dealers = all_dealers[all_dealers['DEALER_STATE_ABBRV'].isin(states[state_id])].reset_index(drop=True)
            print(f'{len(dealers)} - {list(dealers["DEALER_STATE_ABBRV"].sort_values().unique())}')
            
            out = dealer_location_factors(all_dealers, dealers, coordinates, density, pops_df)
            out.write.mode('overwrite').parquet(savepath)
            print(f'Saved: {savepath}')
else:
    print(f"This will be calculated in {datetime.strptime(date, '%y%m').strftime('%Y-%m')}.")