<hr>

## How to use:

- Install the dependencies:
    ```shell
        python -m pip install -r requirements.txt
    ```
    
- Request a free OpenWeather API key **[here](https://home.openweathermap.org/api_keys)**; you will need to create an account.
<br>

- Create a file called `.env` in the notebook's working directory and set a variable with your OpenWeather API key:
    ```shell
        OPEN_WEATHER_API_KEY = "key"
    ```

- Run all cells.

<hr>

- Imports and setting variables.

In [1]:
import findspark
findspark.init()

import copy
import dotenv
import requests
import pandas as pd
from pyspark.sql import SparkSession

TARGET_REGION_ID = 3513
_OPEN_WEATHER_API_KEY = dotenv.get_key(key_to_get="OPEN_WEATHER_API_KEY", dotenv_path=".env")

# get_key returns None when the file or the key is missing, so test for any empty value.
if not _OPEN_WEATHER_API_KEY:
    raise Exception('The ".env" file does not contain the API key.\nPlease read the instructions at the top of the file.')

IBGE_API_URL = f"https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/{TARGET_REGION_ID}/municipios"

_CITY = "{city}"
_LANG = "pt_br"
_UNIT = "metric"
_STATE = "São Paulo,076"
OPEN_WEATHER_API_URL = f"http://api.openweathermap.org/data/2.5/forecast?q={_CITY},{_STATE}&lang={_LANG}&units={_UNIT}&appid={_OPEN_WEATHER_API_KEY}"
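
The `_CITY = "{city}"` assignment is what keeps a `{city}` placeholder alive inside the f-string, so the URL can later be completed with `str.format`. A minimal sketch of that two-step templating follows. The key `demo-key` is a placeholder, and the explicit percent-encoding with `urllib.parse.quote` is an addition for illustration, not something the notebook does.

```python
from urllib.parse import quote

# Placeholder values for illustration only; "demo-key" is not a real API key.
_CITY = "{city}"
_STATE = "São Paulo,076"
_API_KEY = "demo-key"

# Step 1: the f-string replaces _CITY with the literal text "{city}",
# so the resulting template still contains a format placeholder.
URL_TEMPLATE = f"http://api.openweathermap.org/data/2.5/forecast?q={_CITY},{_STATE}&appid={_API_KEY}"


# Step 2: fill in the placeholder for one city.
def build_url(city: str) -> str:
    # quote() percent-encodes spaces and accented characters (UTF-8 by default).
    return URL_TEMPLATE.format(city=quote(city))


print(URL_TEMPLATE)
print(build_url("Mogi Guaçu"))
```

Keeping the template as a module-level constant means the API key and the fixed query parameters are interpolated once, while the city is filled in per request.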

In [2]:
class SQL:
    TABLE_1 = """
        SELECT 
            c.nome AS Cidade, 
            c.id AS CodigoDaCidade, 
            from_unixtime(f.dt, "yyyy-MM-dd") AS Data, 
            c.microrregiao_nome AS Regiao, 
            i.city_country AS Pais, 
            i.city_coord_lat AS Latitude, 
            i.city_coord_lon AS Longitude, 
            f.main_temp_max AS TemperaturaMaxima, 
            f.main_temp_min AS TemperaturaMinima, 
            f.main_temp AS TemperaturaMedia,
            
            CASE WHEN f.weather_id < 600 THEN 'Sim'
            ELSE 'Não' END
            AS VaiChover, 
            
            f.pop AS ChanceDeChuva, 
            f.weather_description AS CondicaoDoTempo, 
            from_unixtime(i.city_sunrise,"HH:mm") AS NascerDoSol, 
            from_unixtime(i.city_sunset,"HH:mm") AS PorDoSol, 
            f.wind_speed AS VelocidadeMaximaDoVento 
        FROM 
            cities AS c 
        INNER JOIN forecasts f ON c.nome=f.city 
        INNER JOIN cities_info i ON c.nome=i.city_name;
    """
    
    TABLE_2 = """
        SELECT 
            Cidade,
            
            SUM(CASE WHEN VaiChover = 'Sim' THEN 1 
            ELSE 0 END) AS QtdDiasVaiChover,
            
            SUM(CASE WHEN VaiChover = 'Não' THEN 1 
            ELSE 0 END) AS QtdDiasNaoVaiChover,
            
            COUNT(Cidade) AS TotalDiasMapeados
        FROM 
            table_1
        GROUP BY
            Cidade
        ORDER BY
            Cidade;
        """

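`TABLE_1` treats any weather condition code below 600 as rain, and `TABLE_2` counts rainy and dry days per city with conditional sums. The aggregation pattern can be tried without Spark: the sketch below runs a similar query against an in-memory SQLite database from the standard library. The sample rows are made up, the threshold is applied directly instead of going through the `VaiChover` column, and SQLite stands in for Spark SQL only because both accept this `SUM(CASE ...)` form.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table_1 (Cidade TEXT, weather_id INTEGER)")

# Made-up rows: (city, weather condition code).
rows = [("A", 500), ("A", 800), ("A", 201), ("B", 803), ("B", 804)]
conn.executemany("INSERT INTO table_1 VALUES (?, ?)", rows)

query = """
    SELECT
        Cidade,
        SUM(CASE WHEN weather_id < 600 THEN 1 ELSE 0 END) AS QtdDiasVaiChover,
        SUM(CASE WHEN weather_id < 600 THEN 0 ELSE 1 END) AS QtdDiasNaoVaiChover,
        COUNT(Cidade) AS TotalDiasMapeados
    FROM table_1
    GROUP BY Cidade
    ORDER BY Cidade
"""

# Each result row is (city, rainy days, dry days, total days).
result = conn.execute(query).fetchall()
print(result)
conn.close()
```
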
- Creates the **PySpark** session.

In [3]:
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkNotebook") \
    .config('spark.sql.session.timeZone', 'America/Sao_Paulo') \
    .getOrCreate()
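
The session time zone matters because `from_unixtime` in the queries above renders Unix timestamps in the session's local time. The sketch below reproduces that kind of conversion with the standard library. It assumes a fixed UTC-3 offset for São Paulo (no daylight saving time is modeled), and the timestamp is an arbitrary example value.

```python
from datetime import datetime, timedelta, timezone

# Assumption: São Paulo is modeled as a fixed UTC-3 offset (no DST handling).
SAO_PAULO = timezone(timedelta(hours=-3))


def format_unix(ts: int, pattern: str, tz: timezone = SAO_PAULO) -> str:
    """Format a Unix timestamp (in seconds) in the given time zone."""
    return datetime.fromtimestamp(ts, tz=tz).strftime(pattern)


example_ts = 1_700_000_000  # arbitrary example: 2023-11-14 22:13:20 UTC

print(format_unix(example_ts, "%Y-%m-%d"))             # local date, like the "Data" column
print(format_unix(example_ts, "%H:%M"))                # local time, like "NascerDoSol"
print(format_unix(example_ts, "%H:%M", timezone.utc))  # same instant rendered in UTC
```

The same instant yields different clock times depending on the zone, which is why sunrise and sunset values would look shifted if the session ran in UTC.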

- Requests city data by calling the **IBGE API**.
- Creates a temporary view called `cities`.

In [4]:
ibge_response = requests.get(IBGE_API_URL).json()

cities_pandas_df = pd.json_normalize(data=ibge_response, sep="_")

cities_df = spark.createDataFrame(data=cities_pandas_df) \
    .filter(f"microrregiao_mesorregiao_id == {TARGET_REGION_ID}")

cities_df.createOrReplaceTempView("cities")
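
Column names such as `microrregiao_nome` and `microrregiao_mesorregiao_id` come from `pd.json_normalize(..., sep="_")`, which flattens nested objects by joining their keys with underscores. The pure-Python sketch below mimics that flattening for nested dicts only. The `flatten` helper and the sample record are illustrative, and the record's shape is an assumption modeled on the columns the notebook uses, not a captured IBGE response.

```python
def flatten(record: dict, sep: str = "_", prefix: str = "") -> dict:
    """Flatten nested dicts into one level, joining keys with `sep`."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the joined key as prefix.
            flat.update(flatten(value, sep=sep, prefix=name))
        else:
            flat[name] = value
    return flat


# Hypothetical record shaped like one municipality entry.
sample = {
    "id": 1,
    "nome": "Cidade Exemplo",
    "microrregiao": {
        "id": 10,
        "nome": "Microrregião Exemplo",
        "mesorregiao": {"id": 3513, "nome": "Mesorregião Exemplo"},
    },
}

for column, value in flatten(sample).items():
    print(column, "=", value)
```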

- Requests the forecast data for each city by calling the **OpenWeather API** and collects the responses in a list.

In [5]:
def weather_request(city):
    return requests.get(OPEN_WEATHER_API_URL.format(city=city)).json()

weather_data_all_cities = [weather_request(row.nome) for row in cities_df.collect()]
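
A request for a city that the API cannot resolve typically still returns JSON, but without the `list` of forecasts, which would make the extraction step in the next cell fail with a `KeyError`. One way to guard against that is to split the collected responses before processing them. This is only a sketch: the `split_responses` helper is hypothetical, and the assumption that successful payloads carry `cod` equal to `200` together with a `list` key should be checked against real responses.

```python
def split_responses(responses):
    """Separate usable forecast payloads from error payloads."""
    usable, rejected = [], []
    for payload in responses:
        # Assumption: success is signaled by cod == 200 (string or int)
        # together with a non-empty "list" of forecast entries.
        ok = str(payload.get("cod")) == "200" and bool(payload.get("list"))
        (usable if ok else rejected).append(payload)
    return usable, rejected


# Hand-written payloads for illustration, not real API output.
responses = [
    {"cod": "200", "list": [{"dt_txt": "2024-01-01 12:00:00"}], "city": {"name": "A"}},
    {"cod": "404", "message": "city not found"},
]

usable, rejected = split_responses(responses)
print(len(usable), "usable /", len(rejected), "rejected")
```

Keeping the rejected payloads around, instead of dropping them silently, makes it easy to report which cities produced no forecast.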

- Creates two DataFrames and temp views for:<br>
    1. City information;<br>
    2. City forecasts.<br>

In [None]:
def extract_forecasts_for_city(city_data):
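    """Extract up to five daily forecasts (the 12:00:00 entry of each day).

    Note: the forecast-related keys are removed from `city_data` in place.

    Args:
        `city_data (dict):` The response returned by the OpenWeather API.

    Returns:
        `pandas DataFrame:` A DataFrame containing the forecasts for the city.
    """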
    weather_data = city_data['list']

    day_count = 1
    forecasts_dfs = []
    
    for data in weather_data:
        # Keep only one entry per day (the 12:00:00 forecast):
        if "12:00:00" in data["dt_txt"]:
            
            # "weather" can arrive as a list of dicts; keep the first one
            # so json_normalize can flatten it into weather_* columns.
            if isinstance(data["weather"], list):
                data["weather"] = data["weather"][0]
                
            data["city"] = city_data["city"]["name"]
                        
            pandas_df = pd.json_normalize(data=data, sep="_")
            forecasts_dfs.append(pandas_df)
            
            if day_count >= 5:
                break
            day_count += 1

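    # Remove the forecast-level keys so that only the city metadata is left
    # when this dict is normalized again for the "cities_info" view.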
    del city_data['list']
    del city_data['message']
    del city_data['cod']
    del city_data['cnt']
    
    return pd.concat(forecasts_dfs, ignore_index = True, axis=0)

# ------------------------------------------------------------------- #

weather_data_all_cities_copy = copy.deepcopy(weather_data_all_cities)

# Separate forecast DF for each city.
forecasts_dfs = [extract_forecasts_for_city(city_data_dict) for city_data_dict in weather_data_all_cities_copy]

# Joining them into one DF.
forecasts_table_df = pd.concat(forecasts_dfs, ignore_index = True, axis=0)

# Create DF with forecasts.
spark.createDataFrame(forecasts_table_df).createOrReplaceTempView("forecasts")

# ------------------------------------------------------------------- #

city_info_pd_dfs = [pd.json_normalize(data=city_data_dict, sep="_") for city_data_dict in weather_data_all_cities_copy]
city_infos_pd_df = pd.concat(city_info_pd_dfs, ignore_index = True, axis=0)

# Register the city info as its own view so it is not repeated in every forecast row.
spark.createDataFrame(city_infos_pd_df).createOrReplaceTempView("cities_info")
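
The 5-day endpoint returns entries at 3-hour steps, and the function above keeps only the `12:00:00` entry of each day, up to five days. The selection logic on its own can be sketched without pandas. The `pick_noon_entries` helper and the generated sample entries are illustrative, and the sketch matches on the end of `dt_txt` rather than searching the whole string.

```python
from itertools import islice


def pick_noon_entries(entries, max_days=5):
    """Return at most `max_days` entries whose timestamp text ends at 12:00:00."""
    noon = (entry for entry in entries if entry["dt_txt"].endswith("12:00:00"))
    return list(islice(noon, max_days))


# Synthetic entries: 6 days x 8 three-hour slots, for illustration only.
entries = [
    {"dt_txt": f"2024-01-{day:02d} {hour:02d}:00:00"}
    for day in range(1, 7)
    for hour in range(0, 24, 3)
]

selected = pick_noon_entries(entries)
print([entry["dt_txt"] for entry in selected])
```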

- Creates the DataFrames for tables 1 and 2.

In [None]:
table_1 = spark.sql(SQL.TABLE_1)
table_1.createOrReplaceTempView("table_1")

table_2 = spark.sql(SQL.TABLE_2)

- Exports the tables to CSV.

In [None]:
table_1.toPandas().to_csv("tabela_1.csv", sep="|", index=False)
table_2.toPandas().to_csv("tabela_2.csv", sep="|", index=False)
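
Because the files use `|` as the separator, any consumer has to declare it when reading them back. A small round-trip sketch with the standard `csv` module follows, using an in-memory buffer and made-up rows instead of the real `tabela_2.csv`.

```python
import csv
import io

# Made-up rows shaped like table 2; an in-memory buffer stands in for the file.
header = ["Cidade", "QtdDiasVaiChover", "QtdDiasNaoVaiChover", "TotalDiasMapeados"]
rows = [["A", 2, 3, 5], ["B", 0, 5, 5]]

buffer = io.StringIO()
writer = csv.writer(buffer, delimiter="|", lineterminator="\n")
writer.writerow(header)
writer.writerows(rows)

# Read the data back, declaring the same delimiter.
buffer.seek(0)
records = list(csv.DictReader(buffer, delimiter="|"))
print(records[0])
```

Note that the `csv` module returns every value as a string, so numeric columns need an explicit conversion after reading.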

### Some considerations:

The data returned by the **OpenWeather API** does not appear to be entirely correct.<br>

The chance of rain always seems to be 0%, and the maximum and minimum temperatures are always the same,<br>
which appears to be the default behavior of the 5-day forecast endpoint.

However, I believe this does not interfere with the data extraction itself.