*Note: this notebook was generated by dsflow*

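# Create table meteoparis

Reads the raw meteoparis JSON export for one execution date (`ds`), flattens its `fields` struct into top-level columns, and writes the result as parquet.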

## \[1- config\] set input and output paths

In [1]:
import os
import json
import datetime as dt
from pyspark.sql import SparkSession
from dsflow_core.helpers import DsflowContext

# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
# dsflow helper context; used in this notebook for task-spec validation and display.
dsflow = DsflowContext.create()

# Bare last expression: renders the session summary as the cell output.
spark

In [2]:
# Fallback task specs, as a JSON string, used when the TASK_SPECS environment
# variable is not set. Keys: source_path (raw input), sink_path (parquet
# output) and ds (execution date).
default_task_specs = """
    {"source_path": "/data/raw/meteoparis/ds=2017-11-24",
     "sink_path": "/data/tables/meteoparis/ds=2017-11-24",
     "ds": "2017-11-24"}
    """

In [3]:
# Take the specs from the TASK_SPECS environment variable when it is set;
# otherwise fall back to the defaults declared in this notebook.
task_specs_raw = os.getenv('TASK_SPECS', default_task_specs)

# Both sources are JSON text, so decode into a plain dict.
task_specs = json.loads(task_specs_raw)

# Let dsflow flag anything suspicious in the specs.
# NOTE(review): `validade` looks like a misspelling of `validate`, but the cell
# ran with this name, so it presumably matches the dsflow API -- confirm.
dsflow.validade_task_specs(task_specs)

# Echo the resolved specs so the run records which paths were used.
print(task_specs)

{'source_path': '/data/raw/meteoparis/ds=2017-11-24', 'sink_path': '/data/tables/meteoparis/ds=2017-11-24', 'ds': '2017-11-24'}


In [4]:
# Get task variables:
# Unpack the three task parameters in one pass (ds is the execution date).
# A missing key raises KeyError, exactly as direct indexing would.
source_path, sink_path, ds = (
    task_specs[key] for key in ("source_path", "sink_path", "ds")
)

In [None]:
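# Data source reference:
# https://public.opendatasoft.com/explore/dataset/donnees-synop-essentielles-omm/information/
# NOTE(review): the records loaded below report datasetid "arome-0025-sp1_sp2_paris",
# which differs from the dataset named in this URL -- confirm the link is the right one.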

## \[2- first data check\] using Spark, print a couple lines from source_path

In [5]:
# Load the source as plain text (a single string column named `value`) so the
# raw payload can be inspected before any JSON parsing is attempted.
raw_data = spark.read.text(source_path)

In [6]:
# Print the leading rows; show() truncates long cell values, so only the start
# of each raw line is visible.
raw_data.show()

+--------------------+
|               value|
+--------------------+
|[{"datasetid": "a...|
+--------------------+



## \[3- process data\] convert raw data into a Spark dataframe

In [7]:
# Parse the same source as JSON; no schema is supplied, so Spark infers one.
dataset = spark.read.json(source_path)

Number of rows in the dataset:

In [8]:
# Count the rows parsed from the JSON source (displayed as the cell output).
dataset.count()

41551

In [9]:
# No schema was passed to the reader, so the dataframe uses the schema Spark
# auto-detected from the JSON; print it to check the inferred column types.
dataset.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- 2_metre_temperature: double (nullable = true)
 |    |-- forecast: string (nullable = true)
 |    |-- position: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- relative_humidity: double (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- total_water_precipitation: double (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- record_timestamp: string (nullable = true)
 |-- recordid: string (nullable = true)



## Flatten `fields` and preview 10 rows

In [12]:
# Flatten: promote every member of the `fields` struct to a top-level column.
# The other top-level columns (datasetid, geometry, record_timestamp, recordid)
# are not selected and therefore dropped.
dataset02 = dataset.selectExpr("fields.*")

In [13]:
# Render a small preview of the flattened frame with dsflow's display helper.
dsflow.display(dataset02)

Showing 10 random rows


Unnamed: 0,2_metre_temperature,forecast,position,relative_humidity,timestamp,total_water_precipitation
0,9.728647,2017-11-23T23:00:00+01:00,"[49.15, 2.325]",95.755348,2017-11-23T19:00:00+01:00,0.336426
1,9.078256,2017-11-23T23:00:00+01:00,"[49.125, 2.1]",99.974098,2017-11-23T19:00:00+01:00,0.890137
2,11.045053,2017-11-23T23:00:00+01:00,"[49.125, 2.325]",98.317848,2017-11-23T19:00:00+01:00,0.662109
3,11.044077,2017-11-23T23:00:00+01:00,"[49.125, 2.575]",97.255348,2017-11-23T19:00:00+01:00,0.262695
4,9.172006,2017-11-23T23:00:00+01:00,"[49.1, 2.1]",100.005348,2017-11-23T19:00:00+01:00,1.161621
5,9.572397,2017-11-23T23:00:00+01:00,"[49.1, 2.325]",100.005348,2017-11-23T19:00:00+01:00,1.245117
6,10.801889,2017-11-23T23:00:00+01:00,"[49.1, 2.575]",96.692848,2017-11-23T19:00:00+01:00,0.760254
7,9.678842,2017-11-23T23:00:00+01:00,"[49.075, 2.075]",99.442848,2017-11-23T19:00:00+01:00,1.511719
8,10.705209,2017-11-23T23:00:00+01:00,"[49.075, 2.3]",98.755348,2017-11-23T19:00:00+01:00,2.009766
9,10.211069,2017-11-23T23:00:00+01:00,"[49.075, 2.575]",96.536598,2017-11-23T19:00:00+01:00,1.564453


In [14]:
# Summary statistics of the nested (un-flattened) frame.
# NOTE(review): the recorded output only covers the three top-level string
# columns (datasetid, record_timestamp, recordid), so mean/stddev are empty.
# If statistics on temperature/humidity/precipitation were intended, describe
# the flattened frame (dataset02) instead -- confirm intent.
dsflow.display(dataset.describe())

Showing 10 random rows


Unnamed: 0,summary,datasetid,record_timestamp,recordid
0,count,41551,41551,41551
1,mean,,,
2,stddev,,,
3,min,arome-0025-sp1_sp2_paris,2017-11-23T19:00:00+01:00,arome_0025_sp1_sp2_lastgrib2/10233181
4,max,arome-0025-sp1_sp2_paris,2017-11-23T19:00:00+01:00,arome_0025_sp1_sp2_lastgrib2/9775023


## \[4- write on disk\] write the output as parquet

In [16]:
# The flattened frame is written as-is; no further transformation is applied.
final_dataset = dataset02

In [17]:
# Persist the final frame as parquet at sink_path; "overwrite" mode replaces
# any data already present at that location.
final_dataset.write.mode("overwrite").parquet(sink_path)

In [18]:
# Ask dsflow to validate what was written for this task.
# NOTE(review): the recorded output is 'not implemented', so this call does
# not appear to check anything yet -- confirm against the dsflow version in use.
dsflow.validade_task_output(task_specs)

'not implemented'