<a href="https://colab.research.google.com/github/Sergey2Gnezdilov/test_jb_rep/blob/main/Pyspark_3_5_1_pydeequ_test_env.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

In [None]:
!pip install pydeequ

In [None]:
!pip install findspark datasets tqdm prefect

In [12]:
!ls /usr/lib/jvm/java*

/usr/lib/jvm/java-1.11.0-openjdk-amd64:
bin  conf  docs  include  jmods  legal	lib  man  release

/usr/lib/jvm/java-11-openjdk-amd64:
bin  conf  docs  include  jmods  legal	lib  man  release


In [None]:
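# archive.apache.org keeps every Spark release; downloads.apache.org only hosts the current ones
!wget https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz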

In [14]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

In [16]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
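# PyDeequ reads SPARK_VERSION when it is imported to pick the Deequ jar for this
# Spark line (3.5.x), so it must be set before `import pydeequ`.
os.environ['SPARK_VERSION'] = '3.5.1'
import pydeequ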

In [24]:
spark = (SparkSession.builder
    .appName("DeduplicationQuality")
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate())

In [23]:
from pydeequ.checks import *
from pydeequ.verification import *

In [25]:
import pandas as pd
data = {
    "Name": ["Alice", "Bob", "Charlie"],
    "Age": [25, 30, 35],
    "City": ["New York", "Los Angeles", "Chicago"]
}
df = pd.DataFrame(data)
print(df)

      Name  Age         City
0    Alice   25     New York
1      Bob   30  Los Angeles
2  Charlie   35      Chicago


In [28]:
spark_df = spark.createDataFrame(df)
spark_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- City: string (nullable = true)



In [30]:
from pydeequ.analyzers import *
analysisResult = (AnalysisRunner(spark)
                  .onData(spark_df)
                  .addAnalyzer(Size())
                  .addAnalyzer(Completeness("Name"))
                  .addAnalyzer(Distinctness("Name"))
                  .addAnalyzer(Mean("Age"))
                  .addAnalyzer(Compliance("City", "City == 'New York'"))
                  # Correlation requires two numeric columns. On the string columns Name and
                  # City it fails, which is why no Correlation row appears in the output below.
                  # .addAnalyzer(Correlation("Name", "City"))
                  .run())

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

+-------+--------+------------+------------------+
| entity|instance|        name|             value|
+-------+--------+------------+------------------+
| Column|    Name|Distinctness|               1.0|
| Column|    Name|Completeness|               1.0|
| Column|     Age|        Mean|              30.0|
|Dataset|       *|        Size|               3.0|
| Column|    City|  Compliance|0.3333333333333333|
+-------+--------+------------+------------------+
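You can check the metrics in the table above by hand. The plain-Python sketch below recomputes them on the same three-row table. It assumes Deequ's usual definitions on data without nulls:

- Size is the row count.
- Completeness is the fraction of non-null values.
- Distinctness is distinct values divided by rows.
- Uniqueness is values that occur exactly once divided by rows; this is what `isUnique` requires to be 1.0.
- Mean is the average of the non-null values.
- Compliance is the fraction of rows that satisfy the predicate.

The helper names are hypothetical and not part of PyDeequ. A Python lambda stands in for Deequ's SQL predicate string.

```python
from collections import Counter

# Same toy table as the pandas cell above (no nulls).
data = {
    "Name": ["Alice", "Bob", "Charlie"],
    "Age": [25, 30, 35],
    "City": ["New York", "Los Angeles", "Chicago"],
}

def size(table):
    # Number of rows, reported as a float like the Size metric.
    return float(len(next(iter(table.values()))))

def completeness(values):
    # Fraction of values that are not None.
    return sum(v is not None for v in values) / len(values)

def distinctness(values):
    # Number of distinct values divided by the number of rows.
    return len(set(values)) / len(values)

def uniqueness(values):
    # Values occurring exactly once, divided by the number of rows.
    counts = Counter(values)
    return sum(1 for c in counts.values() if c == 1) / len(values)

def mean(values):
    # Average over non-null values only.
    present = [v for v in values if v is not None]
    return sum(present) / len(present)

def compliance(table, predicate):
    # Fraction of rows (as dicts) for which the predicate holds.
    rows = [dict(zip(table, r)) for r in zip(*table.values())]
    return sum(bool(predicate(r)) for r in rows) / len(rows)

metrics = {
    "Size": size(data),
    "Completeness(Name)": completeness(data["Name"]),
    "Distinctness(Name)": distinctness(data["Name"]),
    "Mean(Age)": mean(data["Age"]),
    "Compliance(City)": compliance(data, lambda r: r["City"] == "New York"),
}
for name, value in metrics.items():
    print(f"{name}: {value}")
```

Distinctness and Uniqueness only differ once a column has repeated values. For `["a", "a", "b"]`, Distinctness is 2/3 but Uniqueness is 1/3. So `isUnique` would fail on a column such as `repo_id` whenever one repository contributes several files.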



In [None]:
!pip install datasets transformers  # Install necessary libraries


In [32]:
from datasets import load_dataset

In [33]:
dataset = load_dataset("SergGN/test_KStack")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


train-00000-of-00001.parquet:   0%|          | 0.00/27.6k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/10 [00:00<?, ? examples/s]

In [34]:
pandas_df = dataset["train"].to_pandas()

In [36]:
# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()

+--------------------+--------------+---------+-------+----------------------+--------------------+------+-------------+-----+-----+--------------------+----+--------------------+------------------+
|                path|         owner|  repo_id|is_fork|languages_distribution|             content|issues|main_language|forks|stars|          commit_sha|size|                name|           license|
+--------------------+--------------+---------+-------+----------------------+--------------------+------+-------------+-----+-----+--------------------+----+--------------------+------------------+
|app/src/main/kotl...|    mtrewartha| 62930438|  false|                  NULL|package io.trewar...|   1.0|       Kotlin|    2|   18|63f1e5813ae535145...|  94|          positional|       MIT License|
|demo-app/src/main...|         0xera|746013123|  false|  {"Kotlin": 30604,...|package com.zero....|   0.0|       Kotlin|    0|    8|f1be77798e430e91c...| 704|          parcelable|       MIT License|
|app/

In [39]:
spark_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- repo_id: long (nullable = true)
 |-- is_fork: boolean (nullable = true)
 |-- languages_distribution: string (nullable = true)
 |-- content: string (nullable = true)
 |-- issues: double (nullable = true)
 |-- main_language: string (nullable = true)
 |-- forks: long (nullable = true)
 |-- stars: long (nullable = true)
 |-- commit_sha: string (nullable = true)
 |-- size: long (nullable = true)
 |-- name: string (nullable = true)
 |-- license: string (nullable = true)



In [49]:
from pydeequ.checks import *
from pydeequ.verification import *
from pydeequ.analyzers import *

verification_result = VerificationSuite(spark) \
    .onData(spark_df) \
    .addCheck(
        Check(spark, CheckLevel.Error, "Data Quality Checks")
        .hasSize(lambda x: x > 0, "Dataset should not be empty")
        .isComplete("content", "Content should not be null")
        .isUnique("path", "File paths should be unique")
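        # hasDataType takes a ConstrainableDataTypes member (Null, Fractional, Integral,
        # Boolean, String, Numeric). There is no Long, so use Integral. Its third
        # positional parameter is `assertion`, so pass the message as hint=.
        #.hasDataType("size", ConstrainableDataTypes.Integral, hint="Size should be integral")
        # isNonNegative(column, assertion=None, hint=None): passing the message
        # positionally made it the assertion, which caused the TypeError below.
        .isNonNegative("size", hint="Size should be non-negative")
        #.hasDataType("repo_id", ConstrainableDataTypes.Integral, hint="Repo ID should be integral")
        .isComplete("repo_id", "Repo ID should not be null")
        .isUnique("repo_id", "Repo IDs should be unique")
        #.hasDataType("is_fork", ConstrainableDataTypes.Boolean, hint="is_fork should be a boolean")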
        # ... other checks ...
    ) \
    .run()

# Get and display results
check_result_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
print("\nData Quality Verification Results:")
check_result_df.show(truncate=False)

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/content/spark-3.5.3-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.10/dist-packages/pydeequ/scala_utils.py", line 38, in apply
    return self.lambda_function(arg)
TypeError: 'str' object is not callable



Data Quality Verification Results:
+-------------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------------+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|check              |check_level|check_status|constraint                                                                                                                  |constraint_status|constraint_message                                                                                 
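The `TypeError: 'str' object is not callable` in the traceback above has a simple cause. A hint string was passed positionally to a constraint whose second parameter is an assertion callable. In PyDeequ that signature is `isNonNegative(column, assertion=None, hint=None)`.

The function below is a hypothetical stand-in, not PyDeequ code. It only mimics that parameter order to show how the string ends up being called, and why passing `hint=` by keyword avoids it.

```python
from typing import Callable, Optional, Tuple

def is_non_negative(column: str,
                    assertion: Optional[Callable[[float], bool]] = None,
                    hint: Optional[str] = None) -> Tuple[bool, Optional[str]]:
    """Hypothetical stand-in mimicking the (column, assertion, hint) order."""
    # Default assertion: the compliance ratio must be exactly 1.0.
    check = assertion if assertion is not None else (lambda ratio: ratio == 1.0)
    # Pretend every row complied; the real library computes this metric in Spark.
    metric = 1.0
    # The assertion is invoked on the metric, just as Deequ calls it back.
    return check(metric), hint

# Wrong: the message lands in the `assertion` slot and is later "called".
try:
    is_non_negative("size", "Size should be non-negative")
except TypeError as err:
    print("TypeError:", err)  # 'str' object is not callable

# Right: pass the message by keyword.
print(is_non_negative("size", hint="Size should be non-negative"))
```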



In [None]:
!pip install datasketch huggingface-hub dpu-utils prefect
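The deduplication cell below tokenizes each file with `NON_ALPHA` and feeds the tokens into `datasketch.MinHash` with `NUM_PERM = 256`. MinHash rests on one fact: under a random permutation of the token universe, the probability that two sets have the same minimum element equals their Jaccard similarity. Random hash functions approximate such permutations. So the fraction of hash functions whose minima agree estimates the similarity.

The sketch below implements that idea in plain Python. It assumes salted BLAKE2b digests as the family of hash functions. datasketch uses its own hashing scheme, so its signatures will differ even though the estimates behave the same way. `tokenize`, `minhash_signature`, `estimate_jaccard` and `exact_jaccard` are hypothetical helpers.

```python
import hashlib
import re

NON_ALPHA = re.compile("[^A-Za-z_0-9]")  # same tokenizer regex as the pipeline

def tokenize(text: str) -> set:
    # Split on non-identifier characters and drop empty pieces.
    return {t for t in NON_ALPHA.split(text) if t.strip()}

def minhash_signature(tokens: set, num_perm: int = 256) -> list:
    # One salted hash function per "permutation"; keep the minimum for each.
    # Assumes a non-empty token set (the pipeline skips short files anyway).
    signature = []
    for seed in range(num_perm):
        salt = seed.to_bytes(16, "little")  # BLAKE2b accepts up to 16 salt bytes
        signature.append(min(
            int.from_bytes(
                hashlib.blake2b(t.encode("utf-8"), digest_size=8, salt=salt).digest(),
                "little",
            )
            for t in tokens
        ))
    return signature

def estimate_jaccard(sig1: list, sig2: list) -> float:
    # Fraction of hash functions whose minima agree.
    return sum(a == b for a, b in zip(sig1, sig2)) / len(sig1)

def exact_jaccard(s1: set, s2: set) -> float:
    # Intersection over union of the two token sets.
    return len(s1 & s2) / len(s1 | s2)

a = {f"tok{i}" for i in range(100)}
b = {f"tok{i}" for i in range(50, 150)}
print(exact_jaccard(a, b))  # 50 / 150
print(estimate_jaccard(minhash_signature(a), minhash_signature(b)))
```

With 256 hash functions, the standard error of the estimate is about sqrt(J(1 - J) / 256). That is roughly 0.03 at J = 1/3, which is why `NUM_PERM` trades accuracy against hashing time per file.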

In [57]:
import json
import multiprocessing as mp
import re
from collections import defaultdict
from functools import partial
from typing import Dict, List, Optional, Set, Tuple, Type

from prefect import task, flow
from datasets import Dataset, load_dataset
from tqdm import tqdm
from datasketch import MinHash, MinHashLSH
from dpu_utils.utils.iterators import ThreadedIterator

import pydeequ #pip install git+https://github.com/awslabs/python-deequ.git
from pydeequ.analyzers import *
from pydeequ.checks import *
from pydeequ.verification import *
from pydeequ.suggestions import *

# Add these to your existing imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Constants
NON_ALPHA = re.compile("[^A-Za-z_0-9]")
MIN_NUM_TOKENS = 10
NUM_PERM = 256
PATH_COLUMN = "path"
CONTENT = "content"

# Dataset Configuration Constants
HF_SOURCE_DATASET = "JetBrains/KStack"  # Hugging Face source dataset
DATASET_SIZE_LIMIT = 10000  # Limit dataset size for testing/resource management
JACCARD_THRESHOLD = 0.85  # Jaccard similarity threshold
OUTPUT_DATASET_PATH = "./data/data-near-dedup"  # Local output path
HF_REPO_NAME = "JB_KStack_near_dedup"  # Repository name for upload
HF_ORG = "SergGN"  # Organization or username for HF upload
TEST_SAMPLE_SIZE = 7000  # Size for test runs

@task(name="prepare_dataset", retries=2)
def prepare_dataset(
    dataset_name: str,
    size_limit: int = DATASET_SIZE_LIMIT,
    test_run: bool = False
) -> Dataset:
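    """
    Load and prepare dataset with size limitations.

    Note that load_dataset downloads the full train split before the subset is selected.

    Args:
        dataset_name: Name of the dataset on the Hugging Face Hub
        size_limit: Maximum number of samples to process
        test_run: If True, use the first TEST_SAMPLE_SIZE samples instead of size_limit
    """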
    ds = load_dataset(dataset_name, split="train")

    if test_run:
        ds = ds.select(range(min(TEST_SAMPLE_SIZE, len(ds))))
    elif size_limit:
        ds = ds.select(range(min(size_limit, len(ds))))

    print(f"Prepared dataset size: {len(ds)}")
    return ds

@task(name="make_duplicate_clusters")
def make_duplicate_clusters(dataset: Dataset, jaccard_threshold: float) -> List[List[Dict]]:
    """Find duplicate clusters in the dataset"""
    print("Starting duplicate cluster creation")
    duplication_index = DuplicationIndex(jaccard_threshold)

    # Process each item in the dataset
    for idx in tqdm(range(len(dataset)), desc="Processing items"):
        try:
            content = dataset[idx][CONTENT]
            path = dataset[idx][PATH_COLUMN]

            # Create MinHash
            tokens = [t for t in NON_ALPHA.split(content) if len(t.strip()) > 0]
            if len(tokens) < MIN_NUM_TOKENS:
                continue

            minhash = MinHash(num_perm=NUM_PERM)
            for token in tokens:
                minhash.update(token.encode('utf-8'))

            duplication_index.add((idx, path), minhash)

        except Exception as e:
            print(f"Error processing item {idx}: {str(e)}")
            continue

    return duplication_index.get_duplicate_clusters()

class DuplicationIndex:
    """Wrapper class for MinHashLSH operations"""
    def __init__(self, jaccard_threshold: float = JACCARD_THRESHOLD):
        self._duplication_jaccard_threshold = jaccard_threshold
        self._num_perm = NUM_PERM
        self._index = MinHashLSH(
            threshold=self._duplication_jaccard_threshold,
            num_perm=self._num_perm
        )
        self._duplicate_clusters = defaultdict(set)

    def add(self, code_key: Tuple, min_hash: MinHash) -> None:
        close_duplicates = self._index.query(min_hash)
        if code_key in self._index.keys:
            return

        self._index.insert(code_key, min_hash)
        if close_duplicates:
            for base_duplicate in close_duplicates:
                if base_duplicate in self._duplicate_clusters:
                    self._duplicate_clusters[base_duplicate].add(code_key)
                    break
            else:
                self._duplicate_clusters[close_duplicates[0]].add(code_key)

    def get_duplicate_clusters(self) -> List[List[Dict]]:
        duplicate_clusters = []
        for base, duplicates in self._duplicate_clusters.items():
            cluster = [base] + list(duplicates)
            cluster = [{"base_index": el[0], "path": el[1]} for el in cluster]
            duplicate_clusters.append(cluster)
        return duplicate_clusters

@task(name="process_extremes")
def process_extremes(
    duplicate_clusters: List[List[Dict]],
    dataset: Dataset,
    jaccard_threshold: float
) -> List[List[Dict]]:
    """Process clusters to find extremes"""
    extremes_list = []

    for cluster in tqdm(duplicate_clusters, desc="Processing clusters"):
        extremes = find_cluster_extremes(cluster, dataset, jaccard_threshold)
        extremes_list.append(extremes)

    return extremes_list

def find_cluster_extremes(
    cluster: List[Dict],
    dataset: Dataset,
    jaccard_threshold: float
) -> List[Dict]:
    """Find reduced cluster representation"""
    extremes = []
    for element1 in cluster:
        code1 = dataset[element1["base_index"]][CONTENT]
        for element2 in extremes:
            code2 = dataset[element2["base_index"]][CONTENT]
            tokens1 = set([t for t in NON_ALPHA.split(code1) if len(t.strip()) > 0])
            tokens2 = set([t for t in NON_ALPHA.split(code2) if len(t.strip()) > 0])
            similarity = len(tokens1 & tokens2) / len(tokens1 | tokens2)
            if similarity >= jaccard_threshold:
                element2["copies"] += 1
                break
        else:
            element1["copies"] = 1
            extremes.append(element1)
    return extremes

@task(name="setup_spark")
def setup_spark() -> SparkSession:
    """Initialize Spark session with PyDeequ configuration"""
    spark = (SparkSession
        .builder
        .config("spark.jars.packages", pydeequ.deequ_maven_coord)
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
        .getOrCreate())
    return spark

@task(name="verify_data_quality")
def verify_data_quality(dataset: Dataset, spark: SparkSession) -> bool:
    """Verify data quality using PyDeequ"""
    # Convert Dataset to Spark DataFrame, keeping only the two columns the schema describes
    schema = StructType([
        StructField(CONTENT, StringType(), True),
        StructField(PATH_COLUMN, StringType(), True)
    ])

    df = spark.createDataFrame(
        dataset.to_pandas()[[CONTENT, PATH_COLUMN]],
        schema=schema
    )

    # Define checks
    check = Check(spark, CheckLevel.Error, "Data Quality Check")

    checkResult = (VerificationSuite(spark)
        .onData(df)
        .addCheck(
            check
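            .hasSize(lambda x: x >= 100)  # Ensure minimum dataset size
            .isComplete(CONTENT)  # No null values in content
            .isComplete(PATH_COLUMN)  # No null values in path
            # Only content and path exist in this DataFrame, so a check on "size" would fail.
            # ConstrainableDataTypes has no Long member; pass messages as hint=.
            #.hasDataType(PATH_COLUMN, ConstrainableDataTypes.String, hint="Path should be a string")
            .isUnique(PATH_COLUMN, "Path should be unique")
            # Repository file paths (e.g. "app/src/main/...") are not URLs, so containsURL
            # with its default all-rows assertion cannot pass on this column.
            #.containsURL(PATH_COLUMN)
            # hasMinLength measures characters, not tokens: this requires >= MIN_NUM_TOKENS characters
            .hasMinLength(CONTENT, lambda x: x >= MIN_NUM_TOKENS)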
        )
        .run()
    )

    # Analyze data quality
    analysisResult = (AnalysisRunner(spark)
        .onData(df)
        .addAnalyzer(Size())
        .addAnalyzer(Completeness(CONTENT))
        .addAnalyzer(Completeness(PATH_COLUMN))
        #.addAnalyzer(UniqueValueRatio([PATH_COLUMN]))  # takes a list of columns
        .run()
    )

    # Print analysis results
    analysis_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
    print("\nData Quality Analysis Results:")
    analysis_df.show()

    # Print verification results
    verification_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
    print("\nData Quality Verification Results:")
    verification_df.show()

    # Return True if all checks passed
    return checkResult.status == "Success"

@flow(name="Near-Deduplication")
def deduplicate_dataset_flow(
    dataset_name: str = HF_SOURCE_DATASET,
    size_limit: int = DATASET_SIZE_LIMIT,
    jaccard_threshold: float = JACCARD_THRESHOLD,
    test_run: bool = False
) -> Tuple[Dataset, List[List[Dict]]]:

    """Main flow for dataset deduplication with data quality checks"""
    # Setup Spark
    spark = setup_spark()

    try:
        # Prepare dataset
        dataset = prepare_dataset(
            dataset_name=dataset_name,
            size_limit=size_limit,
            test_run=test_run
        )

        # Verify data quality
        quality_check_passed = verify_data_quality(dataset, spark)
        if not quality_check_passed:
            print("Data quality checks failed. Please review the verification results.")

        # Continue with existing deduplication logic
        duplicate_clusters = make_duplicate_clusters(dataset, jaccard_threshold)

        # Get all duplicate indices
        duplicate_indices = set(
            x["base_index"] for cluster in duplicate_clusters for x in cluster
        )

        # Process extremes
        extremes_clusters = process_extremes(duplicate_clusters, dataset, jaccard_threshold)

        # Build extreme dictionary
        extreme_dict = {}
        for extremes in extremes_clusters:
            for element in extremes:
                extreme_dict[element["base_index"]] = element

        # Filter dataset
        remove_indices = duplicate_indices - set(extreme_dict.keys())
        ds_filter = dataset.filter(
            lambda x, idx: idx not in remove_indices,
            with_indices=True
        )

        # Update duplicate clusters
        for cluster in duplicate_clusters:
            for element in cluster:
                element["is_extreme"] = element["base_index"] in extreme_dict
                if element["is_extreme"]:
                    element["copies"] = extreme_dict[element["base_index"]]["copies"]

        return ds_filter, duplicate_clusters

    finally:
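        # Clean up Spark. getOrCreate() in setup_spark returns the notebook's existing
        # session, so this also stops the `spark` object created earlier.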
        spark.stop()


dedup_ds, clusters = deduplicate_dataset_flow(
    dataset_name=HF_SOURCE_DATASET,
    size_limit=DATASET_SIZE_LIMIT
)

print(f"Final deduplicated dataset size: {len(dedup_ds)}")
print(f"Number of duplicate clusters: {len(clusters)}")

Resolving data files:   0%|          | 0/24 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/24 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/24 [00:00<?, ?it/s]

Prepared dataset size: 10000



Data Quality Analysis Results:
+-------+--------+------------+-------+
| entity|instance|        name|  value|
+-------+--------+------------+-------+
|Dataset|       *|        Size|10000.0|
| Column| content|Completeness|    1.0|
| Column|    path|Completeness|    1.0|
+-------+--------+------------+-------+


Data Quality Verification Results:
+------------------+-----------+------------+--------------------+-----------------+--------------------+
|             check|check_level|check_status|          constraint|constraint_status|  constraint_message|
+------------------+-----------+------------+--------------------+-----------------+--------------------+
|Data Quality Check|      Error|       Error|SizeConstraint(Si...|          Success|                    |
|Data Quality Check|      Error|       Error|CompletenessConst...|          Success|                    |
|Data Quality Check|      Error|       Error|CompletenessConst...|          Success|                    |
|Data Quality C



Data quality checks failed. Please review the verification results.
Starting duplicate cluster creation


Processing items: 100%|██████████| 10000/10000 [01:28<00:00, 113.41it/s]


Processing clusters: 100%|██████████| 18/18 [00:00<00:00, 247.87it/s]


Filter:   0%|          | 0/10000 [00:00<?, ? examples/s]

Final deduplicated dataset size: 9970
Number of duplicate clusters: 18
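The two counts printed above come from the extremes step. Within each duplicate cluster, `find_cluster_extremes` keeps one representative per group of near-identical files, using exact token-set Jaccard at or above the threshold. Every other cluster member is dropped. Files that the LSH index places in the same cluster but whose exact similarity falls below the threshold stay as separate representatives.

The toy run below reproduces that bookkeeping in plain Python on made-up documents. The hypothetical helper `reduce_cluster` stands in for `find_cluster_extremes`. It is meant only to show how the number of removed rows follows from the clusters.

```python
import re

NON_ALPHA = re.compile("[^A-Za-z_0-9]")  # same tokenizer regex as the pipeline

def tokens(code: str) -> set:
    return {t for t in NON_ALPHA.split(code) if t.strip()}

def jaccard(code1: str, code2: str) -> float:
    # Exact Jaccard similarity of the two token sets.
    t1, t2 = tokens(code1), tokens(code2)
    return len(t1 & t2) / len(t1 | t2)

def reduce_cluster(cluster, docs, threshold):
    """Hypothetical stand-in for find_cluster_extremes on plain indices."""
    extremes = []
    for index in cluster:
        for kept in extremes:
            if jaccard(docs[index], docs[kept["index"]]) >= threshold:
                kept["copies"] += 1  # absorbed by an existing representative
                break
        else:
            extremes.append({"index": index, "copies": 1})  # new representative
    return extremes

base = " ".join(f"v{i}" for i in range(20))  # 20 distinct tokens
docs = {
    0: base,
    1: base + " extra",  # Jaccard 20/21 with doc 0
    2: base,             # exact copy of doc 0
    3: " ".join(f"v{i}" for i in range(10))
       + " " + " ".join(f"w{i}" for i in range(10)),  # Jaccard 10/30 with doc 0
}

cluster = [0, 1, 2, 3]  # e.g. one cluster returned by the LSH index
extremes = reduce_cluster(cluster, docs, threshold=0.85)
remove = set(cluster) - {e["index"] for e in extremes}
kept = [i for i in docs if i not in remove]
print(extremes)        # [{'index': 0, 'copies': 3}, {'index': 3, 'copies': 1}]
print(sorted(remove))  # [1, 2]
print(kept)            # [0, 3]
```

In the real run, the same bookkeeping removed 10000 - 9970 = 30 files across the 18 clusters.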
