In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json,col
from pyspark.sql.types import *
from os.path import abspath

spark = SparkSession\
        .builder\
        .appName("pyspark-notebook")\
        .master("spark://spark-master:7077")\
        .config("spark.executor.memory", "512m")\
        .config("hive.metastore.uris", "thrift://hive-metastore:9083")\
        .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\
        .enableHiveSupport()\
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

## 3. The Data

In [2]:
data = spark.read.csv("AB_NYC_2019.csv", sep=",", header=True, inferSchema=True)
data.show(5)


+----+--------------------+-------+-----------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|  id|                name|host_id|  host_name|neighbourhood_group|neighbourhood|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+----+--------------------+-------+-----------+-------------------+-------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|2539|Clean & quiet apt...|   2787|       John|           Brooklyn|   Kensington|40.64749|-73.97237|   Private room|  149|             1|                9| 2018-10-19|             0.21|                             6|             365|
|2595|Skylit Midtown Ca...|   2845|   Jennifer|          Manhatt

In [3]:
data.describe

<bound method DataFrame.describe of DataFrame[id: string, name: string, host_id: string, host_name: string, neighbourhood_group: string, neighbourhood: string, latitude: string, longitude: string, room_type: string, price: string, minimum_nights: string, number_of_reviews: string, last_review: string, reviews_per_month: string, calculated_host_listings_count: string, availability_365: int]>

In [4]:
data.columns

['id',
 'name',
 'host_id',
 'host_name',
 'neighbourhood_group',
 'neighbourhood',
 'latitude',
 'longitude',
 'room_type',
 'price',
 'minimum_nights',
 'number_of_reviews',
 'last_review',
 'reviews_per_month',
 'calculated_host_listings_count',
 'availability_365']

In [5]:
data.count()

49079

In [None]:
data.write.saveAsTable('hive_price_table')

In [None]:
ndf= spark.sql("""
select * from hive_price_table
""")
ndf.show(10)

In [None]:
avg_variance_price=spark.sql("""
SELECT AVG(price) asavg_price,
VARIANCE(price) AS variance_price
FROM hive_price_table
WHERE price IS NOT NULL;
""")
print(avg_variance_price.head())

In [3]:

import time
from joblib import Parallel, delayed, cpu_count
from mapper import mapper
from reducer import reducer
import numpy as np

In [2]:
%%time

# Загружаем данные
path = "AB_NYC_2019.csv"
prices_pandas = mapper(path)

# Разделяем данные на N частей
N = cpu_count()
chunks = np.array_split(prices_pandas, N)

# Запускаем reducer параллельно для каждой части
results = Parallel(n_jobs=N)(delayed(reducer)(chunk) for chunk in chunks)

# Разбираем результаты
total_count = sum(r[2] for r in results)  # Всего элементов
overall_mean = sum(r[0] * r[2] for r in results) / total_count  # Общее среднее

# Общая дисперсия
variance_price = (
    sum(r[2] * r[1] for r in results)  # ∑(n_i * σ_i^2)
    + sum(r[2] * (r[0] - overall_mean) ** 2 for r in results)  # ∑ n_i * (x̄_i - x̄)^2
) / total_count

# Итоговое среднее
avg_price = overall_mean

print(f'avg_price={avg_price}, variance_price={variance_price}')
print(f'Currents count={N}')


avg_price=152.232218341763, variance_price=56901.13550261286
Currents count=8
CPU times: user 309 ms, sys: 156 ms, total: 465 ms
Wall time: 13.7 s
