# Условие:
- Загрузите датасет по ценам на жилье Airbnb, доступный на kaggle.com: https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data
- Подсчитайте среднее значение и дисперсию по признаку ”price” в hive. Используя Python, реализуйте скрипт mapper.py и reducer.py для расчета
- Проверьте правильность подсчета статистики методом mapreduce в сравнении со hive.

## 1. Установим соединение со SPARK

In [1]:
from pyspark.sql import SparkSession

# Настройка Spark для работы с Hive
spark = SparkSession \
    .builder \
    .appName("pyspark-notebook") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "512m") \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

24/10/21 07:51:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## 2. Загрузим датасет по ценам на жилье Airbnb, доступный на kaggle.com из CSV файла

In [2]:
# Чтение данных с помощью Spark в DataFrame
data = spark.read.csv(path="AB_NYC_2019.csv", header=True, inferSchema=True)
# Отобразим структуру данных для проверки
data.printSchema()



root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)



                                                                                

In [3]:
# Проверка данных
data.show(5)

+----+--------------------+-------+-----------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|  id|                name|host_id|  host_name|neighbourhood_group|neighbourhood|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+----+--------------------+-------+-----------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|2539|Clean & quiet apt...|   2787|       John|           Brooklyn|   Kensington|40.64749|-73.97237|   Private room|  149|             1|                9| 2018-10-19|             0.21|                             6|             365|
|2595|Skylit Midtown Ca...|   2845|   Jennifer|          Manhatt

## 3. Подсчитаем среднее значение и дисперсию по признаку ”price”

### 3.1. С использованием pyspark.sql.functions

In [4]:
# Расчет среднего значения и дисперсии по признаку ”price” в spark.

from pyspark.sql.functions import mean, variance

# Выбор нужного столбца "price" для расчета статистики
price_data = data.select("price").filter(data.price > 0)

# Вычисление среднего значения и дисперсии
price_stats = price_data.agg(
    mean("price").alias("mean_price"),
    variance("price").alias("variance_price")
)

# Вывод результата
price_stats.show()

+------------------+------------------+
|        mean_price|    variance_price|
+------------------+------------------+
|152.26648115562466|56909.864820032206|
+------------------+------------------+



### 3.2. С использованием функций mapper и reducer, реализованных в python:

- Mapper: Парсит строки из CSV (мы используем библиотеку csv для этого) и выбирает поле price. Возвращает кортежи с ключом "price", значением и счётчиком.
- Reducer: Суммирует значения для расчета среднего и дисперсии. Мы сначала собираем результаты из RDD (mapped_rdd.collect()), а затем применяем reducer для окончательного вычисления.

In [5]:
import csv
from io import StringIO

# Фильтрация строк с положительным значением 'price' и преобразование в RDD
# rdd_data = data.select("price").rdd.filter(lambda row: row.price is not None and row.price > 0)
# Преобразуем столбец 'price' в тип float и фильтруем строки, где 'price' больше 0
rdd_data = data.select(data["price"].cast("float")).rdd.filter(lambda row: row.price is not None and row.price > 0)


# Mapper функция, которая возвращает данные в формате (ключ, (значение, count, квадрат значения))
def mapper(row):
    price = row.price
    return ("price", (price, 1, price ** 2))


# Reducer функция для суммирования значений
def reducer(accum, new_val):
    # accum = (сумма цен, количество, сумма квадратов)
    total_price = accum[0] + new_val[0]
    total_count = accum[1] + new_val[1]
    sum_of_squares = accum[2] + new_val[2]
    return (total_price, total_count, sum_of_squares)


# Применение mapper
mapped_rdd = rdd_data.map(mapper)

# Агрегация результатов с помощью reduceByKey
reduced_rdd = mapped_rdd.reduceByKey(reducer)


# Применение финальной обработки для вычисления среднего и дисперсии
def final_calculation(data):
    total_price, total_count, sum_of_squares = data[1]
    mean_price = total_price / total_count
    variance = (sum_of_squares / total_count) - (mean_price ** 2)
    return {"mean_price": mean_price, "variance": variance}


# Собираем результаты
final_result = reduced_rdd.map(final_calculation).collect()

# Вывод результатов
print(f"Средняя цена: {final_result[0]['mean_price']}")
print(f"Дисперсия цены: {final_result[0]['variance']}")

Средняя цена: 152.26648115562466
Дисперсия цены: 56908.7003999965


### 3.1. С использованием SQL-запроса (чтение данных из Hive):

#### Сохраняем данные в БД

In [6]:
# Удаление таблицы, если она существует и сохранение данных в таблицу ab_nyc_2019
spark.sql("DROP TABLE IF EXISTS ab_nyc_2019")

data.write.saveAsTable('ab_nyc_2019')

                                                                                

#### Получаем данные из БД sql-запросом

CAST(price AS DOUBLE) >= 0 — Это условие гарантирует, что будут учитываться только строки, где price является числовым значением и больше либо равно нулю.
Другие условия:
price IS NOT NULL — Условие для исключения строк, где price пустой.
price != '' — Условие для исключения пустых строк.
price RLIKE '^[0-9]+$' — Используем регулярное выражение для фильтрации строк, содержащих только цифры.

In [7]:
# Запрос в Spark SQL, который фильтрует строки с числовыми значениями в поле price
# для проверки корректных данных и вычисления средней цены и дисперсии
query = """
SELECT 
    COUNT(*) AS total_rows, 
    COUNT(CASE WHEN CAST(price AS DOUBLE) > 0 THEN 1 END) AS valid_prices, 
    SUM(CASE WHEN (price IS NOT NULL AND price != '' AND price RLIKE '^[0-9]+$' AND CAST(price AS DOUBLE) > 0) 
             THEN CAST(price AS DOUBLE) 
             END) AS total_price,
    AVG(CASE WHEN CAST(price AS DOUBLE) > 0 THEN CAST(price AS DOUBLE) END) AS avg_price,
    -- Дисперсия: (среднее квадратичное отклонение минус квадрат среднего значения)
    (SUM(CASE WHEN CAST(price AS DOUBLE) > 0 THEN (CAST(price AS DOUBLE) - 
    (SELECT AVG(CAST(price AS DOUBLE)) FROM ab_nyc_2019 WHERE CAST(price AS DOUBLE) > 0)) * 
    (CAST(price AS DOUBLE) - 
    (SELECT AVG(CAST(price AS DOUBLE)) FROM ab_nyc_2019 WHERE CAST(price AS DOUBLE) > 0)) END) / 
    COUNT(CASE WHEN CAST(price AS DOUBLE) > 0 THEN 1 END)) AS variance_price
FROM ab_nyc_2019;
"""

hive_df = spark.sql(query)
hive_df.show()

+----------+------------+-----------+------------------+-----------------+
|total_rows|valid_prices|total_price|         avg_price|   variance_price|
+----------+------------+-----------+------------------+-----------------+
|     49079|       48874|  7441872.0|152.26648115562466|56908.70039999702|
+----------+------------+-----------+------------------+-----------------+

