In [1]:
import os
import pandas as pd
import numpy as np
from datetime import datetime
from PIL import Image, ImageDraw, ImageFont
from dateutil.relativedelta import *

In [2]:
def get_time(items):
    """Build the observation timestamp from the whitespace-split fields of one record.

    Args:
        items: list of string tokens for one line. The last eight characters of
            ``items[1]`` are read as a ``mm/dd/yy`` date and ``items[2]`` as an
            ``HH:MM:SS`` time.

    Returns:
        datetime.datetime parsed from those two fields.

    Raises:
        IndexError: if fewer than three tokens are supplied.
        ValueError: if the fields do not match the expected date/time layout.
    """
    date_part = items[1][-8:]
    clock_part = items[2]
    stamp = " ".join((date_part, clock_part))
    return datetime.strptime(stamp, "%m/%d/%y %H:%M:%S")

def fraction_to_float(fraction):
    """Convert a visibility reading to a float.

    Accepts a whole number ("10"), a fraction ("3/4"), a mixed number whose
    parts are separated by whitespace ("1 1/2"), and any of those carrying an
    "M" marker ("M1/4"); the marker is simply stripped.

    Args:
        fraction: the reading as a string, without the trailing "SM" unit.

    Returns:
        The numeric value as a float.

    Raises:
        ValueError: if the reading is empty or a part is not numeric.
        ZeroDivisionError: if a fraction has a zero denominator.
    """
    # Consider saving the M here as a separate column
    # (presumably the METAR "less than" prefix — verify against the data source).
    pieces = fraction.replace("M", "").split()
    if not pieces:
        raise ValueError("empty visibility value: %r" % (fraction,))
    total = 0.0
    for piece in pieces:
        parts = piece.split("/")
        value = float(parts[0])
        if len(parts) > 1:
            value = value / float(parts[1])
        total += value
    return total

def get_visibility(items):
    """Return the visibility in statute miles from one record's tokens, or None.

    The first token of length >= 3 that ends in "SM" is used. Records are split
    on whitespace, so a mixed-number visibility such as "1 1/2SM" arrives as two
    tokens ("1", "1/2SM"); when the "SM" token is a plain fraction and the token
    right before it is a bare integer, that integer is added as the whole-mile
    part instead of being silently dropped.

    Args:
        items: list of string tokens for one line.

    Returns:
        Visibility as a float, or None when no "SM" token is present.
    """
    for position, item in enumerate(items):
        if len(item) >= 3 and item[-2:] == "SM":
            reading = item[:-2]
            if position > 0 and "/" in reading and reading[0].isdecimal():
                previous = items[position - 1]
                if previous.isdecimal():
                    # Whole-mile part of a mixed number, e.g. "1" + "1/2SM".
                    reading = previous + " " + reading
            return fraction_to_float(reading)
    return None

Pull out designations

In [3]:
# Parse every file in the data folder into one record per line: the observation
# time, the visibility, a CLR flag and the height of each sky-condition layer.
data_dicts = []
folderName = "./boeing2018"
# Three-character prefixes whose trailing digits are stored as the layer height.
# Heights are kept exactly as encoded in the report (the flight-category
# thresholds used later treat them as hundreds of feet). "VV0" matches
# vertical-visibility tokens such as "VV002": the leading zero is consumed by
# the prefix and the remaining digits still give the same number.
layer_designations = ("FEW", "SCT", "BKN", "OVC", "VV0")
# sorted() makes the load order deterministic, so the later keep='last'
# de-duplication does not depend on the file system's listing order.
for path in sorted(os.listdir(folderName)):
    with open(os.path.join(folderName, path)) as fp:
        for line in fp:
            items = line.split()
            if not items:
                # Blank line: nothing to parse (get_time would raise IndexError).
                continue
            data_dict = {
                "time": get_time(items),
                "visibility": get_visibility(items)
            }
            for item in items:
                designation = item[0:3]
                if designation == "CLR":
                    data_dict["CLR"] = True
                elif designation in layer_designations:
                    # At most three height digits follow the prefix; anything
                    # after them (e.g. a cloud-type suffix) is ignored.
                    digits = item[3:6]
                    if not digits.isdecimal():
                        # Height missing or unreadable (e.g. "BKN///"): skip the
                        # token rather than abort the whole load.
                        continue
                    height = int(digits)
                    # A report may list several layers of the same type; keep
                    # the lowest one instead of letting a later, higher layer
                    # overwrite it.
                    data_dict[designation] = min(height, data_dict.get(designation, height))
            data_dicts.append(data_dict)
cloud_df = pd.DataFrame(data_dicts,
                     columns=['time', "visibility", 'CLR', 'FEW','SCT','BKN', 'OVC', 'VV0'])
cloud_df = cloud_df.set_index("time")
cloud_df.head(500)

Unnamed: 0_level_0,visibility,CLR,FEW,SCT,BKN,OVC,VV0
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2018-01-01 00:00:31,10.0,True,,,,,
2018-01-01 00:05:31,10.0,True,,,,,
2018-01-01 00:10:31,10.0,True,,,,,
2018-01-01 00:15:31,10.0,True,,,,,
2018-01-01 00:20:31,10.0,True,,,,,
...,...,...,...,...,...,...,...
2018-01-02 17:15:31,10.0,,120.0,,,,
2018-01-02 17:20:31,10.0,,120.0,,,,
2018-01-02 17:25:31,10.0,,120.0,,,,
2018-01-02 17:30:31,10.0,,120.0,,,,


Remove duplicates and fill in missing data

In [4]:
# Keep only the last record for any timestamp that occurs more than once.
is_repeated = cloud_df.index.duplicated(keep='last')
cloud_df = cloud_df.loc[~is_repeated]
# Re-align onto a regular 5-minute grid covering all of 2018; slots with no
# observation become rows of missing values.
idx = pd.date_range(start='2018-01-01 00:00:31', end='2018-12-31 23:55:31', freq='5min')
cloud_df = cloud_df.reindex(idx, fill_value=None)

In [5]:
def get_ceiling(row):
    """Return the ceiling height for one observation row, or None if there is none.

    The ceiling is the lowest layer reported as broken (BKN) or overcast (OVC),
    or the vertical visibility (VV0) into an obscuration. Few and scattered
    layers do not constitute a ceiling, so FEW and SCT are deliberately ignored.

    Args:
        row: mapping (e.g. a DataFrame row) with "BKN", "OVC" and "VV0" entries;
            missing values are NaN/None.

    Returns:
        The smallest non-missing height among those entries, in the same units
        as stored in the row, or None when all of them are missing.
    """
    ceiling_heights = [
        row[name] for name in ("BKN", "OVC", "VV0") if not pd.isna(row[name])
    ]
    if not ceiling_heights:
        return None
    return min(ceiling_heights)
    

# LIFR = <500′ and/or <1 mile
# IFR = 500 to <1000′ and/or 1 to <3 miles
# MVFR = 1000-3000′ and/or 3-5 miles (both bounds inclusive)
# VFR = >3000′ and >5 miles
def get_interpretation(row, last_interpretation):
    """Classify one observation row into a flight category.

    The ceiling (from get_ceiling, compared here in hundreds of feet) and the
    row's "visibility" (miles) are each checked against the thresholds listed
    above; the more restrictive of the two decides the category. A ceiling of
    exactly 30 or a visibility of exactly 5.0 is MVFR, because VFR requires
    strictly more than 3000 ft and strictly more than 5 miles.

    Args:
        row: mapping with a "visibility" entry plus the layer entries that
            get_ceiling reads; missing values are NaN/None.
        last_interpretation: category to fall back on when the row does not
            carry enough information to decide.

    Returns:
        "LIFR", "IFR", "MVFR" or "VFR", or last_interpretation when both values
        are missing, or when visibility is missing and the ceiling alone does
        not force a restricted category.
    """
    ceiling = get_ceiling(row)
    ceiling_valid = ceiling is not None
    visibility = row["visibility"]
    visibility_valid = not pd.isna(visibility)

    if not ceiling_valid and not visibility_valid:
        return last_interpretation

    if (ceiling_valid and ceiling < 5) or (visibility_valid and visibility < 1.0):
        return "LIFR"
    if (ceiling_valid and ceiling < 10) or (visibility_valid and visibility < 3.0):
        return "IFR"
    if (ceiling_valid and ceiling <= 30) or (visibility_valid and visibility <= 5.0):
        return "MVFR"
    if visibility_valid:
        # Reaching here means visibility > 5 and the ceiling is either absent
        # or above 30, i.e. both VFR conditions hold.
        return "VFR"
    # Visibility is missing and the ceiling alone is not restrictive, so VFR
    # cannot be confirmed; keep the previous category.
    return last_interpretation

In [6]:
# Layers at or above this height (same units as the stored layer heights) are
# treated as if they were not there.
ignore_clouds_above = 100
def get_opacity(row, last_opacity):
    """Return the sky opacity (0.0 to 1.0) for one observation row.

    The result is the coverage of the most opaque layer below
    ``ignore_clouds_above``: FEW -> 2/8, SCT -> 4/8, BKN -> 6/8, OVC and VV0 -> 1.0.
    Layers are checked from most to least opaque so that a FEW layer cannot
    hide a BKN or OVC layer reported in the same observation.

    Args:
        row: mapping with "CLR", "FEW", "SCT", "BKN", "OVC" and "VV0" entries;
            missing values are NaN/None.
        last_opacity: value to return when the row carries no sky information.

    Returns:
        0.0 when the row is flagged CLR or every reported layer is at or above
        the cutoff; the coverage fraction of the most opaque layer below the
        cutoff otherwise; last_opacity when no sky information is present.
    """
    if row["CLR"] == True:
        return 0.0

    coverage_by_designation = (
        ("VV0", 1.0),
        ("OVC", 1.0),
        ("BKN", 6.0/8.0),
        ("SCT", 4.0/8.0),
        ("FEW", 2.0/8.0),
    )
    layer_reported = False
    for designation, coverage in coverage_by_designation:
        layer_height = row[designation]
        if pd.isna(layer_height):
            continue
        layer_reported = True
        if layer_height < ignore_clouds_above:
            return coverage

    # Layers were reported but all of them are being ignored: that is a clear
    # sky for charting purposes, not a reason to repeat the previous value.
    if layer_reported:
        return 0.0
    return last_opacity
    

In [82]:
# Setup chart: one column of pixelsPerDay pixels per day (x axis) and
# pixelsPerMinute pixels per minute of the day (y axis).
pixelsPerDay = 12
pixelsPerMinute = 1
minutesPerSection = 5
width = 365 * pixelsPerDay
height = 24 * 60 * pixelsPerMinute
im = Image.new('RGBA', (width, height))
draw = ImageDraw.Draw(im)

# iterate data
opacity = 0.0
# Anchor day 0 at the first timestamp of the frame, not at the first row that
# happens to contain data, and count whole calendar days so that every
# observation of a given date lands in the same column.
first_time = cloud_df.index.min()
for time, row in cloud_df.iterrows():
    # Rows without sky information repeat the previous opacity.
    opacity = get_opacity(row, opacity)
    minute = time.hour * 60 + time.minute
    day = (time.normalize() - first_time.normalize()).days
    left = day * pixelsPerDay
    top = minute * pixelsPerMinute
    # Black rectangle whose alpha channel encodes the opacity.
    draw.rectangle([(left, top), (left + pixelsPerDay, top + pixelsPerMinute * minutesPerSection)], fill=(0,0,0, int(255 * opacity)))
im.show()

In [7]:
def get_color(indication):
    """Return the RGBA color used to draw a flight category.

    Args:
        indication: category label; "LIFR", "IFR", "MVFR" and "VFR" are known.

    Returns:
        A 4-tuple (red, green, blue, alpha), or None for any other label.
    """
    category_colors = (
        ("LIFR", (238,0,238,255)),  # magenta
        ("IFR", (226,0,0,255)),     # red
        ("MVFR", (0,103,243,255)),  # blue
        ("VFR", (0,225,0,255)),     # green
    )
    for label, rgba in category_colors:
        if indication == label:
            return rgba
    return None

In [10]:
# Setup chart: one column of pixelsPerDay pixels per day (x axis) and
# pixelsPerMinute pixels per minute of the day (y axis).
pixelsPerDay = 12
pixelsPerMinute = 1
minutesPerSection = 5
width = 365 * pixelsPerDay
height = 24 * 60 * pixelsPerMinute
im = Image.new('RGBA', (width, height))
draw = ImageDraw.Draw(im)

# iterate data
interpretation = "VFR"
# Anchor day 0 at the first timestamp of the frame, not at the first row that
# happens to contain data, and count whole calendar days so that every
# observation of a given date lands in the same column.
first_time = cloud_df.index.min()
for time, row in cloud_df.iterrows():
    # Rows that cannot be classified repeat the previous category.
    interpretation = get_interpretation(row, interpretation)
    minute = time.hour * 60 + time.minute
    day = (time.normalize() - first_time.normalize()).days
    left = day * pixelsPerDay
    top = minute * pixelsPerMinute
    draw.rectangle([(left, top), (left + pixelsPerDay, top + pixelsPerMinute * minutesPerSection)], fill=get_color(interpretation))
im.show()

In [8]:
# Setup chart: the plot area is width x height pixels, surrounded by a margin
# of `offset` pixels on every side for ticks and labels.
offset = 100
pixelsPerDay = 12
pixelsPerMinute = 1
minutesPerSection = 5
width = 365 * pixelsPerDay
height = 24 * 60 * pixelsPerMinute
grid_width = 4
tick_length = 12
im = Image.new('RGBA', (width + offset * 2, height + offset * 2))
draw = ImageDraw.Draw(im)

# NOTE: this cell draws only the frame, hour ticks and month gridlines. To draw
# the per-observation rectangles inside this frame, use the same loop as the
# flight-category chart with `offset` added to both the x and y coordinates.

# One tick per hour (0-24) on the left edge of the plot area.
for i in range(0, 25):
    minutes = i * 60
    y = offset + minutes * pixelsPerMinute + 1
    shape = [(offset - tick_length, y), (offset,y)]
    draw.line(shape, fill="black", width=grid_width)


draw.rectangle([(offset, offset), (offset+width, offset+height)], outline="black", width=grid_width)

# arial.ttf is not installed everywhere; fall back to Pillow's built-in font
# rather than aborting the whole chart.
try:
    fnt = ImageFont.truetype("arial.ttf", 32)
except OSError:
    fnt = ImageFont.load_default()

# Anchor the month gridlines at the first timestamp of the frame, not at the
# first row that happens to contain data.
first_time = cloud_df.index.min()
for i in range(0, 12):
    new_time = first_time + relativedelta(months=+i)
    days = (new_time - first_time).days
    x = offset + days * pixelsPerDay
    shape = [(x, offset),(x, offset + height)]
    draw.line(shape, fill="black", width=grid_width)

    # position month: label centred horizontally on the gridline, just below the plot area
    month_name = new_time.strftime("%B")
    # ImageDraw.textsize was removed in Pillow 10; textlength is its
    # replacement for the width. Keep the old call for older installations.
    if hasattr(draw, "textlength"):
        w = draw.textlength(month_name, font=fnt)
    else:
        w, h = draw.textsize(month_name, font=fnt)
    draw.text((x - w / 2, offset + height), month_name, font=fnt, fill="black", align='left')


im.show()

In [11]:
# Leftover IPython introspection: `??` displays the source of draw.rectangle.
# It is not part of the analysis and can be removed from the finished notebook.
draw.rectangle??

Mostly looks good, but there are two issues:
1. The plots don't mesh together well. It would be cool to have a smooth gradient between sections, or maybe just make it binary, because somehow that's easier to read.
2. Not every 5-minute slot has a designation. Could convert the data to pandas, then merge it with another dataset that has no info, and iterate over that, always using the last entry that had info to fill in that section.