In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, IntegerType

In [None]:
def calculate_weight(helpful_yes, helpful_no):
    """Return the fraction of votes that marked a review as helpful.

    Args:
        helpful_yes: Number of "helpful" votes.
        helpful_no: Number of "not helpful" votes.

    Returns:
        ``helpful_yes`` divided by the total number of votes.

    Raises:
        ZeroDivisionError: If both counts are zero.
    """
    total_votes = helpful_yes + helpful_no
    return helpful_yes / total_votes

In [None]:
# Obtain the Spark session for this notebook: getOrCreate() reuses an
# already-running session or starts a new one against the given master URL.
spark = (
    SparkSession.builder
    .appName('Yandex_pr')
    .master('spark://spark-master:7077')
    .getOrCreate()
)


In [None]:
# Read the products CSV (first row is the header, column types inferred).
products_raw = spark.read.csv(
    '/user-data/combined/products.csv', header=True, inferSchema=True)

# Pin the numeric columns to explicit types regardless of what inference
# picked; values that cannot be cast become null.
df_products = (
    products_raw
    .withColumn('rating', products_raw['rating'].cast(FloatType()))
    .withColumn('rating_count', products_raw['rating_count'].cast(IntegerType()))
)


In [None]:
# Read the reviews CSV (first row is the header, column types inferred).
reviews_raw = spark.read.csv(
    '/user-data/combined/reviews.csv', header=True, inferSchema=True)

# Force the vote counters to float and the star rating to int; values that
# cannot be cast become null. Column positions are unchanged by withColumn
# on an existing column, which matters for the positional access downstream.
reviews_typed = (
    reviews_raw
    .withColumn('helpful_yes', reviews_raw['helpful_yes'].cast(FloatType()))
    .withColumn('helpful_no', reviews_raw['helpful_no'].cast(FloatType()))
    .withColumn('stars', reviews_raw['stars'].cast(IntegerType()))
)

# Keep only reviews that have at least one vote of each kind (null counters
# fail the comparison and are dropped too).
# NOTE(review): requiring helpful_no > 0 also discards reviews that received
# no negative votes at all - confirm this is intended.
has_both_vote_kinds = (
    (reviews_typed['helpful_yes'] > 0) & (reviews_typed['helpful_no'] > 0)
)
df_reviews = reviews_typed.filter(has_both_vote_kinds)


In [None]:
# Per key (row[1]), average the value row[4] * calculate_weight(row[6], row[7])
# over all review rows whose row[4] is an int.
# Columns are addressed by position; presumably 1 = product key, 4 = stars,
# 6 = helpful_yes, 7 = helpful_no - TODO verify against the reviews.csv header.
rdd_reviews = (
    df_reviews.rdd
    # Skip rows where position 4 is not an int (e.g. None after a failed cast).
    .filter(lambda row: type(row[4]) is int)
    # Emit (key, (weighted value, 1)) so sums and counts reduce together.
    .map(lambda row: (
        row[1],
        (int(row[4]) * calculate_weight(row[6], row[7]), 1),
    ))
    # Accumulate the running (sum, count) pair for each key.
    .reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
    # Mean = sum / count.
    .mapValues(lambda total: total[0] / total[1])
)


In [None]:
# Turn the (key, average) pairs back into a DataFrame. Only column names are
# supplied, so Spark infers the column types from the data.
# Note: this rebinds df_reviews from the per-review rows to the per-key table.
review_rating_columns = ['_key', 'rating_by_reviews']
df_reviews = spark.createDataFrame(rdd_reviews, schema=review_rating_columns)


In [None]:
# Attach the review-based rating to each product. join() defaults to an inner
# join, so products without a matching '_key' row are not kept. The helper
# '_key' column is dropped afterwards because it duplicates 'key'.
keys_match = df_products['key'] == df_reviews['_key']
df_join = df_products.join(df_reviews, keys_match).drop('_key')


In [None]:
# Persist the enriched product table as CSV with a header row.
# mode='overwrite' replaces the output of a previous run; with the default
# save mode the write raises "path already exists" on every re-run, so the
# notebook could not be executed top to bottom more than once.
df_join.write.csv(
    '/user-data/combined/new_products.csv', header=True, mode='overwrite')