In [1]:
import pyspark.pandas as ps
from pyspark.sql import SparkSession
import pandas as pd



In [2]:
spark = SparkSession.builder.config("spark.executor.memory", "10g").config("spark.driver.memory", "2G").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/11/30 17:07:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/30 17:07:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/11/30 17:07:07 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
data = spark.read.csv("./부동산실거래가1998-2023.csv", header=True)

                                                                                

In [4]:
data.show()

+---------------------------------+-----------+------------+--------+--------------+---+--------+
|                           시군구|     단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+---------------------------------+-----------+------------+--------+--------------+---+--------+
|강원특별자치도 양구군 양구읍 상리|       경림|        59.4|  200509|          5000| 11|    1998|
|           경기도 의정부시 민락동| 한라비발디|       84.99|  200510|         12750| 19|    2003|
|           경기도 의정부시 신곡동|      신일1|      59.878|  200510|          5750|  3|    1997|
|           경기도 의정부시 신곡동|       풍림|       49.83|  200510|          5500|  8|    1998|
|           경기도 의정부시 용현동|용현현대1차|      129.73|  200511|         12200| 11|    1992|
|           경기도 의정부시 용현동|용현현대1차|      129.73|  200511|         11500| 17|    1992|
|     강원특별자치도 강릉시 견소동|   송정한신|       43.38|  200601|          4200|  1|    1997|
|       강원특별자치도 강릉시 교동|  교동1주공|       59.89|  200601|          7500|  2|    1999|
|       강원특별자치도 강릉시 교동|  교동1주공|       59.89|  200601|          8500

In [5]:
data.printSchema()

root
 |-- 시군구: string (nullable = true)
 |-- 단지명: string (nullable = true)
 |-- 전용면적(㎡): string (nullable = true)
 |-- 계약년월: string (nullable = true)
 |-- 거래금액(만원): string (nullable = true)
 |-- 층: string (nullable = true)
 |-- 건축년도: string (nullable = true)



# 전체 데이터의 양 측정
* count()= 전체 행
* len(데이터프레임.columns) = 컬럼 수 

In [6]:
import pyspark.sql.functions as f

In [7]:
print(data.count(), len(data.columns))



9591228 7


                                                                                

# 컬럼의 데이터 타입 바꾸기 (거래금액, 전용면적) 숫자로 바꾸기

In [8]:
data = data.withColumn('거래금액(만원)', f.col('거래금액(만원)').cast('int'))

In [9]:
data.printSchema()

root
 |-- 시군구: string (nullable = true)
 |-- 단지명: string (nullable = true)
 |-- 전용면적(㎡): string (nullable = true)
 |-- 계약년월: string (nullable = true)
 |-- 거래금액(만원): integer (nullable = true)
 |-- 층: string (nullable = true)
 |-- 건축년도: string (nullable = true)



In [10]:
data = data.withColumn('전용면적(㎡)', f.col('전용면적(㎡)').cast('float'))

In [11]:
data.printSchema()

root
 |-- 시군구: string (nullable = true)
 |-- 단지명: string (nullable = true)
 |-- 전용면적(㎡): float (nullable = true)
 |-- 계약년월: string (nullable = true)
 |-- 거래금액(만원): integer (nullable = true)
 |-- 층: string (nullable = true)
 |-- 건축년도: string (nullable = true)



In [12]:
data.summary().show()

[Stage 7:>                                                          (0 + 1) / 1]

+-------+-----------------------------------+---------------+------------------+------------------+------------------+-----------------+------------------+
|summary|                             시군구|         단지명|      전용면적(㎡)|          계약년월|    거래금액(만원)|               층|          건축년도|
+-------+-----------------------------------+---------------+------------------+------------------+------------------+-----------------+------------------+
|  count|                            9591228|        9591228|           9591228|           9591228|           9591228|          9591228|           9591228|
|   mean|                               null|           null| 74.46021320549711|201434.57845356193|26051.670125660658|8.759394521744245|2000.0007455771045|
| stddev|                               null|           null|26.196316055716533| 503.5909999681341|25687.646317538885|6.020988738465453| 8.664532884432418|
|    min|강원특별자치도 강릉시 강동면 안인리|         (1-10)|              9.26|            200509|       

                                                                                

In [13]:
# 결측값 검사
data.select(*(f.sum(f.col(c).isNull().cast('int')).alias(c) for c in data.columns)).show()



+------+------+------------+--------+--------------+---+--------+
|시군구|단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+------+------+------------+--------+--------------+---+--------+
|     0|     0|           0|       0|             0|  0|       0|
+------+------+------------+--------+--------------+---+--------+



                                                                                

# 데이터 프레임에서 이상값 찾기

In [14]:
bounds = data.stat.approxQuantile("거래금액(만원)", [0.25, 0.75], 0)

                                                                                

In [15]:
bounds

[11000.0, 32000.0]

In [16]:
# 이상치 최대값, 최소값
lower_range = bounds[0] - (1.5 * (bounds[1] - bounds[0]))
upper_range = bounds[1] + (1.5 * (bounds[1] - bounds[0]))


In [17]:
print(lower_range)
print(upper_range)

-20500.0
63500.0


In [18]:
data.filter((data['거래금액(만원)'] < lower_range) | (data['거래금액(만원)'] > upper_range)).count()

                                                                                

578946

In [19]:
data.filter((data['거래금액(만원)'] < lower_range) | (data['거래금액(만원)'] > upper_range)).show()

+--------------------------+---------------+------------+--------+--------------+---+--------+
|                    시군구|         단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+--------------------------+---------------+------------+--------+--------------+---+--------+
|경기도 고양일산동구 마두동| 강촌마을(영남)|      165.99|  200601|         78000| 18|    1992|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         71300| 10|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         73000|  5|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         67000|  7|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      164.37|  200601|         87500| 17|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      164.37|  200601|        102750|  5|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|      173.58|  200601|         78500| 17|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|      173.58|  200601|         73500| 12|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|     134.803|  200601|         67500| 10|    1993|
|경기도 고양일산동구 마두동| 백마마을(극동)|       132.8|  200601|

# 데이터프레임에서 이상값 찾는 함수

In [20]:
data.select('전용면적(㎡)').dtypes

[('전용면적(㎡)', 'float')]

In [21]:
data.select('전용면적(㎡)').dtypes[0][1]

'float'

In [22]:
def out_lier(data):
    import pyspark.sql.functions as f
    for col_name in data.columns:
        if data.select(col_name).dtypes[0][1] in ('int', 'integer', 'float', 'float32', 'float64', 'double'):
            bounds = data.stat.approxQuantile(col_name, [0.25, 0.75], 0)
            # 이상치 범위 계산
            lower_range = bounds[0] - (1.5 * (bounds[1] - bounds[0]))
            upper_range = bounds[1] + (1.5 * (bounds[1] - bounds[0]))
            print(col_name, lower_range, upper_range)
            # 이상치 필터링
            outliers = data.filter((data[col_name] < lower_range) | (data[col_name] > upper_range))
            print("number of outliers: ", outliers.count())
            outliers.show()

In [23]:
out_lier(data)

                                                                                

전용면적(㎡) 21.3750057220459 123.09499168395996


                                                                                

number of outliers:  639736
+-----------------------------------+-------------------+------------+--------+--------------+---+--------+
|                             시군구|             단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+-----------------------------------+-------------------+------------+--------+--------------+---+--------+
|             경기도 의정부시 용현동|        용현현대1차|      129.73|  200511|         12200| 11|    1992|
|             경기도 의정부시 용현동|        용현현대1차|      129.73|  200511|         11500| 17|    1992|
|       강원특별자치도 강릉시 노암동|           노암한라|     129.801|  200601|         11300|  6|    1994|
|       강원특별자치도 강릉시 포남동|              대인4|       19.99|  200601|          1000|  5|    1995|
|       강원특별자치도 강릉시 포남동|              대인4|       19.99|  200601|          1850| 10|    1995|
|       강원특별자치도 속초시 조양동|              부영3|     166.535|  200601|         15650| 11|    1996|
|       강원특별자치도 속초시 조양동|              부영3|     133.835|  200601|         12100|  9|    1996|
|강원특별자치도 양양군 양양읍 구교리|    

                                                                                

거래금액(만원) -20500.0 63500.0




number of outliers:  578946
+--------------------------+---------------+------------+--------+--------------+---+--------+
|                    시군구|         단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+--------------------------+---------------+------------+--------+--------------+---+--------+
|경기도 고양일산동구 마두동| 강촌마을(영남)|      165.99|  200601|         78000| 18|    1992|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         71300| 10|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         73000|  5|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         67000|  7|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      164.37|  200601|         87500| 17|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      164.37|  200601|        102750|  5|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|      173.58|  200601|         78500| 17|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|      173.58|  200601|         73500| 12|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|     134.803|  200601|         67500| 10|    1993|
|경기도 고양일산동구 마두동| 백마마

                                                                                

# 레이블 인코딩하기 StringIndexer
* 데이터 프레임의 데이터 타입을 검출한 후 string이면 StringIndexer를 사용해 숫자로 변환

In [24]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="단지명", outputCol="apart_name_Index")
indexed = indexer.fit(data).transform(data)

                                                                                

In [25]:
indexed.select("apart_name_Index").dtypes

[('apart_name_Index', 'double')]

In [26]:
indexed.show()

23/11/30 17:14:45 WARN DAGScheduler: Broadcasting large task binary with size 1456.5 KiB
+---------------------------------+-----------+------------+--------+--------------+---+--------+----------------+
|                           시군구|     단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|apart_name_Index|
+---------------------------------+-----------+------------+--------+--------------+---+--------+----------------+
|강원특별자치도 양구군 양구읍 상리|       경림|        59.4|  200509|          5000| 11|    1998|          2139.0|
|           경기도 의정부시 민락동| 한라비발디|       84.99|  200510|         12750| 19|    2003|            25.0|
|           경기도 의정부시 신곡동|      신일1|      59.878|  200510|          5750|  3|    1997|          6011.0|
|           경기도 의정부시 신곡동|       풍림|       49.83|  200510|          5500|  8|    1998|            36.0|
|           경기도 의정부시 용현동|용현현대1차|      129.73|  200511|         12200| 11|    1992|          2206.0|
|           경기도 의정부시 용현동|용현현대1차|      129.73|  200511|         11500| 17|    1992|      

# 레이블 인코딩과 one-hot 인코딩 한 번에 하기 
* 데이터 프레임에서 데이터타입이 string인 컬럼 찾기
* StringIndexer 를 이용해 레이블 인코딩(문자 -> 숫자)
* 숫자로 변경된 컬럼을 OneHotEncoder를 이용해 OneHotEncoding 

In [27]:
def str2index_onehot(data):
    from pyspark.ml.feature import StringIndexer, OneHotEncoder
    cols = []
    for col_name in data.columns:
        if data.select(col_name).dtypes[0][1] == 'string':
            indexer = StringIndexer(inputCols=[col_name], outputCols=[f"{col_name}_index"])
            data = indexer.fit(data).transform(data)
            # data.show()
            # print(data.select(f"{col_name}_index").dtypes)
            encoder = OneHotEncoder(inputCols=[f"{col_name}_index"], outputCols=[f"{col_name}_vec"])
            data = encoder.fit(data).transform(data)
            # data.show()
            cols.append(col_name)
            cols.append(f"{col_name}_index")
            cols.append(f"{col_name}_vec")
    return data, cols

In [28]:
data2, cols = str2index_onehot(data)

                                                                                

In [29]:
data2.show(vertical=True)

23/11/30 17:15:14 WARN DAGScheduler: Broadcasting large task binary with size 1762.5 KiB
-RECORD 0-------------------------------------------
 시군구         | 강원특별자치도 양구군 양구읍 상리 
 단지명         | 경림                              
 전용면적(㎡)   | 59.4                              
 계약년월       | 200509                            
 거래금액(만원) | 5000                              
 층             | 11                                
 건축년도       | 1998                              
 시군구_index   | 1969.0                            
 시군구_vec     | (3653,[1969],[1.0])               
 단지명_index   | 2139.0                            
 단지명_vec     | (34151,[2139],[1.0])              
 계약년월_index | 206.0                             
 계약년월_vec   | (206,[],[])                       
 층_index       | 10.0                              
 층_vec         | (87,[10],[1.0])                   
 건축년도_index | 1.0                               
 건축년도_vec   | (60,[1],[1.0])                    
-RECORD 1---------------------

In [30]:
cols

['시군구',
 '시군구_index',
 '시군구_vec',
 '단지명',
 '단지명_index',
 '단지명_vec',
 '계약년월',
 '계약년월_index',
 '계약년월_vec',
 '층',
 '층_index',
 '층_vec',
 '건축년도',
 '건축년도_index',
 '건축년도_vec']

In [31]:
data2.columns

['시군구',
 '단지명',
 '전용면적(㎡)',
 '계약년월',
 '거래금액(만원)',
 '층',
 '건축년도',
 '시군구_index',
 '시군구_vec',
 '단지명_index',
 '단지명_vec',
 '계약년월_index',
 '계약년월_vec',
 '층_index',
 '층_vec',
 '건축년도_index',
 '건축년도_vec']

In [32]:
data3 = data2.drop('시군구', '시군구_index', '단지명', '단지명_index', '계약년월', '계약년월_index', '층', '층_index', '건축년도', '건축년도_vec')
data3.show()

23/11/30 17:15:15 WARN DAGScheduler: Broadcasting large task binary with size 1756.5 KiB
+------------+--------------+-------------------+--------------------+-----------------+---------------+--------------+
|전용면적(㎡)|거래금액(만원)|         시군구_vec|          단지명_vec|     계약년월_vec|         층_vec|건축년도_index|
+------------+--------------+-------------------+--------------------+-----------------+---------------+--------------+
|        59.4|          5000|(3653,[1969],[1.0])|(34151,[2139],[1.0])|      (206,[],[])|(87,[10],[1.0])|           1.0|
|       84.99|         12750|  (3653,[95],[1.0])|  (34151,[25],[1.0])|(206,[204],[1.0])|(87,[18],[1.0])|           9.0|
|      59.878|          5750|  (3653,[17],[1.0])|(34151,[6011],[1.0])|(206,[204],[1.0])| (87,[2],[1.0])|           0.0|
|       49.83|          5500|  (3653,[17],[1.0])|  (34151,[36],[1.0])|(206,[204],[1.0])| (87,[7],[1.0])|           1.0|
|      129.73|         12200| (3653,[187],[1.0])|(34151,[2206],[1.0])|(206,[205],[1.0])|(87,[10],

In [33]:
data3.printSchema()

root
 |-- 전용면적(㎡): float (nullable = true)
 |-- 거래금액(만원): integer (nullable = true)
 |-- 시군구_vec: vector (nullable = true)
 |-- 단지명_vec: vector (nullable = true)
 |-- 계약년월_vec: vector (nullable = true)
 |-- 층_vec: vector (nullable = true)
 |-- 건축년도_index: double (nullable = false)



# 독립변수를 1개의 벡터로 변환하고 독립변수와 종속변수만 남기기

In [34]:
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=['전용면적(㎡)', '시군구_vec', '단지명_vec', '계약년월_vec', '층_vec', '건축년도_index'], outputCol='Features')
data3 = va.transform(data3)
data3.show()

23/11/30 17:15:16 WARN DAGScheduler: Broadcasting large task binary with size 1778.1 KiB
+------------+--------------+-------------------+--------------------+-----------------+---------------+--------------+--------------------+
|전용면적(㎡)|거래금액(만원)|         시군구_vec|          단지명_vec|     계약년월_vec|         층_vec|건축년도_index|            Features|
+------------+--------------+-------------------+--------------------+-----------------+---------------+--------------+--------------------+
|        59.4|          5000|(3653,[1969],[1.0])|(34151,[2139],[1.0])|      (206,[],[])|(87,[10],[1.0])|           1.0|(38099,[0,1970,57...|
|       84.99|         12750|  (3653,[95],[1.0])|  (34151,[25],[1.0])|(206,[204],[1.0])|(87,[18],[1.0])|           9.0|(38099,[0,96,3679...|
|      59.878|          5750|  (3653,[17],[1.0])|(34151,[6011],[1.0])|(206,[204],[1.0])| (87,[2],[1.0])|           0.0|(38099,[0,18,9665...|
|       49.83|          5500|  (3653,[17],[1.0])|  (34151,[36],[1.0])|(206,[204],[1.0])| (8

In [35]:
data3 = data3.withColumnRenamed('거래금액(만원)', 'price')
data3.show()

23/11/30 17:15:16 WARN DAGScheduler: Broadcasting large task binary with size 1778.1 KiB
+------------+-----+-------------------+--------------------+-----------------+---------------+--------------+--------------------+
|전용면적(㎡)|price|         시군구_vec|          단지명_vec|     계약년월_vec|         층_vec|건축년도_index|            Features|
+------------+-----+-------------------+--------------------+-----------------+---------------+--------------+--------------------+
|        59.4| 5000|(3653,[1969],[1.0])|(34151,[2139],[1.0])|      (206,[],[])|(87,[10],[1.0])|           1.0|(38099,[0,1970,57...|
|       84.99|12750|  (3653,[95],[1.0])|  (34151,[25],[1.0])|(206,[204],[1.0])|(87,[18],[1.0])|           9.0|(38099,[0,96,3679...|
|      59.878| 5750|  (3653,[17],[1.0])|(34151,[6011],[1.0])|(206,[204],[1.0])| (87,[2],[1.0])|           0.0|(38099,[0,18,9665...|
|       49.83| 5500|  (3653,[17],[1.0])|  (34151,[36],[1.0])|(206,[204],[1.0])| (87,[7],[1.0])|           1.0|(38099,[0,18,3690...|
|      

In [36]:
data3.columns

['전용면적(㎡)',
 'price',
 '시군구_vec',
 '단지명_vec',
 '계약년월_vec',
 '층_vec',
 '건축년도_index',
 'Features']

In [37]:
data3 = data3.drop('전용면적(㎡)', '시군구_vec',  '단지명_vec',  '계약년월_vec',  '층_vec',  '건축년도_index')
data3.show()

23/11/30 17:15:16 WARN DAGScheduler: Broadcasting large task binary with size 1765.3 KiB
+-----+--------------------+
|price|            Features|
+-----+--------------------+
| 5000|(38099,[0,1970,57...|
|12750|(38099,[0,96,3679...|
| 5750|(38099,[0,18,9665...|
| 5500|(38099,[0,18,3690...|
|12200|(38099,[0,188,586...|
|11500|(38099,[0,188,586...|
| 4200|(38099,[0,1198,65...|
| 7500|(38099,[0,364,517...|
| 8500|(38099,[0,364,517...|
|12500|(38099,[0,364,517...|
| 5998|(38099,[0,364,517...|
| 7379|(38099,[0,364,517...|
| 5580|(38099,[0,364,517...|
| 7213|(38099,[0,364,517...|
| 7379|(38099,[0,364,517...|
| 7379|(38099,[0,364,517...|
| 7213|(38099,[0,364,517...|
| 7379|(38099,[0,364,517...|
| 7379|(38099,[0,364,517...|
| 8600|(38099,[0,364,517...|
+-----+--------------------+
only showing top 20 rows



# 홀드아웃

In [38]:
(train_data, test_data) = data3.randomSplit([0.6, 0.4], seed=7)

# 회귀 분석으로 분석하기

In [39]:
from pyspark.ml.regression import LinearRegression

In [40]:
lr = LinearRegression(featuresCol='Features', labelCol='price')
lr = lr.fit(train_data)
# coefficient
print(lr.coefficients)
print(lr.intercept)
# prediction
pred = lr.evaluate(test_data)

23/11/30 17:15:18 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:15:57 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:15:59 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:16:39 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:16:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/11/30 17:16:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
23/11/30 17:16:40 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:40 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:41 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:16:41 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:16:42 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:16:43 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:16:43 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:16:44 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:16:44 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:16:45 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:16:45 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:46 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:46 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:47 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:47 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:48 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:48 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:49 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:49 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:50 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:50 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:51 WARN DAGScheduler: Broadcasting larg



23/11/30 17:16:54 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:16:55 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:55 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:56 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:56 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:57 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:57 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:57 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:58 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:58 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:59 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:16:59 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:00 WARN DAGScheduler: Broadcasting larg



23/11/30 17:17:03 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:04 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:04 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:05 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:17:05 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:06 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:06 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:06 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:07 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:07 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:08 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:08 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:09 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:09 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:17:10 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:10 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:17:11 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:11 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:11 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:12 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:12 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:13 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:13 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:14 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:14 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:14 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:15 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:15 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:16 WARN DAGScheduler: Broadcasting larg



23/11/30 17:17:18 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:19 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:19 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:20 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:20 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:20 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:21 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:21 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:22 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:22 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:23 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:23 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:23 WARN DAGScheduler: Broadcasting larg



23/11/30 17:17:27 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:27 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:17:28 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:28 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:29 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:29 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:30 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:30 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:31 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:31 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:31 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:32 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:32 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:33 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:33 WARN DAGScheduler: Broadcasting larg



23/11/30 17:17:42 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:42 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:42 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:43 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:43 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:44 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:44 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:44 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:45 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:45 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB




23/11/30 17:17:46 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:46 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:47 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:47 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:48 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:48 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:49 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:49 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:49 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:50 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:50 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:51 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:17:51 WARN DAGScheduler: Broadcasting larg



23/11/30 17:17:59 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:17:59 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:18:00 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:18:00 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:18:01 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:18:01 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:18:01 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
23/11/30 17:18:02 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB




23/11/30 17:18:39 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB


                                                                                

[309.2985122283395,11088.460734433973,-1804.5570905329237,15591.351672289089,8268.774082187869,2631.1790538646005,-3814.6356190647753,4498.360616884011,-2688.9570039491873,-8643.59476022515,-2292.505294255856,13310.947764863766,6365.47882321631,-12452.739718102135,64.62070502688047,-3111.506701574433,10150.04023717379,-25119.442549866355,-2705.567073067001,-1196.727120912743,20527.635721941526,529.5514526917975,3773.6202702631936,-2206.0932659435584,13815.724674458801,12999.390314536571,12560.537050735358,7867.73343328721,-11703.129849528083,-9248.453798018978,11426.4565243465,13377.860904346835,7929.713878667405,-7169.4047522920155,-1742.3070770434717,-1433.1511344970668,-6969.45289711458,2238.1179630442584,21313.308755003072,7675.832314966655,-7600.233957847246,3570.72923782546,-11191.65560273004,443.6299504602175,13060.540591637216,-7698.964635705914,5575.2546347122,-6018.194947587973,-4083.625050785616,-16993.242110434225,-1677.1083442904214,4492.985581615729,-7553.241190589084,-32



23/11/30 17:19:16 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB


                                                                                

In [41]:
print("MAE: ", pred.meanAbsoluteError)
print("MSE: ", pred.meanSquaredError)
print("R2square: ", pred.r2)

MAE:  5298.7344425290685
MSE:  92944565.91367579
R2square:  0.8588386583304177


In [42]:
print(pred.predictions.show())

23/11/30 17:19:17 WARN DAGScheduler: Broadcasting large task binary with size 6.5 MiB


[Stage 245:>                                                        (0 + 1) / 1]

+-----+--------------------+-------------------+
|price|            Features|         prediction|
+-----+--------------------+-------------------+
|  350|(38099,[0,2719,60...|-3610.8934191300277|
|  350|(38099,[0,2719,60...| -5276.826423135452|
|  400|(38099,[0,2719,60...| -5662.488405931952|
|  400|(38099,[0,2970,18...|-3392.2831537452694|
|  400|(38099,[0,2970,18...| -3505.456905453876|
|  400|(38099,[0,3529,32...|-4978.7163549225115|
|  475|(38099,[0,2584,15...| -3846.458122690952|
|  500|(38099,[0,81,2290...|   82408.8019684845|
|  500|(38099,[0,350,482...| 11910.580199185115|
|  500|(38099,[0,350,482...| 11891.534421230068|
|  500|(38099,[0,350,482...| 12293.792282309887|
|  500|(38099,[0,1983,77...|  6886.970216112716|
|  500|(38099,[0,1983,77...| 10726.107199827358|
|  500|(38099,[0,2382,36...| 3218.1449050033493|
|  500|(38099,[0,2936,15...|  -2592.43032302142|
|  500|(38099,[0,2936,15...| -4060.667343315611|
|  525|(38099,[0,2936,15...| -4102.097538564577|
|  550|(38099,[0,106

                                                                                

In [43]:
from pyspark.ml.regression import RandomForestRegressor

In [44]:
rfr = RandomForestRegressor(featuresCol='Features', labelCol='price')
rfr = rfr.fit(train_data)
pred = rfr.transform(test_data)
pred

23/11/30 17:19:32 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:19:47 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB


                                                                                

23/11/30 17:20:20 ERROR Instrumentation: java.lang.IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 38098 has 61 values. Consider removing this and other categorical features with a large number of values, or add more training examples.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:151)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:274)
	at org.apache.spark.ml.regression.RandomForestRegressor.$anonfun$train$1(RandomForestRegressor.scala:150)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForest

IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 38098 has 61 values. Consider removing this and other categorical features with a large number of values, or add more training examples.

In [None]:
result = pred.predictions.toPandas()