In [91]:
import requests
import datetime
import json
import gzip
import pandas as pd
import numpy as np
import os
import time
import logging
import tempfile
from pyspark.sql import DataFrame
from pyspark.sql import Window
from IPython.core.display import HTML
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, IntegerType
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col, sum, md5, when, to_timestamp, substring, to_date, trim
from datetime import datetime
from bs4 import BeautifulSoup
from io import StringIO

In [20]:
# Stop wide Spark `.show()` tables from wrapping inside notebook <pre> output.
_nowrap_pre_css = '<style>pre { white-space: pre !important; }</style>'
display(HTML(_nowrap_pre_css))

In [2]:
# Local Spark session using all available cores (getOrCreate returns the
# existing session if one is already running in this kernel).
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('weather_etl')
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# URL of the CONAGUA (SMN) forecast web service, plus a header carrying a
# browser identification (User-Agent) to avoid access problems.
url = 'https://smn.conagua.gob.mx/tools/GUI/webservices/?method=1'
# Implicit string concatenation keeps the User-Agent a single-spaced value; a
# backslash continuation inside the literal would embed the next line's
# indentation in the header.
header = {
    'User-Agent': (
        'Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 '
        '(KHTML, like Gecko) Mobile/15E148'
    )
}
# Directory where the log file is written.
ruta_logs = '/home/jovyan/logs'

In [4]:
# Configure the log file location and the record format (timestamp + message),
# and append to the file (filemode 'a') so successive runs accumulate.
# FileHandler cannot create missing directories, so make sure the folder exists.
os.makedirs(ruta_logs, exist_ok=True)
logging.basicConfig(
    # os.path.join supplies the separator; plain concatenation would produce
    # '<ruta_logs>logs.log' next to the directory instead of inside it.
    filename=os.path.join(ruta_logs, 'logs.log'),
    format='%(asctime)s %(message)s',
    filemode='a',
)
# NOTE(review): no `level` is passed, so the root logger keeps its default
# WARNING level and info/debug records are dropped; basicConfig is also a
# no-op if the root logger already has handlers — confirm this is intended.
logger_scraping = logging.getLogger()

In [87]:
def scrapData(url: str, headers: dict, timeout: float = 30.0) -> str:
    """Download a gzip-compressed payload and return it as UTF-8 text.

    Args:
        url: address of the resource to fetch.
        headers: HTTP headers sent with the GET request.
        timeout: seconds to wait for the server before giving up; without a
            timeout `requests` may block indefinitely.

    Returns:
        The decompressed response body decoded as UTF-8.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status, so an
            error page is never handed to the gzip decoder.
        requests.Timeout: if the server does not respond within `timeout`.
        OSError: (gzip.BadGzipFile on Python 3.8+) if the body is not gzip data.
        UnicodeDecodeError: if the decompressed bytes are not valid UTF-8.
    """
    response = requests.get(url=url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return gzip.decompress(response.content).decode('utf-8')

def stringToDF(data: str) -> DataFrame:
    """Parse a JSON document held in a Python string into a Spark DataFrame.

    The string is wrapped in a single-element RDD (via the notebook-global
    SparkContext `sc`) and read with the notebook-global session `spark`,
    which infers the schema.

    NOTE(review): presumably `data` is the JSON array returned by the CONAGUA
    service, one element per forecast record — confirm.

    Args:
        data: the complete JSON text.

    Returns:
        DataFrame with the inferred schema.
    """
    return spark.read.json(sc.parallelize([data]))

In [81]:
def formatCols(dataframe: DataFrame) -> DataFrame:
    """Project the raw forecast fields onto typed, descriptively named columns.

    Each kept source field is cast to a fixed Spark type and renamed; every
    other input column is dropped. `dloc` is special: its first eight
    characters are parsed as a `yyyyMMdd` date.

    Args:
        dataframe: raw DataFrame that must contain every source field used
            below (cc, desciel, dh, dirvieng, dloc, ides, idmun, lat, lon,
            ndia, nes, nmun, prec, probprec, raf, tmax, tmin, velvien).

    Returns:
        A new DataFrame whose columns appear in the order listed below.
    """
    def typed(field, spark_type, name):
        # Cast one raw field and give it its output column name.
        return col(field).cast(spark_type).alias(name)

    # Spark substring positions are 1-based (a start of 0 behaves the same).
    local_day = to_date(substring(col('dloc'), 1, 8), 'yyyyMMdd').alias('local_day_date')

    projection = [
        typed('cc', DecimalType(6, 2), 'cloud_cover_per'),
        col('desciel').alias('sky_desc'),
        typed('dh', IntegerType(), 'utc_time_dif_number'),
        typed('dirvieng', DecimalType(5, 1), 'wind_dir_degrees_number'),
        local_day,
        typed('ides', IntegerType(), 'state_id'),
        typed('idmun', IntegerType(), 'municipality_id'),
        typed('lat', DecimalType(8, 4), 'latitude_number'),
        typed('lon', DecimalType(8, 4), 'longitude_number'),
        typed('ndia', IntegerType(), 'day_number'),
        col('nes').alias('state_name'),
        col('nmun').alias('municipality_name'),
        typed('prec', DecimalType(10, 1), 'precipitation_number'),
        typed('probprec', DecimalType(6, 2), 'precipitation_prob_number'),
        # TODO(review): only column still under its raw name; presumably wind
        # gusts ("ráfagas") — rename consistently once downstream users agree.
        typed('raf', DecimalType(14, 1), 'raf'),
        typed('tmax', DecimalType(6, 1), 'max_temperature_number'),
        typed('tmin', DecimalType(6, 1), 'min_temperature_number'),
        typed('velvien', DecimalType(10, 1), 'wind_speed_number'),
    ]
    return dataframe.select(*projection)

In [None]:
def removeDuplicates(dataframe: DataFrame, subset=None) -> DataFrame:
    """Return a copy of `dataframe` with duplicate rows removed.

    Args:
        dataframe: input DataFrame; it is not modified.
        subset: optional list of column names that define a duplicate. When
            omitted or empty, rows are duplicates only if every column matches
            (the same grouping as partitioning a window by all columns).

    Returns:
        A new DataFrame keeping one row per distinct key. When `subset` is
        given, which row of a duplicate group survives is not specified.
    """
    # The draft built Window.partitionBy(all columns) but never used it and
    # returned None; dropDuplicates expresses the same grouping directly.
    if not subset:
        return dataframe.dropDuplicates()
    return dataframe.dropDuplicates(list(subset))

In [88]:
# Fetch the current forecast payload and load it into a Spark DataFrame.
scrapped_weather_data = scrapData(url=url, headers=header)
df_weather = stringToDF(data=scrapped_weather_data)

In [83]:
# NOTE(review): overwrites the raw frame; re-running this cell alone fails
# because the renamed frame no longer has the raw columns formatCols selects.
df_weather = formatCols(dataframe=df_weather)

In [84]:
df_weather.show(n=3, truncate=False)

+---------------+-------------+-------------------+-----------------------+--------------+--------+---------------+---------------+----------------+----------+----------+-------------------+--------------------+-------------------------+----+----------------------+----------------------+-----------------+
|cloud_cover_per|sky_desc     |utc_time_dif_number|wind_dir_degrees_number|local_day_date|state_id|municipality_id|latitude_number|longitude_number|day_number|state_name|municipality_name  |precipitation_number|precipitation_prob_number|raf |max_temperature_number|min_temperature_number|wind_speed_number|
+---------------+-------------+-------------------+-----------------------+--------------+--------+---------------+---------------+----------------+----------+----------+-------------------+--------------------+-------------------------+----+----------------------+----------------------+-----------------+
|78.96          |Cielo nublado|6                  |45.0                   |2024