In [10]:
import os
import folium
import pandas as pd
import shutil

# Project root. Set MAST30034_PROJECT_ROOT to run the notebook on another machine;
# the default is the original author's checkout location.
root_folder = os.environ.get(
    "MAST30034_PROJECT_ROOT",
    "/home/trungdc/unimelb/MAST30024/asm/mast30034_2021_s2_project_1-alexdang02-1",
)
data_dir = os.path.join(root_folder, "Data")
# Join path components separately so the separator is correct on every OS.
SQLOutput_dir = os.path.join(root_folder, "code", "SparkSQL_Output")
plot_dir = os.path.join(root_folder, "Plots")

In [20]:
import pyspark.sql.functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")

In [21]:

# Load the merged training split; column types are inferred from the data.
spark = SparkSession.builder.getOrCreate()
sdf = spark.read.csv(
    os.path.join(data_dir, "Merge", "train.csv"),
    header=True,
    inferSchema=True,
)

In [22]:
def weekend(dayofweek):
    """Return 1 if a Spark ``dayofweek`` value is Saturday or Sunday, else 0.

    Spark's ``dayofweek`` numbers days from 1 (Sunday) to 7 (Saturday), so the
    weekend is {1, 7}. The previous ``<= 5`` test treated Sunday as a weekday
    and Friday as a weekend day.

    Returns ``None`` for a ``None`` input (e.g. an unparseable pickup timestamp).
    The UDF then yields null instead of failing the Spark job with a TypeError.
    """
    if dayofweek is None:
        return None
    return 1 if dayofweek in (1, 7) else 0
# Spark UDF wrapping `weekend`; produces an IntegerType column.
udfWeekendFunc = F.udf(weekend, returnType=IntegerType())

def workingHour(hour, weekend):
    """Return 1 for a pickup between 07:00 and 19:59 on Monday–Friday, else 0.

    Parameters
    ----------
    hour : int or None
        Hour of day, as produced by Spark's ``hour``.
    weekend : int or None
        Despite its name, ``preprocess`` passes Spark's ``dayofweek`` value here
        (1 = Sunday ... 7 = Saturday), so Monday–Friday are 2..6. The old
        ``<= 5`` test counted Sunday–Thursday as workdays.

    Returns
    -------
    int or None
        1 or 0, or ``None`` when either input is ``None``. The UDF then yields
        null instead of failing the Spark job with a TypeError.
    """
    if hour is None or weekend is None:
        return None
    is_weekday = 2 <= weekend <= 6
    return 1 if is_weekday and 7 <= hour <= 19 else 0
# Spark UDF wrapping `workingHour`; produces an IntegerType column.
udfworkingHour = F.udf(workingHour, returnType=IntegerType())

In [23]:
def preprocess(sdf):
    """Clean raw merged trip rows and derive time-based features.

    Steps, in order:
      1. Parse ``tpep_pickup_datetime`` and strip unit suffixes from column names.
      2. Filter out rows failing the range / category checks below.
      3. Add DayofWeek, Weekend, Month, Hour and WorkingHour columns.
      4. Repair values: zero trip_distance -> expected_total_distance,
         passenger_count 0 -> 1 and 6 -> 5, negative tips -> 0,
         fare_amount floored at 2.5.
      5. Drop helper columns that are not kept for modelling.

    The original chained these steps with backslash continuations. A blank line
    after the last backslash turned ``.drop(...)`` into a new statement, which
    is a SyntaxError. Parenthesized chains avoid that class of bug.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        Rows as read from ``Data/Merge/*.csv``.

    Returns
    -------
    pyspark.sql.DataFrame
        A new lazily-evaluated DataFrame; the input DataFrame is not modified.
    """
    # 1. Parse timestamp and rename columns. expected_total_duration is only
    #    renamed so it can be dropped in step 5; converting it first was dead work.
    sdf = (
        sdf.withColumn(
            "tpep_pickup_datetime",
            F.to_timestamp(F.col("tpep_pickup_datetime"), "yyyy-MM-dd HH:mm:ss"),
        )
        .withColumnRenamed("duration(m)", "duration")
        .withColumnRenamed("expected_total_distance(miles)", "expected_total_distance")
        .withColumnRenamed("expected_total_duration(s)", "expected_total_duration")
    )

    # 2. Row filters (thresholds are the analysis' original choices).
    sdf = (
        sdf.filter(F.col("passenger_count") <= 6)
        .filter(F.col("duration") > 0)
        .filter(F.col("PULocationID") != F.col("DOLocationID"))
        # Drop RatecodeID 2 trips with fare <= 50 (presumably inconsistent with
        # that rate code's fare — TODO confirm intent).
        .filter(~((F.col("RatecodeID") == 2) & (F.col("fare_amount") <= 50)))
        .filter(F.col("duration") < 500)
        .filter(F.col("total_amount") >= 2.5)
        .filter(F.col("tolls_amount") >= 0)
        .filter(F.col("VendorID").isin([1, 2]))
        .filter(F.col("RatecodeID").isin([1, 2]))
        .filter(F.col("payment_type").isin([1, 2]))
        .filter(F.col("tip_amount") <= 25)
    )

    pickup = F.col("tpep_pickup_datetime")
    sdf = (
        # 3. Time features (Spark dayofweek: 1 = Sunday ... 7 = Saturday).
        sdf.withColumn("DayofWeek", F.dayofweek(pickup))
        .withColumn("Weekend", udfWeekendFunc(F.col("DayofWeek")))
        .withColumn("Month", F.month(pickup))
        .withColumn("Hour", F.hour(pickup))
        .withColumn("WorkingHour", udfworkingHour(F.col("Hour"), F.col("DayofWeek")))
        # 4. Value repairs (replacing existing columns keeps their position).
        .withColumn(
            "trip_distance",
            F.when(F.col("trip_distance") == 0, F.col("expected_total_distance"))
            .otherwise(F.col("trip_distance")),
        )
        .withColumn(
            "passenger_count",
            F.when(F.col("passenger_count") == 0, 1)
            .when(F.col("passenger_count") == 6, 5)
            .otherwise(F.col("passenger_count")),
        )
        .withColumn(
            "tip_amount",
            F.when(F.col("tip_amount") < 0, 0).otherwise(F.col("tip_amount")),
        )
        .withColumn(
            "fare_amount",
            F.when(F.col("fare_amount") < 2.5, 2.5).otherwise(F.col("fare_amount")),
        )
        # 5. Drop helper / unused columns.
        .drop(
            "expected_total_distance",
            "expected_total_duration",
            "tpep_pickup_datetime",
            "VendorID",
        )
    )
    return sdf


sdf = preprocess(sdf)

    # .withColumn("Key", concat(sdf.DayofWeek,lit(","), sdf.Hour )) \

In [24]:
# Preview the first rows of the preprocessed training data.
sdf.show(n=5)

+---------------+-------------+----------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------+-------+-------+-------+-------------+----+---+-------------+-------+---------+---------+-------+-----+----+-----------+----+
|passenger_count|trip_distance|RatecodeID|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|duration|tempMax|tempMin|tempAvg|tempDeparture| hdd|cdd|precipitation|newSnow|snowDepth|DayofWeek|Weekend|Month|Hour|WorkingHour| Key|
+---------------+-------------+----------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------+-------+-------+-------+-------------+----+---+-------------+-------+---------+---------+-------+-----+----+-----------+----+
|              3|         17.0|         1|         100|          11|           1|       50.5|  0.

In [25]:
# Number of rows left after preprocessing. This is a Spark action, so it re-runs
# the whole read + preprocess pipeline.
sdf.count()

28064431

In [26]:
# Draw a reproducible ~10% sample of the preprocessed training rows. Save it as a
# single CSV file at Data/Model/train1.csv. Spark writes a directory of part
# files, so the lone part file is moved into place afterwards. The previous
# os.rename renamed the directory to itself, which was a no-op.
file = "train1.csv"
sample_fraction = 0.1
sample_seed = 30034  # fixed seed so re-running the notebook reproduces the sample

size_before = sdf.count()
print(size_before)
sample = sdf.sample(withReplacement=False, fraction=sample_fraction, seed=sample_seed)
size_after = sample.count()
# Reuse size_after instead of calling sample.count() again (each count is a full job).
ratio = size_after / size_before if size_before else 0.0
print(f"File {file} is sampled with size {ratio} or {size_after} rows ")

model_dir = os.path.join(data_dir, "Model")
target_path = os.path.join(model_dir, file)
tmp_dir = target_path + ".tmp"
# repartition(1) -> one part file; overwrite so the cell can be re-run.
sample.repartition(1).write.mode("overwrite").csv(tmp_dir, header=True)
part_files = [
    name for name in os.listdir(tmp_dir)
    if name.startswith("part-") and name.endswith(".csv")
]
if len(part_files) != 1:
    raise RuntimeError(f"Expected exactly one CSV part file in {tmp_dir}, found {part_files}")
# Earlier runs of this cell left a *directory* with the target name, and
# os.replace cannot overwrite a directory.
if os.path.isdir(target_path):
    shutil.rmtree(target_path)
os.replace(os.path.join(tmp_dir, part_files[0]), target_path)
shutil.rmtree(tmp_dir)


28064431
File train1.csv is sampled with size 0.1000888990052925 or 2808938 rows 


# Create a sampled test set

In [27]:
# Load the merged test split; column types are inferred from the data.
spark = SparkSession.builder.getOrCreate()
sdf = spark.read.csv(
    os.path.join(data_dir, "Merge", "test.csv"),
    header=True,
    inferSchema=True,
)

In [28]:
# Apply the same cleaning/feature pipeline to the test split. This rebinds `sdf`,
# so re-running this cell without re-running the load cell would fail:
# tpep_pickup_datetime has already been dropped.
sdf = preprocess(sdf)

In [29]:
# Draw a reproducible ~3% sample of the preprocessed test rows. Save it as a
# single CSV file at Data/Model/test1.csv. Spark writes a directory of part
# files, so the lone part file is moved into place afterwards. The previous
# os.rename renamed the directory to itself, which was a no-op.
file = "test1.csv"
sample_fraction = 0.03
sample_seed = 30034  # fixed seed so re-running the notebook reproduces the sample

size_before = sdf.count()
print(size_before)
sample = sdf.sample(withReplacement=False, fraction=sample_fraction, seed=sample_seed)
size_after = sample.count()
# Reuse size_after instead of calling sample.count() again (each count is a full job).
ratio = size_after / size_before if size_before else 0.0
print(f"File {file} is sampled with size {ratio} or {size_after} rows ")

model_dir = os.path.join(data_dir, "Model")
target_path = os.path.join(model_dir, file)
tmp_dir = target_path + ".tmp"
# repartition(1) -> one part file; overwrite so the cell can be re-run.
sample.repartition(1).write.mode("overwrite").csv(tmp_dir, header=True)
part_files = [
    name for name in os.listdir(tmp_dir)
    if name.startswith("part-") and name.endswith(".csv")
]
if len(part_files) != 1:
    raise RuntimeError(f"Expected exactly one CSV part file in {tmp_dir}, found {part_files}")
# Earlier runs of this cell left a *directory* with the target name, and
# os.replace cannot overwrite a directory.
if os.path.isdir(target_path):
    shutil.rmtree(target_path)
os.replace(os.path.join(tmp_dir, part_files[0]), target_path)
shutil.rmtree(tmp_dir)

23096972
File test1.csv is sampled with size 0.029983800473932253 or 692535 rows 
