In [1]:
#Python
import numpy as np 
import matplotlib.pyplot as plt
import pandas as pd
from matplotlib import colors

#PySpark
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml.stat import *
from pyspark.sql.types import *
Spark = SparkSession.builder.appName("pyspark_final").getOrCreate()
sqlContext = SQLContext(Spark)



### Read Data

In [2]:
#read data
dataset = pd.read_excel("BigDataRAW.xlsx")

#Convert all column's type to str
for col in dataset.columns:
    dataset[col] = dataset[col].astype(str)

#SQL dataframe pyspark
dataset = sqlContext.createDataFrame(dataset)
dataset.show()

+----------+--------------------+----------+------+-----------+---------+-----------------+-----------+---------------+--------------------+-------------+
|Unnamed: 0|               title|road-width|floors|    bedroom|  parking|         dientich|         KT|          huong|             dia_chi|        Price|
+----------+--------------------+----------+------+-----------+---------+-----------------+-----------+---------------+--------------------+-------------+
|         1|Bán gấp nhà Hẻm x...|        5m| 3 lầu|5 phòng ngủ|chỗ để xe| Diện tích: 80 m2|KT: 6x13,5m|       Hướng: _|Đường Cách Mạng T...|Giá: 12,8 tỷ |
|         2|Giảm 300 - 400 Tr...|        8m| 3 lầu|4 phòng ngủ|chỗ để xe| Diện tích: 70 m2|  KT: 5x15m|       Hướng: _|Quốc Lộ 13, Phườn...| Giá: 6,6 tỷ |
|         3|BÁN nhà 4 tầng Ph...|        3m| 4 lầu|3 phòng ngủ|chỗ để xe| Diện tích: 30 m2| KT: 4x7,5m|       Hướng: _|Đường Phan Đình P...| Giá: 5,8 tỷ |
|         4|Bán nhà mặt tiền ...|       20m| 2 lầu|2 phòng ngủ|      n

In [3]:
#Check type
dataset.printSchema()

#Check how many observation we have
dataset.count()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- title: string (nullable = true)
 |-- road-width: string (nullable = true)
 |-- floors: string (nullable = true)
 |-- bedroom: string (nullable = true)
 |-- parking: string (nullable = true)
 |-- dientich: string (nullable = true)
 |-- KT: string (nullable = true)
 |-- huong: string (nullable = true)
 |-- dia_chi: string (nullable = true)
 |-- Price: string (nullable = true)



9980

### Data Preprocessing

In [4]:
#Check duplicate
duplicate_dataset = dataset.drop_duplicates()
duplicate_dataset.count()

9980

*Not have duplicating records in dataset because before dropping duplicating, there were 9980 rows*

In [5]:
#drop not used columns
dataset = dataset.drop("Unnamed: 0", "title")
dataset.show(10)

+----------+------+-----------+---------+-----------------+-----------+---------------+--------------------+-------------+
|road-width|floors|    bedroom|  parking|         dientich|         KT|          huong|             dia_chi|        Price|
+----------+------+-----------+---------+-----------------+-----------+---------------+--------------------+-------------+
|        5m| 3 lầu|5 phòng ngủ|chỗ để xe| Diện tích: 80 m2|KT: 6x13,5m|       Hướng: _|Đường Cách Mạng T...|Giá: 12,8 tỷ |
|        8m| 3 lầu|4 phòng ngủ|chỗ để xe| Diện tích: 70 m2|  KT: 5x15m|       Hướng: _|Quốc Lộ 13, Phườn...| Giá: 6,6 tỷ |
|        3m| 4 lầu|3 phòng ngủ|chỗ để xe| Diện tích: 30 m2| KT: 4x7,5m|       Hướng: _|Đường Phan Đình P...| Giá: 5,8 tỷ |
|       20m| 2 lầu|2 phòng ngủ|      nan| Diện tích: 96 m2|  KT: 4x24m|Hướng: Đông Bắc|Đường Mai Thị Lựu...|Giá: 25,5 tỷ |
|       30m| 3 lầu|4 phòng ngủ|chỗ để xe|Diện tích: 100 m2|  KT: 5x20m|       Hướng: _|Đường Liên Phường...|Giá: 21,1 tỷ |
|       16m| 3 l

In [6]:
#Rename the columns
dataset = dataset.withColumnRenamed("road-width", "Độ rộng đường trước nhà")\
    .withColumnRenamed("floors", "Số lầu")\
    .withColumnRenamed("bedroom", "Số phòng ngủ")\
    .withColumnRenamed("parking", "Chỗ đậu xe hơi")\
    .withColumnRenamed("dientich", "Diện Tích")\
    .withColumnRenamed("KT", "Kích Thước")\
    .withColumnRenamed("huong", "Hướng")\
    .withColumnRenamed("dia_chi", "Địa chỉ")\
    .withColumnRenamed("Price", "Giá trị")
dataset.show(10)

+-----------------------+------+------------+--------------+-----------------+-----------+---------------+--------------------+-------------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|        Diện Tích| Kích Thước|          Hướng|             Địa chỉ|      Giá trị|
+-----------------------+------+------------+--------------+-----------------+-----------+---------------+--------------------+-------------+
|                     5m| 3 lầu| 5 phòng ngủ|     chỗ để xe| Diện tích: 80 m2|KT: 6x13,5m|       Hướng: _|Đường Cách Mạng T...|Giá: 12,8 tỷ |
|                     8m| 3 lầu| 4 phòng ngủ|     chỗ để xe| Diện tích: 70 m2|  KT: 5x15m|       Hướng: _|Quốc Lộ 13, Phườn...| Giá: 6,6 tỷ |
|                     3m| 4 lầu| 3 phòng ngủ|     chỗ để xe| Diện tích: 30 m2| KT: 4x7,5m|       Hướng: _|Đường Phan Đình P...| Giá: 5,8 tỷ |
|                    20m| 2 lầu| 2 phòng ngủ|           nan| Diện tích: 96 m2|  KT: 4x24m|Hướng: Đông Bắc|Đường Mai Thị Lựu...|Giá: 25,5 tỷ |
|     

In [7]:
# Null Check
dataset.select([count(when(column(c).isNull(), c)).alias(c) for c in dataset.columns]).show()

+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|Hướng|Địa chỉ|Giá trị|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
|                      0|     0|           0|             0|        0|         0|    0|      0|      0|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+



In [8]:
# Nan check
dataset.select([count(when(isnan(c), c)).alias(c) for c in dataset.columns]).show()

+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|Hướng|Địa chỉ|Giá trị|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
|                     17|   349|         397|          5665|        0|         0|    0|      0|      0|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+



#### Chỗ đậu xe hơi Preprocessing

In [9]:
### Label Columns Chỗ đậu xe hơi
# 1 = Have somewhere to park
# 0 = Dont have anywhere to park
###
dataset = dataset.withColumn("Chỗ đậu xe hơi", regexp_replace("Chỗ đậu xe hơi", 'chỗ để xe', '1'))
dataset = dataset.withColumn("Chỗ đậu xe hơi", regexp_replace("Chỗ đậu xe hơi", 'nan', '0'))
dataset.show()

+-----------------------+------+------------+--------------+-----------------+-----------+---------------+--------------------+-------------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|        Diện Tích| Kích Thước|          Hướng|             Địa chỉ|      Giá trị|
+-----------------------+------+------------+--------------+-----------------+-----------+---------------+--------------------+-------------+
|                     5m| 3 lầu| 5 phòng ngủ|             1| Diện tích: 80 m2|KT: 6x13,5m|       Hướng: _|Đường Cách Mạng T...|Giá: 12,8 tỷ |
|                     8m| 3 lầu| 4 phòng ngủ|             1| Diện tích: 70 m2|  KT: 5x15m|       Hướng: _|Quốc Lộ 13, Phườn...| Giá: 6,6 tỷ |
|                     3m| 4 lầu| 3 phòng ngủ|             1| Diện tích: 30 m2| KT: 4x7,5m|       Hướng: _|Đường Phan Đình P...| Giá: 5,8 tỷ |
|                    20m| 2 lầu| 2 phòng ngủ|             0| Diện tích: 96 m2|  KT: 4x24m|Hướng: Đông Bắc|Đường Mai Thị Lựu...|Giá: 25,5 tỷ |
|     

#### Số lầu, Số phòng ngủ and Độ rộng đường trước nhà Preprocessing

In [10]:
# Label Columns Số lầu, Số phòng ngủ , Độ rộng đường trước nhà for delete nan values
dataset = dataset.withColumn("Số lầu", regexp_replace("Số lầu", 'nan', "None"))
dataset = dataset.withColumn("Số phòng ngủ", regexp_replace("Số phòng ngủ", 'nan', "None"))
dataset = dataset.withColumn("Độ rộng đường trước nhà", regexp_replace("Độ rộng đường trước nhà", 'nan', "None"))

In [11]:
# Nan check again
dataset.select([count(when(isnan(c), c)).alias(c) for c in dataset.columns]).show()

+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|Hướng|Địa chỉ|Giá trị|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
|                      0|     0|           0|             0|        0|         0|    0|      0|      0|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+



In [12]:
# Delete Rows have None ( Na valus ) that we create before
dataset = dataset.filter(dataset["Số lầu"] != "None")
dataset = dataset.filter(dataset["Số phòng ngủ"] != "None")
dataset = dataset.filter(dataset["Độ rộng đường trước nhà"] != "None")
dataset.show(10)

+-----------------------+------+------------+--------------+-----------------+-----------+---------------+--------------------+-------------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|        Diện Tích| Kích Thước|          Hướng|             Địa chỉ|      Giá trị|
+-----------------------+------+------------+--------------+-----------------+-----------+---------------+--------------------+-------------+
|                     5m| 3 lầu| 5 phòng ngủ|             1| Diện tích: 80 m2|KT: 6x13,5m|       Hướng: _|Đường Cách Mạng T...|Giá: 12,8 tỷ |
|                     8m| 3 lầu| 4 phòng ngủ|             1| Diện tích: 70 m2|  KT: 5x15m|       Hướng: _|Quốc Lộ 13, Phườn...| Giá: 6,6 tỷ |
|                     3m| 4 lầu| 3 phòng ngủ|             1| Diện tích: 30 m2| KT: 4x7,5m|       Hướng: _|Đường Phan Đình P...| Giá: 5,8 tỷ |
|                    20m| 2 lầu| 2 phòng ngủ|             0| Diện tích: 96 m2|  KT: 4x24m|Hướng: Đông Bắc|Đường Mai Thị Lựu...|Giá: 25,5 tỷ |
|     

In [13]:
dataset.count()

9363

*so we only have 9363 row after checking Null and Nan*

#### Format some columns 

In [14]:
#Format columns
dataset = dataset.withColumn("Số lầu", regexp_replace("Số lầu", 'lầu', ""))
dataset = dataset.withColumn("Số phòng ngủ", regexp_replace("Số phòng ngủ", "phòng ngủ", ""))
dataset = dataset.withColumn("Độ rộng đường trước nhà", regexp_replace("Độ rộng đường trước nhà", 'm', ""))
dataset = dataset.withColumn("Diện Tích", regexp_replace("Diện Tích", 'm2', ""))
dataset = dataset.withColumn("Diện Tích", regexp_replace("Diện Tích", "Diện tích:", ""))
dataset = dataset.withColumn("Kích Thước", regexp_replace("Kích Thước", 'KT:', ""))
dataset = dataset.withColumn("Kích Thước", regexp_replace("Kích Thước", '---', "0x0"))
dataset = dataset.withColumn("Kích Thước", regexp_replace("Kích Thước", 'm', ""))
dataset.show(10)

+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|          Hướng|             Địa chỉ|      Giá trị|
+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------------+
|                      5|    3 |          5 |             1|      80 |    6x13,5|       Hướng: _|Đường Cách Mạng T...|Giá: 12,8 tỷ |
|                      8|    3 |          4 |             1|      70 |      5x15|       Hướng: _|Quốc Lộ 13, Phườn...| Giá: 6,6 tỷ |
|                      3|    4 |          3 |             1|      30 |     4x7,5|       Hướng: _|Đường Phan Đình P...| Giá: 5,8 tỷ |
|                     20|    2 |          2 |             0|      96 |      4x24|Hướng: Đông Bắc|Đường Mai Thị Lựu...|Giá: 25,5 tỷ |
|                     30|    3 |          4 |             1|     100 

#### Kích Thước, Diện Tích Preprocessing

In [15]:
dataset.filter(dataset["Diện tích"] == "0").show()

+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|Hướng|Địa chỉ|Giá trị|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+



In [16]:
dataset.filter(dataset["Diện tích"] == "KXĐ").show()

+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|Hướng|Địa chỉ|Giá trị|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+



In [17]:
#Check all 9363 observation by Diện Tích
dataset.groupBy('Diện Tích').count().show(9363)

+---------+-----+
|Diện Tích|count|
+---------+-----+
|      97 |   20|
|     190 |   28|
|      47 |   34|
|      53 |   55|
|     168 |   17|
|     118 |   13|
|     106 |   19|
|      83 |   35|
|      18 |   10|
|      49 |   38|
|     250 |   44|
|     472 |    1|
|      34 |   19|
|     615 |    4|
|     180 |   66|
|      68 |  166|
|     120 |  209|
|      73 |   37|
|     198 |    9|
|      87 |   14|
|     143 |    6|
|     152 |    8|
|     203 |    2|
|     132 |   17|
|     114 |   20|
|      43 |   31|
|      56 |  151|
|     108 |   39|
|      40 |  175|
|     163 |    7|
|      76 |  102|
|      52 |  131|
|      81 |   42|
|     360 |   11|
|     125 |   54|
|     312 |    3|
|     170 |   30|
|     208 |    3|
|     490 |    2|
|     110 |  128|
|      58 |   66|
|      26 |   15|
|     195 |    7|
|     205 |    5|
|      88 |  104|
|      30 |   42|
|      36 |   59|
|     112 |   49|
|      77 |   41|
|     268 |    4|
|     144 |   35|
|     480 |   11|
|     220 

*So,in this dataset, we dont have a house that dont have Diện tích = KXĐ và Diện tích = 0*

In [18]:
#Format column Chiều dài and Chiều Rộng
dataset = dataset.withColumn("Chiều dài", split(dataset['Kích Thước'], 'x').getItem(0))
dataset = dataset.withColumn("Chiều dài",regexp_replace("Chiều dài", ',', "."))
dataset = dataset.withColumn("Chiều rộng", split(dataset['Kích Thước'], 'x').getItem(1))
dataset = dataset.withColumn("Chiều rộng",regexp_replace("Chiều rộng", ',', "."))
#in case the data of houses lacks length and width, we assume length = width and replace 0 with the value of square root
dataset = dataset.withColumn("Chiều dài", when(dataset["Chiều dài"] == 0,pow(column('Diện tích'),1/2)).otherwise(dataset["Chiều dài"]))
dataset = dataset.withColumn("Chiều rộng", when(dataset["Chiều rộng"] == 0,pow(column('Diện tích'),1/2)).otherwise(dataset["Chiều rộng"]))

dataset.show(10)

+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------------+---------+----------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|          Hướng|             Địa chỉ|      Giá trị|Chiều dài|Chiều rộng|
+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------------+---------+----------+
|                      5|    3 |          5 |             1|      80 |    6x13,5|       Hướng: _|Đường Cách Mạng T...|Giá: 12,8 tỷ |        6|      13.5|
|                      8|    3 |          4 |             1|      70 |      5x15|       Hướng: _|Quốc Lộ 13, Phườn...| Giá: 6,6 tỷ |        5|        15|
|                      3|    4 |          3 |             1|      30 |     4x7,5|       Hướng: _|Đường Phan Đình P...| Giá: 5,8 tỷ |        4|       7.5|
|                     20|    2 |          2 |             0|      96 |      

#### Giá Trị  preprocessing

In [19]:
#Check all 9363 observation by Giá trị
dataset.groupBy('Giá trị').count().show(9363)

+--------------------+-----+
|             Giá trị|count|
+--------------------+-----+
|       Giá: 4,68 tỷ |    4|
|       Giá: 3,95 tỷ |   16|
|          Giá: 9 tỷ |   96|
|        Giá: 8,1 tỷ |   20|
|       Giá: 3,88 tỷ |    1|
|         Giá: 80 tỷ |   23|
|        Giá: 2,6 tỷ |    5|
|       Giá: 2,45 tỷ |    3|
|         Giá: 37 tỷ |   28|
|        Giá: 1,1 tỷ |    7|
|        Giá: 105 tỷ |   13|
|       Giá: 21,1 tỷ |    1|
|        Giá: 3,3 tỷ |   19|
|       Giá: 1,86 tỷ |    1|
|         Giá: 29 tỷ |   49|
|       Giá: 12,5 tỷ |   83|
|        Giá: 2,7 tỷ |   13|
|       Giá: 2,08 tỷ |    3|
|        Giá: 3,6 tỷ |   20|
|     Giá: 900 triệu |    1|
|       Giá: 17,5 tỷ |   57|
|       Giá: 28,9 tỷ |    2|
|        Giá: 3,8 tỷ |   23|
|       Giá: 7,95 tỷ |   16|
|        Giá: 7,7 tỷ |   45|
|       Giá: 5,15 tỷ |    8|
|        Giá: 1,8 tỷ |    2|
|       Giá: 1,75 tỷ |    2|
|       Giá: 12,9 tỷ |   24|
|         Giá: 17 tỷ |   55|
|         Giá: 39 tỷ |   35|
|       Giá: 4

In [20]:
#filter Giá  trị
dataset.filter(dataset["Giá trị"] == "Giá: Thỏa thuận").show()
dataset = dataset.filter(dataset["Giá trị"] != "Giá: Thỏa thuận")

+-----------------------+------+------------+--------------+---------+----------+--------+--------------------+---------------+---------+----------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|   Hướng|             Địa chỉ|        Giá trị|Chiều dài|Chiều rộng|
+-----------------------+------+------------+--------------+---------+----------+--------+--------------------+---------------+---------+----------+
|                      3|    5 |         26 |             0|     114 |      4x25|Hướng: _|Đường Thống Nhất,...|Giá: Thỏa thuận|        4|        25|
+-----------------------+------+------------+--------------+---------+----------+--------+--------------------+---------------+---------+----------+



In [21]:
dataset.count()

9362

*so we drop the row that has Giá: Thỏa thuận*

In [22]:
#format column
dataset = dataset.withColumn("Giá trị", regexp_replace("Giá trị", 'Giá:', ""))

In [23]:
dataset.select("Giá trị").show()

+---------+
|  Giá trị|
+---------+
| 12,8 tỷ |
|  6,6 tỷ |
|  5,8 tỷ |
| 25,5 tỷ |
| 21,1 tỷ |
| 3,84 tỷ |
|  8,8 tỷ |
|  7,2 tỷ |
|    1 tỷ |
|   15 tỷ |
| 3,62 tỷ |
| 3,77 tỷ |
| 8,08 tỷ |
|   25 tỷ |
|   20 tỷ |
| 2,08 tỷ |
| 12,6 tỷ |
| 1,75 tỷ |
|  1,2 tỷ |
|    7 tỷ |
+---------+
only showing top 20 rows



In [24]:
#Def remove alphabet
def remove_alphabet(x):
    a=list(x)
    b=[]
    for i in a:
        i=i.replace(',','.')
        for k in i:
            if not k.isnumeric() and not k=='.' and not k=='x':
                i=i.replace(k,'')
        b.append(i)
    return "".join(b)

# UDF is defined
remove_alphabetUDF = udf(remove_alphabet)


# Create a index columns for calculating
dataset = dataset.withColumn("Giá trị Index",dataset["Giá trị"])

# Delete alphabet in Giá trị
dataset = dataset.withColumn("Giá trị",regexp_replace("Giá trị", '/\xa0m2\xa0\xa0', ""))
dataset = dataset.withColumn("Giá trị",remove_alphabetUDF(dataset["Giá trị"]))

# Calculating
dataset = dataset.withColumn("Giá trị", when(dataset["Giá trị Index"].contains('/\xa0m2\xa0\xa0')
                                             ,column('Giá trị')*column("Diện Tích")).otherwise(dataset["Giá trị"]))

# Convert data from Triệu, Ngàn to Tỷ
dataset = dataset.withColumn("Giá trị", when(dataset["Giá trị Index"].contains('triệu')
                                             ,column('Giá trị')/1000).otherwise(dataset["Giá trị"]))
dataset = dataset.withColumn("Giá trị", when(dataset["Giá trị Index"].contains('ngàn')
                                             ,column('Giá trị')/1000000).otherwise(dataset["Giá trị"]))
dataset = dataset.withColumn("Giá trị",dataset["Giá trị"].cast('Float'))
dataset.show()

+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------+------------------+------------------+-------------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|          Hướng|             Địa chỉ|Giá trị|         Chiều dài|        Chiều rộng|Giá trị Index|
+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------+------------------+------------------+-------------+
|                      5|    3 |          5 |             1|      80 |    6x13,5|       Hướng: _|Đường Cách Mạng T...|   12.8|                 6|              13.5|     12,8 tỷ |
|                      8|    3 |          4 |             1|      70 |      5x15|       Hướng: _|Quốc Lộ 13, Phườn...|    6.6|                 5|                15|      6,6 tỷ |
|                      3|    4 |          3 |             1|      30 |     4x7,5|       Hướng: _|Đường Ph

In [25]:
#drop columns Giá Trị Index
dataset = dataset.drop("Giá trị Index")
dataset.show()

+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------+------------------+------------------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|          Hướng|             Địa chỉ|Giá trị|         Chiều dài|        Chiều rộng|
+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------+------------------+------------------+
|                      5|    3 |          5 |             1|      80 |    6x13,5|       Hướng: _|Đường Cách Mạng T...|   12.8|                 6|              13.5|
|                      8|    3 |          4 |             1|      70 |      5x15|       Hướng: _|Quốc Lộ 13, Phườn...|    6.6|                 5|                15|
|                      3|    4 |          3 |             1|      30 |     4x7,5|       Hướng: _|Đường Phan Đình P...|    5.8|                 4|               7.5|
|         

#### Địa chỉ Preprocessing

In [26]:
#Create Đường , Phường , Quận columns 
dataset = dataset.withColumn("Đường", split(dataset["Địa chỉ"], ',').getItem(0))
dataset = dataset.withColumn("Phường", split(dataset["Địa chỉ"], ',').getItem(1))
dataset = dataset.withColumn("Quận", split(dataset["Địa chỉ"], ',').getItem(2))
dataset.show()


+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------+------------------+------------------+--------------------+--------------------+------------------+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|          Hướng|             Địa chỉ|Giá trị|         Chiều dài|        Chiều rộng|               Đường|              Phường|              Quận|
+-----------------------+------+------------+--------------+---------+----------+---------------+--------------------+-------+------------------+------------------+--------------------+--------------------+------------------+
|                      5|    3 |          5 |             1|      80 |    6x13,5|       Hướng: _|Đường Cách Mạng T...|   12.8|                 6|              13.5|Đường Cách Mạng T...|            Phường 5|     Quận Tân Bình|
|                      8|    3 |          4 |             1|      70 |      5x15|       Hướng: _

In [27]:
# Null Check
dataset.select([count(when(column(c).isNull(), c)).alias(c) for c in dataset.columns]).show()

+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+---------+----------+-----+------+----+
|Độ rộng đường trước nhà|Số lầu|Số phòng ngủ|Chỗ đậu xe hơi|Diện Tích|Kích Thước|Hướng|Địa chỉ|Giá trị|Chiều dài|Chiều rộng|Đường|Phường|Quận|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+---------+----------+-----+------+----+
|                      0|     0|           0|             0|        0|         0|    0|      0|      0|        0|         0|    0|     0|  40|
+-----------------------+------+------------+--------------+---------+----------+-----+-------+-------+---------+----------+-----+------+----+



In [28]:
#Drop null value from Columns Quận
dataset = dataset.na.drop()
dataset.count()

9322

In [29]:
#Check Quận Column
dataset.groupBy('Quận').count().show()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "D:\Python\venv\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "D:\Python\venv\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\vungo\AppData\Local\Programs\Python\Python39\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt

KeyboardInterrupt



*Need to remove Quận Hồ Chí Minh and Quận Thành Phố Thủ Đức*

In [None]:
dataset = dataset.filter(dataset["Quận"] != " Hồ Chí Minh")
data_final = dataset.filter(dataset["Quận"] != " Thành phố Thủ Đức")
data_final.count()

*So the final data has 8659 observations*

### Descriptive Statistic and Visualization

In [None]:
# Data to visual
visual1 = data_final.groupby("Quận").count()
visual1 = visual1.toPandas()

# Wedge properties
wp = { 'linewidth' : 1, 'edgecolor' : "black" }

# Creating autocpt arguments
def func(pct, allvalues):
    absolute = int(pct / 100.*np.sum(allvalues))
    return "{:.1f}%\n({:d} )".format(pct, absolute)

# Creating plot
fig, ax = plt.subplots(figsize =(30, 20))
wedges, texts, autotexts = ax.pie(visual1['count'],
                                  autopct = lambda pct: func(pct, visual1['count']),
                                  labels = visual1['Quận'],
                                  startangle = 90,
                                  wedgeprops = wp,
                                  textprops = dict(color ="Black"))

# Adding legend
ax.legend(wedges, visual1['Quận'],
          title ="Quận",
          loc ="center left",
          bbox_to_anchor =(1, 0, 0.5, 1))

plt.setp(autotexts, size = 8, weight ="bold")
ax.set_title("Quận pie chart")

# show plot
plt.show()

In [None]:
# Creat Data to visual
visual2 = data_final.toPandas()
# Create a list of Quận for visualize
visual2_list = []
for i in visual2["Quận"]:
    if i not in visual2_list:
        visual2_list.append(i)
    else:
        pass
visual2_list

In [None]:
# the loop that visualize distribution of values
for i in visual2_list:
    x = visual2[(visual2['Quận'] == i)]
    n_bins = 50
    legend = ['Distribution' + i]
    # Creating histogram
    fig, axs = plt.subplots(1, 1,
                            figsize =(10, 7),
                            tight_layout = True)


    # Remove axes splines
    for s in ['top', 'bottom', 'left', 'right']:
        axs.spines[s].set_visible(False)

    # Remove x, y ticks
    axs.xaxis.set_ticks_position('none')
    axs.yaxis.set_ticks_position('none')

    # Add padding between axes and labels
    axs.xaxis.set_tick_params(pad = 5)
    axs.yaxis.set_tick_params(pad = 10)

    # Add x, y gridlines
    axs.grid(b = True, color ='grey',
            linestyle ='-.', linewidth = 0.5,
            alpha = 0.6)

    # Creating histogram
    N, bins, patches = axs.hist(x["Giá trị"], bins = n_bins)

    # Setting color
    fracs = ((N**(1 / 5)) / N.max())
    norm = colors.Normalize(fracs.min(), fracs.max())

    for thisfrac, thispatch in zip(fracs, patches):
        color = plt.cm.viridis(norm(thisfrac))
        thispatch.set_facecolor(color)

    # Adding extra features
    plt.xlabel("Values of House")
    plt.ylabel("Number of House")
    plt.legend(legend)
    plt.title('Histogram Values of House in' + i)

    # Show plot
    plt.show()

In [None]:
# Quận Descriptive Statistic
data_final.groupBy('Quận')\
    .agg(
        mean('Giá trị').alias('mean'),
        stddev('Giá trị').alias('std'),
        count('Giá trị').alias('count'),
        min('Giá trị').alias('min'),
        max('Giá trị').alias('max')).show()

In [None]:
# Hướng Descriptive Statistic
data_final.groupBy('Hướng')\
    .agg(
        mean('Giá trị').alias('mean'),
        stddev('Giá trị').alias('std'),
        count('Giá trị').alias('count'),
        min('Giá trị').alias('min'),
        max('Giá trị').alias('max')).show()

In [None]:
visual3 = data_final.toPandas()
fig = plt.figure(figsize =(25, 20))
ax = fig.add_subplot(111)

# Creating axes instance
bp = ax.boxplot(visual3["Giá trị"], patch_artist = True,
                notch ='True', vert = 0)

# changing color and linewidth of
# whiskers
for whisker in bp['whiskers']:
    whisker.set(color ='#8B008B',
                linewidth = 2.5,
                linestyle =":")

# changing color and linewidth of
# caps
for cap in bp['caps']:
    cap.set(color ='#8B008B',
            linewidth = 3)

# changing color and linewidth of
# medians
for median in bp['medians']:
    median.set(color ='red',
               linewidth = 4)

# changing style of fliers
for flier in bp['fliers']:
    flier.set(marker ='D',
              color ='#e7298a',
              alpha = 1)

# x-axis labels
ax.set_yticklabels(['Giá trị'])

# Adding title
plt.title("Giá trị box plot")

# Removing top axes and right axes
# ticks
ax.get_xaxis().tick_bottom()
ax.get_yaxis().tick_left()

# show plot
plt.show()

### Data Transformation

In [None]:
from pyspark.ml import Pipeline
data_final = data_final.drop("Kích Thước")
data_final = data_final.drop("Hướng")
data_final = data_final.drop("Địa chỉ")
data_final = data_final.drop("Phường")
data_final = data_final.drop("Đường")
data_final.toPandas().to_excel("BigDataPredict.xlsx")
#get dummies
dummies = data_final.select("Quận").distinct().rdd.flatMap(lambda x: x).collect()
dummies_expr = [when(data_final["Quận"] == ty, 1).otherwise(0).alias(ty) for ty in dummies]
data_final = data_final.select("Độ rộng đường trước nhà","Số lầu","Số phòng ngủ","Chỗ đậu xe hơi","Diện tích","Giá trị","Chiều dài","Chiều rộng", *dummies_expr)
data_final.show()

In [None]:
#Identifying and assigning lists of variables
float_vars=['Độ rộng đường trước nhà', 'Số phòng ngủ', 'Chỗ đậu xe hơi','Số lầu','Diện Tích','Giá trị','Chiều dài','Chiều rộng']
#Converting variables
for column in float_vars:
    data_final = data_final.withColumn(column,data_final[column].cast(FloatType()))
data_final.dtypes

In [None]:
#Drop null value from Columns Quận
data_final = data_final.na.drop()
data_final.count()

In [None]:
# Defining string columns to pass on to the String Indexer (= categorical feature encoding)

data_final_columns = []

for col, dtype in data_final.dtypes:
    if dtype == 'string':
        data_final_columns.append(col)

indexers = [StringIndexer(inputCol=column, outputCol=column+'_index', handleInvalid='keep').fit(data_final) for column in data_final_columns]


pipeline = Pipeline(stages=indexers)
data_final = pipeline.fit(data_final).transform(data_final)

print(len(data_final.columns))

In [None]:
def get_dtype(df,colname):
    return [dtype for name, dtype in df.dtypes if name == colname][0]

num_cols_train = []
for col in data_final.columns:
    if get_dtype(data_final,col) != 'string':
        num_cols_train.append(str(col))

data_final = data_final.select(num_cols_train)

print(len(data_final.columns))

In [None]:
vectorAssembler = VectorAssembler(inputCols = data_final.drop("Giá trị").columns, outputCol = 'features').setHandleInvalid("keep")

vector = vectorAssembler.transform(data_final)
vector.show(5)

In [None]:
# Train-test split
splits = vector.randomSplit([0.8, 0.2],seed=10)
train = splits[0]
val = splits[1]

In [None]:
# Simple baseline (linreg)

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='Giá trị', maxIter=10,
                      regParam=0.8, elasticNetParam=0.1) # It is always a good idea to play with hyperparameters.
lr_model = lr.fit(train)

trainingSummary = lr_model.summary

lr_predictions = lr_model.transform(val)
lr_predictions.select("prediction","Giá trị","features").show(10)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Giá trị",metricName="r2")
print("R Squared (R2) on data = %g" % lr_evaluator.evaluate(lr_predictions))
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

In [None]:
# A more complex model with RF

from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol = 'features', labelCol='Giá trị',
                           maxDepth=20,
                           minInstancesPerNode=2,
                           bootstrap=True
                          )
rf_model = rf.fit(train)

rf_predictions = rf_model.transform(val)
rf_predictions.select("prediction","Giá trị","features").show(10)

from pyspark.ml.evaluation import RegressionEvaluator
rf_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Giá trị",metricName="r2")
print("R Squared (R2) on data = %g" % rf_evaluator.evaluate(rf_predictions))

rf_evaluator = RegressionEvaluator(labelCol="Giá trị", predictionCol="prediction", metricName="rmse")
rmse = rf_evaluator.evaluate(rf_predictions)
print("Root Mean Squared Error (RMSE) on data = %g" % rmse)

In [None]:
# A more complex model with Gradient-boosted trees (GBTs)

from pyspark.ml.regression import GBTRegressor

GBT = GBTRegressor(featuresCol="features", labelCol='Giá trị',maxIter=10)
GBT_model = GBT.fit(train)

GBT_predictions = GBT_model.transform(val)
GBT_predictions.select("prediction","Giá trị","features").show(10)

from pyspark.ml.evaluation import RegressionEvaluator
GBT_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Giá trị",metricName="r2")
print("R Squared (R2) on data = %g" % GBT_evaluator.evaluate(GBT_predictions))

GBT_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Giá trị",metricName="rmse")
rmse = GBT_evaluator.evaluate(GBT_predictions)
print("Root Mean Squared Error (RMSE) on data = %g" % rmse)

### Remove ouliers data 

In [None]:
# calculate summary statistics
data_final_std = data_final.select(mean("Giá trị").alias('mean'),stddev('Giá trị').alias('std')).collect()
data_std = data_final_std[0]['std']
data_mean = data_final_std[0]['mean']
# identify outliers
cut_off = data_std * 2
lower, upper = data_mean - cut_off, data_mean + cut_off
# identify outliers
data_final_outlier = data_final.filter((data_final["Giá trị"] < lower) | (data_final["Giá trị"] > upper))
print('Identified outliers: %d' % data_final_outlier.count())
# remove outliers
data_final = data_final.filter((data_final["Giá trị"] >= lower) & (data_final["Giá trị"] <= upper))
print('Non-outlier observations: %d' % data_final.count())

In [None]:
vectorAssembler = VectorAssembler(inputCols = data_final.drop("Giá trị").columns, outputCol = 'features').setHandleInvalid("keep")

vector = vectorAssembler.transform(data_final)
vector.show(5)

In [None]:
# Train-test split
splits = vector.randomSplit([0.8, 0.2],seed=10)
train = splits[0]
val = splits[1]

In [None]:
# Simple baseline (linreg)

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='Giá trị', maxIter=10,
                      regParam=0.8, elasticNetParam=0.1) # It is always a good idea to play with hyperparameters.
lr_model = lr.fit(train)

trainingSummary = lr_model.summary

lr_predictions = lr_model.transform(val)
lr_predictions.select("prediction","Giá trị","features").show(10)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Giá trị",metricName="r2")
print("R Squared (R2) on data = %g" % lr_evaluator.evaluate(lr_predictions))
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

In [None]:
# A more complex model with RF

from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol = 'features', labelCol='Giá trị',
                           maxDepth=20,
                           minInstancesPerNode=2,
                           bootstrap=True
                          )
rf_model = rf.fit(train)

rf_predictions = rf_model.transform(val)
rf_predictions.select("prediction","Giá trị","features").show(10)

from pyspark.ml.evaluation import RegressionEvaluator
rf_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Giá trị",metricName="r2")
print("R Squared (R2) on data = %g" % rf_evaluator.evaluate(rf_predictions))

rf_evaluator = RegressionEvaluator(labelCol="Giá trị", predictionCol="prediction", metricName="rmse")
rmse = rf_evaluator.evaluate(rf_predictions)
print("Root Mean Squared Error (RMSE) on data = %g" % rmse)

In [None]:
# A more complex model with Gradient-boosted trees (GBTs)

from pyspark.ml.regression import GBTRegressor

GBT = GBTRegressor(featuresCol="features", labelCol='Giá trị',maxIter=10)
GBT_model = GBT.fit(train)

GBT_predictions = GBT_model.transform(val)
GBT_predictions.select("prediction","Giá trị","features").show(10)

from pyspark.ml.evaluation import RegressionEvaluator
GBT_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Giá trị",metricName="r2")
print("R Squared (R2) on data = %g" % GBT_evaluator.evaluate(GBT_predictions))

GBT_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="Giá trị",metricName="rmse")
rmse = GBT_evaluator.evaluate(GBT_predictions)
print("Root Mean Squared Error (RMSE) on data = %g" % rmse)