In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m24.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=0e74b9dc220ed2b5cb29a64b980d99be1a9117d1ea56bafab524535bb5c26958
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
from pyspark.sql import SparkSession


spark = SparkSession.builder\
    .master("local[2]")\
    .appName("Lesson_5")\
    .config("spark.executor.instances",2)\
    .config("spark.executor.memory",'2g')\
    .config("spark.executor.cores",1)\
    .getOrCreate()
    
sc = spark.sparkContext

# Оконные функции

Оконная функция - функция, которая работает с выделенным набором строк (окном, партицией) и выполняет вычисление для этого набора строк в отдельном столбце. 

Партиции (окна из набора строк) - это набор строк, указанный для оконной функции по одному из столбцов или группе столбцов таблицы. Партиции для каждой оконной функции в запросе могут быть разделены по различным колонкам таблицы.

In [None]:
# Предположим, что есть таблицы:
from pyspark.sql import functions as F
from pyspark.sql.types import *

data =[
    ("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"),\
    ("Orange",2000,"USA"), ("Orange",2000,"USA"), ("Banana",400,"China"),\
    ("Carrots",1200,"China"), ("Beans",1500,"China"), ("Orange",4000,"China"),\
    ("Banana",2000,"Canada"), ("Carrots",2000,"Canada"), ("Beans",2000,"Mexico")
    ]
columns = ["Product","Amount", "Country"]

df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

root
 |-- Product: string (nullable = true)
 |-- Amount: long (nullable = true)
 |-- Country: string (nullable = true)

+-------+------+-------+
|Product|Amount|Country|
+-------+------+-------+
|Banana |1000  |USA    |
|Carrots|1500  |USA    |
|Beans  |1600  |USA    |
|Orange |2000  |USA    |
|Orange |2000  |USA    |
|Banana |400   |China  |
|Carrots|1200  |China  |
|Beans  |1500  |China  |
|Orange |4000  |China  |
|Banana |2000  |Canada |
|Carrots|2000  |Canada |
|Beans  |2000  |Mexico |
+-------+------+-------+



In [None]:
# в sql оконная функция записывается следующим образом
df.registerTempTable('df')

spark.sql('''
select *, 
row_number() over( partition by Country order by Amount ) as rn from df
''').show()



+-------+------+-------+---+
|Product|Amount|Country| rn|
+-------+------+-------+---+
| Banana|  2000| Canada|  1|
|Carrots|  2000| Canada|  2|
| Banana|   400|  China|  1|
|Carrots|  1200|  China|  2|
|  Beans|  1500|  China|  3|
| Orange|  4000|  China|  4|
|  Beans|  2000| Mexico|  1|
| Banana|  1000|    USA|  1|
|Carrots|  1500|    USA|  2|
|  Beans|  1600|    USA|  3|
| Orange|  2000|    USA|  4|
| Orange|  2000|    USA|  5|
+-------+------+-------+---+



In [None]:
# в спарке следующим образом
from pyspark.sql import Window


windSpec = Window()\
    .partitionBy('Country')\
    .orderBy('Amount')
    # .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)

df.withColumn('rn', F.row_number().over(windSpec)).show()

+-------+------+-------+---+
|Product|Amount|Country| rn|
+-------+------+-------+---+
| Banana|  2000| Canada|  1|
|Carrots|  2000| Canada|  2|
| Banana|   400|  China|  1|
|Carrots|  1200|  China|  2|
|  Beans|  1500|  China|  3|
| Orange|  4000|  China|  4|
|  Beans|  2000| Mexico|  1|
| Banana|  1000|    USA|  1|
|Carrots|  1500|    USA|  2|
|  Beans|  1600|    USA|  3|
| Orange|  2000|    USA|  4|
| Orange|  2000|    USA|  5|
+-------+------+-------+---+



# Самостоятельная работа к уроку
На уроке мы попробовали оконные и пользовательские функции. Теперь закрепим полученные знания.

## Данные: [google drive: raw_sales.csv](https://drive.google.com/file/d/1G2N7Mnt4-Tqz4JdJxutGDMbJiOr32kZp/view?usp=sharing)

 Каждая строчка это продажа жилья, которая состоит из следующих полей (думаю описание не требуется):
*   date of sale
*   price
*   property type
*   number of bedrooms
*   4digit postcode

In [None]:
!wget 'https://drive.google.com/uc?export=download&id=1xbtFBPz50OBoyYFVGd-B03pCTJdCax07' -O raw_sales.csv

--2023-01-23 12:12:59--  https://drive.google.com/uc?export=download&id=1xbtFBPz50OBoyYFVGd-B03pCTJdCax07
Resolving drive.google.com (drive.google.com)... 173.194.217.100, 173.194.217.139, 173.194.217.113, ...
Connecting to drive.google.com (drive.google.com)|173.194.217.100|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://doc-0c-5k-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/98tto341fq2o32hfickf06864c3c2rmh/1674475950000/04099736791713398091/*/1xbtFBPz50OBoyYFVGd-B03pCTJdCax07?e=download&uuid=fc78e771-fe5b-4a7f-b34d-0919fb974561 [following]
--2023-01-23 12:12:59--  https://doc-0c-5k-docs.googleusercontent.com/docs/securesc/ha0ro937gcuc7l7deffksulhg5h7mbp1/98tto341fq2o32hfickf06864c3c2rmh/1674475950000/04099736791713398091/*/1xbtFBPz50OBoyYFVGd-B03pCTJdCax07?e=download&uuid=fc78e771-fe5b-4a7f-b34d-0919fb974561
Resolving doc-0c-5k-docs.googleusercontent.com (doc-0c-5k-docs.googleusercontent.com)... 142.251.162.132, 

In [None]:
df = spark.read.csv('raw_sales.csv', header=True, inferSchema=True)
df.printSchema()
df.show(truncate=False)

root
 |-- datesold: timestamp (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- propertyType: string (nullable = true)
 |-- bedrooms: integer (nullable = true)

+-------------------+--------+-------+------------+--------+
|datesold           |postcode|price  |propertyType|bedrooms|
+-------------------+--------+-------+------------+--------+
|2007-02-07 00:00:00|2607    |525000 |house       |4       |
|2007-02-27 00:00:00|2906    |290000 |house       |3       |
|2007-03-07 00:00:00|2905    |328000 |house       |3       |
|2007-03-09 00:00:00|2905    |380000 |house       |4       |
|2007-03-21 00:00:00|2906    |310000 |house       |3       |
|2007-04-04 00:00:00|2905    |465000 |house       |4       |
|2007-04-24 00:00:00|2607    |399000 |house       |3       |
|2007-04-30 00:00:00|2606    |1530000|house       |4       |
|2007-05-24 00:00:00|2902    |359000 |house       |3       |
|2007-05-25 00:00:00|2906    |320000 |house       |3   

## Задание 1
Добавьте к таблице следующие поля:
*  Средняя стомость 10 проданных домов до текущего в том же районе (4digit postcode)
*  Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode)
*  Стоимость последнего проданного дома до текущего


In [None]:
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import col, asc,desc

windSpec_before = Window()\
    .partitionBy('postcode')\
    .orderBy('datesold')

df = df.withColumn('average_price_before', F.avg('price').over(windSpec_before))

In [None]:
windSpec_after = Window()\
    .partitionBy('postcode')\
    .orderBy(col('datesold').desc())

df = df.withColumn('average_price_after', F.avg('price').over(windSpec_after))

In [None]:
df = df.withColumn('previous_sold_price', F.lag('price', 1).over(windSpec_before))

In [None]:
df.show(10)

+-------------------+--------+-------+------------+--------+--------------------+-------------------+-------------------+
|           datesold|postcode|  price|propertyType|bedrooms|average_price_before|average_price_after|previous_sold_price|
+-------------------+--------+-------+------------+--------+--------------------+-------------------+-------------------+
|2007-07-08 00:00:00|    2600| 327000|       house|       1|            327000.0| 1028204.3785488959|               null|
|2007-08-16 00:00:00|    2600| 790000|       house|       4|            558500.0| 1029312.1263823065|             327000|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|   647333.3333333334| 1029690.7848101265|             790000|
|2008-01-21 00:00:00|    2600| 315000|        unit|       1|            564250.0|  1030015.175911252|             825000|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|            509900.0| 1031150.1206349207|             315000|
|2008-05-30 00:00:00|   

## Задание 2
В итоге у вас таблица с колонками (или нечто похожее):
*   price
*   Среднегодовая цена
*  Средняя стомость 10 проданных домов до текущего в том же районе (4digit postcode) (1 балл)
*  Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode) (1 балл)
*  Стоимость последнего проданного дома до текущего ((1 балл)
*  и др.

Посчитайте кол-во уникальных значений в каждой строчке (unique(row)) (ипользуйте udf). Попробуйте сделать то же самое используя pandas udf.

In [None]:
df.select(F.approx_count_distinct('postcode')).show()

+-------------------------------+
|approx_count_distinct(postcode)|
+-------------------------------+
|                             27|
+-------------------------------+



In [None]:
df.select(F.approx_count_distinct('propertyType')).show()

+-----------------------------------+
|approx_count_distinct(propertyType)|
+-----------------------------------+
|                                  2|
+-----------------------------------+



In [None]:
df.select(F.approx_count_distinct('bedrooms')).show()

+-------------------------------+
|approx_count_distinct(bedrooms)|
+-------------------------------+
|                              6|
+-------------------------------+



# Задание 3
SQL like case when или if elif else

Создайте колонку, в которой в которой будет отображаться "+", "-" или "=", если "Средняя стомость 10 проданных домов до текущего в том же районе" больше, меньше или равно "Средняя стомость 10 проданных домов после текущего в том же районе (4digit postcode)", соотвественно.

Если одно из полей Null, запишите в эту колонку "Нет данных"

In [None]:
df.withColumn('ifelse', F.when(F.col('postcode').isNull() | F.col('price').isNull(), F.lit('Нет данных')).when(F.col('average_price_before') > F.col('average_price_after'), F.lit('+')).when(F.col('average_price_before') < F.col('average_price_after'), '-').otherwise('=')).show(10)

+-------------------+--------+-------+------------+--------+--------------------+-------------------+-------------------+------+
|           datesold|postcode|  price|propertyType|bedrooms|average_price_before|average_price_after|previous_sold_price|ifelse|
+-------------------+--------+-------+------------+--------+--------------------+-------------------+-------------------+------+
|2007-07-08 00:00:00|    2600| 327000|       house|       1|            327000.0| 1028204.3785488959|               null|     -|
|2007-08-16 00:00:00|    2600| 790000|       house|       4|            558500.0| 1029312.1263823065|             327000|     -|
|2007-12-05 00:00:00|    2600| 825000|       house|       3|   647333.3333333334| 1029690.7848101265|             790000|     -|
|2008-01-21 00:00:00|    2600| 315000|        unit|       1|            564250.0|  1030015.175911252|             825000|     -|
|2008-04-24 00:00:00|    2600| 292500|       house|       1|            509900.0| 1031150.1206349

# Задание 4

In [None]:
# Создаём датасет для примеров
dataset_1 = [
  {
    'key_1' : 'abc',
    'value_1' : 10,
    'value_2' : 20
  },
  {
    'key_1' : 'def',
    'value_1' : 100,
    'value_2' : 300
  }
]

dataset_2 = [
  {
    'key_2' : 'abc',
    'value_1' : 5.5,
    'value_2' : 2.2
  },
  {
    'key_2' : 'xyz',
    'value_1' : 10.1,
    'value_2' : 13.5
  }
]

df1 = spark.createDataFrame(dataset_1)
print('df1')
df1.show()

df2 = spark.createDataFrame(dataset_2)
print('df2')
df2.show()

df1
+-----+-------+-------+
|key_1|value_1|value_2|
+-----+-------+-------+
|  abc|     10|     20|
|  def|    100|    300|
+-----+-------+-------+

df2
+-----+-------+-------+
|key_2|value_1|value_2|
+-----+-------+-------+
|  abc|    5.5|    2.2|
|  xyz|   10.1|   13.5|
+-----+-------+-------+



## Задание 4.1

In [None]:
# Создайте джойн, чтобы получить следующую таблицу
# +---+-------+-------+
# |key|value_1|value_2|
# +---+-------+-------+
# |abc|     10|     20|
# +---+-------+-------+

In [None]:
df1.join(df2, on=df1.key_1 == df2.key_2, how='semi').show()

+-----+-------+-------+
|key_1|value_1|value_2|
+-----+-------+-------+
|  abc|     10|     20|
+-----+-------+-------+



## Задание 4.2

In [None]:
# Создайте джойн, чтобы получить следующую таблицу
# +---+-------+-------+
# |key|value_1|value_2|
# +---+-------+-------+
# |def|    100|    300|
# +---+-------+-------+

In [None]:
df1.join(df2, on=df1.key_1 == df2.key_2, how='anti').show()

+-----+-------+-------+
|key_1|value_1|value_2|
+-----+-------+-------+
|  def|    100|    300|
+-----+-------+-------+



## Задание 4.3

In [None]:
# Создайте Inner джойн с условиями ---hidden---, для df1 и df2, соответсвенно
# В  итоге получится таблица
# +---+-------+-------+---+-------+-------+
# |key|value_1|value_2|key|value_1|value_2|
# +---+-------+-------+---+-------+-------+
# |abc|     10|     20|abc|    5.5|    2.2|
# |abc|     10|     20|xyz|   10.1|   13.5|
# |def|    100|    300|abc|    5.5|    2.2|
# |def|    100|    300|xyz|   10.1|   13.5|
# +---+-------+-------+---+-------+-------+

In [None]:
df1.join(df2, on=df1.value_2 > df2.value_2, how='inner').show()

+-----+-------+-------+-----+-------+-------+
|key_1|value_1|value_2|key_2|value_1|value_2|
+-----+-------+-------+-----+-------+-------+
|  abc|     10|     20|  abc|    5.5|    2.2|
|  abc|     10|     20|  xyz|   10.1|   13.5|
|  def|    100|    300|  abc|    5.5|    2.2|
|  def|    100|    300|  xyz|   10.1|   13.5|
+-----+-------+-------+-----+-------+-------+

