In [40]:
# Load every raw CSV without header handling, so Spark names the columns
# positionally (_c0, _c1, ...) and each file's own header line arrives as an
# ordinary data row that later cells can inspect.
# The CSV reader option is spelled 'header'; the earlier 'headers' key is not a
# recognised option and was silently ignored.
df=spark.read.format('csv').option('header','false').load('abfss://Edureka_Fabric_Trail@onelake.dfs.fabric.microsoft.com/spark_study.Lakehouse/Files/raw')
# Tag each row with the name of the file it came from (hidden _metadata column).
df_withfileName=df.withColumn('FileName',df['_metadata.file_name'])
display(df_withfileName)

StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 42, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, bcaed5d2-8368-4812-8ea7-458281395f54)

In [55]:
from pyspark.sql.functions import window,dense_rank,col,row_number,collect_list,size,when,coalesce,lit
from pyspark.sql.window import Window


# Cell-local import: this helper is not part of the notebook's existing import lines.
from pyspark.sql.functions import monotonically_increasing_id

# Ordering the window by its own partition key ('FileName') makes every row of a
# file tie, so row_number() may label ANY row as Rnk=1. Order by a read-order id
# instead so that Rnk=1 is the first line read from each file.
# NOTE(review): this assumes the CSV scan yields each file's rows in file order and
# that a file's first split lands in its lowest-numbered partition - confirm for
# very large, multi-split files.
df_with_read_order=df_withfileName.withColumn('_read_order',monotonically_increasing_id())
window_spec=Window.partitionBy(col('FileName')).orderBy('_read_order')
# Drop the helper column so the downstream schema is unchanged (original columns + Rnk).
df_ranked=df_with_read_order.withColumn('Rnk',row_number().over(window_spec)).drop('_read_order')
# First row per file = candidate header row; everything else = data rows.
df_first=df_ranked.filter('Rnk=1')
df_withoutHeaders=df_ranked.filter('Rnk!=1')
display(df_first)
display(df_withoutHeaders)

StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 57, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 741fbf2e-5374-4402-bc5b-631f586a50f4)

SynapseWidget(Synapse.DataFrame, cf8b33b3-98a0-41ca-a232-68bc15bb89d0)

In [42]:
# Keep only the positional columns Spark generated for the headerless read
# (names beginning with "_c"); FileName and Rnk are excluded by the prefix test.
header_cols = list(filter(lambda name: name.startswith("_c"), df_first.columns))
print(header_cols)

StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 44, Finished, Available, Finished)

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5']


In [43]:
# Build a Spark SQL stack() call that unpivots the header row: each positional
# column contributes a ('<zero-based index>', <column>) pair.
expr_parts = [f"'{position}', {name}" for position, name in enumerate(header_cols)]
joined_pairs = ", ".join(expr_parts)
stack_expr = f"stack({len(header_cols)}, {joined_pairs}) as (SrcPosition, SrcName)"

print(expr_parts)
print(stack_expr)

StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 45, Finished, Available, Finished)

["'0', _c0", "'1', _c1", "'2', _c2", "'3', _c3", "'4', _c4", "'5', _c5"]
stack(6, '0', _c0, '1', _c1, '2', _c2, '3', _c3, '4', _c4, '5', _c5) as (SrcPosition, SrcName)


In [44]:
# Unpivot each file's first row into one (FileName, SrcPosition, SrcName) row per
# column, convert the zero-based string index to a one-based int, and drop
# positions where the file had no value.
one_based_position = col("SrcPosition").cast("int") + 1
df_unpivoted = (
    df_first.selectExpr("FileName", stack_expr)
    .withColumn("SrcPosition", one_based_position)
    .filter(col('SrcName').isNotNull())
)

display(df_unpivoted)

StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 46, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c1dd8e08-5798-4aea-96a9-438d47636c6f)

In [45]:
# Read the mapping file; its first line is used as the column names.
mapping_reader = spark.read.format("csv").option("header","true")
df_mapping = mapping_reader.load("Files/mapping/mapping.csv")
# df_mapping is now a Spark DataFrame containing the CSV data from "Files/mapping/mapping.csv".
display(df_mapping)

StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 47, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9f5b087f-46dd-4ac8-9dac-e2d9e2756472)

In [46]:
# A header matches the mapping only when both its name and its position agree.
name_matches = df_unpivoted.SrcName == df_mapping.ColumnsName
position_matches = df_unpivoted.SrcPosition == df_mapping.Position
df_join = df_unpivoted.join(df_mapping, name_matches & position_matches, 'left')

# Rows where the mapping side is populated are the matched headers; count them per file.
matched_rows = df_join.filter(col('Position').isNotNull())
df_join_correct = matched_rows.groupBy('FileName').count()
display(df_join_correct)


StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 48, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 40766a74-afdd-46fb-bf66-e31bdf66f355)

In [47]:


# Per file: gather the header names found in its first row and count them.
# The key is renamed to OriginalFileName so it does not clash with FileName
# when joined against the matched counts.
header_names = collect_list('SrcName').alias('Column_List')
df_original_count = (
    df_unpivoted.groupBy('FileName')
    .agg(header_names)
    .withColumn("MatchedCount", size(col("Column_List")))
    .withColumnRenamed('FileName','OriginalFileName')
)
display(df_original_count)


StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 49, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 22da0232-2137-4087-9ba2-d2bd155112d8)

In [48]:
# Left join keeps every file that has header rows, even when none of its
# headers matched the mapping (the right-hand columns are then NULL).
same_file = df_original_count.OriginalFileName==df_join_correct.FileName
df_validated_files = df_original_count.join(df_join_correct, same_file, 'Left')

display(df_validated_files)

StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 50, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 89feb342-a4b7-40e5-886b-00f9fc947aed)

In [49]:
# A file passes when the number of headers found equals the number that matched
# the mapping; a file with no matches has a NULL count, treated as 0.
headers_all_matched = (col('MatchedCount') - coalesce(col('count'), lit(0))) == 0
df_final = (
    df_validated_files
    .withColumn('status', when(headers_all_matched, 'Pass').otherwise('Fail'))
    # Report the left-hand file name: the right-hand FileName is NULL for files
    # with zero matching headers, which would leave failed files unnamed.
    .select(col('OriginalFileName').alias('FileName'), 'Column_List', 'MatchedCount', 'Count', 'status')
)

display(df_final)

StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 51, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fe489202-0ee2-4a90-a71d-c93fa453a0c6)

In [50]:
from pyspark.sql.functions import col, min, max, avg, count, when, isnan, lit
from pyspark.sql.types import NumericType
from functools import reduce
import os


StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 52, Finished, Available, Finished)

In [51]:
def profile_dataframe(df, file_name):
    """Profile every column of ``df`` except ``FileName``.

    NOTE: a later cell in this notebook defines another ``profile_dataframe``
    with a different signature; whichever cell runs last wins.

    ``min``, ``max``, ``avg`` and ``count`` here are the Spark aggregates
    imported from ``pyspark.sql.functions``, not the Python builtins.

    Args:
        df: Spark DataFrame to profile.
        file_name: Value written into the ``FileName`` column of every result.

    Returns:
        A list with one single-row DataFrame per profiled column, holding
        ``min_value``, ``max_value``, ``null_count``, ``avg_value``,
        ``FileName``, ``column_name`` and ``data_type``.
    """
    # Function-scope import so this cell does not rely on another cell's imports.
    from pyspark.sql.types import NumericType, StringType

    profiled_data = []

    for field in df.schema.fields:
        col_name = field.name
        if col_name == "FileName":
            continue  # skip if already exists

        # Same type string that df.dtypes reports for this column.
        col_type = field.dataType.simpleString()
        value = col(col_name)

        # isnan() only accepts inputs Spark can coerce to double (numeric and
        # string columns); applying it to dates, timestamps or booleans fails
        # analysis, so those types are checked for NULL only.
        is_missing = value.isNull()
        if isinstance(field.dataType, (NumericType, StringType)):
            is_missing = is_missing | isnan(value)

        # Base aggregations
        agg_expr = [
            min(value).alias("min_value"),
            max(value).alias("max_value"),
            count(when(is_missing, col_name)).alias("null_count"),
        ]

        # Numeric average. Test the type class rather than the type string:
        # Spark reports 'decimal(p,s)', 'bigint', 'smallint' and 'tinyint',
        # which a literal list containing 'decimal', 'long', 'short' never matches.
        if isinstance(field.dataType, NumericType):
            agg_expr.append(avg(value).alias("avg_value"))
        else:
            agg_expr.append(lit(None).alias("avg_value"))

        # Profile
        profiled = df.agg(*agg_expr) \
            .withColumn("FileName", lit(file_name)) \
            .withColumn("column_name", lit(col_name)) \
            .withColumn("data_type", lit(col_type))

        profiled_data.append(profiled)

    return profiled_data


StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 53, Finished, Available, Finished)

In [52]:
from pyspark.sql.functions import col, min, max, avg, count, when, isnan, lit
from functools import reduce

def profile_dataframe(df):
    """Profile every column of ``df`` per source file.

    For each column other than ``FileName`` the rows are grouped by
    ``FileName`` and aggregated; the per-column results are then stacked with
    ``unionByName``.

    ``min``, ``max``, ``avg`` and ``count`` here are the Spark aggregates
    imported from ``pyspark.sql.functions``, not the Python builtins.

    Args:
        df: Spark DataFrame that contains a ``FileName`` column.

    Returns:
        One DataFrame with a row per (file, column) and the columns
        ``FileName``, ``min_value``, ``max_value``, ``avg_value``,
        ``null_count``, ``non_null_count``, ``total_count``, ``column_name``
        and ``data_type``.

    Raises:
        TypeError: from ``reduce`` when ``df`` has no column besides
            ``FileName`` (nothing to union).
    """
    # Function-scope import so this cell does not rely on another cell's imports.
    from pyspark.sql.types import NumericType, StringType

    file_col = "FileName"
    profiled_data = []

    # Walk the schema once instead of re-scanning it for every column name.
    for field in df.schema.fields:
        col_name = field.name
        if col_name == file_col:
            continue

        dtype = field.dataType
        dtype_str = dtype.simpleString()
        value = col(col_name)

        # Test the type class rather than the type string: Spark reports
        # 'decimal(p,s)', 'bigint', 'smallint' and 'tinyint', which a literal
        # list containing 'decimal', 'long', 'short' never matches.
        if isinstance(dtype, NumericType):
            avg_value = avg(value).alias("avg_value")
        else:
            avg_value = lit(None).alias("avg_value")

        # isnan() only accepts inputs Spark can coerce to double (numeric and
        # string columns); applying it to dates, timestamps or booleans fails
        # analysis, so those types are checked for NULL only.
        is_missing = value.isNull()
        if isinstance(dtype, (NumericType, StringType)):
            is_missing = is_missing | isnan(value)

        # Group and profile
        base = df.groupBy(file_col).agg(
            min(value).alias("min_value"),
            max(value).alias("max_value"),
            avg_value,
            count(when(is_missing, 1)).alias("null_count"),
            count(value).alias("non_null_count"),
            count(lit(1)).alias("total_count")  # Total rows per file
        )

        # Add column metadata
        base = base.withColumn("column_name", lit(col_name))
        base = base.withColumn("data_type", lit(dtype_str))

        profiled_data.append(base)

    return reduce(lambda a, b: a.unionByName(b), profiled_data)


StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 54, Finished, Available, Finished)

In [56]:
# Profile the data rows (first row of each file already split off) and show
# the selected summary columns.
report_columns = ["FileName", "column_name", "data_type", "min_value", "max_value", "avg_value", "null_count", 'Total_Count']
result = profile_dataframe(df_withoutHeaders)
display(result.select(*report_columns))


StatementMeta(, 8d287389-49ae-41be-aafa-a92a2c0d5c2d, 58, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7ba4843f-27d3-42cb-9097-ca315ad18987)