In [1]:
import os 

import numpy as np 
import pandas as pd  
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import nbimporter
import Useful_Visualization_Functions
from pyspark.ml import *
from pyspark.sql import *
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import *
from pyspark.sql.functions import *
# from pyspark.sql.functions import col, explode, array, lit, lead, when, substring
warnings.filterwarnings("ignore")
import pyspark.sql.functions as F
from pyspark.sql import Window

In [2]:
# from pyspark.sql import SparkSession
# from pyspark.sql import Row
# from pyspark.sql.functions import lit, col, column, expr, desc, asc

In [3]:
# ! pip install matplotlib
# ! pip install seaborn
# ! pip install ipynb
# ! pip install nbimporter

# Load and build data

### Build the spark session

In [4]:
# build our own SparkSession
myspark = SparkSession\
    .builder\
    .appName("AWS-Spark")\
    .config("spark.driver.memory", "12g") \
    .config("spark.sql.shuffle.partitions",6)\
    .config("spark.sql.repl.eagereval.enabled",True)\
    .getOrCreate()

22/05/24 17:15:54 WARN Utils: Your hostname, nuno-g14 resolves to a loopback address: 127.0.1.1; using 10.15.55.168 instead (on interface wlp2s0)
22/05/24 17:15:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/24 17:16:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
myspark

### Load the file

In [6]:
# ! head noaa.csv
# noaa_data.show(10)
#noaa_data = myspark.read.load("noaa.csv", format="csv", sep=",", header=True, inferSchema=True)
noaa_csv_directory_path = "/home/saltedcookie/Desktop/projetoABD/data"

noaa_csv_pathname = "noaa.csv"

#df_raw = myspark.read.load(noaa_csv_directory_path, 
#                         format="csv", 
#                         header=True, 
#                         pathGlobFilter="*.csv",
#                         recursiveFileLookup=True,
#                         sep=",",
#                         header=True,
#                         inferSchema=True)

df_raw = myspark.read.load(noaa_csv_pathname,
                              format="csv",
                              sep=",",
                              header=True,
                              inferSchema=True)

df_raw.count()

                                                                                

25933550

In [7]:
df_raw.printSchema()

root
 |-- STATION: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- TEMP_ATTRIBUTES: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- DEWP_ATTRIBUTES: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- SLP_ATTRIBUTES: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- STP_ATTRIBUTES: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- VISIB_ATTRIBUTES: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- WDSP_ATTRIBUTES: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- MIN: double (nullable = true)
 |-- MIN_ATTRIBUTES: string (nullable = true)
 |-- PRCP: double (nullable = t

### Create columns "ItRained" and "NextDay"

In [8]:
# "ItRained" is a column generated by using a part of the value of "FRSHTT"
# "NextDay" is a column generated by using the value of the following row of "ItRained"

df_raw = df_raw.withColumn("ItRained", when((F.length(df_raw["FRSHTT"]) <= 4), lit(0)) \
                    .when(F.length(df_raw["FRSHTT"]) == 5, lit(1)) \
                    .otherwise(lit(substring('FRSHTT', 2, 1).cast(IntegerType()))) \
)

#noaa_data.show(10)
df_raw = df_raw.withColumn( "NextDay", lead("ItRained", default=2).over(Window.orderBy("STATION")).alias("Next") )

# Data cleansing

### Drop unnecessary columns

In [9]:
columns = df_raw.columns

#for cl in columns:
#    noaa_data.describe(cl).show()
#
#for cl in columns:
#    noaa_data.select(cl).distinct().show(10)


In [10]:
cols_to_drop = ["STATION", "DATE", "LATITUDE", "LONGITUDE", "ELEVATION", "NAME", "TEMP_ATTRIBUTES", "DEWP_ATTRIBUTES",
               "SLP_ATTRIBUTES", "STP_ATTRIBUTES", "VISIB_ATTRIBUTES", "WDSP_ATTRIBUTES", "MAX_ATTRIBUTES",
               "MIN_ATTRIBUTES", "PRCP_ATTRIBUTES", "GUST", "STP"]

cols_interest = [x for x in columns if x not in cols_to_drop]
df_interest_cols = df_raw.select(cols_interest)

#### Drop nulls

In [11]:
df_interest_cols.printSchema()
df_clean = df_interest_cols.dropna()
[df_interest_cols.count(), df_clean.count()]

columns = df_clean.columns

#df_clean.select("SLP").summary("10%", "20%", "30%", "40%", "50%", "60%", "70%", "80%", "90%").show()
#df_clean.select("STP").summary("10%", "20%", "30%", "40%", "50%", "60%", "70%", "80%", "90%").show()

#for cl in df_clean.columns: 
#    print(cl)
#    df_clean.select(cl).summary().show()
#df_clean.select("ItRained").summary().show()
#
#for cl in columns:
#    df_clean.describe(cl).show()
#
#
#for cl in columns:
#    df_clean.select(cl).distinct().show(10)



root
 |-- TEMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MIN: double (nullable = true)
 |-- PRCP: double (nullable = true)
 |-- SNDP: double (nullable = true)
 |-- FRSHTT: integer (nullable = true)
 |-- ItRained: integer (nullable = true)
 |-- NextDay: integer (nullable = true)



22/05/24 17:16:41 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/24 17:16:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

#### Drop error values

In [12]:
#print(f"Antes: {df_clean.count()}")

df_clean = df_clean.filter(df_clean.TEMP > -10)
df_clean = df_clean.filter(df_clean.DEWP < 100)
df_clean = df_clean.filter(df_clean.SLP < 4000)
#df_clean = df_clean.filter(df_clean.STP < 100)
df_clean = df_clean.filter(df_clean.VISIB < 100)
df_clean = df_clean.filter(df_clean.WDSP < 100)
df_clean = df_clean.filter(df_clean.MXSPD < 100)
# df_clean = df_clean.filter(df_clean.GUST < 100)
df_clean = df_clean.filter(df_clean.MAX < 100)
df_clean = df_clean.filter(df_clean.MIN < 100)
df_clean = df_clean.filter(df_clean.PRCP < 100)
df_clean = df_clean.filter(df_clean.SNDP < 100)

#print(f"Depois: {df_clean.count()}")


#temp_median = df_clean_pd['TEMP'].quantile(0.50)
#df_clean_pd['TEMP'] = np.where(df_clean_pd['TEMP'] < -10, temp_median, df_clean_pd['TEMP'])
#plt.boxplot(df_clean_pd["TEMP"])
#plt.show()
#
#dewp_median = df_clean_pd['DEWP'].quantile(0.50)
#df_clean_pd['DEWP'] = np.where(df_clean_pd['DEWP'] > 100, dewp_median, df_clean_pd['DEWP'])
#plt.boxplot(df_clean_pd["DEWP"])
#plt.show()


# Save parquets

In [13]:
cleanfilename = "clean-noaa"
df_clean.write.mode("overwrite").parquet(cleanfilename)

22/05/24 17:17:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/24 17:17:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/24 17:17:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                