In [2]:
from datetime import datetime
from google.cloud import storage, bigquery
import os
from dateutil.relativedelta import relativedelta

AIRFLOW_HOME = '/projects/airflow_finding_the_fastest_way_around_NYC/airflow'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = f'{AIRFLOW_HOME}/dags/credentials.json'


# Access bucket
client = storage.Client()
bucket = client.get_bucket('citi_bikes_airflow_project')
blobs = bucket.list_blobs()

In [4]:
blobs.num_results

0
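
A `0` here does not by itself show that the bucket is empty. As far as I understand the client library, the iterator returned by `list_blobs()` is lazy, and `num_results` counts the items fetched so far rather than the items that exist. The sketch below illustrates that behavior with a hypothetical stand-in class (`LazyListing` is not part of any Google library, and the file names are assumed examples), so it runs without credentials:

```python
class LazyListing:
    """Hypothetical stand-in for a lazy listing iterator (not the real GCS class)."""

    def __init__(self, names):
        self._names = names
        self.num_results = 0  # counts items yielded so far, not items that exist

    def __iter__(self):
        for name in self._names:
            self.num_results += 1
            yield name


# Assumed example object names, following the YYYYMM-... .zip pattern
listing = LazyListing([
    '202201-citibike-tripdata.csv.zip',
    '202202-citibike-tripdata.csv.zip',
])

count_before = listing.num_results  # nothing has been consumed yet
names = list(listing)               # consuming the iterator yields every item
count_after = listing.num_results

print(count_before, count_after, len(names))
```

With the real client, calling `list(bucket.list_blobs())` and checking the length of the resulting list is one way to test for an empty bucket.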

In [None]:
# Generalizing the DAG

So far, for the purposes of building the DAG, we have hard-coded the date to download. This is not ideal, as we want the DAG to be able to run for any date.

The final step in this article is to generalize the DAG so that it applies to other dates.

We will create a function called grab_latest_date() that finds the latest month already stored in the Google Cloud Storage bucket and returns the following month in YYYYMM format. On the first run, when the bucket is empty, it returns 202201. The DAG calls this function to determine the date to download.

The function is defined as follows:

# Grab date to download
def grab_latest_date():
    """Return the next month to download as a 'YYYYMM' string."""

    # Access bucket. list() fetches every blob up front, because
    # num_results on the lazy iterator stays 0 until it is iterated.
    client = storage.Client()
    bucket = client.get_bucket('citi_bikes_airflow_project')
    blobs = list(bucket.list_blobs())
    latest_date = datetime(2022, 1, 1)

    # Return 202201 for the first run of the DAG
    if not blobs:
        return latest_date.strftime('%Y%m')

    # Loop through blobs to find the latest date
    for blob in blobs:
        if blob.name.endswith('.zip'):
            # File names are expected to start with YYYYMM followed by '-'
            date = datetime.strptime(blob.name.split('-')[0], '%Y%m')
            if date > latest_date:
                latest_date = date

    # Adding one month to the latest date to get the next month
    latest_date = latest_date + relativedelta(months=1)
    return latest_date.strftime('%Y%m')

date_to_download = grab_latest_date()
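
The date logic can be checked without touching the bucket. The sketch below is a variant, not the function used in the DAG: `next_month_to_download` and its `first_month` parameter are hypothetical names, the file names are assumed examples, and the month rollover is done with the standard library instead of `relativedelta` so the snippet has no third-party dependencies. It also treats a listing with no `.zip` names as a first run, which is an assumption.

```python
from datetime import datetime


def next_month_to_download(blob_names, first_month='202201'):
    """Return the next 'YYYYMM' to download, given existing object names."""
    # Parse the YYYYMM prefix of every zip file name
    months = [
        datetime.strptime(name.split('-')[0], '%Y%m')
        for name in blob_names
        if name.endswith('.zip')
    ]

    # Nothing stored yet: start from the first month
    if not months:
        return first_month

    latest = max(months)
    # Add one month, rolling over to January of the next year after December
    year = latest.year + latest.month // 12
    month = latest.month % 12 + 1
    return f'{year}{month:02d}'


print(next_month_to_download([]))
print(next_month_to_download(['202201-citibike-tripdata.csv.zip']))
print(next_month_to_download(['202211-citibike-tripdata.csv.zip',
                              '202212-citibike-tripdata.csv.zip']))
```

Keeping the parsing separate from the bucket access makes the December-to-January rollover easy to test on plain lists of names.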

In [None]:
Project name: airflow_finding_the_fastest_way_around_NYC

# Introduction

This series of articles will walk you through the process of building a data pipeline using Apache Airflow. The data pipeline will be used to download, process, and analyze Citi Bike data from NYC. 

Citi Bike is a bicycle rental service available on the streets of New York. Users can pick up a bike at one station and ride it to their destination station. The data pipeline will be used to analyze the fastest way to get around NYC.

The data pipeline will be built over two articles.

The first article will focus on building the DAG that periodically downloads the data, stores it in a Google Cloud Storage bucket as a backup, and loads it into BigQuery.
The second article will focus on building the data processing functions and the DAG that processes the data and analyzes the fastest way to get around NYC.
