This notebook extracts the data from the AEMET API and saves it to our own CSV files for later transformation.
The GET request for the stations returns a URL pointing to a JSON document, which we can read with the urllib library.
Note the decode call: without it, reading the response fails with an encoding error (UTF-8 didn't work either, so ISO-8859-1 is used).

In [None]:
import pandas as pd
import requests
import json
from urllib.request import urlopen
from dotenv import load_dotenv
import os

url = "https://opendata.aemet.es/opendata/api/valores/climatologicos/inventarioestaciones/todasestaciones"

# This method allows us to access the .env file and load our env variables
load_dotenv()

api_key = os.environ.get('api_key')
path = os.environ.get('path')

querystring = {"api_key": api_key}

headers = {
    'cache-control': "no-cache"
}

# First request: the API answers with a small JSON envelope whose 'datos'
# field is the URL from which the actual station inventory is downloaded.
response = requests.request("GET", url, headers=headers, params=querystring)

# Fail right here on an HTTP error. Previously a failed request left `data`
# undefined and the script crashed a few lines later with a NameError.
response.raise_for_status()
data = response.json()

# An error envelope carries no 'datos' entry; report what the API said
# instead of failing with a bare KeyError.
if 'datos' not in data:
    raise RuntimeError(f"AEMET did not return a data URL: {data}")

print(data['datos'])

# Second request: download the inventory itself from the URL we were given.
url = data['datos']

# The context manager closes the connection once the body has been read.
with urlopen(url) as response:
    # The payload is decoded as ISO-8859-1 (utf-8 did not work for this API).
    data_json = json.loads(response.read().decode('ISO-8859-1'))

# Flatten the JSON records into a table and store them as a ';'-separated CSV.
table_stations = pd.json_normalize(data_json)
table_stations.to_csv(f"{path}estaciones.csv", sep=";")

In [None]:
import pandas as pd
import requests
import json
import urllib
from urllib.request import urlopen
import sys
import os

import time

from datetime import date, timedelta
from dateutil.relativedelta import relativedelta

import dask.dataframe as da

from dotenv import load_dotenv

import uuid


#This function will allow us to customize the name of the parquet files

def batch_id(n):
    """Return a unique parquet file name for batch number ``n``.

    The name has the form ``part-<n>-<uuid4>.parquet``; a fresh random UUID is
    generated on every call, so two calls with the same ``n`` give different
    names.
    """
    return f"part-{n}-{uuid.uuid4()}.parquet"

# Module-level accumulator for the downloaded rows: data_extract() appends
# each fetched batch to it, and the yearly loop further down empties it after
# every parquet write.
table = pd.DataFrame()


def save_data(data):
    """Download the dataset referenced by an API response envelope.

    Args:
        data: Decoded JSON body of the first API call. It is only treated as
            a successful answer when it has exactly 4 fields; in that case
            its 'datos' entry is the URL to download.

    Returns:
        A pandas DataFrame with the normalized JSON records, or None when the
        envelope is an error answer, the download does not return a 2xx
        status, or the download raises an HTTPError.
    """
    # the data has to have 4 fields; otherwise the API is sending back an error
    if len(data) != 4:
        return None

    url = data['datos']

    try:
        # The context manager guarantees the connection is closed, also when
        # the status check below bails out early.
        with urllib.request.urlopen(url) as response:
            if not str(response.code).startswith('2'):
                return None
            raw = response.read()
    except urllib.error.HTTPError as err:
        print(f'A HTTPError was thrown: {err.code} {err.reason}')
        # usually the main error is a 500, so we wait because we're doing too
        # many requests to the server
        time.sleep(120)
        return None

    # The payload is decoded as ISO-8859-1 before being parsed as JSON.
    data_json = json.loads(raw.decode('ISO-8859-1'))
    return pd.json_normalize(data_json)
    

def data_extract():
    """Append the rows behind the current API response to the global table.

    Reads the module-level ``response`` and ``single_date`` variables, hands
    the decoded response to save_data() and, when that yields a non-empty
    DataFrame, concatenates it onto the global ``table``.
    """
    global table
    payload = response.json()
    fetched = save_data(payload)

    # Progress output (table size *before* this batch is appended); handy in
    # long executions such as the historical extraction.
    print("month: " + str(single_date.month) + " |--| table size:" + str(table.shape))

    # Nothing to add when the download failed or came back without rows.
    if fetched is None or fetched.shape[0] <= 0:
        return

    table = pd.concat([table, fetched])

#We already accessed our .env variable in the first cell of the notebook
#Nonetheless, we might want to just execute this part of the code and not extract the whole catalogue of stations
#This method allows us to access the .env file and load our env variables
load_dotenv()

api_key = os.environ.get('api_key')
path = os.environ.get('path')

querystring = {"api_key":api_key}


headers = {
    'cache-control': "no-cache"
    }


#We're going to extract the historical data and create a file per year
year_df = pd.date_range(start='1/1/1920', end='10/1/2023', freq='YS')
for single_year in year_df:

    # First and last day of the year being processed.
    start_year = single_year
    end_year = single_year + relativedelta(years = 1)- timedelta(days = 1)

    # One entry per month start within that year.
    d = pd.date_range(start=str(start_year.date()), end=str(end_year.date()), freq='MS')

    for single_date in d:
        # Each request covers one calendar month.
        start_date = single_date
        end_date = single_date + relativedelta(months = 1) - timedelta(days = 1)

        # The ':' characters of the timestamps are written URL-encoded (%3A).
        url = "https://opendata.aemet.es/opendata/api/valores/climatologicos/diarios/datos/fechaini/" + start_date.strftime("%Y-%m-%dT%H%%3A%M%%3A%SUTC") + "/fechafin/" + end_date.strftime("%Y-%m-%dT23%%3A59%%3A59UTC") + "/todasestaciones"

        response = requests.request("GET", url, headers=headers, params=querystring)

        if response.ok:
            data_extract()
        else:
            # One retry after a pause; give up on the whole run if it fails again.
            print(response.reason)
            time.sleep(60)
            response = requests.request("GET", url, headers=headers, params=querystring)
            if response.ok:
                data_extract()
            else:
                sys.exit("Exit process - not completed")

    # Pause between years before writing and moving on.
    time.sleep(15)

    # A year without any downloaded row has nothing to write; skip it instead
    # of handing an empty, column-less frame to the parquet writer.
    if table.empty:
        print(f"year: {single_year.year} |--| no data, nothing written")
        continue

    ddf = da.from_pandas(table, chunksize=150000)
    save_dir = f"{path}parquets/"

    # Custom file names for the parquet files. dask calls name_function once
    # per partition and requires the resulting names to be unique, so a name
    # that ignores the partition index only works for a single partition.
    # Keep the plain per-year name in that case and add a zero-padded
    # partition index (preserves lexicographic order) when there are several.
    year = single_year.year
    if ddf.npartitions == 1:
        name_function = lambda _part: f"data-{year}.parquet"
    else:
        name_function = lambda part: f"data-{year}-{part:04d}.parquet"

    ddf.to_parquet(save_dir,name_function=name_function)

    # Empty the accumulator (columns are kept) before the next year.
    table.drop(table.index , inplace=True)


In [None]:
#We want to update weekly our data, so we're doing something similar
#to the historical data process but relative to the current date
#DO NOT USE THIS CODE JUST YET

import pandas as pd
import requests
import json
import urllib
from urllib.request import urlopen
import sys

import time

from datetime import date, timedelta
from dateutil.relativedelta import relativedelta

import dask.dataframe as da

from dotenv import load_dotenv

# Module-level accumulator for the downloaded rows: data_extract() appends the
# fetched batch to it, and it is emptied again at the end of this cell.
table = pd.DataFrame()


def save_data(data):
    """Download the dataset referenced by an API response envelope.

    Args:
        data: Decoded JSON body of the first API call. It is only treated as
            a successful answer when it has exactly 4 fields; in that case
            its 'datos' entry is the URL to download.

    Returns:
        A pandas DataFrame with the normalized JSON records, or None when the
        envelope is an error answer, the download does not return a 2xx
        status, or the download raises an HTTPError.
    """
    # the data has to have 4 fields; otherwise the API is sending back an error
    if len(data) != 4:
        return None

    url = data['datos']

    try:
        # The context manager closes the connection when we are done with it.
        with urllib.request.urlopen(url) as response:
            if not str(response.code).startswith('2'):
                return None
            raw = response.read()
    except urllib.error.HTTPError as err:
        print(f'A HTTPError was thrown: {err.code} {err.reason}')
        # usually the main error is a 500, so we wait because we're doing too
        # many requests to the server
        time.sleep(60)
        # Return explicitly: there is no response to inspect after a failure
        # (falling through used to raise UnboundLocalError on `response`).
        return None

    # The payload is decoded as ISO-8859-1 before being parsed as JSON.
    data_json = json.loads(raw.decode('ISO-8859-1'))
    return pd.json_normalize(data_json)
    

def data_extract():
    """Append the rows behind the current API response to the global table.

    Reads the module-level ``response`` variable, hands its decoded JSON to
    save_data() and, when that yields a non-empty DataFrame, concatenates it
    onto the global ``table``.
    """
    global table
    payload = response.json()
    fetched = save_data(payload)
    #print("month: " + str(single_date.month) + " |--| table size:" + str(table.shape))

    # Nothing to add when the download failed or came back without rows.
    if fetched is None or fetched.shape[0] <= 0:
        return

    table = pd.concat([table, fetched])


#We already accessed our .env variable in the first cell of the notebook
#Nonetheless, we might want to just execute this part of the code and not extract the whole catalogue of stations
#This method allows us to access the .env file and load our env variables

# This cell reads os.environ but its own import list does not include os, so
# import it here to keep the cell runnable on its own.
import os

load_dotenv()

api_key = os.environ.get('api_key')
path = os.environ.get('path')

querystring = {"api_key":api_key}


headers = {
    'cache-control': "no-cache"
    }


#CAREFUL: THIS OVERWRITES THE EXISTING PARQUET FILES
#url = "https://opendata.aemet.es/opendata/api/valores/climatologicos/diarios/datos/fechaini/" + start_date.strftime("%Y-%m-%dT%H%%3A%M%%3A%SUTC") + "/fechafin/" + end_date.strftime("%Y-%m-%dT23%%3A59%%3A59UTC") + "/todasestaciones"
url = "https://opendata.aemet.es/opendata/api/valores/climatologicos/diarios/datos/fechaini/2023-09-26T00:00:00UTC/fechafin/2023-10-01T23:59:59UTC/todasestaciones"

response = requests.request("GET", url, headers=headers, params=querystring)

if response.ok:
    data_extract()
else:
    # One retry after a pause; give up if it fails again.
    print(response.reason)
    time.sleep(60)
    response = requests.request("GET", url, headers=headers, params=querystring)
    if response.ok:
        data_extract()
    else:
        sys.exit("Exit process - not completed")

#file_name = path + str(single_year.year) + "datos_climatologicos.parquet"
#file_name = path + "nuevos_datos_climatologicos.parquet"

# Make sure the output directory exists before anything is written into it.
save_dir = f"{path}parquets/nuevos2"
os.makedirs(save_dir, exist_ok=True)

# The table is written to CSV and read back, and the parquet files are built
# from that re-read copy rather than from `table` directly.
table.to_csv(f"{path}parquets/nuevos2/prueba2.csv", sep=";", index=False)

aux_df = pd.read_csv(f"{path}parquets/nuevos2/prueba2.csv", sep=";", index_col=False)

#ddf = da.from_pandas(table, chunksize=50000)
ddf = da.from_pandas(aux_df, chunksize=50000)
ddf.to_parquet(save_dir)

# Empty the accumulator (columns are kept) once the data has been written.
table.drop(table.index , inplace=True)


In [None]:
# Uploading the parquets to Google Cloud
# We're using the transfer_manager method

import os

from pathlib import Path
from google.cloud.storage import Client, transfer_manager

# First, you have to install the google cloud sdk -> https://cloud.google.com/sdk/docs/install
# Second, you have to initialize it and log in (manually)
# You also have to set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of your json file

# When you're initializing the whole thing it will ask you to select your project, keep that information at hand
# You have to have permissions in Google Cloud for your user, so if it crashes because of it, 
# go to the Cloud Console and create/edit the user to give it the role you need

def upload_directory_with_transfer_manager(bucket_name, source_directory):
    """Upload all files below a local directory to a GCS bucket.

    Subdirectories are walked recursively. The blob name of each file is its
    path relative to ``source_directory`` (the directory itself is not part
    of the name). For full control over individual blob names or metadata,
    transfer_manager.upload_many() can be used instead.

    Args:
        bucket_name: The ID of the GCS bucket, e.g. "your-bucket-name".
        source_directory: The directory on your computer to upload. An empty
            string means "the current working directory".

    The outcome of every upload is printed; nothing is returned.
    """
    # NOTE: the `workers` argument of the original sample (maximum number of
    # processes for the operation) HAD TO BE REMOVED IN JUPYTER because it
    # DOESN'T WORK WITH MULTIPROCESSING there. The sample notes that threads
    # can be used instead of processes by passing
    # `worker_type=transfer_manager.THREAD`.

    storage_client = Client()
    bucket = storage_client.bucket(bucket_name)

    # Walk the directory tree, keep regular files only (not the directories
    # themselves) and express each one relative to `source_directory`, as a
    # plain string.
    root = Path(source_directory)
    string_paths = [
        str(entry.relative_to(source_directory))
        for entry in root.rglob("*")
        if entry.is_file()
    ]

    print("Found {} files.".format(len(string_paths)))

    # Start the upload.
    results = transfer_manager.upload_many_from_filenames(
        bucket, string_paths, source_directory=source_directory
    )

    # The results list holds, in input order, either `None` or the exception
    # raised for the corresponding filename.
    for name, result in zip(string_paths, results):
        if isinstance(result, Exception):
            print("Failed to upload {} due to exception: {}".format(name, result))
            continue
        print("Uploaded {} to {}.".format(name, bucket.name))


# To begin the transfer, we need access to the .json file that allows us 
# to connect your script to Google Cloud Platform. Access the .env file to obtain the route
# and also the name of the bucket where you want to load your parquet files

# Load the .env file in this cell as well, like the other cells do, so the
# variables below are available when only this cell is executed.
from dotenv import load_dotenv

load_dotenv()

# Direct indexing (instead of .get) makes a missing variable fail immediately
# with a KeyError naming it.
file = os.environ['GOOGLE_APPLICATION_CREDENTIALS']  
directory_in_str = os.environ['path_parquet']
bucket_name = os.environ['bucket_name']
upload_directory_with_transfer_manager(bucket_name, directory_in_str)

In [2]:
# We have our data stored in the Google Cloud
# Now we move it to BQ
# First, if you haven't already, give access to your user in BQ by "Sharing" in your project and adding your user/role
# Then, you can execute this script. In the .env store your uri and your BigQuery table ID

from google.cloud import bigquery
from dotenv import load_dotenv

# This cell reads os.environ but does not import os itself; import it here so
# the cell can be executed on its own.
import os

load_dotenv()

# Construct a BigQuery client object.
client = bigquery.Client()
table_id = os.environ.get('table_id')
uri = os.environ.get('uri')

# Stop with a clear message when the configuration is incomplete, rather than
# passing None on to the load job.
if not table_id or not uri:
    raise RuntimeError("Both 'table_id' and 'uri' must be set in the environment or the .env file")

# The source files are parquet.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
)

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

# Report the row count of the destination table after the load.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Loaded 4075207 rows.
