In [10]:
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession
import os
import pyspark.sql.functions as F

In [2]:
import warnings
warnings.filterwarnings("ignore")

In [3]:
sc = SparkContext()

22/05/09 11:54:28 WARN Utils: Your hostname, Nathans-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.255.49.235 instead (on interface en0)
22/05/09 11:54:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/09 11:54:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/09 11:54:29 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
spark = SparkSession(sc)

In [63]:
file_path = os.path.join('..', 'src-data', '2017_StPaul_MN_Real_Estate.csv')
df = spark.read.csv(file_path, header = True, inferSchema = True)

In [6]:
df_sub = df.select('STREETNUMBERNUMERIC', 'FIREPLACES', 
                   'LOTSIZEDIMENSIONS', 'LISTTYPE', 'ACRES', 
                   'ASSUMABLEMORTGAGE', 'SalesClosePrice', 'ListPrice',
                   'DAYSONMARKET', 'YEARBUILT')

### 1. Fundamental Mathematic Operation

sum 2 columns

In [10]:
df = df.withColumn ('TSQT', (df['SQFTBELOWGROUND'] + df['SQFTABOVEGROUND']))

ratio

In [8]:
df = df.withColumn('BED_TO_BATHS', 
                  df['Bedrooms'] / df['BathsFull'])

In [9]:
df['BED_TO_BATHS'].show(3)

+------------+
|BED_TO_BATHS|
+------------+
|         3.0|
|         4.0|
|         2.0|
+------------+
only showing top 3 rows



### 2. Regular Expression
Use `regexp_extract`. Read [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.regexp_extract.html)

### 3. Time Feature

In [11]:
from datetime import datetime
from pyspark.sql.types import DateType

In [13]:
# convert to date type
func = F.udf(lambda x: datetime.strptime(x, "%m/%d/%Y %H:%M"), DateType())

In [14]:
# convert to Date type
df = df.withColumn("ListDateNew", func(F.col("LISTDATE")))

In [15]:
# get date
df = df.withColumn("ListDateNew_1", F.to_date("ListDateNew"))

In [16]:
# get day of week
df = df.withColumn("List_day_week", F.dayofweek("ListDateNew_1"))

In [18]:
# get year
df = df.withColumn("Year", F.year("ListDateNew_1"))

In [19]:
df[['ListDateNew', 'ListDateNew_1', 'List_day_week', 'Year']].show(5)

+-----------+-------------+-------------+----+
|ListDateNew|ListDateNew_1|List_day_week|Year|
+-----------+-------------+-------------+----+
| 2017-07-15|   2017-07-15|            7|2017|
| 2017-10-09|   2017-10-09|            2|2017|
| 2017-06-26|   2017-06-26|            2|2017|
| 2017-08-25|   2017-08-25|            6|2017|
| 2017-09-12|   2017-09-12|            3|2017|
+-----------+-------------+-------------+----+
only showing top 5 rows



### 4. Extracting Feature

In [20]:
has_attached_garage = df["GARAGEDESCRIPTION"].like("%Attached Garage%")
has_detached_garage = df["GARAGEDESCRIPTION"].like("%Detached Garage%")

In [23]:
from pyspark.sql.functions import when

In [24]:
df = df.withColumn("has_attached_garage", (F.when(has_attached_garage, 1)
                                           .when(has_detached_garage, 0)
                                           .otherwise(None)
                                          )
                  )

In [26]:
df[["GARAGEDESCRIPTION", "has_attached_garage"]].show(truncate = 50, n= 5)

+--------------------------------------------------+-------------------+
|                                 GARAGEDESCRIPTION|has_attached_garage|
+--------------------------------------------------+-------------------+
|                                   Attached Garage|                  1|
|Attached Garage, Driveway - Asphalt, Garage Doo...|                  1|
|                                   Attached Garage|                  1|
|Attached Garage, Detached Garage, Tuckunder, Dr...|                  1|
|Attached Garage, Driveway - Asphalt, Garage Doo...|                  1|
+--------------------------------------------------+-------------------+
only showing top 5 rows



`split` function: split string.

In [27]:
split_col = F.split(df["ROOF"], ",")

In [28]:
df = df.withColumn("Roof_Material", split_col.getItem(0))

In [29]:
df[["ROOF", "Roof_Material"]].show(5, truncate = 100)

+----------------------------------------------+----------------+
|                                          ROOF|   Roof_Material|
+----------------------------------------------+----------------+
|                                          null|            null|
|Asphalt Shingles, Pitched, Age 8 Years or Less|Asphalt Shingles|
|                                          null|            null|
|Asphalt Shingles, Pitched, Age 8 Years or Less|Asphalt Shingles|
|            Asphalt Shingles, Age Over 8 Years|Asphalt Shingles|
+----------------------------------------------+----------------+
only showing top 5 rows



`Explode` function.

In [30]:
df = df.withColumn("roof_list", F.split(df["ROOF"], ","))

In [31]:
df[["No", "roof_list"]].show(5, truncate = 50)

+---+--------------------------------------------------+
| No|                                         roof_list|
+---+--------------------------------------------------+
|  1|                                              null|
|  2|[Asphalt Shingles,  Pitched,  Age 8 Years or Less]|
|  3|                                              null|
|  4|[Asphalt Shingles,  Pitched,  Age 8 Years or Less]|
|  5|             [Asphalt Shingles,  Age Over 8 Years]|
+---+--------------------------------------------------+
only showing top 5 rows



'roof_list' feature contain many value by each record.

Convert it to `1NF` by `explode`.

In [32]:
roof_df = df.withColumn("ex_roof_list", F.explode(df["roof_list"]))

In [33]:
roof_df[["No", "ex_roof_list"]].show(5, truncate = 50)

+---+--------------------+
| No|        ex_roof_list|
+---+--------------------+
|  2|    Asphalt Shingles|
|  2|             Pitched|
|  2| Age 8 Years or Less|
|  4|    Asphalt Shingles|
|  4|             Pitched|
+---+--------------------+
only showing top 5 rows



### 3. Pivot

Pivot the column `ex_roof_list`

Create dummy by `lit` function. Read [here](https://sparkbyexamples.com/pyspark/pyspark-lit-add-literal-constant/)

In [36]:
# create dummy column of constant value
roof_df = roof_df.withColumn("constant_val", F.lit(1))

In [37]:
roof_df[["No", "ex_roof_list", "constant_val"]].show(5, truncate = 50)

+---+--------------------+------------+
| No|        ex_roof_list|constant_val|
+---+--------------------+------------+
|  2|    Asphalt Shingles|           1|
|  2|             Pitched|           1|
|  2| Age 8 Years or Less|           1|
|  4|    Asphalt Shingles|           1|
|  4|             Pitched|           1|
+---+--------------------+------------+
only showing top 5 rows



use `first` (read [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.first.html)) and `coalesce` (read [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.coalesce.html))

In [38]:
roof_piv_df = roof_df.groupBy("No").pivot("ex_roof_list").agg(F.coalesce(F.first("constant_val")))

In [39]:
roof_piv_df.orderBy("No").show(5)

22/05/09 12:16:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---+--------------------+-----------------+-----+------+------+--------+-------+-------+------+-----------+-----+--------------------+--------------+-------------------+----------------+----------------+----+-----+-----+-------+------+------+-----+----------+----+-------------------+-------------+
| No| Age 8 Years or Less| Age Over 8 Years| Flat| Metal| Other| Pitched| Rubber| Shakes| Slate| Tar/Gravel| Tile| Unspecified Shingle| Wood Shingles|Age 8 Years or Less|Age Over 8 Years|Asphalt Shingles|Flat|Metal|Other|Pitched|Rubber|Shakes|Slate|Tar/Gravel|Tile|Unspecified Shingle|Wood Shingles|
+---+--------------------+-----------------+-----+------+------+--------+-------+-------+------+-----------+-----+--------------------+--------------+-------------------+----------------+----------------+----+-----+-----+-------+------+------+-----+----------+----+-------------------+-------------+
|  2|                   1|             null| null|  null|  null|       1|   null|   null|  null|    

### 4. Join

In [40]:
joined_data = df.join(roof_piv_df, on = "No", how = "left")

In [43]:
print(f'Number of column {len(joined_data.columns)}')

Number of column 108


In [44]:
# zero fill column of root_piv_dif
zfill_cols = roof_piv_df.columns

In [45]:
zfill_cols

['No',
 ' Age 8 Years or Less',
 ' Age Over 8 Years',
 ' Flat',
 ' Metal',
 ' Other',
 ' Pitched',
 ' Rubber',
 ' Shakes',
 ' Slate',
 ' Tar/Gravel',
 ' Tile',
 ' Unspecified Shingle',
 ' Wood Shingles',
 'Age 8 Years or Less',
 'Age Over 8 Years',
 'Asphalt Shingles',
 'Flat',
 'Metal',
 'Other',
 'Pitched',
 'Rubber',
 'Shakes',
 'Slate',
 'Tar/Gravel',
 'Tile',
 'Unspecified Shingle',
 'Wood Shingles']

In [46]:
zfilled_df = joined_data.fillna(0, subset = zfill_cols)

### 5. Binarizing

`Binarizing` is technique to create new feature from existing feature into binary format (0 or 1) with threshold.

Read [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Binarizer.html).

In [47]:
import pyspark.ml.feature as feature 

In [49]:
df = df.withColumn("SalesClosePrice", df["SalesClosePrice"].cast("double"))

In [50]:
mean_price = df.agg({"SalesClosePrice": "mean"}).collect()[0][0]

In [51]:
binary = feature.Binarizer(threshold = mean_price, inputCol = "SalesClosePrice", outputCol = "newSalesClosePrice")

In [52]:
# transform binary to DataFrame
df_new = binary.transform(df)

In [54]:
df_new[["SalesClosePrice", "newSalesClosePrice"]].show(6)

+---------------+------------------+
|SalesClosePrice|newSalesClosePrice|
+---------------+------------------+
|       143000.0|               0.0|
|       190000.0|               0.0|
|       225000.0|               0.0|
|       265000.0|               1.0|
|       249900.0|               0.0|
|       255000.0|               0.0|
+---------------+------------------+
only showing top 6 rows



### 6. Bucketing:
`Bucketing` is a technique in both Spark and Hive used to optimize the performance of the task. In bucketing buckets (clustering columns) determine data partitioning and prevent data shuffle. Based on the value of one or more bucketing columns, the data is allocated to a predefined number of buckets.

![plot](https://miro.medium.com/max/1164/0*eXB5TGP81IzwJHvh)

Read more [here](https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f).

in `PySpark` use `Bucketizer`. Read [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.Bucketizer.html)

In [55]:
splits = [0,1,2,3,4,float("Inf")]

In [56]:
buck = feature.Bucketizer(splits =splits, inputCol = "BATHSTOTAL", outputCol = "baths")

In [58]:
df_new_2 = buck.transform(df)

In [59]:
df_new_2[["BATHSTOTAL", "baths"]].show(5)

+----------+-----+
|BATHSTOTAL|baths|
+----------+-----+
|         2|  2.0|
|         3|  3.0|
|         1|  1.0|
|         2|  2.0|
|         2|  2.0|
+----------+-----+
only showing top 5 rows



In [60]:
values = [(0.1, 0.0), (0.4, 1.0), (1.2, 1.3), (1.5, float("nan")), (float("nan"), 1.0), (float("nan"), 0.0)]

In [64]:
df_temp = spark.createDataFrame(values, ["values1", "values2"])

In [68]:
df_temp.show()

+-------+-------+
|values1|values2|
+-------+-------+
|    0.1|    0.0|
|    0.4|    1.0|
|    1.2|    1.3|
|    1.5|    NaN|
|    NaN|    1.0|
|    NaN|    0.0|
+-------+-------+



In [62]:
buck = feature.Bucketizer(splits = [-float("inf"), 0.5, 1.4, float("inf")], inputCol = "values1", outputCol = "baths_values1")

In [70]:
df_new_2 = buck.setHandleInvalid("keep").transform(df_temp)

In [71]:
df_new_2.show(5)

+-------+-------+-------------+
|values1|values2|baths_values1|
+-------+-------+-------------+
|    0.1|    0.0|          0.0|
|    0.4|    1.0|          0.0|
|    1.2|    1.3|          1.0|
|    1.5|    NaN|          2.0|
|    NaN|    1.0|          3.0|
+-------+-------+-------------+
only showing top 5 rows



### 7. One hot encoding
`One hot encoding`: is technique ecnoding feature into one-hoe numeric array.

In [85]:
df_temp = spark.createDataFrame([
    ("a", 2.0),
    ("b", 2.0),
    ("c", 1.0),
    ("a", 1.0),
    ("a", 0.0),
    ("b", 1.0),
    ("c", 1.0)
], ["categoryIndex", "categoryIndex2"])

In [86]:
df_temp.show()

+-------------+--------------+
|categoryIndex|categoryIndex2|
+-------------+--------------+
|            a|           2.0|
|            b|           2.0|
|            c|           1.0|
|            a|           1.0|
|            a|           0.0|
|            b|           1.0|
|            c|           1.0|
+-------------+--------------+



`StringIndexer`: It's like `label-encoder`

In [87]:
stringID = feature.StringIndexer(inputCol="categoryIndex", outputCol="categoryIndex1") 
indexer = stringID.fit(df_temp)
df_after = indexer.transform(df_temp)

In [88]:
df_after[['categoryIndex', 'categoryIndex1']].show()

+-------------+--------------+
|categoryIndex|categoryIndex1|
+-------------+--------------+
|            a|           0.0|
|            b|           1.0|
|            c|           2.0|
|            a|           0.0|
|            a|           0.0|
|            b|           1.0|
|            c|           2.0|
+-------------+--------------+



`OneHotEncoder`

In [89]:
encoder = feature.OneHotEncoder(inputCol="categoryIndex1",
                        outputCol="categoryVec1", 
                        dropLast=True) # default
encoded = encoder.fit(df_after)
df_after = encoded.transform(df_after)

In [90]:
df_after[["categoryIndex", "categoryIndex1", "categoryVec1"]].show()

+-------------+--------------+-------------+
|categoryIndex|categoryIndex1| categoryVec1|
+-------------+--------------+-------------+
|            a|           0.0|(2,[0],[1.0])|
|            b|           1.0|(2,[1],[1.0])|
|            c|           2.0|    (2,[],[])|
|            a|           0.0|(2,[0],[1.0])|
|            a|           0.0|(2,[0],[1.0])|
|            b|           1.0|(2,[1],[1.0])|
|            c|           2.0|    (2,[],[])|
+-------------+--------------+-------------+



22/05/09 13:14:24 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 164967 ms exceeds timeout 120000 ms
22/05/09 13:14:24 WARN SparkContext: Killing executors is not supported by current scheduler.
