### Import Libraries

In [28]:
import re
from pyspark.sql.functions import length
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

StatementMeta(, f696aaca-667e-4ab5-ae32-41ff45b696dc, 30, Finished, Available)

### Classification Function Definition

In [29]:
# Table that holds the regex pattern -> business term rules.
table_name = 'user_defined_regex_patterns'
df2 = spark.sql(f"select * from {table_name}")

# Bring every rule to the driver and index the business terms by their regex
# pattern; when a pattern occurs in several rows, the last collected row wins.
map_value = dict(
    (row['RegexPattern'], row['BusinessTerm'])
    for row in df2.collect()
)

# Broadcast the lookup so getbt can read it through `.value`
# (`sc` is presumably the notebook's SparkContext - confirm).
map_value_bc = sc.broadcast(map_value)


def getbt(input_text, *, min_year=1940, max_year=2023):
    """Classify a single cell value as a business term.

    The rules come from the broadcast ``map_value_bc`` mapping of regex
    pattern -> business term and are tried in the mapping's iteration order;
    the first rule that accepts the value wins.

    * A rule whose business term is ``'Year'`` ignores its regex: the value
      is accepted when it parses as an integer between ``min_year`` and
      ``max_year`` (both inclusive).
    * Every other rule is accepted when its regex, applied
      case-insensitively, matches at the start of the value
      (``re.match`` semantics, not a full match).

    Args:
        input_text: The cell value. Non-string values are converted with
            ``str``; ``None`` is never classified.
        min_year: Smallest integer accepted as a year.
        max_year: Largest integer accepted as a year.

    Returns:
        The business term of the first accepting rule, or ``None`` when the
        value is ``None`` or no rule accepts it.
    """
    # A NULL cell must not be stringified: str(None) == 'None' would be
    # matched by any alphabetic pattern and wrongly reported as a hit.
    if input_text is None:
        return None

    text = str(input_text)
    for pattern, business_term in map_value_bc.value.items():
        if business_term == 'Year':
            try:
                if min_year <= int(text) <= max_year:
                    return business_term
            except ValueError:
                # Not an integer; keep trying the remaining rules.
                continue
        elif re.match(pattern, text, re.IGNORECASE):
            return business_term
    return None

# Spark UDF that applies getbt to one column value and yields a string result.
convertUDF = udf(lambda cell_value: getbt(cell_value), returnType=StringType())

StatementMeta(, f696aaca-667e-4ab5-ae32-41ff45b696dc, 31, Finished, Available)

### Read Metadata Table and Profile Each Source

In [30]:
# Output layout: one row per profiled column of each source file. The schema
# is the same for every source, so it is built once, outside the loop.
schema = StructType([
    StructField("DatabaseName", StringType(), True),
    StructField("TableName", StringType(), True),
    StructField("ColumnName", StringType(), True),
    StructField("TechnicalDataType", StringType(), True),
    StructField("Businessterm", StringType(), True),
    StructField("MatchPercentage", IntegerType(), True),
    StructField("TotalCount", LongType(), True),
    StructField("TotalDistinctCount", LongType(), True),
    StructField("NullorBlankCount", IntegerType(), True),
])

metadata = spark.sql("SELECT * FROM poc_profiling_gd_lakehouse.config_profiling_classifn_metadata")
# Only the sources flagged for profiling and classification are processed.
profile_clfn = metadata.filter("profileandclassification_ind==1")

for j in profile_clfn.collect():
    in_path = j.abfss_location

    # Profile a sample of at most 100 rows. The sample is cached before the
    # first action so the CSV is read once and every count below sees the
    # same rows.
    df = (
        spark.read.format('csv')
        .option('header', True)
        .option('inferSchema', True)
        .load(in_path)
        .limit(100)
        .cache()
    )

    cols = df.columns
    datatypes = dict(df.dtypes)
    total_cnt = df.count()
    data = []

    for i in cols:
        # Backtick-quote the header so names containing dots or spaces
        # resolve to the column itself, as the other lookups below do.
        quoted = f"`{i}`"

        # One pass of the classification UDF: number of cells per
        # (non-null) business term found in this column.
        term_counts = (
            df.select(convertUDF(col(quoted)).alias("bt"))
            .filter(col("bt").isNotNull())
            .groupBy("bt")
            .count()
            .collect()
        )
        notnull_cnt = sum(r["count"] for r in term_counts)

        # Report the most frequent term; ties are broken by term name so the
        # result is deterministic when a column matches several terms.
        if term_counts:
            bt = max(term_counts, key=lambda r: (r["count"], r["bt"]))["bt"]
        else:
            bt = None

        # Integer arithmetic avoids float truncation (29/100*100 -> 28) and
        # the guard avoids dividing by zero for an empty file.
        match_perc = notnull_cnt * 100 // total_cnt if total_cnt else 0

        Distinctcnt = df.select(quoted).distinct().count()
        NullorBlankCount = df.filter(f"`{i}` is null or `{i}`==''").count()

        # Field order must follow `schema`.
        data.append([
            j.database_name,
            j.table_name,
            i,
            datatypes.get(i),
            bt,
            match_perc,
            total_cnt,
            Distinctcnt,
            NullorBlankCount,
        ])

    # Release the cached sample before moving on to the next source.
    df.unpersist()

    profile = spark.createDataFrame(data, schema)
    if spark.catalog.tableExists(j.output_table):
        # Replace any earlier results for this database/table, then append.
        # NOTE(review): the metadata values are interpolated into the SQL
        # text unescaped - confirm the metadata table is trusted.
        spark.sql(f"delete from {j.output_table} where DatabaseName='{j.database_name}' and TableName='{j.table_name}'")
        profile.createOrReplaceTempView('temp_profile')
        spark.sql(f"insert into {j.output_table} select * from temp_profile")
    else:
        print("Table does not exist")
        profile.write.format('delta').saveAsTable(j.output_table)



StatementMeta(, f696aaca-667e-4ab5-ae32-41ff45b696dc, 32, Finished, Available)