Python script to identify the jde_proddta tables whose data appears to be "stale" (no updates today)

In [0]:
%python
from pyspark.sql.functions import col, current_date, to_date

# Discover the tables in a schema whose names contain "proddta"
def find_proddta_tables(schema_name):
    """Return the qualified names of all "proddta" tables in a schema.

    Runs ``SHOW TABLES IN <schema_name>``, keeps the rows whose ``tableName``
    matches the LIKE pattern ``%proddta%``, and prefixes every hit with the
    schema name.

    Args:
        schema_name: Name of the schema to list; it is interpolated into the
            SQL statement as-is.

    Returns:
        A list of ``"<schema_name>.<tableName>"`` strings.
    """
    listing = spark.sql(f"SHOW TABLES IN {schema_name}")
    matches = listing.where(col("tableName").like("%proddta%")).select("tableName")
    return [f"{schema_name}.{hit['tableName']}" for hit in matches.collect()]

# Resolve the "gms_us_lake" tables that the freshness check below will cover
tables = find_proddta_tables(schema_name="gms_us_lake")

# Function to check if a table has data from today
def check_table(table_name):
    """Classify a table by whether it holds rows loaded today.

    A table without an ``AUD_LD_DTS`` column is reported as
    ``"Column Not Found"`` instead of raising, so one unexpected table does
    not abort the whole report.

    Args:
        table_name: Table name passed to ``spark.table``.

    Returns:
        A ``(table_name, status)`` tuple where ``status`` is one of
        ``"Updated Today"``, ``"Stale Data"`` or ``"Column Not Found"``.
    """
    df = spark.table(table_name)

    # Look the column up ignoring case (Spark's default name resolution is
    # case-insensitive) and keep the spelling the table actually uses.
    load_ts = next((name for name in df.columns if name.upper() == "AUD_LD_DTS"), None)
    if load_ts is None:
        return (table_name, "Column Not Found")

    loaded_today = df.filter(to_date(col(load_ts)) == current_date())
    # A single matching row answers the question; no need to count them all.
    if loaded_today.select(col(load_ts)).take(1):
        return (table_name, "Updated Today")
    return (table_name, "Stale Data")

# Check each table and collect results
results = [check_table(table) for table in tables]

# Convert results to a DataFrame for display.
# The schema is spelled out because column types cannot be inferred from an
# empty list, which is what `results` is when no table matched.
results_df = spark.createDataFrame(results, "table_name string, status string")
display(results_df)

In [0]:
%python
from pyspark.sql.functions import col, current_date, to_date, regexp_extract

# Discover the "erp_glbl" tables in a schema, skipping backup/dummy/test/phase tables
def find_proddta_tables(schema_name):
    """Return the qualified names of the "erp_glbl" tables in a schema.

    Runs ``SHOW TABLES IN <schema_name>``, keeps the rows whose ``tableName``
    matches the LIKE pattern ``%erp_glbl%`` and drops every row whose name
    matches one of the exclusion patterns below.

    Args:
        schema_name: Name of the schema to list; it is interpolated into the
            SQL statement as-is.

    Returns:
        A list of ``"<schema_name>.<tableName>"`` strings.
    """
    # NOTE: in a LIKE pattern "_" matches any single character, so
    # "%erp_glbl%" is slightly broader than a literal substring match.
    excluded_patterns = ("%bckup%", "%bkp%", "%bck%", "%dummy%", "%test%", "%phase%")
    name = col("tableName")

    candidates = spark.sql(f"SHOW TABLES IN {schema_name}").where(name.like("%erp_glbl%"))
    for pattern in excluded_patterns:
        candidates = candidates.where(~name.like(pattern))

    hits = candidates.select("tableName").collect()
    return [f"{schema_name}.{hit['tableName']}" for hit in hits]

# Resolve the "gms_us_hub" tables that the freshness check below will cover
tables = find_proddta_tables(schema_name="gms_us_hub")

# Function to check if a table has data from today and extract distinct values
def check_table(table_name):
    """Report a table's freshness and the source name found in its file paths.

    Args:
        table_name: Table name passed to ``spark.table``.

    Returns:
        A ``(table_name, status, extracted_value)`` tuple. ``status`` is
        ``"Updated Today"``, ``"Stale Data"`` or ``"Column Not Found"``
        (no ``AUD_LD_DTS`` column). ``extracted_value`` is the path segment
        following ``PRODDTA/`` in ``AUD_FILE_NM``, or ``"N/A"`` when the
        column is missing or no value matches.
    """
    df = spark.table(table_name)

    # Map upper-cased names to the table's own spelling: Spark resolves column
    # names case-insensitively by default, so the lookup should too.
    actual_names = {name.upper(): name for name in df.columns}

    load_ts = actual_names.get("AUD_LD_DTS")
    if load_ts is None:
        status = "Column Not Found"
    # A single matching row answers the question; no need to count them all.
    elif df.filter(to_date(col(load_ts)) == current_date()).select(col(load_ts)).take(1):
        status = "Updated Today"
    else:
        status = "Stale Data"

    extracted_value = "N/A"
    file_nm = actual_names.get("AUD_FILE_NM")
    if file_nm is not None:
        distinct_values = (
            df.select(regexp_extract(col(file_nm), 'PRODDTA/([^/]+)/', 1).alias("extracted_value"))
            .distinct()
            .collect()
        )
        # Skip None and empty-string results (falsy values).
        extracted_values = [row["extracted_value"] for row in distinct_values if row["extracted_value"]]
        # distinct() has no defined order; take the smallest value so repeated
        # runs report the same one when several are present.
        extracted_value = min(extracted_values, default="N/A")

    return (table_name, status, extracted_value)

# Check each table and collect results
hub_erp_results = [check_table(table) for table in tables]

# Convert results to a DataFrame for display.
# The schema is spelled out because column types cannot be inferred from an
# empty list, which is what `hub_erp_results` is when no table matched.
erp_results_df = spark.createDataFrame(
    hub_erp_results,
    "table_name string, status string, extracted_value string",
)
display(erp_results_df)

# Combined: lake and hub checks joined together

In [0]:
%python
from pyspark.sql.functions import col, current_date, to_date, regexp_extract

# List the schema's tables and keep those named like "%proddta%"
def find_proddta_tables(schema_name):
    """Collect qualified names of tables whose name contains "proddta".

    Args:
        schema_name: Schema to list via ``SHOW TABLES IN``; it is interpolated
            into the SQL statement as-is.

    Returns:
        A list of ``"<schema_name>.<tableName>"`` strings, one per table whose
        ``tableName`` matches the LIKE pattern ``%proddta%``.
    """
    is_proddta = col("tableName").like("%proddta%")
    rows = (
        spark.sql(f"SHOW TABLES IN {schema_name}")
        .where(is_proddta)
        .select("tableName")
        .collect()
    )

    qualified_names = []
    for row in rows:
        qualified_names.append(f"{schema_name}.{row['tableName']}")
    return qualified_names

# Lake side: gather the "gms_us_lake" tables to be checked
tables = find_proddta_tables(schema_name="gms_us_lake")

# Function to check if a table has data from today
def check_table(table_name):
    """Classify a lake table by whether it holds rows loaded today.

    A table without an ``AUD_LD_DTS`` column yields ``"Column Not Found"``
    rather than an exception, so the remaining tables are still checked.

    Args:
        table_name: Table name passed to ``spark.table``.

    Returns:
        A ``(table_name, status)`` tuple where ``status`` is one of
        ``"Updated Today"``, ``"Stale Data"`` or ``"Column Not Found"``.
    """
    df = spark.table(table_name)

    # Case-insensitive lookup (Spark's default name resolution ignores case);
    # the table's own spelling is kept for the column reference below.
    load_ts = next((name for name in df.columns if name.upper() == "AUD_LD_DTS"), None)
    if load_ts is None:
        return (table_name, "Column Not Found")

    loaded_today = df.filter(to_date(col(load_ts)) == current_date())
    # One matching row is enough to call the table fresh; avoid a full count.
    if loaded_today.select(col(load_ts)).take(1):
        return (table_name, "Updated Today")
    return (table_name, "Stale Data")

# Check each table and collect results
results = [check_table(table) for table in tables]

# Convert results to a DataFrame for display.
# The schema is spelled out because column types cannot be inferred from an
# empty list, which is what `results` is when no table matched.
results_df = spark.createDataFrame(results, "table_name_lake string, status_lake string")
display(results_df)

# Redefinition for the hub schema: find "erp_glbl" tables, skipping backup/dummy/test/phase tables
def find_proddta_tables(schema_name):
    """Collect qualified names of the "erp_glbl" tables in a schema.

    Keeps the ``SHOW TABLES IN <schema_name>`` rows whose ``tableName``
    matches the LIKE pattern ``%erp_glbl%`` and does not match any of the
    exclusion patterns listed in the body.

    Args:
        schema_name: Schema to list; it is interpolated into the SQL
            statement as-is.

    Returns:
        A list of ``"<schema_name>.<tableName>"`` strings.
    """
    table_name = col("tableName")
    # NOTE: "_" is a single-character wildcard in LIKE patterns.
    keep = table_name.like("%erp_glbl%")
    for unwanted in ("%bckup%", "%bkp%", "%bck%", "%dummy%", "%test%", "%phase%"):
        keep = keep & ~table_name.like(unwanted)

    rows = (
        spark.sql(f"SHOW TABLES IN {schema_name}")
        .where(keep)
        .select("tableName")
        .collect()
    )
    return [f"{schema_name}.{row['tableName']}" for row in rows]

# Hub side: gather the "gms_us_hub" tables to be checked (replaces the lake list)
tables = find_proddta_tables(schema_name="gms_us_hub")

# Function to check if a table has data from today and extract distinct values
def check_table(table_name):
    """Report a hub table's freshness and the source name in its file paths.

    Args:
        table_name: Table name passed to ``spark.table``.

    Returns:
        A ``(table_name, status, extracted_value)`` tuple. ``status`` is
        ``"Updated Today"``, ``"Stale Data"`` or ``"Column Not Found"``
        (no ``AUD_LD_DTS`` column). ``extracted_value`` is the path segment
        following ``PRODDTA/`` in ``AUD_FILE_NM``, or ``"N/A"`` when the
        column is missing or no value matches.
    """
    df = spark.table(table_name)

    # Upper-cased name -> the table's own spelling. Spark resolves column
    # names case-insensitively by default, so the lookup ignores case as well.
    actual_names = {name.upper(): name for name in df.columns}

    load_ts = actual_names.get("AUD_LD_DTS")
    if load_ts is None:
        status = "Column Not Found"
    else:
        loaded_today = df.filter(to_date(col(load_ts)) == current_date())
        # One matching row is enough to call the table fresh; avoid a full count.
        status = "Updated Today" if loaded_today.select(col(load_ts)).take(1) else "Stale Data"

    file_nm = actual_names.get("AUD_FILE_NM")
    if file_nm is None:
        return (table_name, status, "N/A")

    distinct_values = (
        df.select(regexp_extract(col(file_nm), 'PRODDTA/([^/]+)/', 1).alias("extracted_value"))
        .distinct()
        .collect()
    )
    # Skip None and empty-string results (falsy values).
    extracted_values = [row["extracted_value"] for row in distinct_values if row["extracted_value"]]
    # distinct() has no defined order; take the smallest value so repeated
    # runs report the same one when several are present.
    extracted_value = min(extracted_values, default="N/A")

    return (table_name, status, extracted_value)

# Check each table and collect results
hub_erp_results = [check_table(table) for table in tables]

# Convert results to a DataFrame for display.
# The schema is spelled out because column types cannot be inferred from an
# empty list, which is what `hub_erp_results` is when no table matched.
erp_results_df = spark.createDataFrame(
    hub_erp_results,
    "table_name string, status_hub string, extracted_value string",
)
display(erp_results_df)

from pyspark.sql.functions import expr

# Pair every hub table with the lake table(s) whose name contains the value
# extracted from the hub table's file paths.
# The condition has to look at the lake column (table_name_lake): comparing
# the hub table_name with its own extracted_value never involves results_df,
# so each hub row passing that test would be paired with every lake row.
# Both sides are lower-cased so the match does not depend on letter case.
joined_df = erp_results_df.join(
    results_df,
    expr("lower(table_name_lake) like concat('%', lower(extracted_value), '%')"),
    "left"
).select(
    erp_results_df["table_name"],
    erp_results_df["status_hub"],
    results_df["table_name_lake"],
    results_df["status_lake"]
)

display(joined_df)

# Which JDE tables in the hub are up to date?

In [0]:
%python
from pyspark.sql.functions import lower, col, expr

# Left-join the hub results to the lake results: a lake row matches when its
# lower-cased table name contains the hub row's lower-cased extracted value.
# All columns from both sides are kept; hub rows without a match get nulls.
joined_df = erp_results_df.join(
    other=results_df,
    on=expr("lower(table_name_lake) like concat('%', lower(extracted_value), '%')"),
    how="left",
)

display(joined_df)

In [0]:
-- Distinct audit file names containing 'PRODDTA', each shown next to the
-- path segment that follows 'PRODDTA/'.
SELECT DISTINCT
    AUD_FILE_NM,
    REGEXP_EXTRACT(AUD_FILE_NM, 'PRODDTA/([^/]+)/', 1) AS extracted_string
FROM gms_us_hub.ref_plantcode_masterdata_erp_glbl
WHERE AUD_FILE_NM LIKE '%PRODDTA%'
-- limit 100

In [0]:
-- Distinct path segments that follow 'PRODDTA/' in the audit file names of
-- the stock-movement table.
SELECT DISTINCT
    REGEXP_EXTRACT(AUD_FILE_NM, 'PRODDTA/([^/]+)/', 1) AS extracted_string
FROM gms_us_hub.txn_stockmovement_erp_glbl
WHERE AUD_FILE_NM LIKE '%PRODDTA%'
-- limit 100

# SQL solution - could not be executed (insufficient permissions)

In [0]:
-- "proddta" tables in gms_us_lake whose metadata reports an alteration today.
-- current_date() replaces a hard-coded day so the query stays valid on
-- whatever day it is run.
SELECT table_name, last_altered
FROM information_schema.tables
WHERE table_schema = 'gms_us_lake'
  AND table_name LIKE '%proddta%'
  AND DATE(last_altered) = current_date();

In [0]:
-- Tables in gms_us_lake whose metadata reports no alteration today.
-- current_date() replaces a hard-coded day so the query stays valid on
-- whatever day it is run.
-- NOTE(review): the name pattern here is 'jdf_proddta' while the notebook
-- title refers to 'jde_proddta' tables - confirm which spelling is intended.
SELECT table_name, last_altered
FROM information_schema.tables
WHERE table_schema = 'gms_us_lake'
  AND table_name LIKE '%jdf_proddta%'
  AND DATE(last_altered) != current_date();