In [None]:
import pyspark

In [None]:
conf = pyspark.SparkConf()
conf.setMaster('spark://spark-master:7077')
spark_context = pyspark.SparkContext(conf=conf)

In [None]:
# Открытие текстового файла и превращение его в RDD.
# К докеру со spark-worker примаунчена папка /user-data с вашего диска, в которой лежат скачанные данные.
data = spark_context.textFile('/user-data/combined/reviews.csv')
# Берём первую строку — для дальнейшей фильтрации. Первая строка отвечает за заголовки, они нас не интересуют.
header_data = data.first()
# Начались вычисления. Первая функция пропускает все данные, кроме первой строки с заголовками.
# Функция map разделяет строку на колонки, используя разделитель `,`.
original_data = data.filter(lambda row: row != header_data).map(
    lambda line: line.split(","))
original_data.take(10)

In [None]:
result_data = original_data.filter(lambda line: len(line) > 5)
result_data.take(1)

In [None]:
# Всего
print(result_data.count())
# Ненужных данных
print(result_data.filter(lambda line: not line[4].isdigit()).count())

In [None]:
# В случае с combined таких строк 494 из общего числа — 22168. Это меньше 3% данных. 
# В этой задаче таким количеством данных можно пренебречь. 
# Пропустите эти строки, добавив фильтр и изменив в нём условие на обратное.
filtered_data = result_data.filter(lambda line: line[4].isdigit())
# Тут всё ещё есть лишние данные. Нас интересуют всего два столбца — key и stars. 
# В этом примере их значения 0_bj и 3 соответственно.
# Отделите эти данные от всех остальных, дальше работа будет только с ними.
result_data = filtered_data.map(lambda line: (line[1], int(line[4])))
# Для расчёта среднего арифметического тоже нужно знать количество. Вызовите функцию, которая каждому элементу добавляет 1.
result_data = result_data.map(lambda line: (line[0], (line[1], 1)))
result_data.take(10)

In [None]:
reduced_data = result_data.reduceByKey(
    lambda val1, val2: (val1[0] + val2[0], val1[1] + val2[1]))
reduced_data = reduced_data.mapValues(lambda x: x[0] / x[1])
# Бренд / рейтинг
reduced_data.take(10)

In [None]:
  def calculate_weight(helpful_yes, helpful_no):
      return helpful_yes / (helpful_yes + helpful_no)

# Удаляем линии, где рейтинг или данные о полезности не float
filtered_additional_data = filtered_data.filter(lambda line: (line[4].replace('.','').replace('-','').isnumeric()))
filtered_additional_data = filtered_additional_data.filter(lambda line: (line[6].replace('.','').replace('-','').isnumeric()))
filtered_additional_data = filtered_additional_data.filter(lambda line: (line[7].replace('.','').replace('-','').isnumeric()))

# Если helpful_yes + helpful_no = 0, то тоже удаляем, чтобы не делить на ноль
filtered_additional_data = filtered_additional_data.filter(lambda line: float(line[6]) + float(line[7]) != 0)

# Оставляем только нужные данные и считаем измененный рейтинг
additional_data = filtered_additional_data.map(lambda line: (line[1], float(line[4]), float(line[6]), float(line[7])))
additional_data = additional_data.map(lambda line: (line[0], line[1] * calculate_weight(line[2], line[3])))
additional_data.take(10)
    

In [None]:
# Объединяем
combined_data = reduced_data.join(additional_data)
combined_data.take(10)

In [None]:
# Сохраняем в файл
file = '/home/developer/workspace/results'

local_data = combined_data.collect()

with open(file, "w") as file:
    for line in local_data:
        file.write(str(line) + "\n")