*Note: this notebook was generated by dsflow*

# meteo_bretagne / create_table

## \[1- config\] get task configuration as defined in dag_specs.yaml

In [2]:
import os
import json
import datetime as dt
from pyspark.sql import SparkSession
from dsflow_core.helpers import DsflowContext

spark = SparkSession.builder.getOrCreate()
dsflow = DsflowContext.create()

spark

In [3]:
default_task_specs = """
    {"source_path": "/data/raw/meteo/ds=2017-09-23",
     "sink_path": "/data/tables/meteo/ds=2017-09-23",
     "ds": "2017-09-23"}
    """

In [5]:
task_specs_raw = os.environ.get('TASK_SPECS', default_task_specs)
task_specs = json.loads(task_specs_raw)

# dsflow alerts if something looks wrong
dsflow.validade_task_specs(task_specs)

print(task_specs)

{'source_path': '/data/raw/meteo/ds=2017-09-23', 'sink_path': '/data/tables/meteo/ds=2017-09-23', 'ds': '2017-09-23'}


In [6]:
# Get task variables:
source_path = task_specs["source_path"]
sink_path = task_specs["sink_path"]
ds = task_specs["ds"]  # ds is the execution date

In [5]:
# https://public.opendatasoft.com/explore/dataset/donnees-synop-essentielles-omm/information/

## \[2- first data check\] using Spark, print a couple lines from source_path

In [6]:
raw_data = spark.read.text(source_path)

## \[3- process data\] convert raw data into a Spark dataframe

In [7]:
dataset = (spark
.read
.options(wholeFile=True, inferSchema=True)
.json(source_path)
)

number of rows in dataset

In [8]:
dataset.count()

1694

In [9]:
# by default, dataframe uses an auto-detected schema
dataset.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- ch: string (nullable = true)
 |    |-- cl: string (nullable = true)
 |    |-- cm: string (nullable = true)
 |    |-- cod_tend: long (nullable = true)
 |    |-- coordonnees: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- ctype1: string (nullable = true)
 |    |-- ctype2: string (nullable = true)
 |    |-- ctype3: string (nullable = true)
 |    |-- ctype4: string (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- dd: long (nullable = true)
 |    |-- etat_sol: string (nullable = true)
 |    |-- ff: double (nullable = true)
 |    |-- hbas: string (nullable = true)
 |    |-- hnuage1: string (nullable = true)
 |    |-- hnuage2: string (nullable = true)
 |    |-- hnuage3: string (nullable = true)
 |    |-- hnuage4: string (nullable = true)
 |    |-- n: string (nullable = true)
 |    |-- nbas: string (nullable = true)
 |    |-- nnuage1: string 

In [10]:
from pyspark.sql.functions import expr

In [11]:
dataset02 = (dataset
  .selectExpr("cast(fields.date as timestamp) as ts",
          "date(cast(fields.date as date)) as record_date",
          "fields.*")
  )

In [12]:
dataset02.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- record_date: date (nullable = true)
 |-- ch: string (nullable = true)
 |-- cl: string (nullable = true)
 |-- cm: string (nullable = true)
 |-- cod_tend: long (nullable = true)
 |-- coordonnees: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- ctype1: string (nullable = true)
 |-- ctype2: string (nullable = true)
 |-- ctype3: string (nullable = true)
 |-- ctype4: string (nullable = true)
 |-- date: string (nullable = true)
 |-- dd: long (nullable = true)
 |-- etat_sol: string (nullable = true)
 |-- ff: double (nullable = true)
 |-- hbas: string (nullable = true)
 |-- hnuage1: string (nullable = true)
 |-- hnuage2: string (nullable = true)
 |-- hnuage3: string (nullable = true)
 |-- hnuage4: string (nullable = true)
 |-- n: string (nullable = true)
 |-- nbas: string (nullable = true)
 |-- nnuage1: string (nullable = true)
 |-- nnuage2: string (nullable = true)
 |-- nnuage3: string (nullable = true)
 |-- nnuage4: s

In [13]:
dataset02.groupby("record_date").count().show()

+-----------+-----+
|record_date|count|
+-----------+-----+
| 2017-01-06|    8|
| 2017-01-27|    8|
| 2017-02-26|    8|
| 2017-01-24|    8|
| 2017-06-29|    8|
| 2017-02-16|    8|
| 2017-07-31|    8|
| 2017-04-09|    8|
| 2017-03-28|    8|
| 2017-02-28|    8|
| 2017-06-30|    8|
| 2017-01-30|    8|
| 2017-07-06|    8|
| 2017-05-11|    8|
| 2017-02-10|    8|
| 2017-04-25|    8|
| 2017-03-19|    8|
| 2017-05-26|    8|
| 2017-01-04|    8|
| 2017-06-28|    8|
+-----------+-----+
only showing top 20 rows



## First 10 rows

In [14]:
dsflow.display(dataset02)

Showing 10 random rows


Unnamed: 0,ts,record_date,ch,cl,cm,cod_tend,coordonnees,ctype1,ctype2,ctype3,...,tend,tend24,tn12,tx12,type_de_tendance_barometrique,u,vv,w1,w2,ww
0,2017-05-05 21:00:00,2017-05-05,,,,3,"[48.825833, -3.473167]",,,,...,60,-960,,,"En baisse ou stationnaire, puis en hausse, ou ...",93,,,,
1,2017-05-06 06:00:00,2017-05-06,,37.0,,3,"[48.825833, -3.473167]",7.0,,,...,40,-960,283.95,285.35,"En baisse ou stationnaire, puis en hausse, ou ...",97,4000.0,2.0,,10.0
2,2017-06-02 15:00:00,2017-06-02,,35.0,,7,"[48.825833, -3.473167]",6.0,,,...,-10,-210,,,En baisse,76,20000.0,2.0,,2.0
3,2017-05-26 09:00:00,2017-05-26,10.0,30.0,25.0,8,"[48.825833, -3.473167]",4.0,,,...,-40,-740,,,"Stationnaire ou en hausse, puis en baisse, ou ...",72,20000.0,0.0,,1.0
4,2017-07-25 18:00:00,2017-07-25,,35.0,22.0,8,"[48.825833, -3.473167]",6.0,4.0,,...,-140,-340,288.65,292.95,"Stationnaire ou en hausse, puis en baisse, ou ...",83,7000.0,2.0,2.0,2.0
5,2017-04-06 06:00:00,2017-04-06,10.0,31.0,21.0,3,"[48.825833, -3.473167]",8.0,4.0,,...,40,0,281.05,283.45,"En baisse ou stationnaire, puis en hausse, ou ...",90,15000.0,0.0,0.0,0.0
6,2017-04-05 03:00:00,2017-04-05,,,,5,"[48.825833, -3.473167]",,,,...,-10,570,,,"En baisse, puis en hausse, la pression atmosph...",88,,,,
7,2017-04-22 12:00:00,2017-04-22,,,,8,"[48.825833, -3.473167]",,,,...,-90,-430,,,"Stationnaire ou en hausse, puis en baisse, ou ...",77,,,,
8,2017-06-11 09:00:00,2017-06-11,,35.0,24.0,1,"[48.825833, -3.473167]",6.0,3.0,,...,50,40,,,"En hausse, puis stationnaire, ou en hausse, pu...",82,6000.0,2.0,,1.0
9,2017-06-12 21:00:00,2017-06-12,,,,3,"[48.825833, -3.473167]",,,,...,20,230,,,"En baisse ou stationnaire, puis en hausse, ou ...",87,,,,


In [15]:
dsflow.display(dataset02.describe())

Showing 10 random rows


Unnamed: 0,summary,ch,cl,cm,cod_tend,ctype1,ctype2,ctype3,ctype4,date,...,tend,tend24,tn12,tx12,type_de_tendance_barometrique,u,vv,w1,w2,ww
0,count,540.0,992.0,704.0,1694.0,950.0,507.0,146.0,17.0,1694,...,1694.0,1689.0,424.0,424.0,1694,1694.0,981.0,1008.0,225.0,990.0
1,mean,13.974074074074077,34.788306451612904,23.056818181818183,4.38370720188902,7.393684210526316,3.777120315581854,2.650684931506849,2.235294117647059,,...,-0.7083825265643447,-9.727649496743636,283.13136792452826,287.0662735849058,,81.22136953955136,14790.876656472989,1.9583333333333333,2.28,11.217171717171718
2,stddev,10.07261548104618,4.780293422129401,7.206931947629506,2.6803071205273765,8.014657417164052,2.330335518286593,2.8323073711860487,3.881517289744854,,...,129.96557757037158,666.3523979084362,4.13901542826014,4.684761704459992,,12.003936803107043,8369.626868897245,1.8697003886178163,1.6110112174833369,20.653927961123127
3,min,10.0,30.0,20.0,0.0,0.0,0.0,0.0,0.0,2017-01-01T01:00:00+01:00,...,-780.0,-2230.0,270.05,274.35,En baisse,35.0,100.0,0.0,0.0,0.0
4,max,60.0,62.0,61.0,8.0,9.0,9.0,9.0,9.0,2017-07-31T23:00:00+02:00,...,830.0,2910.0,292.65,302.25,"Stationnaire, la pression atmosphérique est la...",99.0,9000.0,9.0,7.0,89.0


## \[4- write on disk\] write down the output to disk

In [None]:
final_dataset = dataset02

In [19]:
(final_dataset
 .write
 .mode("overwrite")
 .parquet(sink_path)
)

In [7]:
dsflow.validade_task_output(task_specs)

'not implemented'