# **How RayDP works together with Ray**

## 1. Colab environment setup

In [None]:
# Install ray and raydp
! pip install ray==1.9
! pip install raydp
! pip install --upgrade pip
! pip install raydp
! pip install ray[tune]
! pip install torch==1.8.1+cpu -f https://download.pytorch.org/whl/torch_stable.html
! pip list

## 2. Import dependencies

In [55]:
import ray
from ray import tune
import ray.data
from ray import train
from ray.train import Trainer, TrainingCallback, get_dataset_shard
import torch
import torch.nn as nn
import torch.nn.functional as F

import raydp
from raydp.utils import random_split
from typing import List, Dict

import os
import argparse

import numpy as np
import pandas as pd

from os.path import dirname, realpath

import numpy as np
from pyspark.sql.functions import hour, quarter, month, year, dayofweek, dayofmonth, weekofyear, col, lit, udf, abs as functions_abs

## 3. Get the data file

In [None]:

# Build a small synthetic data set with the NYC taxi fare columns and save it
# as CSV, so the rest of the notebook can run without the real data.
np.random.seed(0)  # make the generated data reproducible across runs

base_date = np.datetime64("2010-01-01 00:00:00")

# The size of data
N = 2000

fare_amount = np.random.uniform(3.0, 50.0, size=N)
pick_long = np.random.uniform(-74.2, -73.8, size=N)
pick_lat = np.random.uniform(40.7, 40.8, size=N)
drop_long = np.random.uniform(-74.2, -73.8, size=N)
drop_lat = np.random.uniform(40.7, 40.8, size=N)
passenger_count = np.random.randint(1, 5, size=N)
# Random offsets in seconds (157680000 s = 5 * 365 days) added to base_date
date = np.random.randint(0, 157680000, size=N) + base_date
# Format as "YYYY-MM-DD HH:MM:SS UTC". %M is the minute; the previous %m
# wrote the month number into the minutes position.
date = np.array([t.item().strftime("%Y-%m-%d %H:%M:%S UTC") for t in date])
key = ["fake_key"] * N
df = pd.DataFrame({
    "key": key,
    "fare_amount": fare_amount,
    "pickup_datetime": date,
    "pickup_longitude": pick_long,
    "pickup_latitude": pick_lat,
    "dropoff_longitude": drop_long,
    "dropoff_latitude": drop_lat,
    "passenger_count": passenger_count
    })
df.to_csv("/content/fake_nyctaxi.csv", index=False)

## 4. Define the data_process function

In [None]:
def clean_up(data):
    """Filter out rows with implausible coordinates, passenger counts or fares.

    Rows are kept only when longitudes lie in [-76, -72], latitudes lie in
    [38, 42], ``passenger_count`` lies in [1, 6], ``fare_amount`` is strictly
    between 0 and 250, and both the dropoff longitude and the dropoff latitude
    differ from the pickup values.

    Returns the filtered DataFrame.
    """
    # (column, inclusive lower bound, inclusive upper bound)
    inclusive_ranges = (
        ("pickup_longitude", -76, -72),
        ("dropoff_longitude", -76, -72),
        ("pickup_latitude", 38, 42),
        ("dropoff_latitude", 38, 42),
        ("passenger_count", 1, 6),
    )
    filtered = data
    for name, lower, upper in inclusive_ranges:
        filtered = filtered.filter(col(name) <= upper)
        filtered = filtered.filter(col(name) >= lower)

    # Fares must lie strictly inside (0, 250)
    filtered = filtered.filter(col("fare_amount") > 0)
    filtered = filtered.filter(col("fare_amount") < 250)

    # Drop trips whose dropoff longitude or latitude equals the pickup value
    unchanged_pairs = (
        ("dropoff_longitude", "pickup_longitude"),
        ("dropoff_latitude", "pickup_latitude"),
    )
    for dropoff_name, pickup_name in unchanged_pairs:
        filtered = filtered.filter(col(dropoff_name) != col(pickup_name))
    return filtered

# Add time related features
def add_time_features(data):
    """Derive calendar and time-of-day features from ``pickup_datetime``.

    Adds the columns ``day``, ``hour_of_day``, ``day_of_week``,
    ``week_of_year``, ``month_of_year``, ``quarter_of_year``, ``year``,
    ``night`` and ``late_night`` (in that order) and returns the new
    DataFrame.

    ``day_of_week`` is Spark's ``dayofweek`` shifted by -2, i.e. Monday..Friday
    are 0..4, Saturday is 5 and Sunday is -1.
    """
    pickup = col("pickup_datetime")
    data = data.withColumn("day", dayofmonth(pickup))
    data = data.withColumn("hour_of_day", hour(pickup))
    data = data.withColumn("day_of_week", dayofweek(pickup) - 2)
    data = data.withColumn("week_of_year", weekofyear(pickup))
    data = data.withColumn("month_of_year", month(pickup))
    data = data.withColumn("quarter_of_year", quarter(pickup))
    data = data.withColumn("year", year(pickup))

    @udf("int")
    def night(hour_of_day, weekday):
        # 1 for pickups between 16:00 and 20:59 on Monday..Friday, else 0.
        # Sunday is encoded as -1 (see docstring), so the lower bound is
        # required; a bare ``weekday < 5`` would count Sunday as a weekday.
        return int(16 <= hour_of_day <= 20 and 0 <= weekday < 5)

    @udf("int")
    def late_night(hour_of_day):
        # 1 for pickups at or before 06:xx or from 20:00 onwards, else 0
        return int(hour_of_day <= 6 or hour_of_day >= 20)

    data = data.withColumn("night", night("hour_of_day", "day_of_week"))
    data = data.withColumn("late_night", late_night("hour_of_day"))
    return data

def add_distance_features(data):
    """Add coordinate-difference and landmark-distance features.

    Adds ``abs_diff_longitude``, ``abs_diff_latitude`` and ``manhattan`` (their
    sum), followed by ``pickup_distance_<tag>`` / ``dropoff_distance_<tag>``
    for each of the landmarks jfk, ewr, lgr and downtown. All distances are
    sums of absolute coordinate differences, expressed in degrees.

    Returns the new DataFrame.
    """
    @udf("float")
    def manhattan(lon1, lat1, lon2, lat2):
        # Callers pass (longitude, latitude) pairs, hence the argument order
        return float(np.abs(lon2 - lon1) + np.abs(lat2 - lat1))

    # (column tag, (longitude, latitude)): the three NYC airports, then downtown
    landmarks = (
        ("jfk", (-73.7822222222, 40.6441666667)),
        ("ewr", (-74.175, 40.69)),
        ("lgr", (-73.87, 40.77)),
        ("downtown", (-74.0063889, 40.7141667)),
    )

    # Per-axis absolute difference between dropoff and pickup
    longitude_gap = functions_abs(col("dropoff_longitude") - col("pickup_longitude"))
    latitude_gap = functions_abs(col("dropoff_latitude") - col("pickup_latitude"))
    data = data.withColumn("abs_diff_longitude", longitude_gap)
    data = data.withColumn("abs_diff_latitude", latitude_gap)
    data = data.withColumn(
        "manhattan", col("abs_diff_latitude") + col("abs_diff_longitude"))

    # Distance from each trip end point to each landmark
    for tag, (landmark_lon, landmark_lat) in landmarks:
        for end in ("pickup", "dropoff"):
            distance = manhattan(
                f"{end}_longitude", f"{end}_latitude",
                lit(landmark_lon), lit(landmark_lat))
            data = data.withColumn(f"{end}_distance_{tag}", distance)
    return data

def drop_col(data):
    """Remove the raw input columns and return the remaining DataFrame."""
    raw_columns = (
        "pickup_datetime",
        "pickup_longitude",
        "pickup_latitude",
        "dropoff_longitude",
        "dropoff_latitude",
        "passenger_count",
        "key",
    )
    trimmed = data
    for name in raw_columns:
        trimmed = trimmed.drop(name)
    return trimmed

def nyc_taxi_preprocess(data):
    """Run the full preprocessing pipeline on the raw taxi DataFrame.

    Applies, in order: row clean-up, time features, distance features, and
    removal of the raw input columns. Returns the transformed DataFrame.
    """
    stages = (clean_up, add_time_features, add_distance_features, drop_col)
    for stage in stages:
        data = stage(data)
    return data

## 5. Init or connect to a ray cluster

In [None]:
# To attach to an already running Ray cluster, use this call instead:
# ray.init(address="auto")
# Otherwise start a local Ray instance limited to 6 CPUs.
ray.init(num_cpus=6)

2022-05-06 03:34:42,787	INFO services.py:1340 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'metrics_export_port': 62617,
 'node_id': '2debd0aa31226f76ebe3dfaa8e38dc3bc389dea687b43737aff54701',
 'node_ip_address': '172.28.0.2',
 'object_store_address': '/tmp/ray/session_2022-05-06_03-34-39_827110_63/sockets/plasma_store',
 'raylet_ip_address': '172.28.0.2',
 'raylet_socket_name': '/tmp/ray/session_2022-05-06_03-34-39_827110_63/sockets/raylet',
 'redis_address': '172.28.0.2:6379',
 'session_dir': '/tmp/ray/session_2022-05-06_03-34-39_827110_63',
 'webui_url': '127.0.0.1:8265'}

## 6. Get a spark session

In [None]:
# Settings for the Spark session started through raydp.init_spark
app_name = "NYC Taxi Fare Prediction with RayDP"
num_executors = 1  # also reused later as the number of Ray Train workers
cores_per_executor = 1
memory_per_executor = "500M"
# Start the Spark session with the settings defined above
spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)

## 7. Data processing

In [None]:
# Load the generated CSV; the first row is the header and column types are inferred
data = spark.read.csv("/content/fake_nyctaxi.csv", header=True, inferSchema=True)
# Use UTC as the Spark session time zone for the datetime processing below
spark.conf.set("spark.sql.session.timeZone", "UTC")
# Clean the rows and derive the model features
data = nyc_taxi_preprocess(data)
# 90% / 10% train / test split (the trailing 0 is presumably the seed; confirm against raydp)
train_df, test_df = random_split(data, [0.9, 0.1], 0)
# Every column except the label is a model feature
features = [name for name in train_df.columns if name != "fare_amount"]
# Convert both Spark DataFrames into Ray Datasets.
# Guidance from the original notebook: align ``parallelism`` with the
# ``num_workers`` used by Ray Train.
# NOTE(review): parallelism is 8 here while the Trainer below uses
# num_executors workers; confirm this mismatch is intended.
train_dataset, test_dataset = (
    ray.data.from_spark(frame, parallelism=8) for frame in (train_df, test_df)
)
# One torch.float dtype per feature column
feature_dtype = [torch.float for _ in features]



## 8. Define a neural network model

In [None]:
class NYC_Model(nn.Module):
    """Fully connected regression network: cols -> 256 -> 128 -> 64 -> 16 -> 1.

    Each hidden layer applies Linear, then ReLU, then BatchNorm1d; the final
    layer is a plain Linear that produces one value per input row.
    """

    def __init__(self, cols):
        """Create the layers; ``cols`` is the number of input features."""
        super().__init__()
        widths = (cols, 256, 128, 64, 16)
        # Linear layers are created first and in order, then the BatchNorm
        # layers, so the attribute names and registration order stay
        # fc1..fc5 followed by bn1..bn4.
        self.fc1, self.fc2, self.fc3, self.fc4 = (
            nn.Linear(n_in, n_out) for n_in, n_out in zip(widths, widths[1:])
        )
        self.fc5 = nn.Linear(widths[-1], 1)
        self.bn1, self.bn2, self.bn3, self.bn4 = (
            nn.BatchNorm1d(width) for width in widths[1:]
        )

    def forward(self, x):
        """Run ``x`` through the four hidden stages and the output layer."""
        hidden_stages = (
            (self.fc1, self.bn1),
            (self.fc2, self.bn2),
            (self.fc3, self.bn3),
            (self.fc4, self.bn4),
        )
        for linear, norm in hidden_stages:
            x = norm(F.relu(linear(x)))
        return self.fc5(x)

## 9. Define train and test functions

In [None]:
def train_epoch(dataset, model, criterion, optimizer):
    """Train ``model`` for one pass over ``dataset``.

    Args:
        dataset: iterable of ``(inputs, targets)`` batches.
        model: the module to train; it is switched to training mode.
        criterion: loss function called as ``criterion(outputs, targets)``.
        optimizer: optimizer that updates the model parameters.

    Returns:
        ``(train_acc, train_loss)`` where ``train_acc`` is the fraction of
        outputs exactly equal to their target and ``train_loss`` is the mean
        of the per-batch losses. Both are 0.0 when ``dataset`` yields no data
        (e.g. an empty shard) instead of raising ZeroDivisionError.
    """
    model.train()
    total_loss, correct, data_size, num_batches = 0.0, 0, 0, 0
    for inputs, targets in dataset:
        # Compute prediction error
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        total_loss += loss.item()
        # NOTE(review): exact equality is rarely true for a regression
        # target; consider a tolerance-based metric if accuracy matters.
        correct += (outputs == targets).sum().item()
        data_size += inputs.size(0)
        num_batches += 1
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Guard both averages so an empty dataset does not divide by zero
    train_loss = total_loss / num_batches if num_batches else 0.0
    train_acc = correct / data_size if data_size else 0.0
    return train_acc, train_loss

def test_epoch(dataset, model, criterion):
    model.eval()
    test_loss, correct, data_size, batch_idx = 0, 0, 0, 0
    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(dataset):
            # Compute prediction error
            outputs = model(inputs)
            test_loss += criterion(outputs, targets).item()
            correct += (outputs == targets).sum().item()
            data_size += inputs.size(0)
    test_loss /= (batch_idx + 1)
    test_acc = correct/data_size
    return test_acc, test_loss

## 10. Define train function

In [None]:
def train_func(config):
    """Training loop run on each Ray Train worker.

    Args:
        config: dict providing ``"num_epochs"``, ``"lr"`` and
            ``"batch_size"``.

    Returns:
        The list of per-epoch test losses computed by this worker. The
        original accumulated this list but never returned it, so
        ``trainer.run`` only ever produced ``None`` for each worker.
    """
    num_epochs = config["num_epochs"]
    lr = config["lr"]
    batch_size = config["batch_size"]

    def shard_to_torch(name):
        # Convert this worker's shard of the named dataset to a torch dataset
        return get_dataset_shard(name).to_torch(
            feature_columns=features,
            label_column="fare_amount",
            label_column_dtype=torch.float,
            feature_column_dtypes=feature_dtype,
            batch_size=batch_size)

    train_dataset = shard_to_torch("train")
    test_dataset = shard_to_torch("test")

    model = NYC_Model(len(features))
    model = train.torch.prepare_model(model)
    criterion = nn.SmoothL1Loss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_results = []
    for epoch in range(num_epochs):
        train_acc, train_loss = train_epoch(train_dataset, model, criterion, optimizer)
        test_acc, test_loss = test_epoch(test_dataset, model, criterion)
        # Train and test metrics are reported separately, as before, so
        # callbacks keep receiving two results per epoch.
        train.report(epoch=epoch, train_acc=train_acc, train_loss=train_loss)
        train.report(epoch=epoch, test_acc=test_acc, test_loss=test_loss)
        loss_results.append(test_loss)
    return loss_results

## 11. Define the callback function

In [None]:
class PrintingCallback(TrainingCallback):
    """Training callback that prints every set of results it receives."""
    def handle_result(self, results: List[Dict], **info):
        # ``results`` is a list of metric dicts (presumably one per worker; confirm against Ray Train)
        print(results)

## 12. Train model via ray train

In [None]:
# Use as many Ray Train workers as Spark executors were configured
trainer = Trainer(backend="torch", num_workers=num_executors)
trainer.start()
try:
    results = trainer.run(
        train_func, config={"num_epochs": 10, "lr": 0.1, "batch_size": 8},
        callbacks=[PrintingCallback()],
        dataset={
            "train": train_dataset,
            "test": test_dataset
        }
    )
finally:
    # Always release the training workers, even when the run raises
    trainer.shutdown()

2022-05-06 03:35:25,648	INFO trainer.py:172 -- Trainer logs will be logged in: /root/ray_results/train_2022-05-06_03-35-25
2022-05-06 03:35:27,366	INFO trainer.py:178 -- Run results will be logged in: /root/ray_results/train_2022-05-06_03-35-25/run_001
[2m[36m(BaseWorkerMixin pid=2470)[0m 2022-05-06 03:35:27,363	INFO torch.py:67 -- Setting up process group for: env:// [rank=0, world_size=1]
[2m[36m(BaseWorkerMixin pid=2470)[0m 2022-05-06 03:35:27,453	INFO torch.py:239 -- Moving model to device: cpu


[{'epoch': 0, 'train_acc': 0.0, 'train_loss': 12.028679358107704, '_timestamp': 1651808130, '_time_this_iter_s': 2.573432445526123, '_training_iteration': 1}]
[{'epoch': 0, 'test_acc': 0.0, 'test_loss': 678.7765073423033, '_timestamp': 1651808130, '_time_this_iter_s': 0.0010333061218261719, '_training_iteration': 2}]
[{'epoch': 1, 'train_acc': 0.0, 'train_loss': 11.546148598194122, '_timestamp': 1651808133, '_time_this_iter_s': 3.356105089187622, '_training_iteration': 3}]
[{'epoch': 1, 'test_acc': 0.0, 'test_loss': 11.001856521323875, '_timestamp': 1651808133, '_time_this_iter_s': 0.0009887218475341797, '_training_iteration': 4}]
[{'epoch': 2, 'train_acc': 0.0, 'train_loss': 11.407406462090355, '_timestamp': 1651808135, '_time_this_iter_s': 2.6061911582946777, '_training_iteration': 5}]
[{'epoch': 2, 'test_acc': 0.0, 'test_loss': 10.934884194974545, '_timestamp': 1651808135, '_time_this_iter_s': 0.0008146762847900391, '_training_iteration': 6}]
[{'epoch': 3, 'train_acc': 0.0, 'train_l

## 13. Shut down Ray and RayDP

In [None]:
# Stop the Spark session created with raydp.init_spark, then shut down Ray itself
raydp.stop_spark()
ray.shutdown()