The wx_data directory has files containing weather data records from 1985-01-01 to 2014-12-31. Each file corresponds to a particular weather station from Nebraska, Iowa, Illinois, Indiana, or Ohio.

Each line in a file contains 4 fields separated by tabs:

1. The date (YYYYMMDD format)
2. The maximum temperature for that day (in tenths of a degree Celsius)
3. The minimum temperature for that day (in tenths of a degree Celsius)
4. The amount of precipitation for that day (in tenths of a millimeter)
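To illustrate this layout, the sketch below parses one line into typed values and converts the units. It is a minimal sketch, assuming the -9999 missing-value marker that the assignment specifies; the function name parse_wx_line is hypothetical, and the sample line mirrors the first record of station USC00257515.

```python
from datetime import date


def parse_wx_line(line):
    """Parse one tab-separated wx_data line into (date, max_c, min_c, precip_mm).

    Temperatures are converted from tenths of a degree Celsius to degrees
    Celsius, and precipitation from tenths of a millimeter to millimeters.
    The -9999 missing-value marker (per the assignment) becomes None.
    """
    raw_date, raw_max, raw_min, raw_precip = line.split("\t")
    record_date = date(int(raw_date[:4]), int(raw_date[4:6]), int(raw_date[6:8]))

    def scaled(raw, divisor):
        value = int(raw)  # int() tolerates surrounding whitespace such as '\n'
        return None if value == -9999 else value / divisor

    return (record_date, scaled(raw_max, 10), scaled(raw_min, 10), scaled(raw_precip, 10))


print(parse_wx_line("19850101\t-67\t-167\t0\n"))
```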



In [1]:
import pandas as pd  # data manipulation
import os  # list the station files and build file paths
import psycopg2  # PostgreSQL adapter for Python

#### Create a master file by combining all weather-station files into a single '|'-delimited file

In [2]:
# This notebook runs from 'code-challenge-template/src', so the paths below are relative to it
weather_records_directory = '../wx_data/'
answers_directory = '../answers/'
master_file = os.path.join(answers_directory, 'master.csv')

with open(master_file, 'w') as w1:
    for file in os.listdir(weather_records_directory):
        filename = os.path.join(weather_records_directory, file)
        if os.path.isfile(filename):
            # Use the file's name (without extension) to identify the weather station
            weather_station_code = os.path.splitext(file)[0]
            with open(filename, 'r') as r1:
                for line in r1:
                    date, max_temp, min_temp, precip_total = line.split()
                    # Convert YYYYMMDD to the ISO YYYY-MM-DD format for PostgreSQL DATE columns
                    pg_date = f"{date[:4]}-{date[4:6]}-{date[6:]}"
                    master_file_row = f"{weather_station_code}|{pg_date}|{max_temp}|{min_temp}|{precip_total}"
                    w1.write(f"{master_file_row}\n")

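#### Verify from a bash shell (in the answers directory) that every station file made it into the master file
1. cat master.csv | awk -F\| '{print $1}' | sort | uniq | wc -l returns 167 (unique station codes in master.csv)
2. ls -l ../wx_data/ | grep -v total | wc -l returns 167 (station files in wx_data)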
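The same check can be done from Python without leaving the notebook. This is a minimal sketch; the helper name verify_master_file is hypothetical, and it assumes the station code is the first '|'-separated field of each master-file row, as written above.

```python
import os


def verify_master_file(master_path, records_dir):
    """Compare unique station codes in the '|'-delimited master file
    with the number of station files in records_dir.

    Returns (unique_station_codes, station_files).
    """
    with open(master_path) as master:
        # The first field of every row is the station code
        codes = {line.split("|", 1)[0] for line in master if line.strip()}
    files = [
        name for name in os.listdir(records_dir)
        if os.path.isfile(os.path.join(records_dir, name))
    ]
    return len(codes), len(files)
```

In this notebook it could be called as verify_master_file(master_file, weather_records_directory), and both numbers should be 167. Unlike ls -l | grep -v total, it counts only regular files, matching the os.path.isfile filter used when building the master file.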

#### Populate a Pandas dataframe with the master file rows (1,729,957 rows)

In [3]:
headers = ['Weather Station Code', 'Date', 'Max Temperature', 'Minimum temperature', 'Precipitation']
weather_df = pd.read_csv(master_file, sep='|', names = headers)
weather_df

| | Weather Station Code | Date | Max Temperature | Minimum temperature | Precipitation |
|---|---|---|---|---|---|
| 0 | USC00257515 | 1985-01-01 | -67 | -167 | 0 |
| 1 | USC00257515 | 1985-01-02 | 11 | -139 | 0 |
| 2 | USC00257515 | 1985-01-03 | 61 | -111 | 0 |
| 3 | USC00257515 | 1985-01-04 | 72 | -33 | 0 |
| 4 | USC00257515 | 1985-01-05 | 122 | -78 | 0 |
| ... | ... | ... | ... | ... | ... |
| 1729952 | USC00123527 | 2014-12-27 | 106 | 6 | 13 |
| 1729953 | USC00123527 | 2014-12-28 | 78 | -11 | 122 |
| 1729954 | USC00123527 | 2014-12-29 | 6 | -61 | 0 |
| 1729955 | USC00123527 | 2014-12-30 | 17 | -61 | 0 |


##### I created a PostgreSQL database to hold the solution, although any database type (relational or NoSQL) could be used. Since the data is well structured and tabular, I chose a relational database for this assignment.

Below are the statements run during setup, after installing PostgreSQL (the user and password must match 'DB_DETAILS' in the classes further below):
CREATE USER d3x WITH PASSWORD 'd3x';
CREATE DATABASE my_weather_stations OWNER d3x;

#### The Python class 'DBConnection' manages a PostgreSQL database connection and offers utilities to convert between dataframes and database tables

In [4]:
class DBConnection:

    def __init__(self, db_details):
        self.db_name = db_details.get('db_name')
        self.user = db_details.get('user')
        self.password = db_details.get('password')
        self.host = db_details.get('host')
        self.port = db_details.get('port')
        self.conn = None

    # This method keeps the __init__ method clean to simplify unit testing
    def initialize(self):
        self.connect_to_db()

    def connect_to_db(self):
        try:
            self.conn = psycopg2.connect(dbname=self.db_name,
                                         user=self.user,
                                         password=self.password,
                                         host=self.host,
                                         port=self.port)
        except Exception as e:
            # Report the connection target without echoing the password
            print('Uh oh! We could not connect to the db with the following details: '
                  f"{self.user}@{self.host}:{self.port}/{self.db_name} -> {str(e)}")

    # Return a new cursor for each query so cursors are not shared between queries.
    # Returns None (instead of raising UnboundLocalError) if no cursor can be created.
    def get_cursor(self):
        try:
            return self.conn.cursor()
        except Exception as e:
            print(f"Could not retrieve cursor to {self.db_name} -> {str(e)}")
            return None

    def postgresql_to_dataframe(self, select_query, column_names=None):
        cursor = self.get_cursor()
        if cursor is None:
            return None
        try:
            cursor.execute(select_query)
            rows = cursor.fetchall()
        except Exception as e:
            print(f"Uh oh. Could not import the table data into a dataframe: {str(e)}")
            return None
        finally:
            cursor.close()

        return pd.DataFrame(rows, columns=column_names)

    def dataframe_to_postgresql(self, data_frame, tbl):
        try:
            from sqlalchemy import create_engine
            engine = create_engine(f"postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.db_name}")
            # 'append' keeps the table created by create_table (and its primary key);
            # 'replace' would drop it and let pandas infer a new schema
            data_frame.to_sql(tbl, engine, if_exists='append', index=False)
        except Exception as e:
            print(f"Uh oh! Could not import the dataframe into table {tbl}: {str(e)}")

#### The Python classes 'WeatherStationDailyRecord' and 'WeatherStationAnnualRecord' define the schemas of the PostgreSQL tables we will populate as class attributes, and each has a class method (@classmethod), 'create_table', that creates its table if it does not already exist.

#### Each class's __init__ method declares attributes that mirror one row of its database table: a line of the master file created earlier for 'WeatherStationDailyRecord', and a row of the aggregated Pandas dataframe created later for 'WeatherStationAnnualRecord'. The attributes start as None; in this notebook the objects are used mainly to hold the database connection.

#### Notice that each table uses a composite primary key: ('weather_station_id', 'record_date') for the daily table and ('weather_station_id', 'year') for the annual table. Using these column pairs ensures that duplicate records cannot be stored in the same table.
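To see how a composite key rejects duplicates, here is a small self-contained sketch. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, an assumption made only so the example runs without a database server; for this purpose the composite PRIMARY KEY behaves the same way. The rows reuse values from the first records of station USC00257515.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE weather_station_daily_record (
           weather_station_id VARCHAR(30) NOT NULL,
           record_date DATE NOT NULL,
           maximum_temperature INTEGER,
           minimum_temperature INTEGER,
           total_precipitation INTEGER,
           PRIMARY KEY (weather_station_id, record_date)
       )"""
)
insert = "INSERT INTO weather_station_daily_record VALUES (?, ?, ?, ?, ?)"
conn.execute(insert, ("USC00257515", "1985-01-01", -67, -167, 0))
# Same station, different date: allowed
conn.execute(insert, ("USC00257515", "1985-01-02", 11, -139, 0))

try:
    # Same (station, date) pair again: rejected by the composite key
    conn.execute(insert, ("USC00257515", "1985-01-01", -67, -167, 0))
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

row_count = conn.execute("SELECT COUNT(*) FROM weather_station_daily_record").fetchone()[0]
print(duplicate_rejected, row_count)
```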

In [5]:
class WeatherStationDailyRecord:
    DB_DETAILS = {'db_name': 'my_weather_stations',
                  'user': 'd3x',
                  'password': 'd3x',
                  'host': '127.0.0.1',
                  'port': '5432'
                 }
    WEATHER_STATION_DAILY_RECORD = 'weather_station_daily_record'
    WEATHER_STATION_DAILY_RECORD_COLUMNS = """(weather_station_id, record_date,
                                               maximum_temperature, minimum_temperature,
                                               total_precipitation)
                                           """
    
    WEATHER_STATION_DAILY_RECORD_COLUMNS_LIST = ('weather_station_id',
                                                 'record_date',
                                                 'maximum_temperature',
                                                 'minimum_temperature',
                                                 'total_precipitation')
    
    WEATHER_STATION_DAILY_RECORD_SCHEMA = """
                                            (weather_station_id VARCHAR(30) NOT NULL,
                                             record_date DATE NOT NULL,
                                             maximum_temperature INTEGER,
                                             minimum_temperature INTEGER,
                                             total_precipitation INTEGER,
                                             PRIMARY KEY(weather_station_id, record_date)
                                             )
                                             """
        
    def __init__(self):
        self.db = None
        self.weather_station_id = None
        self.record_date = None
        self.maximum_temperature = None
        self.minimum_temperature = None
        self.total_precipitation = None
    
    # This method keeps the __init__ method clean to simplify unit testing
    def initialize(self):
        self.db = DBConnection(WeatherStationDailyRecord.DB_DETAILS)
        self.db.initialize()
        WeatherStationDailyRecord.create_table(self.db)

    @classmethod
    def create_table(cls, db_conn):
        create_table_command = f"""
                            CREATE TABLE IF NOT EXISTS {cls.WEATHER_STATION_DAILY_RECORD}
                            {cls.WEATHER_STATION_DAILY_RECORD_SCHEMA};
                            """
        try:
            # The cursor's context manager closes it once the statement has run
            with db_conn.get_cursor() as curs:
                curs.execute(create_table_command)
            db_conn.conn.commit()
        except Exception as e:
            print(f"Could not create table: {cls.WEATHER_STATION_DAILY_RECORD} -> {str(e)}")
            if db_conn.conn is not None:
                db_conn.conn.rollback()  # clear the aborted transaction
    
    

In [6]:
class WeatherStationAnnualRecord:
    DB_DETAILS = {'db_name': 'my_weather_stations',
                  'user': 'd3x',
                  'password': 'd3x',
                  'host': '127.0.0.1',
                  'port': '5432'
                 }
    WEATHER_STATION_ANNUAL_RECORD = 'weather_station_annual_record'
    WEATHER_STATION_ANNUAL_RECORD_COLUMNS = """(weather_station_id, year,
                                                average_maximum_temperature, 
                                                average_minimum_temperature,
                                                total_precipitation)
                                            """
    
    WEATHER_STATION_ANNUAL_RECORD_COLUMNS_LIST = ('weather_station_id',
                                                  'year',
                                                  'average_maximum_temperature',
                                                  'average_minimum_temperature',
                                                  'total_precipitation')
    
    WEATHER_STATION_ANNUAL_RECORD_SCHEMA = """
                                            (weather_station_id VARCHAR(30) NOT NULL,
                                             year VARCHAR(4) NOT NULL,
                                             average_maximum_temperature DECIMAL,
                                             average_minimum_temperature DECIMAL,
                                             total_precipitation INTEGER,
                                             PRIMARY KEY(weather_station_id, year)
                                             )
                                          """
    
    def __init__(self):
        self.db = None
        self.weather_station_id = None
        self.year = None
        self.avg_max_temp = None
        self.avg_min_temp = None
        self.tot_precip = None
        
    # This method keeps the __init__ method clean to simplify unit testing
    def initialize(self):
        self.db = DBConnection(WeatherStationAnnualRecord.DB_DETAILS)
        self.db.initialize()
        WeatherStationAnnualRecord.create_table(self.db)

    @classmethod
    def create_table(cls, db_conn):
        create_table_command = f"""
                            CREATE TABLE IF NOT EXISTS {cls.WEATHER_STATION_ANNUAL_RECORD}
                            {cls.WEATHER_STATION_ANNUAL_RECORD_SCHEMA};
                            """
        try:
            # The cursor's context manager closes it once the statement has run
            with db_conn.get_cursor() as curs:
                curs.execute(create_table_command)
            db_conn.conn.commit()
        except Exception as e:
            print(f"Could not create table: {cls.WEATHER_STATION_ANNUAL_RECORD} -> {str(e)}")
            if db_conn.conn is not None:
                db_conn.conn.rollback()  # clear the aborted transaction
    

#### Since we are in a Jupyter notebook, the cell below serves as our 'main' without needing a function named 'main'

In [7]:
from datetime import datetime

try:
    weather_station = WeatherStationDailyRecord()
    weather_station.initialize()
except Exception as e:
    print(f"Error(s) when working with WeatherStationDailyRecord object: {str(e)}")


def to_int_or_none(value):
    # According to the assignment, 'Missing values are indicated by the value -9999.'
    # Convert them to None so PostgreSQL stores NULL and Pandas later reads NaN.
    number = int(value)  # int() ignores the trailing newline on the last field
    return None if number == -9999 else number


# Name the columns explicitly, and skip rows whose (station, date) key already exists,
# so re-running this cell does not fail on the composite primary key
insert_query = f"""
                INSERT INTO {WeatherStationDailyRecord.WEATHER_STATION_DAILY_RECORD}
                {WeatherStationDailyRecord.WEATHER_STATION_DAILY_RECORD_COLUMNS}
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (weather_station_id, record_date) DO NOTHING;
                """

undigested_records = []
start_ingest_time = datetime.now()
cursor = weather_station.db.get_cursor()
with open(master_file, 'r') as r1:
    for line in r1:
        try:
            weather_station_code, date, max_temp, min_temp, precip = line.split('|')
            row_to_insert = (weather_station_code, date, to_int_or_none(max_temp),
                             to_int_or_none(min_temp), to_int_or_none(precip))
        except ValueError:
            undigested_records.append(line)  # Malformed line: keep it for review
            continue
        # Database errors are deliberately not swallowed here: in PostgreSQL a failed
        # statement aborts the whole transaction, so every later insert would fail too
        cursor.execute(insert_query, row_to_insert)
        if cursor.rowcount == 0:
            undigested_records.append(line)  # Duplicate key, skipped by ON CONFLICT
cursor.close()

# Commit all inserts made above to the PostgreSQL table outside of the for loop.
# Purposely kept out of the loop to improve performance with larger datasets;
# committing in batches inside the loop is a middle ground between speed and
# persisting rows to disk sooner.
weather_station.db.conn.commit()
end_ingest_time = datetime.now()
if undigested_records:
    print(f"{len(undigested_records)} line(s) could not be ingested")
print(f"Started Table Population @ {start_ingest_time}")
print(f"Ended Table Population @ {end_ingest_time}")
duration = end_ingest_time - start_ingest_time
print(f"Table Population Duration: {duration}")


# Get dataframe from database
select_query = f"SELECT * FROM {WeatherStationDailyRecord.WEATHER_STATION_DAILY_RECORD}"
weather_df = weather_station.db.postgresql_to_dataframe(select_query,
                                                        WeatherStationDailyRecord.WEATHER_STATION_DAILY_RECORD_COLUMNS_LIST)

Started Table Population @ 2023-02-17 09:45:48.412903
Ended Table Population @ 2023-02-17 09:47:42.533682
Table Population Duration: 0:01:54.120779
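The comments in the ingestion cell mention committing in batches as a middle ground between speed and how soon rows reach disk. A minimal sketch of that idea follows, again using sqlite3 as a stand-in for PostgreSQL so it runs anywhere; the helper name insert_in_batches, the readings table and the batch size are hypothetical. With psycopg2 the same loop structure would apply, and psycopg2.extras.execute_values is another option for sending many rows per statement.

```python
import sqlite3
from itertools import islice


def insert_in_batches(conn, insert_sql, rows, batch_size=10_000):
    """Insert rows with executemany, committing once per batch.

    Returns the number of batches committed.
    """
    rows = iter(rows)
    batches = 0
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        conn.executemany(insert_sql, batch)
        conn.commit()  # persist each batch instead of a single commit at the very end
        batches += 1
    return batches


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE readings (station TEXT, day TEXT, value INTEGER)")
sample_rows = [("USC00257515", f"1985-01-{d:02d}", d) for d in range(1, 26)]
batches = insert_in_batches(conn, "INSERT INTO readings VALUES (?, ?, ?)", sample_rows, batch_size=10)
```

A larger batch_size means fewer commits (faster), while a smaller one loses less work if the process stops midway.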


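##### Here is a sample of the Pandas dataframe after fetching from the database (1,729,957 rows). The temperature and precipitation columns are now floats because the NULLs stored for missing values come back as NaN, which requires a float column.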

In [8]:
weather_df


| | weather_station_id | record_date | maximum_temperature | minimum_temperature | total_precipitation |
|---|---|---|---|---|---|
| 0 | USC00257515 | 1985-01-01 | -67.0 | -167.0 | 0.0 |
| 1 | USC00257515 | 1985-01-02 | 11.0 | -139.0 | 0.0 |
| 2 | USC00257515 | 1985-01-03 | 61.0 | -111.0 | 0.0 |
| 3 | USC00257515 | 1985-01-04 | 72.0 | -33.0 | 0.0 |
| 4 | USC00257515 | 1985-01-05 | 122.0 | -78.0 | 0.0 |
| ... | ... | ... | ... | ... | ... |
| 1729952 | USC00123527 | 2014-12-27 | 106.0 | 6.0 | 13.0 |
| 1729953 | USC00123527 | 2014-12-28 | 78.0 | -11.0 | 122.0 |
| 1729954 | USC00123527 | 2014-12-29 | 6.0 | -61.0 | 0.0 |
| 1729955 | USC00123527 | 2014-12-30 | 17.0 | -61.0 | 0.0 |


For every year, for every weather station, calculate:

* Average maximum temperature (in degrees Celsius)
* Average minimum temperature (in degrees Celsius)
* Total accumulated precipitation (in centimeters)
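The raw values are in tenths of a degree Celsius and tenths of a millimeter, so the requested units come from dividing averaged temperatures by 10 and summed precipitation by 100 (tenths of a millimeter to millimeters is / 10, millimeters to centimeters is another / 10). A minimal pandas sketch, assuming a dataframe with the aggregated column names used later in this notebook; the helper name to_reporting_units and the sample row (station STATION_A) are illustrative only.

```python
import pandas as pd


def to_reporting_units(annual_df):
    """Return a copy with temperatures in degrees Celsius and precipitation in centimeters.

    Assumes the raw wx_data units: tenths of a degree Celsius and tenths of a millimeter.
    """
    converted = annual_df.copy()
    for column in ("average_maximum_temperature", "average_minimum_temperature"):
        converted[column] = converted[column] / 10  # tenths of deg C -> deg C
    converted["total_precipitation"] = converted["total_precipitation"] / 100  # tenths of mm -> cm
    return converted


sample = pd.DataFrame({
    "weather_station_id": ["STATION_A"],
    "year": ["1985"],
    "average_maximum_temperature": [153.0],
    "average_minimum_temperature": [43.0],
    "total_precipitation": [7801.0],
})
print(to_reporting_units(sample))
```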

Since we want averages and sums per year, I extract the first 4 characters of 'record_date' (the year) into a new 'year' column.


In [9]:
weather_df['year'] = weather_df.record_date.astype(str).str[:4]

##### Here is a sample of rows containing NaN values (formerly -9999). There are 60,850 rows with at least one missing value.

In [10]:
weather_df[weather_df.isna().any(axis=1)]


| | weather_station_id | record_date | maximum_temperature | minimum_temperature | total_precipitation | year |
|---|---|---|---|---|---|---|
| 95 | USC00257515 | 1985-04-06 | NaN | NaN | 0.0 | 1985 |
| 96 | USC00257515 | 1985-04-07 | NaN | NaN | 0.0 | 1985 |
| 97 | USC00257515 | 1985-04-08 | NaN | NaN | 0.0 | 1985 |
| 98 | USC00257515 | 1985-04-09 | NaN | NaN | 0.0 | 1985 |
| 99 | USC00257515 | 1985-04-10 | NaN | NaN | 0.0 | 1985 |
| ... | ... | ... | ... | ... | ... | ... |
| 1728202 | USC00123527 | 2010-03-13 | 128.0 | NaN | 119.0 | 2010 |
| 1728205 | USC00123527 | 2010-03-16 | 72.0 | NaN | 3.0 | 2010 |
| 1729330 | USC00123527 | 2013-04-14 | 106.0 | NaN | 5.0 | 2013 |
| 1729331 | USC00123527 | 2013-04-15 | 217.0 | NaN | 0.0 | 2013 |


In [11]:
weather_df[weather_df[['maximum_temperature', 'minimum_temperature', 'total_precipitation']].isna().all(1)]

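| | weather_station_id | record_date | maximum_temperature | minimum_temperature | total_precipitation | year |
|---|---|---|---|---|---|---|
| 654 | USC00257515 | 1986-10-17 | NaN | NaN | NaN | 1986 |
| 655 | USC00257515 | 1986-10-18 | NaN | NaN | NaN | 1986 |
| 656 | USC00257515 | 1986-10-19 | NaN | NaN | NaN | 1986 |
| 657 | USC00257515 | 1986-10-20 | NaN | NaN | NaN | 1986 |
| 658 | USC00257515 | 1986-10-21 | NaN | NaN | NaN | 1986 |
| ... | ... | ... | ... | ... | ... | ... |
| 1718573 | USC00256970 | 2013-08-31 | NaN | NaN | NaN | 2013 |
| 1721051 | USC00123527 | 1990-08-14 | NaN | NaN | NaN | 1990 |
| 1726617 | USC00123527 | 2005-11-09 | NaN | NaN | NaN | 2005 |
| 1726722 | USC00123527 | 2006-02-22 | NaN | NaN | NaN | 2006 |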


##### We now drop these rows, in which all three measurements are missing, leaving 1,729,957 - 11,898 = 1,718,059 rows.

In [12]:
weather_df = weather_df.dropna(subset=['maximum_temperature', 'minimum_temperature', 'total_precipitation'], how='all')
weather_df

| | weather_station_id | record_date | maximum_temperature | minimum_temperature | total_precipitation | year |
|---|---|---|---|---|---|---|
| 0 | USC00257515 | 1985-01-01 | -67.0 | -167.0 | 0.0 | 1985 |
| 1 | USC00257515 | 1985-01-02 | 11.0 | -139.0 | 0.0 | 1985 |
| 2 | USC00257515 | 1985-01-03 | 61.0 | -111.0 | 0.0 | 1985 |
| 3 | USC00257515 | 1985-01-04 | 72.0 | -33.0 | 0.0 | 1985 |
| 4 | USC00257515 | 1985-01-05 | 122.0 | -78.0 | 0.0 | 1985 |
| ... | ... | ... | ... | ... | ... | ... |
| 1729952 | USC00123527 | 2014-12-27 | 106.0 | 6.0 | 13.0 | 2014 |
| 1729953 | USC00123527 | 2014-12-28 | 78.0 | -11.0 | 122.0 | 2014 |
| 1729954 | USC00123527 | 2014-12-29 | 6.0 | -61.0 | 0.0 | 2014 |
| 1729955 | USC00123527 | 2014-12-30 | 17.0 | -61.0 | 0.0 | 2014 |
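The difference between the any() and all() filters used above, and what dropna(how='all') keeps, can be checked on a tiny hand-built dataframe. This is an illustrative sketch; the three rows and the station code S1 are hypothetical.

```python
import numpy as np
import pandas as pd

measure_cols = ["maximum_temperature", "minimum_temperature", "total_precipitation"]
df = pd.DataFrame({
    "weather_station_id": ["S1", "S1", "S1"],
    "maximum_temperature": [61.0, np.nan, np.nan],
    "minimum_temperature": [-111.0, np.nan, np.nan],
    "total_precipitation": [0.0, 0.0, np.nan],
})

# Rows with at least one missing measurement (rows 1 and 2)
partly_missing = df[df[measure_cols].isna().any(axis=1)]
# Rows where every measurement is missing (row 2 only)
fully_missing = df[df[measure_cols].isna().all(axis=1)]
# how='all' drops only the fully missing row, keeping partial records (rows 0 and 1)
kept = df.dropna(subset=measure_cols, how="all")
```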


##### There are 4,820 unique weather station/year combinations. Most have 365 or 366 records, as expected once leap years are taken into account. However, there are exceptions, such as 1986 for weather station USC00110072 (273 records).

In [13]:
weather_df.groupby(['weather_station_id', 'year']).count()

| weather_station_id | year | record_date | maximum_temperature | minimum_temperature | total_precipitation |
|---|---|---|---|---|---|
| USC00110072 | 1985 | 365 | 365 | 363 | 365 |
| USC00110072 | 1986 | 273 | 273 | 273 | 273 |
| USC00110072 | 1987 | 365 | 365 | 365 | 365 |
| USC00110072 | 1988 | 366 | 366 | 366 | 366 |
| USC00110072 | 1989 | 365 | 365 | 365 | 365 |
| ... | ... | ... | ... | ... | ... |
| USC00339312 | 2010 | 365 | 365 | 365 | 365 |
| USC00339312 | 2011 | 365 | 365 | 365 | 365 |
| USC00339312 | 2012 | 366 | 366 | 366 | 366 |
| USC00339312 | 2013 | 365 | 365 | 365 | 365 |
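Rather than spotting gaps like USC00110072's 1986 by eye, each station-year's record count can be compared with the number of days in that year. A minimal sketch, assuming the 'weather_station_id', 'year' (string) and 'record_date' columns built above; the helper name incomplete_station_years is hypothetical.

```python
import calendar

import pandas as pd


def incomplete_station_years(df):
    """Return station/year groups with fewer daily records than days in the year."""
    counts = (
        df.groupby(["weather_station_id", "year"])["record_date"]
        .count()
        .rename("records")
        .reset_index()
    )
    # 366 days in leap years, 365 otherwise
    counts["expected"] = counts["year"].astype(int).map(
        lambda y: 366 if calendar.isleap(y) else 365
    )
    return counts[counts["records"] < counts["expected"]]
```

Applied to weather_df, it would list every station-year with missing days; a station whose coverage ends mid-year would also show up here.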


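##### For each weather station and year, the non-NaN values are aggregated with 'groupby': 'mean' for the two temperature columns and 'sum' for precipitation. Below is a snippet of the results for the 167 weather stations, stored in a new dataframe, 'weather_calculated_df'. Note that these values are still in the raw units (tenths of a degree Celsius and tenths of a millimeter); dividing the temperature averages by 10 and the precipitation totals by 100 converts them to degrees Celsius and centimeters.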

In [14]:
renamed_columns = {'maximum_temperature': 'average_maximum_temperature',
                   'minimum_temperature': 'average_minimum_temperature',
                   'total_precipitation':'total_precipitation'}

weather_calculated_df = weather_df.groupby(['weather_station_id', 'year']).agg({'maximum_temperature':'mean',
                                                        'minimum_temperature':'mean',
                                                        'total_precipitation':'sum'
                                                       }).rename(columns=renamed_columns).reset_index()
weather_calculated_df

| | weather_station_id | year | average_maximum_temperature | average_minimum_temperature | total_precipitation |
|---|---|---|---|---|---|
| 0 | USC00110072 | 1985 | 153.347945 | 43.264463 | 7801.0 |
| 1 | USC00110072 | 1986 | 126.963370 | 21.761905 | 5053.0 |
| 2 | USC00110072 | 1987 | 177.602740 | 63.298630 | 7936.0 |
| 3 | USC00110072 | 1988 | 173.472678 | 45.349727 | 5410.0 |
| 4 | USC00110072 | 1989 | 156.515068 | 39.835616 | 7937.0 |
| ... | ... | ... | ... | ... | ... |
| 4815 | USC00339312 | 2010 | 161.676712 | 43.347945 | 10340.0 |
| 4816 | USC00339312 | 2011 | 163.076712 | 50.460274 | 13607.0 |
| 4817 | USC00339312 | 2012 | 177.983607 | 51.650273 | 9108.0 |
| 4818 | USC00339312 | 2013 | 154.736986 | 40.183562 | 10990.0 |
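One caveat about the aggregation above: by default, pandas' groupby sum returns 0 for a group whose values are all NaN, so a station-year with no precipitation readings would look like a perfectly dry year. Dropping the all-missing rows earlier does not rule this out, because a row can have temperatures but no precipitation. A sketch using named aggregation with sum(min_count=1), which yields NaN instead; the column names match this notebook, while the helper name annual_summary and the sample rows are hypothetical.

```python
import numpy as np
import pandas as pd


def annual_summary(daily_df):
    """Aggregate daily rows per station and year.

    Temperatures are averaged; precipitation is summed with min_count=1 so a
    group with no precipitation readings yields NaN rather than 0.
    """
    return (
        daily_df.groupby(["weather_station_id", "year"])
        .agg(
            average_maximum_temperature=("maximum_temperature", "mean"),
            average_minimum_temperature=("minimum_temperature", "mean"),
            total_precipitation=("total_precipitation", lambda s: s.sum(min_count=1)),
        )
        .reset_index()
    )


daily = pd.DataFrame({
    "weather_station_id": ["S1", "S1", "S2"],
    "year": ["1985", "1985", "1985"],
    "maximum_temperature": [100.0, 120.0, 90.0],
    "minimum_temperature": [0.0, 20.0, 10.0],
    "total_precipitation": [5.0, 7.0, np.nan],
})
summary = annual_summary(daily)
```

Named aggregation also produces the final column names directly, which removes the need for the separate rename dictionary.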


In [15]:
try:
    weather_station = WeatherStationAnnualRecord()
    weather_station.initialize()
    #logging.debug(weather_station.__dict__)
except Exception as e:
    print(f"Error(s) when working with WeatherStationAnnualRecord object: {str(e)}")

start_ingest_time = datetime.now()
weather_station.db.dataframe_to_postgresql(weather_calculated_df,
                                           WeatherStationAnnualRecord.WEATHER_STATION_ANNUAL_RECORD)
end_ingest_time = datetime.now()
print(f"Started Table Population @ {start_ingest_time}")
print(f"Ended Table Population @ {end_ingest_time}")
duration = end_ingest_time - start_ingest_time
print(f"Table Population Duration: {duration}")

Started Table Population @ 2023-02-17 09:47:47.403397
Ended Table Population @ 2023-02-17 09:47:47.656306
Table Population Duration: 0:00:00.252909


When deploying to the cloud, I would first look for a cloud-agnostic tool in case I decide to switch providers down the road. A great tool for infrastructure as code (IaC) and automation, such as deployment, is Terraform. Terraform lets one build or tear down infrastructure (provisioning) with a single command. Unlike Ansible, which leans more toward configuration management, Terraform uses a declarative language: you describe the resources you want, and applying the same configuration repeatedly leaves exactly those declared resources in place, regardless of how many times it is run.

If I wanted a simple, fully managed solution, I would choose services similar to API Gateway and Lambda to expose my API. Of course, we would first need to address other concerns that improve security, isolation, organization, performance, logging, etc., such as firewalls and load balancers, Identity and Access Management, and CloudTrail/CloudWatch logging and monitoring, but those are out of scope for this exercise.
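For the API Gateway and Lambda option, a handler receives the HTTP request as an event and returns a response dictionary. A minimal sketch follows, assuming the API Gateway Lambda proxy integration (which passes query parameters in event['queryStringParameters'] and expects statusCode, headers and a string body in the response). The station/year lookup is a hypothetical in-memory dictionary standing in for a query against the weather_station_annual_record table; its one entry copies the first row of the aggregated results above.

```python
import json

# Hypothetical stand-in for rows of weather_station_annual_record
ANNUAL_RECORDS = {
    ("USC00110072", "1985"): {
        "average_maximum_temperature": 153.347945,
        "average_minimum_temperature": 43.264463,
        "total_precipitation": 7801.0,
    },
}


def lambda_handler(event, context):
    """Return annual statistics for ?station=...&year=... as JSON."""
    # API Gateway sends None when the request has no query string
    params = event.get("queryStringParameters") or {}
    key = (params.get("station"), params.get("year"))
    record = ANNUAL_RECORDS.get(key)
    if record is None:
        return {
            "statusCode": 404,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "no record for that station and year"}),
        }
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"weather_station_id": key[0], "year": key[1], **record}),
    }
```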

Although I am using AWS terminology in this passage, the concepts are the same among the major cloud providers. Cost also matters: it tends to be the biggest factor in making the appropriate business decision, and managed services are great until you see the bill. The question becomes how much one wants to labor over installing and maintaining the infrastructure versus how much one wants to pay. Running databases and applications on EC2 instances may be the only option for a company that finds Aurora databases too expensive.

There are also cloud services that streamline the CI/CD process, such as AWS CodePipeline, CodeBuild and CodeDeploy, that may replace the familiar Jenkins, Bamboo, GitLab CI, etc. At the end of the day, they all serve the same purpose, and they can all coordinate blue/green and canary deployments, among many other things.

There's a lot to discuss.