In [102]:

%load_ext autoreload
%load_ext Cython

%autoreload 2

import importlib


import pandas as pd
import os
import re
from tqdm import tqdm
import math
import numpy as np

# Explicitely For Crawling
import requests
import sys
from lxml import etree
import asyncio
import aiohttp  # pip install aiohttp
import aiofiles  # pip install aiofiles

import compress as comp

import nest_asyncio
nest_asyncio.apply()

OUTPUT_FILE_PATH = "sources"

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


[autoreload of compress failed: Traceback (most recent call last):
  File "c:\tmp\rtvis22-cde\.venv\Lib\site-packages\IPython\extensions\autoreload.py", line 273, in check
    superreload(m, reload, self.old_objects)
  File "c:\tmp\rtvis22-cde\.venv\Lib\site-packages\IPython\extensions\autoreload.py", line 471, in superreload
    module = reload(module)
             ^^^^^^^^^^^^^^
  File "C:\Users\Vorto\AppData\Local\Programs\Python\Python311\Lib\importlib\__init__.py", line 169, in reload
    _bootstrap._exec(spec, module)
  File "<frozen importlib._bootstrap>", line 621, in _exec
  File "<frozen importlib._bootstrap_external>", line 936, in exec_module
  File "<frozen importlib._bootstrap_external>", line 1074, in get_code
  File "<frozen importlib._bootstrap_external>", line 1004, in source_to_code
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "c:\tmp\rtvis22-cde\data\compress.py", line 2
    %load_ext Cython
    ^
SyntaxError: invalid syntax


ModuleNotFoundError: No module named 'Cython'

# Fetch Station Summaries
Since there is no pre-packaged (zipped) archive of the station data at http://berkeleyearth.lbl.gov/auto/Stations/TAVG/Text/, I wrote a small crawler that downloads all of the files individually.

In [None]:
# Source directory listing of the Berkeley Earth station files ...
WEB_FILE_PATH = "http://berkeleyearth.lbl.gov/auto/Stations/TAVG/Text/"
# ... and the local folder they are mirrored into (trailing slash: file names are appended directly).
LOCAL_FILE_PATH = "sources/berkley_station_summaries/"
# NOTE: once a download has been started, the kernel has to be killed completely to get rid of the
# asynchronous download tasks.

def getSiteDom(url, timeout=60):
    """Download ``url`` and parse the response body as an HTML document.

    Parameters
    ----------
    url : str
        Page to fetch.
    timeout : float, optional
        Seconds ``requests`` waits for the server. Without a timeout a stalled
        server would block the notebook indefinitely.

    Returns
    -------
    The root element produced by ``lxml.etree.HTML``, or ``None`` (after printing
    a message) if the server does not answer with HTTP 200.

    Raises
    ------
    requests.exceptions.RequestException
        On connection problems, including ``Timeout`` when ``timeout`` expires.
    """
    resp = requests.get(url, timeout=timeout)
    if resp.status_code != 200:
        print(f"Couldn't download url {url}")
        return None
    return etree.HTML(resp.text)

def downloadFile(url, lpath, timeout=60):
    """Download ``url`` synchronously and store the raw bytes at ``lpath``.

    Parameters
    ----------
    url : str
        File to download.
    lpath : str
        Local target path; an existing file is overwritten.
    timeout : float, optional
        Seconds ``requests`` waits for the server before raising ``Timeout``.

    Non-200 responses are reported with a message and nothing is written.
    """
    resp = requests.get(url, timeout=timeout)
    if resp.status_code != 200:
        print(f"Couldn't download file {url}")
        return
    # `with` closes (and flushes) the file deterministically instead of relying on garbage collection.
    with open(lpath, "wb") as outfile:
        outfile.write(resp.content)

def downloadFilesParallel(urls, lpaths, maxp = 10):
    """Download ``urls[i]`` to ``lpaths[i]`` concurrently, with at most ``maxp`` downloads in flight.

    Non-200 responses and network errors are reported and skipped, so one failing
    file does not abort the batch. Every URL advances the progress bar exactly once.

    Parameters
    ----------
    urls : list of str
        Files to download.
    lpaths : list of str
        Local target paths, paired with ``urls`` by position.
    maxp : int, optional
        Maximum number of simultaneous downloads.
    """
    sema = asyncio.BoundedSemaphore(maxp)
    pbar = tqdm(total=len(urls), desc="Downloading files")

    async def fetch_file(session, url, lpath):
        try:
            async with sema:
                async with session.get(url) as resp:
                    if resp.status != 200:
                        # pbar.write keeps the progress bar intact, unlike print().
                        pbar.write(f" -> Couldn't download file {url}")
                        return
                    data = await resp.read()
            # Written outside the semaphore so slow disk I/O does not hold a download slot.
            async with aiofiles.open(lpath, "wb") as outfile:
                await outfile.write(data)
        except (aiohttp.ClientError, asyncio.TimeoutError) as err:
            pbar.write(f" -> Couldn't download file {url} ({err!r})")
        finally:
            pbar.update(1)

    async def fetch_files():
        # One shared session reuses its connection pool instead of opening a session per file.
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(*(fetch_file(session, url, lpath) for url, lpath in zip(urls, lpaths)))

    try:
        # The loop is deliberately not closed: in a notebook get_event_loop() returns the loop the
        # kernel itself runs on (hence nest_asyncio), and closing it would break the kernel.
        asyncio.get_event_loop().run_until_complete(fetch_files())
    finally:
        pbar.close()

def getAllFileNames(url):
    """Return the ``href`` of every link inside a table cell whose target contains ``.txt``.

    Intended for the HTML directory listing at ``url``. Returns an empty list when
    the listing could not be downloaded (``getSiteDom`` returned ``None``) instead
    of failing with an AttributeError.
    """
    dom = getSiteDom(url)
    if dom is None:
        return []
    return [element.get("href") for element in dom.xpath("//td/a[contains(@href,'.txt')]")]

# Set to True to actually start the (long-running) download of all missing station files.
RUN_DOWNLOAD = False

filenames = getAllFileNames(WEB_FILE_PATH)
urls = [WEB_FILE_PATH + filename for filename in filenames]
lpaths = [LOCAL_FILE_PATH + filename for filename in filenames]

# Create the target folder so listing it cannot fail on a fresh checkout.
os.makedirs(LOCAL_FILE_PATH, exist_ok=True)
existingFileCount = len(os.listdir(LOCAL_FILE_PATH))
if not urls:
    print(f"No files listed at {WEB_FILE_PATH}")
elif existingFileCount >= len(urls):
    print("Aborted because more or the same amount of files exists in the download folder than there are to download.")
elif RUN_DOWNLOAD:
    # Only fetch files that are not present yet, so an interrupted crawl can be resumed.
    missing = [(url, lpath) for url, lpath in zip(urls, lpaths) if not os.path.isfile(lpath)]
    downloadFilesParallel([url for url, _ in missing], [lpath for _, lpath in missing], maxp=20)

### Import Station Summaries
Imports all files inside `data/sources/berkley_station_summaries` and caches the merged result in a binary (Parquet) file.

In [41]:
# Folder containing the downloaded "<station id>-TAVG-Data.txt" files (no trailing slash).
LOCAL_FILE_PATH = "sources/berkley_station_summaries"
# Parquet cache of the merged station data.
OUTPUT_FILE_BINARY = "sources/stationummaries.parquet.gzip"

# Station files are combined in parts of this size before the parts are concatenated.
NEW_PART_EVERY = 1_000
# True: ignore an existing cache file and merge the text files again.
FORCE_RECREATION = False

# id = station id, i.e. the prefix of "<id>-TAVG-Data.txt"
def getDataForOnePosition(id):
    """Load the monthly series of one Berkeley Earth station file.

    Reads ``{LOCAL_FILE_PATH}/{id}-TAVG-Data.txt``. Latitude and longitude are taken
    from the ``%``-commented header; the whitespace-separated data rows become the
    dataframe.

    NOTE(review): the local-summaries section of this notebook redefines
    ``getDataForOnePosition`` for position files; re-run this cell before re-running
    the station merge after that section has been executed.

    Parameters
    ----------
    id : str
        Station id (file name prefix).

    Returns
    -------
    pandas.DataFrame or None
        Columns ``Year``, ``Month``, ``Raw Data Temperature``, ``Raw Data Anomaly``,
        ``Adjusted Data Temperature``, ``Adjusted Data Anomaly``, ``Regional Temperature``,
        ``Regional Anomaly``, plus ``lat``/``lon`` (strings exactly as found in the header)
        and ``stationId``. ``None`` if the header lacks latitude or longitude.
    """
    file = f"{LOCAL_FILE_PATH}/{id}-TAVG-Data.txt"

    # Only the beginning of the file is needed to find the coordinates. It is decoded as latin-1,
    # like read_csv below; the platform default encoding (e.g. UTF-8) can fail on latin-1 bytes.
    with open(file, encoding="latin-1") as myFile:
        chunk = myFile.read(4069)

    lat = re.findall(r"Latitude:\s+([\w\S ]+) \+", chunk)
    lon = re.findall(r"Longitude:\s+([\w\S ]+) \+", chunk)
    if not lat or not lon:
        return None

    # Columns 4 and 5 (QC Failed, Continuity Breaks) are skipped via usecols.
    header = ["Year", "Month", "Raw Data Temperature", "Raw Data Anomaly", "QC Failed", "Continuity Breaks", "Adjusted Data Temperature", "Adjusted Data Anomaly", "Regional Temperature", "Regional Anomaly"]
    df = pd.read_csv(file, sep=r"\s+", header=None, comment="%", names=header, usecols=[0, 1, 2, 3, 6, 7, 8, 9], encoding='latin-1')

    df["lat"] = lat[0]
    df["lon"] = lon[0]
    df["stationId"] = id
    return df


def getIDsFromDirectory(dir):
    """Collect the station ids of all ``<id>-TAVG-Data.txt`` files directly inside ``dir``.

    Only regular files count (subdirectories are ignored). The id is the file name with
    every occurrence of ``-TAVG-Data.txt`` removed. Returns a ``set`` of strings.
    """
    marker = "-TAVG-Data.txt"
    return {
        entry.replace(marker, "")
        for entry in os.listdir(dir)
        if os.path.isfile(os.path.join(dir, entry)) and marker in entry
    }

if os.path.isfile(OUTPUT_FILE_BINARY) and not FORCE_RECREATION:
    df_merged = pd.read_parquet(OUTPUT_FILE_BINARY)
    print("Data imported from binary file!!!")
else:
    # Set iteration order of strings changes between interpreter runs; sorting makes the row order of
    # the merged (and cached) dataframe reproducible.
    allIds = sorted(getIDsFromDirectory(LOCAL_FILE_PATH))
    df_parts = []        # concatenated parts, each built from up to NEW_PART_EVERY station frames
    pendingFrames = []   # station frames not yet combined into a part
    sumEntries = 0
    pbar = tqdm(allIds, desc="Merging files")
    # Loop variable is not called `id`: at module level that would shadow the builtin for the whole notebook.
    for stationId in pbar:
        df = getDataForOnePosition(stationId)
        if df is not None and len(df) > 0:
            pendingFrames.append(df)
            sumEntries += len(df)
        # Each part is built with a single concat; concatenating per station would copy all
        # previously collected rows every time (quadratic).
        if len(pendingFrames) >= NEW_PART_EVERY:
            df_parts.append(pd.concat(pendingFrames, ignore_index=True))
            pendingFrames = []
        pbar.set_description(f"Merging files (Counts - Overall:{sumEntries}) [Part {len(df_parts) + 1}]")
    pbar.close()
    if pendingFrames:
        df_parts.append(pd.concat(pendingFrames, ignore_index=True))
        pendingFrames = []

    if df_parts:
        print("Merging dataframe parts...")
        df_merged = pd.concat(df_parts, ignore_index=True)
        df_parts = None
        print("Saving to binary file...")
        df_merged.to_parquet(OUTPUT_FILE_BINARY, compression="gzip")
    else:
        # An empty cache would be reloaded on the next run instead of re-reading the files.
        df_merged = pd.DataFrame({'A': []})
        print(f"No station data found in {LOCAL_FILE_PATH}; nothing was saved.")

df_stations = df_merged

Data imported from binary file!!!


In [None]:

# Typed preview of the station data with the column names used for compression.
# rename() without inplace=True leaves df_stations untouched, so this cell can be re-run safely.
# Only the five selected columns can be cast; AverageTemperatureUncertainty / Interpolated do not
# exist at this point and made astype() raise a KeyError.
df = (
    df_stations
    .rename(columns={"Adjusted Data Temperature": "AverageTemperature", "lat": "Latitude", "lon": "Longitude"})
    [["Year", "Month", "AverageTemperature", "Latitude", "Longitude"]]
    .astype({
        'Latitude': np.float32,
        'Longitude': np.float32,
        'Year': np.uint32,
        'Month': np.uint32,
        'AverageTemperature': np.float32,
    })
)
df.dtypes

### Generate Output File

In [42]:
# Station data reduced to the columns handed to compress_dataset, using its column names.
# rename() returns a new dataframe, so this cell does not modify df_stations.
df_data = df_stations.rename(columns={"Adjusted Data Temperature": "AverageTemperature", "lat": "Latitude", "lon": "Longitude"})
df_data = df_data[["Year", "Month", "AverageTemperature", "Latitude", "Longitude"]]

In [101]:
# Hand the station data to the project's compression step; output_path receives OUTPUT_FILE_PATH.
# NOTE(review): autoreload of `compress` failed in this session (compress.py contains the IPython magic
# `%load_ext Cython`), so an older, already imported version of compress_dataset may run here - confirm.
comp.compress_dataset(
    df_data,
    discretizeresolution=1,
    output_path=OUTPUT_FILE_PATH,
)

Dropping NaNs... -> removed 5312332 rows
Converting data types...
Creating location ID... -> Found 40596 distinct locations
Merging rows with same year, month and locid... -> merged 141757 rows
Creating CTILB ID... -> Found 498148 distinct continous temperature index blocks
Get Header-Data...
Create DM-Column...
== HEADER ==
 -> Counts: Temperatures=15841847; Locations=40596; CTILBs=498148
 -> Datebounds:  {'db_first_year': 1701, 'db_last_year': 2013, 'db_first_month': 1, 'db_last_month': 10}
 -> Temperaturebounds:  {'db_min_temp': -72.312, 'db_max_temp': 42.108}
 -> Byte-Counts: bc_temperature=1, bc_temperatureindex=3, bc_monthdifference=2, bc_ctilbindex=3
Discretize Temperature...
Calculating discretization error...
       Discretization error  Discretization error with Uncertainty
count          1.584185e+07                           1.584185e+07
mean           2.243828e-01                           2.243828e-01
std            1.295455e-01                           1.295455e-01
min 

Unnamed: 0,locid,Year,Month,AverageTemperature,AverageTemperatureUncertainty,Latitude,Longitude,Interpolated,ctilbid,dm,disTemp,disError,disErrorUnc
5278,39,1982,8,-72.311996,0.0,-78.45507,106.846252,False,315,3379,0,0.0,0.0
5605,41,1958,8,-71.900002,0.0,-78.400002,87.599998,False,348,3091,0,0.411995,0.411995


Unnamed: 0,locid,Year,Month,AverageTemperature,AverageTemperatureUncertainty,Latitude,Longitude,Interpolated,ctilbid,dm,disTemp,disError,disErrorUnc
0,0,1957,1,-27.799999,0.0,-90.000000,0.000000,False,0,3072,99,0.090115,0.090115
1,0,1957,2,-38.217999,0.0,-90.000000,0.000000,False,0,3073,75,0.441055,0.441055
2,0,1957,3,-53.665001,0.0,-90.000000,0.000000,False,0,3074,41,0.250053,0.250053
3,0,1957,4,-56.368000,0.0,-90.000000,0.000000,False,0,3075,35,0.239292,0.239292
4,0,1957,5,-55.792000,0.0,-90.000000,0.000000,False,0,3076,36,0.366585,0.366585
...,...,...,...,...,...,...,...,...,...,...,...,...,...
15841842,40595,1958,6,-1.700000,0.0,86.199997,-113.099998,False,498146,3089,157,0.165173,0.165173
15841843,40595,1958,8,0.000000,0.0,86.199997,-113.099998,False,498147,3091,161,0.070351,0.070351
15841844,40595,1958,9,-11.100000,0.0,86.199997,-113.099998,False,498147,3092,136,0.187994,0.187994
15841845,40595,1958,10,-22.200001,0.0,86.199997,-113.099998,False,498147,3093,111,0.305645,0.305645


---
# Import Local Summaries
Imports all `%position%-TAVG-Counts.txt` and `%position%-TAVG-Trend.txt` text files in `sources/berkley_local_summaries` and combines them into one dataframe. Because the import takes quite some time and the dataset is huge, the dataframe is then saved as a binary (Parquet) file.
The data can be downloaded here: http://berkeleyearth.lbl.gov/auto/Local/TAVG/Text/

In [None]:
# True: ignore an existing cache file and import the text files again.
FORCE_RECREATION = False
# Folder containing the "<position>-TAVG-Counts.txt" / "<position>-TAVG-Trend.txt" files.
LOCAL_FILE_PATH = "sources/berkley_local_summaries"
# Parquet cache of the combined local summaries.
OUTPUT_FILE_BINARY = "sources/localsummaries.parquet.gzip"

#name = 0.80N-8.84E
def getDataForOnePosition(name):
    """Load and join the Counts and Trend files of one local-summary position.

    Reads ``{LOCAL_FILE_PATH}/{name}-TAVG-Counts.txt`` and ``{LOCAL_FILE_PATH}/{name}-TAVG-Trend.txt``.
    Country, nearby cities and a 12-value monthly temperature table are parsed from the
    ``%``-commented header of the Trend file.

    Parameters
    ----------
    name : str
        Position name used as file prefix, e.g. ``"0.80N-8.84E"``. The text before the first
        ``-`` becomes ``Latitude``, the next part ``Longitude`` (both kept as strings).

    Returns
    -------
    pandas.DataFrame or None
        One row per (Year, Month) of the Counts file (left join with the Trend file) with the
        station counts, ``Monthly Anomaly``, ``AverageTemperatureUncertainty`` (the trend file's
        monthly uncertainty), ``AverageTemperature`` (monthly anomaly plus the header table value
        of that month), ``Latitude``, ``Longitude``, ``Country``, ``City`` and ``dt``
        (``"YYYY-MM-01"``). ``None`` if the Counts file has no rows or the Trend header contains
        no monthly temperature table.

    Raises
    ------
    ValueError
        If the monthly temperature table does not contain exactly 12 values.
    """
    count_file = f"{LOCAL_FILE_PATH}/{name}-TAVG-Counts.txt"
    trends_file = f"{LOCAL_FILE_PATH}/{name}-TAVG-Trend.txt"

    # Header of the trend file, decoded as latin-1 like the read_csv calls below.
    with open(trends_file, encoding="latin-1") as myFile:
        chunk = myFile.read(4069)

    country = re.findall(r"Country: ([\w\S ]+)", chunk)
    country = country[0] if country else ""
    citys = re.findall(r"Nearby Cities: ([\w\S ]+)", chunk)
    citys = citys[0] if citys else ""

    # NOTE: the capture group is greedy up to the LAST "%%" inside the chunk read above.
    monthTable = re.findall(r"Jan[\s]+Feb[\s]+Mar[\s]+Apr[\s]+May[\s]+Jun[\s]+Jul[\s]+Aug[\s]+Sep[\s]+Oct[\s]+Nov[\s]+Dec\s+%%([\w\s\S]+)%%", chunk)
    if not monthTable:
        return None
    # float() instead of eval(): the values come from downloaded files and must never be executed as code.
    temperatureList = [float(value) for value in monthTable[0].split()]
    if len(temperatureList) != 12:
        raise ValueError(f"Expected 12 monthly temperatures in {trends_file}, found {len(temperatureList)}")

    latlon = name.split("-")

    # Column 2 of the counts file is unnamed and skipped via usecols.
    header = ["Year", "Month", "", "Within 10 km", "Within 50 km", "Within 100 km", "Within 250 km", "Within 500 km", "Within 1000 km"]
    df_counts = pd.read_csv(count_file, sep=r"\s+", header=None, comment="%", names=header, usecols=[0, 1, 3, 4, 5, 6, 7, 8], encoding='latin-1')
    header = ["Year", "Month", "Monthly Anomaly", "Monthly Unc.", "Annual Anomaly", "Annual Unc.", "Five-year Anomaly", "Five-year Unc.", "Ten-year Anomaly", "Ten-year Unc.", "Twenty-year Anomaly", "Twenty-year Unc."]
    df_trends = pd.read_csv(trends_file, sep=r"\s+", header=None, comment="%", names=header, usecols=[0, 1, 2, 3], encoding='latin-1')

    if len(df_counts) == 0:
        return None

    df_joined = pd.merge(df_counts, df_trends, how='left', on=['Year', 'Month'])
    # Vectorised lookup of each row's month in the header table (months are 1-based).
    monthIndex = df_joined["Month"].astype(int).to_numpy() - 1
    df_joined["AverageTemperature"] = df_joined["Monthly Anomaly"] + np.asarray(temperatureList)[monthIndex]
    df_joined["Latitude"] = latlon[0]
    df_joined["Longitude"] = latlon[1]
    df_joined["Country"] = country
    df_joined["City"] = citys
    df_joined["dt"] = (
        df_joined["Year"].astype(int).astype(str)
        + "-"
        + df_joined["Month"].astype(int).astype(str).str.zfill(2)
        + "-01"
    )
    df_joined = df_joined.rename(columns={"Monthly Unc.": "AverageTemperatureUncertainty"})
    return df_joined


def getPositionsFromDirectory(dir):
    """Collect the position names of all ``<name>-TAVG-Trend.txt`` files directly inside ``dir``.

    Subdirectories are ignored. The name is the file name with every occurrence of
    ``-TAVG-Trend.txt`` removed. Returns a ``set`` of strings.
    """
    suffix = "-TAVG-Trend.txt"
    candidates = (entry for entry in os.listdir(dir) if suffix in entry)
    return {
        entry.replace(suffix, "")
        for entry in candidates
        if os.path.isfile(os.path.join(dir, entry))
    }

if os.path.isfile(OUTPUT_FILE_BINARY) and not FORCE_RECREATION:
    df_merged = pd.read_parquet(OUTPUT_FILE_BINARY)
    print("Data imported from binary file!!!")
else:
    # Sorted so that the row order of the merged (and cached) dataframe is reproducible.
    allPositions = sorted(getPositionsFromDirectory(LOCAL_FILE_PATH))
    print(f"{len(allPositions)} different Location-Files will be combined")
    frames = []
    entryCount = 0
    pbar = tqdm(allPositions, desc="Merging files")
    for positionName in pbar:
        pbar.set_description(f"Merging files ({entryCount} temp entries)")
        df = getDataForOnePosition(positionName)
        # Skip positions without usable data (None) or without rows.
        if df is None or len(df) == 0:
            continue
        frames.append(df)
        entryCount += len(df)
    pbar.close()
    if frames:
        # One concat at the end instead of one per file, which would copy all previous rows every time.
        df_merged = pd.concat(frames, ignore_index=True)
        df_merged.to_parquet(OUTPUT_FILE_BINARY, compression="gzip")
    else:
        # An empty cache would be reloaded on the next run instead of re-reading the files.
        df_merged = pd.DataFrame({'A': []})
        print(f"No location data found in {LOCAL_FILE_PATH}; nothing was saved.")

df_local = df_merged


### Output for Interpolation
The following cell writes the data in a format that can be processed further with `interpolate_data.py`.

In [None]:
# OUTPUT FOR INTERPOLATION
# Relative path (same convention as the other paths in this notebook) instead of a user-specific
# absolute path that does not exist on other machines.
OUTPUT_FILE = "sources/GlobalLandTemperaturesByCity.csv"
# assign() returns a new dataframe: df_local stays unmodified and no chained-assignment warning has
# to be silenced. City and Country are blanked in the export.
df_output = df_local.assign(City="", Country="")
df_output = df_output[["dt", "AverageTemperature", "AverageTemperatureUncertainty", "City", "Country", "Latitude", "Longitude"]]
df_output.to_csv(OUTPUT_FILE, index=False, sep=",", encoding="utf-8", header=True)
print("Output-File created successfully")

In [None]:
# OUTPUT WITH DELETED NANS WITHOUT INTERPOLATION READY FOR COMPRESS
# Rows containing any NaN are dropped instead of being interpolated.
# NOTE(review): the file name ends in "_interpolated" although no interpolation happens here -
# presumably so the compress step can read it in place of interpolate_data.py's output; confirm.
OUTPUT_FILE = "sources/GlobalLandTemperaturesByCity_interpolated.csv"

df_output = df_local.dropna()
print(f"Dropped {len(df_local) - len(df_output)} NaN-entries.")
# assign() returns a new dataframe, so nothing is modified in place and no chained-assignment
# warning has to be silenced.
df_output = df_output.assign(City="", Country="", src=0)

df_output = df_output[["dt", "AverageTemperature", "AverageTemperatureUncertainty", "City", "Country", "Latitude", "Longitude", "src"]]
df_output.to_csv(OUTPUT_FILE, index=False, sep=",", encoding="utf-8", header=True)
print("Output-File created successfully")
