Dear Data Science Team,

We are seeking your expertise to develop an ETL (Extract, Transform, Load) pipeline that will enable us to analyze historical weather data. Our focus is on understanding the changes in temperature and precipitation across the 48 contiguous United States between the years 1950 and 2000.

Your primary data source will be the NOAA GSOD dataset, accessed here: https://www.ncei.noaa.gov/access/metadata/landing-page/bin/iso?id=gov.noaa.ncdc:C00516. Please use the gzip bulk download source.

We need detailed statistical data on temperature and precipitation, aggregated monthly. This data should include metrics such as mean, median, variance, minimum, and maximum values.
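To make the requested metrics concrete, here is a minimal sketch of a monthly aggregation using only the Python standard library. The readings and the helper name monthly_stats are made up for illustration; in the notebook itself the same grouping would more likely be done with a pandas groupby. The variance shown is the sample variance (n - 1 denominator), which matches the pandas default.

```python
from collections import defaultdict
from statistics import mean, median, variance

# Hypothetical daily readings: (ISO date, mean temperature in degrees Fahrenheit).
daily_temps = [
    ("1950-01-01", 30.0),
    ("1950-01-02", 34.0),
    ("1950-01-03", 32.0),
    ("1950-02-01", 40.0),
    ("1950-02-02", 44.0),
]

def monthly_stats(readings):
    """Group (date, value) pairs by YYYY-MM and summarize each group."""
    by_month = defaultdict(list)
    for date, value in readings:
        by_month[date[:7]].append(value)  # "1950-01-01" -> "1950-01"
    return {
        month: {
            "mean": mean(values),
            "median": median(values),
            # Sample variance (n - 1 denominator); needs at least two values.
            "var": variance(values) if len(values) > 1 else None,
            "min": min(values),
            "max": max(values),
        }
        for month, values in sorted(by_month.items())
    }

stats = monthly_stats(daily_temps)
print(stats["1950-01"])
```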

Technical Requirements:
All scripts and data processing must be conducted in a Jupyter Notebook.
Please set up a dedicated project folder and Git repository for this assignment, using the naming convention <firstname>-<lastname>-weather-etl. For example, Tom Cruise would name his repository tom-cruise-weather-etl.

Goals:
1. Programmatically (in Python) download the appropriate gzip data from the link above, one file at a time.
2. Unzip the data, and then delete the gzip file.
3. Go through each CSV and filter / clean out the appropriate data into a DataFrame. This may require other libraries, such as reverse_geocoder.
4. Delete the CSVs when you are finished with them.
5. Repeat steps 3-4 until you have a month's worth of data, then transform that data to get the requested information above.
6. Repeat steps 1-5 for each month, and then for each year between 1950 and 2000.
At this point you should have a fully transformed dataset with yearly statistical data between 1950 and 2000.
7. Export that data into a PostgreSQL database using SQLAlchemy.
NOTE: You will want to stop your existing container from running, then start a fresh database by making a new docker-compose file. Ensure you have a .gitignore file so that the data in this PostgreSQL database isn't stored in Git.
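Steps 2 through 4 above can be sketched as follows. This is only an illustration under stated assumptions: the archive is a tiny stand-in built locally with made-up station files, and the helper names build_sample_archive and process_archive are hypothetical. A real run would first download a yearly archive and would filter and clean each CSV rather than just collecting rows.

```python
import csv
import io
import os
import tarfile
import tempfile

def build_sample_archive(path):
    """Write a tiny stand-in .tar.gz with two station CSVs (made-up data)."""
    files = {
        "72000000001.csv": "STATION,DATE,TEMP\n72000000001,1950-01-01,30.0\n",
        "72000000002.csv": "STATION,DATE,TEMP\n72000000002,1950-01-01,34.0\n",
    }
    with tarfile.open(path, mode="w:gz") as tar:
        for name, text in files.items():
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))

def process_archive(archive_path, work_dir):
    """Unpack, delete the archive, then read and delete each CSV in turn."""
    with tarfile.open(archive_path, mode="r:gz") as tar:
        tar.extractall(path=work_dir)
    os.remove(archive_path)  # step 2: the gzip file is no longer needed

    rows = []
    for name in sorted(os.listdir(work_dir)):
        if not name.endswith(".csv"):
            continue
        csv_path = os.path.join(work_dir, name)
        with open(csv_path, newline="") as handle:
            rows.extend(csv.DictReader(handle))  # step 3: keep only the parsed rows
        os.remove(csv_path)  # step 4: delete the CSV once it has been read
    return rows

with tempfile.TemporaryDirectory() as tmp:
    archive = os.path.join(tmp, "1950.tar.gz")
    extract_dir = os.path.join(tmp, "extracted")
    os.mkdir(extract_dir)
    build_sample_archive(archive)
    rows = process_archive(archive, extract_dir)
    leftovers = os.listdir(extract_dir)
    archive_exists = os.path.exists(archive)

print(len(rows), leftovers, archive_exists)
```

Deleting each file as soon as it has been read keeps disk usage bounded by a single year's archive, which is the point of steps 2 and 4.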

Final Execution:
Upon completion of the above steps, the following actions should replicate the database successfully:
- Run docker-compose up
- Run the Jupyter notebook.

End-State:
- The project should be finalized with a clean workspace, meaning no unnecessary files remain in the project directory, and a fully operational database reflecting the processed data.

Project Maintenance (KEEP THESE IN MIND):
- Efficient Development: Be mindful of the time taken to download files. Develop strategies to test your program without needing to download all 50 years of data simultaneously.
- File and Memory Management: Ensure the deletion of files post-processing and avoid retaining unnecessary data in memory. This approach is crucial to prevent system overload or program crashes due to excessive memory usage or storage constraints.
- Many of these steps may require additional research to complete. Ensure you understand the code in your project, even if it's code copied from online.
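One possible way to follow the Efficient Development advice is to make the year range easy to shrink during testing. The sketch below assumes a hypothetical helper, years_to_process, with an optional limit parameter; neither name comes from the brief.

```python
def years_to_process(start_year, end_year, limit=None):
    """Return the inclusive year range, optionally truncated for test runs."""
    years = list(range(start_year, end_year + 1))
    return years if limit is None else years[:limit]

# During development, touch only the first two yearly archives.
dev_years = years_to_process(1950, 2000, limit=2)

# For the final run, process every year in the range.
full_years = years_to_process(1950, 2000)

print(dev_years, len(full_years))
```

Note that 1950 through 2000 inclusive is 51 yearly archives, so the full run loops 51 times.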

In [90]:
import requests 
import tarfile
from io import BytesIO
import shutil
from sqlalchemy import create_engine
import pandas as pd
import numpy as np

In [91]:
def extract_weather_file(start_year, end_year):
    try:
        base_url = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/archive/"
        finalized_df = []
        for year in range(start_year, end_year + 1):
            url = f"{base_url}{year}.tar.gz"
            response = requests.get(url, stream=True)
            if response.status_code == 200:
                    with tarfile.open(fileobj=BytesIO(response.content), mode="r:gz") as tar:
                        # Extract all members to a temporary directory
                        tar.extractall(path="temp_dir")
                        
                        # Combine CSV files into a single DataFrame
                        df_list = []
                        for member in tar.getmembers():
                            # startswith needs a tuple to test several prefixes;
                            # ('71' or '76') evaluates to just '71'
                            if member.isfile() and (not member.name.startswith(('71', '76'))) and member.name.endswith('.csv'):
                                csv_content = tar.extractfile(member).read()
                                df = pd.read_csv(BytesIO(csv_content))
                                df_list.append(df)
                        
                        # Concatenate DataFrames
                        combined_df = pd.concat(df_list, ignore_index=True)

                        # lower-case columns
                        combined_df.columns = combined_df.columns.str.lower()

                        # keep only stations inside a rough bounding box around the contiguous US
                        combined_df = combined_df[
                            (combined_df["latitude"] > 25) & (combined_df["latitude"] < 50) & 
                            (combined_df["longitude"] > -125) & (combined_df["longitude"] < -65)
                            ]
                        
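                        # replace GSOD missing-value sentinels with null
                        # (9999.9 for temp, 99.99 for prcp, per the GSOD documentation)
                        combined_df["temp"] = combined_df["temp"].replace(9999.9, np.nan)
                        combined_df["prcp"] = combined_df["prcp"].replace(99.99, np.nan)
                        combined_df["date"] = pd.to_datetime(combined_df["date"])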

                        # aggregating temps and prcps
                        temp_monthly_stats = combined_df.groupby([combined_df["date"].dt.to_period('M')]).agg({'temp': ['mean', 'median', 'var', 'min', 'max',]}).reset_index()
                        prcp_monthly_stats = combined_df.groupby([combined_df["date"].dt.to_period('M')]).agg({'prcp': ['mean', 'median', 'var', 'min', 'max',]}).reset_index()

                        # set and reset index after aggregating temps and prcps
                        temp_monthly_stats.set_index('date', inplace=True)
                        prcp_monthly_stats.set_index('date', inplace=True)
                        combined_df = pd.concat([temp_monthly_stats, prcp_monthly_stats], axis=1)
                        combined_df = combined_df.reset_index()

                        # convert the "date" column from a monthly period back to a timestamp (first day of the month)
                        combined_df["date"] = combined_df["date"].dt.to_timestamp()

                        # resetting column names
                        new_column_names = [
                            "date",
                            "temp_mean", "temp_median", "temp_var", "temp_min", "temp_max",
                            "prcp_mean", "prcp_median", "prcp_var", "prcp_min", "prcp_max"
                        ]
                        combined_df.columns = new_column_names

                        # append to finalized dataframe
                        finalized_df.append(combined_df)

        # combine finalized dataframes
        combined_finalized_df = pd.concat(finalized_df)

        # exporting to postgres database
        db_url = 'postgresql://postgres:password@localhost:5432/postgres'
        engine = create_engine(db_url)
        combined_finalized_df.to_sql('weather_stats_1950_2000', engine, index=False, if_exists='replace')   

        print("Files extracted successfully!")
        return None

    except requests.exceptions.RequestException as e:
        print(f"Failed to download the file: {e}")
        return None
    except tarfile.TarError as e:
        print(f"Failed to extract the file: {e}")
        return None
    except pd.errors.EmptyDataError:
        print("No CSV files found in the archive.")
        return None
    finally:
        # Always clean up the temporary directory. No return here: a return
        # inside finally would swallow exceptions and report success on failure.
        shutil.rmtree("temp_dir", ignore_errors=True)
    

In [92]:
# run this code to start extracting files
# (the function writes to the database and returns None, so nothing is assigned)
extract_weather_file(1950, 2000)

Files extracted successfully!
