In [1]:
import glob
import os
import sys
# Environment read by PySpark when the shell bootstrap below starts the JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'
os.environ["PYSPARK_PYTHON"] = 'python3'
os.environ["SPARK_HOME"] = '/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Make the Python packages bundled with Spark importable.
sys.path.insert(0, os.path.join(spark_home, 'python'))

# The py4j archive name embeds a version that differs between Spark releases,
# and a sys.path entry that does not exist is silently ignored. Look the
# archive up instead of hard-coding it; fall back to the previously
# hard-coded name when nothing matches.
py4j_archives = sorted(
    glob.glob(os.path.join(spark_home, 'python/lib/py4j-*-src.zip'))
) or [os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip')]
for py4j_archive in py4j_archives:
    sys.path.insert(0, py4j_archive)

# Run the PySpark shell start-up script in this namespace (its banner reports
# that it makes `spark` available). The file is closed deterministically and
# compiled with its real path so tracebacks point at shell.py.
shell_path = os.path.join(spark_home, 'python/pyspark/shell.py')
with open(shell_path) as shell_file:
    exec(compile(shell_file.read(), shell_path, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 3.6.4 (default, Jan 28 2018 00:00:00)
SparkSession available as 'spark'.


In [2]:
# Load the course catalogue as a DataFrame (the file name suggests one JSON
# record per line, which is what Spark's JSON source reads by default).
data = spark.read.format('json').load('/labs/lab07data/DO_record_per_line.json')

In [3]:
# Show the column names of the loaded DataFrame.
data.columns

['cat', 'desc', 'id', 'lang', 'name', 'provider']

In [4]:
# Print the DataFrame schema (column names, types and nullability).
data.printSchema()

root
 |-- cat: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- name: string (nullable = true)
 |-- provider: string (nullable = true)



In [5]:
# Courses for which recommendations have to be produced.
# Each entry is [course id, language code, course title]; only the id and the
# language are consumed by the cells visible in this notebook.
given = [
    [8150, 'en', 'StatLearning: Statistical Learning'],
    [25679, 'en', 'Video Lighting Basics - Udemy'],
    [7791, 'es', 'Programaci\xf3n CNC - Fresadoras'],
    [23111, 'es', 'C\xf3mo Crear un Blog Gratis en Google Blogger - Udemy'],
    [1396, 'ru', '\u0412\u0432\u0435\u0434\u0435\u043d\u0438\u0435 \u0432\u043e \u0432\u0441\u0442\u0440\u043e\u0435\u043d\u043d\u044b\u0435 \u0441\u0438\u0441\u0442\u0435\u043c\u044b \u0438 Windows Embedded CE'],
    [1348, 'ru', '\u0422\u0435\u0445\u043d\u043e\u043b\u043e\u0433\u0438\u044f Microsoft ADO .NET'],
]

In [6]:
# Keep only the (course id, language) pair of every requested course.
courses_langs = [(entry[0], entry[1]) for entry in given]

## HashingTF + TF-IDF + dot product + L2 norm (via Spark's built-in functions, fast)

In [7]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer

In [8]:
# UDF helper and column types used to define Spark UDFs
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, FloatType


In [9]:
# Tokenization as specified in the assignment
import pyspark.sql.functions as f
import re

# Compiled once at definition time instead of once per row inside the UDF.
_TOKEN_RE = re.compile(r'[\w\d]{2,}', re.U)


@f.udf(ArrayType(StringType()))
def re_tokenizer(text):
    """Split text into lower-cased tokens of two or more word characters.

    Args:
        text: The string to tokenize, or None for a missing value.

    Returns:
        The list of tokens in order of appearance; an empty list when
        `text` is None.
    """
    # `desc` is nullable in the printed schema; without this guard a null
    # value raises AttributeError on `.lower()` and fails the Spark job.
    if text is None:
        return []
    return _TOKEN_RE.findall(text.lower())


In [10]:
# Add a `words` column holding the regex tokens of each course description.
wordsData_udf = data.withColumn('words', re_tokenizer(f.col('desc')))

In [11]:
# Tokenization, variant 2: Spark's built-in Tokenizer. Author's note: it
# handles Russian text poorly.
# NOTE(review): `wordsData` is not used by the other cells visible here; the
# features are built from `wordsData_udf` instead.
tokenizer = Tokenizer(
    outputCol="words",
    inputCol="desc",
)
wordsData = tokenizer.transform(data)

In [12]:
# Term frequencies: hash every token of `words` into a 10000-slot vector.
hashingTF = HashingTF(numFeatures=10000, inputCol="words", outputCol="TFFeatures")
featurizedData = hashingTF.transform(wordsData_udf)

In [13]:
# Inverse document frequencies: fit on the term-frequency vectors of all
# courses, then rescale those same vectors into the `features` column.
idf = IDF(outputCol="features", inputCol="TFFeatures")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [14]:
# L2-normalize the TF-IDF vectors. Once every vector has unit length, the
# cosine similarity of two courses is just the dot product of their
# normalized vectors.
from pyspark.ml.feature import Normalizer
t = Normalizer(p=2.0, outputCol='norm_features', inputCol='features')
normalizedData = t.transform(rescaledData)

In [15]:
# For each requested course, compute the cosine similarity to every other
# course in the same language and keep the ids of the ten most similar ones.
from tqdm import tqdm

# Every iteration runs two Spark jobs over the same frame. Caching it avoids
# re-running tokenization, TF-IDF and normalization for each of them.
df = normalizedData.cache()
dict_out = {}


def _cosine_udf(reference):
    """Build a UDF returning the dot product of a vector with `reference`.

    `norm_features` comes from an L2 Normalizer, so for those vectors the dot
    product equals the cosine similarity. Taking the reference vector as an
    argument binds it in the closure that is shipped to the executors, rather
    than relying on a global that is reassigned on every loop iteration.

    Args:
        reference: Dense array holding the normalized vector of the course
            being compared against.

    Returns:
        A Spark UDF mapping a vector column to a float similarity.
    """
    # NOTE(review): FloatType is single precision. It is kept as-is so the
    # ranking and tie-breaking stay exactly as before.
    return f.udf(lambda vector: float(vector.dot(reference)), FloatType())


for course_id, lang in tqdm(courses_langs):
    # L2-normalized vector of the current course. Fetch only the column that
    # is needed and fail with a clear message if the id is not in the data
    # (previously an opaque IndexError from `collect()[0]`).
    row = df.filter(df.id == int(course_id)).select('norm_features').first()
    if row is None:
        raise ValueError('course id {0} not found in the dataset'.format(course_id))
    vec1 = row['norm_features'].toArray()

    dp = _cosine_udf(vec1)

    # Candidates are all other courses in the same language. Ties on
    # similarity are broken by name, then by id.
    df_sim = df.where((df.id != int(course_id)) & (df.lang == lang))\
               .withColumn('cosine_sim', dp(df['norm_features']))\
               .orderBy(f.desc('cosine_sim'), f.asc('name'), f.asc('id'))\
               .head(10)

    list_out = [x['id'] for x in df_sim]
    dict_out[str(course_id)] = list_out


100%|██████████| 6/6 [00:44<00:00,  7.35s/it]


In [16]:
# Show the recommended course ids collected for each requested course id.
dict_out

{'1348': [1257, 823, 819, 20307, 829, 1285, 1256, 20292, 1229, 810],
 '1396': [1006, 20314, 8215, 1235, 1347, 20102, 994, 934, 12202, 890],
 '23111': [9285, 13224, 9352, 6864, 26336, 26670, 9286, 387, 19404, 10668],
 '25679': [7297, 4466, 24891, 5019, 4799, 4290, 8588, 6243, 6129, 15232],
 '7791': [21853, 10738, 10035, 21107, 11474, 387, 386, 22051, 19153, 4096],
 '8150': [13273, 8145, 16837, 8146, 26907, 22411, 8306, 8142, 540, 542]}

dict_out # Верный вариант 8
{'1348': [1257, 823, 819, 20307, 829, 1285, 1256, 20292, 1229, 810],
 '1396': [1006, 20314, 8215, 1235, 1347, 20102, 994, 934, 12202, 890],
 '23111': [9285, 13224, 9352, 6864, 26336, 26670, 9286, 387, 19404, 10668],
 '25679': [7297, 4466, 24891, 5019, 4799, 4290, 8588, 6243, 6129, 15232],
 '7791': [21853, 10738, 10035, 21107, 11474, 387, 386, 22051, 19153, 4096],
 '8150': [13273, 8145, 16837, 8146, 26907, 22411, 8306, 8142, 540, 542]}

In [17]:
import json
# Persist the recommendations as a single JSON document.
with open('lab07s.json', 'w') as fout:
    json.dump(dict_out, fout)