In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Create (or reuse, via getOrCreate) the notebook-wide Spark session.
# NOTE(review): "spark.some.config.option" / "some-value" looks like the
# placeholder pair from the Spark quick-start example — confirm it is needed.
spark = (
    SparkSession.builder
    .appName("SparkSession")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Underlying SparkContext, exposed for RDD-level APIs.
sc = spark.sparkContext

In [2]:
def choose_df_read(path):
    '''Load ``path`` into a Spark DataFrame, choosing the reader by suffix.

    Paths whose text ends in "csv" are read with the CSV reader (header row,
    inferred schema, comma delimiter); every other path is read as Parquet.
    Uses the notebook-level ``spark`` session.
    '''
    is_csv = path.endswith('csv')
    if not is_csv:
        return spark.read.parquet(path)
    csv_options = {'header': 'True', 'inferSchema': 'True', 'delimiter': ','}
    return spark.read.options(**csv_options).csv(path)

In [3]:
import pandas as pd
from pyspark.sql.functions import col
# Print floats with 11 digits after the decimal point and never truncate
# the text of a cell (e.g. long lists in result tables).
pd.options.display.precision = 11
pd.options.display.max_colwidth = None

def _completeness_mismatches(source_df, target_df, pk_columns, source_name, target_name):
    '''Return ``[*pk_values, source_value, target_value]`` rows whose values differ.

    Source and target are inner-joined on ``pk_columns``; keys present on only
    one side are therefore not reported. NaN on both sides compares as unequal
    and is reported.
    '''
    # Project each side to key + value and give the value columns fixed labels,
    # so the join works whether or not the source/target names coincide.
    source_side = source_df[pk_columns + [source_name]].rename(columns={source_name: '_source_value'})
    target_side = target_df[pk_columns + [target_name]].rename(columns={target_name: '_target_value'})
    merged_df = pd.merge(source_side, target_side, on=pk_columns)
    filtered_df = merged_df[merged_df['_source_value'] != merged_df['_target_value']]
    return filtered_df[pk_columns + ['_source_value', '_target_value']].values.tolist()


def func_completeness_by_sum(table, path_source, path_target, list_columns, list_pk):
    '''Compare column totals between a source dataset and its loaded target.

    Every requested column is cast to double on both sides and summed; the check
    passes when the two sums are exactly equal. When they differ, the rows whose
    values disagree (joined on ``list_pk``) are reported.

    If a column name differs between source and target, pass it as a tuple
    ``(source_name, target_name)`` instead of a plain string.

    Args:
        table: Label written into every result row.
        path_source: Path of the source data (read via choose_df_read).
        path_target: Path of the target data (read via choose_df_read).
        list_columns: Column names, or ``(source_name, target_name)`` tuples.
        list_pk: Key column(s) used to join source and target rows. A list
            (or a single column name); it is never modified.

    Returns:
        One ``[table, 'Completeness', column, passed, bad_data]`` row per entry
        of ``list_columns``; ``bad_data`` is a list of
        ``[*pk_values, source_value, target_value]`` rows, empty when sums match.
    '''
    source_df_spark = choose_df_read(path_source)
    target_df_spark = choose_df_read(path_target)

    # Private copy of the key columns so the caller's list is never mutated.
    pk_columns = [list_pk] if isinstance(list_pk, str) else list(list_pk)

    # (source_name, target_name) for every requested column.
    name_pairs = [column if isinstance(column, tuple) else (column, column)
                  for column in list_columns]
    if not name_pairs:
        return []

    # Cast all checked columns first, then convert each side to pandas once
    # instead of once per column.
    for source_name, target_name in name_pairs:
        source_df_spark = source_df_spark.withColumn(source_name, col(source_name).cast('double'))
        target_df_spark = target_df_spark.withColumn(target_name, col(target_name).cast('double'))
    source_df = source_df_spark.toPandas()
    target_df = target_df_spark.toPandas()

    result_list = []
    for column, (source_name, target_name) in zip(list_columns, name_pairs):
        sum_source = source_df[source_name].sum()
        sum_target = target_df[target_name].sum()
        # Exact equality of the float sums.
        result_check = sum_source == sum_target

        bad_data = []
        if not result_check:
            bad_data = _completeness_mismatches(source_df, target_df, pk_columns,
                                                source_name, target_name)

        result_list.append([table, 'Completeness', column, result_check, bad_data])

    return result_list

In [4]:
# Run the completeness check on the airports data and print its result rows.
print(
    func_completeness_by_sum(
        'Airports',
        "source/airports.csv",
        "raw/airports/*.parquet",
        ['lat', ('long', 'longt')],
        ['iata', 'airport'],
    )
)

                                                                                

[['Airports', 'Completeness', 'lat', True, []], ['Airports', 'Completeness', ('long', 'longt'), True, []]]


In [5]:
from pyspark.sql.functions import sum, col, substring, lpad, when

def func_validity_by_time_range(table, path_to_data, list_columns):
    '''Check that time columns hold valid HHMM clock times.

    Each value of a checked column is left-padded with '0' to four characters
    (e.g. 930 -> "0930"). It is valid when characters 1-2 cast to an integer
    hour in 0..23 and characters 3-4 cast to an integer minute in 0..59.

    Null and empty-string values are skipped (never reported). Any other value
    whose hour or minute part does not parse as an integer is reported as
    invalid.

    NOTE(review): Spark's lpad also shortens values longer than four characters
    to their first four (e.g. "12345" -> "1234"), so such values are judged on
    that prefix only — confirm that is acceptable for the input format.

    Args:
        table: Label written into every result row.
        path_to_data: Path passed to choose_df_read.
        list_columns: Names of the columns to check.

    Returns:
        One ``[table, 'Validity', column, passed, bad_values]`` row per column,
        where ``bad_values`` are the zero-padded values that failed the check.
    '''
    from pyspark.sql.functions import coalesce, lit

    df = choose_df_read(path_to_data)

    result_list = []
    for column in list_columns:
        value = col(column)
        padded = when(value.isNull() | (value == ''), None).otherwise(lpad(value, 4, '0'))
        modified_df = df.withColumn(column, padded)

        hours = substring(col(column), 1, 2).cast('integer')
        minutes = substring(col(column), 3, 2).cast('integer')
        is_valid_time = hours.between(0, 23) & minutes.between(0, 59)
        # An unparseable part makes is_valid_time NULL, which filter() would
        # silently drop; coalesce it to False so such values are reported,
        # while genuinely missing (null) values stay excluded.
        invalid_data = modified_df.filter(
            col(column).isNotNull() & ~coalesce(is_valid_time, lit(False))
        ).select(column)

        # One Spark job: collect the failing values and derive the status from
        # them, instead of running count() and collect() separately.
        list_bad_data = [row[0] for row in invalid_data.collect()]
        result_check = len(list_bad_data) == 0

        result_list.append([table, 'Validity', column, result_check, list_bad_data])

    return result_list

In [6]:
def uniqueness_by_PK(table, path_to_data, list_columns, flag_combination = False):
    '''Check primary-key candidate columns for duplicated values.

    Args:
        table: Label written into every result row.
        path_to_data: Path passed to choose_df_read.
        list_columns: Column names to check.
        flag_combination: When False, each column is checked on its own; when
            True, all columns are checked together as one composite key.

    Returns:
        Per-column mode: one ``[table, 'Uniqueness', column, passed, dup_values]``
        row per column, ``dup_values`` being the values seen more than once.
        Combination mode: a single
        ``[table, 'Uniqueness by combination', list_columns, passed, dup_rows]``
        row, ``dup_rows`` being lists of column values seen more than once.
    '''
    df = choose_df_read(path_to_data)

    result_list = []
    if flag_combination:
        # Composite key: combinations of all listed columns occurring > 1 time.
        non_unique = df.groupBy(list_columns).count().filter(col('count') > 1)
        # Single job: collect the duplicates and derive the status from them.
        list_bad_data = [list(row) for row in non_unique.select(list_columns).collect()]
        result_check = len(list_bad_data) == 0
        result_list.append([table, 'Uniqueness by combination', list_columns, result_check, list_bad_data])
    else:
        # Single-column keys: each column checked independently.
        for column in list_columns:
            non_unique = df.groupBy(column).count().filter(col('count') > 1)
            list_bad_data = [row[0] for row in non_unique.select(column).collect()]
            result_check = len(list_bad_data) == 0
            result_list.append([table, 'Uniqueness', column, result_check, list_bad_data])

    return result_list

In [7]:
def consistency_for_CancellationCode(table, path_to_data):
    '''Check that CancellationCode agrees with the Cancelled flag.

    A row is inconsistent when it is not cancelled (``Cancelled == 0``) but has
    a non-empty code, or is cancelled (``Cancelled == 1``) without one of the
    codes 'A', 'B', 'C'. A null code is treated like an empty one. Rows whose
    ``Cancelled`` value is neither 0 nor 1 (including null) are not checked.

    Returns:
        A single ``[table, 'Consistency check for CancellationCode',
        'CancellationCode', passed, bad_rows]`` row, where ``bad_rows`` are
        ``[Cancelled, CancellationCode]`` pairs of the inconsistent rows.
    '''
    from pyspark.sql.functions import coalesce, lit

    df = choose_df_read(path_to_data)

    # Without the coalesce, ~isin(...) on a null code is NULL and filter()
    # drops the row, so a cancelled flight with no code would pass silently.
    code = coalesce(col('CancellationCode'), lit(''))
    not_cancelled_with_code = (col('Cancelled') == 0) & ~code.isin('')
    cancelled_without_valid_code = (col('Cancelled') == 1) & ~code.isin('A', 'B', 'C')
    inconsistent_cancellation = df.filter(not_cancelled_with_code | cancelled_without_valid_code)

    # Single job: collect the offending rows and derive the status from them.
    list_bad_data = [list(row) for row in
                     inconsistent_cancellation.select('Cancelled', 'CancellationCode').collect()]
    result_check = len(list_bad_data) == 0

    return [[table, 'Consistency check for CancellationCode', 'CancellationCode', result_check, list_bad_data]]

In [8]:
# Run every data-quality check in order; each call contributes one list of
# result rows.
result_list = [
    func_completeness_by_sum('Airports', "source/airports.csv", "raw/airports/*.parquet", ['lat', ('long', 'longt')], ['iata', 'airport']),
    func_validity_by_time_range('Flights', "raw/flights/*.parquet", ['ArrTime', 'DepTime']),
    uniqueness_by_PK('Carrier', "raw/carriers/*.parquet", ['Code', 'Description']),
    uniqueness_by_PK('Carrier', "raw/carriers/*.parquet", ['Code', 'Description'], flag_combination=True),
    consistency_for_CancellationCode('Flights', "raw/flights/*.parquet"),
]

23/05/25 07:23:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [9]:
import pandas as pd

# Every check returned a list of result rows; concatenate them into one list.
flattened_list = [row for check_rows in result_list for row in check_rows]

df = pd.DataFrame(flattened_list, columns=['Table', 'DQ check', 'Column', 'Status', 'Bad Data'])

# Number the rows from 1 and use that counter, named '#', as the index.
df.insert(0, '#', range(1, len(df) + 1))
df = df.set_index('#')

# Display the resulting DataFrame
print(df)


      Table                                DQ check               Column  \
#                                                                          
1  Airports                            Completeness                  lat   
2  Airports                            Completeness        (long, longt)   
3   Flights                                Validity              ArrTime   
4   Flights                                Validity              DepTime   
5   Carrier                              Uniqueness                 Code   
6   Carrier                              Uniqueness          Description   
7   Carrier               Uniqueness by combination  [Code, Description]   
8   Flights  Consistency check for CancellationCode     CancellationCode   

   Status  \
#           
1    True   
2    True   
3   False   
4   False   
5   False   
6   False   
7   False   
8   False   

                                                                            Bad Data  
#                    