# CHECK DATA FROM HDFS

In [1]:
from hdfs import InsecureClient

# Directory on HDFS whose contents we want to inspect.
hdfs_directory = '/partition_csv'

# Unauthenticated HDFS web client, acting as user 'owen'.
hdfs_client = InsecureClient('http://127.0.0.1:9870', user='owen')

# Names of the entries found directly under hdfs_directory.
file_list = hdfs_client.list(hdfs_directory)

In [2]:
# Show the entry names returned by the HDFS listing above.
print(file_list)

['part0.csv', 'part1.csv', 'part2.csv']


# SPARK - ML

In [3]:
# Address Spark should bind to on this machine (exported as SPARK_LOCAL_IP).
import os

local_ip = '127.0.0.1'
os.environ.update(SPARK_LOCAL_IP=local_ip)

# Memory size string used for the Spark memory settings, to avoid
# out-of-memory errors.
MAX_MEMORY = '2g'

In [4]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook.
# NOTE: the executor setting key is 'spark.executor.memory'; the previous
# spelling 'spark.executer.memory' is not a Spark setting, so the executor
# memory limit was never actually applied.
ss = (
    SparkSession.builder
    .appName('spark-ml')
    .config('spark.executor.memory', MAX_MEMORY)
    .config('spark.driver.memory', MAX_MEMORY)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/27 06:29:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Glob matching every file under /partition_csv on HDFS.
ADDR = 'hdfs://127.0.0.1:9000/partition_csv/*'

# Read all matching CSV files into one DataFrame: the first line of each file
# is the header, and column types are inferred from the data.
df = (
    ss.read
    .options(header=True, inferSchema=True)
    .csv(ADDR)
)

df.show()

                                                                                

+---------+-------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------+-----------+---------------+------------+------------+------------+------------+
|vendor_id|        pickup_date|        pickup_time|         pickup_zone|       dropoff_date|       dropoff_time|        dropoff_zone|trip_distance|fare_amount|passenger_count|driving_time|tolls_amount|total_amount|payment_type|
+---------+-------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------+-----------+---------------+------------+------------+------------+------------+
|        1|2021-06-01 00:00:00|2023-08-27 09:13:26|Penn Station/Madi...|2021-06-01 00:00:00|2023-08-27 09:17:14|        Clinton West|          0.9|        5.0|            1.0|      0:3:48|         0.0|        11.0|           1|
|        1|2021-06-01 00:00:00|2023-08-27 09:32:23|         JFK Airport|2021-06-01 00:00

In [6]:
# Print the column names and the types Spark inferred while reading the CSV files.
df.printSchema()

root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_date: timestamp (nullable = true)
 |-- pickup_time: timestamp (nullable = true)
 |-- pickup_zone: string (nullable = true)
 |-- dropoff_date: timestamp (nullable = true)
 |-- dropoff_time: timestamp (nullable = true)
 |-- dropoff_zone: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- driving_time: string (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)



In [7]:
# change driving_time column type (string -> int)
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

def convert_to_seconds(time_str):
    """Convert an "H:M:S" duration string into a total number of seconds.

    Args:
        time_str: Duration such as "0:3:48" (hours, minutes and seconds
            separated by colons), or None for a missing value.

    Returns:
        The duration in seconds as an int (e.g. "0:3:48" -> 228), or None
        when ``time_str`` is None, so that null values stay null instead of
        raising inside the Spark UDF and failing the whole job.

    Raises:
        ValueError: If one of the first three parts is not an integer.
        IndexError: If the string has fewer than three colon-separated parts.
    """
    # Spark hands null column values to a Python UDF as None.
    if time_str is None:
        return None

    parts = time_str.split(":")
    hours = int(parts[0])
    minutes = int(parts[1])
    seconds = int(parts[2])
    return hours * 3600 + minutes * 60 + seconds

# Wrap the parser as a Spark UDF whose result column is an integer.
convert_to_seconds_udf = udf(f=convert_to_seconds, returnType=IntegerType())

# Add the parsed duration next to the original string column.
df = df.withColumn(
    colName="driving_time_seconds",
    col=convert_to_seconds_udf(col("driving_time")),
)

df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+---------+-------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------+-----------+---------------+------------+------------+------------+------------+--------------------+
|vendor_id|        pickup_date|        pickup_time|         pickup_zone|       dropoff_date|       dropoff_time|        dropoff_zone|trip_distance|fare_amount|passenger_count|driving_time|tolls_amount|total_amount|payment_type|driving_time_seconds|
+---------+-------------------+-------------------+--------------------+-------------------+-------------------+--------------------+-------------+-----------+---------------+------------+------------+------------+------------+--------------------+
|        1|2021-06-01 00:00:00|2023-08-27 09:13:26|Penn Station/Madi...|2021-06-01 00:00:00|2023-08-27 09:17:14|        Clinton West|          0.9|        5.0|            1.0|      0:3:48|         0.0|        11.0|           1|                 228|
|   

                                                                                

### Note: if all columns are read as string, check the files in the source folder

In [8]:
# Label (target): total_amount.

# Features: trip_distance and driving_time_seconds (driving_time converted to seconds).

# Register the DataFrame as a temporary view named 'df' so it can be queried with SQL.
df.createOrReplaceTempView('df')

query ="""
select trip_distance, driving_time_seconds, total_amount
from df
"""

# Keep only the two feature columns and the label column.
for_train = ss.sql(query)


In [14]:
import gc

# Force a full garbage-collection pass in this Python process; the displayed
# value is the number of unreachable objects the collector found.
# NOTE(review): this presumably does not release memory held by the Spark JVM — confirm.
gc.collect()

234

# MODEL TRAIN

In [10]:
# 70 % / 30 % random split into training and test rows; the seed is fixed.
train_df, test_df = for_train.randomSplit(weights=[0.7, 0.3], seed=42)

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Pack the two numeric feature columns into a single vector column.
vecssemble = VectorAssembler(
    inputCols=['trip_distance', 'driving_time_seconds'],
    outputCol='vecrized_features',
)

v_train = vecssemble.transform(train_df)
v_test = vecssemble.transform(test_df)

# Linear regression of total_amount on the assembled feature vector.
lr = LinearRegression(
    featuresCol='vecrized_features',
    labelCol='total_amount',
    maxIter=10,
)

# Fit on the training split only; v_test is not used in this cell.
model = lr.fit(v_train)

23/08/27 06:30:23 WARN Instrumentation: [dfb789dd] regParam is zero, which might cause numerical instability and overfitting.


[Stage 4:>                                                        (0 + 12) / 12]

23/08/27 06:30:37 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/08/27 06:30:37 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                

23/08/27 06:30:41 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

In [15]:
# R^2 (coefficient of determination) from the model's training summary.
# This is a goodness-of-fit measure on the data the model was fitted on (v_train),
# not a classification accuracy and not a score on the held-out v_test split.

model.summary.r2

0.742453786141426