"""General utility functions that are used in a variety of contexts.
The functions in this module are used in various stages of the ETL and post-etl
processes. They are usually not dataset specific, but not always. If a function is
designed to be used as a general purpose tool, applicable in multiple scenarios, it
should probably live here. There are lots of transform-type functions in here
that help with cleaning and restructuring dataframes.
"""
import importlib.resources
import itertools
import pathlib
import re
import shutil
from collections import defaultdict
from collections.abc import Generator, Iterable
from functools import partial
from io import BytesIO
from typing import Any, Literal
import addfips
import numpy as np
import pandas as pd
import requests
import sqlalchemy as sa
from dagster import AssetKey, AssetsDefinition, AssetSelection, SourceAsset
from pandas._libs.missing import NAType
import pudl.logging_helpers
from pudl.metadata.fields import apply_pudl_dtypes, get_pudl_dtypes
sum_na = partial(pd.Series.sum, skipna=False)
"""A sum function that returns NA if the Series includes any NA values.
In many of our aggregations we need to override the default behavior of treating NA
values as if they were zero. E.g. when calculating the heat rates of generation units,
if there are some months where fuel consumption is reported as NA, but electricity
generation is reported normally, then the fuel consumption for the year needs to be NA,
otherwise we'll get unrealistic heat rates.
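For example, ``sum_na(pd.Series([1.0, np.nan]))`` returns NaN, where the
default ``skipna=True`` behavior would return 1.0.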
"""
logger = pudl.logging_helpers.get_logger(__name__)
def label_map(
df: pd.DataFrame,
from_col: str = "code",
to_col: str = "label",
null_value: str | NAType = pd.NA,
) -> defaultdict[str, str | NAType]:
"""Build a mapping dictionary from two columns of a labeling / coding dataframe.
These dataframes document the meanings of the codes that show up in much of the
originally reported data. They're defined in :mod:`pudl.metadata.codes`. This
function is mostly used to build maps that can translate hard-to-understand
short codes into longer, human-readable labels.
Args:
df: The coding / labeling dataframe. Must contain columns ``from_col``
and ``to_col``.
from_col: Label of column containing the existing codes to be replaced.
to_col: Label of column containing the new codes to be swapped in.
null_value: Default (null) value to map to when a value which doesn't
appear in ``from_col`` is encountered.
Returns:
A mapping dictionary suitable for use with :meth:`pandas.Series.map`.
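Example:
    A minimal sketch using a small hypothetical coding dataframe::

        codes = pd.DataFrame(
            {"code": ["NG", "BIT"], "label": ["natural_gas", "bituminous_coal"]}
        )
        mapper = label_map(codes)
        pd.Series(["NG", "XYZ"]).map(mapper)  # unknown "XYZ" maps to pd.NA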
"""
return defaultdict(
lambda: null_value,
df.loc[:, [from_col, to_col]]
.drop_duplicates(subset=[from_col])
.to_records(index=False),
)
def find_new_ferc1_strings(
table: str,
field: str,
strdict: dict[str, list[str]],
ferc1_engine: sa.engine.Engine,
) -> set[str]:
"""Identify as-of-yet uncategorized freeform strings in FERC Form 1.
Args:
table: Name of the FERC Form 1 DB table to search.
field: Name of the column in that table to search.
strdict: A string cleaning dictionary. See
e.g. `pudl.transform.ferc1.FUEL_UNIT_STRINGS`
ferc1_engine: SQL Alchemy DB connection engine for the FERC Form 1 DB.
Returns:
Any strings found in the searched table and field that were not part of
any of the categories enumerated in strdict.
"""
all_strings = set(
pd.read_sql(f"SELECT {field} FROM {table};", ferc1_engine).pipe( # nosec
simplify_strings, columns=[field]
)[field]
)
old_strings = set.union(*[set(strings) for strings in strdict.values()])
return all_strings.difference(old_strings)
def find_foreign_key_errors(dfs: dict[str, pd.DataFrame]) -> list[dict[str, Any]]:
"""Report foreign key violations from a dictionary of dataframes.
The database schema to check against is generated based on the names of the
dataframes (keys of the dictionary) and the PUDL metadata structures.
Args:
dfs: Keys are table names, and values are dataframes ready for loading
into the SQLite database.
Returns:
A list of dictionaries, each pertaining to a single database table
in which a foreign key constraint violation was found. Each dictionary
includes the table name, the foreign key definition, and the rows of
the dataframe that violated the foreign key constraint.
"""
import pudl.metadata.classes
package = pudl.metadata.classes.Package.from_resource_ids(
resource_ids=tuple(sorted(dfs))
)
errors = []
for resource in package.resources:
for foreign_key in resource.schema.foreign_keys:
x = dfs[resource.name][foreign_key.fields]
y = dfs[foreign_key.reference.resource][foreign_key.reference.fields]
ncols = x.shape[1]
idx = range(ncols)
xx, yy = x.set_axis(idx, axis=1), y.set_axis(idx, axis=1)
if ncols == 1:
# Faster check for single-field foreign key
invalid = ~(xx[0].isin(yy[0]) | xx[0].isna())
else:
invalid = ~(
pd.concat([yy, xx]).duplicated().iloc[len(yy) :]
| xx.isna().any(axis=1)
)
if invalid.any():
errors.append(
{
"resource": resource.name,
"foreign_key": foreign_key,
"invalid": x[invalid],
}
)
return errors
def download_zip_url(url, save_path, chunk_size=128, timeout=9.05):
"""Download and save a Zipfile locally.
Useful for acquiring and storing non-PUDL data locally.
Args:
url (str): The URL from which to download the Zipfile
save_path (pathlib.Path): The location to save the file.
chunk_size (int): Data chunk in bytes to use while downloading.
timeout (float): Time to wait for the server to accept a connection.
See https://requests.readthedocs.io/en/latest/user/advanced/#timeouts
Returns:
None
"""
# This is a temporary hack to avoid being filtered as a bot:
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"DNT": "1",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}
r = requests.get(url, stream=True, headers=headers, timeout=timeout)
with save_path.open(mode="wb") as fd:
for chunk in r.iter_content(chunk_size=chunk_size):
fd.write(chunk)
def add_fips_ids(df, state_col="state", county_col="county", vintage=2015):
"""Add State and County FIPS IDs to a dataframe.
To just add State FIPS IDs, make county_col = None.
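Example:
    A minimal sketch with made-up data::

        df = pd.DataFrame({"state": ["CO"], "county": ["Boulder"]})
        add_fips_ids(df)
        # Adds state_id_fips "08" and county_id_fips "08013".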
"""
# force the columns to be the nullable string types so we have a consistent
# null value to filter out before feeding to addfips
df = df.astype({state_col: pd.StringDtype()})
if county_col:
df = df.astype({county_col: pd.StringDtype()})
af = addfips.AddFIPS(vintage=vintage)
# Lookup the state and county FIPS IDs and add them to the dataframe:
df["state_id_fips"] = df.apply(
lambda x: (
af.get_state_fips(state=x[state_col]) if pd.notnull(x[state_col]) else pd.NA
),
axis=1,
)
# force the code columns to be nullable strings - the leading zeros are
# important
df = df.astype({"state_id_fips": pd.StringDtype()})
logger.info(
f"Assigned state FIPS codes for "
f"{len(df[df.state_id_fips.notnull()])/len(df):.2%} of records."
)
if county_col:
df["county_id_fips"] = df.apply(
lambda x: (
af.get_county_fips(state=x[state_col], county=x[county_col])
if pd.notnull(x[county_col]) and pd.notnull(x[state_col])
else pd.NA
),
axis=1,
)
# force the code columns to be nullable strings - the leading zeros are
# important
df = df.astype({"county_id_fips": pd.StringDtype()})
logger.info(
f"Assigned county FIPS codes for "
f"{len(df[df.county_id_fips.notnull()])/len(df):.2%} of records."
)
return df
def clean_eia_counties(df, fixes, state_col="state", county_col="county"):
"""Replace non-standard county names with county nmes from US Census."""
df = df.copy()
df[county_col] = (
df[county_col]
.str.strip()
# Condense multiple whitespace chars.
.str.replace(r"\s+", " ", regex=True)
.str.replace(r"^St ", "St. ", regex=True) # Standardize abbreviation.
# Standardize abbreviation.
.str.replace(r"^Ste ", "Ste. ", regex=True)
.str.replace("Kent & New Castle", "Kent, New Castle") # Two counties
# Fix ordering, remove comma
.str.replace("Borough, Kodiak Island", "Kodiak Island Borough")
# Turn comma-separated counties into lists
.str.replace(r",$", "", regex=True)
.str.split(",")
)
# Create new records for each county in a multi-valued record
df = df.explode(county_col)
df[county_col] = df[county_col].str.strip()
# Yellowstone county is in MT, not WY
df.loc[
(df[state_col] == "WY") & (df[county_col] == "Yellowstone"), state_col
] = "MT"
# Replace individual bad county names with identified correct names in fixes:
for fix in fixes.itertuples():
state_mask = df[state_col] == fix.state
county_mask = df[county_col] == fix.eia_county
df.loc[state_mask & county_mask, county_col] = fix.fips_county
return df
def oob_to_nan(df, cols, lb=None, ub=None):
"""Set non-numeric values and those outside of a given range to NaN.
Args:
df (pandas.DataFrame): The dataframe containing values to be altered.
cols (iterable): Labels of the columns whose values are to be changed.
lb (number): Lower bound, below which values are set to NaN. If None,
don't use a lower bound.
ub (number): Upper bound, above which values are set to NaN. If None,
don't use an upper bound.
Returns:
pandas.DataFrame: The altered DataFrame.
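Example:
    A sketch nullifying implausible values in a hypothetical column::

        df = oob_to_nan(df, cols=["capacity_factor"], lb=0.0, ub=1.0)
        # Non-numeric values, and values below 0.0 or above 1.0, become NaN.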
"""
out_df = df.copy()
for col in cols:
# Force column to be numeric if possible, NaN otherwise:
out_df.loc[:, col] = pd.to_numeric(out_df[col], errors="coerce")
if lb is not None:
out_df.loc[out_df[col] < lb, col] = np.nan
if ub is not None:
out_df.loc[out_df[col] > ub, col] = np.nan
return out_df
def oob_to_nan_with_dependent_cols(
df: pd.DataFrame,
cols: list,
dependent_cols: list,
lb: float = None,
ub: float = None,
):
"""Call oob_to_nan and additionally nullify any derived columns.
Set values in ``cols`` to NaN if values are non-numeric or outside of a
given range. The corresponding values in ``dependent_cols`` are then set
to NaN. ``dependent_cols`` should be columns derived from one or multiple
of the columns in ``cols``.
Args:
df (pandas.DataFrame): The dataframe containing values to be altered.
cols (iterable): Labels of the columns whose values are to be changed.
dependent_cols (iterable): Labels of the columns whose corresponding
values should also be nullified. Columns are derived from one or
multiple of the columns in ``cols``.
lb (number): Lower bound, below which values are set to NaN. If None,
don't use a lower bound.
ub (number): Upper bound, above which values are set to NaN. If None,
don't use an upper bound.
Returns:
pandas.DataFrame: The altered DataFrame.
"""
out_df = oob_to_nan(df, cols, lb, ub)
out_df.loc[out_df[cols].isnull().any(axis=1), dependent_cols] = np.nan
return out_df
def prep_dir(dir_path, clobber=False):
"""Create (or delete and recreate) a directory.
Args:
dir_path (path-like): path to the directory that you are trying to
clean and prepare.
clobber (bool): If True and dir_path exists, it will be removed and
replaced with a new, empty directory.
Raises:
FileExistsError: if a file or directory already exists at dir_path.
Returns:
pathlib.Path: Path to the created directory.
"""
dir_path = pathlib.Path(dir_path)
if dir_path.exists():
if clobber:
shutil.rmtree(dir_path)
else:
raise FileExistsError(f"{dir_path} exists and clobber is {clobber}")
dir_path.mkdir(parents=True)
return dir_path
def is_doi(doi):
"""Determine if a string is a valid digital object identifier (DOI).
Function simply checks whether the offered string matches a regular
expression -- it doesn't check whether the DOI is actually registered
with the relevant authority.
Args:
doi (str): String to validate.
Returns:
bool: True if doi matches the regex for valid DOIs, False otherwise.
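Example:
    A quick sketch (the DOI shown is a made-up placeholder)::

        is_doi("10.5281/zenodo.123456")                  # True
        is_doi("https://doi.org/10.5281/zenodo.123456")  # True
        is_doi("not-a-doi")                              # False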
"""
doi_regex = re.compile(
r"(doi:\s*|(?:https?://)?(?:dx\.)?doi\.org/)?(10\.\d+(.\d+)*/.+)$",
re.IGNORECASE | re.UNICODE,
)
return bool(re.match(doi_regex, doi))
def convert_col_to_datetime(df: pd.DataFrame, date_col_name: str) -> pd.DataFrame:
"""Convert a non-datetime column in a dataframe to a datetime64[s].
If the column isn't a datetime, it needs to be converted to a string type
first so that integer years are formatted correctly.
Args:
df: Dataframe with column to convert.
date_col_name: name of the datetime column to convert.
Returns:
Dataframe with the converted datetime column.
"""
if not pd.api.types.is_datetime64_dtype(df[date_col_name]):
logger.warning(
f"{date_col_name} is {df[date_col_name].dtype} column. "
"Converting to datetime64[ns]."
)
df[date_col_name] = pd.to_datetime(df[date_col_name].astype("string"))
return df
def full_timeseries_date_merge(
left: pd.DataFrame,
right: pd.DataFrame,
on: list[str],
left_date_col: str = "report_date",
right_date_col: str = "report_date",
new_date_col: str = "report_date",
date_on: list[str] = ["year"],
how: Literal["inner", "outer", "left", "right", "cross"] = "inner",
report_at_start: bool = True,
freq: str = "MS",
**kwargs,
):
"""Merge dataframes with different date frequencies and expand to a full timeseries.
Arguments: see arguments for ``date_merge`` and ``expand_timeseries``
"""
out = date_merge(
left=left,
right=right,
left_date_col=left_date_col,
right_date_col=right_date_col,
new_date_col=new_date_col,
on=on,
date_on=date_on,
how=how,
report_at_start=report_at_start,
**kwargs,
)
out = expand_timeseries(
df=out,
date_col=new_date_col,
freq=freq,
key_cols=on,
)
return out
def _add_suffix_to_date_on(date_on):
"""Check date_on list is valid and add _temp_for_merge suffix."""
if date_on is None:
date_on = ["year"]
date_on_suffix = []
for col in date_on:
if col not in ["year", "month", "quarter", "day"]:
raise AssertionError(
f"{col} is not a valid string in the date_on column list."
)
date_on_suffix.append(col + "_temp_for_merge")
return date_on_suffix
def date_merge(
left: pd.DataFrame,
right: pd.DataFrame,
on: list[str],
left_date_col: str = "report_date",
right_date_col: str = "report_date",
new_date_col: str = "report_date",
date_on: list[str] = None,
how: Literal["inner", "outer", "left", "right", "cross"] = "inner",
report_at_start: bool = True,
**kwargs,
) -> pd.DataFrame:
"""Merge two dataframes that have different report date frequencies.
We often need to bring together data that is reported at different temporal
granularities, e.g. on a monthly versus an annual basis. This function
acts as a wrapper around a pandas merge to allow merging at different temporal
granularities. The date columns of both dataframes are separated into
year, quarter, month, and day columns. Then, the dataframes are merged according
to ``how`` on the columns specified by the ``on`` and ``date_on`` argument,
which list the new temporal columns to merge on as well as any additional shared columns.
Finally, the datetime column is reconstructed in the output dataframe and
named according to the ``new_date_col`` parameter.
Args:
left: The left dataframe in the merge. Typically monthly in our use
cases if doing a left merge, e.g. ``generation_eia923``.
Must contain the columns specified by the ``left_date_col`` and
``on`` arguments.
right: The right dataframe in the merge. Typically annual in our use
cases if doing a left merge, e.g. ``generators_eia860``.
Must contain the columns specified by the ``right_date_col`` and ``on`` arguments.
on: The columns to merge on that are shared between both
dataframes. Typically ID columns like ``plant_id_eia``, ``generator_id``
or ``boiler_id``.
left_date_col: Column in ``left`` containing datetime like data. Default is
``report_date``. Must be a Datetime or convertible to a Datetime using
:func:`pandas.to_datetime`.
right_date_col: Column in ``right`` containing datetime like data. Default is
``report_date``. Must be a Datetime or convertible to a Datetime using
:func:`pandas.to_datetime`.
new_date_col: Name of the reconstructed datetime column in the output dataframe.
date_on: The temporal columns to merge on. Values in this list
of columns must be [``year``, ``quarter``, ``month``, ``day``].
E.g. if a monthly reported dataframe is being merged onto a daily reported
dataframe, then the merge would be performed on ``["year", "month"]``.
If one of these temporal columns already exists in the dataframe it will not
be clobbered by the merge, as the suffix "_temp_for_merge" is added when
expanding the datetime column into year, quarter, month, and day. By default,
`date_on` will just include year.
how: How the dataframes should be merged. See :func:`pandas.DataFrame.merge`.
report_at_start: Whether the data in the dataframe whose report date is not being
kept in the merged output (in most cases the less frequently reported dataframe)
is reported at the start or end of the time period e.g. January 1st
for annual data.
kwargs : Additional arguments to pass to :func:`pandas.DataFrame.merge`.
Returns:
Merged contents of left and right input dataframes.
Raises:
ValueError: if ``left_date_col`` or ``right_date_col`` columns are missing from their
respective input dataframes.
ValueError: if any of the labels referenced in ``on`` are missing from either
the left or right dataframes.
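Example:
    A sketch merging hypothetical monthly data onto annual attributes::

        monthly = pd.DataFrame({
            "report_date": pd.to_datetime(["2020-01-01", "2020-02-01"]),
            "plant_id_eia": [1, 1],
            "net_generation_mwh": [100.0, 120.0],
        })
        annual = pd.DataFrame({
            "report_date": pd.to_datetime(["2020-01-01"]),
            "plant_id_eia": [1],
            "capacity_mw": [50.0],
        })
        date_merge(monthly, annual, on=["plant_id_eia"], how="left")
        # Yields two monthly rows, each with capacity_mw 50.0 broadcast from
        # the annual record; report_date keeps the monthly dates.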
"""
def separate_date_cols(df, date_col_name, date_on):
out_df = df.copy()
out_df.loc[:, date_col_name] = pd.to_datetime(out_df[date_col_name])
if "year_temp_for_merge" in date_on:
out_df.loc[:, "year_temp_for_merge"] = out_df[date_col_name].dt.year
if "quarter_temp_for_merge" in date_on:
out_df.loc[:, "quarter_temp_for_merge"] = out_df[date_col_name].dt.quarter
if "month_temp_for_merge" in date_on:
out_df.loc[:, "month_temp_for_merge"] = out_df[date_col_name].dt.month
if "day_temp_for_merge" in date_on:
out_df.loc[:, "day_temp_for_merge"] = out_df[date_col_name].dt.day
return out_df
right = convert_col_to_datetime(right, right_date_col)
left = convert_col_to_datetime(left, left_date_col)
date_on = _add_suffix_to_date_on(date_on)
right = separate_date_cols(right, right_date_col, date_on)
left = separate_date_cols(left, left_date_col, date_on)
merge_cols = date_on + on
out = pd.merge(left, right, on=merge_cols, how=how, **kwargs)
suffixes = ["", ""]
if left_date_col == right_date_col:
if "suffixes" in kwargs:
suffixes = kwargs["suffixes"]
else:
suffixes = ["_x", "_y"]
# reconstruct the new report date column and clean up columns
left_right_date_col = [left_date_col + suffixes[0], right_date_col + suffixes[1]]
if report_at_start:
# keep the later of the two report dates when determining
# the new report date for each row
reconstructed_date = out[left_right_date_col].max(axis=1)
else:
# keep the earlier of the two report dates
reconstructed_date = out[left_right_date_col].min(axis=1)
out = out.drop(left_right_date_col + date_on, axis=1)
out.insert(loc=0, column=new_date_col, value=reconstructed_date)
return out
def expand_timeseries(
df: pd.DataFrame,
key_cols: list[str],
date_col: str = "report_date",
freq: str = "MS",
fill_through_freq: Literal["year", "month", "day"] = "year",
) -> pd.DataFrame:
"""Expand a dataframe to a include a full time series at a given frequency.
This function adds a full timeseries to the given dataframe for each group
of columns specified by ``key_cols``. The data in the timeseries will be filled
with the next previous chronological observation for a group of primary key columns
specified by ``key_cols``.
Arguments:
df: The dataframe to expand. Must have ``date_col`` in columns.
key_cols: Column names of the non-date primary key columns in the dataframe.
The resulting dataframe will have a full timeseries expanded for each
unique group of these ID columns that are present in the dataframe.
date_col: Name of the datetime column being expanded into a full timeseries.
freq: The frequency of the time series to expand the data to.
See :ref:`here <timeseries.offset_aliases>` for a list of
frequency aliases.
fill_through_freq: The frequency in which to fill in the data through. For
example, if equal to "year" the data will be filled in through the end of
the last reported year for each grouping of `key_cols`. Valid frequencies
are only year, month, or day.
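Example:
    A sketch expanding sparse hypothetical records into a monthly series::

        df = pd.DataFrame({
            "report_date": pd.to_datetime(["2019-01-01", "2020-06-01"]),
            "plant_id_eia": [1, 1],
            "capacity_mw": [50.0, 55.0],
        })
        expand_timeseries(df, key_cols=["plant_id_eia"], freq="MS")
        # Produces one record per month from 2019-01 through 2020-12,
        # forward-filling capacity_mw (50.0 up to 2020-05, then 55.0).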
"""
try:
pd.tseries.frequencies.to_offset(freq)
except ValueError:
logger.exception(
f"Frequency string {freq} is not valid. \
See Pandas Timeseries Offset Aliases docs for valid strings."
)
# For each group of ID columns add a dummy record with the date column
# equal to one increment higher than the last record in the group for the
# desired fill_through_freq.
# This allows records to be filled through the end of the last reported period
# and then this dummy record is dropped
df = convert_col_to_datetime(df, date_col)
end_dates = df.groupby(key_cols).agg({date_col: "max"})
if fill_through_freq == "year":
end_dates.loc[:, date_col] = pd.to_datetime(
{
"year": end_dates[date_col].dt.year + 1,
"month": 1,
"day": 1,
}
)
elif fill_through_freq == "month":
end_dates.loc[:, date_col] = pd.to_datetime(
{
"year": end_dates[date_col].dt.year,
"month": end_dates[date_col].dt.month + 1,
"day": 1,
}
)
elif fill_through_freq == "day":
end_dates.loc[:, date_col] = pd.to_datetime(
{
"year": end_dates[date_col].dt.year,
"month": end_dates[date_col].dt.month,
"day": end_dates[date_col].dt.day + 1,
}
)
else:
raise AssertionError(
f"{fill_through_freq} is not a valid frequency to fill through."
)
end_dates["drop_row"] = True
df = (
pd.concat([df, end_dates.reset_index()])
.set_index(date_col)
.groupby(key_cols)
.resample(freq)
.ffill()
.drop(key_cols, axis=1)
.reset_index()
)
return (
df[df.drop_row.isnull()]
.drop(columns="drop_row")
.reset_index(drop=True)
.pipe(apply_pudl_dtypes)
)
def organize_cols(df, cols):
"""Organize columns into key ID & name fields & alphabetical data columns.
For readability, it's nice to group a few key columns at the beginning
of the dataframe (e.g. report_year or report_date, plant_id...) and then
put all the rest of the data columns in alphabetical order.
Args:
df: The DataFrame to be re-organized.
cols: The columns to put first, in their desired output ordering.
Returns:
pandas.DataFrame: A dataframe with the same columns as the input
DataFrame df, but with cols first, in the same order as they
were passed in, and the remaining columns sorted alphabetically.
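Example:
    A sketch with typical PUDL ID columns::

        organize_cols(df, ["report_date", "plant_id_eia"])
        # report_date and plant_id_eia come first; all remaining data
        # columns follow in alphabetical order.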
"""
# Generate a list of all the columns in the dataframe that are not
# included in cols
data_cols = sorted(c for c in df.columns.tolist() if c not in cols)
organized_cols = cols + data_cols
return df[organized_cols]
def simplify_strings(df, columns):
"""Simplify the strings contained in a set of dataframe columns.
Performs several operations to simplify strings for comparison and parsing purposes.
These include removing Unicode control characters, stripping leading and trailing
whitespace, using lowercase characters, and compacting all internal whitespace to a
single space.
Leaves null values unaltered. Casts other values with astype(str).
Args:
df (pandas.DataFrame): DataFrame whose columns are being cleaned up.
columns (iterable): The labels of the string columns to be simplified.
Returns:
pandas.DataFrame: The whole DataFrame that was passed in, with
the string columns cleaned up.
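Example:
    For instance, the value ``"  Natural   Gas "`` would be simplified to
    ``"natural gas"``.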
"""
out_df = df.copy()
for col in columns:
if col in out_df.columns:
out_df.loc[out_df[col].notnull(), col] = (
out_df.loc[out_df[col].notnull(), col]
.astype(str)
.str.replace(r"[\x00-\x1f\x7f-\x9f]", "", regex=True)
.str.strip()
.str.lower()
.str.replace(r"\s+", " ", regex=True)
)
return out_df
def cleanstrings_series(col, str_map, unmapped=None, simplify=True):
"""Clean up the strings in a single column/Series.
Args:
col (pandas.Series): A pandas Series, typically a single column of a
dataframe, containing the freeform strings that are to be cleaned.
str_map (dict): A dictionary of lists of strings, in which the keys are
the simplified canonical strings with which each string found in
the corresponding list will be replaced.
unmapped (str): A value with which to replace any string found in col
that is not found in one of the lists of strings in str_map. Typically
the null string ''. If None, these strings will not be replaced.
simplify (bool): If True, strip and compact whitespace, and lowercase
all strings in both the list of values to be replaced, and the
values found in col. This can reduce the number of strings that
need to be kept track of.
Returns:
pandas.Series: The cleaned up Series / column, suitable for
replacing the original messy column in a :class:`pandas.DataFrame`.
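Example:
    A sketch using a hypothetical fuel-type string map::

        fuel_map = {"gas": ["natural gas", "ng"], "coal": ["bituminous"]}
        col = pd.Series(["Natural Gas", "BITUMINOUS", "wind"])
        cleanstrings_series(col, fuel_map, unmapped="")
        # -> "gas", "coal", "" (the unmapped "wind" is replaced by "").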
"""
if simplify:
col = (
col.astype(str).str.strip().str.lower().str.replace(r"\s+", " ", regex=True)
)
for k in str_map:
str_map[k] = [re.sub(r"\s+", " ", s.lower().strip()) for s in str_map[k]]
for k in str_map:
if str_map[k]:
col = col.replace(str_map[k], k)
if unmapped is not None:
badstrings = np.setdiff1d(col.unique(), list(str_map.keys()))
# This call to replace can only work if there are actually some
# leftover strings to fix -- otherwise it runs forever because we
# are replacing nothing with nothing.
if len(badstrings) > 0:
col = col.replace(badstrings, unmapped)
return col
def cleanstrings(df, columns, stringmaps, unmapped=None, simplify=True):
"""Consolidate freeform strings in several dataframe columns.
This function will consolidate freeform strings found in `columns` into
simplified categories, as defined by `stringmaps`. This is useful when
a field contains many different strings that are really meant to represent
a finite number of categories, e.g. a type of fuel. It can also be used to
create simplified categories that apply to similar attributes that are
reported in various data sources from different agencies that use their own
taxonomies.
The function takes and returns a pandas.DataFrame, making it suitable for
use with the :func:`pandas.DataFrame.pipe` method in a chain.
Args:
df (pandas.DataFrame): the DataFrame containing the string columns to
be cleaned up.
columns (list): a list of string column labels found in the column
index of df. These are the columns that will be cleaned.
stringmaps (list): a list of dictionaries. The keys of these
dictionaries are strings, and the values are lists of strings. Each
dictionary in the list corresponds to a column in columns. Every
string appearing in one of the lists will be replaced by the key
associated with that list.
unmapped (str, None): the value with which strings not found in the
stringmap dictionary will be replaced. Typically the null string
''. If None, then strings found in the columns but not in the
stringmap will be left unchanged.
simplify (bool): If true, strip whitespace, remove duplicate
whitespace, and force lower-case on both the string map and the
values found in the columns to be cleaned. This can reduce the
overall number of string values that need to be tracked.
Returns:
pandas.DataFrame: The function returns a new DataFrame containing the
cleaned strings.
"""
out_df = df.copy()
for col, str_map in zip(columns, stringmaps):
out_df[col] = cleanstrings_series(
out_df[col], str_map, unmapped=unmapped, simplify=simplify
)
return out_df
def fix_int_na(df, columns, float_na=np.nan, int_na=-1, str_na=""):
"""Convert NA containing integer columns from float to string.
Numpy doesn't have a real NA value for integers. When pandas stores integer
data which has NA values, it thus upcasts integers to floating point
values, using np.nan values for NA. However, in order to dump some of our
dataframes to CSV files for use in data packages, we need to write out
integer formatted numbers, with empty strings as the NA value. This
function replaces np.nan values with a sentinel value, converts the column
to integers, and then to strings, finally replacing the sentinel value with
the desired NA string.
This is an interim solution -- now that pandas extension arrays have been
implemented, we need to go back through and convert all of these integer
columns that contain NA values to Nullable Integer types like Int64.
Args:
df (pandas.DataFrame): The dataframe to be fixed. This argument allows
method chaining with the pipe() method.
columns (iterable of strings): A list of DataFrame column labels
indicating which columns need to be reformatted for output.
float_na (float): The floating point value to be interpreted as NA and
replaced in col.
int_na (int): Sentinel value to substitute for float_na prior to
conversion of the column to integers.
str_na (str): String value to substitute for int_na after the column
has been converted to strings.
Returns:
df (pandas.DataFrame): a new DataFrame, with the selected columns
converted to strings that look like integers, compatible with
the postgresql COPY FROM command.
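Example:
    A minimal sketch::

        df = pd.DataFrame({"utility_id": [1.0, np.nan, 34.0]})
        fix_int_na(df, columns=["utility_id"])
        # utility_id becomes the strings "1", "", and "34".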
"""
return (
df.replace({c: float_na for c in columns}, int_na)
.astype({c: int for c in columns})
.astype({c: str for c in columns})
.replace({c: str(int_na) for c in columns}, str_na)
)
def month_year_to_date(df):
"""Convert all pairs of year/month fields in a dataframe into Date fields.
This function finds all column names within a dataframe that match the
regular expression '_month$' and '_year$', and looks for pairs that have
identical prefixes before the underscore. These fields are assumed to
describe a date, accurate to the month. The two fields are used to
construct a new _date column (having the same prefix) and the month/year
columns are then dropped.
Todo:
This function needs to be combined with convert_to_date, and improved:
* find and use a _day$ column as well
* allow specification of default month & day values, if none are found.
* allow specification of lists of year, month, and day columns to be
combined, rather than automatically finding all the matching ones.
* Do the Right Thing when invalid or NA values are encountered.
Args:
df (pandas.DataFrame): The DataFrame in which to convert year/months
fields to Date fields.
Returns:
pandas.DataFrame: A DataFrame in which the year/month fields have been
converted into Date fields.
"""
df = df.copy()
month_regex = "_month$"
year_regex = "_year$"
# Columns that match our month or year patterns.
month_cols = list(df.filter(regex=month_regex).columns)
year_cols = list(df.filter(regex=year_regex).columns)
# Base column names that don't include the month or year pattern
months_base = [re.sub(month_regex, "", m) for m in month_cols]
years_base = [re.sub(year_regex, "", y) for y in year_cols]
# We only want to retain columns that have BOTH month and year
# matches -- otherwise there's no point in creating a Date.
date_base = [base for base in months_base if base in years_base]
# For each base column that DOES have both a month and year,
# We need to grab the real column names corresponding to each,
# so we can access the values in the data frame, and use them
# to create a corresponding Date column named [BASE]_date
month_year_date = []
for base in date_base:
base_month_regex = f"^{base}{month_regex}"
month_col = list(df.filter(regex=base_month_regex).columns)
if len(month_col) != 1:
raise AssertionError(f"Expected one {base}_month column, found {month_col}.")
month_col = month_col[0]
base_year_regex = f"^{base}{year_regex}"
year_col = list(df.filter(regex=base_year_regex).columns)
if len(year_col) != 1:
raise AssertionError(f"Expected one {base}_year column, found {year_col}.")
year_col = year_col[0]
date_col = f"{base}_date"
month_year_date.append((month_col, year_col, date_col))
for month_col, year_col, date_col in month_year_date:
df = fix_int_na(df, columns=[year_col, month_col])
date_mask = (df[year_col] != "") & (df[month_col] != "")
years = df.loc[date_mask, year_col]
months = df.loc[date_mask, month_col]
df.loc[date_mask, date_col] = pd.to_datetime(
{"year": years, "month": months, "day": 1}, errors="coerce"
)
# Now that we've replaced these fields with a date, we drop them.
df = df.drop([month_col, year_col], axis=1)
return df
def remove_leading_zeros_from_numeric_strings(
df: pd.DataFrame, col_name: str
) -> pd.DataFrame:
"""Remove leading zeros frame column values that are numeric strings.
Sometimes an ID column (like generator_id or unit_id) will be reported with leading
zeros and sometimes it won't. For example, in the Excel spreadsheets published by
EIA, the same generator may show up with the ID "0001" and "1" in different years.
This function strips the leading zeros from those numeric strings so the data can
be mapped across years and datasets more reliably.
Alphanumeric generator IDs with leading zeroes are not affected, as we
found no instances in which an alphanumeric ID appeared both with
and without leading zeroes. The ID "0A1" will stay "0A1".
Args:
df: A DataFrame containing the column you'd like to remove numeric leading zeros
from.
col_name: The name of the column you'd like to remove numeric leading zeros
from.
Returns:
A DataFrame without leading zeros for numeric string values in the desired
column.
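Example:
    A minimal sketch with hypothetical generator IDs::

        df = pd.DataFrame({"generator_id": ["0001", "1", "0A1"]})
        remove_leading_zeros_from_numeric_strings(df, "generator_id")
        # generator_id becomes "1", "1", "0A1" -- the alphanumeric
        # "0A1" is left untouched.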
"""
leading_zeros = df[col_name].str.contains(r"^0+\d+$").fillna(False)
if leading_zeros.any():
logger.debug(f"Fixing leading zeros in {col_name} column")
df.loc[leading_zeros, col_name] = df[col_name].str.replace(
r"^0+", "", regex=True
)
else:
logger.debug(f"Found no numeric leading zeros in {col_name}")
return df
def convert_to_date(
df,
date_col="report_date",
year_col="report_year",
month_col="report_month",
day_col="report_day",
month_value=1,
day_value=1,
):
"""Convert specified year, month or day columns into a datetime object.
If the input ``date_col`` already exists in the input dataframe, then no
conversion is applied, and the original dataframe is returned unchanged.
Otherwise the constructed date is placed in that column, and the columns
which were used to create the date are dropped.
Args:
df (pandas.DataFrame): dataframe to convert
date_col (str): the name of the column you want in the output.
year_col (str): the name of the year column in the original table.
month_col (str): the name of the month column in the original table.
day_col: the name of the day column in the original table.
month_value (int): generated month value, used if no month column exists.
day_value (int): generated day value, used if no day column exists.
Returns:
pandas.DataFrame: A DataFrame in which the year, month, day columns
values have been converted into datetime objects.
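Example:
    A sketch with annual data and no month or day columns::

        df = pd.DataFrame({"report_year": [2020, 2021]})
        convert_to_date(df)
        # report_year is dropped and replaced by a report_date column
        # containing 2020-01-01 and 2021-01-01.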
Todo:
Update docstring.
"""
df = df.copy()
if date_col in df.columns:
return df
year = df[year_col]
if month_col not in df.columns:
month = month_value
else:
month = df[month_col]
if day_col not in df.columns:
day = day_value
else:
day = df[day_col]
df[date_col] = pd.to_datetime({"year": year, "month": month, "day": day})
cols_to_drop = [x for x in [day_col, year_col, month_col] if x in df.columns]
df.drop(cols_to_drop, axis="columns", inplace=True)
return df
def fix_eia_na(df: pd.DataFrame) -> pd.DataFrame:
"""Replace common ill-posed EIA NA spreadsheet values with np.nan.
Currently replaces the empty string, a lone decimal point with no digits,
and any single whitespace character with np.nan.
Args:
df: The DataFrame to clean.