In [1]:
import boto3
import pandas as pd
import pyarrow.parquet as pq
import io

In [3]:
def read_profiles(bucket: str, source: str) -> pd.DataFrame:
    """
    Reads all non-empty Parquet profiling files for a given source from S3.

    Every object under ``profiling/source=<source>/`` is listed, zero-byte
    objects are skipped, and each remaining object is downloaded fully into
    memory, parsed as Parquet and converted to pandas.

    The listing goes through the ``list_objects_v2`` paginator: a single
    ``list_objects_v2`` call returns at most 1,000 keys, so without pagination
    larger prefixes would be silently truncated.

    Args:
        bucket (str): S3 bucket name
        source (str): Source name (e.g., "payments")

    Returns:
        pd.DataFrame: Concatenated DataFrame of all profiling files with a
        fresh 0..n-1 index, or an empty DataFrame (after printing a notice)
        when no non-empty object exists under the prefix.
    """
    prefix = f"profiling/source={source}/"
    s3 = boto3.client("s3")

    # Walk every result page; a page without "Contents" simply has no objects.
    paginator = s3.get_paginator("list_objects_v2")
    keys = [
        obj["Key"]
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get("Contents", [])
        if obj["Size"] > 0  # zero-byte objects cannot be parsed as Parquet
    ]

    if not keys:
        print(f"No non-empty profiling files found for source: {source}")
        return pd.DataFrame()

    dfs = []
    for key in keys:
        body = s3.get_object(Bucket=bucket, Key=key)["Body"]
        # pyarrow needs a seekable file-like object, so buffer the payload.
        table = pq.read_table(io.BytesIO(body.read()))
        dfs.append(table.to_pandas())

    return pd.concat(dfs, ignore_index=True)

In [4]:
# Where the profiling output lives and which sources to pull from it.
bucket = "danske-bank-project-metadata"
sources = ["payments", "merchant", "device", "customer_behavior", "fraud_timing"]

# source -> distinct profiled column names, plus the per-source frames to stack.
source_columns = {}
all_profiles = []

for source in sources:
    df = read_profiles(bucket, source)
    if df.empty:
        # Nothing was read for this source; leave it out of both collections.
        continue
    source_columns[source] = df["column_name"].unique()
    all_profiles.append(df)

# One long frame holding the profiling rows of every source that had data.
df_profiles = pd.concat(all_profiles, ignore_index=True)


In [None]:
# Standardization of column names: list every distinct profiled column name.
df_profiles["column_name"].unique()

<ArrowStringArray>
[                       'tx_id',                   'event_time',
               'sender_account',             'receiver_account',
                       'amount',              'payment_channel',
                     'location',                'source_system',
          'ingestion_timestamp',                           'ID',
                     'Category',                         'Type',
                       'Amount',                     'Location',
                       'device',                      'network',
               'transaction_id',          'avg_amount_last_24h',
            'tx_count_last_24h',           'max_velocity_score',
          'mean_velocity_score',           'min_velocity_score',
        'avg_geo_anomaly_score', 'avg_spending_deviation_score',
                    'timestamp',                     'is_fraud',
                   'fraud_type',                  'time_of_day',
                  'day_of_week']
Length: 29, dtype: str

In [8]:
# Data types: list every distinct data type reported by the profiles.
df_profiles["data_type"].unique()

<ArrowStringArray>
[                                       'string',
                                     'timestamp',
                                        'double',
 'struct<device_hash:string,device_used:string>',
                     'struct<ip_address:string>',
                                           'int',
                                       'boolean']
Length: 7, dtype: str

In [10]:
# Some data types look problematic (struct types), so let's see which rows
# they belong to: keep only profile rows whose data type starts with "struct<".
df_structs = df_profiles.loc[
    df_profiles["data_type"].str.startswith("struct<")
]
df_structs

Unnamed: 0,source,column_name,metric,metric_value,data_type,ingestion_timestamp,profiling_timestamp
64,device,device,row_count,5000000.0,"struct<device_hash:string,device_used:string>",2026-01-21 22:07:38.114671,2026-01-21 22:07:38.114671
65,device,device,null_count,0.0,"struct<device_hash:string,device_used:string>",2026-01-21 22:07:38.114671,2026-01-21 22:07:38.114671
66,device,device,null_pct,0.0,"struct<device_hash:string,device_used:string>",2026-01-21 22:07:38.114671,2026-01-21 22:07:38.114671
67,device,device,distinct_count,4667869.0,"struct<device_hash:string,device_used:string>",2026-01-21 22:07:38.114671,2026-01-21 22:07:38.114671
68,device,network,row_count,5000000.0,struct<ip_address:string>,2026-01-21 22:07:38.114671,2026-01-21 22:07:38.114671
69,device,network,null_count,0.0,struct<ip_address:string>,2026-01-21 22:07:38.114671,2026-01-21 22:07:38.114671
70,device,network,null_pct,0.0,struct<ip_address:string>,2026-01-21 22:07:38.114671,2026-01-21 22:07:38.114671
71,device,network,distinct_count,4997068.0,struct<ip_address:string>,2026-01-21 22:07:38.114671,2026-01-21 22:07:38.114671


In [16]:
# Now let's look at the row counts: for each source, take the largest
# "row_count" metric value found among its profile rows.
df_row_counts = (
    df_profiles
    .loc[df_profiles["metric"] == "row_count"]
    .groupby("source", as_index=False)
    .agg(row_count=("metric_value", "max"))
)

df_row_counts

Unnamed: 0,source,row_count
0,customer_behavior,896513.0
1,device,5000000.0
2,fraud_timing,5000000.0
3,merchant,5000000.0
4,payments,5000000.0


In [17]:
# Finally, columns with nulls: keep the null_count / null_pct rows whose value
# is strictly positive.
# Note: values are rounded to one decimal, so a very small non-zero null_pct
# is displayed as 0.0 even though it passed the "> 0" filter.
df_null_issues = (
    df_profiles
    .loc[
        df_profiles["metric"].isin(["null_count", "null_pct"])
        & df_profiles["metric_value"].gt(0),
        ["source", "column_name", "metric", "metric_value"],
    ]
    .assign(metric_value=lambda frame: frame["metric_value"].round(1))
    .sort_values(["source", "column_name", "metric"])
)

df_null_issues


Unnamed: 0,source,column_name,metric,metric_value
149,fraud_timing,day_of_week,null_count,3.0
150,fraud_timing,day_of_week,null_pct,0.0
141,fraud_timing,fraud_type,null_count,4820447.0
142,fraud_timing,fraud_type,null_pct,96.4
133,fraud_timing,location,null_count,50470.0
134,fraud_timing,location,null_pct,1.0
129,fraud_timing,timestamp,null_count,3.0
130,fraud_timing,timestamp,null_pct,0.0
