# Imports

In [1]:
from pathlib import Path
import sys

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
from pyspark.sql.functions import udf

ROOT_DIR = Path('../').resolve()
if ROOT_DIR not in sys.path:
    sys.path.append(str(ROOT_DIR))

In [2]:
DF_PATH = '/'.join([str(ROOT_DIR), 'data', 'brooklyn_sales_map.csv'])

APP_NAME = 'lab_1_app'

YEAR_BUILT = 'year_built'
YEAR_BUILT_DEVIATION = '_'.join([YEAR_BUILT, 'deviation'])

SALE_PRICE = 'sale_price'
ZIP_CODE = 'zip_code'

NEIGHBORHOOD = 'neighborhood'
BUILDING_CLASS_CATEGORY = 'building_class_category'
ADDRESS = 'address9'
BUILDING_NUM = 'building_num'
STREET_NAME = 'street_name'

In [3]:
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
spark.version

'3.2.1'

# Data Loading

In [4]:
df = spark.read.csv(DF_PATH, header=True, inferSchema=True)

In [5]:
df.columns

['_c0',
 'borough1',
 'neighborhood',
 'building_class_category',
 'tax_class',
 'block',
 'lot',
 'easement',
 'building_class',
 'address9',
 'apartment_number',
 'zip_code',
 'residential_units',
 'commercial_units',
 'total_units',
 'land_sqft',
 'gross_sqft',
 'year_built',
 'tax_class_at_sale',
 'building_class_at_sale',
 'sale_price',
 'sale_date',
 'year_of_sale',
 'Borough23',
 'CD',
 'CT2010',
 'CB2010',
 'SchoolDist',
 'Council',
 'ZipCode',
 'FireComp',
 'PolicePrct',
 'HealthCent',
 'HealthArea',
 'SanitBoro',
 'SanitDistr',
 'SanitSub',
 'Address37',
 'ZoneDist1',
 'ZoneDist2',
 'ZoneDist3',
 'ZoneDist4',
 'Overlay1',
 'Overlay2',
 'SPDist1',
 'SPDist2',
 'SPDist3',
 'LtdHeight',
 'SplitZone',
 'BldgClass',
 'LandUse',
 'Easements',
 'OwnerType',
 'OwnerName',
 'LotArea',
 'BldgArea',
 'ComArea',
 'ResArea',
 'OfficeArea',
 'RetailArea',
 'GarageArea',
 'StrgeArea',
 'FactryArea',
 'OtherArea',
 'AreaSource',
 'NumBldgs',
 'NumFloors',
 'UnitsRes',
 'UnitsTotal',
 'LotFro

In [6]:
df.select(*df.columns[:10]).show()

+---+--------+--------------------+-----------------------+---------+-----+----+--------+--------------+--------------------+
|_c0|borough1|        neighborhood|building_class_category|tax_class|block| lot|easement|building_class|            address9|
+---+--------+--------------------+-----------------------+---------+-----+----+--------+--------------+--------------------+
|  1|       3|  DOWNTOWN-METROTECH|   28  COMMERCIAL CO...|        4|  140|1001|    null|            R5|      330 JAY STREET|
|  2|       3|DOWNTOWN-FULTON F...|   29  COMMERCIAL GA...|        4|   54|   1|    null|            G7|       85 JAY STREET|
|  3|       3|    BROOKLYN HEIGHTS|   21  OFFICE BUILDINGS|        4|  204|   1|    null|            O6| 29 COLUMBIA HEIGHTS|
|  4|       3|          MILL BASIN|    22  STORE BUILDINGS|        4| 8470|  55|    null|            K6|       5120 AVENUE U|
|  5|       3|    BROOKLYN HEIGHTS|        26 OTHER HOTELS|        4|  230|   1|    null|            H8|     21 CLARK 

In [7]:
# adress9: отсечь от номера дома улицу и сгруппировать.
# 2  lab вариант 2
# Защищать второй раз не надо.
# Machine Learning with PysSpark - книжка пригодится

In [8]:
df.select(*df.columns[:5]).describe().show()

+-------+------------------+--------+-----------------+-----------------------+------------------+
|summary|               _c0|borough1|     neighborhood|building_class_category|         tax_class|
+-------+------------------+--------+-----------------+-----------------------+------------------+
|  count|            390883|  390883|           390883|                 390800|            383949|
|   mean|          195442.0|     3.0|           3009.0|                   null|1.6015349828238465|
| stddev|112838.34697329922|     0.0|8.660254037844387|                   null|0.9477514176991289|
|    min|                 1|       3|             3004|    01 ONE FAMILY HOMES|                 1|
|    max|            390883|       3|  WYCKOFF HEIGHTS|   49 CONDO WAREHOUS...|                 4|
+-------+------------------+--------+-----------------+-----------------------+------------------+



In [9]:
df.select(*df.columns[:5]).printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- borough1: integer (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- building_class_category: string (nullable = true)
 |-- tax_class: string (nullable = true)



# Task 1

In [37]:
year_built_avg_df = df.agg(avg(YEAR_BUILT))
year_built_avg_df.show()

+------------------+
|   avg(year_built)|
+------------------+
|1701.6663067976863|
+------------------+



In [38]:
year_built_avg = year_built_avg_df.collect()[0][0]
year_built_avg

1701.6663067976863

In [39]:
def count_deviation(year: int, year_avg: float) -> float:
    """Count a deviation of year_built feature.
    
    The deviation is a difference between year value and \
        mean value of the feature.
    Args:
        year: A year of house building.
        year_avg: A precalculated mean value of the year_built feature.
        
    Returns:
        A deviation difference between year value and mean value of the feature.
    """
    deviation = year - year_avg
    return deviation

In [40]:
count_deviation_udf = udf(lambda row: count_deviation(row, year_built_avg))

In [41]:
new_df = df.withColumn(YEAR_BUILT_DEVIATION, count_deviation_udf(df[YEAR_BUILT]))
new_df.select(YEAR_BUILT, YEAR_BUILT_DEVIATION).show()

+----------+--------------------+
|year_built|year_built_deviation|
+----------+--------------------+
|      2002|   300.3336932023137|
|         0| -1701.6663067976863|
|      1924|  222.33369320231373|
|      1970|   268.3336932023137|
|      1927|  225.33369320231373|
|         0| -1701.6663067976863|
|      1928|  226.33369320231373|
|      2012|   310.3336932023137|
|         0| -1701.6663067976863|
|      1912|  210.33369320231373|
|         0| -1701.6663067976863|
|         0| -1701.6663067976863|
|      2009|   307.3336932023137|
|      1967|   265.3336932023137|
|      1920|  218.33369320231373|
|      1992|   290.3336932023137|
|      1920|  218.33369320231373|
|      2014|   312.3336932023137|
|         0| -1701.6663067976863|
|      1962|   260.3336932023137|
+----------+--------------------+
only showing top 20 rows



# Task 2

In [10]:
df.select(ADDRESS).show()

+--------------------+
|            address9|
+--------------------+
|      330 JAY STREET|
|       85 JAY STREET|
| 29 COLUMBIA HEIGHTS|
|       5120 AVENUE U|
|     21 CLARK STREET|
|     329 KENT AVENUE|
|     16 COURT STREET|
|     250 N 10 STREET|
|     418 KENT AVENUE|
|        9-47 HALL ST|
|  20 NORTH 12 STREET|
|          625 FULTON|
|236 LIVINGSTON ST...|
|  55 PROSPECT STREET|
|   486 FULTON STREET|
|     90 SANDS STREET|
|      19 KENT AVENUE|
|246 NORTH 8TH   S...|
|  8717 FOSTER AVENUE|
|     77 SANDS STREET|
+--------------------+
only showing top 20 rows



In [11]:
def get_building_num(address: str) -> float:
    """Gets a building number from an address.
    
    Args:
        address: A building address.
        
    Returns:
        A building number.
    """
    splitted_address = address.strip().split(' ')
    building_num = splitted_address[0]
    return building_num

In [12]:
def get_street_name(address: str) -> float:
    """Gets a building number from an address.
    
    Args:
        address: A building address.
        
    Returns:
        A street name.
    """
    splitted_address = address.strip().split(' ')
    street_num = ' '.join(splitted_address[1:])
    return street_num

In [13]:
get_building_num_udf = udf(get_building_num)
get_street_name_udf = udf(get_street_name)

In [14]:
new_df = df.withColumn(BUILDING_NUM, get_building_num_udf(
    df[ADDRESS])).withColumn(STREET_NAME, get_street_name_udf(df[ADDRESS]))
new_df.select(ADDRESS, BUILDING_NUM, STREET_NAME).show()

+--------------------+------------+------------------+
|            address9|building_num|       street_name|
+--------------------+------------+------------------+
|      330 JAY STREET|         330|        JAY STREET|
|       85 JAY STREET|          85|        JAY STREET|
| 29 COLUMBIA HEIGHTS|          29|  COLUMBIA HEIGHTS|
|       5120 AVENUE U|        5120|          AVENUE U|
|     21 CLARK STREET|          21|      CLARK STREET|
|     329 KENT AVENUE|         329|       KENT AVENUE|
|     16 COURT STREET|          16|      COURT STREET|
|     250 N 10 STREET|         250|       N 10 STREET|
|     418 KENT AVENUE|         418|       KENT AVENUE|
|        9-47 HALL ST|        9-47|           HALL ST|
|  20 NORTH 12 STREET|          20|   NORTH 12 STREET|
|          625 FULTON|         625|            FULTON|
|236 LIVINGSTON ST...|         236| LIVINGSTON STREET|
|  55 PROSPECT STREET|          55|   PROSPECT STREET|
|   486 FULTON STREET|         486|     FULTON STREET|
|     90 S

In [16]:
new_df.groupBy(STREET_NAME).count().show()

+-------------------+-----+
|        street_name|count|
+-------------------+-----+
|       STARR STREET|  157|
|   EAST 38TH STREET|  296|
|      TIFFANY PLACE|  420|
|        KING STREET|   43|
|            HANCOCK|   53|
|        10TH AVENUE|  173|
|      WELDON STREET|   78|
|    RAPELYEA STREET|    4|
|  LINCOLN PLACE, 4A|   10|
|EASTERN PARKWAY, 8D|    2|
|     WEST 12 STREET|   85|
|  EIGHTH AVENUE, 3R|    1|
|  HICKS STREET, 14L|    2|
|           56TH ST.|    2|
|  TOMPKINS PLACE, 3|    3|
|  LINCOLN PLACE, 1L|    5|
|   12TH STREET, K1L|    2|
| PROSPECT PLACE, 1L|    1|
|        VISTA PLACE|   16|
|     PARK PLACE, 2L|    3|
+-------------------+-----+
only showing top 20 rows



# Task 3

In [None]:
df_sorted = df.orderBy(SALE_PRICE, ZIP_CODE, ascending=[True, False])
df_sorted.select(SALE_PRICE, ZIP_CODE).show()

+----------+--------+
|sale_price|zip_code|
+----------+--------+
|       0.0|   33803|
|       0.0|   33803|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11416|
|       0.0|   11249|
|       0.0|   11249|
|       0.0|   11249|
|       0.0|   11249|
|       0.0|   11249|
|       0.0|   11249|
|       0.0|   11249|
|       0.0|   11249|
+----------+--------+
only showing top 20 rows



# Task 4

In [None]:
df_grouped = df.groupby([NEIGHBORHOOD, BUILDING_CLASS_CATEGORY]).mean(SALE_PRICE)
df_grouped.show()

+--------------------+-----------------------+------------------+
|        neighborhood|building_class_category|   avg(sale_price)|
+--------------------+-----------------------+------------------+
|    BROOKLYN HEIGHTS|    22  STORE BUILDINGS|1748589.4137931035|
|             MIDWOOD|   05  TAX CLASS 1 V...|455718.34920634923|
|   WILLIAMSBURG-EAST|   08 RENTALS - ELEV...|          834672.5|
|        BOROUGH PARK|   15  CONDOS - 2-10...| 355174.0700116686|
|    PARK SLOPE SOUTH|   46  CONDO STORE B...|         1325000.0|
|      BRIGHTON BEACH|    21 OFFICE BUILDINGS|          576720.0|
|  BEDFORD STUYVESANT|   05 TAX CLASS 1 VA...| 545808.8823529412|
|  WILLIAMSBURG-SOUTH|   04  TAX CLASS 1 C...|          300978.8|
|        BOROUGH PARK|   04  TAX CLASS 1 C...| 288998.2962962963|
|           FLATLANDS|   29 COMMERCIAL GAR...|          195000.0|
|      SHEEPSHEAD BAY|   15 CONDOS - 2-10 ...|          477428.8|
|         BOERUM HILL|        17  CONDO COOPS|          720000.0|
|      BRI