-
Notifications
You must be signed in to change notification settings - Fork 0
/
from_web_to_gcs_orc.py
63 lines (55 loc) · 2.39 KB
/
from_web_to_gcs_orc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import os
import pandas as pd
from pathlib import Path
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
# Column dtypes for the FHV trip-data CSVs: datetime columns are kept as raw
# strings (parsed downstream), location IDs / SR_Flag as nullable float64.
_STRING_COLUMNS = ('dispatching_base_num', 'pickup_datetime',
                   'dropOff_datetime', 'Affiliated_base_number')
_FLOAT_COLUMNS = ('PUlocationID', 'DOlocationID', 'SR_Flag')
dtype_dict = {**{col: str for col in _STRING_COLUMNS},
              **{col: 'float64' for col in _FLOAT_COLUMNS}}
@task(retries=3, log_prints=True)
def fetch(dataset_url: str) -> pd.DataFrame:
    """Download the CSV at *dataset_url* and return it as a DataFrame.

    Column dtypes are pinned via the module-level ``dtype_dict`` so pandas
    does not mis-infer types on sparse columns.
    """
    frame = pd.read_csv(dataset_url, dtype=dtype_dict)
    print(f'Dataframe loaded for: {dataset_url}')
    return frame
@task()
def write_local(df: pd.DataFrame, taxi_data_tag: str, dataset_file: str) -> Path:
    """Write DataFrame out locally as orc file.

    Args:
        df: trip data to persist.
        taxi_data_tag: dataset family (e.g. 'fhv'); becomes the sub-folder
            under ``data/``.
        dataset_file: file stem; the file is written as ``<stem>.orc``.

    Returns:
        Path of the written .orc file, relative to the working directory.
    """
    # NOTE: the redundant function-level `import os` was removed — `os` is
    # already imported at module level.
    folder_path = f"data/{taxi_data_tag}"
    if not os.path.exists(folder_path):
        print(f"Destination path : {folder_path} does not exist. Creating folder.")
        # exist_ok guards against another run creating the folder between
        # the exists() check above and this call.
        os.makedirs(folder_path, exist_ok=True)
        print(f'Folder (with sub-folder) created: {folder_path}')
    path = Path(f"{folder_path}/{dataset_file}.orc")
    df.to_orc(path=path)  # requires the pyarrow engine
    return path
@task(log_prints=True)
def write_gcs(path: str) -> None:
    """Upload local orc file to GCS, mirroring the local relative path."""
    bucket = GcsBucket.load("de-zoomcamp-gcs")
    bucket.upload_from_path(from_path=path, to_path=path)
@flow(log_prints=True)
def fetch_from_web_to_gcs(taxi_data_tag : str, year: int, month: int, save_dir : str = 'data/') -> None:
    """The main ETL function: download one month of trip data, write it
    locally as .orc, and upload it to GCS.

    Args:
        taxi_data_tag: dataset family in the DataTalksClub release (e.g. 'fhv').
        year: four-digit year of the data.
        month: month number (1-12); zero-padded into the file name.
        save_dir: currently unused (write_local hard-codes ``data/``);
            retained for backward compatibility with existing callers.
    """
    dataset_file = f"{taxi_data_tag}_tripdata_{year}-{month:02}"
    dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{taxi_data_tag}/{dataset_file}.csv.gz"
    df = fetch(dataset_url=dataset_url)
    path = write_local(df=df, taxi_data_tag=taxi_data_tag, dataset_file=dataset_file)
    write_gcs(path=path)
    print(f"GCS write successful for {dataset_file}.")
@flow(log_prints=True, name='orc_compression_test')
def parent_fetch_flow(months: list[int], year: int, taxi_data_tag: str):
    """Run the per-month EL flow once for every month in *months*."""
    for month_number in months:
        fetch_from_web_to_gcs(taxi_data_tag=taxi_data_tag,
                              month=month_number,
                              year=year,
                              save_dir='data/')
        print(f'Finished data EL for month number: {month_number}')
if __name__ == "__main__":
    # Backfill all twelve months of the 2019 FHV dataset.
    parent_fetch_flow(months=list(range(1, 13)),
                      year=2019,
                      taxi_data_tag='fhv')