In [75]:
%%bash
python --version
pip --version

Python 3.7.3
pip 19.0.3 from /Users/bryanwu/anaconda/lib/python3.7/site-packages/pip (python 3.7)


In [76]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from math import sin, cos, atan2, sqrt
from datetime import datetime
import matplotlib.pyplot as plt

In [77]:
class ProcessCSV(beam.DoFn):
    def process(self, element):
        key, fare_amount, pickup_datetime, pickup_longitude, pickup_latitude, \
        dropoff_longitude, dropoff_latitude, passenger_count = element.split(',')
        return [{
            'fare_amount': fare_amount,
            'pickup_datetime': pickup_datetime,
            'pickup_longitude': pickup_longitude,
            'pickup_latitude': pickup_latitude,
            'dropoff_longitude': dropoff_longitude,
            'dropoff_latitude': dropoff_latitude
        }]

In [78]:
class CalculateDistance(beam.DoFn):
    def process(self, element):
        pickup_longitude = float(element['pickup_longitude'])
        pickup_latitude = float(element['pickup_latitude'])
        dropoff_longitude = float(element['dropoff_longitude'])
        dropoff_latitude = float(element['dropoff_latitude'])
        del_longitude = pickup_longitude - dropoff_longitude
        del_latitude = pickup_latitude - dropoff_latitude
        a = sin(del_latitude/2)**2 + cos(pickup_latitude)*cos(dropoff_latitude)*sin(del_longitude/2)**2
        c = 2*atan2(sqrt(a), sqrt(1-a))
        R = 6371.0
        d = R * c
        element['distance'] = d
        return [element]

In [79]:
class FilterNoisyDataPoint(beam.DoFn):
    def process(self, element):
        def is_within_boundingbox(element, BB=(-74.5, -72.8, 40.5, 41.8)):
            pickup_longitude = float(element['pickup_longitude'])
            pickup_latitude = float(element['pickup_latitude'])
            dropoff_longitude = float(element['dropoff_longitude'])
            dropoff_latitude = float(element['dropoff_latitude'])
            return (pickup_longitude >= BB[0]) & (pickup_longitude <= BB[1]) & \
                   (pickup_latitude >= BB[2]) & (pickup_latitude <= BB[3]) & \
                   (dropoff_longitude >= BB[0]) & (dropoff_longitude <= BB[1]) & \
                   (dropoff_latitude >= BB[2]) & (dropoff_latitude <= BB[3])
        
        if is_within_boundingbox(element):
            return [element]

In [80]:
class ExtractDateTime(beam.DoFn):
    def process(self, element):
        dt = datetime.strptime(element['pickup_datetime'], '%Y-%m-%d %H:%M:%S UTC')
        element['hour'] = dt.hour
        element['month'] = dt.month
        element['week_number'] = dt.isocalendar()[1]
        element['weekday'] = dt.weekday()
        return [element]

In [101]:
class MergeToString(beam.DoFn):
    def process(self, element):
        column_names = ['fare_amount', 
                        'week_number', 
                        'weekday', 'hour', 
                        'pickup_longitude', 
                        'pickup_latitude', 
                        'dropoff_longitude',
                        'dropoff_latitude',
                        'distance']
        
        return [','.join(['{}']*len(column_names)).format(*[element[column] for column in column_names])]
        

In [102]:
opts = PipelineOptions(
    runner='direct'
)
pipe = beam.Pipeline(options=opts)

In [103]:
file_location = './new-york-city-taxi-fare-prediction/train_subset.csv'
column_names = ['fare_amount', 
                        'week_number', 
                        'weekday', 'hour', 
                        'pickup_longitude', 
                        'pickup_latitude', 
                        'dropoff_longitude',
                        'dropoff_latitude',
                        'distance']

lines = (pipe 
         | 'ReadFile' >> beam.io.ReadFromText(file_location, skip_header_lines=1)
         | 'SplitCSV' >> beam.ParDo(ProcessCSV())
         | 'CalculateDistance' >> beam.ParDo(CalculateDistance())
         | 'FilterNoisyDataPoints' >> beam.ParDo(FilterNoisyDataPoint())
         | 'ExtractDateTime' >> beam.ParDo(ExtractDateTime())
         | 'MergeToString' >> beam.ParDo(MergeToString())
         | 'WriteToGCS' >> beam.io.WriteToText('./tmp/transformed_train.csv', header=', '.join(column_names)))

In [104]:
pipe.run()



<apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x1205e2e48>