In [1]:
import os
import requests
from bs4 import BeautifulSoup

base_url = 'https://www.ncei.noaa.gov/data/local-climatological-data/access/{year}'

years = range(2000, 2024)
os.makedirs('data', exist_ok=True)
for year in years:
    os.makedirs(f'data/{year}', exist_ok=True)
    url = base_url.format(year)
    res = requests.get(url)
    soup = BeautifulSoup(res.text)
    table = soup.find('table')
    anchors = table.find_all('a')
    anchors = [a for a in anchors if 'csv' in a.text]
    for anchor in anchors:
        file = anchor.text
        file_url = f'{url}/{file}'
        res = requests.get(file_url)
        csv = res.text
        with open(f'data/{year}/{file}', 'w') as f:
            f.write(csv)

In [38]:
# %pip install apache-airflow

In [43]:
import os
import requests
from bs4 import BeautifulSoup
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def scrape_data(year):
    base_url = 'https://www.ncei.noaa.gov/data/local-climatological-data/access/{year}'
    os.makedirs(f'data/{year}', exist_ok=True)
    url = base_url.format(year)
    res = requests.get(url)
    soup = BeautifulSoup(res.text, 'html.parser')
    table = soup.find('table')
    anchors = table.find_all('a')
    anchors = [a for a in anchors if 'csv' in a.text]
    for anchor in anchors:
        file = anchor.text
        file_url = f'{url}/{file}'
        res = requests.get(file_url)
        csv = res.text
        with open(f'data/{year}/{file}', 'w') as f:
            f.write(csv)

In [41]:
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

dag = DAG(
    'noaa_data_scraping',
    default_args=default_args,
    schedule_interval=None,  # Set the schedule_interval as needed
)

for year in range(2000, 2024):
    task_id = f'scrape_data_{year}'
    scrape_task = PythonOperator(
        task_id=task_id,
        python_callable=scrape_data,
        op_args=[year],
        dag=dag,
    )

In [42]:
docker build -t my_airflow_image ./
docker compose up
docker exec -it 8ea0cd0a78a2 airflow dags trigger -r 123345 task_1_dag



In [2]:
import os
import requests
from bs4 import BeautifulSoup
# from airflow import DAG
# from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def scrape_data(ds, year, **kwargs):
    base_url = 'https://www.ncei.noaa.gov/data/local-climatological-data/access/{year}'
    os.makedirs(f'data/{year}', exist_ok=True)
    url = base_url.format(year=year)
    res = requests.get(url)
    soup = BeautifulSoup(res.text, 'html.parser')
    table = soup.find('table')
    anchors = table.find_all('a')
    anchors = [a for a in anchors if 'csv' in a.text]
    for anchor in anchors:
        file = anchor.text
        file_url = f'{url}/{file}'
        res = requests.get(file_url)
        csv = res.text
        with open(f'data/{year}/{file}', 'w') as f:
            f.write(csv)


In [3]:
scrape_data(None,2020)

KeyboardInterrupt: 

In [None]:
!unzip

In [4]:
os.listdir('output')

['data.zip']

In [2]:
index_cols = ['LATITUDE','LONGITUDE','DATE']

fields = ['HourlyAltimeterSetting',
'HourlyDewPointTemperature',
'HourlyDryBulbTemperature',
'HourlyPrecipitation',
'HourlyPresentWeatherType',
'HourlyPressureChange',
'HourlyPressureTendency',
'HourlyRelativeHumidity',
'HourlySkyConditions',
'HourlySeaLevelPressure',
'HourlyStationPressure',
'HourlyVisibility',
'HourlyWetBulbTemperature',
'HourlyWindDirection',
'HourlyWindGustSpeed',
'HourlyWindSpeed']

cols = index_cols + fields

In [2]:
import os
import requests
from bs4 import BeautifulSoup
# from airflow import DAG
# from airflow.operators.python_operator import PythonOperator
# from airflow.operators.bash_operator import BashOperator
# from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
import pandas as pd

def filesensor():
    files = os.listdir('output')
    return  'data.zip' in files

def list_files(start_path):
    all_files = []
    for root, dirs, files in os.walk(start_path):
        for file in files:
            file_path = os.path.join(root, file)
            all_files.append(file_path)
    return all_files

status = filesensor()
unzip_bash = 'unzip ./output/data.zip -d ./extracted_data/'

if status:
    # run unzip bash command
    pass

base = 'extracted_data/data/'
years = os.listdir()

folder_path = 'extracted_data/data/'
files = list_files(folder_path)
files = [file for file in files if file.endswith('.csv')]

# to do using python operator over apachebeam with direct runner
df = pd.DataFrame()
for file in files[:5]:
    df = pd.concat([df,pd.read_csv(file,low_memory=False,usecols=cols)])

In [3]:
# %pip install apache-beam

In [14]:
df.DATE

0       2018-01-01T00:00:00
1       2018-01-01T01:00:00
2       2018-01-01T02:00:00
3       2018-01-01T03:00:00
4       2018-01-01T04:00:00
               ...         
8611    2018-12-31T19:00:00
8612    2018-12-31T20:00:00
8613    2018-12-31T21:00:00
8614    2018-12-31T22:00:00
8615    2018-12-31T23:00:00
Name: DATE, Length: 39993, dtype: object

In [3]:
import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions

def merge_csvs(element, columns):
    df = pd.read_csv(element)
    selected_df = df[columns]
    return [tuple(x) for x in selected_df.to_records(index=False)]

columns_of_interest = cols

pipeline_options = PipelineOptions(
    runner='DirectRunner',
    project='bdl-task-2',
    job_name='part-3',
    temp_location='/tmp',
    direct_num_workers=6,
    direct_running_mode='multi_threading' #multi_threading | multi_processing | in_memory
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    tuples = ( 
        pipeline 
        | 'Create file paths' >> beam.Create(files)  
        | 'Merge and select columns' >> beam.FlatMap(merge_csvs, columns_of_interest)
    )

usage: ipykernel_launcher.py [-h] [--dataflow_endpoint DATAFLOW_ENDPOINT]
                             [--project PROJECT] [--job_name JOB_NAME]
                             [--staging_location STAGING_LOCATION]
                             [--temp_location TEMP_LOCATION] [--region REGION]
                             [--service_account_email SERVICE_ACCOUNT_EMAIL]
                             [--no_auth]
                             [--template_location TEMPLATE_LOCATION]
                             [--label LABELS] [--update]
                             [--transform_name_mapping TRANSFORM_NAME_MAPPING]
                             [--enable_streaming_engine]
                             [--dataflow_kms_key DATAFLOW_KMS_KEY]
                             [--create_from_snapshot CREATE_FROM_SNAPSHOT]
                             [--flexrs_goal {COST_OPTIMIZED,SPEED_OPTIMIZED}]
                             [--dataflow_service_option DATAFLOW_SERVICE_OPTIONS]
                           

AttributeError: 'tuple' object has no attribute 'tb_frame'

In [35]:
df

Unnamed: 0,DATE,LATITUDE,LONGITUDE,HourlyAltimeterSetting,HourlyDewPointTemperature,HourlyDryBulbTemperature,HourlyPrecipitation,HourlyPresentWeatherType,HourlyPressureChange,HourlyPressureTendency,HourlyRelativeHumidity,HourlySkyConditions,HourlySeaLevelPressure,HourlyStationPressure,HourlyVisibility,HourlyWetBulbTemperature,HourlyWindDirection,HourlyWindGustSpeed,HourlyWindSpeed
0,2018-01-01T00:00:00,78.250000,22.816667,,5.0,11.0,,,-0.01,2.0,76.0,,29.80,29.75,,10.0,50.0,,16.0
1,2018-01-01T01:00:00,78.250000,22.816667,,4.0,10.0,,,-0.01,2.0,79.0,,29.80,29.75,,9.0,50.0,,16.0
2,2018-01-01T02:00:00,78.250000,22.816667,,4.0,9.0,,,-0.01,2.0,78.0,,29.80,29.75,,8.0,70.0,,13.0
3,2018-01-01T03:00:00,78.250000,22.816667,,2.0,8.0,,,-0.01,2.0,76.0,,29.80,29.76,,7.0,50.0,,11.0
4,2018-01-01T04:00:00,78.250000,22.816667,,3.0,8.0,,,0.0,4.0,80.0,,29.80,29.75,,7.0,50.0,,9.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8611,2018-12-31T19:00:00,70.933333,-8.666667,,11,22.0,,,-0.15,3.0,62.0,,29.92,29.88,,19.0,2.0,38.0,25.0
8612,2018-12-31T20:00:00,70.933333,-8.666667,,12,21.0,,,-0.15,3.0,67.0,,29.97,29.93,,19.0,353.0,37.0,25.0
8613,2018-12-31T21:00:00,70.933333,-8.666667,,12,21.0,,,-0.14,3.0,66.0,,30.01,29.98,,19.0,342.0,40.0,25.0
8614,2018-12-31T22:00:00,70.933333,-8.666667,,11,21.0,,,-0.15,3.0,66.0,,30.07s,30.03s,,18.0,350.0,41.0,24.0


In [25]:
# %pip install apache-beam

In [27]:
for col in df.columns:print(col)

STATION
DATE
LATITUDE
LONGITUDE
ELEVATION
NAME
REPORT_TYPE
SOURCE
HourlyAltimeterSetting
HourlyDewPointTemperature
HourlyDryBulbTemperature
HourlyPrecipitation
HourlyPresentWeatherType
HourlyPressureChange
HourlyPressureTendency
HourlyRelativeHumidity
HourlySkyConditions
HourlySeaLevelPressure
HourlyStationPressure
HourlyVisibility
HourlyWetBulbTemperature
HourlyWindDirection
HourlyWindGustSpeed
HourlyWindSpeed
Sunrise
Sunset
DailyAverageDewPointTemperature
DailyAverageDryBulbTemperature
DailyAverageRelativeHumidity
DailyAverageSeaLevelPressure
DailyAverageStationPressure
DailyAverageWetBulbTemperature
DailyAverageWindSpeed
DailyCoolingDegreeDays
DailyDepartureFromNormalAverageTemperature
DailyHeatingDegreeDays
DailyMaximumDryBulbTemperature
DailyMinimumDryBulbTemperature
DailyPeakWindDirection
DailyPeakWindSpeed
DailyPrecipitation
DailySnowDepth
DailySnowfall
DailySustainedWindDirection
DailySustainedWindSpeed
DailyWeather
MonthlyAverageRH
MonthlyDaysWithGT001Precip
MonthlyDaysWithGT0

In [13]:
import pandas as pd
pd.read_csv('extracted_data/data/2000/01023099999.csv')

  pd.read_csv('extracted_data/data/2000/01023099999.csv')


Unnamed: 0,STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,NAME,REPORT_TYPE,SOURCE,HourlyAltimeterSetting,HourlyDewPointTemperature,...,BackupDirection,BackupDistance,BackupDistanceUnit,BackupElements,BackupElevation,BackupEquipment,BackupLatitude,BackupLongitude,BackupName,WindEquipmentChangeDate
0,1023099999,2000-01-01T00:50:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-15,4,29.94,1,...,,,,,,,,,,
1,1023099999,2000-01-01T01:00:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-12,4,,2,...,,,,,,,,,,
2,1023099999,2000-01-01T01:50:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-15,4,29.91,3,...,,,,,,,,,,
3,1023099999,2000-01-01T02:50:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-15,4,29.91,5,...,,,,,,,,,,
4,1023099999,2000-01-01T03:50:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-15,4,29.88,5,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10967,1023099999,2000-12-31T20:50:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-15,4,29.56,12,...,,,,,,,,,,
10968,1023099999,2000-12-31T21:50:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-15,4,29.56,12,...,,,,,,,,,,
10969,1023099999,2000-12-31T22:00:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-12,4,,12,...,,,,,,,,,,
10970,1023099999,2000-12-31T22:50:00,69.055758,18.540356,76.8,"BARDUFOSS, NO",FM-15,4,29.56,7,...,,,,,,,,,,
