# Big Data Analytics: NYC Crashes

## Einleitung

In den Straßen New York Citys strotzt es gerade so vor Verkehr, wodurch Verkehrsunfälle an der Tagesordnung stehen. Durch das NYC Open Data Project werden die erfassten Verkehrsunfälle für die Allgemeinheit zugänglich.

Thema des Berichts und unseres Projekts ist die Nutzung dieser öffentlich zugänglichen Unfalldaten zur Analyse und visuellen Auswertung. 

### Aufbau der Data Pipeline

Für das Projekt soll eine Data-Pipeline aufgebaut werden, die die Daten in die Datenbank lädt. 
Von dort aus sollen die Daten von einem Python-Script zur Analyse angefragt werden.

Beteiligt an der Data-Pipeline ist eine Kafka-Instanz, wobei ein Python-Script einen Producer dafür darstellt und ein weiteres Python-Script einen Consumer. Das Producer-Script lädt die Daten zeilenweise aus der Datenquelle, einer CSV-Datei, in Kafka ein, während der Consumer die Daten aus Kafka ausliest und in die MongoDB schreibt. Das Analyse-Script bezieht die Daten wiederum aus der MongoDB.

#### Installation der benötigten Docker-Container

Das Script `start_docker.sh` startet den MongoDB Docker Container, beziehungsweise erstellt bei Bedarf einen neuen Docker Container. Dabei wird, falls noch kein Image für die DockerDB vorhanden ist, das neueste Heruntergeladen. 

Die weiteren benötigten Docker Container sind in der Docker-Compose-Konfiguration (`docker-compose.yml`) vorhanden.
Diese können mit `docker-compose up` erstellt oder heruntergeladen werden. 
Mit `docker-compose down` können die gestarteten Container gestoppt werden, wobei auf die Vollendigung des Befehls gewartet werden sollte.

Auf manchen Systemen befindet sich die Docker Engine nach stoppen der Docker Container mit `docker-compose down` in einem nicht-revertierbarem Fehlerzustand, wobei ein _Neustart des Computers_ unausweichlich ist. Weitere Informationen zu dem Fehler, wobei jedoch _keine der angegebenen Workarounds genutzt werden sollte_, findet sich [auf GitHub](https://github.com/docker/for-linux/issues/162). 

### Datenquelle
Als Datenquellen dienen die vom NYC Open Data Project bereitgestellten Unfalldaten des NYPD. Die detaillierten Unfalldaten befinden sich in 3 unterschiedlichen Datenquellen die unter Anderem als CSV-Datei verfügbar sind: [Motor Vehicle Collisions](https://data.cityofnewyork.us/browse?Data-Collection_Data-Collection=Motor+Vehicle+Collisions&q=crashes).

Enthalten in den Daten sind die am Unfall beteiligten Verkehrsteilnehmer, die Verletzten und Toten, sowie die genaue Position des Unfalls. Darüberhinaus sind noch weitere Daten in den Quellen enthalten, auf die teilweise im Analyseabschnitt genauer eingegangen wird.

### Analyseziele
Ziel der Analyse der Daten ist herauszufinden, welche saisonalen und lokalen Zusammenhänge festzustellen sind, wodurch sich besondere Hotspots im Stadtraum New Yorks lokalisieren lassen. Des weiteren soll ermittelt werden, welche Verkehrsteilnehmer besonders oft getötet oder verletzt werden, sowie an welchen Stellen sich gerade schwerwiegende Unfälle häufen. Anhand einer animierten Karte kann dadurch das Unfallgeschehen über die Zeit betrachtet werden.

### Installation der Python Packages

In [None]:
pip install kafka-python pymongo

### Importieren benötigter Module

In [None]:
from kafka import KafkaProducer, KafkaConsumer
from pymongo import MongoClient
import datetime as dt
import requests
import os
from pathlib import Path
from multiprocessing import Process
from time import sleep

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

### Herunterladen der Datensätze
Zum herunterladen der Datensätze werden die Daten von der Datenquelle mithilfe des nachfolgenden Python-Scripts heruntergeladen. Resultat sind drei `.csv`-Dateien die die Unfalldaten zeilenweise enthalten.

In [None]:
if not os.path.exists('data/'):
    os.mkdir('data')

for file_name, download_url in [
    ('crashes.csv', 'https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv?accessType=DOWNLOAD'),
    ('vehicles.csv', 'https://data.cityofnewyork.us/api/views/bm4k-52h4/rows.csv?accessType=DOWNLOAD'),
    ('persons.csv', 'https://data.cityofnewyork.us/api/views/f55k-p6yu/rows.csv?accessType=DOWNLOAD'),
]:
    if not os.path.isfile(fp:= (Path('data') / file_name)):
        with open(fp, 'wb') as crash_file:
            crash_file.write(requests.get(download_url).content)

## Einlesen der Daten und senden über den Producer

Hier wird der Producer erstellt, sodass dieser verwendet werden kann, um Daten zu senden

In [None]:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

### Einlesen der CSV-Daten in Kafka

Achtung: Viele Datensätze, dauert einige Minuten

In [None]:
def send_to_kafka(topic):
    print(f'Started {topic} Producer process.')    
    
    with open((fn:=Path('data') / f'{topic}.csv')) as source:
        for i, row in enumerate(source.readlines()):
            if i:
                producer.send(f'nyc_{topic}', value=bytearray(row, encoding='utf-8'), key=bytearray(str(i), encoding='utf-8'))
            if not i % 25000 and i:
                print(f'read {i} lines from {fn}.')
                
producers = {
    'crashes': Process(target=send_to_kafka, args=['crashes']),
    'vehicles': Process(target=send_to_kafka, args=['vehicles']),
    'persons': Process(target=send_to_kafka, args=['persons']),
}

### Lesen der Daten mit dem Consumer und Import in die Datenbank

In [None]:
client = MongoClient("mongodb://localhost:27017")

crashes_db = client['nyc_crashes']['crashes']
persons_db = client['nyc_crashes']['persons']
vehicles_db = client['nyc_crashes']['vehicles']

deletion_processes = []

try:
    p = Process(taget=crashes_db.delete_many)
    p.start()
    deletion_processes.append(p)
except Exception:
    pass

try:
    p = Process(taget=persons_db.delete_many)
    p.start()
    deletion_processes.append(p)
except Exception:
    pass

try:
    p = Process(taget=vehicles_db.delete_many)
    p.start()
    deletion_processes.append(p)
except Exception:
    pass

for p in deletion_processes:
    p.join()

Zunächst werden die Daten aus den Topics `nyc_persons` und `nyc_vehicles` verarbeitet.

In [None]:
def process_topic(topic, types):
    consumer = KafkaConsumer(f'nyc_{topic}', bootstrap_servers=['localhost:9092'], auto_offset_reset="earliest")
    counter = 0
    
    print(f'Started {topic} Consumer process.')
    
    for row in consumer:
        row = row.value.decode('utf-8').split(',')
        print(row)
        counter += 1
        if counter and not counter % 100:
            print(f'Read {counter} lines from Kafka topic {topic}')
        res = {}
        for idx, (db_field, field_type) in enumerate(types.items()):
            try:
                if row_data := row[idx]:
                    res[db_field] = field_type.__call__(row_data) if not isinstance(row_data, field_type) else row_data
                else:
                    res[db_field] = None   
            except Exception as e:
                res[db_field] = None

        # make sure the document is identifiable
        if res['_id']:
            db_by_topic[topic].insert_one(res)

            
db_by_topic = {
    'vehicles': vehicles_db,
    'persons': persons_db,
    'crashes': crashes_db
}
            
consumers = {
    'vehicles': Process(target=process_topic, args=(
        'vehicles',  {
            'unique_id': str,
            '_id': str,
            'crash_date': str,
            'crash_time': str,
            'vehicle_id': str,
            'state_registration': str,
            'vehicle_type': str,
            'vehicle_make': str,
            'vehicle_model': str,
            'vehicle_year': str,
            'travel_direction': str,
            'vehicle_occupants': str,
            'driver_sex': str,
            'driver_license_status': str,
            'driver_license_jurisdiction': str,
            'pre_crash': str,
            'point_of_impact': str,
            'vehicle_damage': str,
            'vehicle_damage_1': str,
            'vehicle_damage_2': str,
            'vehicle_damage_3': str,
            'public_property_damage': str,
            'public_property_damage_type': str,
            'contributing_factor_1': str,
            'contributing_factor_2': str
        }
    )),
    'persons': Process(target=process_topic, args=(
        'persons', {
            'unique_id': str,
            '_id': str,
            'crash_date': str,
            'crash_time': str,
            'person_id': str,
            'person_type': str,
            'person_injury': str,
            'vehicle_id': str,
            'person_age': str,
            'ejection': str,
            'emotional_status': str,
            'bodily_injury': str,
            'position_in_vehicle': str,
            'safety_equipment': str,
            'ped_location': str,
            'ped_action': str,
            'complaint': str,
            'ped_role': str,
            'contributing_factor_1': str,
            'contributing_factor_2': str,
            'person_sex': str
        }
    ))
}

### Starten der Prozesse
Nun werden die Prozesse zum Produzieren und Konsumieren der Daten aus `vehicles.csv` und `persons.csv` gestartet.

In [None]:
for topic in 'persons vehicles'.split(' '):
    producers[topic].start()

sleep(10)
    
for topic in 'persons vehicles'.split(' '):
    consumers[topic].start()

In [None]:
# Just in case you need to stop processes:
for topic in 'persons vehicles'.split(' '):
    consumers[topic].terminate()
    producers[topic].terminate()

Danach können diese Daten mit den Daten aus `nyc_crashes` gejoined werden.

In [None]:
def process_crashes():
    consumer = KafkaConsumer('nyc_crashes', bootstrap_servers=['localhost:9092'], auto_offset_reset="earliest")
    
    types = {
                'crash_date': str, 
                'crash_time': str, 
                'borough': str, 
                'zip_code': int, 
                'latitude': float, 
                'longitude': float, 
                'location': str, 
                'on_street_name': str, 
                'cross_street_name': str, 
                'off_street_name': str, 
                'persons_injured': int, 
                'persons_killed': int, 
                'pedestrians_injured': int, 
                'pedestrians_killed': int, 
                'cyclists_injured': int, 
                'cyclists_killed': int, 
                'motorists_injured': int, 
                'motorists_killed': int, 
                'contributing_factor_vehicle_1': str,
                'contributing_factor_vehicle_2': str,
                'contributing_factor_vehicle_3': str,
                'contributing_factor_vehicle_4': str,
                'contributing_factor_vehicle_5': str,
                '_id': int,
                'vehicle_type_code_1': str,
                'vehicle_type_code_2': str,
                'vehicle_type_code_3': str,
                'vehicle_type_code_4': str,
                'vehicle_type_code_5': str,
    }
    
    for row in consumer: 
        row = row.value.decode('utf-8').split(',')
        counter += 1
        if counter and not counter % 25000:
            print(counter)
        res = {}
        for idx, (db_field, field_type) in enumerate(types.items()):
            try:
                if row_data := row[idx]:
                    res[db_field] = field_type.__call__(row_data) if not isinstance(row_data, field_type) else row_data
                else:
                    res[db_field] = None
            except Exception as e:
                res[db_field] = None

        # make sure the document is identifiable
        if res['_id']:
            res['vehicles'] = list(db_by_topic['vehicles'].find({'_id': {'$eq': res['_id']}}))
            res['persons'] = list(db_by_topic['persons'].find({'_id': {'$eq': res['_id']}}))

            db_by_topic['crashes'].insert_one(res)

consumers['crashes'] = Process(target=process_crashes)

### Starten des Prozesses
Im letzten Schritt müssen die beteiligten Prozesse gestartet werden.

In [None]:
producers['crashes'].start()
consumers['crashes'].start()

In [None]:


producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for i in range(1, 13):
    if i < 10:
        i = "0%s" % i
    dataset = open('./data/2019/yellow-cabs-2019-%s.csv' % i, encoding='utf-8')
    rows = dataset.readlines()[1:]

    for i, row in enumerate(rows):
        producer.send('yellow-cabs', value=bytearray(row, encoding='utf-8'), key=bytearray(str(i), encoding='utf-8'))

for i in range(1, 7):
    if i < 10:
        i = "0%s" % i
    dataset = open('./data/2019/yellow-cabs-2020-%s.csv' % i, encoding='utf-8')
    rows = dataset.readlines()[1:]

    for i, row in enumerate(rows):
        producer.send('yellow-cabs', value=bytearray(row, encoding='utf-8'), key=bytearray(str(i), encoding='utf-8'))

In [None]:


consumer = KafkaConsumer('yellow-cabs', bootstrap_servers=[""])

client = MongoClient("")

yellow_collection = client['datawarehouse']['bg-yellowcabs']
yellow_collection.delete_many({})

count = 0

for msg in consumer: 
    count += 1
    print('Received new message: %s' % count)
    values = msg.value.decode('utf-8').split(',')
    
    yellow_collection.insert_one({
        'pickup_datetime': dt.datetime.strptime(values[1], "%Y-%m-%d %H:%M:%S"),
        'dropoff_datetime': dt.datetime.strptime(values[2], "%Y-%m-%d %H:%M:%S"),
        'passenger_count': int(values[3]),
        'trip_distance': float(values[4]),
        'PULocationID': values[5],
        'DOLocationID': values[6],
        'payment_type': int(values[9]),
        'fare_amount': float(values[10]),
        'tip_amount': float(values[15]),
        'total_amount': float(values[16])
    })