In [1]:
import json
import os
import shutil
from datetime import date, timedelta

import pandas as pd
import requests
from google.cloud import storage

In [2]:
PROJECT_ID = 'nyc-transit-426211'
REGION = 'us-central1'
STAGING = 'gs://motor-vehicle-crashes/collisions/2024-01-01/data.json'
TEMP = 'gs://motor-vehicle-crashes/temp'
HOME = '/home/andrub818'
api_key, secret_key = os.environ['NYCT_API_KEY'], os.environ['NYCT_SECRET_KEY']

#### __API documentation__
https://dev.socrata.com/docs/filtering

#### __Cool MLOps resource__
https://mlops-coding-course.fmind.dev/2.%20Prototyping/2.0.%20Notebooks.html

In [10]:
# pedestrian crash API endpoint: https://data.cityofnewyork.us/Public-Safety/Motor-Vehicle-Collisions-Person/f55k-p6yu/about_data
# url = "https://data.cityofnewyork.us/resource/f55k-p6yu.json?$order=crash_date DESC&$limit=25"
url = "https://data.cityofnewyork.us/resource/f55k-p6yu.json?$where=crash_date = '2024-06-12T00:00:00.000'"
r = requests.get(url, auth=(api_key, secret_key))
print(r.status_code)
d = r.json()
print(len(d))
d

200
759


[{'unique_id': '13007935',
  'collision_id': '4732475',
  'crash_date': '2024-06-12T00:00:00.000',
  'crash_time': '13:05',
  'person_id': '73d08a1a-1b18-4f39-bb01-852bf85da5b3',
  'person_type': 'Occupant',
  'person_injury': 'Unspecified',
  'vehicle_id': '20667154',
  'ped_role': 'Registrant'},
 {'unique_id': '13009524',
  'collision_id': '4732780',
  'crash_date': '2024-06-12T00:00:00.000',
  'crash_time': '17:50',
  'person_id': '2a4d7e60-4bff-40a7-9b67-101a9d90ebab',
  'person_type': 'Occupant',
  'person_injury': 'Unspecified',
  'vehicle_id': '20668026',
  'person_age': '47',
  'ped_role': 'Registrant',
  'person_sex': 'F'},
 {'unique_id': '13010740',
  'collision_id': '4732988',
  'crash_date': '2024-06-12T00:00:00.000',
  'crash_time': '0:00',
  'person_id': 'eb0120ad-ab66-4cc8-985d-be56d752c5a1',
  'person_type': 'Occupant',
  'person_injury': 'Unspecified',
  'vehicle_id': '20668711',
  'ped_role': 'Registrant'},
 {'unique_id': '13009061',
  'collision_id': '4732716',
  'cr

In [11]:
# motor vehicle collision API endpoint: https://data.cityofnewyork.us/Public-Safety/Motor-Vehicle-Collisions-Crashes/h9gi-nx95/about_data
# c_url = "https://data.cityofnewyork.us/resource/h9gi-nx95.json?$order=crash_date DESC&$limit=25"
c_url = "https://data.cityofnewyork.us/resource/h9gi-nx95.json?$where=crash_date = '2024-06-12T00:00:00.000'"
r2 = requests.get(c_url, auth=(api_key, secret_key))
print(r2.status_code)
d2 = r2.json()
print(len(d2))
d2

200
204


[{'crash_date': '2024-06-12T00:00:00.000',
  'crash_time': '6:00',
  'borough': 'QUEENS',
  'zip_code': '11354',
  'latitude': '40.76961',
  'longitude': '-73.831505',
  'location': {'latitude': '40.76961',
   'longitude': '-73.831505',
   'human_address': '{"address": "", "city": "", "state": "", "zip": ""}'},
  'on_street_name': '31 ROAD',
  'off_street_name': '137 STREET',
  'number_of_persons_injured': '2',
  'number_of_persons_killed': '0',
  'number_of_pedestrians_injured': '0',
  'number_of_pedestrians_killed': '0',
  'number_of_cyclist_injured': '0',
  'number_of_cyclist_killed': '0',
  'number_of_motorist_injured': '2',
  'number_of_motorist_killed': '0',
  'contributing_factor_vehicle_1': 'Traffic Control Disregarded',
  'contributing_factor_vehicle_2': 'Unspecified',
  'collision_id': '4732290',
  'vehicle_type_code1': 'Station Wagon/Sport Utility Vehicle'},
 {'crash_date': '2024-06-12T00:00:00.000',
  'crash_time': '12:24',
  'latitude': '40.825577',
  'longitude': '-73.918

### Overview

Set up integrations for these datasets. Run them daily. Write to BQ. Seems like they're up to date as of a few days ago (When I pulled around midnight on May 17, most recent data showed May 13). Partition by day.

After tables are created, join them to create more exhaustive dataset. Motor vehicle collisions have lat/long data. Questions for EDA:
- what does a map viz look like
- is there a location where accidents happen more often
- people stats? age/bodily injury/death
- what type of vehicle?
- are certain vehicles more prone to pedestrian death (trucks obvi).

Put a streamlit dashboard together that shows the stuff you want to display.

After all that, what is something we can predict? and how can we operationalize it? Apply some of the skills you learned in the MLOps class.

In [3]:
r3 = requests.get('https://data.cityofnewyork.us/resource/h9gi-nx95.json?crash_date=2024-06-12T00:00:00.000', auth=(api_key, secret_key))
print(r3.status_code)
print(len(r3.json()))
r3.json()[:3]

200
222


[{'crash_date': '2024-06-12T00:00:00.000',
  'crash_time': '6:00',
  'borough': 'QUEENS',
  'zip_code': '11354',
  'latitude': '40.76961',
  'longitude': '-73.831505',
  'location': {'latitude': '40.76961',
   'longitude': '-73.831505',
   'human_address': '{"address": "", "city": "", "state": "", "zip": ""}'},
  'on_street_name': '31 ROAD',
  'off_street_name': '137 STREET',
  'number_of_persons_injured': '2',
  'number_of_persons_killed': '0',
  'number_of_pedestrians_injured': '0',
  'number_of_pedestrians_killed': '0',
  'number_of_cyclist_injured': '0',
  'number_of_cyclist_killed': '0',
  'number_of_motorist_injured': '2',
  'number_of_motorist_killed': '0',
  'contributing_factor_vehicle_1': 'Traffic Control Disregarded',
  'contributing_factor_vehicle_2': 'Unspecified',
  'collision_id': '4732290',
  'vehicle_type_code1': 'Station Wagon/Sport Utility Vehicle'},
 {'crash_date': '2024-06-12T00:00:00.000',
  'crash_time': '12:24',
  'latitude': '40.825577',
  'longitude': '-73.918

In [4]:
# download data. arg inputs?
# dt = (date.today() - timedelta(days=5)).strftime('%Y-%m-%d')
dt = '2024-06-12'

In [5]:
def download_data(dt, endpoint):
    base_url = "https://data.cityofnewyork.us/resource/"
    endpoints = {
        "pedestrian": "f55k-p6yu.json",
        "collision": "h9gi-nx95.json"
    }
    ep = endpoints[endpoint]
    params = f"?crash_date={dt}T00:00:00.000"
    url = base_url + ep + params
    response = requests.get(url, auth=(api_key, secret_key))
    response.raise_for_status()
    return response.json()

c = download_data('2024-06-12', 'collision')
print(len(c))
c[:1]

222


[{'crash_date': '2024-06-12T00:00:00.000',
  'crash_time': '14:35',
  'on_street_name': 'LONG ISLAND EXPRESSWAY',
  'off_street_name': '188 STREET',
  'number_of_persons_injured': '0',
  'number_of_persons_killed': '0',
  'number_of_pedestrians_injured': '0',
  'number_of_pedestrians_killed': '0',
  'number_of_cyclist_injured': '0',
  'number_of_cyclist_killed': '0',
  'number_of_motorist_injured': '0',
  'number_of_motorist_killed': '0',
  'contributing_factor_vehicle_1': 'Unspecified',
  'contributing_factor_vehicle_2': 'Unspecified',
  'collision_id': '4732236',
  'vehicle_type_code1': 'Box Truck',
  'vehicle_type_code2': 'Sedan'}]

In [9]:
def create_tmp_dir(dt, data):
    "Create a tmp directory after downloading data"
    data_dir = os.path.join('..', 'data', 'tmp', dt)
    
    # create dir if it doesn't exist
    os.makedirs(data_dir, exist_ok=True)
    
    # write the data to a json file
    file_path = os.path.join(data_dir, 'data.json')
    with open(file_path, 'w') as out:
        for record in data:
            json.dump(record, out)
            out.write('\n')
    print(f"data saved to {file_path}")

In [10]:
create_tmp_dir('2024-06-12', c)

data saved to ../data/tmp/2024-06-12/data.json


In [11]:
def sync_to_gcs(dt, tmp_dir):
    # source_file = tmp_dir # "../data/tmp/2024-01-01/data.json"
    destination_blob_name = f"collisions/{dt}/data.json"
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket('motor-vehicle-crashes')
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(tmp_dir)
    print(f"File {tmp_dir} uploaded to {bucket}/{destination_blob_name}.")

In [12]:
sync_to_gcs(dt, '../data/tmp/2024-06-12/data.json')

File ../data/tmp/2024-06-12/data.json uploaded to collisions/2024-06-12/data.json.


In [73]:
# delete local file.
if os.path.isfile(file_path):
    os.remove(file_path)
    print(f"File {file_path} deleted.")
else:
    print(f"File {file_path} not found.")

OSError: [Errno 66] Directory not empty: '../data/tmp/2024-01-01'

In [71]:
def remove_local_dir(tmp_dir):
    if path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)

'../data/tmp/2024-01-01'

## __Dataflow__

Documentation example: https://beam.apache.org/get-started/wordcount-example/

Tour of Beam: https://tour.beam.apache.org/tour/python/introduction/guide

#### __Steps__:
1. Create the pipeline object and run it with a direct runner to test locally.  You can run it with a command similar to this:
    
    `python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts`

2. Create your pcollection from a csv file saved in a GCS bucket. Note: you can download data from NYCOpenData in csv or JSON format, I'll switch to CSV for now.

3. Write the data to another GCS bucket.

4. After you figured out basic io, have the input then come from an API call.

5. Apply one transformation to the PCollection, formatting one of the dates so it can be recognized as a date partition in BQ (or maybe I can append a date column to the data).

6. Write the data to a BQ table.

7. Now process 2 files, collisions and pedestrians, to execute the same steps in parellel, writing to separate tables.

8. After doing the testing with DirectRunner, update to DataflowRunner.

9. Figure out how to schedule this on a regular basis.

10. You now have reliable data that you can query from SQL, and take you to your next step which would be to perform more EDA, and make visualizations in a streamlit app.



In [6]:
# Using DirectRunner to test

import argparse
import logging
import re

import apache_beam as beam

from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

input = '/home/andrub818/motor-vehicle-crashes/notebooks/data.json'
output = '/home/andrub818/motor-vehicle-crashes/data/output.json'
# example command:
# python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts


# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):

        def process(self, element):
            print(element)

    def expand(self, input):
        input | beam.ParDo(self._OutputFn())

def main(argv=None, save_main_session=True):

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default=input,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        # Read the text file[pattern] into a PCollection.
        lines = p | 'Read' >> ReadFromText(known_args.input) \
            | beam.Filter(lambda line: line != "")

        # Write the output using a "Write" transform that has side effects.
        # pylint: disable=expression-not-assigned
        output = lines | 'Write' >> WriteToText(known_args.output)


        result = p.run()
        result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    main()


IndentationError: unexpected indent (2088034255.py, line 53)

Now let's do the same thing we did but using dataflow running, this time reading and writing from one GCS bucket to another. After that, let's read from a GCS bucket and write to BQ doing something like this:

https://tour.beam.apache.org/tour/python/io/bigqueryio/write-table-schema

You'll run it using a command like this:

```
python -m dataflow_runner \
    --input gs://machine-learning-workspace/motor-vehicle-crashes/collisions/2024-01-01/data.json \
    --output gs://machine-learning-workspace/motor-vehicle-crashes/staging/ \
    --runner DataflowRunner \
    --project prime-odyssey-415016 \
    --region us-central1 \
    --temp_location gs://machine-learning-workspace/motor-vehicle-crashes/temp/
```

#### Using Dataflow Runner