In [None]:
import requests
from bs4 import BeautifulSoup
import re
import pandas as pd
from datetime import datetime
import dateutil
import json
from google.cloud import bigquery
from google.cloud.exceptions import NotFound


city_file = "/Users/drewwhite/Desktop/Epicodus/team-week3/drew-work/cities.json"

with open(city_file) as f:
    cities = json.load(f)

data = []

for city in cities:
    response = requests.get(city['NWS_URL'])
    soup = BeautifulSoup(response.content, 'html.parser')

    location = soup.find('h2', {'class': 'panel-title'})
    lat_lon_elev = soup.find('span', {'class': 'smallTxt'}).text.strip()
    lat, lon, elev = re.findall(r'[-+]?\d*\.\d+|\d+', lat_lon_elev)
    temperature = soup.find('p', {'class': 'myforecast-current-lrg'})
    humidity_elem = soup.find('td', text='Humidity')
    humidity = humidity_elem.find_next('td').text.strip() if humidity_elem else 'NA'
    wind_speed_elem = soup.find('td', text='Wind Speed')
    wind_speed = wind_speed_elem.find_next('td').text.strip() if wind_speed_elem else 'NA'
    barometer_elem = soup.find('td', text='Barometer')
    barometer = barometer_elem.find_next('td').text.strip() if barometer_elem else 'NA'
    dewpoint_elem = soup.find('td', text='Dewpoint')
    dewpoint = dewpoint_elem.find_next('td').text.strip() if dewpoint_elem else 'NA'
    visibility_elem = soup.find('td', text='Visibility')
    visibility = visibility_elem.find_next('td').text.strip() if visibility_elem else 'NA'
    wind_chill_elem = soup.find('td', text='Wind Chill')
    wind_chill = wind_chill_elem.find_next('td').text.strip() if wind_chill_elem else 'NA'
    last_update_elem = soup.find('td', text='Last update')
    last_update = last_update_elem.find_next('td').text.strip() if last_update_elem else 'NA'

    data.append({
        'location': city['Name'],
        'lat': lat,
        'lon': lon,
        'elev_ft': elev,
        'temperature': temperature.text if temperature else 'NA',
        'humidity': humidity,
        'wind_speed': wind_speed,
        'barometer': barometer,
        'dewpoint': dewpoint,
        'vis_miles': visibility,
        'wind_chill': wind_chill,
        'last_update': last_update
    })

df = pd.DataFrame(data)

# Split the 'location' column into separate 'city' and 'state' columns
df[['city', 'state']] = df['location'].str.split(', ', expand=True)

# Convert 'lat' and 'lon' columns to float type
df[['lat', 'lon']] = df[['lat', 'lon']].astype(float)

# Convert 'elev' column to int type
df['elev_ft'] = df['elev_ft'].astype(int)

# Extract the numeric part of the temperature string and convert it to int
df['temp_f'] = df['temperature'].str.extract('(\d+)').astype(int)

# Convert temperature to Celsius and add to new column 'temp_c'
df['temp_c'] = (df['temp_f'] - 32) * 5/9

# Round 'temp_c' to nearest integer and cast to int type
df['temp_c'] = df['temp_c'].round().astype(int)

# Convert 'humidity' column to float type
df['humidity'] = df['humidity'].str.extract('(\d+)', expand=False).astype(float) / 100

# Split wind speed values into components and convert speed to int type
df['wind_speed'] = df['wind_speed'].str.extract('(\d+)', expand=False).fillna(0).astype(int)

# Set any missing or non-numeric wind speed values to 0
df['wind_speed'] = df['wind_speed'].replace('Calm', 0)

# Convert 'barometer' column to float type, and convert inches to millibars
df['barometer'] = df['barometer'].apply(lambda x: float(x.split()[0]) * 33.8639 if 'in' in x and x != 'NA' else None)

# Round 'barometer' to two decimal places
df['barometer'] = df['barometer'].round(2)

# Split 'dewpoint' column into separate 'dewpoint_f' and 'dewpoint_c' columns
df[['dewpoint_f', 'dewpoint_c']] = df['dewpoint'].str.extract('(\d+).*?(\d+)', expand=True).astype(int)

# Convert 'vis_miles' column to float type
df['vis_miles'] = df['vis_miles'].str.extract('(\d+\.\d+|\d+)', expand=False).astype(float).round(2)

# Split 'wind_chill' column into separate 'wind_chill_f' and 'wind_chill_c' columns
df[['wind_chill_f', 'wind_chill_c']] = df['wind_chill'].str.extract('(\d+).*?(\d+)', expand=True).astype(float)

# Convert 'last_update' column to datetime type with the desired format and time zone
df['last_update'] = df['last_update'].apply(lambda x: dateutil.parser.parse(x, tzinfos={'CST': dateutil.tz.tzoffset(None, -21600)}))

# Convert 'last_update' column to UTC
df['last_update'] = df['last_update'].apply(lambda x: x.astimezone(dateutil.tz.tzutc()))

# Drop columns that were split into two values
df = df.drop(['temperature', 'dewpoint', 'wind_chill'], axis=1)



# Set up the BigQuery client
client = bigquery.Client()

# Set the project and dataset IDs
project_id = "deb-dev-dw"
dataset_id = "weather"
table_id = "daily"

# Set the table schema
schema = [
    bigquery.SchemaField("location", "STRING"),
    bigquery.SchemaField("lat", "FLOAT"),
    bigquery.SchemaField("lon", "FLOAT"),
    bigquery.SchemaField("elev_ft", "INTEGER"),
    bigquery.SchemaField("humidity", "FLOAT"),
    bigquery.SchemaField("wind_speed", "INTEGER"),
    bigquery.SchemaField("barometer", "FLOAT"),
    bigquery.SchemaField("vis_miles", "FLOAT"),
    bigquery.SchemaField("dewpoint_f", "INTEGER"),
    bigquery.SchemaField("dewpoint_c", "INTEGER"),
    bigquery.SchemaField("wind_chill_f", "FLOAT"),
    bigquery.SchemaField("wind_chill_c", "FLOAT"),
    bigquery.SchemaField("city", "STRING"),
    bigquery.SchemaField("state", "STRING"),
    bigquery.SchemaField("temp_f", "INTEGER"),
    bigquery.SchemaField("temp_c", "INTEGER"),
    bigquery.SchemaField("last_update", "TIMESTAMP"),
]

# Check if the dataset exists, and create it if it does not
try:
    dataset_ref = client.dataset(dataset_id)
    dataset = client.get_dataset(dataset_ref)
except NotFound:
    dataset_ref = client.dataset(dataset_id)
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = "US"
    dataset = client.create_dataset(dataset)

# Get a reference to the table
table_ref = dataset.table(table_id)

# Create the table if it doesn't exist
try:
    client.get_table(table_ref)
except NotFound:
    table = bigquery.Table(table_ref, schema=schema)
    table = client.create_table(table)

# Write the DataFrame to BigQuery
job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()

In [3]:
import requests
from bs4 import BeautifulSoup
import re
import pandas as pd
from datetime import datetime
import dateutil
import json
from google.cloud import bigquery
from google.cloud.exceptions import NotFound

city_file = "/Users/drewwhite/Desktop/Epicodus/team-week3/drew-work/cities.json"

def scrape_weather_data(city_file):
    with open(city_file) as f:
        cities = json.load(f)

    data = []

    for city in cities:
        response = requests.get(city['NWS_URL'])
        soup = BeautifulSoup(response.content, 'html.parser')

        location = soup.find('h2', {'class': 'panel-title'})
        lat_lon_elev = soup.find('span', {'class': 'smallTxt'}).text.strip()
        lat, lon, elev = re.findall(r'[-+]?\d*\.\d+|\d+', lat_lon_elev)
        temperature = soup.find('p', {'class': 'myforecast-current-lrg'})
        humidity_elem = soup.find('td', text='Humidity')
        humidity = humidity_elem.find_next('td').text.strip() if humidity_elem else 'NA'
        wind_speed_elem = soup.find('td', text='Wind Speed')
        wind_speed = wind_speed_elem.find_next('td').text.strip() if wind_speed_elem else 'NA'
        barometer_elem = soup.find('td', text='Barometer')
        barometer = barometer_elem.find_next('td').text.strip() if barometer_elem else 'NA'
        dewpoint_elem = soup.find('td', text='Dewpoint')
        dewpoint = dewpoint_elem.find_next('td').text.strip() if dewpoint_elem else 'NA'
        visibility_elem = soup.find('td', text='Visibility')
        visibility = visibility_elem.find_next('td').text.strip() if visibility_elem else 'NA'
        wind_chill_elem = soup.find('td', text='Wind Chill')
        wind_chill = wind_chill_elem.find_next('td').text.strip() if wind_chill_elem else 'NA'
        last_update_elem = soup.find('td', text='Last update')
        last_update = last_update_elem.find_next('td').text.strip() if last_update_elem else 'NA'

        data.append({
            'location': city['Name'],
            'lat': lat,
            'lon': lon,
            'elev_ft': elev,
            'temperature': temperature.text if temperature else 'NA',
            'humidity': humidity,
            'wind_speed': wind_speed,
            'barometer': barometer,
            'dewpoint': dewpoint,
            'vis_miles': visibility,
            'wind_chill': wind_chill,
            'last_update': last_update
        })

    df = pd.DataFrame(data)

    return df

scrape_weather_data(city_file)




Unnamed: 0,location,lat,lon,elev_ft,temperature,humidity,wind_speed,barometer,dewpoint,vis_miles,wind_chill,last_update
0,"Portland, OR",45.59578,122.60917,20,39°F,75%,WNW 3 MPH,30.24 in (1024.04 mb),32°F (0°C),10.00 mi,,16 Feb 10:35 AM PST
1,"San Diego, CA",32.73361,117.18306,13,64°F,13%,ENE 14 MPH,30.27 in (1025.06 mb),13°F (-11°C),10.00 mi,,16 Feb 10:51 AM PST
2,"Duluth, MN",46.72,92.04,607,16°F,53%,NE 6 G 13 mph,30.26 in,1°F (-17°C),10.00 mi,7°F (-14°C),16 Feb 12:55 pm CST
3,"Minneapolis, MN",44.88,93.23,840,17°F,56%,N 13 mph,30.20 in (1024.4 mb),4°F (-16°C),10.00 mi,3°F (-16°C),16 Feb 12:53 pm CST
4,"Salt Lake City, UT",40.77069,111.96503,4226,31°F,39%,N 0 MPH,30.36 in (1028.11 mb),9°F (-13°C),10.00 mi,,16 Feb 11:54 AM MST
5,"Denver, CO",39.71,104.76,5577,22°F,33%,Calm,30.15 in (1026.8 mb),-3°F (-19°C),10.00 mi,,16 Feb 11:58 am MST
6,"San Francisco, CA",37.77056,122.42694,150,50°F,44%,NA NA MPH,,29°F (-2°C),,,16 Feb 10:43 AM PST
7,"New York City, NY",40.78,73.97,154,69°F,32%,Calm,29.96 in (1013.8 mb),38°F (3°C),10.00 mi,,16 Feb 1:51 pm EST
8,"Portland, ME",43.64,70.3,72,55°F,40%,SW 10 mph,29.90 in (1012.4 mb),31°F (-1°C),10.00 mi,52°F (11°C),16 Feb 1:51 pm EST
9,"Seattle, WA",47.54548,122.3147,20,39°F,75%,SE 8 MPH,30.2 in (1022.69 mb),32°F (0°C),10.00 mi,33°F (1°C),16 Feb 10:35 AM PST


In [9]:
def transform_weather_data(df):

    df[['city', 'state']] = df['location'].str.split(', ', expand=True)
    df[['lat', 'lon']] = df[['lat', 'lon']].astype(float)
    df['elev_ft'] = df['elev_ft'].astype(int)
    df['temp_f'] = df['temperature'].str.extract('(\d+)').astype(int)
    df['temp_c'] = (df['temp_f'] - 32) * 5/9
    df['temp_c'] = df['temp_c'].round().astype(int)
    df['humidity'] = df['humidity'].str.extract('(\d+)', expand=False).astype(float) / 100
    df['wind_speed'] = df['wind_speed'].str.extract('(\d+)', expand=False).fillna(0).astype(int)
    df['wind_speed'] = df['wind_speed'].replace('Calm', 0)
    df['barometer'] = df['barometer'].apply(lambda x: float(x.split()[0]) * 33.8639 if 'in' in x and x != 'NA' else None)
    df['barometer'] = df['barometer'].round(2)
    df[['dewpoint_f', 'dewpoint_c']] = df['dewpoint'].str.extract('(\d+).*?(\d+)', expand=True).astype(int)
    df['vis_miles'] = df['vis_miles'].str.extract('(\d+\.\d+|\d+)', expand=False).astype(float).round(2)
    df[['wind_chill_f', 'wind_chill_c']] = df['wind_chill'].str.extract('(\d+).*?(\d+)', expand=True).astype(float)
    df['last_update'] = df['last_update'].apply(lambda x: dateutil.parser.parse(x, tzinfos={'CST': dateutil.tz.tzoffset(None, -21600)}))
    df['last_update'] = df['last_update'].apply(lambda x: x.astimezone(dateutil.tz.tzutc()))
    df = df.drop(['temperature', 'dewpoint', 'wind_chill'], axis=1)

    return df
transform_weather_data(scrape_weather_data(city_file))



Unnamed: 0,location,lat,lon,elev_ft,humidity,wind_speed,barometer,vis_miles,last_update,city,state,temp_f,temp_c,dewpoint_f,dewpoint_c,wind_chill_f,wind_chill_c
0,"Portland, OR",45.59578,122.60917,20,0.73,0,1023.71,10.0,2023-02-16 18:53:00+00:00,Portland,OR,41,5,33,1,,
1,"San Diego, CA",32.73361,117.18306,13,0.13,14,1025.06,10.0,2023-02-16 18:51:00+00:00,San Diego,CA,64,18,13,11,,
2,"Duluth, MN",46.72,92.04,607,0.53,6,1024.72,10.0,2023-02-16 18:55:00+00:00,Duluth,MN,16,-9,1,17,7.0,14.0
3,"Minneapolis, MN",44.88,93.23,840,0.56,13,1022.69,10.0,2023-02-16 18:53:00+00:00,Minneapolis,MN,17,-8,4,16,3.0,16.0
4,"Salt Lake City, UT",40.77069,111.96503,4226,0.39,0,1028.11,10.0,2023-02-16 19:54:00+00:00,Salt Lake City,UT,31,-1,9,13,,
5,"Denver, CO",39.71,104.76,5577,0.33,0,1021.0,10.0,2023-02-16 19:58:00+00:00,Denver,CO,22,-6,3,19,,
6,"San Francisco, CA",37.77056,122.42694,150,0.44,0,,,2023-02-16 18:43:00+00:00,San Francisco,CA,50,10,29,2,,
7,"New York City, NY",40.78,73.97,154,0.32,0,1014.56,10.0,2023-02-16 21:51:00+00:00,New York City,NY,69,21,38,3,,
8,"Portland, ME",43.64,70.3,72,0.4,10,1012.53,10.0,2023-02-16 21:51:00+00:00,Portland,ME,55,13,31,1,52.0,11.0
9,"Seattle, WA",47.54548,122.3147,20,0.73,5,1022.69,10.0,2023-02-16 18:53:00+00:00,Seattle,WA,41,5,33,1,38.0,3.0


In [None]:
def write_weather_data_to_bigquery(df):


    client = bigquery.Client()
    project_id = "deb-dev-dw"
    dataset_id = "weather"
    table_id = "daily"

    schema = [
        bigquery.SchemaField("location", "STRING"),
        bigquery.SchemaField("lat", "FLOAT"),
        bigquery.SchemaField("lon", "FLOAT"),
        bigquery.SchemaField("elev_ft", "INTEGER"),
        bigquery.SchemaField("humidity", "FLOAT"),
        bigquery.SchemaField("wind_speed", "INTEGER"),
        bigquery.SchemaField("barometer", "FLOAT"),
        bigquery.SchemaField("vis_miles", "FLOAT"),
        bigquery.SchemaField("dewpoint_f", "INTEGER"),
        bigquery.SchemaField("dewpoint_c", "INTEGER"),
        bigquery.SchemaField("wind_chill_f", "FLOAT"),
        bigquery.SchemaField("wind_chill_c", "FLOAT"),
        bigquery.SchemaField("city", "STRING"),
        bigquery.SchemaField("state", "STRING"),
        bigquery.SchemaField("temp_f", "INTEGER"),
        bigquery.SchemaField("temp_c", "INTEGER"),
        bigquery.SchemaField("last_update", "TIMESTAMP"),
    ]

    try:
        dataset_ref = client.dataset(dataset_id)
        dataset = client.get_dataset(dataset_ref)
    except NotFound:
        dataset_ref = client.dataset(dataset_id)
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "US"
        dataset = client.create_dataset(dataset)

    table_ref = dataset.table(table_id)

    try:
        client.get_table(table_ref)
    except NotFound:
        table = bigquery.Table(table_ref, schema=schema)
        table = client.create_table(table)

    job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()

write_weather_data_to_bigquery(df)

In [None]:
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
import re
import pandas as pd
from datetime import datetime
import dateutil
import json
from google.cloud.exceptions import NotFound
from airflow import DAG
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from google.cloud import bigquery

city_file = "/Users/drewwhite/Desktop/Epicodus/team-week3/drew-work/cities.json"
PROJECT_ID = "deb-dev-dw"
DATASET_ID = "weather"
DAILY_TABLE_ID = "daily"

SCHEMA = [
        bigquery.SchemaField("location", "STRING"),
        bigquery.SchemaField("lat", "FLOAT"),
        bigquery.SchemaField("lon", "FLOAT"),
        bigquery.SchemaField("elev_ft", "INTEGER"),
        bigquery.SchemaField("humidity", "FLOAT"),
        bigquery.SchemaField("wind_speed", "INTEGER"),
        bigquery.SchemaField("barometer", "FLOAT"),
        bigquery.SchemaField("vis_miles", "FLOAT"),
        bigquery.SchemaField("dewpoint_f", "INTEGER"),
        bigquery.SchemaField("dewpoint_c", "INTEGER"),
        bigquery.SchemaField("wind_chill_f", "FLOAT"),
        bigquery.SchemaField("wind_chill_c", "FLOAT"),
        bigquery.SchemaField("city", "STRING"),
        bigquery.SchemaField("state", "STRING"),
        bigquery.SchemaField("temp_f", "INTEGER"),
        bigquery.SchemaField("temp_c", "INTEGER"),
        bigquery.SchemaField("last_update", "TIMESTAMP"),
    ]

@task
def scrape_weather_data():
    with open(city_file) as f:
        cities = json.load(f)

    data = []

    for city in cities:
        try:
            response = requests.get(city['NWS_URL'])
        except:
            continue
        soup = BeautifulSoup(response.content, 'html.parser')

        location = soup.find('h2', {'class': 'panel-title'})
        lat_lon_elev = soup.find('span', {'class': 'smallTxt'}).text.strip()
        lat, lon, elev = re.findall(r'[-+]?\d*\.\d+|\d+', lat_lon_elev)
        temperature = soup.find('p', {'class': 'myforecast-current-lrg'})
        humidity_elem = soup.find('td', text='Humidity')
        humidity = humidity_elem.find_next('td').text.strip() if humidity_elem else 'NA'
        wind_speed_elem = soup.find('td', text='Wind Speed')
        wind_speed = wind_speed_elem.find_next('td').text.strip() if wind_speed_elem else 'NA'
        barometer_elem = soup.find('td', text='Barometer')
        barometer = barometer_elem.find_next('td').text.strip() if barometer_elem else 'NA'
        dewpoint_elem = soup.find('td', text='Dewpoint')
        dewpoint = dewpoint_elem.find_next('td').text.strip() if dewpoint_elem else 'NA'
        visibility_elem = soup.find('td', text='Visibility')
        visibility = visibility_elem.find_next('td').text.strip() if visibility_elem else 'NA'
        wind_chill_elem = soup.find('td', text='Wind Chill')
        wind_chill = wind_chill_elem.find_next('td').text.strip() if wind_chill_elem else 'NA'
        last_update_elem = soup.find('td', text='Last update')
        last_update = last_update_elem.find_next('td').text.strip() if last_update_elem else 'NA'

        data.append({
            'location': city['Name'],
            'lat': lat,
            'lon': lon,
            'elev_ft': elev,
            'temperature': temperature.text if temperature else 'NA',
            'humidity': humidity,
            'wind_speed': wind_speed,
            'barometer': barometer,
            'dewpoint': dewpoint,
            'vis_miles': visibility,
            'wind_chill': wind_chill,
            'last_update': last_update
        })

    df = pd.DataFrame(data)

    return df

@task
def transform_weather_data(df):
    transformations = [
        (['city', 'state'], df['location'].str.split(', ', expand=True)),
        (['lat', 'lon'], df[['lat', 'lon']].astype(float)),
        ('elev_ft', df['elev_ft'].astype(int)),
        ('temp_f', df['temperature'].str.extract('(\d+)').astype(int)),
        ('temp_c', (df['temp_f'] - 32) * 5/9),
        ('temp_c', df['temp_c'].round().astype(int)),
        ('humidity', df['humidity'].str.extract('(\d+)', expand=False).astype(float) / 100),
        ('wind_speed', df['wind_speed'].str.extract('(\d+)', expand=False).fillna(0).astype(int)),
        ('wind_speed', df['wind_speed'].replace('Calm', 0)),
        ('barometer', df['barometer'].apply(lambda x: float(x.split()[0]) * 33.8639 if 'in' in x and x != 'NA' else None)),
        ('barometer', df['barometer'].round(2)),
        (['dewpoint_f', 'dewpoint_c'], df['dewpoint'].str.extract('(\d+).*?(\d+)', expand=True).astype(int)),
        ('vis_miles', df['vis_miles'].str.extract('(\d+\.\d+|\d+)', expand=False).astype(float).round(2)),
        ('wind_chill_f', df['wind_chill'].str.extract('(\d+).*?(\d+)', expand=True).astype(float)),
        ('wind_chill_c', (df['wind_chill_f'] - 32) * 5/9),
        ('wind_chill_c', df['wind_chill_c'].round().astype(int)),
        ('last_update', df['last_update'].apply(lambda x: dateutil.parser.parse(x, tzinfos={'CST': dateutil.tz.tzoffset(None, -21600)}).astimezone(dateutil.tz.tzutc()))),
    ]
    for col, transform in transformations:
        if isinstance(col, list):
            df[col] = transform
        else:
            df[col] = transform

    # Drop columns that were split into two values
    df = df.drop(['temperature', 'dewpoint', 'wind_chill'], axis=1)

    return df

@task
def write_weather_data_to_bigquery(df):
    client = bigquery.Client()

    try:
        dataset_ref = client.dataset(DATASET_ID)
        dataset = client.get_dataset(dataset_ref)
    except NotFound:
        dataset_ref = client.dataset(DATASET_ID)
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "US"
        dataset = client.create_dataset(dataset)

    table_ref = dataset.table(DAILY_TABLE_ID)

    try:
        client.get_table(table_ref)
    except NotFound:
        table = bigquery.Table(table_ref, schema=SCHEMA)
        table = client.create_table(table)

    job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()

@dag(
    'weather_data_pipeline',
    description='Scrapes National Weather Service website every 12 hours, transforms data and loads to bigquery',
    default_args=default_args,
    schedule_interval='0 0,12 * * *',
)
def weather_data_pipeline():

    scrape_weather_data_task = scrape_weather_data()


    transform_weather_data_task = transform_weather_data()


    write_weather_data_to_bigquery_task = write_weather_data_to_bigquery()


    done = EmptyOperator(task_id='done')

    scrape_weather_data_task >> transform_weather_data_task >> write_weather_data_to_bigquery_task >> done

dag = weather_data_pipeline()


In [None]:
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
import re
import pandas as pd
from datetime import datetime
import dateutil
import json
from google.cloud.exceptions import NotFound
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.empty import EmptyOperator
from google.cloud import bigquery

city_file = "/Users/drewwhite/Desktop/Epicodus/team-week3/drew-work/cities.json"

def scrape_weather_data(city_file):
    with open(city_file) as f:
        cities = json.load(f)

    data = []

    for city in cities:
        response = requests.get(city['NWS_URL'])
        soup = BeautifulSoup(response.content, 'html.parser')

        location = soup.find('h2', {'class': 'panel-title'})
        lat_lon_elev = soup.find('span', {'class': 'smallTxt'}).text.strip()
        lat, lon, elev = re.findall(r'[-+]?\d*\.\d+|\d+', lat_lon_elev)
        temperature = soup.find('p', {'class': 'myforecast-current-lrg'})
        humidity_elem = soup.find('td', text='Humidity')
        humidity = humidity_elem.find_next('td').text.strip() if humidity_elem else 'NA'
        wind_speed_elem = soup.find('td', text='Wind Speed')
        wind_speed = wind_speed_elem.find_next('td').text.strip() if wind_speed_elem else 'NA'
        barometer_elem = soup.find('td', text='Barometer')
        barometer = barometer_elem.find_next('td').text.strip() if barometer_elem else 'NA'
        dewpoint_elem = soup.find('td', text='Dewpoint')
        dewpoint = dewpoint_elem.find_next('td').text.strip() if dewpoint_elem else 'NA'
        visibility_elem = soup.find('td', text='Visibility')
        visibility = visibility_elem.find_next('td').text.strip() if visibility_elem else 'NA'
        wind_chill_elem = soup.find('td', text='Wind Chill')
        wind_chill = wind_chill_elem.find_next('td').text.strip() if wind_chill_elem else 'NA'
        last_update_elem = soup.find('td', text='Last update')
        last_update = last_update_elem.find_next('td').text.strip() if last_update_elem else 'NA'

        data.append({
            'location': city['Name'],
            'lat': lat,
            'lon': lon,
            'elev_ft': elev,
            'temperature': temperature.text if temperature else 'NA',
            'humidity': humidity,
            'wind_speed': wind_speed,
            'barometer': barometer,
            'dewpoint': dewpoint,
            'vis_miles': visibility,
            'wind_chill': wind_chill,
            'last_update': last_update
        })

    df = pd.DataFrame(data)

    return df

def transform_weather_data(df):
    transformations = [
        (['city', 'state'], df['location'].str.split(', ', expand=True)),
        (['lat', 'lon'], df[['lat', 'lon']].astype(float)),
        ('elev_ft', df['elev_ft'].astype(int)),
        ('temp_f', df['temperature'].str.extract('(\d+)').astype(int)),
        ('temp_c', (df['temp_f'] - 32) * 5/9),
        ('temp_c', df['temp_c'].round().astype(int)),
        ('humidity', df['humidity'].str.extract('(\d+)', expand=False).astype(float) / 100),
        ('wind_speed', df['wind_speed'].str.extract('(\d+)', expand=False).fillna(0).astype(int)),
        ('wind_speed', df['wind_speed'].replace('Calm', 0)),
        ('barometer', df['barometer'].apply(lambda x: float(x.split()[0]) * 33.8639 if 'in' in x and x != 'NA' else None)),
        ('barometer', df['barometer'].round(2)),
        (['dewpoint_f', 'dewpoint_c'], df['dewpoint'].str.extract('(\d+).*?(\d+)', expand=True).astype(int)),
        ('vis_miles', df['vis_miles'].str.extract('(\d+\.\d+|\d+)', expand=False).astype(float).round(2)),
        (['wind_chill_f', 'wind_chill_c'], df['wind_chill'].str.extract('(\d+).*?(\d+)', expand=True).astype(float)),
        ('last_update', df['last_update'].apply(lambda x: dateutil.parser.parse(x, tzinfos={'CST': dateutil.tz.tzoffset(None, -21600)}))),
        ('last_update', df['last_update'].apply(lambda x: x.astimezone(dateutil.tz.tzutc()))),
    ]
    for col, transform in transformations:
        if isinstance(col, list):
            df[col] = transform
        else:
            df[col] = transform

    # Drop columns that were split into two values
    df = df.drop(['temperature', 'dewpoint', 'wind_chill'], axis=1)

    return df

def write_weather_data_to_bigquery():

    client = bigquery.Client()
    project_id = "deb-dev-dw"
    dataset_id = "weather"
    table_id = "daily"

    schema = [
        bigquery.SchemaField("location", "STRING"),
        bigquery.SchemaField("lat", "FLOAT"),
        bigquery.SchemaField("lon", "FLOAT"),
        bigquery.SchemaField("elev_ft", "INTEGER"),
        bigquery.SchemaField("humidity", "FLOAT"),
        bigquery.SchemaField("wind_speed", "INTEGER"),
        bigquery.SchemaField("barometer", "FLOAT"),
        bigquery.SchemaField("vis_miles", "FLOAT"),
        bigquery.SchemaField("dewpoint_f", "INTEGER"),
        bigquery.SchemaField("dewpoint_c", "INTEGER"),
        bigquery.SchemaField("wind_chill_f", "FLOAT"),
        bigquery.SchemaField("wind_chill_c", "FLOAT"),
        bigquery.SchemaField("city", "STRING"),
        bigquery.SchemaField("state", "STRING"),
        bigquery.SchemaField("temp_f", "INTEGER"),
        bigquery.SchemaField("temp_c", "INTEGER"),
        bigquery.SchemaField("last_update", "TIMESTAMP"),
    ]

    try:
        dataset_ref = client.dataset(dataset_id)
        dataset = client.get_dataset(dataset_ref)
    except NotFound:
        dataset_ref = client.dataset(dataset_id)
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "US"
        dataset = client.create_dataset(dataset)

    table_ref = dataset.table(table_id)

    try:
        client.get_table(table_ref)
    except NotFound:
        table = bigquery.Table(table_ref, schema=schema)
        table = client.create_table(table)

    job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()

default_args = {
    'owner': 'Drew White',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 14),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'weather_data_pipeline',
    description='Scrapes National Weather Service website every 12 hours, transforms data and loads to bigquery'
    default_args=default_args,
    schedule_interval='0 0,12 * * *',
) as dag:

scrape_weather_data_task = PythonOperator(
    task_id='scrape_weather_data',
    python_callable=scrape_weather_data
)

transform_weather_data_task = PythonOperator(
    task_id='transform_weather_data',
    python_callable=transform_weather_data
)

write_weather_data_to_bigquery_task = PythonOperator(
    task_id='write_weather_data_to_bigquery',
    python_callable=write_weather_data_to_bigquery
)

done = EmptyOperator(task_id='done')

scrape_weather_data_task >> transform_weather_data_task >> write_weather_data_to_bigquery_task >> done

In [None]:
PROJECT_ID = "deb-dev-dw"
DATASET_ID = "weather"
DAILY_TABLE_ID = "daily"
WEEKLY_TABLE_ID = "weekly_avg"


def calculate_weekly_averages():
    client = bigquery.Client()
    query = f"""
    SELECT location, city, state, lat, lon,
        ROUND(AVG(temp_f), 1) AS temp_f_avg,
        ROUND(AVG(temp_c), 1) AS temp_c_avg,
        ROUND(AVG(humidity), 1) AS humidity_avg,
        ROUND(AVG(barometer), 1) AS barometer_avg,
        ROUND(AVG(dewpoint_f), 1) AS dewpoint_f_avg,
        ROUND(AVG(dewpoint_c), 1) AS dewpoint_c_avg,
        CURRENT_TIMESTAMP() AS modified_at
    FROM `{PROJECT_ID}.{DATASET_ID}.{DAILY_TABLE_ID}`
    WHERE DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) <= DATE(last_update)
    GROUP BY location, city, state, lat, lon
"""
    weekly_df = client.query(query).to_dataframe()
    return weekly_df

print(calculate_weekly_averages())


In [None]:

def write_weekly_averages_to_bigquery(df):
    client = bigquery.Client()
    project_id = "deb-dev-dw"
    dataset_id = "weather"
    table_id = "weekly_avg"

    schema = [
        bigquery.SchemaField("location", "STRING"),
        bigquery.SchemaField("city", "STRING"),
        bigquery.SchemaField("state", "STRING"),
        bigquery.SchemaField("lat", "FLOAT"),
        bigquery.SchemaField("lon", "FLOAT"),
        bigquery.SchemaField("temp_f_avg", "FLOAT"),
        bigquery.SchemaField("temp_c_avg", "FLOAT"),
        bigquery.SchemaField("humidity_avg", "FLOAT"),
        bigquery.SchemaField("barometer_avg", "FLOAT"),
        bigquery.SchemaField("dewpoint_f_avg", "FLOAT"),
        bigquery.SchemaField("dewpoint_c_avg", "FLOAT"),
        bigquery.SchemaField("modified_at", "TIMESTAMP")
    ]

    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)

    try:
        client.get_table(table_ref)
    except NotFound:
        table = bigquery.Table(table_ref, schema=schema)
        table = client.create_table(table)

    job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()

write_weekly_averages_to_bigquery(calculate_weekly_averages())


In [None]:
import datetime as dt
from google.cloud import bigquery
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.decorators import dag, task
from airflow.operators.python_operator import EmptyOperator

PROJECT_ID = "deb-dev-dw"
DATASET_ID = "weather"
DAILY_TABLE_ID = "daily"
WEEKLY_TABLE_ID = "weekly_avg"

SCHEMA = [
    bigquery.SchemaField("location", "STRING"),
    bigquery.SchemaField("city", "STRING"),
    bigquery.SchemaField("state", "STRING"),
    bigquery.SchemaField("lat", "FLOAT"),
    bigquery.SchemaField("lon", "FLOAT"),
    bigquery.SchemaField("temp_f_avg", "FLOAT"),
    bigquery.SchemaField("temp_c_avg", "FLOAT"),
    bigquery.SchemaField("humidity_avg", "FLOAT"),
    bigquery.SchemaField("barometer_avg", "FLOAT"),
    bigquery.SchemaField("dewpoint_f_avg", "FLOAT"),
    bigquery.SchemaField("dewpoint_c_avg", "FLOAT"),
    bigquery.SchemaField("modified_at", "TIMESTAMP")
]

@task
def calculate_weekly_averages():
    client = bigquery.Client()
    query = f"""
    SELECT location, city, state, lat, lon,
        ROUND(AVG(temp_f), 1) AS temp_f_avg,
        ROUND(AVG(temp_c), 1) AS temp_c_avg,
        ROUND(AVG(humidity), 1) AS humidity_avg,
        ROUND(AVG(barometer), 1) AS barometer_avg,
        ROUND(AVG(dewpoint_f), 1) AS dewpoint_f_avg,
        ROUND(AVG(dewpoint_c), 1) AS dewpoint_c_avg,
        CURRENT_TIMESTAMP() AS modified_at
    FROM `{PROJECT_ID}.{DATASET_ID}.{DAILY_TABLE_ID}`
    WHERE DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) <= DATE(last_update)
    GROUP BY location, city, state, lat, lon
"""
    weekly_df = client.query(query).to_dataframe()
    return weekly_df

@task
def write_weekly_avg_to_bq(df, **context):
    client = bigquery.Client()

    dataset_ref = client.dataset(DATASET_ID)
    table_ref = dataset_ref.table(WEEKLY_TABLE_ID)

    try:
        client.get_table(table_ref)
    except NotFound:
        table = bigquery.Table(table_ref, schema=SCHEMA)
        table = client.create_table(table)

    job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_APPEND)
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()
    # Create the BigQuery insert job operator
    insert_job = BigQueryInsertJobOperator(
        task_id='insert_weekly_avg_to_bq',
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=WEEKLY_TABLE_ID,
        schema_fields=SCHEMA,
        bigquery_conn_id='google_cloud_default',
        time_partitioning={'type': 'DAY'},
        use_legacy_sql=False,
        template_suffix=f"_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}",
        task_concurrency=1,
        dag=dag
    )

    # Execute the BigQuery insert job operator
    insert_job.execute(context={
        'ti': context['ti'],
        'weekly_averages_df': df
    })

@dag(
    schedule_interval="@weekly",
    description='Calculates weekly averages of weather data in bigquery',
    start_date=datetime.utcnow(),
    catchup=False,
    default_view='graph',
    is_paused_upon_creation=True,
    tags=['averages', 'weekly averages'],
)

def weekly_avg():

    calculate_weekly_avg_task = calculate_weekly_averages()

    write_weekly_avg_task = write_weekly_avg_to_bq(calculate_weekly_averages)

    insert_weekly_avg_to_bq_task = BigQueryInsertJobOperator(
        task_id='insert_weekly_avg_to_bq',
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=WEEKLY_TABLE_ID,
        schema_fields=SCHEMA,
        bigquery_conn_id='google_cloud_default',
        time_partitioning={'type': 'DAY'},
        use_legacy_sql=False,
        template_suffix=f"_{dt.datetime.now().strftime('%Y%m%d_%H%M%S')}",
        task_concurrency=1,
        dag=dag
    )

    done = EmptyOperator(task_id='done')

    calculate_weekly_avg_task >> write_weekly_avg_task >> insert_weekly_avg_to_bq_task >> done

dag = write_weekly_avg_to_bq
