In [1]:
import os

# пути к Java и Spark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/home/ubuntu/_practice/spark-3.5.4-bin-hadoop3"
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["SPARK_HOME"], "bin")

In [2]:
import findspark
findspark.init()

import pyspark
print(pyspark.__version__)

from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]").setAppName("PlacesDistance Marchuk"))

print("SparkContext запущен:", sc)

3.5.4


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/08 17:46:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkContext запущен: <SparkContext master=local[*] appName=PlacesDistance>


In [3]:
import math
import csv

# Формула гаверсинуса
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Радиус Земли в километрах
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat / 2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c

# Функция для парсинга CSV 
def parse_csv_line(line):
    reader = csv.reader([line])
    return next(reader)

In [4]:
# как скопировать файл в hdfs
#hdfs dfs -mkdir /dataset
#hdfs dfs -put /home/ubuntu/_practice/places.csv /dataset/places.csv

# Загрузим данные places.csv:

In [6]:
file_path = "/dataset/places.csv" 
data = sc.textFile(file_path)

# Заголовки и данные
header = data.first()
rows = data.filter(lambda line: line != header).map(parse_csv_line)


                                                                                

In [7]:
#Рассчитаем расстояние от заданной точки:

In [8]:
# Координаты точки
origin_lat, origin_lng = 55.751244, 37.618423

# Расчет расстояний
distances_from_origin = rows.map(lambda row: (row[1], haversine(origin_lat, origin_lng, float(row[13]), float(row[12]))))
top_10_nearby = distances_from_origin.takeOrdered(10, key=lambda x: x[1])

print("Первые 10 расстояний от заданной точки:")
for place, distance in top_10_nearby:
    print(f"{place}: {distance:.2f} км")

Первые 10 расстояний от заданной точки:
Мареа: 1.06 км
Стейк Хаус «Бизон»: 1.06 км
ЛяМур: 1.07 км
БИБЛИОТЕКА Shisha Lounge: 1.07 км
Му-Му: 1.07 км
Unlock cafe: 1.07 км
Папа Джонс: 1.07 км
Jimmy Poy: 1.07 км
Шоколадница: 1.07 км
Настоишная: 1.07 км


In [9]:
#Рассчитаем расстояние между всеми заведениями:

In [10]:
# Создаем пары заведений и рассчитываем расстояния
pairs = rows.cartesian(rows).filter(lambda x: x[0][0] != x[1][0])  # Исключаем одинаковые объекты
distances_between_places = pairs.map(lambda x: ((x[0][1], x[1][1]), 
                                                haversine(float(x[0][13]), float(x[0][12]), float(x[1][13]), float(x[1][12]))))

print("Первые 10 расстояний между заведениями:")
for (place1, place2), distance in distances_between_places.take(10):
    print(f"{place1} - {place2}: {distance:.2f} км")

Первые 10 расстояний между заведениями:
МУ-МУ - КОМБИНАТ ПИТАНИЯ МГТУ ИМ.Н.Э.БАУМАНА: 0.68 км
МУ-МУ - Дом 12: 1.48 км
МУ-МУ - Чито-Ра: 1.45 км
МУ-МУ - Бар- буфет «Николай»: 1.58 км
МУ-МУ - Флорентини: 1.52 км
МУ-МУ - Beer Gik: 2.98 км
МУ-МУ - Погребок: 2.98 км
МУ-МУ - Пробка Гриль: 2.89 км
МУ-МУ - TEMPO DI PASTA: 2.90 км
МУ-МУ - Хлеб насущный: 2.90 км


In [11]:
# Найдем топ-10 самых близких и самых отдаленных заведений:

In [13]:
# Сортируем по расстоянию
top_10_closest = distances_between_places.takeOrdered(10, key=lambda x: x[1])
top_10_farthest = distances_between_places.takeOrdered(10, key=lambda x: -x[1])

print("10 наиболее близких заведений:")
for (place1, place2), distance in top_10_closest:
    print(f"{place1} - {place2}: {distance:.2f} км")

print("\n10 наиболее отдаленных заведений:")
for (place1, place2), distance in top_10_farthest:
    print(f"{place1} - {place2}: {distance:.2f} км")

                                                                                

10 наиболее близких заведений:
Beer Gik - Погребок: 0.00 км
Beer Gik - Kozlovna: 0.00 км
Beer Gik - Па-Паэлья: 0.00 км
Погребок - Beer Gik: 0.00 км
TEMPO DI PASTA - Хлеб насущный: 0.00 км
Хлеб насущный - TEMPO DI PASTA: 0.00 км
Погребок - Kozlovna: 0.00 км
Погребок - Па-Паэлья: 0.00 км
Глав Пив Маг - Beermood: 0.00 км
Beermood - Глав Пив Маг: 0.00 км

10 наиболее отдаленных заведений:
МНИТИ - Calabash Club: 5.46 км
МНИТИ - Залечь на дно: 5.46 км
МНИТИ - Политех: 5.46 км
Calabash Club - МНИТИ: 5.46 км
Залечь на дно - МНИТИ: 5.46 км
Политех - МНИТИ: 5.46 км
МНИТИ - Антикафе Checkpoint: 5.46 км
Антикафе Checkpoint - МНИТИ: 5.46 км
МНИТИ - Шоколадница: 5.40 км
Шоколадница - МНИТИ: 5.40 км


                                                                                










=============== Задание 2 ===============


   1. Рассчитайте средний рейтинг товаров из набора данных.
   2. Сопоставьте полученные данные из предыдущего пункта с наименованием товаров.
   3. Сформируйте RDD товаров с рейтингом меньшим 3. Выведите топ-10 товаров с наименьшим рейтингом.
   4. Сохраните результат в постоянное хранилище.

Замечание: для парсинга товаров используйте функцию eval.


1. расчёт среднего рейтинга товаров

hdfs dfs -put /home/ubuntu/_practice/Electronics_5.json /dataset/Electronics_5.json
hdfs dfs -put /home/ubuntu/_practice/meta_Electronics.json /dataset/meta_Electronics.json

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

# Создаем сессию
spark = SparkSession.builder.appName("Electronics Ratings Marchuk").getOrCreate()

# Загружаем данные
reviews_path = "/dataset/Electronics_5.json"
meta_path = "/dataset/meta_Electronics.json"

reviews_df = spark.read.json(reviews_path)
meta_df = spark.read.json(meta_path)

# 1. Рассчитайте средний рейтинг товаров
avg_ratings_df = reviews_df.groupBy("asin").agg(avg("overall").alias("average_rating"))

# 2. Сопоставьте с наименованием товаров
product_ratings_df = avg_ratings_df.join(meta_df.select("asin", "title"), on="asin", how="inner")

# 3. Создайте RDD товаров с рейтингом < 3 и выведите топ-10
low_rated_products_rdd = product_ratings_df.filter(col("average_rating") < 3).rdd
top_10_low_rated = low_rated_products_rdd.takeOrdered(10, key=lambda row: row["average_rating"])

# Выводим топ-10 товаров
print("Top-10 товаров с рейтингом ниже 3:")
for product in top_10_low_rated:
    print(f"Название: {product['title']}, Рейтинг: {product['average_rating']}")

# 4. Сохраните результат в постоянное хранилище
output_path = "hdfs://localhost:9000/dataset/low_rated_products"
low_rated_products_rdd.saveAsTextFile(output_path)

# Останавливаем Spark-сессию
spark.stop()

25/01/08 17:50:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Top-10 товаров с рейтингом ниже 3:
Название: ATI TV Wonder 200 PCI Video Card w/PVR Capabilities, Рейтинг: 1.0
Название: StarTech HDMISPL1HH 1 feet Standard HDMI Cable - 1x HDMI (M) to 2x HDMI (F) (Discontinued by Manufacturer), Рейтинг: 1.0
Название: GE 24746 Futura HDTV Ready Antenna, Рейтинг: 1.0
Название: Agfa ePhoto SMILE 0.2MP Digital Camera, Рейтинг: 1.0
Название: Dynex-DX-AP100 Adapter Mini DVI to Mini-DIN, Рейтинг: 1.0
Название: RCA DRC8335 DVD Recorder &amp; VCR Combo With Built-In Tuner, Рейтинг: 1.0
Название: Maxtor OneTouch 4 - 1 TB USB 2.0 Desktop External Hard Drive STM310005OTA3E1-RK, Рейтинг: 1.0
Название: Zeikos 57-in-1 USB 2.0 Flash Memory Card Reader ZE-CR201, Рейтинг: 1.0
Название: NEEWER&reg; Photographic Barn Door &amp; Honeycomb Grid &amp; Gel Set for Alienbees Alienbee Flash, Рейтинг: 1.0
Название: NEW SYLVANIA HD1Z SDSDHCMMC 720P HARD DRIVE POCKET VIDEO DIGITAL CAMERACAMCORDER W4X DIGITAL ZOOM HDMI &amp; 2 LCD PEACOCK BLUE, Рейтинг: 1.0


                                                                                









=============== Задание 3 ===============

1. Вычислите косинусное сходство между рейтингами фильмов.
2. Для фильма с movieId равным 589 сформируйте RDD со значениями сходства с остальными фильмами
3. Добавьте наименования фильмов.
4. Выведите топ-10 наиболее похожих фильмов.

Пример расчета посредством SQL (без нормализации):
SELECT r1.prodId, r2.prodId, SUM(r1.val * r2.val) AS sim
FROM rating AS r1 JOIN rating AS r2 ON r1.userId = r2.userId
WHERE r1.prodId < r2.prodId
GROUP BY r1.prodId, r2.prodId

Загрузка данных

hdfs dfs -put /home/ubuntu/_practice/movies.csv /dataset/movies.csv
hdfs dfs -put /home/ubuntu/_practice/links.csv /dataset/links.csv
hdfs dfs -put /home/ubuntu/_practice/ratings.csv /dataset/ratings.csv
hdfs dfs -put /home/ubuntu/_practice/tags.csv /dataset/tags.csv

In [18]:
#from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum

# Создаем Spark-сессию
spark = SparkSession.builder.appName("Movies shojest Marchuk").getOrCreate()

# Загружаем данные
ratings_path = "/dataset/ratings.csv"
movies_path = "/dataset/movies.csv"

ratings_df = spark.read.csv(ratings_path, header=True, inferSchema=True)
movies_df = spark.read.csv(movies_path, header=True, inferSchema=True)

In [30]:
from pyspark.sql.functions import col, lit, sum, sqrt, pow, when

# Создаем Spark-сессию
spark = SparkSession.builder.appName("Movie Similarity Marchuk").getOrCreate()

# Загружаем данные
ratings_path = "/dataset/ratings.csv"
movies_path = "/dataset/movies.csv"

ratings_df = spark.read.csv(ratings_path, header=True, inferSchema=True)
movies_df = spark.read.csv(movies_path, header=True, inferSchema=True)

# Удаляем дублирующие записи
ratings_df = ratings_df.dropDuplicates(["userId", "movieId"])

# Преобразуем данные в форматы удобные для расчетов
ratings_aliased = ratings_df.alias("r1").join(
    ratings_df.alias("r2"), on="userId"
).where(col("r1.movieId") < col("r2.movieId"))

similarity_df = ratings_aliased.groupBy("r1.movieId", "r2.movieId").agg(
    sum(col("r1.rating") * col("r2.rating")).alias("dot_product"),
    sqrt(sum(pow(col("r1.rating"), 2))).alias("norm_r1"),
    sqrt(sum(pow(col("r2.rating"), 2))).alias("norm_r2"),
).withColumn(
    "cosine_similarity",
    col("dot_product") / (col("norm_r1") * col("norm_r2"))
)

# Косинусное сходство
similarity_df.show(10, truncate=False)

25/01/08 18:25:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/01/08 18:25:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/08 18:25:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/08 18:25:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/08 18:25:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/08 18:25:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/08 18:25:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/08 18:25:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/01/08 18:25:54 WARN RowBasedKeyValueBatch: Calling spill()

+-------+-------+-----------+------------------+------------------+------------------+
|movieId|movieId|dot_product|norm_r1           |norm_r2           |cosine_similarity |
+-------+-------+-----------+------------------+------------------+------------------+
|1208   |2000   |561.5      |26.65520587052368 |21.857492994394395|0.9637566141949288|
|1348   |2139   |70.0       |7.533259586659682 |9.433981132056603 |0.9849634244894794|
|599    |1103   |82.5       |8.48528137423857  |10.161200716450788|0.956847375878929 |
|599    |920    |65.0       |7.937253933193772 |8.888194417315589 |0.9213603870522182|
|599    |3160   |99.0       |9.38083151964686  |11.090536506409418|0.9515712295346307|
|1721   |6377   |992.25     |29.62262648719725 |34.92134018046845 |0.9591944178885445|
|3896   |161582 |59.5       |7.365459931328117 |8.32165848854662  |0.970749567218328 |
|4963   |7323   |206.5      |14.38749456993816 |14.84082207965583 |0.9671123399067233|
|1259   |1895   |107.0      |11.01135777277

                                                                                

2. Для фильма с movieId равным 589 сформируйте RDD со значениями сходства с остальными фильмами

In [31]:
# фильтруем сходство с фильмом 589
movie_589_similarity = similarity_df.filter((col("r1.movieId") == 589) | (col("r2.movieId") == 589))

# Упрощаем movieId (оставляем только сравниваемые фильмы)
movie_589_similarity = movie_589_similarity.withColumn(
    "similar_movieId",
    when(col("r1.movieId") != 589, col("r1.movieId")).otherwise(col("r2.movieId"))
).select("similar_movieId", "cosine_similarity")

# Сходство с фильмом 589
movie_589_similarity.show(10, truncate=False)

+---------------+------------------+
|similar_movieId|cosine_similarity |
+---------------+------------------+
|4896           |0.9493927639264939|
|1391           |0.9264611057456877|
|54503          |0.9718665168836798|
|43928          |0.8642944121755471|
|5669           |0.9470464185036753|
|2580           |0.9719296158512897|
|12             |0.8575362261996874|
|7839           |1.0               |
|4808           |0.9959355660022436|
|5875           |1.0               |
+---------------+------------------+
only showing top 10 rows



                                                                                

3. Добавьте наименования фильмов.

In [32]:
# добавляем названия фильмов
movie_589_similarity_with_titles = movie_589_similarity.join(
    movies_df, movie_589_similarity["similar_movieId"] == movies_df["movieId"], "inner"
).select("title", "cosine_similarity")

# Промежуточный вывод: Сходство с названиями
movie_589_similarity_with_titles.show(10, truncate=False)

+----------------------------------------------------------------------------------------------+------------------+
|title                                                                                         |cosine_similarity |
+----------------------------------------------------------------------------------------------+------------------+
|Harry Potter and the Sorcerer's Stone (a.k.a. Harry Potter and the Philosopher's Stone) (2001)|0.9493927639264939|
|Mars Attacks! (1996)                                                                          |0.9264611057456877|
|Superbad (2007)                                                                               |0.9718665168836798|
|Ultraviolet (2006)                                                                            |0.8642944121755471|
|Bowling for Columbine (2002)                                                                  |0.9470464185036753|
|Go (1999)                                                              

4. Выведите топ-10 наиболее похожих фильмов.

In [33]:
top_10_similar_movies = movie_589_similarity_with_titles.orderBy(
    col("cosine_similarity").desc()
).limit(10)

# Итоговый вывод
top_10_similar_movies.show(10, truncate=False)

+----------------------------------------------------+------------------+
|title                                               |cosine_similarity |
+----------------------------------------------------+------------------+
|Hoffa (1992)                                        |1.0000000000000002|
|Coal Miner's Daughter (1980)                        |1.0000000000000002|
|Rare Exports: A Christmas Tale (Rare Exports) (2010)|1.0000000000000002|
|Man Who Fell to Earth, The (1976)                   |1.0000000000000002|
|Red Corner (1997)                                   |1.0000000000000002|
|Stunt Man, The (1980)                               |1.0000000000000002|
|Death at a Funeral (2007)                           |1.0000000000000002|
|Abandoned, The (2006)                               |1.0               |
|Cranes Are Flying, The (Letyat zhuravli) (1957)     |1.0               |
|Pyromaniac's Love Story, A (1995)                   |1.0               |
+-------------------------------------