In [None]:
def clean_all_date_fields(df, columns):
    from datetime import datetime
    from pyspark.sql import functions as F

    spark = df.sparkSession
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    spark.conf.set("spark.sql.ansi.enabled", "false")

    now = datetime.now()
    current_year = now.year
    min_year = 1900
    invalid_values = ["null", "none", "n/a", "missing", "not null", "", "na"]

    if isinstance(columns, str):
        columns = [columns]

    for c in columns:
        if c not in df.columns:
            continue

        df = (
            df.withColumn(c, F.col(c).cast("string"))
              .withColumn(c, F.trim(F.col(c)))
              .withColumn(c, F.regexp_replace(F.col(c), "[–—−]", "-"))
              .withColumn(c, F.upper(F.col(c)))
              .withColumn(c, F.when(F.col(c).isin(invalid_values), None).otherwise(F.col(c)))
        )

        # non-throwing coalesce chain (only to_date / to_timestamp)
        parsed = F.coalesce(
            F.to_date(F.col(c), "yyyy-MM-dd"),
            F.to_date(F.col(c), "yyyy/MM/dd"),
            F.to_date(F.col(c), "MM/dd/yyyy"),
            F.to_date(F.col(c), "dd-MM-yyyy"),
            F.to_date(F.col(c), "dd/MM/yyyy"),
            F.to_date(F.col(c), "yyyyMMdd"),
            F.to_date(F.col(c), "MMM dd, yyyy"),
            F.to_date(F.col(c), "MMMM dd, yyyy"),
            F.to_date(F.col(c), "dd-MMM-yyyy"),
            F.to_date(F.col(c), "dd-MMMM-yyyy"),
            F.to_date(F.col(c), "dd-MMM-yy"),
            F.to_date(F.col(c), "MMM dd, yy"),
            F.to_date(F.col(c), "MM/dd/yy"),
            F.to_date(F.col(c), "M/d/yy"),
            F.to_timestamp(F.col(c), "M/d/yy h:mm a"),
            F.to_timestamp(F.col(c), "MM/dd/yy h:mm a"),
            F.to_timestamp(F.col(c), "M/d/yyyy h:mm a")
        )

        df = df.withColumn(c, parsed)
        df = df.withColumn(
            c,
            F.when((F.year(F.col(c)) < min_year) | (F.year(F.col(c)) > current_year), None)
             .otherwise(F.col(c))
        )
        df = df.withColumn(c, F.date_format(F.col(c), "yyyy-MM-dd"))

    return df

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("DOB_Test")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
data = [
    ("Amy", "1-1-1823"),
    ("andrew", "10-Aug-22"),
    ("greg", "null"),
    ("harry", "2001-09-03"),
    ("hulk", "20230806"),
    ("jack", "1/1/1823"),
    ("jones", "20-Jul-2000"),
    ("Lisa", "2/29/2023"),
    ("parker", "invalid"),
    ("sam", "1/1/23 12:00 AM"),
    ("Sean", "na"),
    ("sia", "September 4, 2000"),
    ("sid", "20-July-2000"),
    ("William", "8/1/23 12:00 PM")
]

columns = ["first_name", "DOB"]

df = spark.createDataFrame(data, columns)
df.show(truncate=False)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/21 19:00:00 WARN Utils: Your hostname, Prasannas-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.68.109 instead (on interface en0)
25/10/21 19:00:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/21 19:00:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+----------+-----------------+
|first_name|DOB              |
+----------+-----------------+
|Amy       |1-1-1823         |
|andrew    |10-Aug-22        |
|greg      |null             |
|harry     |2001-09-03       |
|hulk      |20230806         |
|jack      |1/1/1823         |
|jones     |20-Jul-2000      |
|Lisa      |2/29/2023        |
|parker    |invalid          |
|sam       |1/1/23 12:00 AM  |
|Sean      |na               |
|sia       |September 4, 2000|
|sid       |20-July-2000     |
|William   |8/1/23 12:00 PM  |
+----------+-----------------+



In [3]:
cleaned_df = clean_all_date_fields(df, "DOB")

In [4]:
cleaned_df.show()

                                                                                

+----------+----------+
|first_name|       DOB|
+----------+----------+
|       Amy|      NULL|
|    andrew|      NULL|
|      greg|      NULL|
|     harry|2001-09-03|
|      hulk|2023-08-06|
|      jack|      NULL|
|     jones|      NULL|
|      Lisa|      NULL|
|    parker|      NULL|
|       sam|      NULL|
|      Sean|      NULL|
|       sia|2000-09-04|
|       sid|      NULL|
|   William|      NULL|
+----------+----------+

