In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Local Spark session configured for the MongoDB Spark connector; the
# connector artifact is requested through spark.jars.packages.
spark = SparkSession \
    .builder\
    .master('local')\
    .config('spark.mongodb.input.uri', 'mongodb://127.0.0.1:27017/propertify')\
    .config('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/propertify')\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.1')\
    .getOrCreate()

# Reuse the context owned by the session. SparkContext.getOrCreate() expects
# a SparkConf (not a master string), so passing "local" only worked because
# the session above had already created the context.
sc = spark.sparkContext

# Pin the JVM default locale to en-US.
locale = spark._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

# Load the "crawler" collection of the "learns" database as a DataFrame.
# NOTE(review): the URIs above name the "propertify" database; presumably the
# options below take precedence for this read - confirm.
property_df = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("database", "learns")\
    .option("collection", "crawler")\
    .load()


In [3]:
# Inspect the loaded collection: schema, a five-row sample, then the row count
# (the count is the cell's last expression, so it is what the cell displays).
property_df.printSchema()
property_df.show(5)
property_df.count()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)

+--------------------+--------------------+--------------------+
|                 _id|               title|                 url|
+--------------------+--------------------+--------------------+
|[5eb51e052fcfa6e7...|How to find the c...|/questions/616754...|
|[5eb51e052fcfa6e7...|MySQL to return l...|/questions/616754...|
|[5eb51e052fcfa6e7...|mysql - Count of ...|/questions/616754...|
|[5eb51e052fcfa6e7...|Upload multiple f...|/questions/616754...|
|[5eb51e052fcfa6e7...|How to resize a f...|/questions/616754...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



50

In [6]:
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.feature import IDF, HashingTF

from pyspark.ml import Pipeline, PipelineModel

In [7]:
# Tokenise the 'title' column. With gaps=False the pattern describes the
# tokens themselves (runs of word characters) rather than the separators.
# The pattern is a raw string so that \w is not parsed as a string escape.
regexTokenizer = RegexTokenizer(gaps=False, pattern=r'\w+', inputCol='title', outputCol='token')

# Remove stop words from the token list (no custom list is passed here).
stopWordsRemover = StopWordsRemover(inputCol='token', outputCol='nostopwrd')

In [8]:
# Term frequencies come from CountVectorizer. A HashingTF stage with the same
# input/output columns was tried here earlier and is left disabled.
countVectorizer = CountVectorizer(inputCol="nostopwrd", outputCol="rawFeature")
# Re-weight the raw term counts by inverse document frequency.
iDF = IDF(inputCol="rawFeature", outputCol="idf_vec")

# Full text pipeline: title -> token -> nostopwrd -> rawFeature -> idf_vec.
pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, iDF])

In [9]:
# Fit every pipeline stage on the loaded titles; the fitted model is reused
# later for both the corpus and the query text.
pipeline_mdl = pipeline.fit(property_df)

In [10]:
# Apply the fitted pipeline to the corpus and check which columns it added.
property_trf_df = pipeline_mdl.transform(property_df)
property_trf_df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nostopwrd: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeature: vector (nullable = true)
 |-- idf_vec: vector (nullable = true)



In [11]:
# Preview the first 20 rows with the idf_vec produced for each title.
property_trf_df.select('_id', 'title', 'url', 'idf_vec').show(20)

+--------------------+--------------------+--------------------+--------------------+
|                 _id|               title|                 url|             idf_vec|
+--------------------+--------------------+--------------------+--------------------+
|[5eb51e052fcfa6e7...|How to find the c...|/questions/616754...|(281,[5,14,15,56,...|
|[5eb51e052fcfa6e7...|MySQL to return l...|/questions/616754...|(281,[17,18,126,1...|
|[5eb51e052fcfa6e7...|mysql - Count of ...|/questions/616754...|(281,[0,18,36,146...|
|[5eb51e052fcfa6e7...|Upload multiple f...|/questions/616754...|(281,[12,23,46,58...|
|[5eb51e052fcfa6e7...|How to resize a f...|/questions/616754...|(281,[15,39,131],...|
|[5eb51e052fcfa6e7...|How to convert fo...|/questions/616754...|(281,[9,19,50,61,...|
|[5eb51e052fcfa6e7...|Android BaseAdapt...|/questions/616754...|(281,[22,44,54,89...|
|[5eb51e052fcfa6e7...|Xpath results in ...|/questions/616754...|(281,[30,118,203,...|
|[5eb51e052fcfa6e7...|How to migrate da...|/questions/

In [12]:
import numpy as np

def cosine_sim(vec1, vec2):
    """Return the cosine similarity of two equal-length vectors.

    Parameters
    ----------
    vec1, vec2 : array-like
        Anything ``np.dot`` accepts (NumPy arrays, lists, ...).

    Returns
    -------
    float
        ``dot(vec1, vec2) / (|vec1| * |vec2|)``. When either vector has zero
        length the similarity is undefined; 0.0 is returned instead of
        dividing by zero (which would yield NaN and a RuntimeWarning).
    """
    norm1 = np.sqrt(np.dot(vec1, vec1))
    norm2 = np.sqrt(np.dot(vec2, vec2))

    # A zero vector shares no direction with anything: treat it as "no match".
    if norm1 == 0 or norm2 == 0:
        return 0.0

    # Same division order as before so non-degenerate results are unchanged.
    return np.dot(vec1, vec2) / norm1 / norm2

In [13]:
from pyspark.sql.functions import col, isnan

def getBusinessDetails(in_bus):
    """Attach title and url to a frame of scored property ids.

    in_bus is a DataFrame that has at least ``property_id`` and ``score``
    columns. It is inner-joined to the module-level ``property_df`` on
    ``property_id == _id``. The result keeps every column of ``in_bus``,
    adds ``title`` and ``url``, and is sorted by score, highest first.
    """
    scored = in_bus.alias("a")
    catalog = property_df.alias("b")

    # All columns of the scored frame first, then the descriptive fields.
    wanted = [col('a.' + name) for name in scored.columns]
    wanted.extend([col('b.title'), col('b.url')])

    joined = scored.join(catalog, col("a.property_id") == col("b._id"), 'inner')
    return joined.select(wanted).orderBy("a.score", ascending=False)

In [14]:
# Collect one (_id, idf_vec) tuple per document into a local list.
# Author's note kept from the original: change this for Word2Vec.
all_property_vecs = (
    property_trf_df
    .select('_id', 'idf_vec')
    .rdd
    .map(lambda row: (row[0], row[1]))
    .collect()
)

In [15]:
import pyspark.sql.functions as f


def get_keywords_recomendations(key_words, sim_bus_limit):
    """Return the documents whose titles are most similar to ``key_words``.

    key_words is passed through the fitted pipeline as a one-row frame, and
    its ``idf_vec`` is compared by cosine similarity with every entry of
    ``all_property_vecs``. Entries whose score is NaN or not above zero are
    dropped, at most ``sim_bus_limit`` are kept, and the result is enriched
    with title and url by ``getBusinessDetails``.
    """
    # One-row frame with the column names used when the pipeline was fitted.
    query_df = pipeline_mdl.transform(
        sc.parallelize([(0, key_words)]).toDF(['_id', 'title'])
    )
    # Author's note kept from the original: change this for Word2Vec.
    query_vec = query_df.select('idf_vec').collect()[0][0]

    # Lazily pair every document id with its similarity to the query.
    scored_pairs = (
        (entry[0], float(cosine_sim(query_vec, entry[1])))
        for entry in all_property_vecs
    )
    scores_df = (
        spark.createDataFrame(sc.parallelize(scored_pairs))
        .withColumnRenamed('_1', 'property_id')
        .withColumnRenamed('_2', 'score')
        .orderBy("score", ascending=False)
    )

    is_usable = (f.col('score') > 0) & (~f.isnan('score'))
    top_matches = scores_df.filter(is_usable).limit(sim_bus_limit)
    return getBusinessDetails(top_matches)


In [18]:
# Example query: request up to 20 titles similar to the keyword and show them.
key_words = 'Mysql'

keywords_recom_df = get_keywords_recomendations(key_words, 20)
keywords_recom_df.show()

+--------------------+-------------------+--------------------+--------------------+
|         property_id|              score|               title|                 url|
+--------------------+-------------------+--------------------+--------------------+
|[5eb51e052fcfa6e7...| 0.3719859549546635|MySQL to return l...|/questions/616754...|
|[5eb51e052fcfa6e7...|0.30751922102747997|mysql - Count of ...|/questions/616754...|
+--------------------+-------------------+--------------------+--------------------+



In [19]:
import json

# Serialise each recommendation row to JSON, parse it back into a dict, and
# collect the dicts as the cell's displayed value.
keywords_recom_df.toJSON().map(json.loads).collect()

[{'property_id': {'oid': '5eb51e052fcfa6e75e3c002f'},
  'score': 0.3719859549546635,
  'title': 'MySQL to return list of people who are not already friends',
  'url': '/questions/61675447/mysql-to-return-list-of-people-who-are-not-already-friends'},
 {'property_id': {'oid': '5eb51e052fcfa6e75e3c0030'},
  'score': 0.30751922102747997,
  'title': 'mysql - Count of days in a month with groupby Month data',
  'url': '/questions/61675444/mysql-count-of-days-in-a-month-with-groupby-month-data'}]