# Linear regression

In [1]:
# 스파크 세션 생성
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the Spark session. Executor and driver memory are raised to
# MAX_MEMORY to avoid out-of-memory errors.
# NOTE(review): spark.driver.memory generally only takes effect if it is set
# before the driver JVM starts; if a session already exists, getOrCreate()
# returns it and these settings may not apply.
MAX_MEMORY = "5g"
spark = (
    SparkSession.builder.appName("taxi-fare-prediction")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

21/12/20 04:31:10 WARN Utils: Your hostname, 6miniui-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.200.112 instead (on interface en0)
21/12/20 04:31:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/20 04:31:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load every CSV file matched by the trip path glob, treating the first row as
# the header and letting Spark infer column types.
# The location can be overridden with the TRIP_DATA_PATH environment variable;
# it defaults to the original local path so existing runs are unchanged.
import os

trips_path = os.environ.get("TRIP_DATA_PATH", "/Users/6mini/trip/*")
trips_df = spark.read.csv(trips_path, inferSchema=True, header=True)

                                                                                

In [4]:
# Inspect the schema Spark inferred from the CSV files.
trips_df.printSchema()
# If every column comes out as string, type inference likely failed; check the
# files in the source folder for malformed or unexpected content.

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [5]:
# Expose the raw trips DataFrame to Spark SQL under the view name "trips".
trips_df.createOrReplaceTempView(name="trips")

In [6]:
# Outlier filter carried over from the earlier exploratory analysis. It keeps
# only trip_distance and total_amount for rows where:
#   - 0 < total_amount < 5000
#   - 0 < trip_distance < 500
#   - passenger_count < 5 (rows with a NULL passenger_count are also dropped,
#     since the SQL comparison is not true for NULL)
#   - the pickup date falls in [2021-01-01, 2021-08-01), i.e. January-July 2021
query = """
SELECT 
    trip_distance,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 5
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""

In [7]:
# Run the outlier-filtering query, then register its result as the SQL view "data".
data_df = spark.sql(query)
data_df.createOrReplaceTempView(name="data")

In [8]:
data_df.show(n=20, truncate=True)  # preview the first 20 filtered rows

+-------------+------------+
|trip_distance|total_amount|
+-------------+------------+
|          2.1|        11.8|
|          0.2|         4.3|
|         14.7|       51.95|
|         10.6|       36.35|
|         4.94|       24.36|
|          1.6|       14.15|
|          4.1|        17.3|
|          5.7|        21.8|
|          9.1|        28.8|
|          2.7|       18.95|
|         6.11|        24.3|
|         1.21|       10.79|
|          7.4|       33.92|
|         1.01|        10.3|
|         0.73|       12.09|
|         1.17|       12.36|
|         0.78|        9.96|
|         1.66|        12.3|
|         0.93|         9.3|
|         1.16|       11.84|
+-------------+------------+
only showing top 20 rows



In [9]:
# Summary statistics (count, mean, stddev, min, max) for both columns of data_df.
# NOTE(review): the original note here read "6 million won...?", a reaction to
# how large the maximum total_amount still looks after filtering.
data_df.describe("trip_distance", "total_amount").show()

[Stage 3:====>                                                    (1 + 11) / 12]

+-------+------------------+-----------------+
|summary|     trip_distance|     total_amount|
+-------+------------------+-----------------+
|  count|           1190465|          1190465|
|   mean|2.6467179043483084|16.47590517992286|
| stddev| 3.433414482182404|11.71610760812923|
|    min|              0.01|             0.01|
|    max|             427.7|           2292.4|
+-------+------------------+-----------------+



                                                                                

In [10]:
# Random 80% / 20% train/test split; the fixed seed keeps the split repeatable.
train_df, test_df = data_df.randomSplit(weights=[0.8, 0.2], seed=6)

In [11]:
# Row count of the training split, followed by the test split.
for split_df in (train_df, test_df):
    print(split_df.count())

                                                                                

952078


[Stage 9:====>                                                    (1 + 11) / 12]

238387


                                                                                

In [12]:
# 벡터 어셈블러로 트레인이 가능한 상태로 변환한다.
from pyspark.ml.feature import VectorAssembler

In [13]:
# Pack trip_distance into the single-element "features" vector the regression reads.
vassembler = VectorAssembler(outputCol="features", inputCols=["trip_distance"])

In [14]:
# Training split with the assembled "features" column appended.
vtrain_df = vassembler.transform(dataset=train_df)

In [15]:
vtrain_df.show(n=20, truncate=True)  # preview the assembled training rows

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.8|  [0.01]|
|         0.01|         3.8|  [0.01]|
|         0.01|         3.8|  [0.01]|
|         0.01|         3.8|  [0.01]|
|         0.01|         4.8|  [0.01]|
|         0.01|         5.8|  [0.01]|
|         0.01|         5.8|  [0.01]|
|         0.01|         5.8|  [0.01]|
|         0.01|         5.8|  [0.01]|
|         0.01|         5.8|  [0.01]|
|         0.01|         5.8|  [0.01]|
|         0.01|         5.8|  [0.01]|
|         0.01|         5.8|  [0.01]|
+-------------+------------+--------+
only showing top 20 rows



In [16]:
# 리그레션 모델 생성
from pyspark.ml.regression import LinearRegression

In [25]:
from pyspark.ml.regression import GBTRegressor  # NOTE(review): not used by the visible cells

# Simple baseline: linear regression of total_amount on the "features" vector,
# capped at 50 iterations. regParam is left at its default, which the fit log
# reports as zero.
lr = LinearRegression(
    featuresCol="features",
    labelCol="total_amount",
    maxIter=50,
)

In [26]:
# Fit the baseline regression on the assembled training split.
model = lr.fit(dataset=vtrain_df)

21/12/20 04:39:28 WARN Instrumentation: [aa658890] regParam is zero, which might cause numerical instability and overfitting.
21/12/20 04:39:29 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [27]:
# Test split with the same "features" vector layout used for training.
vtest_df = vassembler.transform(dataset=test_df)

In [28]:
# Predict: append a "prediction" column to every test row.
prediction = model.transform(dataset=vtest_df)

In [29]:
prediction.show(n=20, truncate=True)  # actual total_amount next to the prediction

+-------------+------------+--------+-----------------+
|trip_distance|total_amount|features|       prediction|
+-------------+------------+--------+-----------------+
|         0.01|         3.3|  [0.01]|8.127374610240356|
|         0.01|         3.3|  [0.01]|8.127374610240356|
|         0.01|         3.3|  [0.01]|8.127374610240356|
|         0.01|         5.8|  [0.01]|8.127374610240356|
|         0.01|         5.8|  [0.01]|8.127374610240356|
|         0.01|         5.8|  [0.01]|8.127374610240356|
|         0.01|         6.3|  [0.01]|8.127374610240356|
|         0.01|         6.8|  [0.01]|8.127374610240356|
|         0.01|        30.3|  [0.01]|8.127374610240356|
|         0.01|        91.8|  [0.01]|8.127374610240356|
|         0.02|         3.3|  [0.02]|  8.1590346468269|
|         0.02|         3.3|  [0.02]|  8.1590346468269|
|         0.02|         3.3|  [0.02]|  8.1590346468269|
|         0.02|        3.96|  [0.02]|  8.1590346468269|
|         0.02|         5.8|  [0.02]|  8.1590346

In [24]:
from pyspark.ml.evaluation import RegressionEvaluator

# Held-out evaluation: RMSE between total_amount and the model's predictions on
# the test split.
# NOTE(review): in the captured run this cell (In [24]) executed before the
# model was re-fit in In [26], so the printed RMSE may belong to an earlier
# model. Re-run it after the cells above.
evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol="total_amount",
)
rmse = evaluator.evaluate(dataset=prediction)
message = "Root Mean Squared Error (RMSE) on test data = %g" % rmse
print(message)

[Stage 518:====>                                                  (1 + 11) / 12]

Root Mean Squared Error (RMSE) on test data = 7.67687


                                                                                

In [30]:
# Fit metrics from the model's training summary. Per the PySpark docs,
# `model.summary` describes the fit on the training set (vtrain_df here), so
# this RMSE is not a held-out score. The RegressionEvaluator cell reports the
# RMSE on the test predictions.
model.summary.rootMeanSquaredError

3.9391521978481396

In [31]:
# R^2 of the fit, taken from the model's training summary (training-set metric).
model.summary.r2

0.8809194877142736

In [32]:
from pyspark.sql.types import DoubleType

# Service-style check: score a few hand-picked trip distances, as a live
# prediction service would receive them.
distance_list = [1.1, 5.5, 10.5, 30.0]
# A DataFrame built from a bare DoubleType gets a single generic column, so it
# is renamed to "trip_distance" for the VectorAssembler.
distance_df = (
    spark.createDataFrame(distance_list, DoubleType())
    .toDF("trip_distance")
)

In [33]:
distance_df.show(n=20, truncate=True)  # the sample distances to score

+-------------+
|trip_distance|
+-------------+
|          1.1|
|          5.5|
|         10.5|
|         30.0|
+-------------+



                                                                                

In [34]:
# Sample distances with the "features" vector column the model expects.
vdistance_df = vassembler.transform(dataset=distance_df)

In [35]:
vdistance_df.show(n=20, truncate=True)  # sample distances after assembly

+-------------+--------+
|trip_distance|features|
+-------------+--------+
|          1.1|   [1.1]|
|          5.5|   [5.5]|
|         10.5|  [10.5]|
|         30.0|  [30.0]|
+-------------+--------+



In [36]:
# Predicted total_amount for each sample trip distance.
scored_df = model.transform(vdistance_df)
scored_df.show()

+-------------+--------+------------------+
|trip_distance|features|        prediction|
+-------------+--------+------------------+
|          1.1|   [1.1]|11.578318598173754|
|          5.5|   [5.5]|25.508734696253526|
|         10.5|  [10.5]|   41.338752989526|
|         30.0|  [30.0]|103.07582433328864|
+-------------+--------+------------------+



In [37]:
# Stop the Spark session now that the notebook is finished.
spark.stop()