# first run (21/03/2025)
- year 2009
- only one dataset exists for this year; each month is around 500 MB, with 8 minutes total time
- Using 1 Driver 14 GB Memory, 4 Cores

Done with this one. I parametrized parts of it, but there's a request limit on downloads. I also saw that there is an API for the NYC data. Moving on to study with dbdemos instead.

In [0]:
# Ensure you are running this in a Databricks notebook
# (`dbutils` and `display` are notebook-provided globals, not imported here).
# Fetch the value stored under key "blobstorage" in the "kvlagodedados" secret scope.
secret = dbutils.secrets.get(scope="kvlagodedados", key="blobstorage")
# NOTE(review): displaying a secret value is risky; Databricks is expected to
# redact it in cell output — confirm before sharing this notebook. `secret` is
# not referenced by the other cells visible here.
display(secret)

In [0]:
from datetime import datetime
from azure.storage.blob import BlobServiceClient
import requests
from concurrent.futures import ThreadPoolExecutor
from retry import retry

class IngestorTaxiData:
    """Download taxi trip parquet files and stage them in Azure Blob Storage.

    Configuration is read from the ``parameters`` object of a JSON file. The
    keys used by this class are ``storage_account_name``,
    ``storage_account_key``, ``container_name``, ``base_url`` and
    ``trip_types``.

    Attributes:
        parameters_dict: Parameters from the JSON file plus the generated
            ``years`` and ``months`` lists.
        url_dict: Maps each destination blob path to the URL it is
            downloaded from.
        blob_service_client: Client for the configured storage account.
    """

    # HTTP statuses treated as transient and therefore worth retrying.
    _RETRYABLE_STATUS = frozenset({429, 500, 502, 503, 504})
    # (connect, read) timeouts in seconds, so a stalled download cannot block
    # a worker thread forever.
    _REQUEST_TIMEOUT = (10, 300)
    # Upper bound on how much of an error body is copied into the error text.
    _MAX_ERROR_BODY_BYTES = 2000

    def __init__(self, path_to_json):
        """Load the parameters and create the blob service client.

        Args:
            path_to_json: Location of the parameters JSON file, readable by
                Spark.
        """
        self.parameters_dict, self.url_dict = self.get_parameters(path_to_json)
        self.blob_service_client = BlobServiceClient(
            account_url=f"https://{self.parameters_dict['storage_account_name']}.blob.core.windows.net",
            credential=self.parameters_dict['storage_account_key']
        )

    def get_parameters(self, path_to_json):
        """Read the parameters file and build the download plan.

        Args:
            path_to_json: Location of the parameters JSON file, readable by
                Spark.

        Returns:
            A ``(parameters_dict, url_dict)`` tuple. ``url_dict`` has one
            entry per year (2009 through the current year), month and trip
            type, keyed by the destination blob path.

        Raises:
            ValueError: If the file yields no parameters row.
        """
        # Reading parameters from json (relies on the notebook's `spark` session).
        df = spark.read.option("multiline", "true").json(path_to_json)
        df = df.selectExpr("parameters.*")
        row = df.first()
        if row is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(f"No parameters found in {path_to_json}")

        # Setting up parameters dict.
        parameters_dict = row.asDict()
        parameters_dict['years'] = list(range(2009, datetime.now().year + 1))
        parameters_dict['months'] = [f"{m:02d}" for m in range(1, 13)]  # 01 to 12

        # Dict to iterate through all urls that need to be downloaded.
        url_dict = {
            f"staging/{trip}/{year}/{month}/{trip}_tripdata_{year}-{month}.parquet":
            f"{parameters_dict['base_url']}{trip}_tripdata_{year}-{month}.parquet"
            for year in parameters_dict['years']
            for month in parameters_dict['months']
            for trip in parameters_dict['trip_types']
        }
        return parameters_dict, url_dict

    def error_handler(self, response, path_file, file_url, file_data):
        """Write an error log blob for a failed download, then raise.

        Args:
            response: The HTTP response received for ``file_url``.
            path_file: Destination blob path of the file that failed.
            file_url: URL that was requested.
            file_data: Raw response body.

        Raises:
            Exception: Always, carrying the error message that was logged.
        """
        if response.status_code != 200:
            error_message = f"Failed to get data from {file_url}: HTTP {response.status_code}"
        else:
            # The body may be large or not valid UTF-8; decode a bounded
            # prefix leniently so building the message can never fail.
            body = file_data[:self._MAX_ERROR_BODY_BYTES].decode('utf-8', errors='replace')
            error_message = f"Failed to get data from: {body}"

        log_path_file = f"staging/logs/upload_errors_staging_{path_file.replace('/', '_')}_{datetime.now().strftime('%Y%m%d%H%M%S')}.log"
        log_blob_client = self.blob_service_client.get_blob_client(
            container=self.parameters_dict['container_name'], blob=log_path_file
        )
        log_content = f"Error: {error_message}\nURL: {file_url}\n"
        log_blob_client.upload_blob(log_content, overwrite=True)
        raise Exception(error_message)

    def upload_file(self, blob_client, file_data):
        """Upload ``file_data`` through ``blob_client``, replacing any existing blob."""
        blob_client.upload_blob(file_data, overwrite=True)

    @retry(exceptions=requests.RequestException, tries=3, delay=2)
    def _download(self, file_url):
        """Fetch ``file_url``, retrying network errors and transient HTTP statuses.

        Returns:
            The HTTP response. Its status may still be a non-retryable error,
            which the caller is expected to check.

        Raises:
            requests.RequestException: When every attempt failed. For a
                transient HTTP status this is a ``requests.HTTPError`` whose
                ``response`` attribute holds the last response.
        """
        response = requests.get(file_url, timeout=self._REQUEST_TIMEOUT)
        if response.status_code in self._RETRYABLE_STATUS:
            raise requests.HTTPError(
                f"Transient HTTP {response.status_code} from {file_url}",
                response=response,
            )
        return response

    def get_data(self, path_file, file_url):
        """Download one file and upload it to blob storage, best-effort.

        Blobs that already exist are skipped. Failures are printed rather
        than raised so one bad file does not abort the whole ingestion run.
        Retrying happens inside ``_download``; a retry decorator on this
        method would never fire, because exceptions are handled here.

        Args:
            path_file: Destination blob path.
            file_url: URL to download from.
        """
        try:
            # Check if the blob already exists.
            blob_client = self.blob_service_client.get_blob_client(
                container=self.parameters_dict['container_name'], blob=path_file
            )
            if blob_client.exists():
                print(f"Blob {path_file} already exists. Skipping download and upload.")
                return

            # Download the file from the URL.
            try:
                response = self._download(file_url)
            except requests.HTTPError as exc:
                if exc.response is None:
                    raise
                # Retries exhausted on a transient status: fall through so
                # the failure is logged once by error_handler below.
                response = exc.response
            file_data = response.content

            # Handle errors (error_handler always raises).
            if response.status_code != 200 or b"<Error>" in file_data:
                self.error_handler(response, path_file, file_url, file_data)

            # Upload the file to Azure Blob Storage.
            self.upload_file(blob_client, file_data)
            print(f"Uploaded {path_file} successfully.")
        except Exception as e:
            # Deliberate best-effort boundary: report and move on.
            print(f"Failed to ingest {path_file}: {e}")

    def ingestor(self, num_workers):
        """Run ``get_data`` for every entry of ``url_dict`` on a thread pool.

        Args:
            num_workers: Maximum number of worker threads.
        """
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [
                executor.submit(self.get_data, path_file, file_url)
                for path_file, file_url in self.url_dict.items()
            ]
            for future in futures:
                future.result()
        print("File upload process completed.")

In [0]:
# Usage in parallel
# Location of the parameters JSON file, read through Spark by IngestorTaxiData.
path_to_json = "abfss://lagodedadosv1@lagodedadosalttab.dfs.core.windows.net/parametros/parameters.json"
# Constructing the ingestor loads the parameters and creates the blob service client.
data_ingestor = IngestorTaxiData(path_to_json)
# Process every planned file using a pool of 2 worker threads.
data_ingestor.ingestor(num_workers=2)

In [0]:
# Load everything matched by staging/* as plain text; the filter below reads
# the resulting `value` column.
df = spark.read.text("abfss://lagodedadosv1@lagodedadosalttab.dfs.core.windows.net/staging/*")
# Show only the lines mentioning this specific file URL — presumably to locate
# the error log written for it (TODO confirm).
display(df.where("value like '%https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2013-08.parquet%'"))

# Next steps: 

- Hide secret key using key vault (done)
- parametrize everything (done)
- improve ingestion speed (parallelize) (done but we have request limits)
- add metadata (not done; project stopped)