In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 60.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=e190c33c2d6a4c351964828fad89adfbbe735090f6955fc0d5d11cca6570f1c1
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master('local[4]')\
        .appName('Lesson_3')\
        .config('spark.ui.port', '4050')\
        .config('spark.executor.instances', 2)\
        .config('spark.executor.memory', '5g')\
        .config('spark.executor.cores', 2)\
        .getOrCreate()
sc = spark.sparkContext

## Самостоятельная работа к уроку 3

## Данные: [google drive: raw_sales.csv](https://drive.google.com/file/d/1G2N7Mnt4-Tqz4JdJxutGDMbJiOr32kZp/view?usp=sharing)

 Каждая строчка это продажа жилья, которая состоит из следующих полей:
*   date of sale
*   price
*   property type
*   number of bedrooms
*   4digit postcode

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

data = spark.read.csv('raw_sales.csv', header=True, inferSchema=True)

In [4]:
data.show(5)

+-------------------+--------+------+------------+--------+
|           datesold|postcode| price|propertyType|bedrooms|
+-------------------+--------+------+------------+--------+
|2007-07-08 00:00:00|    2600|327000|       house|       1|
|2007-08-16 00:00:00|    2600|790000|       house|       4|
|2007-12-05 00:00:00|    2600|825000|       house|       3|
|2008-01-21 00:00:00|    2600|315000|        unit|       1|
|2008-04-24 00:00:00|    2600|292500|       house|       1|
+-------------------+--------+------+------------+--------+
only showing top 5 rows



### Задание 1
Добавим к таблице следующие поля:

- Средняя стомость 10 проданных домов до текущего в том же районе (4digit postcode)
- Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode)
- Стоимость последнего проданного дома до текущего

In [5]:
from pyspark.sql.window import Window

windSpec = Window.partitionBy('postcode').orderBy('datesold')\

list_1 = data.withColumn('up_to_current', F.avg('price').over(windSpec.rowsBetween(-10, Window.currentRow - 1)))\
    .withColumn('after_current', F.avg('price').over(windSpec.rowsBetween(Window.currentRow, 9)))\
    .withColumn('last_sold', F.last('price').over(windSpec.rowsBetween(-1, -1)))

In [6]:
list_1.show(11)

+-------------------+--------+-------+------------+--------+-----------------+-------------+---------+
|           datesold|postcode|  price|propertyType|bedrooms|    up_to_current|after_current|last_sold|
+-------------------+--------+-------+------------+--------+-----------------+-------------+---------+
|2007-07-08 00:00:00|    2600| 327000|       house|       1|             null|     669050.0|     null|
|2007-08-16 00:00:00|    2600| 790000|       house|       4|         327000.0|     708350.0|   327000|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|         558500.0|     698350.0|   790000|
|2008-01-21 00:00:00|    2600| 315000|        unit|       1|647333.3333333334|     679350.0|   825000|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|         564250.0|     742850.0|   315000|
|2008-05-30 00:00:00|    2600| 329000|        unit|       2|         509900.0|     786600.0|   292500|
|2008-06-19 00:00:00|    2600| 765000|       house|       5|         4797

In [21]:
windSpec = Window.partitionBy('postcode').orderBy('datesold')\


table_1 = data.withColumn('up_to_current', F.avg('price').over(windSpec.rowsBetween(-10, Window.currentRow - 1)))\
    .withColumn('after_current', F.avg('price').over(windSpec.rowsBetween(Window.currentRow, 9)))\
    .withColumn('last_sold', F.last('price').over(windSpec.rowsBetween(-1, -1)))

## Задание 2

In [7]:
# Найдем среднюю цену жилья для каждого года
list_2 = data.withColumn('year', F.year('datesold')).groupBy('year').agg(F.avg('price').alias('avg_price_year'))

In [11]:
# Приджойним эти данные к таблице из задания 1
result = list_1.join(list_2, on = [F.year(list_1.datesold) == (list_2.year)], how = 'left')

In [12]:
result.show(30)

+-------------------+--------+-------+------------+--------+-----------------+-------------+---------+----+-----------------+
|           datesold|postcode|  price|propertyType|bedrooms|    up_to_current|after_current|last_sold|year|   avg_price_year|
+-------------------+--------+-------+------------+--------+-----------------+-------------+---------+----+-----------------+
|2007-07-08 00:00:00|    2600| 327000|       house|       1|             null|     669050.0|     null|2007|522377.2108843537|
|2007-08-16 00:00:00|    2600| 790000|       house|       4|         327000.0|     708350.0|   327000|2007|522377.2108843537|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|         558500.0|     698350.0|   790000|2007|522377.2108843537|
|2008-01-21 00:00:00|    2600| 315000|        unit|       1|647333.3333333334|     679350.0|   825000|2008|493814.1627543036|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|         564250.0|     742850.0|   315000|2008|493814.1627

## Задание 3
В итоге у нас таблица с колонками:
*   price
*   Среднегодовая цена
*  Средняя стомость 10 проданных домов до текущего в том же районе (4digit postcode)
*  Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode)
*  Стоимость последнего проданного дома до текущего

Посчитаем кол-во уникальных значений в каждой строчке (unique(row)).

In [10]:
@F.udf(returnType=IntegerType())
def get_unique(value):
  return len(set(value))

result.withColumn('unique_el', get_unique(F.array(result.columns))).show(10)

+-------------------+--------+-------+------------+--------+-----------------+-------------+---------+----+-----------------+---------+
|           datesold|postcode|  price|propertyType|bedrooms|    up_to_current|after_current|last_sold|year|   avg_price_year|unique_el|
+-------------------+--------+-------+------------+--------+-----------------+-------------+---------+----+-----------------+---------+
|2007-07-08 00:00:00|    2600| 327000|       house|       1|             null|     669050.0|     null|2007|522377.2108843537|        9|
|2007-08-16 00:00:00|    2600| 790000|       house|       4|         327000.0|     708350.0|   327000|2007|522377.2108843537|       10|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|         558500.0|     698350.0|   790000|2007|522377.2108843537|       10|
|2008-01-21 00:00:00|    2600| 315000|        unit|       1|647333.3333333334|     679350.0|   825000|2008|493814.1627543036|       10|
|2008-04-24 00:00:00|    2600| 292500|       hou

In [13]:
spark.stop()