# Download Tables from Data Rio Data Lake

---

## Tables in the Data Lake

- **adm_cor_comando**
  - ocorrencias: Data on city event records from Sistema Comando.

- **clima_estacao_meteorologica**
  - estacoes_alertario: Data on meteorological stations in the Alertario network.
  - estacoes_alertario_met: Metadata about meteorological stations in Alertario.
  - estacoes_inmet: Data from INMET meteorological stations.
  - estacoes_redemet: Data from REDEMET meteorological stations.
  - intensidade_vento: Wind intensity measurements from various meteorological stations.
  - meteorologia_alertario: Meteorological data from Alertario stations.
  - meteorologia_inmet: Meteorological data from INMET stations.
  - meteorologia_redemet: Meteorological data from REDEMET stations.

- **clima_fluviometro**
  - estacoes_inea: Data from INEA fluviometric stations.
  - lamina_agua_inea: Water depth measurements from INEA stations.

- **clima_pluviometro**
  - estacoes_alertario: Data from precipitation monitoring stations in Alertario.
  - estacoes_cemaden: Data from Cemaden precipitation stations.
  - estacoes_inea: Data from INEA precipitation stations.
  - estacoes_websirene: Data from Websirene precipitation stations.
  - taxa_precipitacao_alertario: Precipitation rate data from Alertario.
  - taxa_precipitacao_alertario_5min: Five-minute interval precipitation rate from Alertario.
  - taxa_precipitacao_cemaden: Precipitation rate data from Cemaden stations.
  - taxa_precipitacao_inea: Precipitation rate data from INEA stations.
  - taxa_precipitacao_websirene: Precipitation rate data from Websirene stations.

- **clima_radar**
  - taxa_precipitacao_guaratiba: Precipitation rate data from Guaratiba radar.

- **dados_mestres**
  - bairro: Geographic and administrative data about neighborhoods.

- **rionowcast**
  - grid_points: Grid-based forecast model data for real-time weather prediction.
  - radar: Radar data for real-time weather monitoring.
  - rain_gauge: Data from rain gauges used for precipitation measurement.


---

## Initial Setup

### Install Modules

Uncomment the line below to install the required packages.

In [7]:
# !pip install pandas google-cloud-bigquery google-auth

### Import Modules

In [5]:
import os
import sys
import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
import csv

### Define Functions to Download a Single Table

In [12]:
def fetch_bigquery_table(table_path: str, service_account_path: str, limit: "int | None" = 1000, project: "str | None" = None):
    """
    Fetch the rows of a BigQuery table into a pandas DataFrame.

    Every row is held in memory before the DataFrame is built, so very large
    tables are better handled by ``fetch_and_save_bigquery_table``, which
    writes rows to a CSV file in batches instead.

    Parameters:
        table_path (str): Full BigQuery table path in the format `project.dataset.table`.
        service_account_path (str): Path to the service account JSON file.
        limit (int | None): Maximum number of rows to fetch. Default is 1000.
            Pass None to fetch the whole table (no LIMIT clause is added).
        project (str | None): Project passed to ``bigquery.Client``. The default,
            None, leaves project selection to the client.

    Returns:
        pd.DataFrame: The query results, with one column per field of the result
        schema. The columns are present even when the query returns no rows.
    """
    google_credentials = service_account.Credentials.from_service_account_file(service_account_path)
    client = bigquery.Client(project=project, credentials=google_credentials)

    query = f'SELECT * FROM `{table_path}`'
    if limit is not None:
        # int() guarantees that only a plain row count ends up in the SQL text.
        query += f' LIMIT {int(limit)}'

    results = client.query(query).result()

    # Total shown in the progress line. Fall back to the requested limit,
    # and show "?" when neither value is known (never "None").
    total_rows = results.total_rows if results.total_rows is not None else limit
    total_label = '?' if total_rows is None else total_rows

    rows = []
    for count, row in enumerate(results, start=1):
        rows.append(dict(row))
        if count % 10000 == 0:
            sys.stdout.write(f'\rProcessed: {count}/{total_label}')
            sys.stdout.flush()

    # Always report the final count, even when it is not a multiple of 10000
    # or differs from the row-count estimate.
    sys.stdout.write(f'\rProcessed: {len(rows)}/{total_label}')
    sys.stdout.flush()
    print("\nQuery completed.")

    # Take column names from the result schema so an empty result keeps its header.
    columns = [field.name for field in results.schema]
    return pd.DataFrame(rows, columns=columns or None)

import sys
import csv
import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account

def fetch_and_save_bigquery_table(table_path: str, service_account_path: str, output_file_path: str, limit: "int | None" = 1000, batch_size: int = 10000, project: "str | None" = None):
    """
    Stream the rows of a BigQuery table into a CSV file, writing them in batches.

    Unlike ``fetch_bigquery_table``, only up to ``batch_size`` converted rows are
    kept in a Python list at a time. This keeps memory use bounded for large tables.

    Parameters:
        table_path (str): Full BigQuery table path in the format `project.dataset.table`.
        service_account_path (str): Path to the service account JSON file.
        output_file_path (str): Path to the output CSV file. It is overwritten if it exists.
        limit (int | None): Maximum number of rows to fetch. Default is 1000.
            Pass None to fetch the whole table (no LIMIT clause is added).
        batch_size (int): Number of rows to accumulate before writing to disk and
            updating the progress line. Default is 10,000. Values below 1 behave like 1.
        project (str | None): Project passed to ``bigquery.Client``. The default,
            None, leaves project selection to the client.

    Returns:
        int: Number of data rows written to ``output_file_path`` (header excluded).
    """
    google_credentials = service_account.Credentials.from_service_account_file(service_account_path)
    client = bigquery.Client(project=project, credentials=google_credentials)

    query = f"SELECT * FROM `{table_path}`"
    if limit is not None:
        # int() guarantees that only a plain row count ends up in the SQL text.
        query += f" LIMIT {int(limit)}"

    results = client.query(query).result()

    # Total shown in the progress line. Show "?" instead of "None" when unknown.
    total_rows = results.total_rows if results.total_rows is not None else limit
    total_label = "?" if total_rows is None else total_rows
    # Treat batch_size < 1 as 1, i.e. write after every row.
    batch_size = max(1, batch_size)

    def _report_progress(count):
        # Overwrite the same console line with the number of rows written so far.
        sys.stdout.write(f"\rProcessed: {count}/{total_label}")
        sys.stdout.flush()

    written = 0
    with open(output_file_path, mode="w", newline="", encoding="utf-8") as csvfile:
        # Prefer the result schema for the header so it is written even when the
        # query returns no rows. Otherwise fall back to the first row's keys.
        fieldnames = [field.name for field in results.schema]
        writer = None
        if fieldnames:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

        batch_rows = []
        for row in results:
            row_dict = dict(row)
            if writer is None:
                writer = csv.DictWriter(csvfile, fieldnames=list(row_dict.keys()))
                writer.writeheader()
            batch_rows.append(row_dict)

            if len(batch_rows) >= batch_size:
                writer.writerows(batch_rows)
                written += len(batch_rows)
                batch_rows = []
                _report_progress(written)

        # Write whatever is left over from the last, partial batch.
        if batch_rows:
            writer.writerows(batch_rows)
            written += len(batch_rows)

    _report_progress(written)
    print("\nQuery completed.")
    return written
    

---

## Download Tables One by One

### Define BigQuery Credentials

In [7]:
# BigQuery project and service-account key file used by the download cells below.
project_id, service_account_path = 'octacity', '../../auth/octacity-3da367fcc38d.json'

### Define Output Directory

In [8]:
# Root folder for the raw CSV exports: four levels up, then data/meteorologia/raw.
data_directory = '/'.join(['..'] * 4 + ['data', 'meteorologia', 'raw'])

### Create Directories for Tables

In [9]:
# One output sub-folder per BigQuery dataset that is downloaded below.
datasets = [
    'adm_cor_comando', 'clima_estacao_meteorologica', 'clima_fluviometro',
    'clima_pluviometro', 'clima_radar', 'dados_mestres', 'rionowcast',
]

for dataset in datasets:
    # exist_ok=True makes re-running this cell harmless.
    os.makedirs(name=f'{data_directory}/{dataset}', exist_ok=True)

### Download Tables

#### adm_cor_comando.ocorrencias

In [9]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df0 = fetch_bigquery_table(
    table_path="octacity.adm_cor_comando.ocorrencias",
    service_account_path=service_account_path,
    limit=None,
)
df0.to_csv(path_or_buf=f"{data_directory}/adm_cor_comando/ocorrencias.csv", index=False)

Processed: 87620/87620
Query completed.


#### clima_estacao_meteorologica.estacoes_alertario

In [17]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df1 = fetch_bigquery_table(
    table_path="octacity.clima_estacao_meteorologica.estacoes_alertario",
    service_account_path=service_account_path,
    limit=None,
)
df1.to_csv(path_or_buf=f"{data_directory}/clima_estacao_meteorologica/estacoes_alertario.csv", index=False)

Processed: 8/8
Query completed.


#### clima_estacao_meteorologica.estacoes_alertario_met

In [18]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df2 = fetch_bigquery_table(
    table_path="octacity.clima_estacao_meteorologica.estacoes_alertario_met",
    service_account_path=service_account_path,
    limit=None,
)
df2.to_csv(path_or_buf=f"{data_directory}/clima_estacao_meteorologica/estacoes_alertario_met.csv", index=False)

Processed: 8/8
Query completed.


#### clima_estacao_meteorologica.estacoes_inmet

In [19]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df3 = fetch_bigquery_table(
    table_path="octacity.clima_estacao_meteorologica.estacoes_inmet",
    service_account_path=service_account_path,
    limit=None,
)
df3.to_csv(path_or_buf=f"{data_directory}/clima_estacao_meteorologica/estacoes_inmet.csv", index=False)

Processed: 9/9
Query completed.


#### clima_estacao_meteorologica.estacoes_redemet

In [20]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df4 = fetch_bigquery_table(
    table_path="octacity.clima_estacao_meteorologica.estacoes_redemet",
    service_account_path=service_account_path,
    limit=None,
)
df4.to_csv(path_or_buf=f"{data_directory}/clima_estacao_meteorologica/estacoes_redemet.csv", index=False)

Processed: 4/4
Query completed.


#### clima_estacao_meteorologica.intensidade_vento

In [21]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df5 = fetch_bigquery_table(
    table_path="octacity.clima_estacao_meteorologica.intensidade_vento",
    service_account_path=service_account_path,
    limit=None,
)
df5.to_csv(path_or_buf=f"{data_directory}/clima_estacao_meteorologica/intensidade_vento.csv", index=False)

Processed: 4/4
Query completed.


#### clima_estacao_meteorologica.meteorologia_alertario

In [22]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df6 = fetch_bigquery_table(
    table_path="octacity.clima_estacao_meteorologica.meteorologia_alertario",
    service_account_path=service_account_path,
    limit=None,
)
df6.to_csv(path_or_buf=f"{data_directory}/clima_estacao_meteorologica/meteorologia_alertario.csv", index=False)

Processed: 175728/175728
Query completed.


#### clima_estacao_meteorologica.meteorologia_inmet

In [23]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df7 = fetch_bigquery_table(
    table_path="octacity.clima_estacao_meteorologica.meteorologia_inmet",
    service_account_path=service_account_path,
    limit=None,
)
df7.to_csv(path_or_buf=f"{data_directory}/clima_estacao_meteorologica/meteorologia_inmet.csv", index=False)

Processed: 1110438/1110438
Query completed.


#### clima_estacao_meteorologica.meteorologia_redemet

In [24]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df8 = fetch_bigquery_table(
    table_path="octacity.clima_estacao_meteorologica.meteorologia_redemet",
    service_account_path=service_account_path,
    limit=None,
)
df8.to_csv(path_or_buf=f"{data_directory}/clima_estacao_meteorologica/meteorologia_redemet.csv", index=False)

Processed: 49598/49598
Query completed.


#### clima_fluviometro.estacoes_inea

In [25]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df9 = fetch_bigquery_table(
    table_path="octacity.clima_fluviometro.estacoes_inea",
    service_account_path=service_account_path,
    limit=None,
)
df9.to_csv(path_or_buf=f"{data_directory}/clima_fluviometro/estacoes_inea.csv", index=False)

Processed: 2/2
Query completed.


#### clima_fluviometro.lamina_agua_inea

In [26]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df10 = fetch_bigquery_table(
    table_path="octacity.clima_fluviometro.lamina_agua_inea",
    service_account_path=service_account_path,
    limit=None,
)
df10.to_csv(path_or_buf=f"{data_directory}/clima_fluviometro/lamina_agua_inea.csv", index=False)

Processed: 15049/15049
Query completed.


#### clima_pluviometro.estacoes_alertario

In [27]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df11 = fetch_bigquery_table(
    table_path="octacity.clima_pluviometro.estacoes_alertario",
    service_account_path=service_account_path,
    limit=None,
)
df11.to_csv(path_or_buf=f"{data_directory}/clima_pluviometro/estacoes_alertario.csv", index=False)

Processed: 37/37
Query completed.


#### clima_pluviometro.estacoes_cemaden

In [28]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df12 = fetch_bigquery_table(
    table_path="octacity.clima_pluviometro.estacoes_cemaden",
    service_account_path=service_account_path,
    limit=None,
)
df12.to_csv(path_or_buf=f"{data_directory}/clima_pluviometro/estacoes_cemaden.csv", index=False)

Processed: 28/28
Query completed.


#### clima_pluviometro.estacoes_inea

In [29]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df13 = fetch_bigquery_table(
    table_path="octacity.clima_pluviometro.estacoes_inea",
    service_account_path=service_account_path,
    limit=None,
)
df13.to_csv(path_or_buf=f"{data_directory}/clima_pluviometro/estacoes_inea.csv", index=False)

Processed: 6/6
Query completed.


#### clima_pluviometro.estacoes_websirene

In [30]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df14 = fetch_bigquery_table(
    table_path="octacity.clima_pluviometro.estacoes_websirene",
    service_account_path=service_account_path,
    limit=None,
)
df14.to_csv(path_or_buf=f"{data_directory}/clima_pluviometro/estacoes_websirene.csv", index=False)

Processed: 83/83
Query completed.


#### clima_pluviometro.taxa_precipitacao_alertario

In [14]:
# This table had ~30.7M rows on the last run, so it is written to CSV in
# batches by fetch_and_save_bigquery_table instead of being loaded into a DataFrame.
fetch_and_save_bigquery_table(
    table_path="octacity.clima_pluviometro.taxa_precipitacao_alertario",
    service_account_path=service_account_path,
    output_file_path=f'{data_directory}/clima_pluviometro/taxa_precipitacao_alertario.csv',
    limit=None,
    batch_size=100_000,
)

Processed: 30670856/30670856
Query completed.


#### clima_pluviometro.taxa_precipitacao_alertario_5min

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df16 = fetch_bigquery_table(
    table_path="octacity.clima_pluviometro.taxa_precipitacao_alertario_5min",
    service_account_path=service_account_path,
    limit=None,
)
df16.to_csv(path_or_buf=f"{data_directory}/clima_pluviometro/taxa_precipitacao_alertario_5min.csv", index=False)

#### clima_pluviometro.taxa_precipitacao_cemaden

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df17 = fetch_bigquery_table(
    table_path="octacity.clima_pluviometro.taxa_precipitacao_cemaden",
    service_account_path=service_account_path,
    limit=None,
)
df17.to_csv(path_or_buf=f"{data_directory}/clima_pluviometro/taxa_precipitacao_cemaden.csv", index=False)

#### clima_pluviometro.taxa_precipitacao_inea

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df18 = fetch_bigquery_table(
    table_path="octacity.clima_pluviometro.taxa_precipitacao_inea",
    service_account_path=service_account_path,
    limit=None,
)
df18.to_csv(path_or_buf=f"{data_directory}/clima_pluviometro/taxa_precipitacao_inea.csv", index=False)

#### clima_pluviometro.taxa_precipitacao_websirene

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df19 = fetch_bigquery_table(
    table_path="octacity.clima_pluviometro.taxa_precipitacao_websirene",
    service_account_path=service_account_path,
    limit=None,
)
df19.to_csv(path_or_buf=f"{data_directory}/clima_pluviometro/taxa_precipitacao_websirene.csv", index=False)

#### clima_radar.taxa_precipitacao_guaratiba

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df20 = fetch_bigquery_table(
    table_path="octacity.clima_radar.taxa_precipitacao_guaratiba",
    service_account_path=service_account_path,
    limit=None,
)
df20.to_csv(path_or_buf=f"{data_directory}/clima_radar/taxa_precipitacao_guaratiba.csv", index=False)

#### dados_mestres.bairro

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df21 = fetch_bigquery_table(
    table_path="octacity.dados_mestres.bairro",
    service_account_path=service_account_path,
    limit=None,
)
df21.to_csv(path_or_buf=f"{data_directory}/dados_mestres/bairro.csv", index=False)

#### rionowcast.grid_points

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df22 = fetch_bigquery_table(
    table_path="octacity.rionowcast.grid_points",
    service_account_path=service_account_path,
    limit=None,
)
df22.to_csv(path_or_buf=f"{data_directory}/rionowcast/grid_points.csv", index=False)

#### rionowcast.radar

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df23 = fetch_bigquery_table(
    table_path="octacity.rionowcast.radar",
    service_account_path=service_account_path,
    limit=None,
)
df23.to_csv(path_or_buf=f"{data_directory}/rionowcast/radar.csv", index=False)

#### rionowcast.rain_gauge

In [None]:
# Whole table (limit=None), loaded in memory and then written as CSV.
df24 = fetch_bigquery_table(
    table_path="octacity.rionowcast.rain_gauge",
    service_account_path=service_account_path,
    limit=None,
)
df24.to_csv(path_or_buf=f"{data_directory}/rionowcast/rain_gauge.csv", index=False)

---

## Download All Tables at Once

In [None]:
# Download every table listed below. Each table is written to CSV in batches by
# fetch_and_save_bigquery_table, so even the largest ones (e.g.
# clima_pluviometro.taxa_precipitacao_alertario, ~30.7M rows on the last run)
# never have to fit in memory as a DataFrame.
#
# Project and credentials come from the credentials cell above. This cell used
# to re-assign service_account_path to "../auth/...". That does not match the
# "../../auth/..." path the executed cells used, and it overwrote the global
# for every later cell.
project = project_id

# Tables to download, grouped by dataset.
datasets = {
    "adm_cor_comando": ["ocorrencias"],
    "clima_estacao_meteorologica": [
        "estacoes_alertario", "estacoes_alertario_met", "estacoes_inmet", "estacoes_redemet",
        "intensidade_vento", "meteorologia_alertario", "meteorologia_inmet", "meteorologia_redemet",
    ],
    "clima_fluviometro": ["estacoes_inea", "lamina_agua_inea"],
    "clima_pluviometro": [
        "estacoes_alertario", "estacoes_cemaden", "estacoes_inea", "estacoes_websirene",
        "taxa_precipitacao_alertario", "taxa_precipitacao_alertario_5min", "taxa_precipitacao_cemaden",
        "taxa_precipitacao_inea", "taxa_precipitacao_websirene",
    ],
    "clima_radar": ["taxa_precipitacao_guaratiba"],
    "dados_mestres": ["bairro"],
    "rionowcast": ["grid_points", "radar", "rain_gauge"],
}

for dataset, tables in datasets.items():
    # Do not rely on the "create directories" cell having run first.
    os.makedirs(f"{data_directory}/{dataset}", exist_ok=True)
    for table in tables:
        table_path = f"{project}.{dataset}.{table}"
        save_path = f"{data_directory}/{dataset}/{table}.csv"

        print(f"Fetching data from {table_path}")
        print(f"Saving data to {save_path}")
        fetch_and_save_bigquery_table(
            table_path=table_path,
            service_account_path=service_account_path,
            output_file_path=save_path,
            limit=None,
            batch_size=100_000,
        )

print("All tables have been Downloaded.")
