In [1]:
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import pandas as pd



In [2]:
spark = SparkSession.builder.config("spark.executor.memory", "10g").config("spark.driver.memory", "2G").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/11/30 14:43:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/30 14:43:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
data = spark.read.csv('./data/부동산실거래가1998-2023.csv', header=True)

In [4]:
data.show()

+---------------------------------+-----------+------------+--------+--------------+---+--------+
|                           시군구|     단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+---------------------------------+-----------+------------+--------+--------------+---+--------+
|강원특별자치도 양구군 양구읍 상리|       경림|        59.4|  200509|          5000| 11|    1998|
|           경기도 의정부시 민락동| 한라비발디|       84.99|  200510|         12750| 19|    2003|
|           경기도 의정부시 신곡동|      신일1|      59.878|  200510|          5750|  3|    1997|
|           경기도 의정부시 신곡동|       풍림|       49.83|  200510|          5500|  8|    1998|
|           경기도 의정부시 용현동|용현현대1차|      129.73|  200511|         12200| 11|    1992|
|           경기도 의정부시 용현동|용현현대1차|      129.73|  200511|         11500| 17|    1992|
|     강원특별자치도 강릉시 견소동|   송정한신|       43.38|  200601|          4200|  1|    1997|
|       강원특별자치도 강릉시 교동|  교동1주공|       59.89|  200601|          7500|  2|    1999|
|       강원특별자치도 강릉시 교동|  교동1주공|       59.89|  200601|          8500

In [5]:
data.printSchema()

root
 |-- 시군구: string (nullable = true)
 |-- 단지명: string (nullable = true)
 |-- 전용면적(㎡): string (nullable = true)
 |-- 계약년월: string (nullable = true)
 |-- 거래금액(만원): string (nullable = true)
 |-- 층: string (nullable = true)
 |-- 건축년도: string (nullable = true)



# 전체 데이터의 양 측정
* count() = 전체 행
* len(데이터프레임.columns) = 컬럼 수

In [6]:
import pyspark.sql.functions as f

In [7]:
print(data.count(), len(data.columns))



9591228 7


                                                                                

# 컬럼의 데이터 타입 바꾸기(거래금액, 전용면적) 숫자로 바꾸기

In [8]:
data = data.withColumn('거래금액(만원)', f.col('거래금액(만원)').cast('int'))

In [9]:
data.printSchema()

root
 |-- 시군구: string (nullable = true)
 |-- 단지명: string (nullable = true)
 |-- 전용면적(㎡): string (nullable = true)
 |-- 계약년월: string (nullable = true)
 |-- 거래금액(만원): integer (nullable = true)
 |-- 층: string (nullable = true)
 |-- 건축년도: string (nullable = true)



In [10]:
data = data.withColumn('전용면적(㎡)', f.col('전용면적(㎡)').cast('float'))

In [11]:
data.printSchema()

root
 |-- 시군구: string (nullable = true)
 |-- 단지명: string (nullable = true)
 |-- 전용면적(㎡): float (nullable = true)
 |-- 계약년월: string (nullable = true)
 |-- 거래금액(만원): integer (nullable = true)
 |-- 층: string (nullable = true)
 |-- 건축년도: string (nullable = true)



In [12]:
data.summary().show()

ERROR:root:KeyboardInterrupt while sending command.                 (0 + 4) / 6]
Traceback (most recent call last):
  File "/home/himedia/.local/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/himedia/.local/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/himedia/.conda/envs/spark/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [13]:
# 결측값 검사
data.select(*(f.sum(f.col(c).isNull().cast('int')).alias(c) for c in data.columns)).show()
# 결측값 없어서 0 나옴



+------+------+------------+--------+--------------+---+--------+
|시군구|단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+------+------+------------+--------+--------------+---+--------+
|     0|     0|           0|       0|             0|  0|       0|
+------+------+------------+--------+--------------+---+--------+





# 데이터 프레임에서 이상값 찾기

In [14]:
bounds = data.stat.approxQuantile('거래금액(만원)', [0.25, 0.75], 0)



In [15]:
bounds

[11000.0, 32000.0]

In [16]:
# 이상치 최대값, 최소값
lower_range = bounds[0] - (1.5 * (bounds[1] - bounds[0]))
upper_range = bounds[1] + (1.5 * (bounds[1] - bounds[0]))

In [17]:
print(lower_range)
print(upper_range)

-20500.0
63500.0


In [18]:
data.filter((data['거래금액(만원)'] < lower_range) | (data['거래금액(만원)'] > upper_range)).count()

                                                                                

578946



In [19]:
data.filter((data['거래금액(만원)'] < lower_range) | (data['거래금액(만원)'] > upper_range)).show()

+--------------------------+---------------+------------+--------+--------------+---+--------+
|                    시군구|         단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+--------------------------+---------------+------------+--------+--------------+---+--------+
|경기도 고양일산동구 마두동| 강촌마을(영남)|      165.99|  200601|         78000| 18|    1992|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         71300| 10|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         73000|  5|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         67000|  7|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      164.37|  200601|         87500| 17|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      164.37|  200601|        102750|  5|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|      173.58|  200601|         78500| 17|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|      173.58|  200601|         73500| 12|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|     134.803|  200601|         67500| 10|    1993|
|경기도 고양일산동구 마두동| 백마마을(극동)|       132.8|  200601|



# 데이터프레임에서 이상값 찾는 함수

In [20]:
data.select('전용면적(㎡)').dtypes

[('전용면적(㎡)', 'float')]

In [21]:
data.select('전용면적(㎡)').dtypes[0][1]

'float'

In [22]:
def out_lier(data):
    import pyspark.sql.functions as f
    for col_name in data.columns:
        if data.select(col_name).dtypes[0][1] in ('int', 'integer', 'float', 'float32', 'float64', 'double'):
            bounds = data.stat.approxQuantile(col_name, [0.25, 0.75], 0)
            # 이상치 범위계산
            lower_range = bounds[0] - (1.5 * (bounds[1] - bounds[0]))
            upper_range = bounds[1] + (1.5 * (bounds[1] - bounds[0]))
            print(col_name, lower_range, upper_range)
            # 이상치 필터링
            outliers = data.filter((data[col_name] < lower_range) | (data[col_name] > upper_range))
            print('number of outliers: ', outliers.count())
            outliers.show()

In [23]:
out_lier(data)

                                                                                

전용면적(㎡) 21.3750057220459 123.09499168395996


                                                                                

number of outliers:  639736
+-----------------------------------+-------------------+------------+--------+--------------+---+--------+
|                             시군구|             단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+-----------------------------------+-------------------+------------+--------+--------------+---+--------+
|             경기도 의정부시 용현동|        용현현대1차|      129.73|  200511|         12200| 11|    1992|
|             경기도 의정부시 용현동|        용현현대1차|      129.73|  200511|         11500| 17|    1992|
|       강원특별자치도 강릉시 노암동|           노암한라|     129.801|  200601|         11300|  6|    1994|
|       강원특별자치도 강릉시 포남동|              대인4|       19.99|  200601|          1000|  5|    1995|
|       강원특별자치도 강릉시 포남동|              대인4|       19.99|  200601|          1850| 10|    1995|
|       강원특별자치도 속초시 조양동|              부영3|     166.535|  200601|         15650| 11|    1996|
|       강원특별자치도 속초시 조양동|              부영3|     133.835|  200601|         12100|  9|    1996|
|강원특별자치도 양양군 양양읍 구교리|    

                                                                                

거래금액(만원) -20500.0 63500.0


                                                                                

number of outliers:  578946
+--------------------------+---------------+------------+--------+--------------+---+--------+
|                    시군구|         단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|
+--------------------------+---------------+------------+--------+--------------+---+--------+
|경기도 고양일산동구 마두동| 강촌마을(영남)|      165.99|  200601|         78000| 18|    1992|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         71300| 10|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         73000|  5|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      134.99|  200601|         67000|  7|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      164.37|  200601|         87500| 17|    1993|
|경기도 고양일산동구 마두동| 강촌마을(우방)|      164.37|  200601|        102750|  5|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|      173.58|  200601|         78500| 17|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|      173.58|  200601|         73500| 12|    1993|
|경기도 고양일산동구 마두동| 강촌마을(한신)|     134.803|  200601|         67500| 10|    1993|
|경기도 고양일산동구 마두동| 백마마

# 레이블 인코딩하기 StringIndexer
* 데이터 프레임의 데이터 타입을 검출한 후 string이면 StringIndexer를 사용해 숫자로 변환

# 레이블 인코딩과 one-hot 인코딩 한번에 하기
* 데이터 프레임에서 데이터 타입이 string인 컬럼 찾기
* StringIndexer 를 이용해 레이블 인코딩(문자->숫자)
* 숫자로 변경된 컬럼을 OneHotEncoder를 이용해 OneHotEncoding (숫자-> 원핫인코딩)

In [25]:
data.dtypes

[('시군구', 'string'),
 ('단지명', 'string'),
 ('전용면적(㎡)', 'float'),
 ('계약년월', 'string'),
 ('거래금액(만원)', 'int'),
 ('층', 'string'),
 ('건축년도', 'string')]

In [26]:
# 레이블 인코딩 하기 scikit-learn의 label encoding과 같은 역할
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCols=['시군구','단지명','계약년월','층','건축년도'], outputCols=['시군구Index','단지명Index','계약년월Index','층Index','건축년도Index'])
data = indexer.fit(data).transform(data)

                                                                                

In [27]:
data.dtypes

[('시군구', 'string'),
 ('단지명', 'string'),
 ('전용면적(㎡)', 'float'),
 ('계약년월', 'string'),
 ('거래금액(만원)', 'int'),
 ('층', 'string'),
 ('건축년도', 'string'),
 ('시군구Index', 'double'),
 ('단지명Index', 'double'),
 ('계약년월Index', 'double'),
 ('층Index', 'double'),
 ('건축년도Index', 'double')]

In [28]:
data.show()

23/11/30 14:51:00 WARN DAGScheduler: Broadcasting large task binary with size 1711.8 KiB
+---------------------------------+-----------+------------+--------+--------------+---+--------+-----------+-----------+-------------+-------+-------------+
|                           시군구|     단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|시군구Index|단지명Index|계약년월Index|층Index|건축년도Index|
+---------------------------------+-----------+------------+--------+--------------+---+--------+-----------+-----------+-------------+-------+-------------+
|강원특별자치도 양구군 양구읍 상리|       경림|        59.4|  200509|          5000| 11|    1998|     1969.0|     2139.0|        206.0|   10.0|          1.0|
|           경기도 의정부시 민락동| 한라비발디|       84.99|  200510|         12750| 19|    2003|       95.0|       25.0|        204.0|   18.0|          9.0|
|           경기도 의정부시 신곡동|      신일1|      59.878|  200510|          5750|  3|    1997|       17.0|     6011.0|        204.0|    2.0|          0.0|
|           경기도 의정부시 신곡동|       풍림|       49.83|

In [44]:
data.drop('시군구','단지명','계약년월','층','건축년도').show()

23/11/30 14:33:17 WARN DAGScheduler: Broadcasting large task binary with size 1711.0 KiB
+------------+--------------+-----------+-----------+-------------+-------+-------------+
|전용면적(㎡)|거래금액(만원)|시군구Index|단지명Index|계약년월Index|층Index|건축년도Index|
+------------+--------------+-----------+-----------+-------------+-------+-------------+
|        59.4|          5000|     1969.0|     2139.0|        206.0|   10.0|          1.0|
|       84.99|         12750|       95.0|       25.0|        204.0|   18.0|          9.0|
|      59.878|          5750|       17.0|     6011.0|        204.0|    2.0|          0.0|
|       49.83|          5500|       17.0|       36.0|        204.0|    7.0|          1.0|
|      129.73|         12200|      187.0|     2206.0|        205.0|   10.0|         13.0|
|      129.73|         11500|      187.0|     2206.0|        205.0|   16.0|         13.0|
|       43.38|          4200|     1197.0|     2888.0|        191.0|    4.0|          0.0|
|       59.89|          7500|      36

In [29]:
# 원핫인코딩 하기 -> 먼저하면 오류남(레이블인코딩먼저)
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=['시군구Index','단지명Index','계약년월Index','층Index','건축년도Index'], outputCols=['시군구_vec','단지명_vec','계약년월_vec','층_vec','건축년도_vec'])
data= encoder.fit(data).transform(data)

In [30]:
data.show()

23/11/30 14:51:58 WARN DAGScheduler: Broadcasting large task binary with size 1735.3 KiB
+---------------------------------+-----------+------------+--------+--------------+---+--------+-----------+-----------+-------------+-------+-------------+-------------------+--------------------+-----------------+---------------+---------------+
|                           시군구|     단지명|전용면적(㎡)|계약년월|거래금액(만원)| 층|건축년도|시군구Index|단지명Index|계약년월Index|층Index|건축년도Index|         시군구_vec|          단지명_vec|     계약년월_vec|         층_vec|   건축년도_vec|
+---------------------------------+-----------+------------+--------+--------------+---+--------+-----------+-----------+-------------+-------+-------------+-------------------+--------------------+-----------------+---------------+---------------+
|강원특별자치도 양구군 양구읍 상리|       경림|        59.4|  200509|          5000| 11|    1998|     1969.0|     2139.0|        206.0|   10.0|          1.0|(3653,[1969],[1.0])|(34151,[2139],[1.0])|      (206,[],[])|(87,[10],[1.0])| (60,

In [32]:
data = data.drop('시군구','단지명','계약년월','층','건축년도','시군구Index','단지명Index','계약년월Index','층Index','건축년도Index')

In [33]:
data.show()

23/11/30 14:53:19 WARN DAGScheduler: Broadcasting large task binary with size 1733.4 KiB
+------------+--------------+-------------------+--------------------+-----------------+---------------+---------------+
|전용면적(㎡)|거래금액(만원)|         시군구_vec|          단지명_vec|     계약년월_vec|         층_vec|   건축년도_vec|
+------------+--------------+-------------------+--------------------+-----------------+---------------+---------------+
|        59.4|          5000|(3653,[1969],[1.0])|(34151,[2139],[1.0])|      (206,[],[])|(87,[10],[1.0])| (60,[1],[1.0])|
|       84.99|         12750|  (3653,[95],[1.0])|  (34151,[25],[1.0])|(206,[204],[1.0])|(87,[18],[1.0])| (60,[9],[1.0])|
|      59.878|          5750|  (3653,[17],[1.0])|(34151,[6011],[1.0])|(206,[204],[1.0])| (87,[2],[1.0])| (60,[0],[1.0])|
|       49.83|          5500|  (3653,[17],[1.0])|  (34151,[36],[1.0])|(206,[204],[1.0])| (87,[7],[1.0])| (60,[1],[1.0])|
|      129.73|         12200| (3653,[187],[1.0])|(34151,[2206],[1.0])|(206,[205],[1.0])|(8

In [34]:
data.printSchema()

root
 |-- 전용면적(㎡): float (nullable = true)
 |-- 거래금액(만원): integer (nullable = true)
 |-- 시군구_vec: vector (nullable = true)
 |-- 단지명_vec: vector (nullable = true)
 |-- 계약년월_vec: vector (nullable = true)
 |-- 층_vec: vector (nullable = true)
 |-- 건축년도_vec: vector (nullable = true)



# 종속변수 인덱싱, 독립변수 인덱싱

In [35]:
data.columns

['전용면적(㎡)', '거래금액(만원)', '시군구_vec', '단지명_vec', '계약년월_vec', '층_vec', '건축년도_vec']

In [36]:
from pyspark.ml.feature import VectorAssembler
feature_assembler = VectorAssembler(inputCols = ['전용면적(㎡)','시군구_vec', '단지명_vec', '계약년월_vec', '층_vec', '건축년도_vec'], outputCol = 'Features')
data = feature_assembler.transform(data)
data.show()

23/11/30 15:02:35 WARN DAGScheduler: Broadcasting large task binary with size 1771.3 KiB
+------------+--------------+-------------------+--------------------+-----------------+---------------+---------------+--------------------+
|전용면적(㎡)|거래금액(만원)|         시군구_vec|          단지명_vec|     계약년월_vec|         층_vec|   건축년도_vec|            Features|
+------------+--------------+-------------------+--------------------+-----------------+---------------+---------------+--------------------+
|        59.4|          5000|(3653,[1969],[1.0])|(34151,[2139],[1.0])|      (206,[],[])|(87,[10],[1.0])| (60,[1],[1.0])|(38158,[0,1970,57...|
|       84.99|         12750|  (3653,[95],[1.0])|  (34151,[25],[1.0])|(206,[204],[1.0])|(87,[18],[1.0])| (60,[9],[1.0])|(38158,[0,96,3679...|
|      59.878|          5750|  (3653,[17],[1.0])|(34151,[6011],[1.0])|(206,[204],[1.0])| (87,[2],[1.0])| (60,[0],[1.0])|(38158,[0,18,9665...|
|       49.83|          5500|  (3653,[17],[1.0])|  (34151,[36],[1.0])|(206,[204],[1.0

In [37]:
data = data.drop('전용면적(㎡)','시군구_vec', '단지명_vec', '계약년월_vec', '층_vec', '건축년도_vec')
data.show()

23/11/30 15:03:27 WARN DAGScheduler: Broadcasting large task binary with size 1741.2 KiB
+--------------+--------------------+
|거래금액(만원)|            Features|
+--------------+--------------------+
|          5000|(38158,[0,1970,57...|
|         12750|(38158,[0,96,3679...|
|          5750|(38158,[0,18,9665...|
|          5500|(38158,[0,18,3690...|
|         12200|(38158,[0,188,586...|
|         11500|(38158,[0,188,586...|
|          4200|(38158,[0,1198,65...|
|          7500|(38158,[0,364,517...|
|          8500|(38158,[0,364,517...|
|         12500|(38158,[0,364,517...|
|          5998|(38158,[0,364,517...|
|          7379|(38158,[0,364,517...|
|          5580|(38158,[0,364,517...|
|          7213|(38158,[0,364,517...|
|          7379|(38158,[0,364,517...|
|          7379|(38158,[0,364,517...|
|          7213|(38158,[0,364,517...|
|          7379|(38158,[0,364,517...|
|          7379|(38158,[0,364,517...|
|          8600|(38158,[0,364,517...|
+--------------+--------------------+
only 

# 홀드아웃

In [38]:
data.printSchema()

root
 |-- 거래금액(만원): integer (nullable = true)
 |-- Features: vector (nullable = true)



In [39]:
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=10)

In [40]:
train_data.printSchema()

root
 |-- 거래금액(만원): integer (nullable = true)
 |-- Features: vector (nullable = true)



In [41]:
test_data.printSchema()

root
 |-- 거래금액(만원): integer (nullable = true)
 |-- Features: vector (nullable = true)



In [49]:
from sklearn.linear_model import LogisticRegression
lr = LogisticRegression(featuresCol = 'Features', labelCol='거래금액(만원)', maxDepth=3)
lr = lr.fit(train_data)
pred_lr = lr.predict(test_data)
pred_lr.show()

TypeError: LogisticRegression.__init__() got an unexpected keyword argument 'featuresCol'

In [48]:
from sklearn.metrics import accuracy_score, classification_report