# Задание
- Загрузите датасет по ценам на жилье Airbnb, доступный на kaggle.com: https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data
- Подсчитайте среднее значение и дисперсию по признаку ”price” в hive
- Используя Python, реализуйте скрипт mapper.py и reducer.py для расчета
- Проверьте правильность подсчета статистики методом mapreduce в сравнении со hive.

In [58]:
!pip install kaggle



In [59]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle (3).json


{'kaggle (3).json': b'{"username":"anjaluchkina","key":"200543f652bf7526c38509284b4659d5"}'}

In [60]:
# Копируем kaggle.json в каталог ~/.kaggle и устанавливаем права доступа
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [61]:
# Загрузка набора данных с Kaggle
!kaggle datasets download -d dgomonov/new-york-city-airbnb-open-data

Dataset URL: https://www.kaggle.com/datasets/dgomonov/new-york-city-airbnb-open-data
License(s): CC0-1.0
new-york-city-airbnb-open-data.zip: Skipping, found more recently modified local copy (use --force to force download)


In [62]:
# Распаковка zip архива
!unzip new-york-city-airbnb-open-data.zip

Archive:  new-york-city-airbnb-open-data.zip
replace AB_NYC_2019.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [67]:
import pandas as pd

In [68]:
# Чтение данных и расчет статистик в Hive (с использованием Pandas)
# Чтение данных используя Pandas и расчет среднего значения и дисперсии для признака "price":

df = pd.read_csv('AB_NYC_2019.csv')
df.head()


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.1,1,0


In [69]:
# Расчет среднего значения и дисперсии
average_price = df['price'].mean()
variance_price = df['price'].var()

print(f"Средняя цена (Pandas): {average_price}")
print(f"Дисперсия цены (Pandas): {variance_price}")

Средняя цена (Pandas): 152.7206871868289
Дисперсия цены (Pandas): 57674.02524696099


In [72]:
# Создание mapper.py:
%%writefile mapper.py
import sys
import csv

def read_input(file):
    reader = csv.reader(file)
    for row in reader:
        yield row

def main():
    input = read_input(sys.stdin)
    next(input)  # пропускаем заголовок

    for row in input:
        if len(row) < 10:
            continue  # пропускаем строки с недостаточным количеством столбцов

        try:
            price = float(row[9])
            print(f"{price}\t1")
        except ValueError:
            continue

if __name__ == "__main__":
    main()

Overwriting mapper.py


In [73]:
# Создание reducer.py:
%%writefile reducer.py
import sys

def read_mapper_output(file):
    for line in file:
        yield line.strip().split('\t')

def main():
    records = []
    for price, count in read_mapper_output(sys.stdin):
        try:
            records.append((float(price), int(count)))
        except ValueError:
            continue

    total_price = sum(price * count for price, count in records)
    total_count = sum(count for price, count in records)

    if total_count > 0:
        mean_price = total_price / total_count
    else:
        mean_price = 0

    total_squared_diff = sum((price - mean_price) ** 2 * count for price, count in records)

    if total_count > 1:
        variance_price = total_squared_diff / (total_count - 1)
    else:
        variance_price = 0

    print(f"{mean_price}\t{variance_price}")

if __name__ == "__main__":
    main()

Overwriting reducer.py


In [74]:
#Результат MapReduce(mapper.py и reducer.py вместе используя конвейер):
!cat AB_NYC_2019.csv | python3 mapper.py | sort | python3 reducer.py

152.7206871868289	57674.025246948775


In [76]:
# Вывод результатов, полученных от pandas
print(f"Pandas - Средняя цена: {average_price}")
print(f"Pandas - Дисперсия цены: {variance_price}")

# Вывод результатов, полученных от MapReduce
!cat AB_NYC_2019.csv | python3 mapper.py | sort | python3 reducer.py


Pandas - Средняя цена: 152.7206871868289
Pandas - Дисперсия цены: 57674.02524696099
152.7206871868289	57674.025246948775


### Вывод
 - Оба метода — Pandas и MapReduce — корректно реализованы и подсчитывают среднее и дисперсию на одних и тех же данных.
 - Одинаковые результаты подтверждают, что оба подхода корректно интерпретируют структуру данных и выполняют операции над ними.
 - Выбор конкретного метода будет зависеть от объема данных и доступных ресурсов:
  - Pandas: Отлично подходит для небольших и средних наборов данных, которые могут быть обработаны в оперативной памяти одного компьютера.
  - MapReduce: Идеален для обработки больших объемов данных, требующих распределенной обработки.