<a href="https://colab.research.google.com/github/Shivvk/FGMC-Project/blob/main/FMGC%20Location%20Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [14]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
from os import environ
import findspark

In [15]:

# Setting environment variables
environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

In [16]:

# Init spark
findspark.init()


In [17]:

from pyspark.sql import SparkSession
# spark.sql.repl.eagerEval.enabled: Property used to format output tables better

spark = (
    SparkSession
    .builder
    .appName("cg-pyspark-assignment")
    .master("local")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
  )

spark

In [18]:
!pip install requests



In [19]:
import logging
from logging import Logger, FileHandler, Formatter
import requests
import json
from pyspark.sql.functions import col, explode, lit, udf, array_contains, avg, radians, sin, cos, sqrt, atan2, lit,  round as round_func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, ArrayType, TimestampType
from datetime import datetime, timezone
import hashlib
import math

# Initialize Spark session
spark = SparkSession.builder.appName("API Data Extraction").getOrCreate()

# Set up logging configuration
logger = logging.getLogger('API_Extraction_Logger')
logger.setLevel(logging.INFO)
file_handler = FileHandler('assignment.log')
formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Define the schema for the logging DataFrame
log_schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("logger_name", StringType(), True),
    StructField("log_level", StringType(), True),
    StructField("message", StringType(), True)
])

# List of API URLs
brand_urls = [
    'https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/clp-places',
    'https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/okay-places',
    'https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/spar-places',
    'https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/dats-places',
    'https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/cogo-colpnts'
]

# Function to fetch JSON data from URL
def fetch_json_data(url):
    try:
        response = requests.get(url)
        if response.status_code == 200:
            logger.info(f"Successfully fetched data from {url}")
            return response.json()
        else:
            logger.error(f"Failed to fetch data from {url} with status code {response.status_code}")
            return None
    except Exception as e:
        logger.error(f"Exception occurred while fetching data from {url}: {e}")
        return None

# Fetch data from all URLs and collect valid data
all_data = []
for url in brand_urls:
    data = fetch_json_data(url)
    if data:
        all_data.extend(data)  # Assuming the JSON data is a list of records
        print("json data as dict :", all_data)

# Check if there is any valid data to process
if all_data:
    # Convert the list of dictionaries to a JSON string
    json_data = json.dumps(all_data)
    print("json data  :", json_data)

    # Define schema for nested fields
    address_schema = StructType([
        StructField("streetName", StringType(), True),
        StructField("houseNumber", StringType(), True),
        StructField("postalcode", StringType(), True),
        StructField("cityName", StringType(), True),
        StructField("countryName", StringType(), True),
        StructField("countryCode", StringType(), True)
    ])

    ensign_schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])

    geo_coordinates_schema = StructType([
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True)
    ])

    opening_hours_schema = StructType([
        StructField("date", StringType(), True),
        StructField("opens", IntegerType(), True),
        StructField("closes", IntegerType(), True),
        StructField("isToday", BooleanType(), True),
        StructField("isOpenForTheDay", BooleanType(), True)
    ])

    place_type_schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("longName", StringType(), True),
        StructField("placeTypeDescription", StringType(), True)
    ])

    # Define the schema for the main JSON structure
    main_schema = StructType([
        StructField("placeId", IntegerType(), True),
        StructField("ensign", ensign_schema, True),
        StructField("commercialName", StringType(), True),
        StructField("branchId", StringType(), True),
        StructField("sourceStatus", StringType(), True),
        StructField("placeType", place_type_schema, True),
        StructField("sellingPartners", ArrayType(StringType()), True),
        StructField("handoverServices", ArrayType(StringType()), True),
        StructField("geoCoordinates", geo_coordinates_schema, True),
        StructField("address", address_schema, True),
        StructField("moreInfoUrl", StringType(), True),
        StructField("routeUrl", StringType(), True),
        StructField("isActive", BooleanType(), True),
        StructField("placeSearchOpeningHours", ArrayType(opening_hours_schema), True),
        StructField("temporaryClosures", ArrayType(StringType()), True)
    ])

    # Load JSON data into DataFrame
    df = spark.read.schema(main_schema).json(spark.createDataFrame([(json_data,)], ["value"]).rdd.map(lambda x: x[0]))

    print("after getting json data from url", display(df))

    # Explode the placeSearchOpeningHours array
    df = df.withColumn("placeSearchOpeningHours", explode(col("placeSearchOpeningHours")))

    print("after exploding placeSearchOpeningHours from json data ", display(df))

    # Extract and rename columns
    df = df.select(
        col("placeId"),
        col("ensign.id").alias("ensign_id"),
        col("ensign.name").alias("ensign_name"),
        col("commercialName"),
        col("branchId"),
        col("sourceStatus"),
        col("placeType.id").alias("placeType_id"),
        col("placeType.longName").alias("placeType_longName"),
        col("placeType.placeTypeDescription").alias("placeType_description"),
        col("sellingPartners"),
        col("handoverServices"),
        col("geoCoordinates.latitude").alias("latitude"),
        col("geoCoordinates.longitude").alias("longitude"),
        col("address.streetName"),
        col("address.houseNumber"),
        col("address.postalcode").alias("postal_code"),
        col("address.cityName"),
        col("address.countryName"),
        col("address.countryCode"),
        col("moreInfoUrl"),
        col("routeUrl"),
        col("isActive"),
        col("placeSearchOpeningHours.date").alias("opening_date"),
        col("placeSearchOpeningHours.opens").alias("opening_time"),
        col("placeSearchOpeningHours.closes").alias("closing_time"),
        col("placeSearchOpeningHours.isToday").alias("is_today"),
        col("placeSearchOpeningHours.isOpenForTheDay").alias("is_open_for_the_day"),
        col("temporaryClosures")
    )

    # Define postal code to province mapping function
    def derive_province(postal_code):
        if not postal_code:
            return None
        postal_code = int(postal_code)
        if 1000 <= postal_code <= 1299:
            return "Brussel"
        elif 1300 <= postal_code <= 1499:
            return "Waals-Brabant"
        elif 1500 <= postal_code <= 1999 or 3000 <= postal_code <= 3499:
            return "Vlaams-Brabant"
        elif 2000 <= postal_code <= 2999:
            return "Antwerpen"
        elif 3500 <= postal_code <= 3999:
            return "Limburg"
        elif 4000 <= postal_code <= 4999:
            return "Luik"
        elif 5000 <= postal_code <= 5999:
            return "Namen"
        elif 6000 <= postal_code <= 6599 or 7000 <= postal_code <= 7999:
            return "Henegouwen"
        elif 6600 <= postal_code <= 6999:
            return "Luxemburg"
        elif 8000 <= postal_code <= 8999:
            return "West-Vlaanderen"
        elif 9000 <= postal_code <= 9999:
            return "Oost-Vlaanderen"
        else:
            return None

    # Register the UDF
    derive_province_udf = udf(derive_province, StringType())

    # Add the province column
    df = df.withColumn("province", derive_province_udf(col("postal_code")))

    # One-hot encode the handoverServices array
    unique_services = df.select(explode(col("handoverServices"))).distinct().rdd.flatMap(lambda x: x).collect()
    for service in unique_services:
        df = df.withColumn(f"handoverService_{service}", array_contains(col("handoverServices"), lit(service)))

    # Drop sensitive columns and apply anonymization
    def hash_sensitive_info(value):
        return hashlib.sha256(value.encode('utf-8')).hexdigest()

    hash_sensitive_info_udf = udf(hash_sensitive_info, StringType())

    def anonymize_for_unauthorized(df):
        return df.withColumn("streetName", hash_sensitive_info_udf(col("streetName"))) \
                 .withColumn("houseNumber", hash_sensitive_info_udf(col("houseNumber")))

    def process_data_for_unauthorized_users(df):
        # Anonymize sensitive data
        anonymized_df = anonymize_for_unauthorized(df)
        return anonymized_df.drop("postal_code")  # Assuming postal code might also be considered sensitive

    def process_data_for_authorized_users(df):
        return df

    # Process the data
    unauthorized_df = process_data_for_unauthorized_users(df)
    authorized_df = process_data_for_authorized_users(df)

    # Fill null values with empty strings in unauthorized and authorized DataFrames
    unauthorized_df = unauthorized_df.fillna("")
    authorized_df = authorized_df.fillna("")

    # Save the results as Parquet files with partitioning
    unauthorized_df.write.partitionBy("province").mode("overwrite").parquet("unauthorized_data.parquet")
    authorized_df.write.partitionBy("province").mode("overwrite").parquet("authorized_data.parquet")

    # Example usage
    display(unauthorized_df)
    authorized_df.show()

else:
    logger.error("No valid data fetched from the APIs.")


INFO:API_Extraction_Logger:Successfully fetched data from https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/clp-places


json data as dict : [{'placeId': 902, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'}, 'commercialName': 'AALST (COLRUYT)', 'branchId': '4156', 'sourceStatus': 'AC', 'placeType': {'id': 1, 'longName': 'Winkel', 'placeTypeDescription': 'Winkel'}, 'sellingPartners': ['QUALITY'], 'handoverServices': ['CSOP_ORDERABLE'], 'geoCoordinates': {'latitude': 50.933074, 'longitude': 4.0538972}, 'address': {'streetName': 'BRUSSELSE STEENWEG', 'houseNumber': '41', 'postalcode': '9300', 'cityName': 'AALST', 'countryName': 'België', 'countryCode': 'BE'}, 'moreInfoUrl': 'https://www.colruyt.be/nl/colruyt-openingsuren/4156', 'routeUrl': 'https://maps.apple.com/?daddr=50.933074,4.0538972', 'isActive': True, 'placeSearchOpeningHours': [{'date': '03-08-2024', 'opens': 830, 'closes': 2000, 'isToday': True, 'isOpenForTheDay': True}, {'date': '05-08-2024', 'opens': 830, 'closes': 2000, 'isToday': False, 'isOpenForTheDay': True}], 'temporaryClosures': []}, {'placeId': 946, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'

INFO:API_Extraction_Logger:Successfully fetched data from https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/okay-places


json data as dict : [{'placeId': 902, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'}, 'commercialName': 'AALST (COLRUYT)', 'branchId': '4156', 'sourceStatus': 'AC', 'placeType': {'id': 1, 'longName': 'Winkel', 'placeTypeDescription': 'Winkel'}, 'sellingPartners': ['QUALITY'], 'handoverServices': ['CSOP_ORDERABLE'], 'geoCoordinates': {'latitude': 50.933074, 'longitude': 4.0538972}, 'address': {'streetName': 'BRUSSELSE STEENWEG', 'houseNumber': '41', 'postalcode': '9300', 'cityName': 'AALST', 'countryName': 'België', 'countryCode': 'BE'}, 'moreInfoUrl': 'https://www.colruyt.be/nl/colruyt-openingsuren/4156', 'routeUrl': 'https://maps.apple.com/?daddr=50.933074,4.0538972', 'isActive': True, 'placeSearchOpeningHours': [{'date': '03-08-2024', 'opens': 830, 'closes': 2000, 'isToday': True, 'isOpenForTheDay': True}, {'date': '05-08-2024', 'opens': 830, 'closes': 2000, 'isToday': False, 'isOpenForTheDay': True}], 'temporaryClosures': []}, {'placeId': 946, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'

INFO:API_Extraction_Logger:Successfully fetched data from https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/spar-places


json data as dict : [{'placeId': 902, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'}, 'commercialName': 'AALST (COLRUYT)', 'branchId': '4156', 'sourceStatus': 'AC', 'placeType': {'id': 1, 'longName': 'Winkel', 'placeTypeDescription': 'Winkel'}, 'sellingPartners': ['QUALITY'], 'handoverServices': ['CSOP_ORDERABLE'], 'geoCoordinates': {'latitude': 50.933074, 'longitude': 4.0538972}, 'address': {'streetName': 'BRUSSELSE STEENWEG', 'houseNumber': '41', 'postalcode': '9300', 'cityName': 'AALST', 'countryName': 'België', 'countryCode': 'BE'}, 'moreInfoUrl': 'https://www.colruyt.be/nl/colruyt-openingsuren/4156', 'routeUrl': 'https://maps.apple.com/?daddr=50.933074,4.0538972', 'isActive': True, 'placeSearchOpeningHours': [{'date': '03-08-2024', 'opens': 830, 'closes': 2000, 'isToday': True, 'isOpenForTheDay': True}, {'date': '05-08-2024', 'opens': 830, 'closes': 2000, 'isToday': False, 'isOpenForTheDay': True}], 'temporaryClosures': []}, {'placeId': 946, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'

INFO:API_Extraction_Logger:Successfully fetched data from https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/dats-places


json data as dict : [{'placeId': 902, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'}, 'commercialName': 'AALST (COLRUYT)', 'branchId': '4156', 'sourceStatus': 'AC', 'placeType': {'id': 1, 'longName': 'Winkel', 'placeTypeDescription': 'Winkel'}, 'sellingPartners': ['QUALITY'], 'handoverServices': ['CSOP_ORDERABLE'], 'geoCoordinates': {'latitude': 50.933074, 'longitude': 4.0538972}, 'address': {'streetName': 'BRUSSELSE STEENWEG', 'houseNumber': '41', 'postalcode': '9300', 'cityName': 'AALST', 'countryName': 'België', 'countryCode': 'BE'}, 'moreInfoUrl': 'https://www.colruyt.be/nl/colruyt-openingsuren/4156', 'routeUrl': 'https://maps.apple.com/?daddr=50.933074,4.0538972', 'isActive': True, 'placeSearchOpeningHours': [{'date': '03-08-2024', 'opens': 830, 'closes': 2000, 'isToday': True, 'isOpenForTheDay': True}, {'date': '05-08-2024', 'opens': 830, 'closes': 2000, 'isToday': False, 'isOpenForTheDay': True}], 'temporaryClosures': []}, {'placeId': 946, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'

INFO:API_Extraction_Logger:Successfully fetched data from https://ecgplacesmw.colruytgroup.com/ecgplacesmw/v3/nl/places/filter/cogo-colpnts


json data as dict : [{'placeId': 902, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'}, 'commercialName': 'AALST (COLRUYT)', 'branchId': '4156', 'sourceStatus': 'AC', 'placeType': {'id': 1, 'longName': 'Winkel', 'placeTypeDescription': 'Winkel'}, 'sellingPartners': ['QUALITY'], 'handoverServices': ['CSOP_ORDERABLE'], 'geoCoordinates': {'latitude': 50.933074, 'longitude': 4.0538972}, 'address': {'streetName': 'BRUSSELSE STEENWEG', 'houseNumber': '41', 'postalcode': '9300', 'cityName': 'AALST', 'countryName': 'België', 'countryCode': 'BE'}, 'moreInfoUrl': 'https://www.colruyt.be/nl/colruyt-openingsuren/4156', 'routeUrl': 'https://maps.apple.com/?daddr=50.933074,4.0538972', 'isActive': True, 'placeSearchOpeningHours': [{'date': '03-08-2024', 'opens': 830, 'closes': 2000, 'isToday': True, 'isOpenForTheDay': True}, {'date': '05-08-2024', 'opens': 830, 'closes': 2000, 'isToday': False, 'isOpenForTheDay': True}], 'temporaryClosures': []}, {'placeId': 946, 'ensign': {'id': 8, 'name': 'COLR_Colruyt'

placeId,ensign,commercialName,branchId,sourceStatus,placeType,sellingPartners,handoverServices,geoCoordinates,address,moreInfoUrl,routeUrl,isActive,placeSearchOpeningHours,temporaryClosures
902,"{8, COLR_Colruyt}",AALST (COLRUYT),4156,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.933074, 4.053...",{BRUSSELSE STEENW...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
946,"{8, COLR_Colruyt}",AALTER (COLRUYT),4218,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{51.0784761, 3.45...","{LOSTRAAT, 66, 98...",https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
950,"{8, COLR_Colruyt}",AARSCHOT (COLRUYT),4222,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.9760369, 4.81...",{LEUVENSESTEENWEG...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
886,"{8, COLR_Colruyt}",ALSEMBERG (COLRUYT),4138,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.7415212, 4.33...",{BRUSSELSESTEENWE...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
783,"{8, COLR_Colruyt}",AMAY (COLRUYT),3853,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.5599284, 5.30...",{CHAUSSEE DE TONG...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
650,"{8, COLR_Colruyt}",ANDENNE (COLRUYT),3596,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.4917055, 5.09...",{RUE DE LA PAPETE...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
669,"{8, COLR_Colruyt}",ANDERLECHT (HERBE...,3620,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.8439965, 4.30...",{MAURICE HERBETTE...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
744,"{8, COLR_Colruyt}",ANDERLECHT (VEEWE...,3759,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.8275378372, 4...",{BERGENSESTEENWEG...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
448,"{8, COLR_Colruyt}",ANDERLUES (COLRUYT),3074,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.401257, 4.279...",{RUE DE LA STATIO...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]
681,"{8, COLR_Colruyt}",ANS (COLRUYT),3644,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.6588119, 5.53...",{RUE DES FRANCAIS...,https://www.colru...,https://maps.appl...,True,"[{03-08-2024, 830...",[]


after getting json data from url None


placeId,ensign,commercialName,branchId,sourceStatus,placeType,sellingPartners,handoverServices,geoCoordinates,address,moreInfoUrl,routeUrl,isActive,placeSearchOpeningHours,temporaryClosures
902,"{8, COLR_Colruyt}",AALST (COLRUYT),4156,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.933074, 4.053...",{BRUSSELSE STEENW...,https://www.colru...,https://maps.appl...,True,"{03-08-2024, 830,...",[]
902,"{8, COLR_Colruyt}",AALST (COLRUYT),4156,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.933074, 4.053...",{BRUSSELSE STEENW...,https://www.colru...,https://maps.appl...,True,"{05-08-2024, 830,...",[]
946,"{8, COLR_Colruyt}",AALTER (COLRUYT),4218,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{51.0784761, 3.45...","{LOSTRAAT, 66, 98...",https://www.colru...,https://maps.appl...,True,"{03-08-2024, 830,...",[]
946,"{8, COLR_Colruyt}",AALTER (COLRUYT),4218,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{51.0784761, 3.45...","{LOSTRAAT, 66, 98...",https://www.colru...,https://maps.appl...,True,"{05-08-2024, 830,...",[]
950,"{8, COLR_Colruyt}",AARSCHOT (COLRUYT),4222,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.9760369, 4.81...",{LEUVENSESTEENWEG...,https://www.colru...,https://maps.appl...,True,"{03-08-2024, 830,...",[]
950,"{8, COLR_Colruyt}",AARSCHOT (COLRUYT),4222,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.9760369, 4.81...",{LEUVENSESTEENWEG...,https://www.colru...,https://maps.appl...,True,"{05-08-2024, 830,...",[]
886,"{8, COLR_Colruyt}",ALSEMBERG (COLRUYT),4138,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.7415212, 4.33...",{BRUSSELSESTEENWE...,https://www.colru...,https://maps.appl...,True,"{03-08-2024, 830,...",[]
886,"{8, COLR_Colruyt}",ALSEMBERG (COLRUYT),4138,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.7415212, 4.33...",{BRUSSELSESTEENWE...,https://www.colru...,https://maps.appl...,True,"{05-08-2024, 830,...",[]
783,"{8, COLR_Colruyt}",AMAY (COLRUYT),3853,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.5599284, 5.30...",{CHAUSSEE DE TONG...,https://www.colru...,https://maps.appl...,True,"{03-08-2024, 830,...",[]
783,"{8, COLR_Colruyt}",AMAY (COLRUYT),3853,AC,"{1, Winkel, Winkel}",[QUALITY],[CSOP_ORDERABLE],"{50.5599284, 5.30...",{CHAUSSEE DE TONG...,https://www.colru...,https://maps.appl...,True,"{05-08-2024, 830,...",[]


after exploding placeSearchOpeningHours from json data  None


placeId,ensign_id,ensign_name,commercialName,branchId,sourceStatus,placeType_id,placeType_longName,placeType_description,sellingPartners,handoverServices,latitude,longitude,streetName,houseNumber,cityName,countryName,countryCode,moreInfoUrl,routeUrl,isActive,opening_date,opening_time,closing_time,is_today,is_open_for_the_day,temporaryClosures,province,handoverService_COLLECTNEXTDAY,handoverService_CSOP_ORDERABLE,handoverService_HOMEDELNEXTDAYDRIVER,handoverService_HOMEDELIVERYNEXTDAY,handoverService_PARCELCOLLECTNEXTDAY
902,8,COLR_Colruyt,AALST (COLRUYT),4156,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],50.933074,4.0538972,7a6c1e82d6d13dc16...,3d914f9348c9cc0ff...,AALST,België,BE,https://www.colru...,https://maps.appl...,True,03-08-2024,830,2000,True,True,[],Oost-Vlaanderen,False,True,False,False,False
902,8,COLR_Colruyt,AALST (COLRUYT),4156,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],50.933074,4.0538972,7a6c1e82d6d13dc16...,3d914f9348c9cc0ff...,AALST,België,BE,https://www.colru...,https://maps.appl...,True,05-08-2024,830,2000,False,True,[],Oost-Vlaanderen,False,True,False,False,False
946,8,COLR_Colruyt,AALTER (COLRUYT),4218,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],51.0784761,3.4500133,0a22f1e17ecf26857...,3ada92f28b4ceda38...,AALTER,België,BE,https://www.colru...,https://maps.appl...,True,03-08-2024,830,2000,True,True,[],Oost-Vlaanderen,False,True,False,False,False
946,8,COLR_Colruyt,AALTER (COLRUYT),4218,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],51.0784761,3.4500133,0a22f1e17ecf26857...,3ada92f28b4ceda38...,AALTER,België,BE,https://www.colru...,https://maps.appl...,True,05-08-2024,830,2000,False,True,[],Oost-Vlaanderen,False,True,False,False,False
950,8,COLR_Colruyt,AARSCHOT (COLRUYT),4222,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],50.9760369,4.8110969,0b2d467f21ec4548a...,749fc650cacb0f065...,AARSCHOT,België,BE,https://www.colru...,https://maps.appl...,True,03-08-2024,830,2000,True,True,[],Vlaams-Brabant,False,True,False,False,False
950,8,COLR_Colruyt,AARSCHOT (COLRUYT),4222,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],50.9760369,4.8110969,0b2d467f21ec4548a...,749fc650cacb0f065...,AARSCHOT,België,BE,https://www.colru...,https://maps.appl...,True,05-08-2024,830,2000,False,True,[],Vlaams-Brabant,False,True,False,False,False
886,8,COLR_Colruyt,ALSEMBERG (COLRUYT),4138,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],50.7415212,4.336719,7917c0ec9ed81753e...,9400f1b21cb527d7f...,ALSEMBERG,België,BE,https://www.colru...,https://maps.appl...,True,03-08-2024,830,2000,True,True,[],Vlaams-Brabant,False,True,False,False,False
886,8,COLR_Colruyt,ALSEMBERG (COLRUYT),4138,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],50.7415212,4.336719,7917c0ec9ed81753e...,9400f1b21cb527d7f...,ALSEMBERG,België,BE,https://www.colru...,https://maps.appl...,True,05-08-2024,830,2000,False,True,[],Vlaams-Brabant,False,True,False,False,False
783,8,COLR_Colruyt,AMAY (COLRUYT),3853,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],50.5599284,5.3061951,5e7fa97cfcd0e1797...,396f804443825586c...,AMAY,België,BE,https://www.colru...,https://maps.appl...,True,03-08-2024,830,2000,True,True,[],Luik,False,True,False,False,False
783,8,COLR_Colruyt,AMAY (COLRUYT),3853,AC,1,Winkel,Winkel,[QUALITY],[CSOP_ORDERABLE],50.5599284,5.3061951,5e7fa97cfcd0e1797...,396f804443825586c...,AMAY,België,BE,https://www.colru...,https://maps.appl...,True,05-08-2024,830,2000,False,True,[],Luik,False,True,False,False,False


+-------+---------+------------+--------------------+--------+------------+------------+------------------+---------------------+---------------+----------------+-------------+------------+--------------------+-----------+-----------+----------+-----------+-----------+--------------------+--------------------+--------+------------+------------+------------+--------+-------------------+-----------------+---------------+------------------------------+------------------------------+------------------------------------+-----------------------------------+------------------------------------+
|placeId|ensign_id| ensign_name|      commercialName|branchId|sourceStatus|placeType_id|placeType_longName|placeType_description|sellingPartners|handoverServices|     latitude|   longitude|          streetName|houseNumber|postal_code|  cityName|countryName|countryCode|         moreInfoUrl|            routeUrl|isActive|opening_date|opening_time|closing_time|is_today|is_open_for_the_day|temporaryClosures

In [20]:
from pyspark.sql.functions import col, udf, round as round_func, avg
from pyspark.sql.types import DoubleType
import math

# Define the Haversine function
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius in kilometers
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Create UDF for Haversine
haversine_udf = udf(haversine, DoubleType())

# Select necessary columns early
authorized_df = authorized_df.select("placeId", "latitude", "longitude", "placeType_id", "commercialName")

# Alias the DataFrames with unique names
customer_df = authorized_df.alias("cust")
other_df = authorized_df.alias("other")

# Cross join to calculate distances
distance_df = customer_df.crossJoin(other_df) \
    .withColumn("distance", haversine_udf(
        col("cust.latitude"), col("cust.longitude"),
        col("other.latitude"), col("other.longitude")
    )) \
    .filter(col("cust.placeId") != col("other.placeId"))

print("distance_df data as below:")
display(distance_df)

# Cache distance_df to avoid recomputation
distance_df.cache()

# Calculate average distance to all customers and round to 2 decimal places
avg_distance_customers_df = distance_df \
    .filter(col("other.placeType_id").isNotNull()) \
    .groupBy(col("cust.placeId").alias("placeId")) \
    .agg(round_func(avg("distance"), 2).alias("avg_distance_to_customers"))

print("avg_distance_customers_df data as below:")
display(avg_distance_customers_df)

# Calculate average distance to all competitors and round to 2 decimal places
avg_distance_competitors_df = distance_df \
    .filter(col("cust.placeType_id") == col("other.placeType_id")) \
    .groupBy(col("cust.placeId").alias("placeId")) \
    .agg(round_func(avg("distance"), 2).alias("avg_distance_to_competitors"))

print("avg_distance_competitors_df data as below:")
display(avg_distance_competitors_df)

# Join average distances back with the original DataFrame
final_df = authorized_df.alias("authorized") \
    .join(avg_distance_customers_df.alias("customers"), col("authorized.placeId") == col("customers.placeId"), "left") \
    .join(avg_distance_competitors_df.alias("competitors"), col("authorized.placeId") == col("competitors.placeId"), "left") \
    .select(
        col("authorized.placeId"),
        col("authorized.commercialName"),
        col("customers.avg_distance_to_customers"),
        col("competitors.avg_distance_to_competitors")
    )

# Fill missing values and drop duplicates
final_df = final_df.fillna({
    "avg_distance_to_customers": 0,
    "avg_distance_to_competitors": 0
}).dropDuplicates()

# Show the final result
display(final_df)


distance_df data as below:


placeId,latitude,longitude,placeType_id,commercialName,placeId.1,latitude.1,longitude.1,placeType_id.1,commercialName.1,distance
902,50.933074,4.0538972,1,AALST (COLRUYT),946,51.0784761,3.4500133,1,AALTER (COLRUYT),45.24038843466246
902,50.933074,4.0538972,1,AALST (COLRUYT),946,51.0784761,3.4500133,1,AALTER (COLRUYT),45.24038843466246
902,50.933074,4.0538972,1,AALST (COLRUYT),950,50.9760369,4.8110969,1,AARSCHOT (COLRUYT),53.253096687392045
902,50.933074,4.0538972,1,AALST (COLRUYT),950,50.9760369,4.8110969,1,AARSCHOT (COLRUYT),53.253096687392045
902,50.933074,4.0538972,1,AALST (COLRUYT),886,50.7415212,4.336719,1,ALSEMBERG (COLRUYT),29.12233778896256
902,50.933074,4.0538972,1,AALST (COLRUYT),886,50.7415212,4.336719,1,ALSEMBERG (COLRUYT),29.12233778896256
902,50.933074,4.0538972,1,AALST (COLRUYT),783,50.5599284,5.3061951,1,AMAY (COLRUYT),97.38923305188248
902,50.933074,4.0538972,1,AALST (COLRUYT),783,50.5599284,5.3061951,1,AMAY (COLRUYT),97.38923305188248
902,50.933074,4.0538972,1,AALST (COLRUYT),650,50.4917055,5.0930033,1,ANDENNE (COLRUYT),88.09837040377951
902,50.933074,4.0538972,1,AALST (COLRUYT),650,50.4917055,5.0930033,1,ANDENNE (COLRUYT),88.09837040377951


avg_distance_customers_df data as below:


placeId,avg_distance_to_customers
833,100.11
471,59.8
3794,72.78
463,90.7
496,61.97
148,63.52
2866,59.71
897,70.61
623,62.81
1025,64.8


avg_distance_competitors_df data as below:


placeId,avg_distance_to_competitors
833,99.38
471,60.16
3794,72.03
463,90.38
496,61.84
148,63.41
2866,59.41
897,70.13
623,62.85
1025,65.1


placeId,commercialName,avg_distance_to_customers,avg_distance_to_competitors
833,ASSEBROEK (COLRUYT),100.11,99.38
471,ETTERBEEK (GRAYST...,59.8,60.16
3794,NOORDERWIJK HEREN...,72.78,72.03
463,OKAY CRISNEE,90.7,90.38
496,OKAY HOFSTADE,61.97,61.84
148,Spar Leuven Bruss...,63.52,63.41
2866,VILVOORDE-MUTSAER...,59.71,59.41
897,DEURNE ZUID (COLR...,70.61,70.13
623,LEBBEKE (COLRUYT),62.81,62.85
1025,OKAY VOLLEZELE,64.8,65.1
