# Parallel writes of fewer files to CSVs

Because writing to the database takes a very long time, we instead use `dask` to read and parse several files in parallel. Because the writes happen in parallel, each input file is saved to its own CSV file.

## Local install
Install the `mosparse` library locally in editable mode.

In [None]:
%pip install -ve ../

## Import

In [1]:
%load_ext autoreload
%autoreload 2

Import the libraries used in this example.

In [9]:
from pathlib import Path

from tqdm.auto import tqdm

import dask.bag as db
from dask.diagnostics import ProgressBar

import mosparse.mavreader as mpr
import mosparse.mavparse as mpp

import pandas as pd

## Find the 2017 files
Collect the paths of the files to process from the `downthemall` directory.

In [10]:
mos_files = list(Path("downthemall").iterdir())

In [11]:
len(mos_files)

48
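
`Path.iterdir()` returns every entry in the directory, in arbitrary order, including subdirectories and stray files. A minimal sketch of a stricter listing, assuming the MOS archives all end in `.gz` (the helper name `list_gz_files` is hypothetical and not part of `mosparse`):

```python
from pathlib import Path


def list_gz_files(directory):
    """Return the regular files ending in .gz inside `directory`, sorted by name."""
    return sorted(
        p for p in Path(directory).iterdir()
        # keep regular files only; a directory named "x.gz" is skipped
        if p.is_file() and p.suffix == ".gz"
    )
```

With this helper, `mos_files = list_gz_files("downthemall")` would give a reproducible ordering, which makes `mos_files[0]` refer to the same file on every run.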

In [12]:
df = pd.read_csv("ghcn_mos_lookup.csv")
df.head()

Unnamed: 0.1,Unnamed: 0,ID,LATITUDE,LONGITUDE,Station,Latitude,Longitude
0,0,USC00026180,36.9208,-111.4483,KPGA,36.9333,-111.45
1,1,USW00003162,36.9261,-111.4478,KPGA,36.9333,-111.45
2,2,USC00080211,29.7258,-85.0206,KAAF,29.7333,-85.0167
3,3,USW00012832,29.7333,-85.0333,KAAF,29.7333,-85.0167
4,4,USC00088782,27.7053,-82.4008,KMCF,27.85,-82.5


In [13]:
stations = df['Station'].unique()

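## Convert the .gz file to a CSV
Extract the data from `filepath`, parse each station, and save the result as a `.csv` file named after the input file. Once the file has been processed, its name is appended to `completed_files.txt`.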

In [14]:
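def process_file(filepath):
    """Parse one MOS file and write its stations to '<filepath.name>.csv'."""
    with open("completed_files.txt", 'a') as f:
        with mpr.MavReader(filepath, stations=True) as station_generator:
            for station in station_generator:
                if len(station) > 0:  # skip empty station blocks
                    mpp.write_station(station, filename=f'{filepath.name}.csv',
                                      stations=stations, saveout="station_filter")
            # log the file name only after every station has been written
            print(filepath.name, file=f)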

## Concurrently process files
Apply `process_file` to the files in `mos_files` in parallel.

In [15]:
pb = db.from_sequence(mos_files).map(process_file)
with ProgressBar():
    pb.compute()

[########################################] | 100% Completed |  2hr 52min 36.6s
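
The run above took almost three hours, so when restarting after an interruption it can be worth skipping the files already listed in `completed_files.txt`. A minimal sketch, assuming the log holds one file name per line as written by `process_file` (the helper `pending_files` is hypothetical):

```python
from pathlib import Path


def pending_files(all_files, log_path="completed_files.txt"):
    """Return the files whose names are not yet recorded in the log."""
    log = Path(log_path)
    if not log.exists():
        # nothing has been processed yet
        return list(all_files)
    # one completed file name per line; ignore blank lines
    done = {line.strip() for line in log.read_text().splitlines() if line.strip()}
    return [f for f in all_files if Path(f).name not in done]
```

The bag could then be built with `db.from_sequence(pending_files(mos_files)).map(process_file)`. A file that was interrupted mid-write is not in the log and will be processed again, so its partial CSV may need to be deleted first.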


In [19]:
# Spot check: skip the first station block, then print the parsed header of the next one.
with mpr.MavReader(mos_files[0], stations=True) as station_generator:
    next(station_generator)
    for station in station_generator:
        print(mpp.get_header(station[0]))
        break

{'station': 'NSTU', 'short_model': 'GFS', 'model': 'GFS MOS GUIDANCE', 'runtime': datetime.datetime(2019, 1, 1, 0, 0, tzinfo=tzutc())}
