In [8]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, ForeignKey, Date, Text, Boolean
from pprint import pprint as pp
from datetime import datetime

from config import local_mysql_password, local_mysql_user
from helpers import FIPS_10_country_codes, uwm_location_to_fips_country_map

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 200)

## Combine the process of importing the CSV and exporting to the database and split the process to handle chunks of data

In [9]:
class DatabaseInterface:
    def __init__(self,
                 db_name,
                 user,
                 password,
                 host='localhost',
                 port=3306,
                 driver='mysql+pymysql',
                 show_query_output=False):

        with create_engine(f'{driver}://{user}:{password}@{host}').connect() as db_connection:
            db_connection.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
        self.engine = create_engine(f"{driver}://{user}:{password}@{host}:{port}/{db_name}", echo=show_query_output)

    def insert_data(self, df: pd.DataFrame, table_name: str, dtypes: dict = {}, if_exists: str = 'replace', use_df_index: bool = False, chunk_size: int = 500):
        df.to_sql(
            name=table_name, 
            con=self.engine, 
            if_exists=if_exists, 
            index=use_df_index,
            chunksize=chunk_size,
            dtype=dtypes
        )

    def set_existing_field_as_primary_key(self, table_name: str, primary_key: str, constraints: str = ''):
        with self.engine.connect() as con:
            con.execute(f"ALTER TABLE `{table_name}` ADD PRIMARY KEY (`{primary_key}`) {constraints};")

    def add_new_field_as_primary_key(self, table_name: str, primary_key: str, primary_key_dtype: str, constraints: str = ''):
        with self.engine.connect() as con:
            con.execute(f"ALTER TABLE `{table_name}` ADD `{primary_key}` {primary_key_dtype} PRIMARY KEY {constraints};")

    def set_foreign_key(self, table_name: str, foreign_key: str, foreign_table_name: str, foreign_table_key:str):
        with self.engine.connect() as con:
            con.execute(f"ALTER TABLE `{table_name}` ADD FOREIGN KEY (`{foreign_key}`) REFERENCES `{foreign_table_name}`(`{foreign_table_key}`);")

    def close_connection(self):
        self.engine.dispose()
        
        
        
data_types_to_keep = [
    {'data_type_id': 'PRCP', 'description': 'Precipitation', 'units': 'tenths of mm'},
    {'data_type_id': 'SNOW', 'description': 'Snowfall', 'units': 'mm'},
    {'data_type_id': 'SNWD', 'description': 'Snow depth', 'units': 'mm'},
    {'data_type_id': 'TMAX', 'description': 'Maximum temperature', 'units': 'tenths of degrees C'},
    {'data_type_id': 'TMIN', 'description': 'Minimum temperature', 'units': 'tenths of degrees C'},
    {'data_type_id': 'ACMH', 'description': 'Average cloudiness midnight to midnight from manual observations', 'units': 'percent'},
    {'data_type_id': 'AWND', 'description': 'Average daily wind speed', 'units': 'tenths of meters per second'},
    {'data_type_id': 'EVAP', 'description': 'Evaporation of water from evaporation pan', 'units': 'tenths of mm'},
    {'data_type_id': 'FRTH', 'description': 'Thickness of frozen ground layer', 'units': 'cm'},
    {'data_type_id': 'PSUN', 'description': 'Daily percent of possible sunshine', 'units': 'percent'},
    {'data_type_id': 'THIC', 'description': 'Thickness of ice on water', 'units': 'tenths of mm'},
    {'data_type_id': 'TOBS', 'description': 'Temperature at the time of observation', 'units': 'tenths of degrees C'},
    {'data_type_id': 'TSUN', 'description': 'Daily total sunshine', 'units': 'minutes'},
    {'data_type_id': 'WDMV', 'description': '24-hour wind movement', 'units': 'km'},
    {'data_type_id': 'TAVG', 'description': "Average temperature', 'units': 'tenths of degrees C"},
    {'data_type_id': 'WT01', 'description': "Weather type: Fog, ice fog, or freezing fog (may include heavy fog)", 'units': 'Boolean'},
    {'data_type_id': 'WT02', 'description': "Weather type: Heavy fog or heaving freezing fog (not always distinguished from fog)", 'units': 'Boolean'},
    {'data_type_id': 'WT03', 'description': "Weather type: Thunder", 'units': 'Boolean'},
    {'data_type_id': 'WT04', 'description': "Weather type: Ice pellets, sleet, snow pellets, or small hail", 'units': 'Boolean'},
    {'data_type_id': 'WT05', 'description': "Weather type: Hail (may include small hail)", 'units': 'Boolean'},
    {'data_type_id': 'WT06', 'description': "Weather type: Glaze or rime", 'units': 'Boolean'},
    {'data_type_id': 'WT07', 'description': "Weather type: Dust, volcanic ash, blowing dust, blowing sand, or blowing obstruction", 'units': 'Boolean'},
    {'data_type_id': 'WT08', 'description': "Weather type: Smoke or haze", 'units': 'Boolean'},
    {'data_type_id': 'WT09', 'description': "Weather type: Blowing or drifting snow", 'units': 'Boolean'},
    {'data_type_id': 'WT10', 'description': "Weather type: Tornado, waterspout, or funnel cloud", 'units': 'Boolean'},
    {'data_type_id': 'WT11', 'description': "Weather type: High or damaging winds", 'units': 'Boolean'},
    {'data_type_id': 'WT12', 'description': "Weather type: Blowing spray", 'units': 'Boolean'},
    {'data_type_id': 'WT13', 'description': "Weather type: Mist", 'units': 'Boolean'},
    {'data_type_id': 'WT14', 'description': "Weather type: Drizzle", 'units': 'Boolean'},
    {'data_type_id': 'WT15', 'description': "Weather type: Freezing drizzle", 'units': 'Boolean'},
    {'data_type_id': 'WT16', 'description': "Weather type: Rain (may include freezing rain, drizzle, and freezing drizzle)", 'units': 'Boolean'},
    {'data_type_id': 'WT17', 'description': "Weather type: Freezing rain", 'units': 'Boolean'},
    {'data_type_id': 'WT18', 'description': "Weather type: Snow, snow pellets, snow grains, or ice crystals", 'units': 'Boolean'},
    {'data_type_id': 'WT19', 'description': "Weather type: Unknown source of precipitation", 'units': 'Boolean'},
    {'data_type_id': 'WT21', 'description': "Weather type: Ground fog", 'units': 'Boolean'},
    {'data_type_id': 'WT22', 'description': "Weather type: Ice fog or freezing fog", 'units': 'Boolean'},
]

data_ids_to_keep = [data_type['data_type_id'] for data_type in data_types_to_keep]
        
crop_yield_country_data = pd.read_csv('./country_codes.csv')
crop_yield_country_data.head()

Unnamed: 0.1,Unnamed: 0,country_name,FIPS_10_country_code,noaa_country_code,fao_country_code,fao_year_min,fao_year_max
0,1,Albania,AL,AL,3,1961,2020
1,2,Algeria,AG,AG,4,1961,2020
2,3,Angola,AO,AO,7,1961,2020
3,5,Argentina,AR,AR,9,1961,2020
4,6,Armenia,AM,AM,1,1992,2020


In [10]:
# Create the table with a predefined autoincrementing primary key so further data can be added
dbi = DatabaseInterface(db_name='crop_yield_prediction', 
                        user=local_mysql_user, 
                        password=local_mysql_password)


# meta = MetaData()
# meta.bind = dbi.engine
# meta.reflect()



noaa_ghcnd_file_path = './data/noaa_ghcn_aws_data/'
for file_name in ['2019', '2018', '2017', '2016', '2015', 
                  '2014', '2013', '2012', '2011', '2010', '2009', '2008']:

    custom_date_parser = lambda x: datetime.strptime(x, "%Y%m%d")

    with pd.read_csv(
        f"{noaa_ghcnd_file_path}{file_name}.csv", 
        header=None, 
        usecols=[0,1,2,3],
        names=['station_id', 'date', 'data_type_id', 'data_value'],#, 'measurement_id', 'quality_id', 'source_id', 'observation_time'],
        chunksize=50000, 
        parse_dates=['date'],
        date_parser=custom_date_parser,
        ) as reader:

       chunk_num = 1
       for noaa_ghcn_aws_data in reader: 

          # Extract country code from station_id
          noaa_ghcn_aws_data['noaa_country_code'] = noaa_ghcn_aws_data.station_id.str[:2]
          # Drop countries that are not in the fao dataset
          noaa_ghcn_aws_data = noaa_ghcn_aws_data[noaa_ghcn_aws_data.noaa_country_code.isin(crop_yield_country_data.noaa_country_code)]
          # Drop unnecessary data types
          noaa_ghcn_aws_data = noaa_ghcn_aws_data[noaa_ghcn_aws_data.data_type_id.isin(data_ids_to_keep)]
          # Pivot the table.
          noaa_weather_data = noaa_ghcn_aws_data.pivot(index=['noaa_country_code', 'station_id', 'date'], columns='data_type_id', values='data_value')
          # Fill in the weather type columns with zeroes because they are one-hot encoded/binary
          weather_type_cols = noaa_weather_data.columns[noaa_weather_data.columns.str.contains('WT')]
          noaa_weather_data[weather_type_cols] = noaa_weather_data[weather_type_cols].fillna(0)

          noaa_weather_data.reset_index(inplace=True)

          dbi.insert_data(noaa_weather_data, 'noaa_weather_data', if_exists='append')

          print(f'{file_name}: {chunk_num} chunks processed, {noaa_weather_data.date.min()}')
          chunk_num += 1

print("Closing connection")
dbi.close_connection()
print("Done")

# 347 chunks = 39min 26sec

InternalError: (pymysql.err.InternalError) (3, "Error writing file '/tmp/MLfd=44' (OS errno 28 - No space left on device)")
[SQL: INSERT INTO noaa_weather_data (noaa_country_code, station_id, date, `EVAP`, `PRCP`, `SNOW`, `SNWD`, `TAVG`, `TMAX`, `TMIN`, `TOBS`, `WT01`) VALUES (%(noaa_country_code)s, %(station_id)s, %(date)s, %(EVAP)s, %(PRCP)s, %(SNOW)s, %(SNWD)s, %(TAVG)s, %(TMAX)s, %(TMIN)s, %(TOBS)s, %(WT01)s)]
[parameters: ({'noaa_country_code': 'AS', 'station_id': 'ASN00009519', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 0.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': 260.0, 'TMIN': 155.0, 'TOBS': None, 'WT01': 0.0}, {'noaa_country_code': 'AS', 'station_id': 'ASN00009520', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 0.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': None, 'TMIN': None, 'TOBS': None, 'WT01': 0.0}, {'noaa_country_code': 'AS', 'station_id': 'ASN00009523', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 0.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': None, 'TMIN': None, 'TOBS': None, 'WT01': 0.0}, {'noaa_country_code': 'AS', 'station_id': 'ASN00009527', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 0.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': None, 'TMIN': None, 'TOBS': None, 'WT01': 0.0}, {'noaa_country_code': 'AS', 'station_id': 'ASN00009531', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 0.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': None, 'TMIN': None, 'TOBS': None, 'WT01': 0.0}, {'noaa_country_code': 'AS', 'station_id': 'ASN00009532', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 0.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': None, 'TMIN': None, 'TOBS': None, 'WT01': 0.0}, {'noaa_country_code': 'AS', 'station_id': 'ASN00009534', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 0.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': None, 'TMIN': None, 'TOBS': None, 'WT01': 0.0}, {'noaa_country_code': 'AS', 'station_id': 'ASN00009538', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 0.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': 338.0, 'TMIN': 170.0, 'TOBS': None, 'WT01': 0.0}  ... displaying 10 of 500 total bound parameter sets ...  {'noaa_country_code': 'AS', 'station_id': 'ASN00014821', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 40.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': None, 'TMIN': None, 'TOBS': None, 'WT01': 0.0}, {'noaa_country_code': 'AS', 'station_id': 'ASN00014824', 'date': datetime.datetime(2019, 1, 1, 0, 0), 'EVAP': None, 'PRCP': 150.0, 'SNOW': None, 'SNWD': None, 'TAVG': None, 'TMAX': None, 'TMIN': None, 'TOBS': None, 'WT01': 0.0})]
(Background on this error at: https://sqlalche.me/e/14/2j85)