In [20]:
import os

In [21]:
# Set generic variables by executing the set_env.ipynb notebook.
# NOTE(review): `parquet_file`, used below to build the weather data path, is
# not defined in this notebook -- presumably set_env.ipynb defines it; confirm.
%run set_env.ipynb

In [22]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
# Obtain a Spark session named for its default configuration; getOrCreate()
# returns an existing session if there is one instead of building a new one.
spark = (
    SparkSession.builder
    .appName("Spark Session with Default Configurations")
    .getOrCreate()
)

### Set path to Parquet file(s) in the Iguazio Platform

In [22]:
# Build the location of the weather Parquet data under the V3IO home URL.
# The original wrapped an already-concatenated string in os.path.join(), which
# is a no-op with a single argument, so the path is formatted directly here.
# os.environ[...] raises a KeyError naming the variable when V3IO_HOME_URL is
# unset, instead of the opaque "NoneType + str" TypeError os.getenv() led to.
# `parquet_file` is not defined in this notebook -- presumably it comes from
# set_env.ipynb (TODO confirm).
file_path = '%s/%s' % (os.environ['V3IO_HOME_URL'], parquet_file)
file_path

'v3io://users/admin/data/weather_parquet_spark_snappy/'

### Load Spark Dataframe from Parquet

In [23]:
%%time
df = spark.read\
    .format("parquet")\
    .load(file_path)

CPU times: user 871 µs, sys: 1.85 ms, total: 2.72 ms
Wall time: 2.58 s


### Explore Spark Dataframe

In [24]:
%%time
df.head(5)

CPU times: user 4.1 ms, sys: 102 µs, total: 4.21 ms
Wall time: 2.46 s


[Row(usaf='724085', wban=94732, datetime=datetime.datetime(2020, 6, 15, 0, 54), latitude=40.079, longitude=-75.013, elevation=36.0, windAngle=110.0, windSpeed=3.1, temperature=19.4, seaLvlPressure=1028.3, cloudCoverage='CLR', presentWeatherIndicator=None, pastWeatherIndicator=None, precipTime=1.0, precipDepth=0.0, snowDepth=None, stationName='NE PHILADELPHIA AIRPORT', countryOrRegion='US', p_k='724085-94732', year=2020, day=15, version=1.0),
 Row(usaf='275185', wban=99999, datetime=datetime.datetime(2020, 6, 15, 10, 30), latitude=55.592, longitude=37.260999999999996, elevation=209.0, windAngle=100.0, windSpeed=5.0, temperature=20.0, seaLvlPressure=None, cloudCoverage=None, presentWeatherIndicator=None, pastWeatherIndicator=None, precipTime=None, precipDepth=None, snowDepth=None, stationName='VNUKOVO', countryOrRegion='RS', p_k='275185-99999', year=2020, day=15, version=1.0),
 Row(usaf='722040', wban=12838, datetime=datetime.datetime(2020, 6, 24, 5, 53), latitude=28.101, longitude=-80.6

In [5]:
%%time
df.describe()

CPU times: user 61 ms, sys: 12.9 ms, total: 73.9 ms
Wall time: 8min 56s


DataFrame[summary: string, usaf: string, wban: string, latitude: string, longitude: string, elevation: string, windAngle: string, windSpeed: string, temperature: string, seaLvlPressure: string, cloudCoverage: string, presentWeatherIndicator: string, pastWeatherIndicator: string, precipTime: string, precipDepth: string, snowDepth: string, stationName: string, countryOrRegion: string, p_k: string, year: string, day: string, version: string]

In [26]:
%%time
# Count the total number of rows in the weather DataFrame.
df.count()

CPU times: user 1.44 ms, sys: 1.08 ms, total: 2.52 ms
Wall time: 6.71 s


146451216

### Print Dataframe Schema
NOTE: We will use this schema to create a Hive table.

SQL file: weathercsv-parquet_external.sql

In [25]:
%%time
# Print the column names, types and nullability of the weather DataFrame.
df.printSchema()

root
 |-- usaf: string (nullable = true)
 |-- wban: integer (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- windAngle: double (nullable = true)
 |-- windSpeed: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- seaLvlPressure: double (nullable = true)
 |-- cloudCoverage: string (nullable = true)
 |-- presentWeatherIndicator: double (nullable = true)
 |-- pastWeatherIndicator: double (nullable = true)
 |-- precipTime: double (nullable = true)
 |-- precipDepth: double (nullable = true)
 |-- snowDepth: double (nullable = true)
 |-- stationName: string (nullable = true)
 |-- countryOrRegion: string (nullable = true)
 |-- p_k: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- version: double (nullable = true)

CPU times: user 0 ns, sys: 820 µs, total: 820 µs
Wall time: 528 µs


### Prep Dataframe for SparkSQL 

In [27]:
%%time
# Register df as the temporary view 'weather' so that the spark.sql() queries
# in the following cells can refer to it by name.
df.createOrReplaceTempView('weather')

CPU times: user 690 µs, sys: 1 ms, total: 1.69 ms
Wall time: 35.6 ms


In [10]:
%%time
# Query ten rows from the 'weather' view.
# NOTE(review): sqlDF is reassigned by the next query cell and no visible cell
# displays this result -- confirm whether this cell is still needed.
sqlDF = spark.sql("SELECT * FROM weather limit 10")

CPU times: user 1.55 ms, sys: 153 µs, total: 1.71 ms
Wall time: 78.9 ms


In [28]:
%%time
# Count the rows in the 'weather' view for each wban value; the result is
# displayed by sqlDF.show() in a later cell.
sqlDF = spark.sql("SELECT wban,count(*) FROM weather group by wban")

CPU times: user 349 µs, sys: 859 µs, total: 1.21 ms
Wall time: 9.57 ms


In [30]:
%%time
# Print the query result as a text table; with no arguments show() prints
# only the first 20 rows.
sqlDF.show()

+-----+--------+
| wban|count(1)|
+-----+--------+
|  471|   29633|
|34061|   27370|
|94950|   13127|
|26623|   13533|
|26425|   15007|
| 3997|   13363|
|23271|     745|
| 4935|   29527|
|22521|   12568|
|  463|   29459|
|  148|   29161|
| 3918|   13467|
|13840|   13735|
| 3749|     732|
|93037|   11664|
|54844|   38216|
|  243|   25490|
|23136|   12324|
|  392|   45309|
| 4929|   39244|
+-----+--------+
only showing top 20 rows

CPU times: user 2.44 ms, sys: 258 µs, total: 2.7 ms
Wall time: 5.83 s


### Get WBAN list
Load a second data set, the WBAN master list, to use for joins next.


In [33]:
# Build the location of the WBAN master list Parquet file under the V3IO home
# URL. The original wrapped an already-concatenated string in os.path.join(),
# which is a no-op with a single argument, so the path is formatted directly.
# os.environ[...] raises a KeyError naming the variable when V3IO_HOME_URL is
# unset, instead of the opaque "NoneType + str" TypeError os.getenv() led to.
file_path = '%s/%s' % (
    os.environ['V3IO_HOME_URL'],
    'data/wbanmasterlist.parquet/wbanmasterlist.parquet',
)
file_path

'v3io://users/admin/data/wbanmasterlist.parquet/wbanmasterlist.parquet'

In [34]:
%%time
df = spark.read\
    .format("parquet")\
    .load(file_path)

CPU times: user 1.98 ms, sys: 98 µs, total: 2.08 ms
Wall time: 164 ms


In [35]:
# Print the schema of the WBAN master list DataFrame loaded in the previous cell.
df.printSchema()
# Schema notes for the Hive table:
# - Change columns of type long to integer.
# - Quote column names containing "__" with backticks (`).
# - SQL file: wbanmasterlist_external.sql

root
 |-- REGION: string (nullable = true)
 |-- WBAN_ID: long (nullable = true)
 |-- STATION_NAME: string (nullable = true)
 |-- STATE_PROVINCE: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- EXTENDED_NAME: string (nullable = true)
 |-- CALL_SIGN: string (nullable = true)
 |-- STATION_TYPE: string (nullable = true)
 |-- DATE_ASSIGNED: string (nullable = true)
 |-- BEGIN_DATE: string (nullable = true)
 |-- COMMENTS: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ELEV_OTHER: string (nullable = true)
 |-- ELEV_GROUND: string (nullable = true)
 |-- ELEV_RUNWAY: string (nullable = true)
 |-- ELEV_BAROMETRIC: string (nullable = true)
 |-- ELEV_STATION: string (nullable = true)
 |-- ELEV_UPPER_AIR: string (nullable = true)
 |-- __null_dask_index__: long (nullable = true)



In [36]:
%%time
df.write.parquet("v3io://users/admin/data/wbanmasterlist_spark_parquet/", mode='overwrite')

CPU times: user 907 µs, sys: 1.18 ms, total: 2.09 ms
Wall time: 1.03 s


In [12]:
# Stop the Spark session created at the top of this notebook.
spark.stop()