# Data Quality - Kaggle Datasets

## Objectives of the Notebook
The purpose of this notebook is to apply a standardized data cleaning and quality validation process to datasets obtained from the Kaggle platform. The dataset used here is for experimentation only; our goal is a pipeline that fits any Kaggle dataset. After reviewing different projects and their associated datasets, we selected this one manually from a Kaggle API search in which datasets were ranked by user votes.


## Data Quality (DQ)
Data quality refers to the degree to which data is suitable for its intended use. In other words, data must be accurate, complete, relevant, and reliable in the context in which it is used. Poor data quality can seriously affect the results of analyses, predictive models, and business decisions.

This notebook implements a cleaning and validation pipeline based on the framework proposed in the paper **“The Five Facets of Data Quality Assessment,”** which organizes data quality assessment through:

Key Dimensions of DQ:

- Accuracy: Does the data reflect reality?
- Representativeness: Is it representative of the domain it models?
- Completeness: Is data missing or well represented?
- Relevance: Is it useful for the stated objective?

5 Facets for evaluation:

- Data: analysis of digital data and its metadata.
- Source: traceability and quality of the dataset's origin.
- System: technical support, reproducibility, and compliance.
- Task: relevance and adaptation to the specific problem.
- Human: user perception, intention, and needs.


## Applying the Five Facets to the Analysis of Kaggle Datasets

After reviewing the approach proposed in the paper "The Five Facets of Data Quality Assessment" and comparing it with the types of datasets available on Kaggle, we follow this adaptation for our case, focused on building a standardized data quality analysis framework:

1. **Data Facet:**
This is the most relevant facet for our use case. We can analyze structure, data types, missing values, schema consistency, and placeholder values. The most challenging part is evaluating accuracy against reference data, since Kaggle datasets typically don't come with ground-truth benchmarks. However, all other aspects can be assessed in an automated and scalable way.

2. **Source Facet:**
We can extract the dataset author's name and check if a source is provided in the dataset description. However, we cannot access version history or transformation lineage through the Kaggle API. Traceability is limited to what is manually documented.

3. **System Facet:**
Kaggle maintains the original state of the dataset, allowing us to assume reproducibility. While we don't have access to low-level storage or file system metadata, this doesn't significantly affect our analysis.

4. **Task Facet:**
If the dataset supports multiple tasks or lacks a clearly defined purpose, this facet becomes harder to apply automatically.

5. **Human Facet:**
This facet depends on knowing the end users, their domain expertise, and the usage context, none of which we can access automatically. It would require manual input and should be treated as part of the metadata.
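
The Data Facet checks listed above include detecting placeholder values, which are easy to miss because they are not stored as nulls. The following is a minimal sketch under stated assumptions: the token list `PLACEHOLDER_TOKENS` and the function `placeholder_fraction` are hypothetical names, the sample column is made up, and a plain Python list stands in for a DataFrame column to keep the example dependency-free.

```python
# Hypothetical set of strings that often stand in for missing data.
# The tokens are an assumption for illustration, not a standard.
PLACEHOLDER_TOKENS = {"", "?", "-", "n/a", "na", "none", "null", "unknown"}

def placeholder_fraction(values):
    """Return the fraction of entries that are None or look like placeholders."""
    if not values:
        return 0.0
    hits = 0
    for value in values:
        if value is None:
            hits += 1
        elif isinstance(value, str) and value.strip().lower() in PLACEHOLDER_TOKENS:
            hits += 1
    return hits / len(values)

# Made-up column: four of the eight entries are None or placeholder-like.
city_column = ["Lima", "N/A", "Cusco", "?", None, "Arequipa", " unknown ", "Lima"]
print(f"{placeholder_fraction(city_column):.2f}")
```

A real check would probably need a token list tuned per dataset, since a string such as "none" can be a legitimate category in some domains.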

## Data Quality Metric Thresholds

There is currently no universally accepted standard for assessing data quality using metrics such as missing values, duplicates, or cardinality. The literature consistently emphasizes that data quality thresholds must be defined based on the specific context, purpose, and domain of use. Papers such as "Measuring Data Quality in Information Systems Research", "The Challenges of Data Quality and Data Quality Assessment in the Big Data Era", and "Increasing Trust in Real-World Evidence Through Evaluation of Observational Data Quality" support this view, arguing that rigid, one-size-fits-all thresholds are inadequate. Instead, they advocate for flexible, context-driven frameworks and transparent reporting to ensure that data quality assessments are meaningful and aligned with the goals of the analysis.
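
Because thresholds depend on context, one option is to keep them in named profiles rather than hard-coding a single set. The sketch below is illustrative only: the profile names, the metric names, and every numeric limit are hypothetical and are not taken from the cited papers.

```python
# Hypothetical threshold profiles; all names and numbers are illustrative assumptions.
THRESHOLD_PROFILES = {
    "exploratory": {"max_null_pct": 30.0, "max_duplicate_pct": 10.0},
    "production_model": {"max_null_pct": 5.0, "max_duplicate_pct": 1.0},
}

def check_against_profile(metrics, profile_name):
    """Return the names of the metrics that exceed the chosen profile's limits."""
    limits = THRESHOLD_PROFILES[profile_name]
    violations = []
    if metrics["null_pct"] > limits["max_null_pct"]:
        violations.append("null_pct")
    if metrics["duplicate_pct"] > limits["max_duplicate_pct"]:
        violations.append("duplicate_pct")
    return violations

# Made-up metrics for one dataset, evaluated under both profiles.
sample_metrics = {"null_pct": 12.0, "duplicate_pct": 0.5}
for name in THRESHOLD_PROFILES:
    print(name, check_against_profile(sample_metrics, name))
```

The same dataset passes under the lenient profile and fails under the strict one, which is the practical consequence of context-driven thresholds.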

# Set Project and Connect to the Kaggle API and Download the File

In [None]:
pip install kaggle

In [None]:
import os
import re
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from pathlib import Path
import math
from sklearn.impute import SimpleImputer

from kaggle.api.kaggle_api_extended import KaggleApi

In [None]:
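# Set your Kaggle credentials. Do not commit real values: replace the placeholders
# below, or export KAGGLE_USERNAME and KAGGLE_KEY before starting the notebook.
os.environ.setdefault('KAGGLE_USERNAME', "<your-kaggle-username>")
os.environ.setdefault('KAGGLE_KEY', "<your-kaggle-api-key>")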

# Authentication
api = KaggleApi()
api.authenticate()

In [None]:
# Data Quality Configuration
WEIGHTS = {
    "completeness": 0.45,
    "uniqueness": 0.25,
    "outliers": 0.30
}
ALERT_THRESHOLD = 75
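
To make the configuration above concrete, the following sketch shows how the three weights could combine component scores (each on a 0-100 scale) into a single value that is compared with `ALERT_THRESHOLD`. The component scores are made-up numbers, `weighted_score` is a hypothetical helper, and the weights and threshold are repeated only so the snippet runs on its own.

```python
# Values copied from the configuration cell so this sketch is self-contained.
WEIGHTS = {"completeness": 0.45, "uniqueness": 0.25, "outliers": 0.30}
ALERT_THRESHOLD = 75

def weighted_score(components, weights):
    """Weighted sum of component scores; the weights are expected to sum to 1."""
    if abs(sum(weights.values()) - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1")
    return sum(components[name] * weight for name, weight in weights.items())

# Made-up component scores for illustration.
example = {"completeness": 90.0, "uniqueness": 100.0, "outliers": 50.0}
score = weighted_score(example, WEIGHTS)
print(round(score, 2), "needs cleaning" if score < ALERT_THRESHOLD else "passes")
```

With these numbers the score is 0.45 · 90 + 0.25 · 100 + 0.30 · 50 = 80.5, which is above the threshold of 75 even though the outlier component alone is low.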

In [None]:
# List of Kaggle projects
kernel_refs = [
    #"joelknapp/student-performance-analysis/",
    "aremoto/retail-sales-forecast/",
]

# Creation of the base directory and complementary functions
base_dir = "kaggle_notebooks"
os.makedirs(base_dir, exist_ok=True)

def safe_name(ref):
    return re.sub(r'[^\w\-]', '_', ref)
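
As a quick illustration of `safe_name`, the snippet below re-declares the function so it runs in isolation and applies it to the kernel reference used above.

```python
import re

def safe_name(ref):
    # Same rule as above: keep word characters and hyphens, replace everything else.
    return re.sub(r'[^\w\-]', '_', ref)

print(safe_name("aremoto/retail-sales-forecast/"))
```

Both slashes become underscores, so the resulting directory name ends with a trailing underscore.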

In [None]:
def convert_for_json(obj):
    """Convert Python objects to JSON serializable format"""
    if isinstance(obj, np.bool_):
        return bool(obj)
    elif isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, (pd.Timestamp, datetime)):
        return obj.isoformat()
    elif hasattr(obj, 'item'):
        return obj.item()
    elif hasattr(obj, 'tolist'):
        return obj.tolist()
    elif isinstance(obj, dict):
        return {k: convert_for_json(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_for_json(item) for item in obj]
    return obj

def safe_json_dump(data, file_path, **kwargs):
    """Safely dump data to JSON with proper type conversion"""
    converted_data = convert_for_json(data)
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(converted_data, f, **kwargs)

# Data Facet

In [None]:
def extract_csv_metadata(file_path):
   """Extract comprehensive metadata and quality metrics from a CSV file."""
   file_path = Path(file_path)
   metadata = {
       'filename': file_path.name,
       'file_path': str(file_path),
       'analysis_timestamp': datetime.now().isoformat(),
       'file_exists': file_path.exists(),
       'file_info': {},
       'csv_schema': {},
       'data_quality': {},
       'errors': []
   }

   if not file_path.exists():
       metadata['errors'].append(f"File not found: {file_path}")
       return metadata

   try:
       # Extract file system metadata
       stat_info = file_path.stat()
       metadata['file_info'] = {
           'size_bytes': stat_info.st_size,
           'size_mb': round(stat_info.st_size / (1024*1024), 2),
           'creation_time': datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
           'modification_time': datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
           'extension': file_path.suffix.lower(),
       }

       try:
           # Read and analyze CSV data
           df_full = pd.read_csv(file_path)

           # Extract schema information
           metadata['csv_schema'] = {
               'total_rows': len(df_full),
               'total_columns': len(df_full.columns),
               'columns': list(df_full.columns),
               'column_types': df_full.dtypes.astype(str).to_dict(),
               'memory_usage_mb': round(df_full.memory_usage(deep=True).sum() / (1024*1024), 2),
               'shape': df_full.shape
           }

           # Calculate data quality metrics
           metadata['data_quality'] = {
               'null_values_per_column': df_full.isnull().sum().to_dict(),
               'null_percentage_per_column': (df_full.isnull().sum() / len(df_full) * 100).round(2).to_dict(),
               'total_null_values': int(df_full.isnull().sum().sum()),
               'duplicate_rows': int(df_full.duplicated().sum()),
               'unique_values_per_column': df_full.nunique().to_dict(),
               'completeness_score': round((1 - df_full.isnull().sum().sum() / df_full.size) * 100, 2)
           }

           # Extract numeric statistics if numeric columns exist
           numeric_columns = df_full.select_dtypes(include=['number']).columns
           if len(numeric_columns) > 0:
               numeric_stats = df_full[numeric_columns].describe().to_dict()
               metadata['numeric_statistics'] = convert_for_json(numeric_stats)

       except Exception as e:
           metadata['errors'].append(f"Error reading CSV: {str(e)}")

   except Exception as e:
       metadata['errors'].append(f"Error accessing file: {str(e)}")

   return metadata

def analyze_multiple_csvs(directory_path):
   """Analyze all CSV files in a given directory and generate a comprehensive report."""
   directory_path = Path(directory_path)
   analysis_report = {
       'directory': str(directory_path),
       'analysis_timestamp': datetime.now().isoformat(),
       'csv_files_found': 0,
       'files_analysis': {},
       'summary': {'errors': []}
   }

   if not directory_path.exists():
       analysis_report['summary']['errors'].append(f"Directory does not exist: {directory_path}")
       return analysis_report

   # Find and analyze all CSV files
   csv_files = list(directory_path.glob('*.csv'))
   analysis_report['csv_files_found'] = len(csv_files)

   for csv_file in csv_files:
       print(f"Analyzing: {csv_file.name}")
       file_metadata = extract_csv_metadata(csv_file)
       analysis_report['files_analysis'][csv_file.name] = file_metadata

   return analysis_report

def calculate_completeness(null_percentage_per_column):
   """Calculate data completeness score based on null value percentages."""
   return 100 - np.mean(list(null_percentage_per_column.values()))

def calculate_uniqueness(total_rows, duplicate_rows):
   """Calculate data uniqueness score based on duplicate row count."""
   if total_rows == 0:
       return 100  # an empty file has no duplicate rows; avoids division by zero
   return max(0, 100 - (duplicate_rows / total_rows * 100))

def calculate_outliers(data):
   """Calculate outlier penalty score using 3-sigma rule for numeric columns."""
   penalties = []
   for col, stats in data.get("numeric_statistics", {}).items():
       mean = stats.get("mean", 0)
       std = stats.get("std", 0)
       col_min = stats.get("min", mean)
       col_max = stats.get("max", mean)
       # Apply 3-sigma rule for outlier detection
       if std > 0:
           if col_min < mean - 3 * std or col_max > mean + 3 * std:
               penalties.append(1)
           else:
               penalties.append(0)
   return 100 - (np.mean(penalties) * 100) if penalties else 100

def calculate_global_score(dataset):
   """Calculate overall data quality score using weighted metrics."""
   completeness = calculate_completeness(dataset["data_quality"]["null_percentage_per_column"])
   uniqueness = calculate_uniqueness(dataset["csv_schema"]["total_rows"], dataset["data_quality"]["duplicate_rows"])
   outliers = calculate_outliers(dataset)

   # Calculate weighted global score
   global_score = (
       completeness * WEIGHTS["completeness"] +
       uniqueness * WEIGHTS["uniqueness"] +
       outliers * WEIGHTS["outliers"]
   )

   return {
       "completeness": completeness,
       "uniqueness": uniqueness,
       "outliers": outliers,
       "global_score": global_score
   }

def clean_dataset(file_path, output_dir, lower_quantile=0.05, upper_quantile=0.95, max_missing_frac=0.1):
   """Clean dataset by handling missing values, duplicates, and outliers."""
   df = pd.read_csv(file_path)
   numeric_cols = df.select_dtypes(include=[np.number]).columns

   # Handle missing values in numeric columns using median imputation
   if len(numeric_cols) > 0:
       imputer = SimpleImputer(strategy='median')
       df[numeric_cols] = imputer.fit_transform(df[numeric_cols])

   # Handle missing values in categorical columns
   categorical_cols = df.select_dtypes(include=['object']).columns
   for col in categorical_cols:
       missing_frac = df[col].isna().mean()
       if 0 < missing_frac <= max_missing_frac:
           df = df[df[col].notna()]

   # Remove duplicate rows
   df = df.drop_duplicates()

   # Handle outliers using quantile-based clipping
   for col in numeric_cols:
       lower = df[col].quantile(lower_quantile)
       upper = df[col].quantile(upper_quantile)
       df[col] = np.clip(df[col], lower, upper)

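   # Save cleaned dataset (accept str or Path, and create missing parent folders)
   output_dir = Path(output_dir)
   output_dir.mkdir(parents=True, exist_ok=True)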
   output_path = output_dir / f"clean_{Path(file_path).name}"
   df.to_csv(output_path, index=False)
   print(f"Cleaned dataset saved at {output_path}")
   return output_path

def plot_histograms(file_data, save_dir=None):
    """Generate histogram plots for all numeric columns in a dataset with statistical annotations."""
    file_path = file_data.get('file_path')
    filename = file_data.get('filename', 'File')

    if not file_path or not Path(file_path).exists():
        print(f"Cannot open {filename} for plotting")
        return

    try:
        df_full = pd.read_csv(file_path)
        
        # Select plotting style
        available_styles = plt.style.available
        preferred_styles = ['seaborn', 'ggplot', 'fivethirtyeight', 'bmh']
        selected_style = 'classic'

        for style in preferred_styles:
            if style in available_styles:
                selected_style = style
                break

        plt.style.use(selected_style)

        # Get numeric columns for plotting
        numeric_columns = df_full.select_dtypes(include=['number']).columns
        n = len(numeric_columns)
        if n == 0:
            print("No numeric columns to plot")
            return

        # Set up subplot grid
        colors = plt.cm.tab10.colors
        cols = min(4, n)
        rows = math.ceil(n / cols)
        fig, axes = plt.subplots(rows, cols, figsize=(cols * 6, rows * 4.5))
        axes = axes.flatten() if n > 1 else [axes]

        # Create histogram for each numeric column
        for i, col in enumerate(numeric_columns):
            col_data = df_full[col].dropna()
            unique_vals = col_data.unique()
            num_unique = len(unique_vals)
            bins = num_unique if num_unique <= 15 else 'auto'

            # Calculate statistical measures
            mean_val = col_data.mean()
            median_val = col_data.median()
            std_val = col_data.std()
            skewness = col_data.skew()
            kurt = col_data.kurtosis()

            # Plot histogram
            axes[i].hist(col_data, bins=bins,
                        edgecolor='white',
                        linewidth=1.2,
                        color=colors[i % len(colors)],
                        alpha=0.8)

            # Add statistical lines
            axes[i].axvline(mean_val, color='red', linestyle='--', linewidth=1.5, label=f'Mean: {mean_val:.2f}')
            axes[i].axvline(median_val, color='green', linestyle='--', linewidth=1.5, label=f'Median: {median_val:.2f}')

            # Customize plot appearance
            axes[i].set_title(f"Distribution of {col}", pad=15, fontsize=12, fontweight='bold')
            axes[i].set_xlabel(col, labelpad=10)
            axes[i].set_ylabel('Frequency', labelpad=10)
            axes[i].grid(axis='y', alpha=0.3)

            # Add statistics text box
            stats_text = (f"Mean: {mean_val:.2f}\n"
                        f"Median: {median_val:.2f}\n"
                        f"Std. Dev.: {std_val:.2f}\n"
                        f"Skewness: {skewness:.2f}\n"
                        f"Kurtosis: {kurt:.2f}")

            axes[i].text(0.95, 0.95, stats_text,
                        transform=axes[i].transAxes,
                        ha='right', va='top',
                        bbox=dict(facecolor='white', alpha=0.8, edgecolor='gray'))

            # Adjust x-axis ticks for discrete variables
            if num_unique <= 15:
                axes[i].set_xticks(np.linspace(col_data.min(), col_data.max(), num=min(10, num_unique)))

            axes[i].legend(loc='upper left', bbox_to_anchor=(1, 1))

        # Remove empty subplots
        for j in range(i+1, len(axes)):
            fig.delaxes(axes[j])

        # Add main title and adjust layout
        fig.suptitle(f"Distribution Analysis - {filename}\n(Style: {selected_style})",
                    y=1.02, fontsize=14, fontweight='bold')
        plt.tight_layout()

        # Save plot if directory specified
        if save_dir:
            save_path = save_dir / f"histogram_{Path(filename).stem}.png"
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Histogram saved: {save_path}")

        plt.close()

    except Exception as e:
        print(f"Error plotting {filename}: {e}")
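
The outlier score above works from summary statistics only: a column is flagged when its minimum or maximum lies more than three standard deviations from the mean. The sketch below reproduces that check with the standard library on made-up lists. `flagged_by_three_sigma` is a hypothetical helper, and `statistics.stdev` is the sample standard deviation, which is also what pandas `describe()` reports by default.

```python
import statistics

def flagged_by_three_sigma(values):
    """True if the min or max lies more than 3 sample standard deviations from the mean."""
    if len(values) < 2:
        return False
    mean = statistics.mean(values)
    std = statistics.stdev(values)
    if std == 0:
        return False
    return min(values) < mean - 3 * std or max(values) > mean + 3 * std

evenly_spread = list(range(1, 22))      # 1..21, no extreme values
with_spike = [10] * 20 + [100]          # one value far from the rest

print(flagged_by_three_sigma(evenly_spread), flagged_by_three_sigma(with_spike))
```

Because each column contributes only a 0 or 1 penalty, a single extreme value costs as much as many. Note also that with ten or fewer observations no value can lie more than three sample standard deviations from the mean, so very small files are never penalized by this rule.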

# Source Facet

In [None]:
def dataset_evaluation(dataset_name):
   """Evaluate a Kaggle dataset's reliability and gather comprehensive metadata."""
   owner_slug, dataset_slug = dataset_name.split('/')
   ref = f"{owner_slug}/{dataset_slug}"

   # 1. Basic dataset information
   results = api.dataset_list(search=dataset_slug, user=owner_slug)
   dataset_info = None

   for ds in results:
       if ds.ref == ref:
           dataset_info = {
               "dataset_name": ds.title,
               "subtitle": getattr(ds, "subtitle", ""),
               "description": getattr(ds, "description", ""),
               "license": getattr(ds, "license_name", "Unknown"),
               "author": ds.creator_name,
               "kaggle_id": ref,
               "kaggle_url": f"https://www.kaggle.com/datasets/{ref}",
               "total_downloads": ds.download_count,
               "votes": getattr(ds, 'vote_count', 0),
               "is_private": ds.is_private,
               "is_featured": ds.is_featured,
               "usability_rating": getattr(ds, "usabilityRating", None),
               "download_date": datetime.now().date().isoformat(),
               "creation_date": getattr(ds, "creationDate", "Not available"),
               "last_updated": getattr(ds, "lastUpdated", "Not available")
           }
           break

   if not dataset_info:
       raise ValueError(f"Dataset {ref} not found")

   # 2. Author information (reputation and activity)
   def get_all_datasets(user):
       """Retrieve all datasets published by a user across multiple pages."""
       page = 1
       all_datasets = []
       while True:
           try:
               datasets = api.dataset_list(user=user, page=page)
               if not datasets:
                   break
               all_datasets.extend(datasets)
               page += 1
           except Exception:
               break
       return all_datasets

   def get_all_kernels(user):
       """Retrieve all kernels/notebooks published by a user across multiple pages."""
       page = 1
       all_kernels = []
       while True:
           try:
               kernels = api.kernels_list(user=user, page=page)
               if not kernels:
                   break
               all_kernels.extend(kernels)
               page += 1
           except Exception:
               break
       return all_kernels

   def get_user_followers(username):
       """Get user's follower count, following count, and tier information."""
       try:
           user_info = api.user_read(username)
           followers = getattr(user_info, "followerCount", "Not available")
           following = getattr(user_info, "followingCount", "Not available")
           tier = getattr(user_info, "tier", "Not available")
           return followers, following, tier
       except Exception:
           return "Not available", "Not available", "Not available"

   try:
       # Gather comprehensive author statistics
       author_datasets = get_all_datasets(owner_slug)
       total_datasets = len(author_datasets)
       author_notebooks = get_all_kernels(owner_slug)
       total_notebooks = len(author_notebooks)
       followers, following, tier = get_user_followers(owner_slug)

       # Calculate aggregate metrics
       total_downloads = sum(getattr(ds, 'download_count', 0) for ds in author_datasets)
       total_votes = sum(getattr(ds, 'vote_count', 0) for ds in author_datasets)
       notebook_votes = sum(getattr(nb, 'voteCount', 0) for nb in author_notebooks)

       author_stats = {
           "total_datasets": total_datasets,
           "total_notebooks": total_notebooks,
           "total_dataset_downloads": total_downloads,
           "total_dataset_votes": total_votes,
           "total_notebook_votes": notebook_votes,
           "follower_count": followers,
           "following_count": following,
           "author_tier": tier,
           "avg_downloads_per_dataset": round(total_downloads / total_datasets, 2) if total_datasets > 0 else 0,
           "avg_votes_per_dataset": round(total_votes / total_datasets, 2) if total_datasets > 0 else 0
       }

   except Exception as e:
       print(f"Could not retrieve author statistics: {e}")
       author_stats = {"error": "Author information not available"}

   # 3. Notebooks using this dataset
   try:
       # Get notebooks that use this dataset across multiple pages
       notebooks = api.kernels_list(dataset=ref, page_size=100)
       all_notebooks = notebooks.copy()

       for page in range(2, 6):  # Check up to 5 pages
           try:
               more_notebooks = api.kernels_list(dataset=ref, page=page, page_size=100)
               if not more_notebooks:
                   break
               all_notebooks.extend(more_notebooks)
           except Exception:
               break

       # Remove duplicates and sort by votes
       unique_notebooks = []
       seen_refs = set()
       for nb in all_notebooks:
           if nb.ref not in seen_refs:
               unique_notebooks.append(nb)
               seen_refs.add(nb.ref)

       sorted_notebooks = sorted(unique_notebooks, key=lambda x: getattr(x, 'voteCount', 0), reverse=True)
       total_notebooks_using = len(unique_notebooks)

   except Exception as e:
       print(f"Could not retrieve notebooks: {e}")
       total_notebooks_using = 0

   # 4. Dataset versions (traceability)
   try:
       versions = api.dataset_list_versions(ref)
       version_info = {
           "total_versions": len(versions),
           "current_version": versions[0].versionNumber if versions else 1,
           "version_history": []
       }

       # Get details for the 5 most recent versions
       for version in versions[:5]:
           version_info["version_history"].append({
               "version": version.versionNumber,
               "creation_date": getattr(version, "creationDate", "Not available"),
               "status": getattr(version, "status", "Not available")
           })
   except Exception as e:
       print(f"Could not retrieve version information: {e}")
       version_info = {"error": "Version information not available"}

   # 5. Build the reliability assessment with comprehensive criteria
   reliability_assessment = {
       "1_author_info": {
           "author": dataset_info["author"],
           "statistics": author_stats,
           "assessment": "✓ Available" if "error" not in author_stats else "✗ Not available"
       },
       "2_publication_date": {
           "creation_date": dataset_info["creation_date"],
           "last_updated": dataset_info["last_updated"],
           "assessment": "✓ Temporal information available" if dataset_info["creation_date"] != "Not available" else "⚠ Temporal information not available"
       },
       "3_license": {
           "license": dataset_info["license"],
           "assessment": "⚠ Unknown license" if dataset_info["license"] == "Unknown" else f"✓ License: {dataset_info['license']}"
       },
       "4_external_source": {
           "description": dataset_info["description"],
           "assessment": "⚠ No detailed description" if not dataset_info["description"] else "✓ Description available"
       },
       "5_traceability": {
           "versions": version_info,
           "assessment": f"✓ {version_info.get('total_versions', 0)} versions available" if "error" not in version_info else "⚠ No version information"
       },
       "6_description": {
           "title": dataset_info["dataset_name"],
           "subtitle": dataset_info["subtitle"],
           "description": dataset_info["description"],
           "assessment": "✓ Clear title and subtitle" if dataset_info["subtitle"] else "⚠ Limited description"
       },
       "7_community_feedback": {
           "votes": dataset_info.get("votes", 0),
           "downloads": dataset_info.get("total_downloads", 0),
           "featured": dataset_info.get("is_featured", False),
           "total_notebooks": total_notebooks_using,
           "assessment": f"✓ {dataset_info.get('votes', 0)} votes, {dataset_info.get('total_downloads', 0)} downloads"
       }
   }

   return {
       "dataset_info": dataset_info,
       "reliability_assessment": reliability_assessment
   }
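
The helper functions above page through API results until an empty page comes back. A minimal sketch of the same pattern with an explicit page cap is shown below. `collect_pages`, `fetch_page`, and `max_pages` are hypothetical names, and the fake fetcher stands in for a call such as `api.dataset_list(user=..., page=...)`.

```python
def collect_pages(fetch_page, max_pages=50):
    """Call fetch_page(page) for page = 1, 2, ... until it returns an empty
    result or max_pages is reached, and return all items in order."""
    items = []
    for page in range(1, max_pages + 1):
        batch = fetch_page(page)
        if not batch:
            break
        items.extend(batch)
    return items

# Stand-in for a remote API: three pages of data, then nothing.
FAKE_PAGES = {1: ["a", "b"], 2: ["c", "d"], 3: ["e"]}

def fake_fetch(page):
    return FAKE_PAGES.get(page, [])

print(collect_pages(fake_fetch))
print(collect_pages(fake_fetch, max_pages=2))
```

The cap guards against an endpoint that keeps returning the same non-empty page, in which case an uncapped `while True` loop would never terminate.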

In [None]:
def run_data_source_facet_pipeline():
    """Run the pipeline focused on the Data Facet and the Source Facet."""
    
    for ref in kernel_refs:
        print(f"\n[INFO] ========== Processing Data and Source Facets for: {ref} ==========")
        
        name = safe_name(ref)
        kernel_dir = os.path.join(base_dir, name)
        os.makedirs(kernel_dir, exist_ok=True)

        # Create directory structure
        datasets_dir = Path(kernel_dir) / "datasets"
        datasets_dir.mkdir(exist_ok=True)
        
        cleaned_dir = datasets_dir / "cleaned"
        cleaned_dir.mkdir(exist_ok=True)
        
        reports_dir = Path(kernel_dir) / "reports"
        reports_dir.mkdir(exist_ok=True)

        # Directory for visualizations
        visualizations_dir = Path(kernel_dir) / "visualizations"
        visualizations_dir.mkdir(exist_ok=True)

        try:
            # 1. Download kernel metadata
            api.kernels_pull(kernel=ref, path=kernel_dir, metadata=True)
            metadata_path = os.path.join(kernel_dir, "kernel-metadata.json")
            
            if not os.path.exists(metadata_path):
                print(f"[WARN] No metadata found for {ref}")
                continue
                
            with open(metadata_path, "r", encoding="utf-8") as f:
                meta = json.load(f)

            # 2. SOURCE FACET - Process datasets with reliability analysis
            dataset_sources = meta.get("dataset_sources", [])
            
            for ds_ref in dataset_sources:
                try:
                    owner, slug = ds_ref.split("/", 1)
                except ValueError:
                    print(f"[WARN] Invalid dataset reference: {ds_ref}")
                    continue
                    
                print(f"[INFO] Processing dataset: {ds_ref}")
                
                # Download dataset
                try:
                    print(f"[INFO] Downloading dataset {ds_ref} into {datasets_dir}")
                    api.dataset_download_files(ds_ref, path=str(datasets_dir), unzip=True)
                except Exception as e:
                    print(f"[ERROR] Failed to download {ds_ref}: {e}")
                    continue

                # SOURCE FACET - Run reliability assessment
                print(f"[INFO] Running SOURCE FACET reliability assessment for {ds_ref}...")
                try:
                    reliability_eval = dataset_evaluation(ds_ref)
                    reliability_path = reports_dir / f"source_facet_reliability_{slug}.json"
                    safe_json_dump(reliability_eval, reliability_path, indent=2, ensure_ascii=False)
                    print(f"[INFO] SOURCE FACET report saved: {reliability_path}")
                except Exception as e:
                    print(f"[WARN] Could not generate SOURCE FACET report for {ds_ref}: {e}")

            # 3. DATA FACET - Data Quality Analysis
            quality_decisions = []  # reset per kernel so earlier results do not leak into the summary
            print("[INFO] Running DATA FACET quality analysis...")
            
            if datasets_dir.exists() and any(datasets_dir.glob('*.csv')):
                # Analyze all CSV files in datasets directory
                analysis_report = analyze_multiple_csvs(datasets_dir)

                # Save DATA FACET analysis
                data_facet_path = reports_dir / "data_facet_analysis.json"
                safe_json_dump(analysis_report, data_facet_path, indent=2, ensure_ascii=False)
                print(f"[INFO] DATA FACET analysis saved: {data_facet_path}")

                # Process each CSV file for quality and cleaning
                quality_decisions = []
                
                for filename, file_data in analysis_report["files_analysis"].items():
                    if file_data.get('errors'):
                        print(f"[WARN] Skipping {filename} due to errors: {file_data['errors']}")
                        continue

                    # Calculate quality scores
                    scores = calculate_global_score(file_data)
                    print(f"[INFO] DATA FACET quality scores for {filename}: {scores}")

                    dataset_path = datasets_dir / filename
                    decision = {
                        "filename": filename,
                        "quality_scores": scores,
                        "action_taken": "none"
                    }

                    # Generate visualizations for the original dataset
                    print(f"[INFO] Generating visualizations for {filename}...")
                    try:
                        plot_histograms(file_data, save_dir=visualizations_dir)
                        print(f"[INFO] Visualizations generated for {filename}")
                    except Exception as e:
                        print(f"[WARN] Could not generate visualizations for {filename}: {e}")

                    # Clean dataset if below threshold
                    if scores["global_score"] < ALERT_THRESHOLD and dataset_path.exists():
                        print(f"[ALERT] DATA FACET: {filename} ({scores['global_score']:.2f}) below threshold. Cleaning...")
                        try:
                            clean_path = clean_dataset(dataset_path, cleaned_dir)
                            
                            # Re-analyze cleaned dataset
                            new_dataset_info = extract_csv_metadata(clean_path)
                            new_scores = calculate_global_score(new_dataset_info)
                            print(f"[INFO] DATA FACET: New scores after cleaning: {new_scores}")
                            
                            decision.update({
                                "action_taken": "cleaned",
                                "original_score": scores["global_score"],
                                "cleaned_score": new_scores["global_score"],
                                "improvement": new_scores["global_score"] - scores["global_score"]
                            })
                            
                            if new_scores["global_score"] >= ALERT_THRESHOLD:
                                print(f"[SUCCESS] DATA FACET: Cleaning improved {filename}")
                                
                                # Generate visualizations for the cleaned dataset
                                print(f"[INFO] Generating visualizations for cleaned {filename}...")
                                try:
                                    plot_histograms(new_dataset_info, save_dir=visualizations_dir)
                                    print(f"[INFO] Visualizations generated for cleaned {filename}")
                                except Exception as e:
                                    print(f"[WARN] Could not generate visualizations for cleaned {filename}: {e}")
                            
                        except Exception as e:
                            print(f"[ERROR] DATA FACET: Failed to clean {filename}: {e}")
                            decision["action_taken"] = "clean_failed"
                            decision["error"] = str(e)
                    else:
                        if scores["global_score"] >= ALERT_THRESHOLD:
                            print(f"[SUCCESS] DATA FACET: {filename} meets quality standards")
                            decision["action_taken"] = "passed_quality_check"

                    quality_decisions.append(decision)

                # Save DATA FACET quality decisions
                quality_decisions_path = reports_dir / "data_facet_quality_decisions.json"
                safe_json_dump({
                    "timestamp": datetime.now().isoformat(),
                    "kernel_ref": ref,
                    "threshold_used": ALERT_THRESHOLD,
                    "weights_used": WEIGHTS,
                    "decisions": quality_decisions
                }, quality_decisions_path, indent=2, ensure_ascii=False)
                print(f"[INFO] DATA FACET quality decisions saved: {quality_decisions_path}")

            else:
                print(f"[WARN] No CSV files found for DATA FACET analysis")

            # Generate combined report
            combined_report_path = reports_dir / "data_source_facet_summary.json"
            combined_report = {
                "pipeline_execution": {
                    "timestamp": datetime.now().isoformat(),
                    "kernel_ref": ref,
                    "facets_processed": ["DATA_FACET", "SOURCE_FACET"]
                },
                "data_facet_summary": {
                    "csv_files_analyzed": len([f for f in datasets_dir.glob('*.csv')]) if datasets_dir.exists() else 0,
                    "files_cleaned": len([d for d in quality_decisions if d.get("action_taken") == "cleaned"]) if 'quality_decisions' in locals() else 0,
                    "files_passed_quality": len([d for d in quality_decisions if d.get("action_taken") == "passed_quality_check"]) if 'quality_decisions' in locals() else 0,
                    # Information about the generated visualizations
                    "visualizations_generated": len([f for f in visualizations_dir.glob('*.png')]) if visualizations_dir.exists() else 0
                },
                "source_facet_summary": {
                    "datasets_evaluated": len(dataset_sources),
                    "reliability_reports_generated": len([f for f in reports_dir.glob('source_facet_reliability_*.json')])
                }
            }
            
            safe_json_dump(combined_report, combined_report_path, indent=2, ensure_ascii=False)
            print(f"[INFO] Combined DATA and SOURCE FACET summary saved: {combined_report_path}")

            print(f"[SUCCESS] ========== Completed Data and Source Facets for {ref} ==========")

        except Exception as e:
            print(f"[ERROR] Failed to process {ref}: {e}")

    print(f"\n[SUCCESS] Data and Source Facet pipeline completed!")
    print(f"[INFO] Results saved in: {base_dir}/")
    print(f"[INFO] Each project contains:")
    print(f"  • DATA FACET: CSV quality analysis, cleaned datasets, and visualizations")
    print(f"  • SOURCE FACET: Dataset reliability assessments")
    print(f"  • VISUALIZATIONS: Histogram plots for data distribution analysis")

In [None]:
run_data_source_facet_pipeline()
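
After the pipeline finishes, the decisions file can be inspected programmatically. The sketch below assumes the JSON layout written by `run_data_source_facet_pipeline` (a `decisions` list whose entries carry `filename` and `action_taken`). `summarize_decisions` is a hypothetical helper, and instead of reading real output it builds a small stand-in file with made-up entries in a temporary directory.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path

def summarize_decisions(report_path):
    """Count how many files ended up in each action_taken category."""
    with open(report_path, "r", encoding="utf-8") as f:
        report = json.load(f)
    return Counter(d.get("action_taken", "none") for d in report.get("decisions", []))

# Made-up report mirroring the structure of data_facet_quality_decisions.json.
sample_report = {
    "threshold_used": 75,
    "decisions": [
        {"filename": "a.csv", "action_taken": "passed_quality_check"},
        {"filename": "b.csv", "action_taken": "cleaned"},
        {"filename": "c.csv", "action_taken": "passed_quality_check"},
    ],
}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "data_facet_quality_decisions.json"
    path.write_text(json.dumps(sample_report), encoding="utf-8")
    summary = summarize_decisions(path)

print(dict(summary))
```

For a real run, `report_path` would point at the `reports` folder that the pipeline creates under each project directory.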