Favorite books on PySpark:
* Spark: The Definitive Guide https://learning.oreilly.com/library/view/spark-the-definitive/9781491912201/
* Learning Spark https://learning.oreilly.com/library/view/learning-spark-2nd/9781492050032/

Documentation:
* https://spark.apache.org/docs/latest/api/python/index.html

Articles:
* Best practices for caching in Spark SQL https://towardsdatascience.com/best-practices-for-caching-in-spark-sql-b22fb0f02d34
* Be in charge of Query Execution in Spark SQL https://towardsdatascience.com/be-in-charge-of-query-execution-in-spark-sql-c83d1e16b9b8
* Spark DataFrame Cache and Persist Explained https://sparkbyexamples.com/spark/spark-dataframe-cache-and-persist-explained/
* Should I always cache my RDD's and DataFrames? https://forums.databricks.com/questions/271/should-i-always-cache-my-rdds.html
* PySpark Dataframe Basics https://changhsinlee.com/pyspark-dataframe-basics/
* How to Turn Python Functions into PySpark Functions (UDF) https://changhsinlee.com/pyspark-udf/
* Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html
* Interactive Audience Analytics With Apache Spark and HyperLogLog https://databricks.com/blog/2015/10/13/interactive-audience-analytics-with-apache-spark-and-hyperloglog.html
* Introducing Pandas UDF for PySpark https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
* Spark by Examples https://sparkbyexamples.com/

The data used in this notebook comes from the "Sberbank Russian Housing Market" competition on Kaggle: https://www.kaggle.com/c/sberbank-russian-housing-market

In [100]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import os

# Creating Spark Session

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

# Quick Overview

In [74]:
SBERBANK_BASE_DIR = "/Users/anton/datasets/sberbank_russian_housing_market"
SBERBANK_PATH_FULL = os.path.join(SBERBANK_BASE_DIR, "train.csv")
SBERBANK_PATH_SUBSET = os.path.join(SBERBANK_BASE_DIR, "train_subset.csv")

df_full = spark.read.option("inferSchema", "true").option("header", "true").csv(SBERBANK_PATH_FULL)
df_full.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- full_sq: integer (nullable = true)
 |-- life_sq: string (nullable = true)
 |-- floor: string (nullable = true)
 |-- max_floor: string (nullable = true)
 |-- material: string (nullable = true)
 |-- build_year: string (nullable = true)
 |-- num_room: string (nullable = true)
 |-- kitch_sq: string (nullable = true)
 |-- state: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- sub_area: string (nullable = true)
 |-- area_m: double (nullable = true)
 |-- raion_popul: integer (nullable = true)
 |-- green_zone_part: double (nullable = true)
 |-- indust_part: double (nullable = true)
 |-- children_preschool: integer (nullable = true)
 |-- preschool_quota: string (nullable = true)
 |-- preschool_education_centers_raion: integer (nullable = true)
 |-- children_school: integer (nullable = true)
 |-- school_quota: string (nullable = true)
 |-- school_education_centers_raion: integer (nu

In [75]:
with pd.option_context('display.max_columns', None): 
    display(df_full.limit(5).toPandas())

Unnamed: 0,id,timestamp,full_sq,life_sq,floor,max_floor,material,build_year,num_room,kitch_sq,state,product_type,sub_area,area_m,raion_popul,green_zone_part,indust_part,children_preschool,preschool_quota,preschool_education_centers_raion,children_school,school_quota,school_education_centers_raion,school_education_centers_top_20_raion,hospital_beds_raion,healthcare_centers_raion,university_top_20_raion,sport_objects_raion,additional_education_raion,culture_objects_top_25,culture_objects_top_25_raion,shopping_centers_raion,office_raion,thermal_power_plant_raion,incineration_raion,oil_chemistry_raion,radiation_raion,railroad_terminal_raion,big_market_raion,nuclear_reactor_raion,detention_facility_raion,full_all,male_f,female_f,young_all,young_male,young_female,work_all,work_male,work_female,ekder_all,ekder_male,ekder_female,0_6_all,0_6_male,0_6_female,7_14_all,7_14_male,7_14_female,0_17_all,0_17_male,0_17_female,16_29_all,16_29_male,16_29_female,0_13_all,0_13_male,0_13_female,raion_build_count_with_material_info,build_count_block,build_count_wood,build_count_frame,build_count_brick,build_count_monolith,build_count_panel,build_count_foam,build_count_slag,build_count_mix,raion_build_count_with_builddate_info,build_count_before_1920,build_count_1921-1945,build_count_1946-1970,build_count_1971-1995,build_count_after_1995,ID_metro,metro_min_avto,metro_km_avto,metro_min_walk,metro_km_walk,kindergarten_km,school_km,park_km,green_zone_km,industrial_km,water_treatment_km,cemetery_km,incineration_km,railroad_station_walk_km,railroad_station_walk_min,ID_railroad_station_walk,railroad_station_avto_km,railroad_station_avto_min,ID_railroad_station_avto,public_transport_station_km,public_transport_station_min_walk,water_km,water_1line,mkad_km,ttk_km,sadovoe_km,bulvar_ring_km,kremlin_km,big_road1_km,ID_big_road1,big_road1_1line,big_road2_km,ID_big_road2,railroad_km,railroad_1line,zd_vokzaly_avto_km,ID_railroad_terminal,bus_terminal_avto_km,ID_bus_terminal,oil_chemistry_km,nuclear_reac
tor_km,radiation_km,power_transmission_line_km,thermal_power_plant_km,ts_km,big_market_km,market_shop_km,fitness_km,swim_pool_km,ice_rink_km,stadium_km,basketball_km,hospice_morgue_km,detention_facility_km,public_healthcare_km,university_km,workplaces_km,shopping_centers_km,office_km,additional_education_km,preschool_km,big_church_km,church_synagogue_km,mosque_km,theater_km,museum_km,exhibition_km,catering_km,ecology,green_part_500,prom_part_500,office_count_500,office_sqm_500,trc_count_500,trc_sqm_500,cafe_count_500,cafe_sum_500_min_price_avg,cafe_sum_500_max_price_avg,cafe_avg_price_500,cafe_count_500_na_price,cafe_count_500_price_500,cafe_count_500_price_1000,cafe_count_500_price_1500,cafe_count_500_price_2500,cafe_count_500_price_4000,cafe_count_500_price_high,big_church_count_500,church_count_500,mosque_count_500,leisure_count_500,sport_count_500,market_count_500,green_part_1000,prom_part_1000,office_count_1000,office_sqm_1000,trc_count_1000,trc_sqm_1000,cafe_count_1000,cafe_sum_1000_min_price_avg,cafe_sum_1000_max_price_avg,cafe_avg_price_1000,cafe_count_1000_na_price,cafe_count_1000_price_500,cafe_count_1000_price_1000,cafe_count_1000_price_1500,cafe_count_1000_price_2500,cafe_count_1000_price_4000,cafe_count_1000_price_high,big_church_count_1000,church_count_1000,mosque_count_1000,leisure_count_1000,sport_count_1000,market_count_1000,green_part_1500,prom_part_1500,office_count_1500,office_sqm_1500,trc_count_1500,trc_sqm_1500,cafe_count_1500,cafe_sum_1500_min_price_avg,cafe_sum_1500_max_price_avg,cafe_avg_price_1500,cafe_count_1500_na_price,cafe_count_1500_price_500,cafe_count_1500_price_1000,cafe_count_1500_price_1500,cafe_count_1500_price_2500,cafe_count_1500_price_4000,cafe_count_1500_price_high,big_church_count_1500,church_count_1500,mosque_count_1500,leisure_count_1500,sport_count_1500,market_count_1500,green_part_2000,prom_part_2000,office_count_2000,office_sqm_2000,trc_count_2000,trc_sqm_2000,cafe_count_2000,cafe_sum_2000_min_price_avg,cafe_sum_2000_ma
x_price_avg,cafe_avg_price_2000,cafe_count_2000_na_price,cafe_count_2000_price_500,cafe_count_2000_price_1000,cafe_count_2000_price_1500,cafe_count_2000_price_2500,cafe_count_2000_price_4000,cafe_count_2000_price_high,big_church_count_2000,church_count_2000,mosque_count_2000,leisure_count_2000,sport_count_2000,market_count_2000,green_part_3000,prom_part_3000,office_count_3000,office_sqm_3000,trc_count_3000,trc_sqm_3000,cafe_count_3000,cafe_sum_3000_min_price_avg,cafe_sum_3000_max_price_avg,cafe_avg_price_3000,cafe_count_3000_na_price,cafe_count_3000_price_500,cafe_count_3000_price_1000,cafe_count_3000_price_1500,cafe_count_3000_price_2500,cafe_count_3000_price_4000,cafe_count_3000_price_high,big_church_count_3000,church_count_3000,mosque_count_3000,leisure_count_3000,sport_count_3000,market_count_3000,green_part_5000,prom_part_5000,office_count_5000,office_sqm_5000,trc_count_5000,trc_sqm_5000,cafe_count_5000,cafe_sum_5000_min_price_avg,cafe_sum_5000_max_price_avg,cafe_avg_price_5000,cafe_count_5000_na_price,cafe_count_5000_price_500,cafe_count_5000_price_1000,cafe_count_5000_price_1500,cafe_count_5000_price_2500,cafe_count_5000_price_4000,cafe_count_5000_price_high,big_church_count_5000,church_count_5000,mosque_count_5000,leisure_count_5000,sport_count_5000,market_count_5000,price_doc
0,1,2011-08-20,43,27,4,,,,,,,Investment,Bibirevo,6407578.0,155572,0.189727,7e-05,9576,5001,5,10309,11065,5,0,240.0,1,0,7,3,no,0,16,1,no,no,no,no,no,no,no,no,86206,40477,45729,21154,11007,10147,98207,52277,45930,36211,10580,25631,9576,4899,4677,10309,5463,4846,23603,12286,11317,17508,9425,8083,18654,9709,8945,211,25,0,0,0,2,184,0,0,0,211,0,0,0,206,5,1,2.590241,1.13126,13.57511887,1.131259906,0.1457,0.177975,2.158587,0.600973,1.080934,23.68346,1.804127,3.633334,5.419893032,65.03871639,1,5.419893,6.905893,1,0.274985,3.299822,0.992631,no,1.422391,10.918587,13.100618,13.675657,15.156211,1.422391,1,no,3.830951,5,1.305159,no,14.231961,101,24.292406,1,18.152338,5.718519,1.210027,1.062513,5.814135,4.308127,10.814172,1.676258,0.485841,3.065047,1.107594,8.148591,3.516513,2.392353,4.248036,0.974743,6.715026,0.88435,0.648488,0.637189,0.947962,0.177975,0.625783,0.628187,3.93204,14.053047,7.389498,7.023705,0.516838,good,0.0,0.0,0,0,0,0,0,,,,0,0,0,0,0,0,0,0,0,0,0,1,0,7.36,0.0,1,30500,3,55600,19,527.78,888.89,708.33,1,10,4,3,1,0,0,1,2,0,0,6,1,14.27,6.92,3,39554,9,171420,34,566.67,969.7,768.18,1,14,11,6,2,0,0,1,2,0,0,7,1,11.77,15.97,9,188854,19,1244891,36,614.29,1042.86,828.57,1,15,11,6,2,1,0,1,2,0,0,10,1,11.98,13.55,12,251554,23,1419204,68,639.68,1079.37,859.52,5,21,22,16,3,1,0,2,4,0,0,21,1,13.09,13.31,29,807385,52,4036616,152,708.57,1185.71,947.14,12,39,48,40,9,4,0,13,22,1,0,52,4,5850000
1,2,2011-08-23,34,19,3,,,,,,,Investment,Nagatinskij Zaton,9589337.0,115352,0.372602,0.049637,6880,3119,5,7759,6237,8,0,229.0,1,0,6,1,yes,1,3,0,no,no,no,no,no,no,no,no,76284,34200,42084,15727,7925,7802,70194,35622,34572,29431,9266,20165,6880,3466,3414,7759,3909,3850,17700,8998,8702,15164,7571,7593,13729,6929,6800,245,83,1,0,67,4,90,0,0,0,244,1,1,143,84,15,2,0.9367,0.647337,7.620630408,0.635052534,0.147754,0.273345,0.55069,0.065321,0.966479,1.317476,4.655004,8.648587,3.411993084,40.943917,2,3.641773,4.679745,2,0.065263,0.78316,0.698081,no,9.503405,3.103996,6.444333,8.13264,8.698054,2.887377,2,no,3.103996,4,0.694536,no,9.242586,32,5.706113,2,9.034642,3.489954,2.724295,1.246149,3.419574,0.72556,6.910568,3.424716,0.668364,2.000154,8.972823,6.127073,1.161579,2.543747,12.649879,1.477723,1.85256,0.686252,0.519311,0.688796,1.072315,0.273345,0.967821,0.471447,4.841544,6.829889,0.70926,2.35884,0.230287,excellent,25.14,0.0,0,0,0,0,5,860.0,1500.0,1180.0,0,1,3,0,0,1,0,0,1,0,0,0,0,26.66,0.07,2,86600,5,94065,13,615.38,1076.92,846.15,0,5,6,1,0,1,0,1,2,0,4,2,0,21.53,7.71,3,102910,7,127065,17,694.12,1205.88,950.0,0,6,7,1,2,1,0,1,5,0,4,9,0,22.37,19.25,4,165510,8,179065,21,695.24,1190.48,942.86,0,7,8,3,2,1,0,1,5,0,4,11,0,18.07,27.32,12,821986,14,491565,30,631.03,1086.21,858.62,1,11,11,4,2,1,0,1,7,0,6,19,1,10.26,27.47,66,2690465,40,2034942,177,673.81,1148.81,911.31,9,49,65,36,15,3,0,15,29,1,10,66,14,6000000
2,3,2011-08-27,43,29,2,,,,,,,Investment,Tekstil'shhiki,4808270.0,101708,0.11256,0.118537,5879,1463,4,6207,5580,7,0,1183.0,1,0,5,1,no,0,0,1,no,no,no,yes,no,no,no,no,101982,46076,55906,13028,6835,6193,63388,31813,31575,25292,7609,17683,5879,3095,2784,6207,3269,2938,14884,7821,7063,19401,9045,10356,11252,5916,5336,330,59,0,0,206,4,60,0,1,0,330,1,0,246,63,20,3,2.120999,1.637996,17.3515154,1.445959617,0.049102,0.158072,0.374848,0.453172,0.939275,4.91266,3.381083,11.99648,1.277658039,15.33189647,3,1.277658,1.70142,3,0.328756,3.945073,0.468265,no,5.6048,2.927487,6.963403,8.054252,9.067885,0.64725,3,no,2.927487,4,0.700691,no,9.540544,5,6.710302,3,5.777394,7.506612,0.772216,1.602183,3.682455,3.562188,5.752368,1.375443,0.733101,1.239304,1.978517,0.767569,1.952771,0.621357,7.682303,0.097144,0.841254,1.510089,1.486533,1.543049,0.391957,0.158072,3.178751,0.755946,7.922152,4.2732,3.156423,4.958214,0.190462,poor,1.67,0.0,0,0,0,0,3,666.67,1166.67,916.67,0,0,2,1,0,0,0,0,0,0,0,0,0,4.99,0.29,0,0,0,0,9,642.86,1142.86,892.86,2,0,5,2,0,0,0,0,1,0,0,5,3,9.92,6.73,0,0,1,2600,14,516.67,916.67,716.67,2,4,6,2,0,0,0,0,4,0,0,6,5,12.99,12.75,4,100200,7,52550,24,563.64,977.27,770.45,2,8,9,4,1,0,0,0,4,0,0,8,5,12.14,26.46,8,110856,7,52550,41,697.44,1192.31,944.87,2,9,17,9,3,1,0,0,11,0,0,20,6,13.69,21.58,43,1478160,35,1572990,122,702.68,1196.43,949.55,10,29,45,25,10,3,0,11,27,0,4,67,10,5700000
3,4,2011-09-01,89,50,9,,,,,,,Investment,Mitino,12583540.0,178473,0.194703,0.069753,13087,6839,9,13670,17063,10,0,,1,0,17,6,no,0,11,4,no,no,no,no,no,no,no,no,21155,9828,11327,28563,14680,13883,120381,60040,60341,29529,9083,20446,13087,6645,6442,13670,7126,6544,32063,16513,15550,3292,1450,1842,24934,12782,12152,458,9,51,12,124,50,201,0,9,2,459,13,24,40,130,252,4,1.489049,0.984537,11.56562408,0.963802007,0.179441,0.236455,0.07809,0.106125,0.451173,15.62371,2.01708,14.31764,4.2914325,51.49719001,4,3.816045,5.271136,4,0.131597,1.579164,1.200336,no,2.677824,14.606501,17.457198,18.309433,19.487005,2.677824,1,no,2.780449,17,1.999265,no,17.47838,83,6.734618,1,27.667863,9.522538,6.348716,1.767612,11.178333,0.583025,27.892717,0.811275,0.623484,1.950317,6.483172,7.385521,4.923843,3.549558,8.789894,2.163735,10.903161,0.622272,0.599914,0.934273,0.892674,0.236455,1.031777,1.561505,15.300449,16.990677,16.041521,5.029696,0.46582,good,17.36,0.57,0,0,0,0,2,1000.0,1500.0,1250.0,0,0,0,2,0,0,0,0,0,0,0,0,0,19.25,10.35,1,11000,6,80780,12,658.33,1083.33,870.83,0,3,4,5,0,0,0,0,0,0,0,3,1,28.38,6.57,2,11000,7,89492,23,673.91,1130.43,902.17,0,5,9,8,1,0,0,1,0,0,0,9,2,32.29,5.73,2,11000,7,89492,25,660.0,1120.0,890.0,0,5,11,8,1,0,0,1,1,0,0,13,2,20.79,3.57,4,167000,12,205756,32,718.75,1218.75,968.75,0,5,14,10,3,0,0,1,2,0,0,18,3,14.18,3.89,8,244166,22,942180,61,931.58,1552.63,1242.11,4,7,21,15,11,2,1,4,4,0,0,26,3,13100000
4,5,2011-09-05,77,77,4,,,,,,,Investment,Basmannoe,8398461.0,108171,0.015234,0.037316,5706,3240,7,6748,7770,9,0,562.0,4,2,25,2,no,0,10,93,no,no,no,yes,yes,no,no,no,28179,13522,14657,13368,7159,6209,68043,34236,33807,26760,8563,18197,5706,2982,2724,6748,3664,3084,15237,8113,7124,5164,2583,2581,11631,6223,5408,746,48,0,0,643,16,35,0,3,1,746,371,114,146,62,53,5,1.257186,0.87662,8.266305238,0.68885877,0.247901,0.376838,0.258289,0.236214,0.392871,10.68354,2.936581,11.90391,0.853960072,10.24752087,5,1.595898,2.156284,113,0.07148,0.857764,0.820294,no,11.616653,1.721834,0.04681,0.787593,2.578671,1.721834,4,no,3.133531,10,0.084113,yes,1.595898,113,1.423428,4,6.515857,8.671016,1.638318,3.63264,4.587917,2.60942,9.155057,1.969738,0.220288,2.544696,3.975401,3.610754,0.307915,1.864637,3.779781,1.121703,0.991683,0.892668,0.429052,0.077901,0.810801,0.376838,0.378756,0.121681,2.58437,1.112486,1.800125,1.339652,0.026102,excellent,3.56,4.44,15,293699,1,45000,48,702.22,1166.67,934.44,3,17,10,11,7,0,0,1,4,0,2,3,0,3.34,8.29,46,420952,3,158200,153,763.45,1272.41,1017.93,8,39,45,39,19,2,1,7,12,0,6,7,0,4.12,4.83,93,1195735,9,445900,272,766.8,1272.73,1019.76,19,70,74,72,30,6,1,18,30,0,10,14,2,4.53,5.02,149,1625130,17,564843,483,765.93,1269.23,1017.58,28,130,129,131,50,14,1,35,61,0,17,21,3,5.06,8.62,305,3420907,60,2296870,1068,853.03,1410.45,1131.74,63,266,267,262,149,57,4,70,121,1,40,77,5,8.38,10.92,689,8404624,114,3503058,2283,853.88,1411.45,1132.66,143,566,578,552,319,108,17,135,236,2,91,195,14,16331452


In [76]:
df = df_full.select("timestamp",       # date of transaction
                    "full_sq",         # total area
                    "life_sq",         # living area
                    "floor",
                    "sub_area",        # district
                    "green_zone_part",
                    "indust_part",
                    "price_doc")       # sale price

In [82]:
df.agg({"price_doc": "max"}).show()

+--------------+
|max(price_doc)|
+--------------+
|     111111112|
+--------------+



In [83]:
df.agg({"price_doc": "min"}).show()

+--------------+
|min(price_doc)|
+--------------+
|        100000|
+--------------+



In [78]:
df.write.mode("overwrite").option("header", "true").csv(SBERBANK_PATH_SUBSET)

In [79]:
df = spark.read.option("inferSchema", "true").option("header", "true").csv(SBERBANK_PATH_SUBSET)
df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- full_sq: integer (nullable = true)
 |-- life_sq: string (nullable = true)
 |-- floor: string (nullable = true)
 |-- sub_area: string (nullable = true)
 |-- green_zone_part: double (nullable = true)
 |-- indust_part: double (nullable = true)
 |-- price_doc: integer (nullable = true)



# Creating and Reading Data

In [85]:
numbers = spark.range(10)
numbers.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [256]:
spark.range(1).selectExpr("'Hello' as key", "0 as value").show()

+-----+-----+
|  key|value|
+-----+-----+
|Hello|    0|
+-----+-----+



In [87]:
import tempfile
import os

with tempfile.NamedTemporaryFile() as f:
    f.write(b'1\t"Anna"\n2\tBoris')
    f.flush()
    os.fsync(f)

    schema = StructType([
        StructField("ID", LongType(), nullable=False),
        StructField("Name", StringType(), nullable=False)
    ])

    spark.read.format("csv").schema(schema).option('sep', '\t').load(f.name).show()

+---+-----+
| ID| Name|
+---+-----+
|  1| Anna|
|  2|Boris|
+---+-----+



In [112]:
schema = StructType([
    StructField("Name", StringType(), nullable=False),
    StructField("ID", LongType(), nullable=False)
])
row = Row("Hello", 1)
spark.createDataFrame([row], schema).show()

+-----+---+
| Name| ID|
+-----+---+
|Hello|  1|
+-----+---+



In [286]:
spark.createDataFrame([
    (10, "Bob"),
    (20, "Ann")]).toDF("id", "name").show()

+---+----+
| id|name|
+---+----+
| 10| Bob|
| 20| Ann|
+---+----+



In [315]:
df = spark \
  .read \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .csv(SBERBANK_PATH_SUBSET)

In [316]:
df.createOrReplaceTempView("df")
spark.sql("SELECT count(*) FROM df").show()

+--------+
|count(1)|
+--------+
|   30471|
+--------+



# Showing Data

In [88]:
df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- full_sq: integer (nullable = true)
 |-- life_sq: string (nullable = true)
 |-- floor: string (nullable = true)
 |-- sub_area: string (nullable = true)
 |-- green_zone_part: double (nullable = true)
 |-- indust_part: double (nullable = true)
 |-- price_doc: integer (nullable = true)



In [95]:
df.schema

StructType(List(StructField(timestamp,TimestampType,true),StructField(full_sq,IntegerType,true),StructField(life_sq,StringType,true),StructField(floor,StringType,true),StructField(sub_area,StringType,true),StructField(green_zone_part,DoubleType,true),StructField(indust_part,DoubleType,true),StructField(price_doc,IntegerType,true)))

In [89]:
df.schema.names

['timestamp',
 'full_sq',
 'life_sq',
 'floor',
 'sub_area',
 'green_zone_part',
 'indust_part',
 'price_doc']

In [127]:
df.count() # number of rows

30471

In [90]:
df.take(2)

[Row(timestamp=datetime.datetime(2014, 3, 27, 0, 0), full_sq=35, life_sq='19', floor='3', sub_area='Ivanovskoe', green_zone_part=0.512707469, indust_part=0.000169676, price_doc=5300000),
 Row(timestamp=datetime.datetime(2014, 3, 27, 0, 0), full_sq=46, life_sq='NA', floor='12', sub_area='Tverskoe', green_zone_part=0.065444314, indust_part=7.81528e-05, price_doc=4648932)]

In [91]:
df.show(n=2)

+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
|          timestamp|full_sq|life_sq|floor|  sub_area|green_zone_part|indust_part|price_doc|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
|2014-03-27 00:00:00|     35|     19|    3|Ivanovskoe|    0.512707469| 1.69676E-4|  5300000|
|2014-03-27 00:00:00|     46|     NA|   12|  Tverskoe|    0.065444314| 7.81528E-5|  4648932|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
only showing top 2 rows



In [92]:
df.limit(2).collect()

[Row(timestamp=datetime.datetime(2014, 3, 27, 0, 0), full_sq=35, life_sq='19', floor='3', sub_area='Ivanovskoe', green_zone_part=0.512707469, indust_part=0.000169676, price_doc=5300000),
 Row(timestamp=datetime.datetime(2014, 3, 27, 0, 0), full_sq=46, life_sq='NA', floor='12', sub_area='Tverskoe', green_zone_part=0.065444314, indust_part=7.81528e-05, price_doc=4648932)]

In [99]:
df.first()

Row(timestamp=datetime.datetime(2014, 3, 27, 0, 0), full_sq=35, life_sq='19', floor='3', sub_area='Ivanovskoe', green_zone_part=0.512707469, indust_part=0.000169676, price_doc=5300000)

In [93]:
df.limit(2).toPandas()

Unnamed: 0,timestamp,full_sq,life_sq,floor,sub_area,green_zone_part,indust_part,price_doc
0,2014-03-27,35,19.0,3,Ivanovskoe,0.512707,0.00017,5300000
1,2014-03-27,46,,12,Tverskoe,0.065444,7.8e-05,4648932


# Operations

## Columns and expressions

> To Spark, columns are logical constructions that simply represent a value computed on a per-record basis by means of an expression. This means that to have a real value for a column, we need to have a row; and to have a row, we need to have a DataFrame. You cannot manipulate an individual column outside the context of a DataFrame.

From "Spark: The Definitive Guide"

In [94]:
col('abc'), column('timestamp')

(Column<b'abc'>, Column<b'timestamp'>)

In [98]:
expr('full_sq > 35')

Column<b'(full_sq > 35)'>

## Basics

In [116]:
df.select("timestamp", "price_doc").show(2)

+-------------------+---------+
|          timestamp|price_doc|
+-------------------+---------+
|2014-03-27 00:00:00|  5300000|
|2014-03-27 00:00:00|  4648932|
+-------------------+---------+
only showing top 2 rows



In [119]:
df.select(expr("green_zone_part * 100").alias("green_zone_percent"), expr("price_doc / 75 as price_usd")).show(2)

+------------------+-----------------+
|green_zone_percent|        price_usd|
+------------------+-----------------+
|        51.2707469|70666.66666666667|
| 6.544431400000001|         61985.76|
+------------------+-----------------+
only showing top 2 rows



In [121]:
df.selectExpr("*", "green_zone_part * 100 as green_zone_percent", "price_doc / 75 as price_usd").show(2)

+-------------------+-------+-------+-----+----------+---------------+-----------+---------+------------------+-----------------+
|          timestamp|full_sq|life_sq|floor|  sub_area|green_zone_part|indust_part|price_doc|green_zone_percent|        price_usd|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+------------------+-----------------+
|2014-03-27 00:00:00|     35|     19|    3|Ivanovskoe|    0.512707469| 1.69676E-4|  5300000|        51.2707469|70666.66666666667|
|2014-03-27 00:00:00|     46|     NA|   12|  Tverskoe|    0.065444314| 7.81528E-5|  4648932| 6.544431400000001|         61985.76|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+------------------+-----------------+
only showing top 2 rows



In [125]:
df.selectExpr("avg(price_doc)", "count(*)").show(2)

+----------------+--------+
|  avg(price_doc)|count(1)|
+----------------+--------+
|7123035.27773949|   30471|
+----------------+--------+



In [128]:
df.select("timestamp", lit(5).alias("Five")).show(2)

+-------------------+----+
|          timestamp|Five|
+-------------------+----+
|2014-03-27 00:00:00|   5|
|2014-03-27 00:00:00|   5|
+-------------------+----+
only showing top 2 rows



In [132]:
df.withColumn("more_green_than_indust", expr("green_zone_part > indust_part")).show(2)

+-------------------+-------+-------+-----+----------+---------------+-----------+---------+----------------------+
|          timestamp|full_sq|life_sq|floor|  sub_area|green_zone_part|indust_part|price_doc|more_green_than_indust|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+----------------------+
|2014-03-27 00:00:00|     35|     19|    3|Ivanovskoe|    0.512707469| 1.69676E-4|  5300000|                  true|
|2014-03-27 00:00:00|     46|     NA|   12|  Tverskoe|    0.065444314| 7.81528E-5|  4648932|                  true|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+----------------------+
only showing top 2 rows



In [133]:
df.withColumnRenamed("timestamp", "selling_date").columns

['selling_date',
 'full_sq',
 'life_sq',
 'floor',
 'sub_area',
 'green_zone_part',
 'indust_part',
 'price_doc']

In [134]:
df.drop("sub_area").columns

['timestamp',
 'full_sq',
 'life_sq',
 'floor',
 'green_zone_part',
 'indust_part',
 'price_doc']

In [135]:
df.withColumn("green_zone_percent", expr("green_zone_part * 100").cast("long")).show(2)

+-------------------+-------+-------+-----+----------+---------------+-----------+---------+------------------+
|          timestamp|full_sq|life_sq|floor|  sub_area|green_zone_part|indust_part|price_doc|green_zone_percent|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+------------------+
|2014-03-27 00:00:00|     35|     19|    3|Ivanovskoe|    0.512707469| 1.69676E-4|  5300000|                51|
|2014-03-27 00:00:00|     46|     NA|   12|  Tverskoe|    0.065444314| 7.81528E-5|  4648932|                 6|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+------------------+
only showing top 2 rows



In [183]:
df.select(lower(col("sub_area"))).show(2)

+---------------+
|lower(sub_area)|
+---------------+
|     ivanovskoe|
|       tverskoe|
+---------------+
only showing top 2 rows



## Filtering

In [136]:
df.filter(col("floor") > 5).count()

17030

In [142]:
df.where(col("floor") > 5).where(expr("full_sq > 100")).select(expr("avg(price_doc)").cast("int")).show()

+---------------------------+
|CAST(avg(price_doc) AS INT)|
+---------------------------+
|                   21172852|
+---------------------------+



In [170]:
df.select(round(df.price_doc / 1_000_000, 3).alias("price in mln rbl")).show(5)

+----------------+
|price in mln rbl|
+----------------+
|             5.3|
|           4.649|
|           6.296|
|            6.53|
|            10.3|
+----------------+
only showing top 5 rows



In [164]:
df.where(df.sub_area == "Kuncevo").show(2)

+-------------------+-------+-------+-----+--------+---------------+-----------+---------+
|          timestamp|full_sq|life_sq|floor|sub_area|green_zone_part|indust_part|price_doc|
+-------------------+-------+-------+-----+--------+---------------+-----------+---------+
|2014-03-27 00:00:00|    115|     55|    2| Kuncevo|    0.070662054|0.035145267| 19074000|
|2014-03-28 00:00:00|     76|     42|    6| Kuncevo|    0.070662054|0.035145267|  2000000|
+-------------------+-------+-------+-----+--------+---------------+-----------+---------+
only showing top 2 rows



In [165]:
df.where(instr(df.sub_area, "Kun") >= 1).where((col("full_sq") > 100) | (col("floor") > 30)).show(2)

+-------------------+-------+-------+-----+--------+---------------+-----------+---------+
|          timestamp|full_sq|life_sq|floor|sub_area|green_zone_part|indust_part|price_doc|
+-------------------+-------+-------+-----+--------+---------------+-----------+---------+
|2014-03-27 00:00:00|    115|     55|    2| Kuncevo|    0.070662054|0.035145267| 19074000|
|2014-04-15 00:00:00|    114|     54|   16| Kuncevo|    0.070662054|0.035145267| 19967500|
+-------------------+-------+-------+-----+--------+---------------+-----------+---------+
only showing top 2 rows



## Date and time

In [190]:
df.sample(fraction=0.0002).select(col("timestamp"), date_add(col("timestamp"), 5), datediff(current_date(), col("timestamp"))).show(5)

+-------------------+----------------------+-----------------------------------+
|          timestamp|date_add(timestamp, 5)|datediff(current_date(), timestamp)|
+-------------------+----------------------+-----------------------------------+
|2014-04-24 00:00:00|            2014-04-29|                               2439|
|2014-06-16 00:00:00|            2014-06-21|                               2386|
|2014-08-12 00:00:00|            2014-08-17|                               2329|
|2014-11-11 00:00:00|            2014-11-16|                               2238|
|2011-11-21 00:00:00|            2011-11-26|                               3324|
+-------------------+----------------------+-----------------------------------+
only showing top 5 rows



In [191]:
df.filter(col("timestamp") > "2014-04-24").count()

12675

## Nulls

In [200]:
df_with_null = df.na.replace(["NA"], [None], "life_sq")
df_with_null.show(2)

+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
|          timestamp|full_sq|life_sq|floor|  sub_area|green_zone_part|indust_part|price_doc|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
|2014-03-27 00:00:00|     35|     19|    3|Ivanovskoe|    0.512707469| 1.69676E-4|  5300000|
|2014-03-27 00:00:00|     46|   null|   12|  Tverskoe|    0.065444314| 7.81528E-5|  4648932|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
only showing top 2 rows



In [201]:
df_with_null.where(col("life_sq").isNotNull()).show(2)

+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
|          timestamp|full_sq|life_sq|floor|  sub_area|green_zone_part|indust_part|price_doc|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
|2014-03-27 00:00:00|     35|     19|    3|Ivanovskoe|    0.512707469| 1.69676E-4|  5300000|
|2014-03-27 00:00:00|     65|     43|   11|    Perovo|    0.065408826|0.225824954| 10300000|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
only showing top 2 rows



In [203]:
df_with_null.count(), df_with_null.na.drop().count()

(30471, 24088)

In [206]:
df_with_null.na.fill({"life_sq": 0}).show(2)

+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
|          timestamp|full_sq|life_sq|floor|  sub_area|green_zone_part|indust_part|price_doc|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
|2014-03-27 00:00:00|     35|     19|    3|Ivanovskoe|    0.512707469| 1.69676E-4|  5300000|
|2014-03-27 00:00:00|     46|      0|   12|  Tverskoe|    0.065444314| 7.81528E-5|  4648932|
+-------------------+-------+-------+-----+----------+---------------+-----------+---------+
only showing top 2 rows



## Structs

In [209]:
df.selectExpr("(green_zone_part, indust_part) as area_parts", "*").limit(2).toPandas()

Unnamed: 0,area_parts,timestamp,full_sq,life_sq,floor,sub_area,green_zone_part,indust_part,price_doc
0,"(0.512707469, 0.000169676)",2014-03-27,35,19.0,3,Ivanovskoe,0.512707,0.00017,5300000
1,"(0.065444314, 7.81528e-05)",2014-03-27,46,,12,Tverskoe,0.065444,7.8e-05,4648932


In [213]:
df_with_struct = df.select(struct("green_zone_part", "indust_part").alias("area_parts"))
df_with_struct.limit(2).toPandas()

Unnamed: 0,area_parts
0,"(0.512707469, 0.000169676)"
1,"(0.065444314, 7.81528e-05)"


In [214]:
df_with_struct.select(col("area_parts").getField("green_zone_part")).show(2)

+--------------------------+
|area_parts.green_zone_part|
+--------------------------+
|               0.512707469|
|               0.065444314|
+--------------------------+
only showing top 2 rows



In [215]:
df_with_struct.select("area_parts.green_zone_part").show(2)

+---------------+
|green_zone_part|
+---------------+
|    0.512707469|
|    0.065444314|
+---------------+
only showing top 2 rows



In [216]:
df_with_struct.select("area_parts.*").show(2)

+---------------+-----------+
|green_zone_part|indust_part|
+---------------+-----------+
|    0.512707469| 1.69676E-4|
|    0.065444314| 7.81528E-5|
+---------------+-----------+
only showing top 2 rows



## Arrays

In [246]:
df_split = df.select(split(col("sub_area"), " ").alias("sub_area_split")).filter(size(col("sub_area_split")) > 1)
df_split.selectExpr("sub_area_split[1]").show(2)

+-----------------+
|sub_area_split[1]|
+-----------------+
|           Butovo|
|       Moskovskij|
+-----------------+
only showing top 2 rows



In [247]:
df_exploded = df_split.withColumn("id", monotonically_increasing_id()).withColumn("exploded", explode(col("sub_area_split")))
df_exploded.limit(5).toPandas()

Unnamed: 0,sub_area_split,id,exploded
0,"[Juzhnoe, Butovo]",0,Juzhnoe
1,"[Juzhnoe, Butovo]",0,Butovo
2,"[Poselenie, Moskovskij]",1,Poselenie
3,"[Poselenie, Moskovskij]",1,Moskovskij
4,"[Nagatinskij, Zaton]",2,Nagatinskij


In [249]:
# https://stackoverflow.com/questions/43357727/how-to-do-opposite-of-explode-in-pyspark
df_exploded.groupBy("id").agg(collect_list("exploded")).limit(2).toPandas()

Unnamed: 0,id,collect_list(exploded)
0,26,"[Poselenie, Moskovskij]"
1,29,"[Poselenie, Vnukovskoe]"
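
To make the explode / collect_list round trip concrete, here is a plain-Python sketch (not the Spark API) of what the two cells above compute. The `rows` sample is copied from the exploded output above. One caveat: Spark does not guarantee the element order produced by `collect_list` after a shuffle, whereas this sketch always preserves it.

```python
# Plain-Python illustration only; the variable names are hypothetical.
rows = [
    (0, ["Juzhnoe", "Butovo"]),
    (1, ["Poselenie", "Moskovskij"]),
]

# explode: one output row per array element, the id is repeated
exploded = [(row_id, word) for row_id, words in rows for word in words]

# groupBy("id").agg(collect_list(...)): gather the elements back per id
collected = {}
for row_id, word in exploded:
    collected.setdefault(row_id, []).append(word)

print(exploded)
print(collected)
```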


## Maps

In [253]:
df_map = df.select(create_map("sub_area", "price_doc").alias("m"))
df_map.limit(2).toPandas()

Unnamed: 0,m
0,{'Ivanovskoe': 5300000}
1,{'Tverskoe': 4648932}


In [255]:
df_map.selectExpr("m['Ivanovskoe']").show(2)

+-------------+
|m[Ivanovskoe]|
+-------------+
|      5300000|
|         null|
+-------------+
only showing top 2 rows



## JSON

In [267]:
df_json = spark.range(1).selectExpr("""'{"key1": {"value1": [1, 2, 3]}, "key2": 5}' as json_str""")
df_json.collect()[0]['json_str']

'{"key1": {"value1": [1, 2, 3]}, "key2": 5}'

In [271]:
df_json.select(get_json_object(df_json.json_str, "$.key1.value1[2]")).show()

+-------------------------------------------+
|get_json_object(json_str, $.key1.value1[2])|
+-------------------------------------------+
|                                          3|
+-------------------------------------------+
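
As a sanity check on the path syntax, the same lookup can be sketched with Python's standard `json` module: `$.key1.value1[2]` walks into `key1`, then `value1`, then takes the element at zero-based index 2. This is only a plain-Python analogue. In Spark, `get_json_object` returns the extracted value as a string column, not as an integer.

```python
import json

json_str = '{"key1": {"value1": [1, 2, 3]}, "key2": 5}'

doc = json.loads(json_str)
# Equivalent of the JSON path $.key1.value1[2]
value = doc["key1"]["value1"][2]
print(value)
```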



In [277]:
df.selectExpr("(green_zone_part, indust_part) as area_parts").select(to_json(col("area_parts")).alias("json")).limit(1).collect()[0]["json"]

'{"green_zone_part":0.512707469,"indust_part":1.69676E-4}'

## UDF (User-Defined Functions)

In [278]:
def rub_to_usd(rub):
    return rub / 75  # fixed illustrative rate: 75 RUB per USD
rub_to_usd(100)

1.3333333333333333

In [283]:
rub_to_usd_udf = udf(rub_to_usd)  # wrap as a column UDF; without an explicit return type the result is a StringType column
df.select("price_doc", rub_to_usd_udf(col("price_doc"))).show(2)

+---------+---------------------+
|price_doc|rub_to_usd(price_doc)|
+---------+---------------------+
|  5300000|    70666.66666666667|
|  4648932|             61985.76|
+---------+---------------------+
only showing top 2 rows



In [285]:
spark.udf.register("rub_to_usd", rub_to_usd)
df.selectExpr("price_doc", "rub_to_usd(price_doc)").show(2)

+---------+---------------------+
|price_doc|rub_to_usd(price_doc)|
+---------+---------------------+
|  5300000|    70666.66666666667|
|  4648932|             61985.76|
+---------+---------------------+
only showing top 2 rows



In [327]:
@udf('double')
def rub_to_usd_dec(rub): return rub / 75
df.select("price_doc", rub_to_usd_dec(col("price_doc"))).show(2)

+---------+-------------------------+
|price_doc|rub_to_usd_dec(price_doc)|
+---------+-------------------------+
|  5300000|        70666.66666666667|
|  4648932|                 61985.76|
+---------+-------------------------+
only showing top 2 rows



In [304]:
def rub_to_usd_pandas(rub: pd.Series) -> pd.Series:
    # https://stackoverflow.com/questions/58878848/java-lang-illegalargumentexception-when-applying-a-python-udf-to-a-spark-datafra
    # https://issues.apache.org/jira/browse/SPARK-29367
    #import os
    #os.environ['ARROW_PRE_0_15_IPC_FORMAT']='1'
    return rub / 75

rub_to_usd_pandas(pd.Series([100, 200, 300]))

0    1.333333
1    2.666667
2    4.000000
dtype: float64

In [317]:
rub_to_usd_pd_udf = pandas_udf(rub_to_usd_pandas, returnType=LongType())  # LongType drops the fractional part; use 'double' to keep it
df.select("price_doc", rub_to_usd_pd_udf(col("price_doc"))).show(2)

+---------+----------------------------+
|price_doc|rub_to_usd_pandas(price_doc)|
+---------+----------------------------+
|  5300000|                       70666|
|  4648932|                       61985|
+---------+----------------------------+
only showing top 2 rows
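
The integer results above follow from declaring `returnType=LongType()` for a function that returns floats. A plain-Python sketch of the arithmetic, assuming the conversion behaves like truncation toward zero (which matches the output shown, though the exact cast behaviour may depend on the Spark and PyArrow versions):

```python
# The two prices are taken from the output above.
prices = [5300000, 4648932]

as_double = [p / 75 for p in prices]   # what the function actually returns
as_long = [int(x) for x in as_double]  # what a long column can hold

print(as_double)
print(as_long)
```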



In [335]:
@pandas_udf('double', functionType=PandasUDFType.SCALAR)
def rub_to_usd_pd_dec(rub):
    return rub / 75
df.select("price_doc", rub_to_usd_pd_dec(col("price_doc"))).show(2)

+---------+----------------------------+
|price_doc|rub_to_usd_pd_dec(price_doc)|
+---------+----------------------------+
|  5300000|           70666.66666666667|
|  4648932|                    61985.76|
+---------+----------------------------+
only showing top 2 rows



## Statistical methods

In [143]:
df.select("sub_area").distinct().count()

146

In [181]:
df.stat.corr("full_sq", "price_doc")

0.3418404597538172

In [177]:
df.select(corr(unix_timestamp(col("timestamp")), col("price_doc"))).show()

+---------------------------------------------------------------+
|corr(unix_timestamp(timestamp, yyyy-MM-dd HH:mm:ss), price_doc)|
+---------------------------------------------------------------+
|                                            0.11675321295024581|
+---------------------------------------------------------------+



In [386]:
df.describe().toPandas()

Unnamed: 0,summary,full_sq,life_sq,floor,sub_area,green_zone_part,indust_part,price_doc
0,count,30471.0,30471.0,30471.0,30471,30471.0,30471.0,30471.0
1,mean,54.21426930524105,34.403271338425775,7.670802534318902,,0.2189222638481504,0.1188713093716776,7123035.27773949
2,stddev,38.03148732410504,52.28573347591002,5.319988520945879,,0.1750901595436135,0.118687512976336,4780111.329633794
3,min,0.0,0.0,0.0,Ajeroport,0.001879375,0.0,100000.0
4,max,5326.0,,,Zjuzino,0.852922841,0.521867054,111111112.0


In [145]:
df.sample(withReplacement=False, fraction=0.0002, seed=42).show()

+-------------------+-------+-------+-----+--------------------+---------------+-----------+---------+
|          timestamp|full_sq|life_sq|floor|            sub_area|green_zone_part|indust_part|price_doc|
+-------------------+-------+-------+-----+--------------------+---------------+-----------+---------+
|2012-10-19 00:00:00|     53|     29|   11|             Mar'ino|    0.188712765|0.090799103|  7500000|
|2014-09-19 00:00:00|     64|     NA|    9|Poselenie Moskovskij|    0.492839845|0.075778755|  6004650|
|2013-09-10 00:00:00|     39|     18|   12|Pokrovskoe Stresh...|    0.183968555|0.150817539|  6650000|
|2012-03-21 00:00:00|     40|     18|   16|            Brateevo|    0.398794394|        0.0|  6600000|
+-------------------+-------+-------+-----+--------------------+---------------+-----------+---------+



In [148]:
data_frames = df.randomSplit([0.7, 0.2, 0.1], seed=42)
len(data_frames), data_frames[0].count(), data_frames[1].count(), data_frames[2].count()

(3, 21367, 6116, 2988)

In [149]:
data_frames[0].union(data_frames[1]).union(data_frames[2]).count()

30471

In [150]:
df.sort(col("price_doc").desc()).show(5)

+-------------------+-------+-------+-----+--------------------+---------------+-----------+---------+
|          timestamp|full_sq|life_sq|floor|            sub_area|green_zone_part|indust_part|price_doc|
+-------------------+-------+-------+-----+--------------------+---------------+-----------+---------+
|2012-05-10 00:00:00|     55|     31|   12|         Teplyj Stan|    0.426396297|0.002211665|111111112|
|2015-03-12 00:00:00|    220|    144|    3|         Presnenskoe|    0.068202173|0.042031587| 95122496|
|2013-04-19 00:00:00|    185|     66|   33|             Ramenki|    0.169625419|0.019508796| 91066096|
|2014-05-29 00:00:00|    184|     85|   10|             Ramenki|    0.169625419|0.019508796| 80777440|
|2013-03-06 00:00:00|    206|    128|    2|Pokrovskoe Stresh...|    0.183968555|0.150817539| 78802248|
+-------------------+-------+-------+-----+--------------------+---------------+-----------+---------+
only showing top 5 rows



## Aggregation

* https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
* https://stackoverflow.com/questions/40006395/applying-udfs-on-groupeddata-in-pyspark-with-functioning-python-example
* https://stackoverflow.com/questions/48160252/user-defined-function-to-be-applied-to-window-in-pyspark
* https://danvatterott.com/blog/2018/09/06/python-aggregate-udfs-in-pyspark/
* https://stackoverflow.com/questions/61855832/remove-rows-from-pyspark-dataframe-by-type

In [289]:
df.select(count("sub_area"), countDistinct("sub_area"), approx_count_distinct("sub_area", 0.1)).show()

+---------------+------------------------+-------------------------------+
|count(sub_area)|count(DISTINCT sub_area)|approx_count_distinct(sub_area)|
+---------------+------------------------+-------------------------------+
|          30471|                     146|                            128|
+---------------+------------------------+-------------------------------+
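
The approximate count (128) differs noticeably from the exact one (146). The second argument of `approx_count_distinct` is the maximum allowed relative standard deviation of the estimate, not a hard bound on the error, so a single estimate can land outside it. A small plain-Python check of the numbers above:

```python
exact = 146   # countDistinct result
approx = 128  # approx_count_distinct result
rsd = 0.1     # value passed as the second argument

rel_error = abs(approx - exact) / exact
print(round(rel_error, 3), rel_error > rsd)
```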



In [291]:
df.groupBy("sub_area", "floor").count().show()

+--------------------+-----+-----+
|            sub_area|floor|count|
+--------------------+-----+-----+
|           Taganskoe|   10|    8|
|        Metrogorodok|    4|   11|
|          Nekrasovka|   18|    8|
|         Bogorodskoe|   10|    7|
|       Lomonosovskoe|   10|    4|
|             Zjuzino|   16|    3|
|          Ivanovskoe|   15|    1|
|Orehovo-Borisovo ...|   NA|    1|
|       Lomonosovskoe|   12|    1|
|Birjulevo Vostochnoe|    6|   26|
|Poselenie Shherbinka|    2|   29|
|            Tverskoe|   11|   26|
| Vostochnoe Degunino|    5|   15|
|Poselenie Desjono...|   10|   26|
|     Severnoe Butovo|    5|   14|
|  Severnoe Izmajlovo|    1|   19|
|            Ljublino|   12|   12|
|  Juzhnoe Medvedkovo|    9|   12|
|            Jasenevo|    6|   15|
|          Sokol'niki|    6|    4|
+--------------------+-----+-----+
only showing top 20 rows



In [293]:
df.groupBy("sub_area").agg(avg("price_doc"), first("green_zone_part")).show()

+--------------------+--------------------+-----------------------------+
|            sub_area|      avg(price_doc)|first(green_zone_part, false)|
+--------------------+--------------------+-----------------------------+
|  Nagatino-Sadovniki|   7430191.227848101|                  0.094680962|
|          Pechatniki|        5684482.8125|                  0.030696593|
|Poselenie Filimon...|   3374459.227822581|                  0.548991609|
|             Hovrino|   8546164.140449438|                  0.184282943|
|Orehovo-Borisovo ...|   6677480.558252427|                  0.387099854|
|     Juzhnoe Tushino|   7185202.171428571|                  0.222646321|
|         Presnenskoe|1.4032071415789474E7|                  0.068202173|
|           Basmannoe|1.1587719163265307E7|                  0.015233744|
|        Jaroslavskoe|           6335919.0|                  0.037753826|
|            Caricyno|   5840068.181818182|                  0.071119749|
|             Marfino|   6754174.58823

In [336]:
# GROUPED_MAP: input and output are both a pandas.DataFrame
@pandas_udf(df.schema, functionType=PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(price_doc=pdf.price_doc - pdf.price_doc.mean())

df.groupBy("sub_area").apply(subtract_mean).select("sub_area", "price_doc").show(10)

+------------------+---------+
|          sub_area|price_doc|
+------------------+---------+
|Nagatino-Sadovniki|   369808|
|Nagatino-Sadovniki|  -130191|
|Nagatino-Sadovniki| -1630191|
|Nagatino-Sadovniki|  -780191|
|Nagatino-Sadovniki| -1230191|
|Nagatino-Sadovniki| -5430191|
|Nagatino-Sadovniki|  1469808|
|Nagatino-Sadovniki|  4413808|
|Nagatino-Sadovniki| -1430191|
|Nagatino-Sadovniki| -3630191|
+------------------+---------+
only showing top 10 rows



In [345]:
@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def non_life_sq(full_sq, life_sq):
    return (full_sq - life_sq).mean()

(df.withColumn("life_sq_int", df.life_sq.cast(LongType()))
   .na.drop()
   .groupBy("sub_area")
   .agg(non_life_sq("full_sq", "life_sq_int").alias("non_life_sq_mean"))
   .show(5))

+--------------------+------------------+
|            sub_area|  non_life_sq_mean|
+--------------------+------------------+
|  Nagatino-Sadovniki|19.098039215686274|
|          Pechatniki|17.979166666666668|
|             Hovrino|21.594285714285714|
|Poselenie Filimon...| 5.853658536585366|
|Orehovo-Borisovo ...| 17.43915343915344|
+--------------------+------------------+
only showing top 5 rows



## Window functions

In [295]:
window = Window \
    .partitionBy("sub_area") \
    .orderBy("price_doc") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [299]:
df.select("sub_area",
    "price_doc",
    max("price_doc").over(window),  # max price so far
    rank().over(window),
    dense_rank().over(window)).limit(5).toPandas()

Unnamed: 0,sub_area,price_doc,max(price_doc) OVER (PARTITION BY sub_area ORDER BY price_doc ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),RANK() OVER (PARTITION BY sub_area ORDER BY price_doc ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),DENSE_RANK() OVER (PARTITION BY sub_area ORDER BY price_doc ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
0,Nagatino-Sadovniki,990000,990000,1,1
1,Nagatino-Sadovniki,990000,990000,1,1
2,Nagatino-Sadovniki,1000000,1000000,3,2
3,Nagatino-Sadovniki,1000000,1000000,3,2
4,Nagatino-Sadovniki,1250000,1250000,5,3
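
The difference between `rank` and `dense_rank` is easiest to see on the tied prices above: `rank` leaves gaps after ties, `dense_rank` does not. A plain-Python sketch (hypothetical helper functions, not the Spark API) that reproduces the three window columns for one already-sorted partition:

```python
from itertools import accumulate

# Prices of the first five rows of the partition, sorted ascending.
prices = [990000, 990000, 1000000, 1000000, 1250000]

def rank(sorted_values):
    """Position of the first row of each tie group (1-based), so ties leave gaps."""
    ranks = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(i + 1)
    return ranks

def dense_rank(sorted_values):
    """Like rank, but consecutive tie groups get consecutive numbers."""
    ranks = []
    for i, value in enumerate(sorted_values):
        if i == 0:
            ranks.append(1)
        elif value == sorted_values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1)
    return ranks

# max(...) over rows from the partition start up to the current row
running_max = list(accumulate(prices, max))

print(running_max)
print(rank(prices))
print(dense_rank(prices))
```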


In [303]:
spark.sql("""
    SELECT sub_area,
           price_doc,
           dense_rank() OVER (PARTITION BY sub_area ORDER BY price_doc ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
    FROM df
""").limit(5).toPandas()

Unnamed: 0,sub_area,price_doc,DENSE_RANK() OVER (PARTITION BY sub_area ORDER BY price_doc ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
0,Nagatino-Sadovniki,990000,1
1,Nagatino-Sadovniki,990000,1
2,Nagatino-Sadovniki,1000000,2
3,Nagatino-Sadovniki,1000000,2
4,Nagatino-Sadovniki,1250000,3


## Joins

In [379]:
# Data is from https://www.domofond.ru/city-ratings/moskva-c3584
header = ["District", "Overall", "Ecology", "Cleanness", "Services", "Neighbours", "Childhood", "Leasure", "Shops", "Transport", "Safety", "LivingCost"]
data = """Kurkino,1,1,1,1,1,1,1,4,1,1,1
Strogino,2,3,5,3,5,3,2,11,26,11,4
Mitino,3,9,3,5,13,4,3,6,8,13,16
Marfino,4,2,2,9,11,27,85,68,39,3,2
Troparevo-Nikulino,5,6,27,7,4,41,8,25,10,14,7
Southern Butovo,6,12,11,8,10,8,32,32,13,23,17
Chertanovo Severnoye,7,10,14,11,23,16,12,38,21,21,8
Severnoe Butovo,8,18,15,15,7,24,17,2,31,32,24
Krylatskoe,9,5,8,13,21,19,4,105,66,7,68
Horoshevskoe,10,66,7,34,6,9,16,91,5,4,13
Lianozovo,11,35,12,4,59,2,7,1,25,74,29
Severniy,12,19,6,12,27,26,69,48,14,35,5
Ramenki,13,46,9,17,8,47,50,98,12,2,22
Lomonosovsky,14,43,25,52,15,18,28,41,32,12,10
Vernadsky Avenue,15,13,16,16,32,43,33,82,75,8,20
Novo-Peredelkino,16,21,13,6,36,22,15,9,78,53,40
Babushkinsky,17,11,36,43,53,33,11,34,24,34,14
Orekhovo-Borisovo Severnoye,18,41,10,10,51,11,26,19,59,33,19
Yasenevo,19,4,50,44,26,6,22,43,34,63,57
Teplyj Stan,20,22,51,18,20,35,47,28,30,45,3
Sokolniki,21,44,97,38,3,28,5,42,23,28,6
Gagarinsky,22,33,18,59,17,39,10,62,2,26,71
Akademicheskoe,23,48,28,31,30,17,36,67,18,20,9
Severnoe Tushino,24,7,49,23,58,23,14,57,42,54,34
Konkovo,25,8,46,24,54,29,25,15,38,51,54
Golovinsky,26,14,77,40,22,45,49,31,44,30,26
Kosino-Ukhtomsky,27,107,4,2,28,13,23,16,3,16,27
Obruchevsky,28,37,68,37,9,12,6,21,92,59,78
Ostankino,29,55,42,26,19,55,45,73,74,9,21
Filevsky Park,30,20,52,35,41,90,46,58,6,31,18
Brateevo,31,87,32,20,57,7,9,5,36,40,60
Novokosino,32,98,31,14,25,5,29,3,33,24,64
Preobrazhenskoe,33,25,37,48,74,54,63,54,48,46,11
Zyuzino,34,31,63,30,55,30,61,72,45,38,36
Fili-Davydkovo,35,32,34,78,24,38,68,52,68,29,39
Juzhnoe Tushino,36,16,38,63,95,63,19,56,65,39,67
Severnoe Medvedkovo,37,45,47,61,70,20,39,12,47,66,35
Sokol,38,62,33,27,12,101,60,106,49,18,31
Levoberezhny,39,39,84,65,38,44,40,102,81,17,23
Rostokino,40,23,17,25,46,70,37,107,11,68,99
Hamovniki,41,83,26,79,18,52,20,111,85,6,42
Solntsevo,42,29,39,19,35,65,62,49,103,60,45
Bibirevo,43,40,76,32,68,25,21,14,89,81,41
Novogireevo,44,56,71,69,48,21,35,7,29,78,69
Tagansky,45,100,29,29,56,31,18,79,27,27,12
Moskvorechye-Saburovo,46,68,20,39,45,48,83,104,7,25,84
Donskoy,47,74,85,100,14,15,30,53,9,56,48
Ivanovskoe,48,27,59,64,97,14,75,13,41,72,37
Izmailovo,49,15,107,103,40,59,34,23,53,62,28
Yuzhnoye-Medvedkovo,50,53,70,47,73,77,24,51,37,58,53
Cheryomushki,51,49,30,21,42,64,84,84,71,37,97
Tverskoy,52,99,19,22,2,75,73,112,58,5,86
Kotlovka,53,36,22,53,78,62,71,37,16,47,113
Shchukino,54,26,44,58,89,81,43,93,76,42,72
Losinoostrovsky,55,17,43,51,61,49,88,35,91,69,93
Voikovsky,56,59,86,71,43,51,58,36,35,70,43
Nagatinsky Zaton,57,69,57,41,31,78,65,97,83,41,32
Nagatino-Sadovniki,58,70,56,90,99,36,31,26,22,50,52
Vostochnoe Izmajlovo,59,30,89,102,49,37,48,18,43,87,44
Otradnoe,60,78,35,36,87,32,57,8,52,90,56
Yaroslavl,61,34,54,70,66,53,89,22,70,67,85
Presnensky,62,93,45,67,16,85,52,109,40,10,79
Alekseyevsky,63,77,21,42,103,93,38,65,28,52,62
Zyablikovo,64,67,48,49,84,57,70,20,51,76,80
Sviblovo,65,28,81,94,83,106,55,46,4,73,47
Pokrovskoe-Streshnevo,66,50,91,83,37,56,79,85,46,71,66
Basmanny,67,91,72,74,47,86,51,92,15,44,33
Horoshevo-Mnevniki,68,72,58,55,64,69,53,55,97,49,87
Butyrsky,69,85,73,99,69,40,76,76,63,36,15
Koptevo,70,47,99,107,39,95,86,39,20,57,49
Sokolinaja Gora,71,64,109,86,33,108,27,64,19,65,59
Chertanovo Central'noe,72,65,55,46,102,66,56,71,56,93,74
Zamoskvorechye,73,97,40,50,90,102,42,108,73,22,50
Biryulyovo-Vostochnoye,74,60,53,72,92,50,87,30,105,82,77
Maryino,75,108,74,33,63,10,13,10,77,94,75
Nagorny,76,88,23,60,50,72,104,89,55,43,107
Orehovo-Borisovo Juzhnoe,77,82,41,28,93,68,95,27,90,85,82
Aeroport,78,84,103,108,34,92,82,90,82,19,46
Veshnjaki,79,61,88,84,75,34,44,29,94,106,96
Chertanovo Juzhnoe,80,75,66,56,94,71,78,33,96,92,73
Severnoe Izmajlovo,81,42,110,87,60,67,59,87,67,102,63
Savjolovskij,82,86,61,76,67,105,66,77,100,55,58
Perovo,83,54,94,96,82,87,67,69,62,86,90
Altuf'evskij,84,81,87,45,79,83,77,24,60,105,61
Kuncevo,85,52,78,106,65,80,90,86,87,75,100
Timirjazevskij,86,58,101,91,77,103,94,103,64,61,70
Hovrino,87,79,96,89,44,42,81,99,95,80,92
Mar'ina Roshha,88,104,64,92,71,84,41,17,84,64,88
Tekstil'shhiki,89,89,80,54,96,58,74,44,50,107,51
Vyhino-Zhulebino,90,90,98,98,52,46,72,40,102,88,76
Tsaritsyno,91,51,83,80,101,96,106,94,99,97,55
Lefortovo,92,102,90,95,81,94,80,59,54,79,25
Ochakovo-Matveevskoe,93,76,62,62,76,88,112,96,107,77,94
Beskudnikovskij,94,80,69,82,100,61,93,88,101,99,106
Vostochnoe Degunino,95,63,60,88,112,74,102,61,93,91,109
Mozhajskij,96,73,82,77,85,100,101,81,110,83,103
Begovoj,97,111,24,75,62,114,105,101,17,48,30
Bogorodskoe,98,57,104,113,104,98,103,66,72,89,89
Dorogomilovo,99,112,65,57,29,110,110,114,69,15,105
Gol'janovo,100,71,105,101,86,91,92,47,104,110,95
Danilovskij,101,105,92,93,91,111,91,80,61,84,65
Dmitrovskij,102,92,102,104,98,73,54,60,112,95,108
Metrogorodok,103,38,112,111,88,89,114,78,57,100,91
Vnukovo,104,24,67,73,72,113,111,113,113,104,102
Zapadnoe Degunino,105,96,108,81,80,99,100,45,109,103,98
Rjazanskij,106,95,106,105,105,76,99,50,88,111,83
Krasnosel'skij,107,103,93,85,111,109,97,95,80,109,101
Pechatniki,108,106,113,110,106,79,64,70,79,108,111
Kuz'minki,109,94,111,109,108,60,96,63,108,112,81
Nizhegorodskij,110,113,95,97,110,112,108,100,86,98,38
Birjuljovo Zapadnoe,111,101,75,68,113,82,109,75,111,114,112
Juzhnoportovyj,112,110,100,112,114,107,107,74,98,101,110
Ljublino,113,109,114,114,109,104,98,83,106,113,114
Nekrasovka,114,114,79,66,107,97,113,110,114,96,104"""

In [383]:
df_ranking = spark.createDataFrame([line.split(",") for line in data.split("\n")]).toDF(*header)
df_ranking.show(5)

+------------------+-------+-------+---------+--------+----------+---------+-------+-----+---------+------+----------+
|          District|Overall|Ecology|Cleanness|Services|Neighbours|Childhood|Leasure|Shops|Transport|Safety|LivingCost|
+------------------+-------+-------+---------+--------+----------+---------+-------+-----+---------+------+----------+
|           Kurkino|      1|      1|        1|       1|         1|        1|      1|    4|        1|     1|         1|
|          Strogino|      2|      3|        5|       3|         5|        3|      2|   11|       26|    11|         4|
|            Mitino|      3|      9|        3|       5|        13|        4|      3|    6|        8|    13|        16|
|           Marfino|      4|      2|        2|       9|        11|       27|     85|   68|       39|     3|         2|
|Troparevo-Nikulino|      5|      6|       27|       7|         4|       41|      8|   25|       10|    14|         7|
+------------------+-------+-------+---------+--

In [385]:
def get_districts(data_frame, column_name):
    return set(data_frame.select(column_name).distinct().toPandas()[column_name])

districts_sales = get_districts(df, "sub_area")
districts_ranking = get_districts(df_ranking, "District")
print(len(districts_sales), len(districts_ranking), len(districts_sales & districts_ranking))

146 114 58
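
Only 58 of the 114 ranking names match exactly, largely because the two sources transliterate district names differently (for example `Zjuzino` vs `Zyuzino`). One possible way to propose candidate pairs before joining is fuzzy matching with `difflib` from the standard library. This is a sketch of an assumed approach, not something the notebook does. The small sample lists are hypothetical subsets of the names shown above, and any suggestion would need manual review.

```python
from difflib import get_close_matches

# Hypothetical subsets: names that failed to match exactly.
unmatched_sales = ["Zjuzino", "Jasenevo", "Mar'ino", "Ajeroport"]
ranking_names = ["Zyuzino", "Yasenevo", "Maryino", "Aeroport", "Kurkino"]

# For each sales name, keep the single best candidate above the similarity cutoff.
suggestions = {
    name: get_close_matches(name, ranking_names, n=1, cutoff=0.6)
    for name in unmatched_sales
}

for sales_name, candidates in suggestions.items():
    print(sales_name, "->", candidates)
```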


In [387]:
# TODO:
# 1. Demo join expression
# 2. Demo various join types
# 3. Demo join on complex types
# 4. Demo broadcast

In [389]:
# TODO:
# * SQL for each DataFrame API call
# * Correlated & uncorrelated predicate subqueries
# * Writing/reading Parquet files
# * Anything else from "Learning Spark"?
# * More on debugging & tuning
# * Examples with pivots, etc https://stackoverflow.com/questions/37864222/transpose-column-to-row-with-spark https://stackoverflow.com/questions/42465568/unpivot-in-spark-sql-pyspark

# Debugging

In [21]:
df.sort('timestamp').explain()

== Physical Plan ==
Sort [timestamp#11 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(timestamp#11 ASC NULLS FIRST, 200)
   +- FileScan csv [id#10,timestamp#11,full_sq#12,life_sq#13,floor#14,max_floor#15,material#16,build_year#17,num_room#18,kitch_sq#19,state#20,product_type#21,sub_area#22,area_m#23,raion_popul#24,green_zone_part#25,indust_part#26,children_preschool#27,preschool_quota#28,preschool_education_centers_raion#29,children_school#30,school_quota#31,school_education_centers_raion#32,school_education_centers_top_20_raion#33,... 268 more fields] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/anton/datasets/sberbank_russian_housing_market/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,timestamp:timestamp,full_sq:int,life_sq:string,floor:string,max_floor:string,materi...


In [287]:
# df.cache()  # TODO: add a good example and explanation