In [10]:
pip install gcsfs

Collecting gcsfs
  Downloading gcsfs-2023.1.0-py2.py3-none-any.whl (26 kB)
Collecting requests
  Downloading requests-2.31.0-py3-none-any.whl (62 kB)
[K     |████████████████████████████████| 62 kB 1.1 MB/s eta 0:00:011
[?25hCollecting google-auth>=1.2
  Downloading google_auth-2.36.0-py2.py3-none-any.whl (209 kB)
[K     |████████████████████████████████| 209 kB 8.9 MB/s eta 0:00:01
Collecting google-auth-oauthlib
  Downloading google_auth_oauthlib-1.2.1-py2.py3-none-any.whl (24 kB)
Collecting google-cloud-storage
  Downloading google_cloud_storage-2.18.2-py2.py3-none-any.whl (130 kB)
[K     |████████████████████████████████| 130 kB 44.9 MB/s eta 0:00:01
Collecting cachetools<6.0,>=2.0.0
  Downloading cachetools-5.5.0-py3-none-any.whl (9.5 kB)
Collecting rsa<5,>=3.1.4
  Downloading rsa-4.9-py3-none-any.whl (34 kB)
Collecting pyasn1-modules>=0.2.1
  Downloading pyasn1_modules-0.3.0-py2.py3-none-any.whl (181 kB)
[K     |████████████████████████████████| 181 kB 47.2 MB/s eta 0:00:01


In [8]:
pip install pandas pyarrow s3fs --user

Collecting s3fs
  Using cached s3fs-2023.1.0-py3-none-any.whl (27 kB)
Collecting aiobotocore~=2.4.2
  Using cached aiobotocore-2.4.2-py3-none-any.whl (66 kB)
Installing collected packages: aiobotocore, s3fs
Successfully installed aiobotocore-2.4.2 s3fs-2023.1.0
You should consider upgrading via the '/opt/python/python3.7.9/bin/python3.7 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.


In [57]:
import pandas as pd

In [58]:
import pyarrow.parquet as pq

In [59]:
from pyspark.sql import SparkSession, Row

In [60]:
# Create (or reuse an existing) Spark session named "DataQualityEngineer".
# NOTE(review): no visible cell uses `spark`; the DQ checks below run on pandas.
spark = (
    SparkSession.builder
    .appName("DataQualityEngineer")
    .getOrCreate()
)

In [61]:
# Raw airports extract in the local GCS bucket (a single part file of the dataset).
gsi_airports_parquet_path = (
    "gs://iskldl04-dqelearn-local-bucket/raw/airports/"
    "part-00001-a9aee747-6f56-4317-bf6b-075fe3b3ed5f-c000.snappy.parquet"
)

In [62]:
# NOTE(review): the DQ helper functions re-read the file from its path rather than using this frame.
airports_df = pd.read_parquet(path=gsi_airports_parquet_path, engine="pyarrow")

In [63]:
# Raw carriers extract in the local GCS bucket (a single part file of the dataset).
gsi_carriers_parquet_path = (
    "gs://iskldl04-dqelearn-local-bucket/raw/carriers/"
    "part-00001-366e67ad-4fd6-41cd-af99-1ff7b3e314db-c000.snappy.parquet"
)

In [64]:
# NOTE(review): the DQ helper functions re-read the file from its path rather than using this frame.
carriers_df = pd.read_parquet(path=gsi_carriers_parquet_path, engine="pyarrow")

In [65]:
# Raw flights extract in the local GCS bucket (a single part file of the dataset).
gsi_flights_parquet_path = (
    "gs://iskldl04-dqelearn-local-bucket/raw/flights/"
    "part-00001-55c5be74-a9db-4265-8f2c-bceb8279269e-c000.snappy.parquet"
)

In [66]:
# NOTE(review): the DQ helper functions re-read the file from its path rather than using this frame.
flights_df = pd.read_parquet(path=gsi_flights_parquet_path, engine="pyarrow")

In [67]:
def validate_unique_by_pk(parquet_file_path, pk_columns, engine='pyarrow'):
    """Check that rows are unique on a (possibly composite) primary key.

    Parameters
    ----------
    parquet_file_path : str or pandas.DataFrame
        Path/URL of the Parquet file to read, or an already-loaded DataFrame,
        which is used as-is (avoids reading the same data twice).
    pk_columns : list of str
        Column(s) that together form the primary key.
    engine : str, default 'pyarrow'
        Parquet engine passed to ``pd.read_parquet``; ignored for a DataFrame.

    Returns
    -------
    (bool, pandas.DataFrame)
        ``True`` when no key value occurs more than once, plus every row whose
        key is duplicated (all occurrences, not only the repeats).

    Raises
    ------
    KeyError
        If any of ``pk_columns`` is not a column of the data.
    """
    if isinstance(parquet_file_path, pd.DataFrame):
        df = parquet_file_path
    else:
        df = pd.read_parquet(parquet_file_path, engine=engine)
    # keep=False flags every member of a duplicate group so the caller sees all conflicting rows.
    duplicates = df[df.duplicated(subset=pk_columns, keep=False)]
    return duplicates.empty, duplicates

In [68]:
def validate_completeness_by_non_nullable(parquet_file_path, non_nullable_columns, engine='pyarrow'):
    """Check that the given mandatory columns contain no nulls.

    Parameters
    ----------
    parquet_file_path : str or pandas.DataFrame
        Path/URL of the Parquet file to read, or an already-loaded DataFrame,
        which is used as-is.
    non_nullable_columns : list of str
        Columns that must be populated in every row.
    engine : str, default 'pyarrow'
        Parquet engine passed to ``pd.read_parquet``; ignored for a DataFrame.

    Returns
    -------
    (bool, pandas.DataFrame)
        ``True`` when every row is complete, plus the rows that have a null in
        at least one of ``non_nullable_columns``.

    Raises
    ------
    KeyError
        If any of ``non_nullable_columns`` is not a column of the data.
    """
    if isinstance(parquet_file_path, pd.DataFrame):
        df = parquet_file_path
    else:
        df = pd.read_parquet(parquet_file_path, engine=engine)
    # A row is incomplete if ANY mandatory column is null (None/NaN/NaT).
    # NOTE(review): empty strings are not counted as missing — confirm that is intended.
    has_missing = df[non_nullable_columns].isnull().any(axis=1)
    rows_with_missing_values = df[has_missing]
    return rows_with_missing_values.empty, rows_with_missing_values

In [69]:
def check_column_consistency_between_csv_and_parquet(source_csv_path, target_parquet_path, column_name, engine='pyarrow', allow_nulls=False):
    """Compare one column between a source CSV and a target Parquet dataset.

    Checks, in order: column presence, missing values, dtype equality,
    row count, and element-wise equality. Returns at the first failure.

    Parameters
    ----------
    source_csv_path : str or pandas.DataFrame
        Path/URL of the source CSV, or an already-loaded DataFrame used as-is.
    target_parquet_path : str or pandas.DataFrame
        Path/URL of the target Parquet file, or an already-loaded DataFrame used as-is.
    column_name : str
        Column to compare.
    engine : str, default 'pyarrow'
        Parquet engine passed to ``pd.read_parquet``; ignored for a DataFrame.
    allow_nulls : bool, default False
        False (original behaviour): any null in either column fails the check.
        True: nulls are accepted as long as source and target contain the same
        number of them; their positions are then verified by the value comparison
        (``Series.equals`` treats NaNs in the same position as equal).

    Returns
    -------
    (bool, str)
        Whether the column is consistent, and a human-readable explanation.

    Raises
    ------
    ValueError
        If ``column_name`` is missing from either dataset.
    """
    # Step 1: Load both sides (DataFrames are used as-is)
    if isinstance(source_csv_path, pd.DataFrame):
        source_df = source_csv_path
    else:
        # NOTE(review): read_csv's default na_values treat strings such as "NA" or "NULL"
        # as missing; confirm no legitimate value in this column collides with them.
        source_df = pd.read_csv(source_csv_path)
    if isinstance(target_parquet_path, pd.DataFrame):
        target_df = target_parquet_path
    else:
        target_df = pd.read_parquet(target_parquet_path, engine=engine)

    # Ensure the column exists in both DataFrames
    if column_name not in source_df.columns:
        raise ValueError(f"Column '{column_name}' not found in the source CSV file.")
    if column_name not in target_df.columns:
        raise ValueError(f"Column '{column_name}' not found in the target Parquet file.")

    source_col = source_df[column_name]
    target_col = target_df[column_name]

    # Step 2: Missing values
    source_missing_values = source_col.isnull().sum()
    target_missing_values = target_col.isnull().sum()
    if allow_nulls:
        nulls_inconsistent = source_missing_values != target_missing_values
    else:
        nulls_inconsistent = source_missing_values > 0 or target_missing_values > 0
    if nulls_inconsistent:
        return False, f"Missing values detected: Missing values in Source: {source_missing_values}, Missing values in Target: {target_missing_values}"

    # Step 3: Data types must match
    source_dtype = source_col.dtype
    target_dtype = target_col.dtype
    if source_dtype != target_dtype:
        return False, f"Data type mismatch: Source data type: {source_dtype}, Target data type: {target_dtype}"

    # Step 4: Row counts must match (reported explicitly instead of as a generic value mismatch)
    if len(source_col) != len(target_col):
        return False, f"Row count mismatch: Source rows: {len(source_col)}, Target rows: {len(target_col)}"

    # Step 5: Element-wise comparison; Series.equals also requires equal index labels
    if not source_col.equals(target_col):
        return False, "Column values do not match between source and target datasets."

    # If all checks pass
    return True, "The column is consistent between source and target datasets."

In [70]:
# Accumulator: one dict per DQ check, turned into the summary table at the end.
dq_results = list()

In [71]:
# Columns treated together as the carriers primary key.
# NOTE(review): with `description` included, rows that share a `code` but differ in
# description are not flagged — confirm whether `code` alone is the intended key.
primary_key_columns = "code description".split()

In [72]:
is_unique, duplicates = validate_unique_by_pk(
    parquet_file_path=gsi_carriers_parquet_path,
    pk_columns=primary_key_columns,
)

In [73]:
# Record the carrier uniqueness outcome; the duplicated rows are attached only on failure.
dq_results.append({
    'Table': 'Carrier',
    'DQ Check': 'Uniqueness',
    'Column': primary_key_columns,
    'Status': 'Passed' if is_unique else 'Failed',
    'Bad Data': '' if is_unique else duplicates,
})

In [74]:
# Airport columns that must be populated in every row.
non_nullable_columns = "iata airport city state country lat longt".split()

In [75]:
is_complete, rows_with_missing_values = validate_completeness_by_non_nullable(
    parquet_file_path=gsi_airports_parquet_path,
    non_nullable_columns=non_nullable_columns,
)

In [76]:
# Record the airports completeness outcome; the incomplete rows are attached only on failure.
dq_results.append({
    'Table': 'Airports',
    'DQ Check': 'Completeness',
    'Column': non_nullable_columns,
    'Status': 'Passed' if is_complete else 'Failed',
    'Bad Data': '' if is_complete else rows_with_missing_values,
})

In [77]:
# Inputs for the source-vs-target consistency checks on the flights table.
# NOTE(review): the target is a single part file (part-00001) while the source is the
# full flights.csv; a row-by-row comparison only makes sense if that part holds every row.
column_name = 'Dest'
target_parquet_path = gsi_flights_parquet_path
source_csv_flights_path = "gs://iskldl04-dqelearn-local-bucket/source/flights.csv"

In [78]:
is_consistent, message = check_column_consistency_between_csv_and_parquet(
    source_csv_path=source_csv_flights_path,
    target_parquet_path=target_parquet_path,
    column_name=column_name,
)

In [79]:
# Record the consistency outcome for the current `column_name`; the failure reason is kept only on failure.
dq_results.append({
    'Table': 'Flights',
    'DQ Check': 'Consistency',
    'Column': column_name,
    'Status': 'Passed' if is_consistent else 'Failed',
    'Bad Data': '' if is_consistent else message,
})

In [80]:
# Next flights column to compare between source CSV and target Parquet.
column_name = "ArrDelay"

In [81]:
is_consistent, message = check_column_consistency_between_csv_and_parquet(
    source_csv_path=source_csv_flights_path,
    target_parquet_path=target_parquet_path,
    column_name=column_name,
)

In [82]:
# Record the consistency outcome for the current `column_name`; the failure reason is kept only on failure.
dq_results.append({
    'Table': 'Flights',
    'DQ Check': 'Consistency',
    'Column': column_name,
    'Status': 'Passed' if is_consistent else 'Failed',
    'Bad Data': '' if is_consistent else message,
})

In [83]:
# One row per recorded DQ check; 'Bad Data' holds either '', a message string, or a DataFrame of offending rows.
dq_summary_df = pd.DataFrame(data=dq_results)

In [84]:
# Display the DQ summary table (one row per check) as this cell's output.
dq_summary_df

Unnamed: 0,Table,DQ Check,Column,Status,Bad Data
0,Carrier,Uniqueness,"[code, description]",Failed,code description 735 ZUQ Zulian...
1,Airports,Completeness,"[iata, airport, city, state, country, lat, longt]",Passed,
2,Flights,Consistency,Dest,Failed,Missing values detected: Missing values in Sou...
3,Flights,Consistency,ArrDelay,Failed,Missing values detected: Missing values in Sou...


In [85]:
# Write the summary report. Use '_' rather than ':' in the filename — ':' is not a
# valid filename character on Windows and is awkward on other platforms.
# NOTE(review): 'Bad Data' cells that hold DataFrames are likely written as their
# (multi-line, possibly truncated) string repr — confirm that is the desired format.
dq_summary_df.to_csv('dq_report.csv', index=False)