In [None]:
import sys

# Append the job options to the argument list so they can be resolved later
# in the notebook. Each option is a "--name" token followed by its value.
sys.argv.extend([
    '--JOB_NAME', 'address-cleaning',
    '--source_catalog_database', 'housing-repairs-raw-zone',
    '--source_catalog_table', 'housing_repairs_reactive_rewires',
    '--cleaned_repairs_s3_bucket_target',
    's3://dataplatform-stg-refined-zone/housing-repairs/repairs-electrical-mechanical-fire/reactive-rewires/cleaned',
])

In [None]:
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, trim, when, max, trim
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from awsglue.dynamicframe import DynamicFrame

def get_glue_env_var(key, default="none"):
    """Return the value of the ``--<key>`` job argument, or ``default`` if absent.

    The argument is only resolved when the exact token ``--<key>`` is present
    in ``sys.argv``; otherwise ``default`` is returned unchanged.
    """
    flag = '--{}'.format(key)
    if flag not in sys.argv:
        return default
    resolved_options = getResolvedOptions(sys.argv, [key])
    return resolved_options[key]

def getLatestPartitions(dfa):
    """Return only the rows of ``dfa`` that belong to the latest import partition.

    The frame is narrowed one partition column at a time: first to the maximum
    ``import_year``, then to the maximum ``import_month`` within that year,
    then to the maximum ``import_day`` within that month. Each maximum is
    computed on the already-narrowed frame.
    """
    for partition_column in ('import_year', 'import_month', 'import_day'):
        latest_value = dfa.select(max(partition_column)).first()[0]
        dfa = dfa.where(col(partition_column) == latest_value)
    return dfa

In [None]:
# The job name is mandatory; the remaining options fall back to '' when absent.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

source_catalog_database, source_catalog_table, cleaned_repairs_s3_bucket_target = (
    get_glue_env_var(option_name, '')
    for option_name in (
        'source_catalog_database',
        'source_catalog_table',
        'cleaned_repairs_s3_bucket_target',
    )
)

# Create the Spark and Glue contexts, grab the Glue logger and start the job.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
logger = glueContext.get_logger()

job = Job(glueContext)
job.init(args['JOB_NAME'], args)


In [None]:
# Fetch the source table from the catalog as a DynamicFrame, convert it to a
# Spark DataFrame and keep only the rows from the latest import partition.
source_data = glueContext.create_dynamic_frame.from_catalog(
    table_name=source_catalog_table,
    name_space=source_catalog_database,
)

df = getLatestPartitions(source_data.toDF())

In [None]:
# Inspect the schema and up to 100 rows (one field per line) of the loaded data.
df.printSchema()
df.show(n=100, vertical=True)

In [None]:
import re
# Log that the column-name clean-up step is starting.
logger.info('clean up column names')
def clean_column_names(df):
    """Return ``df`` with its column names normalised.

    Three renaming passes are applied, in this order:

    1. every full stop is removed from the name;
    2. a trailing underscore is removed;
    3. the name is lower-cased and each run of characters other than ASCII
       letters, digits or ``$`` is replaced by a single underscore.
    """
    # Pass 1: remove full stops. The original name is back-quoted when it is
    # referenced so that names containing a full stop can be selected.
    dotless_columns = []
    for name in df.columns:
        dotless_columns.append(
            F.col("`{0}`".format(name)).alias(name.replace('.', '')))
    df = df.select(dotless_columns)

    # Pass 2: remove trailing underscores.
    trimmed_columns = []
    for name in df.columns:
        trimmed_columns.append(F.col(name).alias(re.sub("_$", "", name)))
    df = df.select(trimmed_columns)

    # Pass 3: lowercase, and collapse every run of other characters
    # (including repeated underscores) into one underscore.
    normalised_columns = []
    for name in df.columns:
        normalised_columns.append(
            F.col(name).alias(re.sub("[^0-9a-zA-Z$]+", "_", name.lower())))
    return df.select(normalised_columns)

# Normalise the column names, then inspect the resulting schema and rows.
df2 = clean_column_names(df)

df2.printSchema()
df2.show(n=200, vertical=True)

In [None]:
# only keep relevant columns
# Keep only the columns needed downstream; all other columns are dropped.
df2 = df2.select(
    'address',
    'description',
    'date',
    'temp_order_number',
    'priority_code',
    'contractor_s_own_ref_no',
    'new_uhw_number',
    'cost_of_repairs_work',
    'status_of_completed_y_n',
    'requested_by',
    'import_year',
    'import_month',
    'import_day',
    'import_date',
    'import_datetime',
    'import_timestamp',
)

In [None]:
# Inspect the schema and rows after the column selection.
df2.printSchema()
df2.show(n=200, vertical=True)

In [None]:
# Turn the placeholder string 'NaT' into a null, then drop rows without a date.
df2 = df2.replace('NaT', None)
df2 = df2.where(col('date').isNotNull())

In [None]:
# Inspect the schema and rows after removing rows with no date.
df2.printSchema()
df2.show(n=200, vertical=True)

In [None]:
# Parse 'date' (pattern yyyy-MM-dd) into a timestamp and rename it to 'datetime_raised'.
df2 = df2.withColumn('date', F.to_timestamp('date', 'yyyy-MM-dd'))
df2 = df2.withColumnRenamed('date', 'datetime_raised')

# Map the completion flag to an order status: 'Y' becomes 'Completed',
# every other value becomes an empty string.
df2 = df2.withColumn(
    'status_of_completed_y_n',
    F.when(df2['status_of_completed_y_n'] == 'Y', 'Completed').otherwise(''),
)
df2 = df2.withColumnRenamed('status_of_completed_y_n', 'order_status')

In [None]:
# Inspect the schema and rows after the date and status conversions.
df2.printSchema()
df2.show(n=200, vertical=True)

In [None]:
# add new data source column to specify which repairs sheet the repair came from
df2 = df2.withColumn('data_source', F.lit('ElecMechFire - Reactive Rewires'))

# # rename column names
df2 = df2.withColumnRenamed('requested_by', 'operative') \
    .withColumnRenamed('address', 'property_address') \
    .withColumnRenamed('description', 'description_of_work') \
    .withColumnRenamed('priority_code', 'work_priority_description') \
    .withColumnRenamed('temp_order_number', 'temp_order_number_full') \
    .withColumnRenamed('cost_of_repairs_work', 'order_value')\
    .withColumnRenamed('contractor_s_own_ref_no', 'contractor_ref')

df2.printSchema()
df2.show(vertical=True, n=200)

In [None]:
def map_repair_priority(code):
    """Translate a priority description into its numeric priority code.

    'Immediate' maps to 1, 'Emergency' to 2, 'Urgent' to 3 and 'Normal' to 4.
    Any other value, including None, yields None.
    """
    known_priorities = (
        ('Immediate', 1),
        ('Emergency', 2),
        ('Urgent', 3),
        ('Normal', 4),
    )
    for description, priority in known_priorities:
        if code == description:
            return priority
    return None
    

def _priority_code_as_text(description):
    """Return the priority code for ``description`` as text, or None if unmapped."""
    priority = map_repair_priority(description)
    return None if priority is None else str(priority)


# Convert the mapping to a UDF by passing in the function and its return type.
# The UDF is declared as StringType while map_repair_priority returns ints, so
# the wrapper converts the code to text explicitly instead of relying on Spark
# to coerce a Python value that does not match the declared type.
udf_map_repair_priority = F.udf(_priority_code_as_text, StringType())
# apply function
df2 = df2.withColumn('work_priority_priority_code', udf_map_repair_priority('work_priority_description'))

In [None]:
# Preview the current rows of df2, one field per line.
df2.show(vertical=True)

In [None]:
# Convert the cleaned DataFrame back into a DynamicFrame and write it to the
# target S3 path as parquet, partitioned by the import date columns. The job
# is committed once the write has been issued.
cleanedDataframe = DynamicFrame.fromDF(df2, glueContext, "cleanedDataframe")
parquetData = glueContext.write_dynamic_frame.from_options(
    frame=cleanedDataframe,
    connection_type="s3",
    connection_options={
        "path": cleaned_repairs_s3_bucket_target,
        "partitionKeys": ["import_year", "import_month", "import_day", "import_date"],
    },
    format="parquet",
    transformation_ctx="parquetData",
)
job.commit()