In [169]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag
from pyspark.sql.functions import when
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
from pyspark.sql.functions import round

spark = SparkSession.builder\
    .master("local[2]")\
    .appName("Lesson_2")\
    .config("spark.executor.instances",2)\
    .config("spark.executor.memory",'2g')\
    .config("spark.executor.cores",1)\
    .getOrCreate()


# Самостоятельная работа к уроку 4
На уроке мы попробовали оконные и пользовательские функции. Теперь закрепим полученные знания.



## Данные: [google drive: raw_sales.csv](https://drive.google.com/file/d/1G2N7Mnt4-Tqz4JdJxutGDMbJiOr32kZp/view?usp=sharing)

 Каждая строчка это продажа жилья, которая состоит из следующих полей (думаю описание не требуется):
*   date of sale
*   price
*   property type
*   number of bedrooms
*   4digit postcode


In [29]:
data = spark.read.csv('raw_sales.csv', header=True, inferSchema=True)
data = data.select(data.datesold.cast('date').alias('datesold'), 'postcode', 'price', 'propertyType', 'bedrooms')
data.printSchema()

root
 |-- datesold: date (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- propertyType: string (nullable = true)
 |-- bedrooms: integer (nullable = true)




## Задание 1
Добавьте к таблице следующие поля:
*  Средняя стомость 10 проданных домов до текущего в том же районе (4digit postcode) (1 балл)
*  Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode) (1 балл)
*  Стоимость последнего проданного дома до текущего (1 балл)


In [116]:
data.createOrReplaceTempView('window')
w = spark.sql('''SELECT *, AVG(price)
                        OVER (PARTITION BY postcode 
                        ORDER BY datesold
                        ROWS BETWEEN 10 PRECEDING AND 1 PRECEDING ) 
                        AS last10_avg_price,
                        AVG(price) 
                        OVER (PARTITION BY postcode
                        ORDER BY datesold
                        ROWS BETWEEN 1 FOLLOWING AND 10 FOLLOWING )
                        AS fw10_avg_price, 
                        LAG(price) 
                        OVER (PARTITION BY postcode 
                        ORDER BY datesold) AS prev_price
                        FROM window''')


w.select('datesold', 'postcode', 
                  'propertyType', 'bedrooms', 
                  'price', round('last10_avg_price', 0).alias('last10_avg_price'), 
                  round('fw10_avg_price', 0).alias('fw_10_avg_price'), 
                  'window.prev_price').show(n=15)

+----------+--------+------------+--------+------+----------------+---------------+----------+
|  datesold|postcode|propertyType|bedrooms| price|last10_avg_price|fw_10_avg_price|prev_price|
+----------+--------+------------+--------+------+----------------+---------------+----------+
|2007-07-02|    2914|       house|       5|800000|            null|       502800.0|      null|
|2008-06-17|    2914|       house|       4|600000|        800000.0|       486800.0|    800000|
|2008-08-29|    2914|       house|       4|465000|        700000.0|       487800.0|    600000|
|2008-09-02|    2914|       house|       4|541000|        621667.0|       481450.0|    465000|
|2008-09-05|    2914|       house|       3|395000|        601500.0|       495950.0|    541000|
|2008-09-05|    2914|       house|       4|552000|        560200.0|       500750.0|    395000|
|2008-09-17|    2914|       house|       3|410000|        558833.0|       505350.0|    552000|
|2008-09-26|    2914|       house|       4|755000|

In [151]:
# a = spark.sql("""SELECT year(datesold) year, round(avg(price), 0) average_price FROM window group by year""")
# b = spark.sql("""SELECT *, YEAR(datesold) year FROM window""")
# a.join(b, ['year'], how='full').show()

In [111]:
window = Window.partitionBy('postcode').orderBy('datesold')

data = data.withColumn('last10_avg', avg('price')\
        .over(window.rowsBetween(Window.currentRow - 10, Window.currentRow - 1)))\
    .withColumn('fw10_avg', avg('price')\
        .over(window.rowsBetween(Window.currentRow + 1, Window.currentRow + 10)))\
    .withColumn('prev_price', lag('price')\
        .over(window))
 
    
data.show(n=10)

+----------+--------+------+------------+--------+-----------------+--------+----------+
|  datesold|postcode| price|propertyType|bedrooms|       last10_avg|fw10_avg|prev_price|
+----------+--------+------+------------+--------+-----------------+--------+----------+
|2007-07-02|    2914|800000|       house|       5|             null|502800.0|      null|
|2008-06-17|    2914|600000|       house|       4|         800000.0|486800.0|    800000|
|2008-08-29|    2914|465000|       house|       4|         700000.0|487800.0|    600000|
|2008-09-02|    2914|541000|       house|       4|621666.6666666666|481450.0|    465000|
|2008-09-05|    2914|395000|       house|       3|         601500.0|495950.0|    541000|
|2008-09-05|    2914|552000|       house|       4|         560200.0|500750.0|    395000|
|2008-09-17|    2914|410000|       house|       3|558833.3333333334|505350.0|    552000|
|2008-09-26|    2914|755000|       house|       4|537571.4285714285|474250.0|    410000|
|2008-10-14|    2914|



## Задание 2
В итоге у вас таблица с колонками (или нечто похожее):
*   price
*  Средняя стомость 10 проданных домов до текущего в том же районе (4digit postcode) (1 балл)
*  Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode) (1 балл)
*  Стоимость последнего проданного дома до текущего ((1 балл)
*  и др.

Посчитайте кол-во уникальных значений в каждой строчке (unique(row)). (2 балла)


In [158]:
print('UNIQUE COUNTS by column:')
for row in data.columns:
    cnt = data.select(row).distinct().count()
    print(row, ':', cnt)

UNIQUE COUNTS by column:




datesold : 3582




postcode : 27




price : 2554




propertyType : 2




bedrooms : 6




last10_avg : 16097




fw10_avg : 16100




prev_price : 2553





## Задание 3
SQL like case when или if elif else

Создайте колонку, в которой в которой будет отображаться "+", "-" или "=", если "Средняя стомость 10 проданных домов до текущего в том же районе" больше, меньше или равно "Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode)", соотвественно.

Если одно из полей Null, запишите в эту колонку "Нет данных"

In [168]:
data = data.withColumn("rolling_avg_comp", when(data.last10_avg > data.fw10_avg, '+')
                                          .when(data.last10_avg < data.fw10_avg, '-')
                                          .when(data.last10_avg == data.fw10_avg, '=')
                                          .otherwise('Нет данных')).show()

+----------+--------+------+------------+--------+-----------------+--------+----------+----------------+
|  datesold|postcode| price|propertyType|bedrooms|       last10_avg|fw10_avg|prev_price|rolling_avg_comp|
+----------+--------+------+------------+--------+-----------------+--------+----------+----------------+
|2007-07-02|    2914|800000|       house|       5|             null|502800.0|      null|      Нет данных|
|2008-06-17|    2914|600000|       house|       4|         800000.0|486800.0|    800000|               +|
|2008-08-29|    2914|465000|       house|       4|         700000.0|487800.0|    600000|               +|
|2008-09-02|    2914|541000|       house|       4|621666.6666666666|481450.0|    465000|               +|
|2008-09-05|    2914|395000|       house|       3|         601500.0|495950.0|    541000|               +|
|2008-09-05|    2914|552000|       house|       4|         560200.0|500750.0|    395000|               +|
|2008-09-17|    2914|410000|       house|     