In [445]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import isnull, col

Init Spark Session

In [212]:
spark = SparkSession.builder\
    .master("local[*]")\
    .appName('METAR Data')\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/13 10:41:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Read Data

In [431]:
schema = types.StructType([
    types.StructField('station', types.StringType(), True), 
    types.StructField('valid', types.TimestampType(), True), 
    types.StructField('lon', types.DoubleType(), True), 
    types.StructField('lat', types.DoubleType(), True), 
    types.StructField('elevation', types.DoubleType(), True), 
    types.StructField('tmpf', types.DoubleType(), True), 
    types.StructField('dwpf', types.DoubleType(), True), 
    types.StructField('relh', types.DoubleType(), True), 
    types.StructField('drct', types.DoubleType(), True), 
    types.StructField('sknt', types.DoubleType(), True), 
    types.StructField('p01i', types.DoubleType(), True), 
    types.StructField('alti', types.DoubleType(), True), 
    types.StructField('mslp', types.StringType(), True), 
    types.StructField('vsby', types.DoubleType(), True), 
    types.StructField('gust', types.IntegerType(), True), 
    types.StructField('skyc1', types.StringType(), True), 
    types.StructField('skyc2', types.StringType(), True), 
    types.StructField('skyc3', types.StringType(), True), 
    types.StructField('skyc4', types.StringType(), True), 
    types.StructField('skyl1', types.DoubleType(), True), 
    types.StructField('skyl2', types.DoubleType(), True), 
    types.StructField('skyl3', types.DoubleType(), True), 
    types.StructField('skyl4', types.DoubleType(), True), 
    types.StructField('wxcodes', types.StringType(), True), 
    types.StructField('ice_accretion_1hr', types.DoubleType(), True), 
    types.StructField('ice_accretion_3hr', types.DoubleType(), True), 
    types.StructField('ice_accretion_6hr', types.DoubleType(), True), 
    types.StructField('peak_wind_gust', types.DoubleType(), True), 
    types.StructField('peak_wind_drct', types.DoubleType(), True), 
    types.StructField('peak_wind_time', types.DoubleType(), True), 
    types.StructField('feel', types.DoubleType(), True), 
    types.StructField('metar', types.StringType(), True), 
    types.StructField('snowdepth', types.StringType(), True)
    ])


In [533]:
url_list = list()
for year in [*range(2000,2022)]:
    url_list.append(f'data/raw/EPWA/{year}/EPWA_01_01_{year}_31_12_{year}.csv')

url_list


['data/raw/EPWA/2000/EPWA_01_01_2000_31_12_2000.csv',
 'data/raw/EPWA/2001/EPWA_01_01_2001_31_12_2001.csv',
 'data/raw/EPWA/2002/EPWA_01_01_2002_31_12_2002.csv',
 'data/raw/EPWA/2003/EPWA_01_01_2003_31_12_2003.csv',
 'data/raw/EPWA/2004/EPWA_01_01_2004_31_12_2004.csv',
 'data/raw/EPWA/2005/EPWA_01_01_2005_31_12_2005.csv',
 'data/raw/EPWA/2006/EPWA_01_01_2006_31_12_2006.csv',
 'data/raw/EPWA/2007/EPWA_01_01_2007_31_12_2007.csv',
 'data/raw/EPWA/2008/EPWA_01_01_2008_31_12_2008.csv',
 'data/raw/EPWA/2009/EPWA_01_01_2009_31_12_2009.csv',
 'data/raw/EPWA/2010/EPWA_01_01_2010_31_12_2010.csv',
 'data/raw/EPWA/2011/EPWA_01_01_2011_31_12_2011.csv',
 'data/raw/EPWA/2012/EPWA_01_01_2012_31_12_2012.csv',
 'data/raw/EPWA/2013/EPWA_01_01_2013_31_12_2013.csv',
 'data/raw/EPWA/2014/EPWA_01_01_2014_31_12_2014.csv',
 'data/raw/EPWA/2015/EPWA_01_01_2015_31_12_2015.csv',
 'data/raw/EPWA/2016/EPWA_01_01_2016_31_12_2016.csv',
 'data/raw/EPWA/2017/EPWA_01_01_2017_31_12_2017.csv',
 'data/raw/EPWA/2018/EPWA_01

In [534]:
#station_df = spark.read\
#    .option("header","true")\
#    .schema(schema)\
#    .csv(url_list)

In [540]:
station_df = spark.read\
    .option("header","true")\
    .schema(schema)\
    .csv('data/EPWA.csv')

In [541]:
station_df.count()

620227

Check Schema

In [542]:
station_df.printSchema()

root
 |-- station: string (nullable = true)
 |-- valid: timestamp (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- tmpf: double (nullable = true)
 |-- dwpf: double (nullable = true)
 |-- relh: double (nullable = true)
 |-- drct: double (nullable = true)
 |-- sknt: double (nullable = true)
 |-- p01i: double (nullable = true)
 |-- alti: double (nullable = true)
 |-- mslp: string (nullable = true)
 |-- vsby: double (nullable = true)
 |-- gust: integer (nullable = true)
 |-- skyc1: string (nullable = true)
 |-- skyc2: string (nullable = true)
 |-- skyc3: string (nullable = true)
 |-- skyc4: string (nullable = true)
 |-- skyl1: double (nullable = true)
 |-- skyl2: double (nullable = true)
 |-- skyl3: double (nullable = true)
 |-- skyl4: double (nullable = true)
 |-- wxcodes: string (nullable = true)
 |-- ice_accretion_1hr: double (nullable = true)
 |-- ice_accretion_3hr: double (nullable = true)
 |-- ice_ac

In [636]:
station_df.show(5)

+-------+-------------------+-------+-------+---------+-----+-----+-----+-----+----+----+----+-----+--------------------+---------+
|station|              valid|    lon|    lat|elevation| tmpf| dwpf| relh| drct|sknt|alti|vsby| feel|               metar|snowdepth|
+-------+-------------------+-------+-------+---------+-----+-----+-----+-----+----+----+----+-----+--------------------+---------+
|   EPWA|1928-01-02 06:00:00|20.9611|52.1628|    107.0| 14.0|12.92|95.36|180.0| 2.0|null|1.25| 14.0|EPWA 020600Z AUTO...|     null|
|   EPWA|1928-01-03 06:00:00|20.9611|52.1628|    107.0|35.96| 32.0|85.37|160.0| 2.0|null|1.25|35.96|EPWA 030600Z AUTO...|     null|
|   EPWA|1928-01-06 06:00:00|20.9611|52.1628|    107.0|33.98|33.08|96.46|270.0|13.0|null|1.25|24.11|EPWA 060600Z AUTO...|     null|
|   EPWA|1928-01-09 06:00:00|20.9611|52.1628|    107.0|33.98|30.02|85.24|270.0|13.0|null| 1.0|24.11|EPWA 090600Z AUTO...|     null|
|   EPWA|1928-01-10 06:00:00|20.9611|52.1628|    107.0|33.98|33.08|96.46|250

Replace all 'null' to None

In [614]:
station_df = station_df.na.replace('null',None)

Count rows with Null

In [637]:

column = 'snowdepth'
station_df.filter(col(column).isNull()).count()
#to replace: drct (6026), skyc1 (54828)



620227

Replace Null

In [529]:
station_df.select(col(column)).fillna('-',subset=[column])

DataFrame[skyc1: string]

Drop rows with Null

In [557]:
station_df.na.drop(subset=[column])

DataFrame[station: string, valid: timestamp, lon: double, lat: double, elevation: double, tmpf: double, dwpf: double, relh: double, drct: double, sknt: double, p01i: double, alti: double, mslp: string, vsby: double, gust: int, skyc1: string, skyc2: string, skyc3: string, skyc4: string, skyl1: double, skyl2: double, skyl3: double, skyl4: double, wxcodes: string, ice_accretion_1hr: double, ice_accretion_3hr: double, ice_accretion_6hr: double, peak_wind_gust: double, peak_wind_drct: double, peak_wind_time: double, feel: double, metar: string, snowdepth: string]

In [567]:
station_df.filter(col(column).isNull()).count()

6913

Drop column

In [638]:
station_df = station_df.drop(column)

Replace None

In [612]:

unique_val = station_df.select(column).distinct()
unique_val.show()
#to drop

+-----------+
|    wxcodes|
+-----------+
|       null|
|         BR|
|      -SHRA|
|        -SN|
|        -RA|
|     -RA BR|
|   -RASN BR|
|   -SHRA BR|
|-SN BLSN BR|
|       MIFG|
|     SHSNGS|
|   +SHRA BR|
|         RA|
|      -SHSG|
|     -SG BR|
|     -PL BR|
|       SHSN|
|         FG|
|      +SHSN|
| -SHRA VCTS|
+-----------+
only showing top 20 rows



In [639]:
station_df.count()

620227

Check

In [610]:
station_df.select(column)

DataFrame[wxcodes: string]

Dropna 

In [645]:
df = station_df.dropna()
df.show(100)

+-------+-------------------+-------+-------+---------+----+----+-----+-----+----+-----+----+-----+--------------------+
|station|              valid|    lon|    lat|elevation|tmpf|dwpf| relh| drct|sknt| alti|vsby| feel|               metar|
+-------+-------------------+-------+-------+---------+----+----+-----+-----+----+-----+----+-----+--------------------+
|   EPWA|1973-01-01 00:30:00|20.9611|52.1628|    107.0|21.2|12.2|67.73|140.0| 8.0|30.65| 6.0|10.81|EPWA 010030Z AUTO...|
|   EPWA|1973-01-01 01:00:00|20.9611|52.1628|    107.0|19.4|12.2|73.13|140.0| 8.0|30.62| 6.0| 8.59|EPWA 010100Z AUTO...|
|   EPWA|1973-01-01 01:30:00|20.9611|52.1628|    107.0|19.4|12.2|73.13|140.0| 6.0|30.62| 6.0|10.36|EPWA 010130Z AUTO...|
|   EPWA|1973-01-01 02:00:00|20.9611|52.1628|    107.0|19.4|10.4|67.51|140.0| 6.0|30.62| 6.0|10.36|EPWA 010200Z AUTO...|
|   EPWA|1973-01-01 03:00:00|20.9611|52.1628|    107.0|19.4|10.4|67.51|160.0| 6.0|30.62| 6.0|10.36|EPWA 010300Z AUTO...|
|   EPWA|1973-01-01 03:30:00|20.

Regrasja liniowa

In [647]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [678]:
input_cols = ['tmpf','dwpf','relh','drct','alti','vsby']

In [679]:
assembler = VectorAssembler(inputCols=input_cols, outputCol='features')
data = assembler.transform(df)
data.show(10)

+-------+-------------------+-------+-------+---------+----+----+-----+-----+----+-----+----+-----+--------------------+--------------------+
|station|              valid|    lon|    lat|elevation|tmpf|dwpf| relh| drct|sknt| alti|vsby| feel|               metar|            features|
+-------+-------------------+-------+-------+---------+----+----+-----+-----+----+-----+----+-----+--------------------+--------------------+
|   EPWA|1973-01-01 00:30:00|20.9611|52.1628|    107.0|21.2|12.2|67.73|140.0| 8.0|30.65| 6.0|10.81|EPWA 010030Z AUTO...|[21.2,12.2,67.73,...|
|   EPWA|1973-01-01 01:00:00|20.9611|52.1628|    107.0|19.4|12.2|73.13|140.0| 8.0|30.62| 6.0| 8.59|EPWA 010100Z AUTO...|[19.4,12.2,73.13,...|
|   EPWA|1973-01-01 01:30:00|20.9611|52.1628|    107.0|19.4|12.2|73.13|140.0| 6.0|30.62| 6.0|10.36|EPWA 010130Z AUTO...|[19.4,12.2,73.13,...|
|   EPWA|1973-01-01 02:00:00|20.9611|52.1628|    107.0|19.4|10.4|67.51|140.0| 6.0|30.62| 6.0|10.36|EPWA 010200Z AUTO...|[19.4,10.4,67.51,...|
|   EP

In [680]:
label_col = 'sknt'
data.select('features',label_col).show(10)

+--------------------+----+
|            features|sknt|
+--------------------+----+
|[21.2,12.2,67.73,...| 8.0|
|[19.4,12.2,73.13,...| 8.0|
|[19.4,12.2,73.13,...| 6.0|
|[19.4,10.4,67.51,...| 6.0|
|[19.4,10.4,67.51,...| 6.0|
|[19.4,10.4,67.51,...| 4.0|
|[17.6,10.4,72.93,...| 4.0|
|[17.6,10.4,72.93,...| 6.0|
|[19.4,8.6,62.28,1...| 6.0|
|[19.4,8.6,62.28,1...| 4.0|
+--------------------+----+
only showing top 10 rows



In [681]:
train_data, test_data = data.randomSplit([0.7,0.3],seed=123)

In [682]:
lin_regression = LinearRegression(featuresCol="features",labelCol=label_col)
model = lin_regression.fit(train_data)

23/03/13 18:53:33 WARN Instrumentation: [b7ac114c] regParam is zero, which might cause numerical instability and overfitting.


                                                                                

In [683]:
predictions = model.transform(test_data)

In [684]:
predictions.select("features", label_col, "prediction").show(500)

[Stage 957:>                                                        (0 + 1) / 1]

+--------------------+----+------------------+
|            features|sknt|        prediction|
+--------------------+----+------------------+
|[19.4,12.2,73.13,...| 6.0| 7.391044952949713|
|[17.6,10.4,72.93,...| 4.0| 7.815996898599991|
|[19.4,8.6,62.28,1...| 4.0| 8.418811143460516|
|[21.2,12.2,67.73,...| 4.0| 7.683224885965359|
|[21.2,12.2,67.73,...| 4.0| 7.683224885965359|
|[35.6,19.4,51.33,...| 6.0| 8.113908090481246|
|[30.2,19.4,63.75,...| 6.0| 7.495871385887014|
|[24.8,15.8,68.18,...| 6.0|  7.74385228139279|
|[24.8,15.8,68.18,...| 6.0| 7.859180817631199|
|[24.8,14.0,63.03,...| 2.0| 8.244085086182153|
|[21.2,10.4,62.53,...| 2.0| 8.619488447052447|
|[23.0,14.0,67.96,...| 2.0| 7.916547333955549|
|[21.2,12.2,67.73,...| 4.0| 8.059752280295328|
|[19.4,12.2,73.13,...| 4.0| 7.882900883518104|
|[35.6,21.2,55.41,...| 4.0| 8.387634412425982|
|[35.6,19.4,51.33,...| 4.0| 8.744795736682264|
|[32.0,24.8,74.44,...| 6.0| 6.938997585118116|
|[28.4,19.4,68.61,...| 6.0| 8.222615728924069|
|[24.8,17.6,7

                                                                                

R-kwadrat (R²): \
Metryka ta mierzy, jak wiele wariancji w zmiennej zależnej jest wyjaśniane przez nasz model. 
R² przyjmuje wartości od 0 do 1, gdzie 0 oznacza, że model nie wyjaśnia żadnej zmienności, a 1 oznacza, że model wyjaśnia całą zmienność.
Wyższe wartości R² oznaczają lepszą dopasowanie modelu.

In [686]:
from pyspark.ml.evaluation import RegressionEvaluator

In [688]:
evaluator = RegressionEvaluator(labelCol=label_col,
                                predictionCol="prediction",
                                metricName='r2')

In [690]:
r2 = evaluator.evaluate(predictions)

                                                                                

In [691]:
'R-squared on test data = %g' % r2

'R-squared on test data = 0.167706'