# OpenCl

In [11]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import pyopencl as cl  # Import the OpenCL GPU computing API
from pyopencl import clmath
import pyopencl.array as pycl_array  # Import PyOpenCL Array 

import time

df = pd.read_csv('euc_dataset.csv', index_col = 'Unnamed: 0')
df = df.dropna(subset=['pickup_latitude','pickup_longitude','dropoff_latitude','dropoff_longitude'])
df_ = df[:10000]

def euc_distance_opencl(df):
    context = cl.create_some_context()  # Initialize the Context
    queue = cl.CommandQueue(context)  # Instantiate a Queue

    # Create 4 random pyopencl arrays representing latitude x and longitude y for pickup point 1 and dropoff point 2
    # x1 = pycl_array.to_device(queue, np.random.uniform(-90.0,90.0,1000000))
    # x2 = pycl_array.to_device(queue, np.random.uniform(-90.0,90.0,1000000))
    # y1 = pycl_array.to_device(queue, np.random.uniform(-180.0,180.0,1000000))
    # y2 = pycl_array.to_device(queue, np.random.uniform(-180.0,180.0,1000000))

    x1 = pycl_array.to_device(queue,np.array(df_['pickup_latitude']))
    x2 = pycl_array.to_device(queue,np.array(df_['dropoff_latitude']))
    y1 = pycl_array.to_device(queue,np.array(df_['pickup_longitude']))
    y2 = pycl_array.to_device(queue,np.array(df_['dropoff_longitude']))

    dist = pycl_array.empty_like(x1)  # Create an empty pyopencl distances array

    program = cl.Program(context, """
    __kernel void cal_dist(__global const float *x1, __global const float *x2, __global const float *y1, 
    __global const float *y2, __global float *dist)
    {
      int i = get_global_id(0);
      dist[i] = (x1[i]-x2[i])*(x1[i]-x2[i]) + (y1[i]-y2[i])*(y1[i]-y2[i]);
    }""").build()  # Create the OpenCL program

    # Enqueue the program for execution and store the result in c

    program.cal_dist(queue, x1.shape, None, x1.data, x2.data, y1.data, y2.data, dist.data)  

    sqrt_dist = cl.clmath.sqrt(dist, queue=None)

    print("total_dist: {} \n".format(sqrt_dist[:10]))  



total_dist: [5.95804297e-34 5.95804297e-34 7.48469770e-35 5.90032239e-33
 9.55045755e-36 6.06646474e-28 1.75055799e-26 3.68618294e-36
 5.31092863e-25 1.32375316e-32] 

run_time: 0.080713


In [12]:
%timeit -n 100 euc_distance_opencl(df_)

The slowest run took 9.14 times longer than the fastest. This could mean that an intermediate result is being cached.
22.6 ms ± 23 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


# Map Reduce
To run the job, place the data on HDFS by issuing the command (replace the filename) from within the file directory:

The job should be launched by the following command:


The hjs command submits the job; the -file parameter indicates which files will be distributed to the worker nodes (i.e., the source code); the -mapper and -reducer parameters specify the paths to the mapper and reducer scripts; and the -input and -output paths specify the input file(s) and output file paths for the job. To retrieve the results of the computation, run:

To get all the partial outputs. To get the complete output as one file, run:

## mapper.py

In [7]:
#!/usr/bin/env python
import os
import sys
import math 

for line in sys.stdin:
        entry = line.strip().split(",")
        
        # Pickup Centroid Latitude, Pickup Centroid Longitude, Dropoff Centroid Latitude and Dropoff Centroid Longitude
        pick_la, pick_lon, drop_la, drop_lon = entry[6],entry[7],entry[8],entry[9]
        
        # pick up community area
        area = entry[5]
        
        # calculate euclidean distance for each trip
        dist = math.sqrt((pick_la - drop_la)**2 + (pick_lon - drop_lon)**2)
        
        print('{}\t{}'.format(area,dist))
        

## reducer.py

In [10]:
#!/usr/bin/env python
import os
import sys

current_area = None
current_dist = 0.0
area = None
dist = 0.0

# number of trips in this area
num_trips = 0

for line in sys.stdin:
    area, dist = line.split('\t', 1)
            
    # convert distance (currently a string) to int
    try:
        dist = float(dist)
    except ValueError:
        # dist was not a number, so silently
        # ignore/discard this line
        continue

    # Since Hadoop sorts map output by key (here: area) before it is passed to the reducer
    if current_area == area:
        current_dist += dist
        num_trips += 1
    else:
        if current_area and i != 0:
            # write result to STDOUT
            print('{}\t{}'.format(current_area, current_dist/num_trips))
        current_area = area
        current_dist = dist
        num_trips = 0

# Output the last area if needed
if current_area == area and num_trips !=0:
    print('{}\t{}'.format(current_area, current_dist/num_trips))