In [0]:
# Notebook parameter "country": declared as a text widget with an empty
# default, then read into `country` (lower-cased and used for the schema
# lookup in later cells).
dbutils.widgets.text("country", "")
country = dbutils.widgets.get("country")

In [0]:
# Notebook parameter "week": declared as a text widget with an empty default,
# then read into `week` (stamped on the result as `releaseVersion` and used in
# the output path in later cells).
dbutils.widgets.text("week", "")
week = dbutils.widgets.get("week")

In [0]:
import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, expr, udf, col, size, lit, count,explode,sum,lit, count,explode,sum,regexp_replace, coalesce,lower,concat_ws,array,split, concat_ws
from pyspark.sql.types import StructType, StructField, StringType,ArrayType, IntegerType
import os
from pyspark.sql.utils import AnalysisException

In [0]:
# establish connection
# Connection settings for the PostgreSQL hosts that are searched for the
# country schema. Every entry is passed as keyword arguments to
# psycopg2.connect() by getSchemaWithInCountryDB().
# NOTE(security): user name and password are hard-coded here; they should be
# loaded from a secret store (e.g. a Databricks secret scope) instead.
_PG_HOSTS = (
    'caprod-cpp-pgmnr-001.flatns.net',
    'caprod-cpp-pgmnr-002.flatns.net',
)

db_configs = [
    dict(host=pg_host, database='mnr', user='mnr_ro', password='mnr_ro', port=5432)
    for pg_host in _PG_HOSTS
]

In [0]:
def findLatestSchemaName(input_list):
    """Return the name whose 4th underscore-separated token is the largest number.

    Names with fewer than four '_'-separated tokens, or whose 4th token is not
    made of digits only, are ignored. When several names share the highest
    value, the first one encountered wins.

    Parameters:
        input_list (iterable of str): Candidate schema names.

    Returns:
        str or None: The selected name, or None when no name qualifies.
    """
    best_name = None
    best_version = -1

    for name in input_list:
        tokens = name.split('_')
        if len(tokens) < 4 or not tokens[3].isdigit():
            continue

        version = int(tokens[3])
        # Strict comparison keeps the earliest name on ties.
        if version > best_version:
            best_name, best_version = name, version

    return best_name

def getSchemaWithInCountryDB(isocode: str, db_configs) -> list:
    """List, per database, the schemas whose name contains `isocode`.

    Each entry of `db_configs` is passed as keyword arguments to
    psycopg2.connect(); pg_catalog.pg_namespace is then queried for schema
    names matching ``%<isocode>%``.

    Parameters:
        isocode (str): Text that must occur in the schema name.
        db_configs (iterable of dict): Connection settings; each dict must
            contain a 'host' key, which is echoed back in the result.

    Returns:
        list: One ``{'database': <host>, 'schemas': [<name>, ...]}`` dict for
        every database that has at least one matching schema, in the order of
        `db_configs`.
    """
    # The pattern is bound as a query parameter instead of being formatted
    # into the SQL text, so the value cannot alter the statement.
    query = "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname LIKE %(pattern)s"
    pattern = f"%{isocode}%"

    results = []
    for db_config in db_configs:
        conn = psycopg2.connect(**db_config)
        try:
            schemas = pd.read_sql(query, con=conn, params={"pattern": pattern})
        finally:
            # Release the connection even when the query fails.
            conn.close()

        if not schemas.empty:
            results.append({
                'database': db_config['host'],
                'schemas': list(schemas['nspname'])
            })

    return results

# Array generator used for the "even" interpolation UDF.
def generate_array_even(min_hsn, max_hsn):
    """Return every second integer from min_hsn up to and including max_hsn.

    The sequence starts at int(min_hsn) and advances in steps of 2, so the
    parity of the produced numbers follows min_hsn.

    Returns an empty list when either bound is None.
    """
    if min_hsn is None or max_hsn is None:
        return []

    first, last = int(min_hsn), int(max_hsn)
    return list(range(first, last + 1, 2))



# Array generator used for the "odd" interpolation UDF.
def generate_array_odd(min_hsn, max_hsn):
    """Return every second integer from min_hsn up to and including max_hsn.

    The sequence starts at int(min_hsn) and advances in steps of 2, so the
    parity of the produced numbers follows min_hsn.

    Returns an empty list when either bound is None.
    """
    if min_hsn is None or max_hsn is None:
        return []

    first, last = int(min_hsn), int(max_hsn)
    return list(range(first, last + 1, 2))


# Array generator used for the "numeric_mixed" interpolation UDF.
def generate_array_numeric_mixed(min_hsn, max_hsn):
    """Return every integer from min_hsn up to and including max_hsn.

    Returns an empty list when either bound is None.
    """
    if min_hsn is None or max_hsn is None:
        return []

    first, last = int(min_hsn), int(max_hsn)
    return list(range(first, last + 1))

# Array generator used for the "irregular" interpolation UDF.
def generate_array_irregular(min_hsn, max_hsn):
    """Return only the range bounds, without interpolating between them.

    Returns [min_hsn] when both bounds are equal, [min_hsn, max_hsn] when they
    differ, and an empty list when either bound is None.
    """
    if min_hsn is None or max_hsn is None:
        return []

    return [min_hsn] if min_hsn == max_hsn else [min_hsn, max_hsn]
    
def string_to_array(string):
    """Parse a delimited string of house numbers into a list of ints.

    Entries are separated by commas; a single entry may itself hold several
    numbers separated by ';' (the SQL that produces `intermediate_one` strips
    everything except digits and ';', so values such as "12;14" do occur and
    previously made int() raise ValueError).

    Parameters:
        string (str or None): e.g. "3, 5;7, 9".

    Returns:
        list of int: The parsed numbers in input order. A comma-separated
        entry that is empty or whitespace-only yields 0 (unchanged legacy
        behaviour); empty pieces between ';' separators are skipped. None or
        an empty string yields [].
    """
    if not string:
        return []

    numbers = []
    for entry in string.split(','):
        entry = entry.strip()
        if not entry:
            # Legacy behaviour: a blank comma-separated entry is recorded as 0.
            numbers.append(0)
            continue
        numbers.extend(int(piece) for piece in entry.split(';') if piece.strip())
    return numbers


# Function to update the array based on min_hsn and max_hsn
def update_array(min_hsn, max_hsn, hnr_array):
    """Return `hnr_array` extended with the range bounds.

    When both bounds are equal the value is appended once, otherwise min_hsn
    and max_hsn are appended in that order. A bound that is None is skipped so
    it is not counted as a house number, and a None `hnr_array` is treated as
    an empty list. The input list is not modified.

    Parameters:
        min_hsn: Lower bound, or None.
        max_hsn: Upper bound, or None.
        hnr_array (list or None): Intermediate house numbers.

    Returns:
        list: A new list holding the intermediate numbers followed by the
        non-None bound(s).
    """
    numbers = list(hnr_array) if hnr_array else []
    bounds = [min_hsn] if min_hsn == max_hsn else [min_hsn, max_hsn]
    return numbers + [bound for bound in bounds if bound is not None]
    

def swapMin_hsnMax_hsnAlphabate(min_hsn_alpha, max_hsn_alpha):
    """
    Orders two alphanumeric house numbers by their trailing letter.

    Both values are split into "everything but the last character" and "last
    character". Only when both prefixes consist of digits and both last
    characters are letters is the pair considered; it is then swapped if the
    first letter sorts after the second one. Letters are compared
    case-insensitively, because the caller lower-cases the results afterwards
    and a case-sensitive comparison would order e.g. "12a"/"12B" the wrong way.
    The numeric prefixes themselves are not compared.

    Parameters:
    - min_hsn_alpha (str): The minimum house number to be compared and potentially swapped.
    - max_hsn_alpha (str): The maximum house number to be compared and potentially swapped.

    Returns:
    tuple: (min, max) in letter order, or the inputs unchanged when either is
    None or empty, or when they do not have the "<digits><letter>" shape.

    Example:
    ```python
    min_code, max_code = swapMin_hsnMax_hsnAlphabate("12C", "12A")
    print(min_code, max_code)  # Output: 12A 12C
    ```
    """
    # None and "" cannot be split into prefix and letter; pass them through.
    if not min_hsn_alpha or not max_hsn_alpha:
        return min_hsn_alpha, max_hsn_alpha

    min_digits, min_letter = min_hsn_alpha[:-1], min_hsn_alpha[-1]
    max_digits, max_letter = max_hsn_alpha[:-1], max_hsn_alpha[-1]

    prefixes_numeric = min_digits.isdigit() and max_digits.isdigit()
    suffixes_alpha = min_letter.isalpha() and max_letter.isalpha()

    if prefixes_numeric and suffixes_alpha and min_letter.lower() > max_letter.lower():
        return max_hsn_alpha, min_hsn_alpha

    return min_hsn_alpha, max_hsn_alpha

def generate_array_alphabetic(min_hsn_alphaN, max_hsn_alphaN):
    """
    Expands a lettered house-number range such as "12a".."12d".

    Parameters:
        min_hsn_alphaN (str): The minimum house number, e.g. "12a".
        max_hsn_alphaN (str): The maximum house number, e.g. "12d".

    Returns:
        list or None:
            - None when either value is None, not a string, empty, or does not
              end in a letter.
            - When both values share the same prefix (everything before the
              last character): one entry per character code from the first
              letter to the last, inclusive ("12a", "12b", ...). The list is
              empty if the first letter sorts after the last one.
            - When the prefixes differ: [min_hsn_alphaN, max_hsn_alphaN].

    Note:
        This runs once per row as a Spark UDF, so it deliberately produces no
        console output.
    """
    if not (isinstance(min_hsn_alphaN, str) and isinstance(max_hsn_alphaN, str)):
        return None
    if not min_hsn_alphaN or not max_hsn_alphaN:
        return None
    if not (min_hsn_alphaN[-1].isalpha() and max_hsn_alphaN[-1].isalpha()):
        return None

    # Separate the trailing letter from the leading part of each value.
    min_digits, min_alpha = min_hsn_alphaN[:-1], min_hsn_alphaN[-1]
    max_digits, max_alpha = max_hsn_alphaN[:-1], max_hsn_alphaN[-1]

    # Different prefixes cannot be interpolated letter by letter.
    if min_digits != max_digits:
        return [min_hsn_alphaN, max_hsn_alphaN]

    return [f"{min_digits}{chr(code)}" for code in range(ord(min_alpha), ord(max_alpha) + 1)]


In [0]:
def format_query(schema) -> str:
    """Build the SQL text that extracts house-number ranges per road link.

    The statement reads mnr_netw_geo_link, mnr_netw2hsn_range, mnr_hsn_range,
    mnr_hsn_range2hsn and mnr_hsn from the given schema and returns one row per
    (feat_id, hsn_range_id, line_side, hsn_variance) with these columns:

    - road_id, hsn_range_id
    - min_hsn / max_hsn: STRING_AGG of hsn values with hsn_type 1 / 2
    - intermediate_one: STRING_AGG of hsn values with hsn_type 3, stripped of
      every character except digits and ';'
    - side_of_line (raw line_side) and sideOfLine (its text label)
    - interpolation (text label of hsn_variance) and variance (raw value)

    Parameters:
        schema: Schema name; inserted between double quotes in front of every
            table name via str.format.

    Returns:
        str: The SQL statement, wrapped as ``SELECT * FROM ( ... ) a``.
    """
    # NOTE(review): the WHERE clause requires nhnr.hsn_range_id to be non-null
    # and mnr_hsn is INNER JOINed, so rows without a range or without an hsn
    # are dropped even though the other joins are written as LEFT JOIN.
    query = """
    SELECT *
    FROM (
    SELECT
        gl.feat_id AS road_id,
        nhnr.hsn_range_id,
        STRING_AGG(CASE WHEN hh.hsn_type = 1 THEN h.hsn END, ', ') AS min_hsn,
        STRING_AGG(CASE WHEN hh.hsn_type = 2 THEN h.hsn END, ', ') AS max_hsn,
--      STRING_AGG(CASE WHEN hh.hsn_type = 3 THEN h.hsn END, ', ') AS intermediate,
        STRING_AGG(
        CASE WHEN hh.hsn_type = 3 THEN
            REGEXP_REPLACE(h.hsn, '[^0-9;]', '', 'g')
        ELSE
            null 
        END,', ' ) AS intermediate_one,
        nhnr.line_side AS side_of_line,
        CASE
        WHEN nhnr.line_side= 2 THEN 'Left Side'
        WHEN nhnr.line_side= 3 THEN 'Right Side'
        WHEN nhnr.line_side= 1 THEN 'Both Sides'
        ELSE 'Unknown'  -- Handle other values, if any
    	END AS sideOfLine,

--      hsn.hsn_variance AS variance,
        CASE
        WHEN hsn.hsn_variance  = 2 THEN 'Even'
        WHEN hsn.hsn_variance  = 3 THEN 'Odd'
        WHEN hsn.hsn_variance  = 4 THEN 'Numeric_Mixed'
        WHEN hsn.hsn_variance  = 5 THEN 'Irregular'
        WHEN hsn.hsn_variance  = 6 THEN 'Alphabetic'
        ELSE 'Unknown'  -- Handle other values, if any
    	END AS interpolation,
    	hsn.hsn_variance as variance

    FROM
        "{schema}".mnr_netw_geo_link gl
    LEFT JOIN
        "{schema}".mnr_netw2hsn_range nhnr ON gl.feat_id = nhnr.netw_id
    LEFT JOIN
        "{schema}".mnr_hsn_range AS hsn ON hsn.hsn_range_id = nhnr.hsn_range_id
    LEFT JOIN
        "{schema}".mnr_hsn_range2hsn AS hh ON hh.hsn_range_id = hsn.hsn_range_id
    INNER JOIN
        "{schema}".mnr_hsn h USING (hsn_id)
    WHERE
        nhnr.hsn_range_id IS NOT null
    GROUP BY
        gl.feat_id, nhnr.hsn_range_id, nhnr.line_side, hsn.hsn_variance
) a""".format(schema=schema)

    return query

In [0]:
def linkAptCount(schema) -> str:
    """Build the SQL text that counts address points (APTs) per link side.

    The statement LEFT JOINs two sub-selects on "<netw_id>_<line_side>":

    - ``hnr_``: one row per mnr_netw2hsn_range entry whose link exists in
      mnr_netw_route_link and has an mnr_netw2admin_area row with
      feat_type = 1111; carries netw_id, hsn_range_id, line_side and the
      admin-area code (returned as country_code).
    - ``_apt``: COUNT(DISTINCT apt_id) per (netw_id, line_side, code) over
      mnr_apt_entrypoint rows with ep_type_postal true whose line_side equals
      the range's line_side, under the same feat_type = 1111 restriction.

    Parameters:
        schema: Schema name; inserted between double quotes in front of every
            table name via str.format.

    Returns:
        str: The SQL statement. link_apt_count, apt_netw_id and apt_line_side
        are NULL for link sides without a matching address point.
    """
    # NOTE(review): the meaning of feat_type 1111 is not visible in this
    # notebook; presumably it selects the country-level admin area - confirm.
    query = """
    select hnr_.code as country_code,
    hnr_.hnr_netw_id,
    hnr_.hnr_hsn_range_id,
    hnr_.hnr_line_side,
    _apt.link_apt_count,
    _apt.apt_netw_id,
    _apt.apt_line_side
    from 
    (select mnr_netw2hsn_range.netw_id as hnr_netw_id,
    hsn_range_id as hnr_hsn_range_id,
    mnr_netw2hsn_range.line_side as hnr_line_side,
    mnr_netw2admin_area.code
    from "{schema}".mnr_netw2hsn_range
    inner join "{schema}".mnr_netw_route_link
    on mnr_netw2hsn_range.netw_id = mnr_netw_route_link.feat_id
    inner join "{schema}".mnr_netw2admin_area
    on mnr_netw_route_link.feat_id = mnr_netw2admin_area.netw_id
    where mnr_netw2admin_area.feat_type = 1111) hnr_
    left join
    (select count (distinct apt_id) as link_apt_count,
    mnr_apt_entrypoint.netw_id as apt_netw_id,
    mnr_netw2admin_area.code,
    mnr_apt_entrypoint.line_side as apt_line_side
    from "{schema}".mnr_netw2hsn_range
    inner join "{schema}".mnr_netw_route_link
    on mnr_netw2hsn_range.netw_id = mnr_netw_route_link.feat_id
    inner join "{schema}".mnr_netw2admin_area
    on mnr_netw_route_link.feat_id = mnr_netw2admin_area.netw_id
    inner join "{schema}".mnr_apt_entrypoint
    on mnr_netw2hsn_range.netw_id = mnr_apt_entrypoint.netw_id
    where mnr_apt_entrypoint.ep_type_postal is true
    and mnr_netw2hsn_range.line_side = mnr_apt_entrypoint.line_side
    and mnr_netw2admin_area.feat_type = 1111
    group by mnr_apt_entrypoint.netw_id, mnr_apt_entrypoint.line_side,mnr_netw2admin_area.code) _apt
    on concat(hnr_.hnr_netw_id,'_',hnr_.hnr_line_side) = concat(_apt.apt_netw_id,'_',_apt.apt_line_side)
    """.format(schema=schema)
    return query

In [0]:

# countryList = ["deu","gbr","fra","ita","esp","nld","bel","pol","aut","prt","swe","che","dnk","cze","grc","hun","rou","irl","fin","svk","svn","hrv","bgr","ltu","est","lva","cyp","lux","mlt"]

# countryList = ['gbr']

# country =  lower(country)
# print(country)

In [0]:
# Normalise the widget value: the schema lookup and the 'usa'/'can' check
# below compare against lower-case codes.
country = country.strip().lower()

# The widget defaults to ''. An empty code would become LIKE '%%' in the
# schema lookup and match every schema, silently processing arbitrary data.
if not country:
    raise ValueError("The 'country' widget is empty; set it to the country code to process.")

print(country)

In [0]:
# Load the address ranges and the per-link APT counts for `country` from the
# most recent matching schema.
if country in ['usa','can']:
    # NOTE(review): nothing is loaded for these countries, so the later cells
    # that use AddressRanges_df / link_df will fail with a NameError.
    print(f"{country} is big country")

else:
    schemaList = getSchemaWithInCountryDB(country, db_configs)
    if not schemaList:
        raise ValueError(f"No schema matching '{country}' was found in any configured database")

    # Use the first database that has matching schemas.
    database = schemaList[0]['database']
    schemas = schemaList[0]['schemas']

    # Pick the schema with the highest numeric version token.
    SchemaName = findLatestSchemaName(schemas)
    if SchemaName is None:
        raise ValueError(f"None of the schemas found for '{country}' has a numeric version token: {schemas}")

    # JDBC connection properties for the PostgreSQL server.
    # NOTE(security): credentials are hard-coded; move them to a secret store.
    pg_properties = {
        "user": "mnr_ro",
        "password": "mnr_ro",
        "driver": "org.postgresql.Driver",
        "url": f"jdbc:postgresql://{database}/mnr"
                    }

    formatted_query = format_query(SchemaName)
    # Stored under its own name: assigning the SQL text to `linkAptCount`
    # would overwrite the function and make this cell fail when re-run.
    link_apt_query = linkAptCount(SchemaName)

    # Address ranges
    AddressRanges_df = spark.read.jdbc(url=pg_properties["url"], table=f"({formatted_query}) as subquery", properties=pg_properties)
    # Link APT count
    link_df = spark.read.jdbc(url=pg_properties["url"], table=f"({link_apt_query}) as subquery", properties=pg_properties)

##### Genesis: Left Join Address Ranges, link Apt Count

In [0]:
# Left-join every address range to the APT count of its link side.
# link_df holds one row per (netw_id, hsn_range_id, line_side), so the range id
# must be part of the join key: matching on link and side alone pairs each
# range with the rows of every other range on that link side and duplicates
# rows (inflating all downstream sums).
genesisAddressRangesAptLink = AddressRanges_df.join(
    link_df,
    (AddressRanges_df['road_id'] == link_df['hnr_netw_id'])
    & (AddressRanges_df['hsn_range_id'] == link_df['hnr_hsn_range_id'])
    & (AddressRanges_df['side_of_line'] == link_df['hnr_line_side']),
    how='left'
)

In [0]:
# Prepare the joined frame for house-number expansion:
#   1. tag rows with the country and normalise the interpolation label,
#   2. keep the raw bounds of alphabetic ranges, reduce min/max to integers,
#   3. order the numeric and the alphabetic bounds,
#   4. build the alphabetic and the intermediate house-number arrays.
df = genesisAddressRangesAptLink

# Add the country code to every row
df = df.withColumn("country", lit(country))

df = df.withColumn("interpolation", lower(col("interpolation")))

# Keep the original (lettered) bounds for "alphabetic" ranges; null otherwise
df = df.withColumn("min_hsn_alpha", when(col("interpolation") == "alphabetic", col("min_hsn")).otherwise(None))
df = df.withColumn("max_hsn_alpha", when(col("interpolation") == "alphabetic", col("max_hsn")).otherwise(None))

# Strip every non-digit character from "max_hsn" and "min_hsn"
df = df.withColumn("max_hsn", regexp_replace(col("max_hsn"), "[^0-9]", ""))
df = df.withColumn("min_hsn", regexp_replace(col("min_hsn"), "[^0-9]", ""))

# Convert the string columns to integers
df = df.withColumn("min_hsn", df["min_hsn"].cast(IntegerType()))
df = df.withColumn("max_hsn", df["max_hsn"].cast(IntegerType()))

# Swap values if min_hsn > max_hsn.
# Both new values are derived from untouched copies of the originals: deciding
# the second assignment from an already overwritten column never swaps (the
# previous logic turned min=10, max=2 into min=10, max=10).
df = df.withColumn("temp_min", col("min_hsn")).withColumn("temp_max", col("max_hsn"))
needs_swap = col("temp_min") > col("temp_max")
df = df.withColumn("min_hsn", when(needs_swap, col("temp_max")).otherwise(col("temp_min")))
df = df.withColumn("max_hsn", when(needs_swap, col("temp_min")).otherwise(col("temp_max")))

# Drop the temporary columns
df = df.drop("temp_min", "temp_max")

# Register the custom function as a UDF for "interpolation"=='alphabetic' swapping
swap_udf = udf(swapMin_hsnMax_hsnAlphabate, StructType([StructField("min_hsn_alpha", StringType(), True),
                                                    StructField("max_hsn_alpha", StringType(), True)]))

# Apply the swap_udf UDF to the DataFrame
df = df.withColumn("result", swap_udf("min_hsn_alpha", "max_hsn_alpha"))

# Extract the values from the struct column into lower-cased columns
df = df.withColumn("min_hsn_alphaN", lower(col("result.min_hsn_alpha")))
df = df.withColumn("max_hsn_alphaN", lower(col("result.max_hsn_alpha")))

# Register the UDF for alphabetic array creation
generate_array_alphabetic_udf = udf(generate_array_alphabetic, ArrayType(StringType()))

# Apply the UDF to the DataFrame
df_temp = df.withColumn("hnr_array_result", generate_array_alphabetic_udf(col("min_hsn_alphaN"), col("max_hsn_alphaN")))


# Alphabetic expansion, only for alphabetic ranges without intermediate numbers
df_temp = df_temp.withColumn(
    "hnr_array_alphabetic",
    when((col("interpolation") == "alphabetic") & (col("intermediate_one").isNull()), 
        generate_array_alphabetic_udf(col("min_hsn_alphaN"), col("max_hsn_alphaN"))))


# Register the numeric array UDFs
generate_array_even_udf = udf(generate_array_even, ArrayType(IntegerType()))
generate_array_odd_udf = udf(generate_array_odd, ArrayType(IntegerType()))
generate_array_numeric_mixed_udf = udf(generate_array_numeric_mixed, ArrayType(IntegerType()))
generate_array_irregular_udf = udf(generate_array_irregular, ArrayType(IntegerType()))

# Register the UDF that converts the intermediate string to an array
string_to_array_udf = udf(string_to_array, ArrayType(IntegerType()))

# Register the UDF that appends the bounds to the intermediate array
update_array_udf = udf(update_array, ArrayType(IntegerType()))


# Convert "intermediate_one" into an integer array
df_temp = df_temp.withColumn("intermediate_one_array", string_to_array_udf("intermediate_one"))


# Intermediate numbers plus the range bounds
df_temp = df_temp.withColumn("hnr_array_intermediate", update_array_udf(col("min_hsn"), col("max_hsn"), col("intermediate_one_array")))

In [0]:
# Show the column names and types of df_temp after the preparation steps.
df_temp.printSchema()

In [0]:
# Combine conditional column creation into a single operation
df_temp = df_temp.withColumn(
    "hnr_array",
    when((col("interpolation") == "even") & (col("intermediate_one").isNull()), generate_array_even_udf(col("min_hsn"), col("max_hsn")))
    .when((col("interpolation") == "odd") & (col("intermediate_one").isNull()), generate_array_odd_udf(col("min_hsn"), col("max_hsn")))
    .when((col("interpolation") == "numeric_mixed") & (col("intermediate_one").isNull()), generate_array_numeric_mixed_udf(col("min_hsn"), col("max_hsn")))
    .when((col("interpolation") == "irregular") & (col("intermediate_one").isNull()), generate_array_irregular_udf(col("min_hsn"), col("max_hsn")))
    .otherwise(col("hnr_array_intermediate"))
)

In [0]:
# Columns kept for the aggregation step, in output order.
selected_columns = [
    "road_id", "country", "country_code",
    "min_hsn", "max_hsn", "min_hsn_alphaN", "max_hsn_alphaN",
    "interpolation", "intermediate_one",
    "hnr_array_count", "hnr_array", "link_apt_count",
    "sideofline", "Ranges_count",
]

df_temp = (
    df_temp
    # Number of house numbers in each expanded range
    .withColumn("hnr_array_count", size(col("hnr_array")).cast("int"))
    # Nulls in the two count columns become 0
    .fillna(0, subset=["hnr_array_count", "link_apt_count"])
    # Constant 1 per row so ranges can be counted with a sum
    .withColumn("Ranges_count", lit(1))
    .select(*selected_columns)
)
# Select and keep only the specified columns
# df_temp = df_temp.select("min_hsn","max_hsn","min_hsn_alphaN","max_hsn_alphaN","intermediate_one","interpolation", "hnr_array_count","hnr_array") 

In [0]:
 # concatenated_df = all_dfs[0]
# for df in all_dfs[1:]:
#     concatenated_df = concatenated_df.union(df) 

In [0]:
# Show the schema of df_temp after the column selection.
df_temp.printSchema()

In [0]:
# df_temp = df_temp.repartition(8)

In [0]:
# parquet_df = df_temp.groupBy("country","interpolation").sum("hnr_array_count").alias("expaned_addresses_count")


In [0]:
# Aggregate per country / country_code / interpolation type.
# NOTE: `sum` is pyspark.sql.functions.sum (imported at the top of the
# notebook), not the Python builtin.
_hnr_count = col("hnr_array_count")
_apt_count = col("link_apt_count")

# House-number totals split by how many APTs the link side has.
_apt_split_columns = [
    sum(when(_condition, _hnr_count).otherwise(0)).alias(_alias)
    for _condition, _alias in [
        (_apt_count == 0, "link_apt_count_eq_0"),
        (_apt_count == 1, "link_apt_count_eq_1"),
        (_apt_count > 1, "link_apt_count_gt_1"),
    ]
]

# House-number totals split by range size (between() bounds are inclusive).
_size_bucket_columns = [
    sum(when(_hnr_count.between(_low, _high), _hnr_count).otherwise(0)).alias(f"hnr_ranges_{_low}_{_high}")
    for _low, _high in [(0, 500), (501, 1000), (1001, 2000), (2001, 3000), (3001, 5000)]
]

result = df_temp.groupby("country", "country_code", "interpolation").agg(
    sum("hnr_array_count").alias("sum_hnr_array_count"),
    sum("Ranges_count").alias("AddreesRangesCount"),
    *_apt_split_columns,
    *_size_bucket_columns,
    sum(when(_hnr_count > 5000, _hnr_count).otherwise(0)).alias("hnr_ranges_grt_5000"),
)

In [0]:
from pyspark.sql import SparkSession
# Explicit names instead of a wildcard import: `import *` from
# pyspark.sql.functions rebinds Python builtins such as min, max, abs and
# round for every later cell.
from pyspark.sql.functions import current_date, lit

# Stamp every row with the run date and the release week taken from the widget.
result = result.withColumn('current_date', current_date()).withColumn('releaseVersion', lit(week))


In [0]:
# display(result)

In [0]:
# Mount the blob container under /mnt/addressrangesblob unless it is already
# mounted.
# NOTE(security): the storage account key is hard-coded below. It should be
# rotated and read from a secret store (e.g. a Databricks secret scope)
# instead of living in the notebook source.
mount_point = '/mnt/addressrangesblob'
existing_mount_points = [mount.mountPoint for mount in dbutils.fs.mounts()]

if mount_point not in existing_mount_points:
    dbutils.fs.mount(
        source='wasbs://address-ranges@addressrangesduplicate.blob.core.windows.net',
        mount_point=mount_point,
        extra_configs={
            'fs.azure.account.key.addressrangesduplicate.blob.core.windows.net': 'p32JpcVTxylnEHoUIX91DUdaaGPE+P3Ipb8zGy8EfDiz01We6y1nl4ypauEQNGWa0rXdxP6SM0Py+ASt4Hbptg=='
        }
    )

In [0]:
# Log which schema the data was read from.
# NOTE(review): SchemaName is only assigned in the non-'usa'/'can' branch of
# the load cell, so this fails for those two countries.
print(SchemaName)

In [0]:
# Only the functions needed by the following cell; a wildcard import would
# rebind Python builtins such as min, max, abs and round.
from pyspark.sql.functions import current_date, year

In [0]:
# Get the current year and save it into a variable
current_year_df = spark.range(1).withColumn("current_year", year(current_date()))
current_year = current_year_df.collect()[0]["current_year"]

In [0]:
# Output location: one Delta directory per year and release week.
path = "/mnt/addressrangesblob/Genesyis/"
finalPath = path + "Genesyis_{}_week_{}".format(current_year, week)
print(finalPath)


In [0]:
# Write this country's aggregates to the Delta table for the release week.
# "append" creates the table when finalPath does not hold one yet and adds to
# it otherwise, so no existence probe is needed. The former probe swallowed
# every exception (including mount/permission errors) and then fell back to a
# write without a save mode, which fails if the table actually exists. The
# "header" option was dropped as it is a CSV setting with no effect on Delta.
result.write.format("delta").mode("append").save(finalPath)
    

In [0]:
# List the files written under finalPath as a quick check of the write.
dbutils.fs.ls(finalPath)

In [0]:
# Stop the Spark session at the end of the run.
# NOTE(review): on Databricks the session is shared by the cluster; stopping it
# from a notebook presumably affects other notebooks/jobs attached to the same
# cluster - confirm whether this call is needed at all.
spark.stop()