In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Create the Spark/Glue contexts and initialise the Glue job with the resolved arguments.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "weather", table_name = "crawler_weather_small", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
# Read the catalogued table into a DynamicFrame.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database = "weather",
    table_name = "crawler_weather_small",
    transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("col0", "long", "col0", "long"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string"), ("col3", "string", "col3", "string"), ("col4", "string", "col4", "string"), ("col5", "string", "col5", "string"), ("col6", "string", "col6", "string"), ("col7", "string", "col7", "string"), ("col8", "string", "col8", "string"), ("col9", "string", "col9", "string"), ("col10", "string", "col10", "string"), ("col11", "string", "col11", "string"), ("col12", "string", "col12", "string"), ("col13", "string", "col13", "string"), ("col14", "string", "col14", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
# Identity mapping: col0 stays a long, col1..col14 pass through as strings under the same names.
column_mappings = [("col0", "long", "col0", "long")] + [
    ("col%d" % n, "string", "col%d" % n, "string") for n in range(1, 15)
]
applymapping1 = ApplyMapping.apply(
    frame = datasource0,
    mappings = column_mappings,
    transformation_ctx = "applymapping1")

#------------------------------
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as F

# Convert the DynamicFrame to a Spark DataFrame
df = applymapping1.toDF()

# Positional rename: the n-th existing column receives the n-th name in new_cols.
new_cols = ['id', 'Station', 'WBAN', 'Year', 'Month', 'Day', 'Hour', 'Temperature', 'Dew_Point', 'Pressure', 'Wind_Direction', 'Wind_Speed', 'Sky_Condition', 'Precipitation_1H', 'Precipitation_6H']
for position, current_name in enumerate(df.columns):
    df = df.withColumnRenamed(current_name, new_cols[position])

# Drop the row containing column names: keep only rows whose Station value
# differs from the literal column name 'Station'.
print('size before filter:', df.count())
station_name = new_cols[1]
header_removed_DF = df.filter(F.col(station_name) != station_name)
print('size after filter:', header_removed_DF.count())

# Replace missing-value sentinel codes with None
sentinel_codes = [9999, -9999, 999, -999]
for measure in ['Temperature', 'Dew_Point','Wind_Direction', 'Wind_Speed']:
    is_missing = F.col(measure).isin(sentinel_codes)
    header_removed_DF = header_removed_DF.withColumn(
        measure, F.when(is_missing, None).otherwise(F.col(measure)))
# For pressure, only [-9999, -999] indicate missing values
pressure_is_missing = F.col('Pressure').isin([-9999, -999])
header_removed_DF = header_removed_DF.withColumn(
    'Pressure', F.when(pressure_is_missing, None).otherwise(F.col('Pressure')))

# Divide some measures by 10
# NOTE(review): Wind_Direction is scaled together with the other measures here;
# confirm the source data really stores wind direction multiplied by 10.
for measure in ['Temperature', 'Dew_Point','Wind_Direction', 'Wind_Speed', 'Pressure']:
    header_removed_DF = header_removed_DF.withColumn(measure, F.col(measure) / 10.0)
    

In [None]:
#Assignment 3

In [None]:
#Add a new column named station_WBAN that combines Station and WBAN columns and separates the values using a hyphen
from pyspark.sql.functions import concat, lit, col
header_removed_DF = header_removed_DF.withColumn(
    'station_WBAN', F.concat(F.col('Station'), F.lit('-'), F.col('WBAN')))

#For precipitation columns, replace all missing values with zero.
missing_codes = [9999, -9999, 999, -999]
for precip_col in ['Precipitation_1H', 'Precipitation_6H']:
    header_removed_DF = header_removed_DF.withColumn(
        precip_col,
        F.when(F.col(precip_col).isin(missing_codes), 0).otherwise(F.col(precip_col)))

#For all other columns, replace the missing values with the mean of the measure for the corresponding station
from pyspark.sql import Window
from pyspark.sql.functions import avg, when, col

# Sky_Condition is not among the columns whose sentinel codes were turned into None
# earlier in this script, so null them here; otherwise they would be averaged in as
# real observations and would never be imputed.
# NOTE(review): assumes Sky_Condition uses the same sentinel codes as the other measures - confirm.
header_removed_DF = header_removed_DF.withColumn(
    'Sky_Condition',
    F.when(F.col('Sky_Condition').isin(missing_codes), None).otherwise(F.col('Sky_Condition')))

# Partition by column name so the window does not depend on one specific DataFrame object.
w = Window.partitionBy('Station')
for measure in ['Temperature', 'Dew_Point', 'Pressure', 'Wind_Direction', 'Wind_Speed', 'Sky_Condition']:
    # Cast so the observed value and the (double) mean share one type.
    observed = F.col(measure).cast('double')
    # Keep the observed value when present and fall back to the station mean only for
    # nulls. (Previously every row was overwritten with the station mean.)
    # avg() ignores nulls; a station with no valid readings for a measure stays null.
    header_removed_DF = header_removed_DF.withColumn(
        measure, F.coalesce(observed, F.avg(observed).over(w)))

In [None]:
#End Assignment 3 code

In [None]:
# Print a preview of the cleaned rows and the distinct Year values to stdout.
header_removed_DF.show()
distinct_years = header_removed_DF.select('Year').distinct().collect()
print(distinct_years)

# Convert back to a DynamicFrame so the Glue sink can write it
newDynamicDF = DynamicFrame.fromDF(header_removed_DF, glueContext, 'clean_df')

#-------------------------------
# NOTE: the @inputs annotation below still names applymapping1, but the frame that is
# actually written is newDynamicDF (the cleaned data).
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://gba6430emily/Glue/weather_transformed", "compression": "gzip"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
sink_options = {"path": "s3://gba6430emily/Glue/weather_transformed", "compression": "gzip"}
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame = newDynamicDF,
    connection_type = "s3",
    connection_options = sink_options,
    format = "csv",
    transformation_ctx = "datasink2")
job.commit()