In [1]:

import pandas as pd
import requests
import io 
from datetime import datetime

In [2]:

# Methods 

# Create a list of URLS we need to fetch
today = datetime.now() 
current_month = today.month 
current_year = today.year
start_url = "https://mdhopendata.blob.core.windows.net/verkehrsdetektion/"


def get_urls(year=2023):
    """URL constructor for api.viz.berlin.de traffic data.

    Args:
        year (int, optional): How far back in time will the URLs / data go. Defaults to 2023.

    Returns:
        urls_list: List of URLs pointing to gzip csv of traffic data in Berlin
    """
    urls_list =[]
    for year in range(year, current_year+1):
        #2021 and 2023 have different url patterns
        if year == 2021:
            mid_url = f"{year}/Messquerschnitt/mq_hr_{year}_"
        elif year == 2023:
            mid_url = f"{year}/Messquerschnitte%20(fahrtrichtungsbezogen)/mq_hr_{year}_"
        else:
            mid_url = f"{year}/Messquerschnitt%20(fahrtrichtungsbezogen)/mq_hr_{year}_"
        base_url = start_url + mid_url

        # Loop through every month of the year
        for month in range(1, 13):
            # Break if current year, current month is reached
            if year == current_year and month == current_month:
                break
            # Finish URL and append to list
            appendix = f"{month:02d}.csv.gz"
            url = base_url + appendix
            urls_list.append(url)
    return urls_list

def get_traffic_data(year=2023):
    urls_list = get_urls(year)
    for index, url in enumerate(urls_list):
        try:
            response = requests.get(url)
            response.raise_for_status()
            data = io.BytesIO(response.content)
            if index == 0:
                df = pd.read_csv(data, compression='gzip', delimiter=";")
            else:
                df_next = pd.read_csv(data, compression='gzip', delimiter=";")
                df = pd.concat([df, df_next])
            print(len(df))
        except requests.exceptions.HTTPError as errh:
            print(f"{url[-14:]} not found")
    return df

def get_sensor_location_data():
    """Loads the location data of the traffic sensors in Berlin (VIZ Berlin)

    Returns:
        pandas.Dataframe: _description_
    """
    try:
        url = "https://mdhopendata.blob.core.windows.net/verkehrsdetektion/Stammdaten_Verkehrsdetektion_2022_07_20.xlsx"
        response = requests.get(url)
        location_data = pd.read_excel(io.BytesIO(response.content))
        return location_data
    except requests.exceptions.HTTPError as errh:
        print(f"Sensor location data not found")

def duplicates_drop(df, name):
    """Deletes duplicates from dataframe. Prints out how many duplicates were deleted.

    Args:
        df (pandas.DataFrame): Dataframe
        name (str): Name of the dataframe for printing

    Returns:
        df (pandas.DataFrame): Cleaned dataframe from duplicates
    """
    before_drop = len(df)
    df = df.drop_duplicates().reset_index(drop=True)
    after_drop = len(df)

    if after_drop < before_drop:
        print(f"{name}: {before_drop - after_drop} duplicates deleted")
    if after_drop == before_drop:
        print(f"{name}: No duplicates")
    return df


In [9]:
# EXTRACT DATA

fact_table = get_traffic_data(2018)
location_dim =  get_sensor_location_data()



In [4]:
# SAVE RAW DATA (OPTIONAL)

yes_choices = ['yes', 'y']
no_choices = ['no', 'n']

while True:
    user_input = input('Do you want rewrite the file?(yes/no): ')

    if user_input.lower() in yes_choices:
        print('User typed yes')
        fact_table.to_csv("vehicle_data.csv", index = False)
        break
    elif user_input.lower() in no_choices:
        print('User typed no')
        break
    else:
        print('Type yes/no')


User typed no


In [10]:
# TRANSFORM DATA 

## fact_table: Rename the columns according to data model
fact_table = fact_table.rename(columns={"mq_name":"mq_name_id","tag": "date", "stunde" : "hour"})
fact_table["datetime_id"] = pd.to_datetime(fact_table["date"] + ' ' + fact_table["hour"].apply(str), format='%d.%m.%Y %H')
fact_table.drop(["date", "hour"], axis=1, inplace=True)
fact_table['mq_name_id'] = fact_table['mq_name_id'].map(lambda x: x.rstrip('n'))

## location_dim: Rename the columns according to data model
location_dim  = location_dim.rename(columns={"DET_ID15": "det_id", "MQ_KURZNAME":"mq_name_id","LÄNGE (WGS84)": "lng" ,"BREITE (WGS84)":"lat", "POSITION": "desc"})
location_dim = location_dim[["det_id", "mq_name_id", "lat", "lng", "desc"]]


## Duplication check and drop
fact_table = duplicates_drop(fact_table, "fact_table")
location_dim = duplicates_drop(location_dim, "location_dim")






34 duplicates deleted


In [6]:
## Create Datetime_dim 

# Datetime dimension table from fact table
datetime_dim = fact_table[['datetime_id']].reset_index(drop=True)
# Delete all the duplication
datetime_dim = datetime_dim.drop_duplicates().reset_index(drop=True)
datetime_dim["weekday"] = datetime_dim["datetime_id"].dt.weekday
datetime_dim["hour"] = datetime_dim["datetime_id"].dt.hour
datetime_dim["day"] = datetime_dim["datetime_id"].dt.day
datetime_dim["month"] = datetime_dim["datetime_id"].dt.month
datetime_dim["year"] = datetime_dim["datetime_id"].dt.year


datetime_dim.to_csv("data/datetime_dim.csv", index=False)

In [6]:
# SAVE DATA 
fact_table.to_csv("./data/fact_table.csv", index=False)
location_dim.to_csv("./data/location_dim.csv", index=False)