In [87]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session for this notebook; the application is
# registered under the name 'basics'.
spark = (
    SparkSession.builder
    .appName('basics')
    .getOrCreate()
)


In [88]:
# Load the training CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
df = spark.read.csv(
    'train.csv',
    header=True,
    inferSchema=True,
)

In [89]:
# Print the inferred column names and types of the training frame.
df.printSchema()

root
 |-- DATE (MM/DD): string (nullable = true)
 |-- MST: timestamp (nullable = true)
 |-- Global CMP22 (vent/cor) [W/m^2]: double (nullable = true)
 |-- Direct sNIP [W/m^2]: double (nullable = true)
 |-- Azimuth Angle [degrees]: double (nullable = true)
 |-- Tower Dry Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Wet Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Dew Point Temp [deg C]: double (nullable = true)
 |-- Tower RH [%]: double (nullable = true)
 |-- Total Cloud Cover [%]: double (nullable = true)
 |-- Peak Wind Speed @ 6ft [m/s]: double (nullable = true)
 |-- Avg Wind Direction @ 6ft [deg from N]: double (nullable = true)
 |-- Station Pressure [mBar]: double (nullable = true)
 |-- Precipitation (Accumulated) [mm]: double (nullable = true)
 |-- Snow Depth [cm]: double (nullable = true)
 |-- Moisture: double (nullable = true)
 |-- Albedo (CMP11): double (nullable = true)



In [90]:
# List the column names of the training frame.
df.columns

['DATE (MM/DD)',
 'MST',
 'Global CMP22 (vent/cor) [W/m^2]',
 'Direct sNIP [W/m^2]',
 'Azimuth Angle [degrees]',
 'Tower Dry Bulb Temp [deg C]',
 'Tower Wet Bulb Temp [deg C]',
 'Tower Dew Point Temp [deg C]',
 'Tower RH [%]',
 'Total Cloud Cover [%]',
 'Peak Wind Speed @ 6ft [m/s]',
 'Avg Wind Direction @ 6ft [deg from N]',
 'Station Pressure [mBar]',
 'Precipitation (Accumulated) [mm]',
 'Snow Depth [cm]',
 'Moisture',
 'Albedo (CMP11)']

In [91]:
# Keep every training column except 'DATE (MM/DD)'; the column order is the
# same as in the source frame.
n = df.select(
    'MST',
    'Global CMP22 (vent/cor) [W/m^2]',
    'Direct sNIP [W/m^2]',
    'Azimuth Angle [degrees]',
    'Tower Dry Bulb Temp [deg C]',
    'Tower Wet Bulb Temp [deg C]',
    'Tower Dew Point Temp [deg C]',
    'Tower RH [%]',
    'Total Cloud Cover [%]',
    'Peak Wind Speed @ 6ft [m/s]',
    'Avg Wind Direction @ 6ft [deg from N]',
    'Station Pressure [mBar]',
    'Precipitation (Accumulated) [mm]',
    'Snow Depth [cm]',
    'Moisture',
    'Albedo (CMP11)',
)

In [92]:
# Confirm which columns survived the selection.
n.columns


['MST',
 'Global CMP22 (vent/cor) [W/m^2]',
 'Direct sNIP [W/m^2]',
 'Azimuth Angle [degrees]',
 'Tower Dry Bulb Temp [deg C]',
 'Tower Wet Bulb Temp [deg C]',
 'Tower Dew Point Temp [deg C]',
 'Tower RH [%]',
 'Total Cloud Cover [%]',
 'Peak Wind Speed @ 6ft [m/s]',
 'Avg Wind Direction @ 6ft [deg from N]',
 'Station Pressure [mBar]',
 'Precipitation (Accumulated) [mm]',
 'Snow Depth [cm]',
 'Moisture',
 'Albedo (CMP11)']

In [93]:
# Discard every row that has a null in any of the selected columns.
new = n.dropna()


In [94]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

In [95]:
# Replace the timestamp column 'MST' with a double so it can be fed to the
# vector assembler as a numeric input, then confirm the resulting schema.
# NOTE(review): the rendered feature vectors show values around 1.68e9 for
# this column; confirm an unscaled value of that magnitude is intended.
new2 = new.withColumn("MST", col("MST").cast("double"))
new2.printSchema()

root
 |-- MST: double (nullable = true)
 |-- Global CMP22 (vent/cor) [W/m^2]: double (nullable = true)
 |-- Direct sNIP [W/m^2]: double (nullable = true)
 |-- Azimuth Angle [degrees]: double (nullable = true)
 |-- Tower Dry Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Wet Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Dew Point Temp [deg C]: double (nullable = true)
 |-- Tower RH [%]: double (nullable = true)
 |-- Total Cloud Cover [%]: double (nullable = true)
 |-- Peak Wind Speed @ 6ft [m/s]: double (nullable = true)
 |-- Avg Wind Direction @ 6ft [deg from N]: double (nullable = true)
 |-- Station Pressure [mBar]: double (nullable = true)
 |-- Precipitation (Accumulated) [mm]: double (nullable = true)
 |-- Snow Depth [cm]: double (nullable = true)
 |-- Moisture: double (nullable = true)
 |-- Albedo (CMP11): double (nullable = true)



In [96]:
from pyspark.ml.feature import VectorAssembler

In [97]:
# Pack the predictor columns into a single vector column named 'features'.
# 'Total Cloud Cover [%]' is not among the inputs: it is selected as the label
# column further down.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=[
        'MST',
        'Global CMP22 (vent/cor) [W/m^2]',
        'Direct sNIP [W/m^2]',
        'Azimuth Angle [degrees]',
        'Tower Dry Bulb Temp [deg C]',
        'Tower Wet Bulb Temp [deg C]',
        'Tower Dew Point Temp [deg C]',
        'Tower RH [%]',
        'Peak Wind Speed @ 6ft [m/s]',
        'Avg Wind Direction @ 6ft [deg from N]',
        'Station Pressure [mBar]',
        'Precipitation (Accumulated) [mm]',
        'Snow Depth [cm]',
        'Moisture',
        'Albedo (CMP11)',
    ],
)

In [98]:
# Append the assembled 'features' vector column to the cleaned training frame.
output = assembler.transform(new2)

In [99]:
# Check that the 'features' vector column was added alongside the originals.
output.printSchema()

root
 |-- MST: double (nullable = true)
 |-- Global CMP22 (vent/cor) [W/m^2]: double (nullable = true)
 |-- Direct sNIP [W/m^2]: double (nullable = true)
 |-- Azimuth Angle [degrees]: double (nullable = true)
 |-- Tower Dry Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Wet Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Dew Point Temp [deg C]: double (nullable = true)
 |-- Tower RH [%]: double (nullable = true)
 |-- Total Cloud Cover [%]: double (nullable = true)
 |-- Peak Wind Speed @ 6ft [m/s]: double (nullable = true)
 |-- Avg Wind Direction @ 6ft [deg from N]: double (nullable = true)
 |-- Station Pressure [mBar]: double (nullable = true)
 |-- Precipitation (Accumulated) [mm]: double (nullable = true)
 |-- Snow Depth [cm]: double (nullable = true)
 |-- Moisture: double (nullable = true)
 |-- Albedo (CMP11): double (nullable = true)
 |-- features: vector (nullable = true)



In [100]:
# Reduce the frame to what the estimator consumes: the feature vector and the
# 'Total Cloud Cover [%]' column used as the label.
final = output.select(['features', 'Total Cloud Cover [%]'])

In [101]:
# Preview the first rows of the modelling frame (show() defaults to 20 rows).
final.show()

+--------------------+---------------------+
|            features|Total Cloud Cover [%]|
+--------------------+---------------------+
|[1.6796826E9,-0.9...|                  0.0|
|[1.67968266E9,-0....|                  0.0|
|[1.67968272E9,-0....|                  0.0|
|[1.67968278E9,-0....|                  0.0|
|[1.67968284E9,-0....|                  0.0|
|[1.6796829E9,-0.9...|                  0.0|
|[1.67968296E9,-0....|                  0.0|
|[1.67968302E9,-0....|                  0.0|
|[1.67968308E9,-0....|                  0.0|
|[1.67968314E9,-0....|                  0.0|
|[1.6796832E9,-0.9...|                  0.0|
|[1.67968326E9,-0....|                  0.0|
|[1.67968332E9,-0....|                  0.0|
|[1.67968338E9,-0....|                  0.0|
|[1.67968344E9,-0....|                  0.0|
|[1.6796835E9,-0.9...|                  0.0|
|[1.67968356E9,-0....|                  0.0|
|[1.67968362E9,-0....|                  0.0|
|[1.67968368E9,-0....|                  0.0|
|[1.679683

In [102]:
# Hold out roughly 30% of the rows for evaluation. The seed is fixed so that
# a Restart & Run All reproduces the same split, and therefore the same
# metrics, instead of drawing a new random partition on every run.
train_data, test_data = final.randomSplit([0.7, 0.3], seed=42)

In [103]:
from pyspark.ml.classification import LogisticRegression

In [104]:
# Logistic-regression estimator reading the assembled 'features' vector and
# using 'Total Cloud Cover [%]' as the label.
# NOTE(review): the label is a percentage column, and the rendered
# probabilities are not the sigmoid of the raw margin, which suggests a
# multinomial fit over many label values — confirm classification (rather
# than regression or a binarised label) is what is wanted here.
lr = LogisticRegression(
    labelCol='Total Cloud Cover [%]',
    featuresCol='features',
)

In [105]:
# Fit the estimator on the training portion of the split only.
fit_model = lr.fit(train_data)

In [106]:
# Score the held-out rows; the returned summary exposes the scored frame as
# `results.predictions`, which the following cells display and evaluate.
results = fit_model.evaluate(test_data)

In [107]:
# Preview the scored held-out rows. Only a small sample is printed: dumping
# up to 10,000 rows bloats the saved notebook and buries the cells around it.
results.predictions.show(20)

+--------------------+---------------------+--------------------+--------------------+----------+
|            features|Total Cloud Cover [%]|       rawPrediction|         probability|prediction|
+--------------------+---------------------+--------------------+--------------------+----------+
|[1.6796826E9,-1.1...|                  0.0|[8.33850862007602...|[0.92165647984373...|       0.0|
|[1.6796826E9,-1.0...|                  0.0|[8.85397437869323...|[0.94959172929295...|       0.0|
|[1.6796826E9,-0.9...|                  0.0|[9.02432495732500...|[0.95870604463574...|       0.0|
|[1.6796826E9,-0.9...|                  0.0|[8.39613224721688...|[0.83584669702461...|       0.0|
|[1.6796826E9,-0.9...|                  0.0|[10.2544915704129...|[0.98902088472708...|       0.0|
|[1.6796826E9,-0.9...|                  0.0|[7.96692474604788...|[0.84205066355907...|       0.0|
|[1.6796826E9,-0.9...|                  0.0|[8.98800936297266...|[0.95743044926695...|       0.0|
|[1.6796826E9,-0.8..

In [108]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [109]:
# Evaluator that scores the 'prediction' column against the
# 'Total Cloud Cover [%]' label.
# NOTE(review): this feeds the hard predicted class to a binary evaluator,
# not a continuous score, and the label appears to take more than two values
# (see the multi-entry probability vectors in the predictions output). The
# resulting number is therefore not a conventional ROC AUC; a multiclass
# metric may be the better fit — confirm the intended metric.
my_eval = BinaryClassificationEvaluator(
    labelCol='Total Cloud Cover [%]',
    rawPredictionCol='prediction',
)

In [110]:
# Run the evaluator over the scored held-out rows and keep the resulting score.
auc = my_eval.evaluate(results.predictions)

In [111]:
# Display the score computed on the held-out split.
auc

0.9768411126023963

In [112]:
# Refit the same estimator on the complete labelled frame (`final`, i.e. the
# data before it was split); this is the model applied to the new file below.
final_lr_model = lr.fit(final)

In [196]:
# Load the file to be scored, with the same reader options as the training
# data (header row for names, inferred column types), and print its schema.
# The former bare `new_cus.columns` line was removed: it sat in the middle of
# the cell, so its value was never displayed and it had no effect.
new_cus = spark.read.csv("final test.csv", inferSchema=True, header=True)
new_cus.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- scenario_set: integer (nullable = true)
 |-- DATE (MM/DD): string (nullable = true)
 |-- MST: timestamp (nullable = true)
 |-- Global CMP22 (vent/cor) [W/m^2]: double (nullable = true)
 |-- Direct sNIP [W/m^2]: double (nullable = true)
 |-- Azimuth Angle [degrees]: double (nullable = true)
 |-- Tower Dry Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Wet Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Dew Point Temp [deg C]: double (nullable = true)
 |-- Tower RH [%]: double (nullable = true)
 |-- Total Cloud Cover [%]: double (nullable = true)
 |-- Peak Wind Speed @ 6ft [m/s]: double (nullable = true)
 |-- Avg Wind Direction @ 6ft [deg from N]: double (nullable = true)
 |-- Station Pressure [mBar]: double (nullable = true)
 |-- Precipitation (Accumulated) [mm]: double (nullable = true)
 |-- Snow Depth [cm]: double (nullable = true)
 |-- Moisture: double (nullable = true)
 |-- Albedo (CMP11): double (nullable = true)



In [213]:
# Keep 'scenario_set' plus the same predictor columns the assembler was
# configured with. '_c0', 'DATE (MM/DD)' and 'Total Cloud Cover [%]' are left
# out of the selection.
n2 = new_cus.select(
    'scenario_set',
    'MST',
    'Global CMP22 (vent/cor) [W/m^2]',
    'Direct sNIP [W/m^2]',
    'Azimuth Angle [degrees]',
    'Tower Dry Bulb Temp [deg C]',
    'Tower Wet Bulb Temp [deg C]',
    'Tower Dew Point Temp [deg C]',
    'Tower RH [%]',
    'Peak Wind Speed @ 6ft [m/s]',
    'Avg Wind Direction @ 6ft [deg from N]',
    'Station Pressure [mBar]',
    'Precipitation (Accumulated) [mm]',
    'Snow Depth [cm]',
    'Moisture',
    'Albedo (CMP11)',
)

In [214]:
# Discard rows of the scoring frame that contain a null in any selected column.
# NOTE(review): dropped rows receive no prediction — confirm that is acceptable
# for every scenario_set.
nd = n2.dropna()

In [215]:
# Apply the same timestamp-to-double conversion to 'MST' that was used on the
# training frame, then confirm the resulting schema.
new3 = nd.withColumn("MST", col("MST").cast("double"))
new3.printSchema()

root
 |-- scenario_set: integer (nullable = true)
 |-- MST: double (nullable = true)
 |-- Global CMP22 (vent/cor) [W/m^2]: double (nullable = true)
 |-- Direct sNIP [W/m^2]: double (nullable = true)
 |-- Azimuth Angle [degrees]: double (nullable = true)
 |-- Tower Dry Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Wet Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Dew Point Temp [deg C]: double (nullable = true)
 |-- Tower RH [%]: double (nullable = true)
 |-- Peak Wind Speed @ 6ft [m/s]: double (nullable = true)
 |-- Avg Wind Direction @ 6ft [deg from N]: double (nullable = true)
 |-- Station Pressure [mBar]: double (nullable = true)
 |-- Precipitation (Accumulated) [mm]: double (nullable = true)
 |-- Snow Depth [cm]: double (nullable = true)
 |-- Moisture: double (nullable = true)
 |-- Albedo (CMP11): double (nullable = true)



In [216]:
# Reuse the assembler configured for training to build the 'features' vector
# on the scoring frame.
new_data = assembler.transform(new3)

In [217]:
# Check that the scoring frame now carries the 'features' vector column.
new_data.printSchema()

root
 |-- scenario_set: integer (nullable = true)
 |-- MST: double (nullable = true)
 |-- Global CMP22 (vent/cor) [W/m^2]: double (nullable = true)
 |-- Direct sNIP [W/m^2]: double (nullable = true)
 |-- Azimuth Angle [degrees]: double (nullable = true)
 |-- Tower Dry Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Wet Bulb Temp [deg C]: double (nullable = true)
 |-- Tower Dew Point Temp [deg C]: double (nullable = true)
 |-- Tower RH [%]: double (nullable = true)
 |-- Peak Wind Speed @ 6ft [m/s]: double (nullable = true)
 |-- Avg Wind Direction @ 6ft [deg from N]: double (nullable = true)
 |-- Station Pressure [mBar]: double (nullable = true)
 |-- Precipitation (Accumulated) [mm]: double (nullable = true)
 |-- Snow Depth [cm]: double (nullable = true)
 |-- Moisture: double (nullable = true)
 |-- Albedo (CMP11): double (nullable = true)
 |-- features: vector (nullable = true)



In [218]:
# Score the new rows with the model that was fitted on the full labelled frame.
final_results = final_lr_model.transform(new_data)

In [223]:
# Narrow the scored frame to the scenario id, the numeric timestamp, the
# feature vector and the predicted value.
f = final_results.select(['scenario_set', 'MST', 'features', 'prediction'])

In [224]:
# Evaluator that treats the 'MST' column as the score and the model's own
# 'prediction' column as the label.
# NOTE(review): no ground-truth column is involved here, so the value this
# evaluator produces does not measure model quality. The scoring file does
# have a 'Total Cloud Cover [%]' column, but it is dropped in the earlier
# select — confirm whether it should be kept and used as the label instead.
myeval2 = BinaryClassificationEvaluator(
    labelCol='prediction',
    rawPredictionCol='MST',
)

In [225]:
# Run the second evaluator over the narrowed scoring frame and keep the score.
auc2=myeval2.evaluate(f)

In [226]:
# Display the score computed on the scoring frame.
auc2

0.993826732834169

In [227]:
# Add a '30min' column that is a copy of the 'prediction' column.
# NOTE(review): this duplicates the current prediction rather than producing a
# separate 30-minute-ahead value — confirm that is intended.
f2 = f.withColumn("30min", col("prediction"))

In [228]:
# Add a '60min' column that is a copy of the 'prediction' column.
# NOTE(review): this duplicates the current prediction rather than producing a
# separate 60-minute-ahead value — confirm that is intended.
f3 = f2.withColumn("60min", col("prediction"))

In [229]:
# Add a '90min' column that is a copy of the 'prediction' column.
# NOTE(review): this duplicates the current prediction rather than producing a
# separate 90-minute-ahead value — confirm that is intended.
f4 = f3.withColumn("90min", col("prediction"))

In [230]:
# Add a '120min' column that is a copy of the 'prediction' column.
# NOTE(review): this duplicates the current prediction rather than producing a
# separate 120-minute-ahead value — confirm that is intended.
f5 = f4.withColumn("120min", col("prediction"))

In [231]:
# Preview the final frame with the per-horizon columns. Only a small sample
# is printed: dumping up to 10,000 rows bloats the saved notebook. Write the
# frame to a file instead if the full result is needed.
f5.show(20)

+------------+------------+--------------------+----------+-----+-----+-----+------+
|scenario_set|         MST|            features|prediction|30min|60min|90min|120min|
+------------+------------+--------------------+----------+-----+-----+-----+------+
|           0| 1.6797108E9|[1.6797108E9,54.3...|       0.0|  0.0|  0.0|  0.0|   0.0|
|           0|1.67971086E9|[1.67971086E9,56....|       0.0|  0.0|  0.0|  0.0|   0.0|
|           0|1.67971092E9|[1.67971092E9,58....|       0.0|  0.0|  0.0|  0.0|   0.0|
|           0|1.67971098E9|[1.67971098E9,61....|       0.0|  0.0|  0.0|  0.0|   0.0|
|           0|1.67971104E9|[1.67971104E9,63....|       1.0|  1.0|  1.0|  1.0|   1.0|
|           0| 1.6797111E9|[1.6797111E9,66.2...|       1.0|  1.0|  1.0|  1.0|   1.0|
|           0|1.67971116E9|[1.67971116E9,68....|       1.0|  1.0|  1.0|  1.0|   1.0|
|           0|1.67971122E9|[1.67971122E9,71....|       1.0|  1.0|  1.0|  1.0|   1.0|
|           0|1.67971128E9|[1.67971128E9,74....|       1.0|  1.0|