In [7]:
from pathlib import Path
import pandas as pd
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
from random import randint
import os
from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from prefect_gcp import GcpCredentials


## Question 1. Load January 2020 data

Using the `etl_web_to_gcs.py` flow that loads taxi data into GCS as a guide, create a flow that loads the green taxi CSV dataset for January 2020 into GCS and run it. Look at the logs to find out how many rows the dataset has.

How many rows does that dataset have?

* 447,770
* 766,792
* 299,234
* 822,132

In [4]:
@task(retries=3)
def fetch(dataset_url: str) -> pd.DataFrame:
    """Read taxi data from web into pandas DataFrame"""
    # if randint(0, 1) > 0:
    #     raise Exception

    df = pd.read_csv(dataset_url)
    return df


@task(log_prints=True)
def clean(df: pd.DataFrame, color: str) -> pd.DataFrame:
    """Fix dtype issues"""

    if color == "green":
        df["lpep_pickup_datetime"] = pd.to_datetime(df["lpep_pickup_datetime"])
        df["lpep_dropoff_datetime"] = pd.to_datetime(df["lpep_dropoff_datetime"])
        print(df.head(2))
        print(f"columns: {df.dtypes}")
        print(f"rows: {len(df)}")
        return df
    elif(color == "yellow"):
        df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
        df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
        print(df.head(2))
        print(f"columns: {df.dtypes}")
        print(f"rows: {len(df)}")
        return df

@task()
def write_local(df: pd.DataFrame, color: str, dataset_file: str) -> Path:
    """Write DataFrame out locally as parquet file"""
    mydir = (f"data/{color}/")
    check_folder = os.path.isdir(mydir)
    
    if not check_folder:
        os.makedirs(mydir)
        print("created folder:", mydir)
    else:
        print(mydir, "Folder already exists")
        
    path = Path(f"data/{color}/{dataset_file}.parquet")
    df.to_parquet(path, compression="gzip")
    return path


@task()
def write_gcs(path: Path) -> None:
    """Upload local parquet file to GCS"""
    gcs_block = GcsBucket.load("zoom-gcs")
    gcs_block.upload_from_path(from_path=path, to_path=path)
    return


@flow()
def etl_web_to_gcs(months: list[int] = [1], year: int = 2020, color: str = "green") -> None:
    """The main ETL function"""

    for month in months:
        dataset_file = f"{color}_tripdata_{year}-{month:02}"
        dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"

        df = fetch(dataset_url)
        df_clean = clean(df, color)
        path = write_local(df_clean, color, dataset_file)
        write_gcs(path)

if __name__ == "__main__":
    etl_web_to_gcs()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`


  df = pd.read_csv(dataset_url)


data/green/ Folder already exists


# Ans = 447770

# Cron
## Question 2. Scheduling with Cron

Cron is a common scheduling specification for workflows. 

Using the flow in `etl_web_to_gcs.py`, create a deployment to run on the first of every month at 5am UTC. What’s the cron schedule for that?

- `0 5 1 * *`
- `0 0 5 1 *`
- `5 * 1 0 *`
- `* * 5 1 0`

![Alt text](images/cron_v.png "Cron")
![Alt text](images/cron_u.png "Cron")

Courtesy of https://crontab.guru/#0_5_1_1-12_*

# Ans = `0 5 1 * *`

## Question 3. Loading data to BigQuery 

Using `etl_gcs_to_bq.py` as a starting point, modify the script for extracting data from GCS and loading it into BigQuery. This new script should not fill or remove rows with missing values. (The script is really just doing the E and L parts of ETL).

The main flow should print the total number of rows processed by the script. Set the flow decorator to log the print statement.

Parametrize the entrypoint flow to accept a list of months, a year, and a taxi color. 

Make any other necessary changes to the code for it to function as required.

Create a deployment for this flow to run in a local subprocess with local flow code storage (the defaults).

Make sure you have the parquet data files for Yellow taxi data for Feb. 2019 and March 2019 loaded in GCS. Run your deployment to append this data to your BiqQuery table. How many rows did your flow code process?

- 14,851,920
- 12,282,990
- 27,235,753
- 11,338,483

In [5]:
etl_web_to_gcs(months =[2,3], year=2019, color="yellow")

data/yellow/ Folder already exists


data/yellow/ Folder already exists


[Completed(message=None, type=COMPLETED, result=         VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
 0               1  2019-02-01 00:59:04   2019-02-01 01:07:27                1   
 1               1  2019-02-01 00:33:09   2019-02-01 01:03:58                1   
 2               1  2019-02-01 00:09:03   2019-02-01 00:09:16                1   
 3               1  2019-02-01 00:45:38   2019-02-01 00:51:10                1   
 4               1  2019-02-01 00:25:30   2019-02-01 00:28:14                1   
 ...           ...                  ...                   ...              ...   
 7019370         2  2019-02-28 23:29:08   2019-02-28 23:29:11                1   
 7019371         2  2019-02-28 22:48:47   2019-02-28 23:50:19                1   
 7019372         2  2019-02-28 23:41:23   2019-02-28 23:42:23                1   
 7019373         2  2019-02-28 23:12:52   2019-02-28 23:14:16                1   
 7019374         2  2019-02-28 23:10:35   2019-02-2

In [12]:
@task(retries=3)
def extract_from_gcs(color: str, year: int, month: int) -> Path:
    """Download trip data from GCS"""
    gcs_path = f"data/{color}/{color}_tripdata_{year}-{month:02}.parquet"
    gcs_block = GcsBucket.load("zoom-gcs")
    gcs_block.get_directory(from_path=gcs_path, local_path=f"gcp_data/")
    return Path(gcs_path)


# @task()
# def transform(path: Path) -> pd.DataFrame:
#     """Data cleaning example"""
#     df = pd.read_parquet(path)
#     print(f"pre: missing passenger count: {df['passenger_count'].isna().sum()}")
#     df["passenger_count"].fillna(0, inplace=True)
#     print(f"post: missing passenger count: {df['passenger_count'].isna().sum()}")
#     return df


@task()
def write_bq(df: pd.DataFrame) -> None:
    """Write DataFrame to BiqQuery"""

    gcp_credentials_block = GcpCredentials.load("zoom-gcp-creds")

    df.to_gbq(
        destination_table="dezoomcamp.rides",
        project_id="esoteric-pen-376110",
        credentials=gcp_credentials_block.get_credentials_from_service_account(),
        chunksize=500_000,
        if_exists="append",
    )

@flow(log_prints=True)
def etl_gcs_to_bq( months: list[int] = [2, 3], year: int = 2019, color: str = "yellow"):
    """Main ETL flow to load data into Big Query"""
    # color = "yellow"
    # year = 2021
    # month = 1
    no_of_rows = 0

    for month in months:
        
        path = extract_from_gcs(color, year, month)
        
        df = pd.read_parquet(path)
        # df = transform(path)
        print(f"rows: {len(df)}")
        no_of_rows+=len(df)
        write_bq(df)

    print("Total number of rows processed:", no_of_rows)

if __name__ == "__main__":
    
    etl_gcs_to_bq()
    # dep = Deployment.build_from_flow(
    # flow=etl_gcs_to_bq,
    # name="gcs_to_bq"
    # )

    # dep.apply()


 `@task(name='my_unique_name', ...)`

 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`
