In [0]:
# from shapely import wkt
import math
import numpy as np
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    FloatType,
    IntegerType,
    ArrayType,
    MapType,
    DateType,
    DoubleType,
    BooleanType,
    LongType,
)

# 1. Cross-join Stores to Postal Codes (LDUs)

In [0]:


# Store master: one row per store that has a chain (CUSTOMER_GROUP) and current-year revenue,
# with coordinates cast to DOUBLE for the distance calculation further down.
stores_query = '''
SELECT STORES.STORE_NUM STORE_NUM, STORES.`STORE_NAME` STORE_NAME,STORES.`BANNER` BANNER, STORES.`STREET` STORE_STREET, STORES.POSTAL_CODE, STORES.PROVINCE STORE_PROVINCE, STORES.`STORE_CHAIN` CUSTOMER_GROUP, CAST(STORES.LONGITUDE AS DOUBLE) STORE_LONGITUDE, CAST(STORES.LATITUDE AS DOUBLE) STORE_LATITUDE, REVENUE_CY, STORES.`CHANNEL`, STORES.`SALES_GROUP`
FROM hyper.unioned_stores_52weeks_2024_0927  STORES
WHERE STORES.`STORE_CHAIN` IS NOT NULL AND REVENUE_CY IS NOT NULL 
'''
stores = spark.sql(stores_query)
display(stores)

In [0]:
# One coordinate pair per (POSTALCODE, PROV): the row with the largest AREA across both
# LDU extracts (combined_ldu_p1/p2 come from local Jupyter processing of shapefiles).
# ROW_NUMBER with an explicit ORDER BY is used instead of FIRST() after a sub-query
# ORDER BY: Spark does not guarantee that a sub-query's ordering survives into the
# aggregation, so FIRST() could return the coordinates of any polygon in the group.
# LONGITUDE, LATITUDE break AREA ties so the pick is deterministic. UNION ALL is enough
# (no de-duplication needed) because exactly one row per group is kept.
LDU = spark.sql(
    """
    SELECT POSTALCODE, PROV, LONGITUDE, LATITUDE
    FROM (
        SELECT POSTALCODE, PROV, LONGITUDE, LATITUDE,
               ROW_NUMBER() OVER (
                   PARTITION BY POSTALCODE, PROV
                   ORDER BY AREA DESC, LONGITUDE, LATITUDE
               ) AS AREA_RANK
        FROM (
            SELECT * FROM combined_ldu_p1
            UNION ALL
            SELECT * FROM combined_ldu_p2
        )
    )
    WHERE AREA_RANK = 1
    """
)
LDU.show()

In [0]:
# For each store province/territory code: the codes whose LDUs a store there may serve,
# i.e. itself plus its geographic neighbours.
_ADJACENT_PROVINCES = {
    'AB': frozenset({'AB', 'BC', 'SK', 'NT'}),
    # BC and NT share a land border along the 60th parallel.
    'BC': frozenset({'AB', 'BC', 'YT', 'NT'}),
    'SK': frozenset({'AB', 'MB', 'SK', 'NT'}),
    'MB': frozenset({'ON', 'MB', 'SK', 'NU'}),
    'ON': frozenset({'ON', 'MB', 'QC'}),
    'QC': frozenset({'ON', 'NL', 'QC', 'NB'}),
    'NB': frozenset({'QC', 'NS', 'NB', 'PE'}),
    # NOTE(review): NL lists PE but PE does not list NL, and the two share no land
    # border - confirm which direction is intended.
    'NL': frozenset({'NL', 'QC', 'PE'}),
    'YT': frozenset({'YT', 'NT', 'BC'}),
    'NS': frozenset({'NS', 'NB', 'PE'}),
    'NT': frozenset({'NT', 'YT', 'NU', 'AB', 'SK', 'BC'}),
    'NU': frozenset({'NT', 'NU', 'MB'}),
    'PE': frozenset({'PE', 'NS', 'NB'}),
}


def adjacent_province_check(prov, store_prov):
    """
    Takes two provinces as input, returns True if provinces are identical or geographically adjacent. Otherwise returns False.

    Parameters
    ----------
    prov : str or None
        Two-letter code of the candidate (LDU) province/territory.
    store_prov : str or None
        Two-letter code of the store's province/territory.

    Returns
    -------
    bool
        True when `prov` appears in the neighbour set of `store_prov`; False for
        non-adjacent pairs, unknown codes and None. Always a real bool, so the
        BooleanType UDF never produces null.
    """
    return prov in _ADJACENT_PROVINCES.get(store_prov, frozenset())
# Spark wrapper around adjacent_province_check, returning a BooleanType column.
adj_prov_check_udf = udf(adjacent_province_check, returnType=BooleanType())

In [0]:
# Check the adjacent province function is working properly
# stores.crossJoin(LDU).filter(adj_prov_check_udf(col('PROV'), col('STORE_PROVINCE'))==True).select('STORE_PROVINCE', 'PROV').distinct().orderBy('STORE_PROVINCE', 'PROV').collect()

In [0]:
def haversine_distance(lat1, lon1, lat2, lon2):
    """
    Takes latitude and longitude in degrees for two points, returns the Haversine distance (distance between two points on a sphere).

    Parameters
    ----------
    lat1, lon1 : float
        Latitude / longitude of the first point, in decimal degrees.
    lat2, lon2 : float
        Latitude / longitude of the second point, in decimal degrees.

    Returns
    -------
    float or None
        Great-circle distance on a sphere of radius 6371.009 (kilometres), or None when
        any coordinate is None, so the Spark UDF yields null instead of failing the task.
    """
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    earth_radius = 6371.009  # mean radius of the Earth
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    h = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    # Rounding can push h a hair above 1 for near-antipodal points, where asin would raise.
    # (Written as a comparison so a NaN h stays NaN.)
    if h > 1.0:
        h = 1.0
    return float(2 * earth_radius * math.asin(math.sqrt(h)))
# Spark wrapper around haversine_distance, returning a DoubleType column.
distance_udf = udf(haversine_distance, returnType=DoubleType())

In [0]:
# Pair every store with every LDU in the same or a geographically adjacent province.
# A crossJoin (roughly 7k stores * 900k LDUs = 6 bil records) followed by a Python-UDF
# filter calls the UDF on every one of those rows. Instead, adjacent_province_check is
# evaluated once per (store province, LDU province) combination on the driver and the
# data is equi-joined through that small pair table. adjacent_province_check only
# returns True for the 13 codes below, so the resulting rows are the same.
PROVINCE_CODES = ['AB', 'BC', 'MB', 'NB', 'NL', 'NS', 'NT', 'NU', 'ON', 'PE', 'QC', 'SK', 'YT']
province_pairs = spark.createDataFrame(
    [
        (store_prov, prov)
        for store_prov in PROVINCE_CODES
        for prov in PROVINCE_CODES
        if adjacent_province_check(prov, store_prov)
    ],
    'STORE_PROVINCE string, PROV string',
)
joined_df = stores.join(province_pairs, on='STORE_PROVINCE').join(LDU, on='PROV')

# Haversine distance between each store and LDU coordinate.
joined_df = joined_df.withColumn(
    'HAVERSINE_DISTANCE',
    distance_udf(col('STORE_LATITUDE'), col('STORE_LONGITUDE'), col('LATITUDE'), col('LONGITUDE')),
)

# Attraction of a store for an LDU: current-year revenue divided by squared distance.
# NOTE(review): a store located exactly on an LDU coordinate has distance 0; the division
# then yields null (or raises under ANSI mode) and that store silently drops out of the
# postal-code totals below. Consider flooring the distance - TODO confirm intent.
joined_df = joined_df.withColumn('Attraction', col('REVENUE_CY') / col('HAVERSINE_DISTANCE') ** 2)

# All stores competing for the same postal code share one window partition.
windowPartition = Window.partitionBy('POSTALCODE')
joined_df = joined_df.withColumn('TOTAL_POSTALCODE_ATTRACTION', sum('Attraction').over(windowPartition))

# Running total of each store's share of its postal code's attraction, smallest share first.
share_of_postal_code = col('Attraction') / col('TOTAL_POSTALCODE_ATTRACTION')
cumulative_window = windowPartition.orderBy(share_of_postal_code.asc()).rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
joined_df = joined_df.withColumn(
    'CUMULATIVE_PERCENT_ATTRACTION', sum(share_of_postal_code).over(cumulative_window)
).select(
    'STORE_NUM', 'CUSTOMER_GROUP', 'STORE_NAME', 'CHANNEL', 'SALES_GROUP', 'BANNER',
    'STORE_STREET', 'STORE_PROVINCE', 'POSTALCODE', 'STORE_LATITUDE', 'STORE_LONGITUDE',
    'LATITUDE', 'LONGITUDE', 'PROV',
    'Attraction', 'TOTAL_POSTALCODE_ATTRACTION', 'CUMULATIVE_PERCENT_ATTRACTION',
    'REVENUE_CY', 'HAVERSINE_DISTANCE',
)
display(joined_df)

In [0]:
# Persist the store-LDU attraction table as Delta, replacing both its data and schema.
(
    joined_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "True")
    .saveAsTable("hyper.store_LDU_attraction_0927")
)

In [0]:
# Variant: postal-code total that leaves out the single largest store attraction
# (presumably the store located in / nearest to that postal code - TODO confirm).
joined_df_1 = stores.crossJoin(LDU).filter(adj_prov_check_udf(col('PROV'), col('STORE_PROVINCE')) == True)
# Haversine distance between each store and LDU coordinate.
joined_df_1 = joined_df_1.withColumn(
    'HAVERSINE_DISTANCE',
    distance_udf(col('STORE_LATITUDE'), col('STORE_LONGITUDE'), col('LATITUDE'), col('LONGITUDE')),
)
# Attraction = revenue / distance^2, using REVENUE_CY as the main cell does
# (`stores` has no DNNSI_IMPUTED column).
joined_df_1 = joined_df_1.withColumn('Attraction', col('REVENUE_CY') / col('HAVERSINE_DISTANCE') ** 2)
windowPartition = Window.partitionBy('POSTALCODE')
# Built from joined_df_1 above, not the saved `joined_df`.
# NOTE(review): a postal code reached by only one store gets a total of 0 here, so the
# share below divides by zero (null, or an error under ANSI mode).
joined_df_filtered = joined_df_1.withColumn(
    'TOTAL_POSTALCODE_ATTRACTION',
    sum('Attraction').over(windowPartition) - max('Attraction').over(windowPartition),
)
# Running total of each store's share within its postal code, smallest share first.
share_of_postal_code = col('Attraction') / col('TOTAL_POSTALCODE_ATTRACTION')
joined_df_filtered = joined_df_filtered.withColumn(
    'CUMULATIVE_PERCENT_ATTRACTION',
    sum(share_of_postal_code).over(
        windowPartition.orderBy(share_of_postal_code.asc()).rowsBetween(Window.unboundedPreceding, 0)
    ),
).select(
    'STORE_NUM', 'CUSTOMER_GROUP', 'STORE_NAME', 'STORE_STREET', 'STORE_PROVINCE', 'POSTALCODE',
    'LATITUDE', 'LONGITUDE', 'PROV',
    'Attraction', 'TOTAL_POSTALCODE_ATTRACTION', 'CUMULATIVE_PERCENT_ATTRACTION',
    'REVENUE_CY', 'HAVERSINE_DISTANCE',
)

In [0]:
# Exploratory: attach the strongest single-store attraction seen by each postal code.
# Uses its own variable so `joined_df` (saved to hyper.store_LDU_attraction_0927) is not overwritten.
joined_df_max = stores.crossJoin(LDU).filter(adj_prov_check_udf(col('PROV'), col('STORE_PROVINCE')) == True)
# Haversine distance between each store and LDU coordinate.
joined_df_max = joined_df_max.withColumn(
    'HAVERSINE_DISTANCE',
    distance_udf(col('STORE_LATITUDE'), col('STORE_LONGITUDE'), col('LATITUDE'), col('LONGITUDE')),
)
# Attraction = revenue / distance^2 (`stores` provides REVENUE_CY; it has no DNNSI_IMPUTED).
joined_df_max = joined_df_max.withColumn('Attraction', col('REVENUE_CY') / col('HAVERSINE_DISTANCE') ** 2)
windowPartition = Window.partitionBy('POSTALCODE')
joined_df_max = joined_df_max.withColumn('Max_Attraction', max('Attraction').over(windowPartition))

display(joined_df_max)

In [0]:
# Cross join the stores to postal codes (Roughly 7k stores * 900k LDUs = 6 bil records)
# Prefilter to reduce number of records before crossjoin
# Comparison variant of the main attraction cell (fewer output columns). The
# "minus max" adjustment on the postal-code total is currently disabled.
joined_df_with_max = stores.crossJoin(LDU).filter(adj_prov_check_udf(col('PROV'), col('STORE_PROVINCE')) == True)
# Haversine distance between each store and LDU coordinate.
joined_df_with_max = joined_df_with_max.withColumn(
    'HAVERSINE_DISTANCE',
    distance_udf(col('STORE_LATITUDE'), col('STORE_LONGITUDE'), col('LATITUDE'), col('LONGITUDE')),
)
# Attraction = revenue / distance^2 (`stores` provides REVENUE_CY; it has no DNNSI_IMPUTED).
joined_df_with_max = joined_df_with_max.withColumn('Attraction', col('REVENUE_CY') / col('HAVERSINE_DISTANCE') ** 2)
# Total attraction per postal code, computed on this cell's own frame.
windowPartition = Window.partitionBy('POSTALCODE')
joined_df_with_max = joined_df_with_max.withColumn(
    'TOTAL_POSTALCODE_ATTRACTION',
    sum('Attraction').over(windowPartition),  # append `- max('Attraction').over(windowPartition)` to exclude the top store
)
# Running total of each store's share within its postal code, smallest share first.
share_of_postal_code = col('Attraction') / col('TOTAL_POSTALCODE_ATTRACTION')
joined_df_with_max = joined_df_with_max.withColumn(
    'CUMULATIVE_PERCENT_ATTRACTION',
    sum(share_of_postal_code).over(
        windowPartition.orderBy(share_of_postal_code.asc()).rowsBetween(Window.unboundedPreceding, 0)
    ),
).select(
    'STORE_NUM', 'CUSTOMER_GROUP', 'STORE_NAME', 'STORE_STREET', 'STORE_PROVINCE', 'POSTALCODE',
    'LATITUDE', 'LONGITUDE', 'PROV',
    'Attraction', 'TOTAL_POSTALCODE_ATTRACTION', 'CUMULATIVE_PERCENT_ATTRACTION',
    'REVENUE_CY', 'HAVERSINE_DISTANCE',
)
display(joined_df_with_max)

In [0]:
# Raw store x LDU pairs restricted to same/adjacent provinces (no distance or attraction columns).
joined_result = stores.crossJoin(LDU).where(
    adj_prov_check_udf(col('PROV'), col('STORE_PROVINCE')) == True
)
display(joined_result)

In [0]:
%sql
-- Spot-check the per-store attraction for one postal code, reading the table this
-- notebook writes (the unqualified store_LDU_attraction is not produced here).
SELECT STORE_NUM, Attraction, TOTAL_POSTALCODE_ATTRACTION, CUMULATIVE_PERCENT_ATTRACTION
FROM hyper.store_LDU_attraction_0927
WHERE POSTALCODE = 'A0G2J0'

In [0]:
# Drop store-LDU pairs contributing 0.5% or less of a postal code's total attraction, then
# renormalise so PERCENT_ATTRACTION sums to 1 over the stores kept for each postal code.
joined_df_temp = spark.sql("""
                            SELECT * FROM hyper.store_LDU_attraction_0927
                            WHERE Attraction > 0.005 * TOTAL_POSTALCODE_ATTRACTION
                            """)
windowPartition = Window.partitionBy(['POSTALCODE'])
joined_df_filtered = (
    joined_df_temp
    .withColumn('PERCENT_ATTRACTION', col('Attraction') / sum('Attraction').over(windowPartition))
    .select(
        'STORE_NUM', 'CUSTOMER_GROUP', 'STORE_NAME', 'CHANNEL', 'SALES_GROUP', 'BANNER',
        'STORE_STREET', 'STORE_PROVINCE', 'POSTALCODE', 'STORE_LATITUDE', 'STORE_LONGITUDE',
        'LATITUDE', 'LONGITUDE', 'PROV',
        'Attraction', 'TOTAL_POSTALCODE_ATTRACTION', 'CUMULATIVE_PERCENT_ATTRACTION',
        'PERCENT_ATTRACTION', 'REVENUE_CY', 'HAVERSINE_DISTANCE',
    )
)

In [0]:
# Persist the filtered, renormalised attraction table as Delta, replacing data and schema.
(
    joined_df_filtered.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "True")
    .saveAsTable("hyper.store_LDU_attraction_f_halfp_0927")
)

In [0]:
%sql
-- Browse every column of the Environics LDU feature table.
SELECT *
FROM environics.ldu_feature_vals_melted_df_master_variable_0826_2

In [0]:
# From the 0910 attraction table: drop store-LDU pairs contributing 1% or less of a postal
# code's total attraction, renormalise PERCENT_ATTRACTION to sum to 1 per postal code,
# and save the result as Delta (replacing data and schema).
joined_df_temp = spark.sql("""
                            SELECT * FROM hyper.store_LDU_attraction_0910
                            WHERE Attraction > 0.01 * TOTAL_POSTALCODE_ATTRACTION
                            """)
windowPartition = Window.partitionBy(['POSTALCODE'])
joined_df_filtered = (
    joined_df_temp
    .withColumn('PERCENT_ATTRACTION', col('Attraction') / sum('Attraction').over(windowPartition))
    .select(
        'STORE_NUM', 'CUSTOMER_GROUP', 'STORE_NAME', 'CHANNEL', 'SALES_GROUP', 'BANNER',
        'STORE_STREET', 'STORE_PROVINCE', 'POSTALCODE', 'STORE_LATITUDE', 'STORE_LONGITUDE',
        'LATITUDE', 'LONGITUDE', 'PROV',
        'Attraction', 'TOTAL_POSTALCODE_ATTRACTION', 'CUMULATIVE_PERCENT_ATTRACTION',
        'PERCENT_ATTRACTION', 'REVENUE_CY', 'HAVERSINE_DISTANCE',
    )
)

(
    joined_df_filtered.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "True")
    .saveAsTable("hyper.store_LDU_attraction_fullp_filtered_0910")
)