In [2]:
import pandas as pd

In [10]:
pd.read_clipboard().to_dict(orient="records")

[{'Row': 1,
  'NERR Site ID ': 'ace                                               ',
  'Station Code': 'acebbnut  ',
  'Station Name': 'Big Bay                                 ',
  'Lat Long': "32° 29' 38.76 N - 80° 19' 26.76 W",
  'Latitude ': 32.4941,
  ' Longitude': 80.3241,
  ' Status': 'Inactive  ',
  ' Active Dates': 'Feb 2002-Dec 2014',
  ' State': 'sc        ',
  ' Reserve Name': 'Ashepoo Combahee Edisto Basin                     ',
  'Real Time': nan,
  'HADS ID': nan,
  'GMT Offset': -5,
  'Station Type': 2,
  'Region': 2,
  'isSWMP': 'P',
  'Parameters Reported': 'PO4F,NH4F,NO23F,CHLA_N,DIN,NO2F,NO3F'}]

In [6]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions

In [4]:
# General parsing functions...
def flag_parse(val):
    # Remove brackets from flag columns...
    if val:
        val = val.strip()
        regex = r"<?(-?\d+)>?\D?"
        parsed_val = re.findall(regex, val)[0]
        return float(parsed_val) if parsed_val else None
    else:
        return None
def measurement_parse(val):
    val = val.strip()
    return float(val) if val else None 
def date_parse(val):
    val = val.strip()
    return parse(val) if val else None
def integer_parse(val):
    val = val.strip()
    return int(val) if val else None
def string_parse(val):
    val = val.strip()
    return str(val) if val else None

class DataIngestions:
    
    # Dataset names
    WQ = "wq"
    MET = "met"
    NUT = "nut"
    STATION = "station"
    ENV_BUCKET = "environmental_data"
    LANDING_ZONE = "landing_zone"
   
    def __init__(self):
        pass

    def read_csv(self, string_input):
        for line in csv.reader([string_input], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL, skipinitialspace=True):
            if line:
                # Strip trailing spaces                
                line = [field.strip() for field in line]
                return line
        
    def parse_wq_row(self,  row):
        if row:
            return beam.Row(
                StationCode = string_parse(row[0]),
                isSWMP = string_parse(row[1]),
                DateTimeStamp = date_parse(row[2]),
                Historical = integer_parse(row[3]),
                ProvisionalPlus = integer_parse(row[4]),
                F_Record = flag_parse(row[5]),
                Temp = measurement_parse(row[6]),
                F_Temp = flag_parse(row[7]),
                SpCond = measurement_parse(row[8]),
                F_SpCond = flag_parse(row[9]),
                Sal = measurement_parse(row[10]),
                F_Sal = flag_parse(row[11]),
                DO_Pct = measurement_parse(row[12]),
                F_DO_Pct = flag_parse(row[13]),
                DO_mgl = measurement_parse(row[14]),
                F_DO_mgl = flag_parse(row[15]),
                Depth = measurement_parse(row[16]),
                F_Depth = flag_parse(row[17]),
                cDepth = measurement_parse(row[18]),
                F_cDepth = flag_parse(row[19]),
                Level = measurement_parse(row[20]),
                F_Level = flag_parse(row[21]),
                cLevel = measurement_parse(row[22]),
                F_cLevel = flag_parse(row[23]),
                pH = measurement_parse(row[24]),
                F_pH = flag_parse(row[25]),
                Turb = measurement_parse(row[26]),
                F_Turb = flag_parse(row[27]),
                ChlFluor = measurement_parse(row[28]),
                F_ChlFluor = flag_parse(row[29])                    
            )

    def parse_nut_row(self,  row):
        if row:
            return beam.Row(
                StationCode = string_parse(row["StationCode"]),
                isSWMP = string_parse(row["isSWMP"]),
                DateTimeStamp = date_parse(row["DateTimeStamp"]),
                Historical = integer_parse(row["Historical"]),
                ProvisionalPlus = integer_parse(row["ProvisionalPlus"]),
                CollMethd = integer_parse(row["CollMethd"]),
                REP = string_parse(row["REP"]),
                F_Record = flag_parse(row["F_Record"]),
                PO4F = measurement_parse(row["PO4F"]),
                F_PO4F = flag_parse(row["F_PO4F"]),
                NH4F = measurement_parse(row["NH4F"]),
                F_NH4F = flag_parse(row["F_NH4F"]),
                NO2F = measurement_parse(row["NO2F"]),
                F_NO2F = flag_parse(row["F_NO2F"]),
                NO3F = measurement_parse(row["NO3F"]),
                F_NO3F = flag_parse(row["F_NO3F"]),
                NO23F = measurement_parse(row["NO23F"]),
                F_NO23F = flag_parse(row["F_NO23F"]),
                CHLA_N = measurement_parse(row["CHLA_N"]),
                F_CHLA_N = flag_parse(row["F_CHLA_N"])                   
            )        

    def parse_met_row(self,  row):
        if row:
            return beam.Row(
                StationCode = string_parse(row[0]),
                isSWMP = string_parse(row[1]),
                DateTimeStamp = date_parse(row[2]),
                Historical = integer_parse(row[3]),
                ProvisionalPlus = integer_parse(row[4]),
                Frequency = integer_parse(row[5]),
                F_Record = flag_parse(row[6]),
                ATemp = measurement_parse(row[7]),
                F_ATemp = flag_parse(row[8]),
                RH = measurement_parse(row[9]),
                F_RH = flag_parse(row[10]),
                BP = measurement_parse(row[11]),
                F_BP = flag_parse(row[12]),
                WSpd = measurement_parse(row[13]),
                F_WSpd = flag_parse(row[14]),
                MaxWSpd = measurement_parse(row[15]),
                F_MaxWSpd = flag_parse(row[16]),
                MaxWSpdT = string_parse(row[17]),
                Wdir = measurement_parse(row[18]),
                F_Wdir = flag_parse(row[19]),
                SDWDir = measurement_parse(row[20]),
                F_SDWDir = flag_parse(row[21]),
                TotPAR = measurement_parse(row[22]),
                F_TotPAR = flag_parse(row[23]),
                TotPrcp = measurement_parse(row[24]),
                F_TotPrcp = flag_parse(row[25]),
                TotSoRad = string_parse(row[26]),
                F_TotSoRad = flag_parse(row[27])
            )

    def parse_station_row(self,  row):
        if row:
            return beam.Row(
                # Index 1 is just row num
                NERRSiteID = string_parse(row[1]),
                StationCode = string_parse(row[2]),
                StationName = string_parse(row[3]),
                LatLong = string_parse(row[4]),
                Latitude = measurement_parse(row[5]),
                Longitude = measurement_parse(row[6]),
                Status = string_parse(row[7]),
                ActiveDates = string_parse(row[8]),
                State = string_parse(row[9]),
                ReserveName = string_parse(row[10]),
                RealTime = string_parse(row[11]),
                HADSID = string_parse(row[12]),
                GMTOffset = measurement_parse(row[13]),
                StationType = integer_parse(row[14]),
                Region = integer_parse(row[15]),
                isSWMP = string_parse(row[16]),
                ParametersReported = string_parse(row[17])
            )

In [2]:
# from google.cloud import storage
# https://googleapis.dev/python/storage/latest/blobs.html

from collections import namedtuple
import csv

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import fileio
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
import numpy as np
from dateutil.parser import parse
import io
import re

    
di = DataIngestions()

# https://beam.apache.org/documentation/dsls/dataframes/overview/
def run(argv=None, save_main_session=True):

#     datasets = di.get_dataset_paths()
    # https://docs.google.com/presentation/d/10XnYgKsTzuvXuQrdt9KtA21l2ifXstWMw19r6_wWpJ0/edit#slide=id.g90d6944d57_0_1281

    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'data-science'
    google_cloud_options.job_name = 'nerr-pipeline'
    google_cloud_options.staging_location = 'gs://environmental_data/'
    google_cloud_options.temp_location = 'gs://environmental_data/' 
    google_cloud_options.region = 'us-east1'        
    google_cloud_options.service_account_email = "pipeline-admin-sc@data-science-306122.iam.gserviceaccount.com"
    options.view_as(StandardOptions).runner = 'DataflowRunner'    
    
    with beam.Pipeline(options=PipelineOptions()) as p:
        print(options)
        # Creates a PCollection of files

#         wq_collection = (
#             p
#             | "WQ Dataset" >> beam.Create(datasets.get(di.WQ)[0:1])
#             | "Read WQ CSV files" >> ReadAllFromText(skip_header_lines=1)
#             | "Split WQ CSV files" >> beam.Map(di.read_csv)
#             | "WQ Parse and To Row" >> beam.Map(
#                 lambda row: di.parse_wq_row(row)
#             )
# #             | "Print WQ" >> beam.Map(print)            
#         )
        wq_collection = (
            p
#             | "WQ Dataset" >> beam.Create(datasets.get(di.WQ)[0:1])
            | 'MatchAll' >> beam.io.ReadFromText("gs://environmental_data/landing_zone/cdmo/advanced_query_system/2021_02_28/*nut*.csv")
#             | beam.Reshuffle()
#             | 'ReadEach' >> fileio.ReadMatches()
#             | beam.FlatMap(lambda rfile:
#                            csv.DictReader(io.TextIOWrapper(rfile.open()))
#             )
            | "Print WQ" >> beam.Map(print)            
        )            

        
#         nut_collection = (
#             p
#             | "NUT Dataset" >> beam.Create(datasets.get(di.NUT)[0:1])
#             | "Read NUT CSV files" >> ReadAllFromText(skip_header_lines=1)
#             | "Split NUT CSV files" >> beam.Map(di.read_csv)
#             | "NUT Parse and To Row" >> beam.Map(
#                 lambda row: di.parse_nut_row(row)
#             )
# #             | "Print NUT" >> beam.Map(print)            
#         ) 
#         met_collection = (
#             p
#             | "MET Dataset" >> beam.Create(datasets.get(di.MET)[0:1])
#             | "Read MET CSV files" >> ReadAllFromText(skip_header_lines=1)
#             | "Split MET CSV files" >> beam.Map(di.read_csv)
#             | "MET Parse and To Row" >> beam.Map(
#                 lambda row: di.parse_met_row(row)
#             )
# #             | "Print MET" >> beam.Map(print)            
#         )
#         station_collection = (
#             p
#             | "Station Dataset" >> beam.Create(datasets.get(di.STATION)[0:1])
#             | "Read Station CSV files" >> ReadAllFromText(skip_header_lines=1)
#             | "Split Station CSV files" >> beam.Map(di.read_csv)
#             | "Station Parse and To Row" >> beam.Map(
#                 lambda row: di.parse_met_row(row)
#             )
# #             | "Print MET" >> beam.Map(print)            
#         )        
        result = p.run()
        print("Done")

# run()

NameError: name 'DataIngestions' is not defined

In [5]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
 
from datasource import ReadFromCsv
di = DataIngestions()

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class NERRPipelineOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):  # type: (_BeamArgumentParser) -> None
    parser.add_argument("--station_files", default="gs://environmental_data/landing_zone/cdmo/advanced_query_system/*/*stations*.csv")
    parser.add_argument("--nut_files", default="gs://environmental_data/landing_zone/cdmo/advanced_query_system/*/*nut*.csv")
    parser.add_argument("--wq_files", default="gs://environmental_data/landing_zone/cdmo/advanced_query_system/*/*wq*.csv")
    parser.add_argument("--met_files", default="gs://environmental_data/landing_zone/cdmo/advanced_query_system/*/*met*.csv")


def run(options):
    print(options)
    
    with beam.Pipeline(options=options) as p:
        print(options)
        data = (
            p
            | "Read files" >> ReadFromCsv(options.view_as(NERRPipelineOptions).station_files)
    #         | "NUT Parse and To Row" >> beam.Map(
    #             lambda row: di.parse_nut_row(row)
    #         )        
            | "Print WQ" >> beam.Map(print)
        )

import sys
options = PipelineOptions(sys.argv[1:])
run(options)



PipelineOptions()
PipelineOptions()




{'Row': '1', 'NERR Site ID ': 'ace                                               ', 'Station Code': 'acebbnut  ', 'Station Name': 'Big Bay                                 ', 'Lat Long': "32° 29' 38.76 N - 80° 19' 26.76 W", 'Latitude ': '32.4941', ' Longitude': '80.3241', ' Status': 'Inactive  ', ' Active Dates': 'Feb 2002-Dec 2014', ' State': 'sc        ', ' Reserve Name': 'Ashepoo Combahee Edisto Basin                     ', 'Real Time': '', 'HADS ID': '', 'GMT Offset': '-5        ', 'Station Type': '2', 'Region': '2', 'isSWMP': 'P', 'Parameters Reported': 'PO4F,NH4F,NO23F,CHLA_N,DIN,NO2F,NO3F'}
{'Row': '2', 'NERR Site ID ': 'ace                                               ', 'Station Code': 'acebbwq   ', 'Station Name': 'Big Bay                                 ', 'Lat Long': "32° 29' 38.76 N - 80° 19' 26.76 W", 'Latitude ': '32.4941', ' Longitude': '80.3241', ' Status': 'Inactive  ', ' Active Dates': 'Mar 1995-Dec 2014', ' State': 'sc        ', ' Reserve Name': 'Ashepoo Combahee Ed

{'Row': '62', 'NERR Site ID ': 'elk                                               ', 'Station Code': 'elkcwmet  ', 'Station Name': 'Caspian Weather Station                 ', 'Lat Long': "36° 45' 55.57 N - 121° 44' 17.32 W", 'Latitude ': '36.8154361', ' Longitude': '121.7381444', ' Status': 'Active    ', ' Active Dates': 'Jan 2001-', ' State': 'ca        ', ' Reserve Name': 'Elkhorn Slough                                    ', 'Real Time': 'R         ', 'HADS ID': '3B0155FC  ', 'GMT Offset': '-8        ', 'Station Type': '0', 'Region': '6', 'isSWMP': 'P', 'Parameters Reported': 'ATemp,RH,BP,WSpd,MaxWSpd,MaxWSpdT,Wdir,SDWDir,TotPAR,TotPrcp,CumPrcp'}
{'Row': '63', 'NERR Site ID ': 'elk                                               ', 'Station Code': 'elknmnut  ', 'Station Name': 'North Marsh                             ', 'Lat Long': "36° 50' 4.56 N - 121° 44' 18.24 W", 'Latitude ': '36.8346', ' Longitude': '121.7384', ' Status': 'Active    ', ' Active Dates': 'Apr 2002-', ' State': 'ca 

{'Row': '84', 'NERR Site ID ': 'gtm                                               ', 'Station Code': 'gtmpcmet  ', 'Station Name': 'Pellicer Creek                          ', 'Lat Long': "29° 39' 28.08 N - 81° 13' 58 W", 'Latitude ': '29.657702', ' Longitude': '81.232743', ' Status': 'Active    ', ' Active Dates': 'Sep 2002-', ' State': 'fl        ', ' Reserve Name': 'Guana Tolomato Mantanzas                          ', 'Real Time': 'R         ', 'HADS ID': '3B01B60E  ', 'GMT Offset': '-5        ', 'Station Type': '0', 'Region': '2', 'isSWMP': 'P', 'Parameters Reported': 'ATemp,RH,BP,WSpd,MaxWSpd,MaxWSpdT,Wdir,SDWDir,TotPAR,TotPrcp,CumPrcp'}
{'Row': '85', 'NERR Site ID ': 'gtm                                               ', 'Station Code': 'gtmpcnut  ', 'Station Name': 'Pellicer Creek                          ', 'Lat Long': "29° 40' 01 N - 81° 15' 27 W", 'Latitude ': '29.667071', ' Longitude': '81.257403', ' Status': 'Active    ', ' Active Dates': 'May 2002-', ' State': 'fl        ', 

{'Row': '351', 'NERR Site ID ': 'kac                                               ', 'Station Code': 'kach3wq   ', 'Station Name': 'Homer Surface 3                         ', 'Lat Long': "59° 36' 7.38 N - 151° 24' 33.90 W", 'Latitude ': '59.60205', ' Longitude': '151.40942', ' Status': 'Active    ', ' Active Dates': 'Jun 2012 -', ' State': 'AK        ', ' Reserve Name': 'Kachemak Bay                                      ', 'Real Time': '', 'HADS ID': '', 'GMT Offset': '-9        ', 'Station Type': '1', 'Region': '6', 'isSWMP': 'P', 'Parameters Reported': 'Temp,SpCond,Sal,DO_pct,DO_mgl,Depth,cDepth,pH,Turb,ChlFluor'}
{'Row': '358', 'NERR Site ID ': 'lks                                               ', 'Station Code': 'lksolwq   ', 'Station Name': 'Oliver Bridge                           ', 'Lat Long': "46° 39' 24.66 N - 92° 12' 5.98 W", 'Latitude ': '46.65685', ' Longitude': '92.20166', ' Status': 'Active    ', ' Active Dates': 'Jul 2012 -', ' State': 'WI        ', ' Reserve Name': 'La

{'Row': '182', 'NERR Site ID ': 'pdb                                               ', 'Station Code': 'pdbgsnut  ', 'Station Name': 'Gong Surface                            ', 'Lat Long': "48° 33' 27.00 N - 122° 34' 21.00 W", 'Latitude ': '48.55750', ' Longitude': '122.57250', ' Status': 'Active    ', ' Active Dates': 'Apr 2003-', ' State': 'wa        ', ' Reserve Name': 'Padilla Bay                                       ', 'Real Time': '', 'HADS ID': '', 'GMT Offset': '-8        ', 'Station Type': '2', 'Region': '7', 'isSWMP': 'P', 'Parameters Reported': 'PO4F,NH4F,NO2F,NO3F,NO23F,CHLA_N,DIN,DON,DOP,DO_S_N,PHEA,PHOSP,PN,SALT_N,SIO4F,TDN,TDP,TN,TON,TP,TSS,DO_N,SO4,TVS,DOC'}
{'Row': '183', 'NERR Site ID ': 'pdb                                               ', 'Station Code': 'pdbgswq   ', 'Station Name': 'Gong Surface                            ', 'Lat Long': "48° 33' 27.00 N - 122° 34' 21.00 W", 'Latitude ': '48.55750', ' Longitude': '122.57250', ' Status': 'Active    ', ' Active Dates

{'Row': '235', 'NERR Site ID ': 'tjr                                               ', 'Station Code': 'tjrrcnut  ', 'Station Name': 'River Channel                           ', 'Lat Long': "32° 33' 28.08 N - 117° 6' 21.96 W", 'Latitude ': '32.5578', ' Longitude': '117.1061', ' Status': 'Inactive  ', ' Active Dates': 'Sep 2002-Nov 2004', ' State': 'ca        ', ' Reserve Name': 'Tijuana River                                     ', 'Real Time': '', 'HADS ID': '', 'GMT Offset': '-8        ', 'Station Type': '2', 'Region': '6', 'isSWMP': 'P', 'Parameters Reported': 'PO4F,NH4F,NO2F,NO3F,NO23F,CHLA_N,DIN'}
{'Row': '236', 'NERR Site ID ': 'tjr                                               ', 'Station Code': 'tjrrcwq   ', 'Station Name': 'River Channel                           ', 'Lat Long': "32° 33' 28.08 N - 117° 6' 21.96 W", 'Latitude ': '32.5578', ' Longitude': '117.1061', ' Status': 'Inactive  ', ' Active Dates': 'Aug 2002-Nov 2004', ' State': 'ca        ', ' Reserve Name': 'Tijuana River

In [18]:
NERRPipelineOptions.nut_files

AttributeError: type object 'NERRPipelineOptions' has no attribute 'nut_files'

In [5]:
[print(x) for x in data]

TypeError: 'ReadFromCsv' object is not iterable