## Script for extracting clean temperature data from BigQuery
This runs on datalab.
The end result are the 1666 weather station files each containing min and max daily temperature data spanning 50 years.
 - from 1960-01-02
 - to 2009-12-31 included.
 
All files have the exact same number of entries and the same date spans. Some interpolation
was necessary to fill measurement gaps in the original data. Filled-in data points are marked as such in the "interpolated" column. Weather stations were selected based on the following criteria:
 - no more than 300 gaps
 - no more than 30 consecutive gaps
 
The script runs for a long time. It also has a tendency to fail after a whie and must be restarted. It will skip stations that have already been downloaded but it will retry stations that have been discarded as incomplete. You can fill in the list of discarded stations in 'stations_skip' to avoid re-loading and discarding them again.

***Results are now available on a public GCS bucket: gs://good-temperatures-public***

In [1]:
import pandas as pd
import os
from __future__ import print_function
import google.datalab.bigquery as bq

In [2]:
DATADIR = "good_temperature_data"
if not os.path.exists(DATADIR):
  os.mkdir(DATADIR)

In [None]:
%bq query --name daily_tminmax
# Format weather station data between 1960 and 2010
# as daily min and max temperatures
SELECT stationid, date, daystamp,
      MIN(tmin) AS tmini,
      MAX(tmax) AS tmaxi
    FROM(
      SELECT
        wx.id AS stationid,
        wx.date AS date,
        wx.element AS element,
        wx.value AS value,
        UNIX_SECONDS(CAST(wx.date AS TIMESTAMP))/(24*3600) AS daystamp,
        IF (wx.element = 'TMIN', wx.value/10, NULL) AS tmin,
        IF (wx.element = 'TMAX', wx.value/10, NULL) AS tmax
      FROM
        (SELECT * FROM `bigquery-public-data.ghcn_d.ghcnd_1*` UNION ALL
        SELECT * FROM `bigquery-public-data.ghcn_d.ghcnd_2*`) AS wx
      WHERE
        #id = 'USC00457478' AND
        (wx.element ='TMIN'OR wx.element ='TMAX')
    )
    WHERE
      daystamp < 14610      # year 2010
      AND daystamp > -3653  # year 1960
    GROUP BY
      stationid, date, daystamp
    HAVING tmini IS NOT NULL AND tmaxi IS NOT NULL

In [None]:
%bq query --name good_stations --subqueries daily_tminmax
# select stations that have an almost uninterrupted series of data records
# allow 100 holes max
SELECT stationid, MIN(daystamp) AS minday, MAX(daystamp) AS maxday, COUNT(tmini) AS count_mini, count(tmaxi) AS count_maxi,
       MAX(daystamp)-MIN(daystamp)-COUNT(tmini) AS nb_holes, MAX(daystamp)-MIN(daystamp) AS nb_days
FROM daily_tminmax
GROUP BY stationid
HAVING nb_holes <300 AND nb_days >= 50*365-300

In [None]:
%bq query --name get_one_station --subqueries daily_tminmax good_stations
# Select weather stations with an at least 50-year measurement span (1960-2010)
# and a maximum of 100 measurement gaps. This yields 1221 weather stations and
# approx. 22M days of min-max temperature data.

# Get daily min-max data from all these stations
SELECT *
FROM daily_tminmax
WHERE
  daily_tminmax.stationid = @thisstation AND
  daily_tminmax.stationid IN (SELECT stationid FROM good_stations)
ORDER BY daystamp

In [None]:
%bq query --name good_stationids --subqueries good_stations
SELECT stationid
FROM good_stations

In [None]:
# 'ASN00014015'
# this one does not work. Gets a "undeclared query parameters" argument
#df = get_one_station.execute(output_options=bq.QueryOutput.dataframe(), query_params={'thisstation':'ASN00014015'}).result()
#df.head(100)

In [None]:
def find_gaps(dfmm, maxgaps):
  i = 1
  prevday = dfmm.iloc[0]['daystamp']
  gaps = [0 for _ in range(maxgaps+1)]
  for _, row in dfmm.iterrows():
    day = row['daystamp']
    if i > 0:
      for k in range(maxgaps):
        if day - prevday == k+2:
          gaps[k] += 1
      if day - prevday > maxgaps+1:
        gaps[maxgaps] += 1
    prevday = day
    i+=1
    
  return gaps

In [None]:
stations_df = good_stationids.execute(output_options=bq.QueryOutput.dataframe()).result()

In [None]:
#Scan directory for files that have already been processed
stations_done = []
for filename in os.listdir(DATADIR):
  print(filename[:-4])
  stations_done.append(filename[:-4])
# skip stations that have already been done
stations_todo = stations_df.loc[~stations_df['stationid'].isin(stations_done)]
# skip additional stations
stations_skip = []
stations_todo = stations_todo.loc[~stations_todo['stationid'].isin(stations_skip)]
stations_todo

In [None]:
LARGEST_GAP = 31
for i, stationid in stations_todo.iterrows():
  sid = stationid['stationid']
  print(sid)
  q = get_one_station.sql.replace('@thisstation', "\'"+sid+"\'")
  df = bq.Query(q).execute_async(output_options=bq.QueryOutput.dataframe()).result()
  gaps = find_gaps(df, maxgaps=LARGEST_GAP+1)
  print(gaps)
  if gaps[LARGEST_GAP+1] == 0:
    # interpolate missing values
    # but only on stations where the gaps are no larger than LARGEST_GAP days
    ddf = df.set_index(pd.DatetimeIndex(df['date']))
    ddf = ddf.resample("1D").interpolate()
    ddf = ddf.assign(interpolated=pd.isnull(ddf['stationid']).astype(int)) # mark interpolated values as such
    print(find_gaps(ddf, maxgaps=LARGEST_GAP+1))
    ddf.to_csv(DATADIR+"/"+sid+".csv", columns=['tmini', 'tmaxi', 'interpolated']) # and also the date
  else:
    print("Gaps too large. Dropped.")

## Additional head and tail cleaning
Lists files that are missing only a few lines at the begining or the end.
You can add the missing lines manually by duplicating the first/last lines
and marking them as "interpolated". This code only lists the files, all
corrections are manual.

In [3]:
bad_files=[]
good_files = []
recoverable_files=[]
for filename in os.listdir(DATADIR):
  with open(DATADIR + "/" + filename, mode="r") as f:
    line = f.readline()
    linenb = 1
    while line:
      line = f.readline()
      linenb += 1  
      if linenb == 2:
        firstline = line
    # Good ?
    if linenb == 18264 and firstline[:10] == "1960-01-02":
      good_files.append(filename)
      print('.', end='')
    # Bad ?
    else:
      print()
      print(linenb)
      print(firstline, end='')
      bad_files.append(filename)
      # Recoverable ?
      if (linenb == 18264 or
          linenb == 18263 or
          linenb == 18262 or
          linenb == 18261):
        if (firstline[:10] == "1960-01-02" or
            firstline[:10] == "1960-01-03" or
            firstline[:10] == "1960-01-04" or
            firstline[:10] == "1960-01-05"):
          print("recoverable")
          recoverable_files.append(bad_files.pop()) # transfer it to recoverable list

........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

In [4]:
print("Bad files:", len(bad_files))
print(bad_files)
print("Recoverable files:", len(recoverable_files))
print(recoverable_files)
print("Good files:", len(good_files))
print(good_files)

Bad files: 0
[]
Recoverable files: 0
[]
Good files: 1666
['USW00003822.csv', 'USW00023258.csv', 'RSM00028537.csv', 'USC00215598.csv', 'RSM00027823.csv', 'RSM00030393.csv', 'USC00163313.csv', 'RSM00023884.csv', 'RSM00024641.csv', 'NOE00109514.csv', 'USC00416792.csv', 'CHM00050527.csv', 'CHM00059265.csv', 'USW00023072.csv', 'CHM00054027.csv', 'USC00425402.csv', 'RSM00025563.csv', 'USW00023066.csv', 'USC00053005.csv', 'USC00295617.csv', 'USC00124272.csv', 'USW00024090.csv', 'USC00407884.csv', 'USC00203661.csv', 'USC00315771.csv', 'NO000050540.csv', 'USC00113384.csv', 'USC00352374.csv', 'CHM00052495.csv', 'CA004028060.csv', 'GMW00034068.csv', 'USC00352406.csv', 'USC00233601.csv', 'USW00093821.csv', 'CHM00056778.csv', 'USW00024133.csv', 'USW00024127.csv', 'CHM00053614.csv', 'CHM00051765.csv', 'CHM00058027.csv', 'USC00332251.csv', 'AU000011801.csv', 'USW00014755.csv', 'CA004020560.csv', 'USC00134101.csv', 'CA007060400.csv', 'USW00013748.csv', 'USC00298535.csv', 'USC00355221.csv', 'IT00001632