In [1]:
# The pyspark star imports come first so that the names imported explicitly
# below are never shadowed by them.
from pyspark.sql.functions import *
from pyspark.sql.types import *

import os

import pandas as pd
from google.cloud import bigquery
from google.cloud import storage
from pyspark.sql import SparkSession

# ---- Project configuration ---------------------------------------------------
# GCP project and the GCS bucket used for uploads throughout the notebook.
PROJECT_ID = "de-data-422410"
GCS_BUCKET = "hauct_nyc_project"

# BigQuery destinations: the raw landing table, plus a partitioned table and
# the column it is partitioned on.
# NOTE(review): these names refer to Citi Bike data (`started_at`), while this
# notebook downloads NYC yellow-taxi files, whose header has no `started_at`
# column — confirm the intended target dataset/table.
BQ_DATASET = "nyc_citibike_data"
BQ_TABLENAME = "citibike_tripsdata_raw"
PARTITIONED_TABLE = "citibike_tripsdata"
PARTITION_COL = "started_at"

# Reuse an active Spark session if there is one, otherwise start one named "ETL".
spark = (
    SparkSession.builder
    .appName("ETL")
    .getOrCreate()
)

In [23]:
from google.cloud import storage

# GCS client using the environment's default credentials, and a bucket handle
# for the project bucket.
gcs_client = storage.Client()
bucket = gcs_client.bucket(bucket_name=GCS_BUCKET)

In [2]:
def fetch(dataset_url: str) -> str:
    """
    Download ``dataset_url`` into the current working directory.

    Args:
    - dataset_url: URL of the file to download (http/https; file:// also works).

    Returns:
    - The local file name, i.e. the last path segment of the URL.

    Raises:
    - ValueError: if the URL path has no file-name component.
    - urllib.error.URLError / OSError: if the download fails (previously a
      failed ``wget`` was silently ignored and the name returned anyway).
    """
    import posixpath
    import urllib.parse
    import urllib.request

    # URL paths always use "/", so posixpath is correct on every OS.
    file_name = posixpath.basename(urllib.parse.urlparse(dataset_url).path)
    if not file_name:
        raise ValueError(f"Cannot derive a file name from URL: {dataset_url!r}")

    # Download under a temporary name and rename only on success, so an
    # interrupted transfer never leaves a truncated file under the final name.
    # Unlike wget, an existing file is replaced rather than saved as "<name>.1".
    part_name = file_name + ".part"
    try:
        urllib.request.urlretrieve(dataset_url, part_name)
    except BaseException:
        if os.path.exists(part_name):
            os.remove(part_name)
        raise
    os.replace(part_name, file_name)
    return file_name

def clean(file_path: str) -> pd.DataFrame:
    """
    Load one monthly yellow-taxi CSV and normalise its column names and dtypes.

    Args:
    - file_path: Path of the CSV (optionally compressed) to read. The month tag
      is taken from the file name, e.g. ``yellow_tripdata_2019-01.csv.gz`` ->
      ``"2019-01"``.

    Returns:
    - A pandas DataFrame with:
      - snake_case ID column names;
      - parsed pickup/dropoff timestamps;
      - code columns stored as strings (blanks become <NA>, not the text "nan",
        and integer codes stay "1", not "1.0");
      - distance/money columns as float;
      - an extra ``raw_month`` column.

    Raises:
    - KeyError: if an expected column is missing.
    - ValueError / TypeError: if a code column holds non-integer values.
    """
    # Parse the month tag from the file name only, so underscores or dots in
    # parent directories cannot leak into it.
    file_name = os.path.basename(file_path)
    raw_date_value = file_name.split("_")[-1].split(".")[0]

    df = pd.read_csv(file_path)

    df = df.rename(columns={
        'VendorID': 'vendor_id',
        'RatecodeID': 'rate_code_id',
        'PULocationID': 'pu_location_id',
        'DOLocationID': 'do_location_id'
    })

    datetime_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']
    # Integer codes/IDs that are kept as strings.
    code_columns = ['passenger_count', 'rate_code_id', 'pu_location_id',
                    'do_location_id', 'payment_type']
    # Numeric measures kept as float.
    float_columns = ['trip_distance', 'fare_amount', 'extra', 'mta_tax',
                     'tip_amount', 'tolls_amount', 'improvement_surcharge',
                     'total_amount', 'congestion_surcharge']

    for col in datetime_columns:
        df[col] = pd.to_datetime(df[col])

    # A code column containing blanks is read as float64. A plain astype(str)
    # would then produce "1.0" and "nan"; going through nullable Int64 yields
    # "1" and a real missing value instead.
    for col in code_columns:
        df[col] = pd.to_numeric(df[col]).astype("Int64").astype("string")

    df = df.astype({col: float for col in float_columns})

    df['raw_month'] = raw_date_value

    return df

def write_to_gcs(file, dest_file, bucket_name=None, client=None) -> None:
    """
    Upload a local file to a Google Cloud Storage (GCS) bucket.

    Args:
    - file: Path of the local file to be uploaded.
    - dest_file: Object name (path) inside the bucket where the file is saved.
    - bucket_name: Target bucket; defaults to GCS_BUCKET.
    - client: Optional ``storage.Client`` (e.g. one built from a service-account
      key). A client using default credentials is created when omitted.
    """
    # The previous version called `GcsBucket.load(...)`, a Prefect block class
    # that is never imported here (NameError). Use google-cloud-storage directly.
    if client is None:
        client = storage.Client()
    if bucket_name is None:
        bucket_name = GCS_BUCKET
    blob = client.bucket(bucket_name).blob(dest_file)
    blob.upload_from_filename(file)

In [3]:
def write_gbq(df, table=None, mode='append', temp_bucket=None):
    """
    Write a Spark DataFrame to a BigQuery table via the spark-bigquery connector.

    Args:
    - df: A *Spark* DataFrame (``df.write`` is used). A pandas DataFrame, such
      as the one returned by ``clean``, must be converted first, e.g. with
      ``spark.createDataFrame``.
    - table: Fully qualified ``project.dataset.table``; defaults to
      ``PROJECT_ID.BQ_DATASET.BQ_TABLENAME``.
    - mode: Spark save mode; defaults to ``'append'``.
    - temp_bucket: GCS bucket used to stage data for the load; defaults to
      GCS_BUCKET.
    """
    target_table = table if table is not None else f"{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLENAME}"
    staging_bucket = temp_bucket if temp_bucket is not None else GCS_BUCKET
    (
        df.write.format('bigquery')
        .option('table', target_table)
        # The connector's default (indirect) write stages data in GCS and fails
        # without a temporary bucket. The SparkSession built in this notebook
        # sets none; it may also be configured at cluster level — TODO confirm.
        .option('temporaryGcsBucket', staging_bucket)
        .mode(mode)
        .save()
    )

In [30]:
from google.cloud import storage

In [16]:
# Month of NYC TLC yellow-taxi data to process.
year, month = 2019, 1

# Release asset name without extension, e.g. "yellow_tripdata_2019-01".
dataset_file = "yellow_tripdata_{}-{:02d}".format(year, month)

# DataTalksClub mirror of the TLC trip-record CSVs (gzip-compressed).
dataset_url = (
    "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/"
    + dataset_file
    + ".csv.gz"
)

In [17]:
# Show the URL as the cell output so the notebook records which file is fetched.
dataset_url

'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2019-01.csv.gz'

In [18]:
# Download this month's file into the working directory and keep its local name.
file_path = fetch(dataset_url=dataset_url)

In [1]:
import pandas as pd

# Quick look at the raw download (before `clean`). Read the file fetched above
# instead of a hard-coded name, so the preview follows `year`/`month`.
df = pd.read_csv(file_path)
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2019-01-01 00:46:40,2019-01-01 00:53:20,1,1.5,1,N,151,239,1,7.0,0.5,0.5,1.65,0.0,0.3,9.95,
1,1,2019-01-01 00:59:47,2019-01-01 01:18:59,1,2.6,1,N,239,246,1,14.0,0.5,0.5,1.0,0.0,0.3,16.3,
2,2,2018-12-21 13:48:30,2018-12-21 13:52:40,3,0.0,1,N,236,236,1,4.5,0.5,0.5,0.0,0.0,0.3,5.8,
3,2,2018-11-28 15:52:25,2018-11-28 15:55:45,5,0.0,1,N,193,193,2,3.5,0.5,0.5,0.0,0.0,0.3,7.55,
4,2,2018-11-28 15:56:57,2018-11-28 15:58:33,5,0.0,2,N,193,193,2,52.0,0.0,0.5,0.0,0.0,0.3,55.55,


In [4]:
from google.cloud import storage
# Service-account key for the storage client. Prefer the standard
# GOOGLE_APPLICATION_CREDENTIALS variable so the notebook runs on other machines,
# falling back to the author's local key. The path is a raw string because the
# Windows backslashes ("\c", "\g", "\d") are invalid escape sequences in a normal
# literal (SyntaxWarning on Python 3.12+).
path_to_private_key = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    r'D:\code\github\de-gcp-zoomcamp-project\credentials\de-data-422410-3863bd49070c.json',
)
client = storage.Client.from_service_account_json(json_credentials_path=path_to_private_key)

In [5]:
# Upload this month's Parquet file to the project bucket, deriving both names
# from the config instead of repeating literals.
# NOTE(review): no cell in this notebook writes this .parquet file — `fetch`
# only downloads the .csv.gz — so it must already exist locally.
parquet_file = f"{dataset_file}.parquet"
bucket = client.bucket(GCS_BUCKET)
# Object name on GCS: same as the local Parquet file name.
blob = bucket.blob(parquet_file)

blob.upload_from_filename(parquet_file)