<a href="https://colab.research.google.com/github/Dev-180Memes/text-deduplication/blob/main/Text_Deduplication.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import hashlib
import time
import psutil
import os
from collections import defaultdict
import re

In [2]:
def get_memory_usage():
  """Return the current process's resident set size (RSS) in MiB."""
  rss_bytes = psutil.Process(os.getpid()).memory_info().rss
  # bytes -> KiB -> MiB
  return rss_bytes / 1024 / 1024

In [10]:
class SimHash:
  """SimHash fingerprint of a piece of text.

  The text is lower-cased and split into runs of word characters; each
  distinct token votes on every bit position with a weight equal to its
  occurrence count, and a fingerprint bit is set where the vote is positive.

  Attributes:
    hash_size: Number of fingerprint bits. Token hashes come from SHA-1
      (160 bits), so bit positions at or above 160 are never set.
    simhash: The fingerprint as a non-negative int below 2**hash_size.
  """

  def __init__(self, text, hash_size=64):
    self.hash_size = hash_size
    self.simhash = self._compute_simhash(text)

  def _tokenize(self, text):
    """Return the lower-cased word tokens (regex ``\\w+``) of ``text``."""
    return re.findall(r'\w+', text.lower())

  def _hash(self, token):
    """Return the SHA-1 digest of ``token`` (UTF-8) as a non-negative int."""
    return int(hashlib.sha1(token.encode('utf-8')).hexdigest(), 16)

  def _compute_simhash(self, text):
    """Compute the fingerprint of ``text``; text without tokens yields 0."""
    weights = defaultdict(int)
    for token in self._tokenize(text):
      weights[token] += 1

    # votes[i] > 0 means the weighted majority of tokens has bit i set.
    votes = [0] * self.hash_size
    for token, weight in weights.items():
      token_hash = self._hash(token)
      for i in range(self.hash_size):
        votes[i] += weight if (token_hash >> i) & 1 else -weight

    fingerprint = 0
    for i, vote in enumerate(votes):
      if vote > 0:  # ties (vote == 0) leave the bit cleared
        fingerprint |= 1 << i
    return fingerprint

  def hamming_distance(self, other_simhash):
    """Return the number of differing bits between two fingerprints.

    Args:
      other_simhash: Either another ``SimHash`` instance or a raw
        non-negative integer fingerprint.

    Raises:
      ValueError: If the fingerprint is negative (a negative value has no
        finite bit pattern to compare against).
    """
    if isinstance(other_simhash, SimHash):
      other_simhash = other_simhash.simhash
    if other_simhash < 0:
      raise ValueError("fingerprint must be a non-negative integer")
    # Popcount of the XOR; bin() is used so no minimum Python version is assumed.
    return bin(self.simhash ^ other_simhash).count('1')

In [11]:
class CDCTTTD:
  """Content-defined chunking of a text using the TTTD rule.

  TTTD (Two Thresholds, Two Divisors) places a chunk boundary where a hash of
  the trailing window is divisible by ``primary_div``, never before the
  current chunk has ``min_size`` characters. When a chunk reaches ``max_size``
  without a primary boundary, it is cut at the most recent position whose
  window hash was divisible by ``backup_div``, or at ``max_size`` if there
  was none.

  Attributes:
    min_size: Minimum chunk length in characters (the final chunk may be shorter).
    max_size: Maximum chunk length in characters.
    window_size: Number of trailing characters hashed at each position.
    primary_div: Main divisor (positive int).
    backup_div: Backup divisor (positive int).
    chunks: List of consecutive substrings whose concatenation is the text.
    hash: SHA-1 hex digest of the text. Because the chunks always re-join to
      the original text, this is an exact-match fingerprint and does not
      depend on where the boundaries fall.
  """

  def __init__(self, text, min_size=64, max_size=256, window_size=48,
               primary_div=2**13, backup_div=2**11):
    self.min_size = min_size
    self.max_size = max_size
    self.window_size = window_size
    self.primary_div = primary_div
    self.backup_div = backup_div
    self.chunks = self._split_chunks(text)
    self.hash = self._compute_hash(text)

  def _rolling_hash(self, window):
    """Return the MD5 digest of ``window`` (UTF-8) as a non-negative int.

    Despite the name, the digest is recomputed from scratch for each window.
    """
    return int(hashlib.md5(window.encode('utf-8')).hexdigest(), 16)

  def _split_chunks(self, text):
    """Split ``text`` into chunks using thresholds relative to each chunk's start."""
    chunks = []
    start = 0
    backup = None  # latest backup-divisor breakpoint inside the current chunk
    step = max(1, self.min_size)  # guarantees every cut advances `start`
    pos = start + step

    while pos < len(text):
      window = text[max(0, pos - self.window_size):pos]
      hash_val = self._rolling_hash(window)

      if hash_val % self.backup_div == 0:
        backup = pos

      if hash_val % self.primary_div == 0:
        cut = pos
      elif pos - start >= self.max_size:
        # No primary boundary within max_size: fall back to the backup
        # breakpoint if one was seen, otherwise force a cut here.
        cut = backup if backup is not None else pos
      else:
        cut = None

      if cut is None:
        pos += 1
      else:
        chunks.append(text[start:cut])
        start = cut
        backup = None
        pos = start + step  # skip positions that would violate min_size

    if start < len(text):
      chunks.append(text[start:])
    return chunks

  def _compute_hash(self, text):
    """Return the SHA-1 hex digest of ``text`` encoded as UTF-8.

    This equals the digest of the concatenated chunks, since the chunks
    partition the text.
    """
    return hashlib.sha1(text.encode('utf-8')).hexdigest()

In [17]:
def _simhash_unique_lines(lines, threshold):
  """Keep each line whose SimHash differs by more than `threshold` bits from every kept line."""
  kept_fingerprints = []
  unique_lines = []
  for line in lines:
    candidate = SimHash(line)
    if all(candidate.hamming_distance(fp) > threshold for fp in kept_fingerprints):
      kept_fingerprints.append(candidate.simhash)
      unique_lines.append(line)
  return unique_lines


def _cdctttd_unique_lines(lines):
  """Keep the first occurrence of each distinct CDCTTTD hash."""
  seen_hashes = set()
  unique_lines = []
  for line in lines:
    line_hash = CDCTTTD(line).hash
    if line_hash not in seen_hashes:
      seen_hashes.add(line_hash)
      unique_lines.append(line)
  return unique_lines


def _measure_dedup(dedup, lines):
  """Run `dedup(lines)` and return (unique_lines, metrics) for that run."""
  # perf_counter is a monotonic, high-resolution clock intended for timing.
  start_time = time.perf_counter()
  start_memory = get_memory_usage()

  unique_lines = dedup(lines)

  elapsed = time.perf_counter() - start_time
  memory_delta = get_memory_usage() - start_memory
  total_lines = len(lines)
  ratio = (total_lines - len(unique_lines)) / total_lines if total_lines > 0 else 0
  return unique_lines, {'ratio': ratio, 'time': elapsed, 'memory': memory_delta}


def evaluate_deduplication(input_file, output_simhash, output_cdctttd, simhash_threshold=3):
  """Deduplicate the lines of a file with SimHash and CDCTTTD and compare them.

  Lines are stripped of surrounding whitespace and blank lines are dropped
  before either method runs. Each method keeps the first occurrence of a line
  and writes its surviving lines, one per line, to its output file.

  Args:
    input_file: Path of the UTF-8 text file to read.
    output_simhash: Path that receives the lines kept by SimHash.
    output_cdctttd: Path that receives the lines kept by CDCTTTD.
    simhash_threshold: A line is a SimHash duplicate when its fingerprint is
      within this many bits of an already kept line.

  Returns:
    A dict with keys 'simhash' and 'cdctttd', each mapping to a dict with
    'ratio' (fraction of lines removed, 0 when there are no lines), 'time'
    (elapsed seconds, excluding the output write) and 'memory' (difference
    between the get_memory_usage() readings taken after and before the run).
  """
  with open(input_file, 'r', encoding='utf-8') as f:
    lines = [stripped for stripped in (line.strip() for line in f) if stripped]

  methods = (
      ('simhash', output_simhash,
       lambda candidates: _simhash_unique_lines(candidates, simhash_threshold)),
      ('cdctttd', output_cdctttd, _cdctttd_unique_lines),
  )

  metrics = {}
  for key, output_path, dedup in methods:
    unique_lines, metrics[key] = _measure_dedup(dedup, lines)
    with open(output_path, 'w', encoding='utf-8') as f:
      f.writelines(line + '\n' for line in unique_lines)

  return metrics

In [18]:
# Relative paths of the sample input and of the two deduplicated outputs.
input_file, output_simhash, output_cdctttd = (
    'input.txt',
    'output_simhash.txt',
    'output_cdctttd.txt',
)

In [19]:
# Five sample lines; lines 2-5 carry four leading spaces.
sample_content = "\n".join([
    "This is a test document",
    "    This is a TEST document!",
    "    Another different document",
    "    This is a test Document?",
    "    Completely unique content",
])

In [20]:
# Write the sample text to the input file (overwriting any existing file).
with open(input_file, mode='w', encoding='utf-8') as sample_file:
  sample_file.write(sample_content)

In [21]:
# Run both deduplication methods on the sample file and collect their metrics.
results = evaluate_deduplication(
    input_file=input_file,
    output_simhash=output_simhash,
    output_cdctttd=output_cdctttd,
)

In [22]:
# Print the same three metrics for each method, SimHash first.
print("Evaluation Metrics:")
for label, key in (("SimHash", "simhash"), ("CDC-TTTD", "cdctttd")):
  method_metrics = results[key]
  print(f"\n{label}:")
  print(f"Deduplication Ratio: {method_metrics['ratio']:.2%}")
  print(f"Execution Time: {method_metrics['time']:.4f} seconds")
  print(f"Memory Utilization: {method_metrics['memory']:.2f} MB")

Evaluation Metrics:

SimHash:
Deduplication Ratio: 40.00%
Execution Time: 0.0010 seconds
Memory Utilization: 0.00 MB

CDC-TTTD:
Deduplication Ratio: 0.00%
Execution Time: 0.0002 seconds
Memory Utilization: 0.00 MB
