In [1]:
import gc
import logging
import os
import re
import string
from copy import copy, deepcopy

import numpy as np
import pyspark
import pyspark.sql.functions as f
import underthesea
from pyspark.ml.feature import HashingTF, IDF, MinHashLSH, RegexTokenizer, Tokenizer
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, concat_ws, greatest, lit, monotonically_increasing_id, udf
from pyspark.sql.types import BooleanType, DoubleType, FloatType, IntegerType, MapType, StringType, StructField, StructType

In [None]:
# Spark session used by the rest of the notebook (reused if one already exists).
# NOTE(review): the recorded task failure below ran on "executor driver", which
# suggests local mode; there the driver JVM runs the tasks, so
# `spark.executor.memory` likely has no effect. `spark.driver.memory` only
# applies if this kernel has not already started a JVM.
spark = (
    SparkSession.builder
    .appName("test")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

In [12]:
# Absolute paths of the four input JSON files, resolved against the kernel's
# current working directory.
current_dir = os.getcwd()
json_path_1, json_path_2, json_path_3, json_path_4 = (
    os.path.join(current_dir, file_name)
    for file_name in ("bds.json", "dothi.json", "nhadatuytin.json", "cafeland.json")
)
# NOTE(review): this name shadows the standard-library `json` module if it is
# ever imported later in the notebook; kept because the read cell uses it.
json = [json_path_1, json_path_2, json_path_3, json_path_4]


In [13]:
# Load every input file into one DataFrame; the JSON reader infers a single schema across all files.
df = spark.read.format("json").load(json)

In [14]:
# Inspect the inferred schema. A `_corrupt_record` column means the JSON reader
# could not parse some input lines; in its default PERMISSIVE mode those lines are kept as rows.
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- district: string (nullable = true)
 |    |-- full_address: string (nullable = true)
 |    |-- province: string (nullable = true)
 |    |-- ward: string (nullable = true)
 |-- contact_info: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phone: string (nullable = true)
 |-- description: string (nullable = true)
 |-- estate_type: string (nullable = true)
 |-- extra_infos: struct (nullable = true)
 |    |-- Hướng ban công: string (nullable = true)
 |    |-- Hướng nhà: string (nullable = true)
 |    |-- Mặt tiền: string (nullable = true)
 |    |-- Pháp lý: string (nullable = true)
 |    |-- Số phòng: string (nullable = true)
 |    |-- Số phòng khách: string (nullable = true)
 |    |-- Số phòng ngủ: string (nullable = true)
 |    |-- Số phòng ngủ :: string (nullable = true)
 |    |-- Số toilet: string (nullable = true)
 |    |-- Số toilet :: string (nullable = tru

In [15]:
# Lines the JSON reader could not parse are kept as rows whose raw text sits in
# `_corrupt_record` and whose other fields are null. Dropping only the column
# would leave those rows behind as empty listings. All such rows share the same
# empty text, so they would match each other in the duplicate search below.
# Drop the rows as well.
# Spark refuses queries on raw JSON that reference only the corrupt-record
# column (e.g. a later count() after this filter), so the parsed data is cached first.
# NOTE(review): many corrupt rows usually means an input file is a pretty-printed
# JSON array rather than JSON Lines and should be read with multiLine=True.
# The guard keeps the cell re-runnable and safe when no corrupt records exist.
if "_corrupt_record" in df.columns:
    df = df.cache()
    df = df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
df.printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- district: string (nullable = true)
 |    |-- full_address: string (nullable = true)
 |    |-- province: string (nullable = true)
 |    |-- ward: string (nullable = true)
 |-- contact_info: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phone: string (nullable = true)
 |-- description: string (nullable = true)
 |-- estate_type: string (nullable = true)
 |-- extra_infos: struct (nullable = true)
 |    |-- Hướng ban công: string (nullable = true)
 |    |-- Hướng nhà: string (nullable = true)
 |    |-- Mặt tiền: string (nullable = true)
 |    |-- Pháp lý: string (nullable = true)
 |    |-- Số phòng: string (nullable = true)
 |    |-- Số phòng khách: string (nullable = true)
 |    |-- Số phòng ngủ: string (nullable = true)
 |    |-- Số phòng ngủ :: string (nullable = true)
 |    |-- Số toilet: string (nullable = true)
 |    |-- Số toilet :: string (nullable = true)
 |    |-- Số tầng: string (nullable = true)


In [16]:
# Surrogate row key. The values are unique and increasing but not consecutive, and
# they depend on partition ids, so recomputing `df` under different partitioning
# can renumber rows.
df = df.withColumn("Id", f.monotonically_increasing_id())


In [17]:
# One free-text field per listing: title and description joined by a space.
# concat_ws skips nulls, so a missing part does not null out the whole text.
df = df.withColumn("text", concat_ws(' ', col('title'), col('description')))
# Lower-cased whitespace tokenization. RegexTokenizer's defaults (pattern "\\s+",
# gaps=True, minTokenLength=1) split on runs of whitespace and drop empty strings.
# The plain Tokenizer splits on every single whitespace character, so "a  b" or
# " a" produced an empty-string token. That token became a spurious feature shared
# by every listing with repeated or leading whitespace, and empty text became [""].
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens")
df = tokenizer.transform(df)


In [18]:
# Hashed term counts (HashingTF with its default feature count) in `tf`, then
# IDF weights fitted on this corpus and applied to produce `tfidf`.
# Note: MinHashLSH treats any non-zero entry as set membership, so Jaccard
# distances depend only on which entries are non-zero, not on these weights.
hashingTF = HashingTF(inputCol="tokens", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="tfidf")

df = hashingTF.transform(df)
idf_model = idf.fit(df)
df = idf_model.transform(df)

In [19]:
append_non_zero_udf = udf(lambda v: Vectors.sparse(len(v) + 1, list(v.indices) + [len(v)], list(v.values) + [1e-5]), VectorUDT())

df = df.withColumn("tfidf", append_non_zero_udf(col("tfidf")))

In [20]:
num_hash_tables = 5
minhashLSH = MinHashLSH(inputCol="tfidf", outputCol="hashes", numHashTables=num_hash_tables)
model = minhashLSH.fit(df)
df_duplicates = model.approxSimilarityJoin(df.select("Id", "tfidf"), df.select("Id", "tfidf"), 0.8, distCol="JaccardDistance") \
    .filter("datasetA.id < datasetB.id")  # Avoid comparing a row to itself

In [21]:
# Show matched pairs by listing Id together with their distance. The
# datasetA/datasetB structs also carry the full sparse feature vectors and hash
# arrays, which are unreadable in show() output and costly to bring to the driver.
df_duplicates.select(
    col("datasetA.Id").alias("IdA"),
    col("datasetB.Id").alias("IdB"),
    "JaccardDistance",
).show(20)

Py4JJavaError: An error occurred while calling o313.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 9.0 failed 1 times, most recent failure: Lost task 3.0 in stage 9.0 (TID 49) (HauMoon executor driver): java.net.SocketException: Connection reset by peer
	at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
	at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
	at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
	at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
	at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:455)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:140)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:224)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:219)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketException: Connection reset by peer
	at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
	at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
	at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
	at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
	at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:123)
	at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
	at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)
