ebiiii edited this page Nov 10, 2014 · 10 revisions

This wrapper is used for generating GSN's stream elements out of CSV files.

Parameters

  • fields Comma-separated list of field names.
  • formats Comma-separated list giving the format of each field. Possible formats are "string", "numeric", "bigint", "timestamp" and "timestampl". For the two timestamp formats the syntax is "timestamp(pattern)" or "timestampl(pattern)", where pattern follows the Joda-Time DateTimeFormat syntax (http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html). The two differ only in how a value shorter than the pattern is padded. For example, with the pattern "HHmm" and the value "10", "timestamp" pads on the right and parses "10" as "1000" (10:00 AM). "timestampl" pads on the left and parses "10" as "0010" (ten minutes after midnight).
  • timezone Time zone used when parsing timestamp fields. The default is the local time zone. Accepted values are listed at http://joda-time.sourceforge.net/timezones.html. The Etc/GMT zones use POSIX-style inverted signs: Etc/GMT-1 is UTC+01:00 and Etc/GMT+1 is UTC-01:00.
  • file Path to the CSV data file.
  • sampling Integer interval, in milliseconds, between two reads of the data file (e.g., 10000 means the file is read every 10 seconds).
  • check-point-directory Directory in which the checkpoint files are stored. The default value is "./csv-check-points".
  • use-counter-for-check-point If set to "true", the wrapper uses count-based checkpoints: it records the number of processed lines instead of the latest timestamp. The default value is "false".
  • separator Field separator; it must be a single character. The default value is ",".
  • quote Quote character for string fields. The default value is the double quote (").
  • skip-first-lines Number of lines to skip at the beginning of the data file before parsing. This is useful when the file starts with header lines. The default value is "0".
  • bad-values Comma-separated list of values that the wrapper stores as null instead of the actual value. For instance, if bad-values is set to -9999, every field containing -9999 is stored as null.
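The padding rule of "timestamp" and "timestampl" can be modelled in a few lines of Python. This is a simplified sketch. It assumes the raw value is padded with zeros to the length of the pattern before it is parsed. `pad_hhmm` and `parse_hhmm` are hypothetical helpers that handle only the "HHmm" pattern; they are not GSN's implementation.

```python
from datetime import time


def pad_hhmm(raw: str, left: bool) -> str:
    """Pad a raw value with zeros to the 4-character width of "HHmm".

    left=False models "timestamp" (right padding),
    left=True models "timestampl" (left padding).
    """
    width = len("HHmm")
    return raw.zfill(width) if left else raw.ljust(width, "0")


def parse_hhmm(raw: str, left: bool) -> time:
    """Parse a padded HHmm value into a time of day."""
    padded = pad_hhmm(raw, left)
    return time(hour=int(padded[:2]), minute=int(padded[2:]))


print(parse_hhmm("10", left=False))  # 10:00:00 (timestamp)
print(parse_hhmm("10", left=True))   # 00:10:00 (timestampl)
```

Left padding is what makes compact clock values such as "0", "10" or "100" (as in Example 2) read as 00:00, 00:10 and 01:00.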

Discussions

  • Checkpoints The checkpoint files (or their content) can be removed while the system is running. Values added to the middle of the CSV data file (e.g., out-of-order items) are rejected as long as the previous checkpoint file still exists. To insert out-of-order values:
    • Stop the GSN server.
    • Remove the corresponding checkpoint file(s).
    • Update the CSV data file.
    • Start GSN again.
    • With this approach, rows that are unchanged in the data file are read again and raise duplicate-value exceptions. These exceptions can safely be ignored, but they can significantly reduce performance.
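A toy model can illustrate two behaviours: why out-of-order rows are rejected, and why duplicates appear once the checkpoint is removed. The sketch assumes that a timestamp-based checkpoint keeps only the latest processed timestamp and that a count-based one keeps only the number of processed lines. `Checkpoint`, `new_rows_by_timestamp` and `new_rows_by_counter` are hypothetical names; GSN's actual checkpoint files may store more or different information.

```python
from dataclasses import dataclass


@dataclass
class Checkpoint:
    """Toy checkpoint state; not GSN's on-disk format."""
    last_timestamp: int = -1  # timestamp-based mode
    processed_lines: int = 0  # count-based mode (use-counter-for-check-point)


def new_rows_by_timestamp(rows, cp):
    """Return rows newer than the checkpoint and advance it."""
    fresh = [r for r in rows if r[0] > cp.last_timestamp]
    if fresh:
        cp.last_timestamp = max(r[0] for r in fresh)
    return fresh


def new_rows_by_counter(rows, cp):
    """Return rows after the number of lines already processed."""
    fresh = rows[cp.processed_lines:]
    cp.processed_lines = len(rows)
    return fresh


rows = [(1, 2.05), (2, 1.99), (4, 2.82)]  # (timestamp, value)
cp = Checkpoint()
print(len(new_rows_by_timestamp(rows, cp)))            # 3
rows.insert(2, (3, 1.97))                              # out-of-order item
print(new_rows_by_timestamp(rows, cp))                 # [] -> rejected
print(len(new_rows_by_timestamp(rows, Checkpoint())))  # 4, three are duplicates
```

Under this model, a count-based checkpoint only picks up lines that come after the last processed line.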

Example 1: timestamp in one column

Data:

01.10.2008 00:00,2.05,95.7,2.301,99.9,.369,279.4,26.19,.882,0,0,1.17,318.1,3.168,787.47,-6999,1.61,2.302,2.802,2.974
01.10.2008 00:10,1.99,97.8,2.332,99.9,.628,315.1,20.92,1.176,0,0,1.127,321.1,3.158,787.52,-6999,1.672,2.24,2.952,2.967
01.10.2008 00:20,1.97,97.7,2.278,99.9,.516,249.8,81.1,.98,0,0,1.184,322.5,3.139,787.45,-6999,1.615,2.284,2.738,2.978
01.10.2008 00:30,2.82,87.5,2.531,99.7,.745,189.5,24.18,2.254,0,0,1.247,318.2,3.195,787.43,-6999,1.617,2.259,2.769,3.047
01.10.2008 00:40,2.61,88.2,2.712,95.4,.415,142.1,58.89,.98,0,0,1.197,317.1,3.364,787.31,-6999,1.652,2.236,2.851,2.878
01.10.2008 00:50,2.46,90.7,2.738,92.6,.572,170,29.95,1.47,0,0,1.197,316.1,3.472,787.29,-6999,1.662,2.258,2.81,2.851
01.10.2008 01:00,2.34,91.1,2.713,92.8,.237,109.7,58.54,.784,0,0,1.111,314.5,3.527,787.17,-6999,1.805,1.984,2.966,2.935
01.10.2008 01:10,2.12,93.9,2.455,96.4,.765,279.6,24.19,1.47,0,0,.647,313.8,3.426,787.08,-6999,1.727,2.13,2.816,2.88
01.10.2008 01:20,2.6,88,2.613,94.1,.782,273.9,12.5,2.45,0,0,1.104,314.1,3.394,787.08,-6999,1.669,2.145,2.855,2.888
01.10.2008 01:30,2.86,83.8,2.879,87.9,.8,262.5,31.56,1.666,0,0,1.249,314.9,3.574,787,-6999,1.622,2.288,2.7,3.099
01.10.2008 01:40,3.2,80,3.073,85.1,.308,198.1,18.16,.98,0,0,1.233,315.4,3.788,786.82,-6999,1.71,2.113,2.775,3.05

GSN Configuration:

<address wrapper="csv">
  <predicate key="file">/usr/local/pub/09D0STI.SPZ</predicate>
  <predicate key="fields">timed,air_temp_thyg,rel_humidity_thyg,air_temp_rotro,rel_humidity_rotro,....</predicate>
  <predicate key="formats">timestamp(d.M.y H:m),numeric,numeric,numeric,numeric,....</predicate>
  <predicate key="bad-values">NaN,6999,-6999,null</predicate>
  <predicate key="timezone">Etc/GMT-1</predicate>
  <predicate key="sampling">1000</predicate> <!-- every 1000 milliseconds (1 s) -->
  <predicate key="check-point-directory">csv-check-points</predicate>
</address>
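The following sketch parses the first data row as this configuration describes it, to make the predicates above concrete. It makes two simplifying assumptions. Bad values are matched by exact string comparison, and Python's strptime stands in for the Joda-Time pattern d.M.y H:m. `parse_row` is a hypothetical helper, not part of GSN. Etc/GMT-1 is modelled as a fixed UTC+01:00 offset, because the Etc/GMT zones invert the sign.

```python
from datetime import datetime, timedelta, timezone

# Etc/GMT-1 inverts the sign: it is a fixed UTC+01:00 offset.
ETC_GMT_MINUS_1 = timezone(timedelta(hours=1))
# Values from the bad-values predicate, compared as raw strings here.
BAD_VALUES = {"NaN", "6999", "-6999", "null"}


def parse_row(line: str):
    """Split one Example 1 row into (epoch milliseconds, numeric values)."""
    raw_time, *raw_values = line.split(",")
    # Rough strptime equivalent of the Joda-Time pattern "d.M.y H:m".
    local = datetime.strptime(raw_time, "%d.%m.%Y %H:%M")
    timed = int(local.replace(tzinfo=ETC_GMT_MINUS_1).timestamp() * 1000)
    values = [None if v in BAD_VALUES else float(v) for v in raw_values]
    return timed, values


ROW = ("01.10.2008 00:00,2.05,95.7,2.301,99.9,.369,279.4,26.19,.882,0,0,"
       "1.17,318.1,3.168,787.47,-6999,1.61,2.302,2.802,2.974")
timed, values = parse_row(ROW)
print(timed)       # 1222815600000 (2008-09-30 23:00 UTC)
print(values[14])  # None: -6999 is a bad value
```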

Example 2: timestamp scattered in different columns

Raw data:

1,2007,255,2300,-6999,-6999,-6999,12.42,6.395,17.41,17.42,17.39,-6999,-6999,-6999,-6999
1,2007,255,2310,-6999,-6999,-6999,12.42,6.388,17.4,17.41,17.39,-6999,-6999,-6999,-6999
1,2007,255,2320,-6999,-6999,-6999,12.41,6.371,17.4,17.41,17.38,-6999,-6999,-6999,-6999
1,2007,255,2330,-6999,-6999,-6999,12.41,6.355,17.4,17.41,17.38,-6999,-6999,-6999,-6999
1,2007,255,2340,-6999,-6999,-6999,12.4,6.336,17.39,17.41,17.38,-6999,-6999,-6999,-6999
1,2007,255,2350,-6999,-6999,-6999,12.4,6.309,17.39,17.4,17.38,-6999,-6999,-6999,-6999
1,2007,256,0,-6999,-6999,-6999,12.41,6.274,17.39,17.4,17.37,-6999,-6999,-6999,-6999
1,2007,256,10,-6999,-6999,-6999,12.4,6.243,17.38,17.4,17.37,-6999,-6999,-6999,-6999
1,2007,256,20,-6999,-6999,-6999,12.41,6.219,17.39,17.4,17.37,-6999,-6999,-6999,-6999
1,2007,256,30,-6999,-6999,-6999,12.4,6.213,17.38,17.39,17.37,-6999,-6999,-6999,-6999
1,2007,256,40,-6999,-6999,-6999,12.4,6.215,17.37,17.38,17.36,-6999,-6999,-6999,-6999
1,2007,256,50,-6999,-6999,-6999,12.4,6.23,17.38,17.39,17.37,-6999,-6999,-6999,-6999
1,2007,256,100,-6999,-6999,-6999,12.4,6.249,17.37,17.38,17.37,-6999,-6999,-6999,-6999
1,2007,256,110,-6999,-6999,-6999,12.39,6.264,17.37,17.38,17.36,-6999,-6999,-6999,-6999
1,2007,256,120,-6999,-6999,-6999,12.39,6.265,17.37,17.38,17.35,-6999,-6999,-6999,-6999
1,2007,256,130,-6999,-6999,-6999,12.38,6.231,17.36,17.38,17.35,-6999,-6999,-6999,-6999
1,2007,256,140,-6999,-6999,-6999,12.38,6.171,17.36,17.38,17.34,-6999,-6999,-6999,-6999

GSN Configuration:

<address wrapper="csv">
  <predicate key="file">/Messdaten/Damma_final_storage_1.dat</predicate>
  <predicate key="fields">station_id,timed,timed,timed,flowrate_p_avg,flowrate_p_max,....</predicate>
  <predicate key="formats">numeric,timestamp(y),timestamp(D),timestampl(HHmm),numeric,numeric,...</predicate>
  <predicate key="bad-values">NaN, 6999, -6999, null</predicate>
  <predicate key="timezone">Etc/GMT+1</predicate>
  <predicate key="sampling">60000</predicate> <!-- every minute -->
  <predicate key="check-point-directory">csv-check-points</predicate>
</address>
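The three timed columns (year, day of year, and left-padded HHmm) describe a single instant. The sketch below assumes the wrapper merges these partial timestamps field by field. `combine_timed` is a hypothetical helper. Etc/GMT+1 is modelled as a fixed UTC-01:00 offset, because the Etc/GMT zones invert the sign.

```python
from datetime import datetime, timedelta, timezone

# Etc/GMT+1 inverts the sign: it is a fixed UTC-01:00 offset.
ETC_GMT_PLUS_1 = timezone(timedelta(hours=-1))


def combine_timed(year: str, day_of_year: str, hhmm: str) -> datetime:
    """Merge timestamp(y), timestamp(D) and timestampl(HHmm) into one instant."""
    padded = hhmm.zfill(4)  # timestampl pads on the left: "10" -> "0010"
    start_of_year = datetime(int(year), 1, 1, tzinfo=ETC_GMT_PLUS_1)
    return start_of_year + timedelta(days=int(day_of_year) - 1,
                                     hours=int(padded[:2]),
                                     minutes=int(padded[2:]))


for line in ["1,2007,255,2300", "1,2007,256,0", "1,2007,256,100"]:
    _, year, day, hhmm = line.split(",")
    print(combine_timed(year, day, hhmm).isoformat())
# 2007-09-12T23:00:00-01:00
# 2007-09-13T00:00:00-01:00
# 2007-09-13T01:00:00-01:00
```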

Example 3: timestamp generated through a conversion formula

Data:

#time [d] P sb50s [bar] P sb120 [bar] T sb50s [°C] T sb120 [°C] 
731161.1625000 0.9929155 0.8977918 3.5088000 4.1778000
731161.1666700 0.9929155 0.8977918 3.5088000 4.1778000
731161.1708300 0.9919155 0.8977574 3.5088000 4.2046000
731161.1750000 0.9928574 0.8977918 3.5355000 4.1778000
731161.1791700 0.9929155 0.8977918 3.5088000 4.1778000
731161.1833300 0.9927991 0.8977574 3.5623000 4.2046000
731161.1875000 0.9928574 0.8977574 3.5355000 4.2046000
731161.1916700 0.9929737 0.8977918 3.4821000 4.1778000
731161.1958300 0.9919155 0.8977918 3.5088000 4.1778000

GSN Configuration: note that this data requires count-based checkpoints (use-counter-for-check-point set to "true"), because none of its fields is declared as a timestamp.

<streams>
  <stream name="data">
    <source alias="source" storage-size="1" sampling-rate="1">
      <address wrapper="csv">
        <predicate key="file">borehole_pressure.dat</predicate>
        <predicate key="fields">excel_day,pressure_50m,pressure_120m,temperature_50m,temperature_120m</predicate>
        <predicate key="formats">numeric,numeric,numeric,numeric,numeric</predicate>
        <predicate key="bad-values">999999999, NaN, NA</predicate>
        <predicate key="timezone">Etc/GMT-1</predicate>
        <predicate key="sampling">10000</predicate>
        <predicate key="check-point-directory">csv-check-points</predicate>
        <predicate key="use-counter-for-check-point">true</predicate>
        <predicate key="skip-first-lines">1</predicate>
        <predicate key="separator"> </predicate>
      </address>
      <query>select ((EXCEL_DAY-719529)*86400000) AS TIMED, PRESSURE_50M, PRESSURE_120M, TEMPERATURE_50M, TEMPERATURE_120M from wrapper</query>
    </source>
    <query>select TIMED, PRESSURE_50M, PRESSURE_120M, TEMPERATURE_50M, TEMPERATURE_120M from source</query>
  </stream>
</streams>
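The arithmetic in the first query can be checked by hand. The offset 719529 appears to be MATLAB's datenum value for 1970-01-01; Excel serial dates would use 25569 instead. Subtracting it therefore yields days since the Unix epoch, and multiplying by 86,400,000 converts days to milliseconds. The sketch below also mimics skip-first-lines=1 and a space separator. Its helpers `parse_file` and `day_number_to_epoch_ms` are hypothetical, and the rounding to whole milliseconds is an addition of the sketch.

```python
from datetime import datetime, timezone

DAY_OF_1970_01_01 = 719529  # offset used in the query
MS_PER_DAY = 86_400_000


def parse_file(lines, skip_first_lines=1, separator=" "):
    """Skip header lines, split on the separator and convert to floats."""
    return [[float(f) for f in line.split(separator)]
            for line in lines[skip_first_lines:]]


def day_number_to_epoch_ms(day: float) -> int:
    """Same arithmetic as (EXCEL_DAY-719529)*86400000, rounded to whole ms."""
    return round((day - DAY_OF_1970_01_01) * MS_PER_DAY)


sample = [
    "#time [d] P sb50s [bar] P sb120 [bar] T sb50s [°C] T sb120 [°C] ",
    "731161.1625000 0.9929155 0.8977918 3.5088000 4.1778000",
]
excel_day = parse_file(sample)[0][0]
ms = day_number_to_epoch_ms(excel_day)
print(ms)  # 1005018840000
print(datetime.fromtimestamp(ms / 1000, tz=timezone.utc))  # 2001-11-06 03:54:00+00:00
```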
