
# Glue Studio Notebook
You are now running a **Glue Studio** notebook; before you can start using your notebook you *must* start an interactive session.

## Available Magics
|          Magic              |   Type       |                                                                        Description                                                                        |
|-----------------------------|--------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| %%configure                 |  Dictionary  |  A json-formatted dictionary consisting of all configuration parameters for a session. Each parameter can be specified here or through individual magics. |
| %profile                    |  String      |  Specify a profile in your aws configuration to use as the credentials provider.                                                                          |
| %iam_role                   |  String      |  Specify an IAM role to execute your session with.                                                                                                        |
| %region                     |  String      |  Specify the AWS region in which to initialize a session                                                                                                  |
| %session_id                 |  String      |  Returns the session ID for the running session.                                                                                                          |
| %connections                |  List        |  Specify a comma separated list of connections to use in the session.                                                                                     |
| %additional_python_modules  |  List        |  Comma separated list of pip packages, s3 paths or private pip arguments.                                                                                 |
| %extra_py_files             |  List        |  Comma separated list of additional Python files from S3.                                                                                                 |
| %extra_jars                 |  List        |  Comma separated list of additional Jars to include in the cluster.                                                                                       |
| %number_of_workers          |  Integer     |  The number of workers of a defined worker_type that are allocated when a job runs. worker_type must be set too.                                          |
| %worker_type                |  String      |  Standard, G.1X, *or* G.2X. number_of_workers must be set too. Default is G.1X                                                                            |
| %glue_version               |  String      |  The version of Glue to be used by this session. Currently, the only valid options are 2.0 and 3.0 (eg: %glue_version 2.0)                                |
| %security_config            |  String      |  Define a security configuration to be used with this session.                                                                                            |
| %%sql                       |  String      |  Run SQL code. All lines after the initial %%sql magic will be passed as part of the SQL code.                                                            |
| %streaming                  |  String      |  Changes the session type to Glue Streaming.                                                                                                              |
| %etl                        |  String      |   Changes the session type to Glue ETL.                                                                                                                   |
| %status                     |              |  Returns the status of the current Glue session including its duration, configuration and executing user / role.                                          |
| %stop_session               |              |  Stops the current session.                                                                                                                               |
| %list_sessions              |              |  Lists all currently running sessions by name and ID.                                                                                                     |
| %spark_conf                 |  String      |  Specify custom spark configurations for your session. E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer                       |

In [None]:
# I set my own magics (config for the job)
%number_of_workers 20

## default imports by Glue Spark
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [3]:
### my import libraries
from io import StringIO
import numpy as np
import pandas as pd
# import awswrangler as wr
import glob
import re
#import boto3
regex_schema = "/*.csv"
from string import printable
st = set(printable)
from datetime import datetime

import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql.functions import when

from pyspark.sql.window import Window
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType, IntegerType

pd.set_option('display.max_rows', 50)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

## Setting run date & save path
import datetime
a = datetime.datetime.now()
# date_key = a.strftime('%Y%m%d')
date_key = '20221120' # temporary
print(date_key)

# UAMS_PySpark_save_path = 's3://astro-datalake-prod-sandbox/amzar/BB/AddrStd/testing/20221028_UAMS_Step4_PySpark/' # old Qubole Zepp path
UAMS_PySpark_save_path = 's3://astro-groupdata-prod-pipeline/address_standardization/spark_uams_generation/' 

20221120


In [1]:
## set the paths to be read in this cell
final_1_temp_path = UAMS_PySpark_save_path+"phase_2/{}/All_pipeline4-final.orc".format(date_key)
postcode_ref_path = 's3://astro-datalake-prod-ulm-pii/edw_address/postcode_lookup/year=2022/month=08/LKT_POSTCODE_DL_202208.csv.gz'
# old_fullfeed_path = "s3://astro-datalake-prod-sandbox/amzar/BB/other_adhoc/uploaded/address_checking/NEW_FULLFEED_UAMS_WITH_KEY_20220806.csv"
old_fullfeed_path = "s3://astro-groupdata-prod-source/old_fullfeed_uams_with_key/NEW_FULLFEED_UAMS_WITH_KEY_after_zohreh_code.csv" # This is the only FULLFEED UAMS that I could find which is close enough in size to the 20220806 one I used in Qubole. Also, this version was last modified on 16/8/22 which is close enough to 6/8/2022
shamani_data_read_path = "s3://amsdatabucket/Sales Team Data/dump_2022_09_15.csv"




# PHASE 3
- Get the postcode lookup table from EDW and replace each record's state with the EDW state for its postcode.
- Remove records whose street name appears only once, records with a city-state mismatch, and records whose postcode has fewer than 4 digits.
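
As a rough illustration of the first bullet, the sketch below mirrors the postcode clean-up and the state override in plain Python, so the rules can be checked without a Spark session. The helper names (`normalize_postcode`, `resolve_state`) and the sample lookup are hypothetical; the notebook itself does this with `regexp_replace`, `substring`, `lpad` and a left join. The sketch also anchors the `.0` pattern to the end of the value, which is an assumption about the intent rather than what the notebook's pattern does.

```python
import re


def normalize_postcode(raw):
    """Strip a float-style '.0' suffix, keep the first 5 characters, left-pad with zeros."""
    if raw is None:
        return None
    text = re.sub(r"\.0$", "", str(raw).strip())
    return text[:5].rjust(5, "0")


def resolve_state(postcode, source_state, lookup):
    """Prefer the lookup state for the postcode; fall back to the record's own state."""
    return lookup.get(postcode) or source_state


# Hypothetical lookup: postcode -> state (upper-cased and trimmed, as in the notebook)
lookup = {"08000": "KEDAH", "50450": "WILAYAH PERSEKUTUAN KUALA LUMPUR"}

print(normalize_postcode(8000.0))                # 08000
print(resolve_state("08000", "PERLIS", lookup))  # KEDAH (lookup wins)
print(resolve_state("99999", "SABAH", lookup))   # SABAH (no lookup match, keep source)
```

Keeping the rule in a small pure function like this makes it easy to unit-test edge cases (float inputs, over-long values) before running the full Spark job.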

----
========================================= THIS IS THE START OF PHASE 3 ============================================
- taken from Glue Job: address_standardization-prod-uams_generation_final_5 (Pipeline 5)...
- originally converted to PySpark in Zepp Qubole notebook: https://us.qubole.com/notebooks#recent?id=141821&type=my-notebooks&view=home
Combine all of P1P2 Mapped Addresses from each ISP

## Pipeline 5

In [4]:
## read in the table from end of Pipeline 4
# Final_1 = wr.s3.read_csv(path = final_1_temp_path, dtype = {'Account_No':object, 'OBJID':object})
Final_1 = spark.read.orc(final_1_temp_path)

#revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
Final_1 = Final_1.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )  # raw string avoids the invalid-escape warning for '\.'
Final_1 = Final_1.withColumn("Postcode", f.substring(f.col('Postcode'), 1, 5) )
Final_1 = Final_1.withColumn('Postcode', f.lpad(f.col('Postcode').cast('string'), 5, '0') )
print('final 1 is: ', Final_1.select('Postcode').count()) # 8214216

## Read postcode lookup table from EDW bucket
postcode_lookup = spark.read.csv(postcode_ref_path, header=True) ## requires changing based on relevant month
# s3://astro-datalake-prod-ulm-pii/edw_address/postcode_lookup/year=2022/month=09/LKT_POSTCODE_DL_202209.csv.gz
## # path = '/Users/zmmzohreh/OneDrive - MEASAT Broadcast Network Systems Sdn. Bhd/Shared Documents/Input Data/'
## postcode_lookup = pd.read_csv('POSTCODE_REF_TABLE.csv') 
# postcode_lookup = wr.s3.read_csv(path = postcode_ref_path, compression = 'gzip', encoding = 'unicode_escape')
print('postcode lookup is : ', postcode_lookup.select('state').count()) # 108772
print(postcode_lookup.columns)
print(postcode_lookup.head(2))

postcode_lkt = postcode_lookup.select('state','postcode')
postcode_lkt = postcode_lkt.withColumn("postcode", f.regexp_replace(f.col('postcode').cast('string'), r'\.0', '') )
postcode_lkt = postcode_lkt.withColumn("postcode", f.lpad(f.substring(f.col('postcode'), 1, 5), 5, '0') )
postcode_lkt = postcode_lkt.withColumnRenamed('postcode', 'lkt_postcode').withColumnRenamed('state', 'lkt_state')
postcode_lkt = postcode_lkt.withColumn('lkt_state', f.upper(f.trim(f.col('lkt_state'))) )
postcode_lkt = postcode_lkt.dropDuplicates() # order does not matter in this de-dupe
print('postcode lookup after cleaning & dedupe is : ', postcode_lkt.select('lkt_state').count()) # 3216

#REVISION - fakhrul zohreh - 12/7/22 - because somehow we lost wilayah persekutuan
postcode_lkt = postcode_lkt.withColumn('lkt_state', when(f.col('lkt_state') == 'WILAYAH PERSEKUTUAN', 'WILAYAH PERSEKUTUAN KUALA LUMPUR').otherwise(f.col('lkt_state')) )

#this code below is added to prevent int + object merge error
Final_1 = Final_1.withColumn('Postcode', f.col('Postcode').cast('string'))

## Get the state from joining to lookup table
Final_2 = Final_1.join(postcode_lkt, Final_1.Postcode == postcode_lkt.lkt_postcode, how='left')
print('final 2 after join to lookup table is: ', Final_2.select('Postcode').count()) # 8644370
Final_2 = Final_2.dropDuplicates() ## order shouldn't matter
print('final 2 after dedupe is: ', Final_2.select('Postcode').count()) # 8644370
print('final 2 account check :', Final_2.select('Account_No').head(10))

## create a new column that checks if lkt_state is null. If yes, return UAMS STATE. Otherwise, return lkt_state
Final_2 = Final_2.withColumn('FINAL_STATE', when(f.col('lkt_state').isNull(), f.col('STATE')).otherwise(f.col('lkt_state')) )

# display updated DataFrame
print(Final_2.head(5))

## create list of valid state values & filter for those. Then reset index
valid_state = ['SELANGOR', 'WILAYAH PERSEKUTUAN KUALA LUMPUR', 'JOHOR', 'SARAWAK',
       'NEGERI SEMBILAN', 'MELAKA', 'KEDAH', 'PULAU PINANG', 'SABAH',
       'PAHANG', 'TERENGGANU', 'PERLIS', 'KELANTAN', 'PERAK', 'LABUAN',
       'PUTRAJAYA']
Final_3 = Final_2.filter(f.col('FINAL_STATE').isin(valid_state))
print('Final_3 count (after filter for valid states)', Final_3.select('Postcode').count()) # 8644370
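# Create a sequential index, mirroring the pandas reset_index that Zohreh did at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
# NOTE: a Window with orderBy but no partitionBy moves all rows into a single partition, which can be slow on a table of this size.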
Final_3 = Final_3.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())) )
print('final 3 account check :', Final_3.select('Account_No').head(10))

## save intermediate table --> have to break it up coz it seems like pyspark took a longgg time (more than 3 hours) trying to run an Action cell
Final_3.write.orc(UAMS_PySpark_save_path+"phase_3/{}/intermediate1.orc".format(date_key), mode='overwrite', compression='snappy')

# ------------------------------------------------------------------------------------------------------------

del Final_1
del Final_2
del Final_3
del postcode_lkt # -- if required 

final 1 is:  8919424
postcode lookup is :  108772
['postcode_id', 'postcode', 'region', 'state_code', 'state', 'city', 'area', 'location', 'tat_hours', 'remarks', 'record_insert_datetime', 'record_update_datetime', 'record_last_update_user', 'etl_job_date', 'etl_job_run_id']
[Row(postcode_id='536873419', postcode='06750', region='NORTHERN REGION 1', state_code='KED', state='KEDAH', city='PENDANG', area='KAMPUNG KAYU TIGA', location='RURAL', tat_hours='48', remarks=None, record_insert_datetime='2022.09.15 02:52:28', record_update_datetime=None, record_last_update_user='SYSTEM', etl_job_date='2022.09.15', etl_job_run_id='220915025228'), Row(postcode_id='536873421', postcode='06750', region='NORTHERN REGION 1', state_code='KED', state='KEDAH', city='PENDANG', area='KAMPUNG PADANG DURIAN', location='RURAL', tat_hours='48', remarks=None, record_insert_datetime='2022.09.15 02:52:28', record_update_datetime=None, record_last_update_user='SYSTEM', etl_job_date='2022.09.15', etl_job_run_id='220
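
The next cell removes street names that occur only once and city-state combinations that occur 5 times or fewer. As a minimal sketch of that frequency rule, the plain-Python version below uses `collections.Counter`; the function name `drop_rare`, the `city_state` helper and the sample records are hypothetical and only illustrate the logic that the notebook implements with `groupBy(...).count()` plus a left join.

```python
from collections import Counter


def drop_rare(records, key, max_rare_count):
    """Keep only records whose key value occurs more than max_rare_count times."""
    counts = Counter(key(r) for r in records)
    return [r for r in records if counts[key(r)] > max_rare_count]


def city_state(record):
    """Build the same 'City ,State' key shape the notebook builds with concat_ws(' ,', ...)."""
    return record["City"] + " ," + record["FINAL_STATE"]


# Hypothetical sample records
records = [
    {"Street_Name": "JALAN A", "City": "SEREMBAN", "FINAL_STATE": "NEGERI SEMBILAN"},
    {"Street_Name": "JALAN A", "City": "SEREMBAN", "FINAL_STATE": "NEGERI SEMBILAN"},
    {"Street_Name": "JALAN B", "City": "SEREMBAN", "FINAL_STATE": "NEGERI SEMBILAN"},
]

# Street names that occur exactly once are dropped (threshold 1).
kept = drop_rare(records, key=lambda r: r["Street_Name"], max_rare_count=1)
print(len(kept))  # 2

# City-state combinations use a threshold of 5; this combo occurs only 3 times.
print(len(drop_rare(records, key=city_state, max_rare_count=5)))  # 0
```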

In [5]:
Final_3 = spark.read.orc(UAMS_PySpark_save_path+"phase_3/{}/intermediate1.orc".format(date_key)) ## read in ORC version

## Standardising state name for Wilayah
Final_3 = Final_3.withColumn('FINAL_STATE', when(f.col('FINAL_STATE') == 'WP KUALA LUMPUR', 'WILAYAH PERSEKUTUAN KUALA LUMPUR')
                                            .when(f.col('FINAL_STATE') == 'WP LABUAN', 'LABUAN')
                                            .when(f.col('FINAL_STATE') == 'WP PUTRAJAYA', 'PUTRAJAYA')
                                            .otherwise(f.col('FINAL_STATE')) )


# ### Removing potential street name with error (street name with 1 record)
# create a sequential index as Zohreh did a pandas reset_index at this step again. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Final_3 = Final_3.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())) )

## get the number of addresses for each street name to identify which street names to remove (a street name with only 1 record might be a mistake)
street_count = Final_3.groupBy('Street_Name').count().withColumnRenamed('count', 'street_count')
street_count = street_count.withColumn('remove_street', when(f.col('street_count')==1, 1).otherwise(0) )
street_count.head(5)
street_count.groupBy('remove_street').count().show()

# Left join to get remove street flag then remove those streets
Final_3 = Final_3.join(street_count, on='Street_Name', how='left')
print('Final_3 count before removing cases where Street_Name only appears once:', Final_3.select('Street_Name').count()) # 8644370
Final_3 = Final_3.filter(f.col('remove_street') != 1)
# Remove street with the following address - very isolated case
Final_3 = Final_3.filter(f.col('Street_Name') != 'WARISAN PUTERI A17 VILA SURIA BANDAR WARISAN PUTERI 70400 SEREMBAN NEGERI SEMBILAN')
print('Final_3 count after removing cases where Street_Name only appears once:', Final_3.select('Street_Name').count()) # 8641491


# ### Removing potential city-state mismatch (city-state combinations with 5 or fewer records)
# create a sequential index as Zohreh did a pandas reset_index at this step again. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Final_3 = Final_3.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())) )

## Create key city+FINAL_STATE
Final_3 = Final_3.withColumn('city_state', f.concat_ws(' ,', f.col("City").cast('string'), f.col("FINAL_STATE").cast('string')) )
## get the number of addresses for each city-state combo to identify which combos to remove (too few cases of a combo suggests it might be an incorrect combo)
city_state_count = Final_3.groupBy('city_state').count().withColumnRenamed('count', 'city_state_count')
city_state_count = city_state_count.withColumn('remove_city_state', when(f.col('city_state_count')<=5, 1).otherwise(0) ) # 12/11/2022: from observation, looks like 5 is enough to filter weird combos
city_state_count.head(5)
city_state_count.groupBy('remove_city_state').count().show()

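# Left join to get the remove_city_state flag, then remove those city-state combinations
# NOTE: the print labels below say 'less than 100 times', but the threshold applied above is <= 5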
Final_4 = Final_3.join(city_state_count, on='city_state', how='left')
print('Final_4 count before removing cases where city-state appears less than 100 times:', Final_4.select('city_state').count()) # 8641491
Final_4 = Final_4.filter(f.col('remove_city_state') != 1)
print('Final_4 count after removing cases where city-state appears less than 100 times:', Final_4.select('city_state').count()) # 8640835

## save intermediate table --> have to break it up coz it seems like pyspark took a longgg time (more than 3 hours) trying to run an Action cell
Final_4.write.orc(UAMS_PySpark_save_path+"phase_3/{}/intermediate2.orc".format(date_key), mode='overwrite', compression='snappy')

# ------------------------------------------------------------------------------------------------------------

del Final_4 # -- if required 

## read in the intermediate table
Final_4 = spark.read.orc(UAMS_PySpark_save_path+"phase_3/{}/intermediate2.orc".format(date_key)) ## read in ORC version

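# #### Remove records with postcode length less than 4
# NOTE: Postcode was already truncated and left-padded to exactly 5 characters when Final_1 was loaded,
# so this filter appears to drop only null postcodes (the count is unchanged in the recorded output).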
Final_4 = Final_4.withColumn('postcode_length', f.length(f.col('Postcode').cast('string')) )
Final_4 = Final_4.filter(f.col('postcode_length') > 3)
print('Final_4 count after removing records with postcode length less than 4:', Final_4.select('Postcode').count()) # 8640835

# #### Getting Urban/Rural/Remote flag from lookup table

## select & rename columns
Final_5 = Final_4.select('Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable',
       'P_Flag', 'lkt_state', 'lkt_postcode', 'FINAL_STATE')
print('final 5 account check :', Final_5.select('Account_No').head(10))
Final_5 = Final_5.withColumnRenamed('FINAL_STATE', 'State')

## clean columns
Final_5 = Final_5.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
Final_5 = Final_5.withColumn("Postcode", f.substring(f.col('Postcode'), 1, 5) )
Final_5 = Final_5.withColumn('Postcode', f.lpad(f.col('Postcode').cast('string'), 5, '0') )

Final_5 = Final_5.withColumn('State', f.trim(f.col('State')) )
Final_5 = Final_5.withColumn('City', f.trim(f.col('City')) )
Final_5 = Final_5.withColumn('Area', f.trim(f.col('Area')) )

Final_5 = Final_5.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0', '') )
Final_5 = Final_5.fillna('', subset=['Area'])

## Adding ur_key for flagging urban/rural
Final_5 = Final_5.withColumn("ur_key", f.concat_ws(" ,", f.col('Area'), f.col('Postcode'), f.col('City'), f.col('State')) )
print(Final_5.head(5))

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

## save intermediate table --> have to break it up coz it seems like pyspark took a longgg time (more than 3 hours) trying to run an Action cell
Final_5.write.orc(UAMS_PySpark_save_path+"phase_3/{}/intermediate3.orc".format(date_key), mode='overwrite', compression='snappy')
Final_5.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_3/{}/intermediate3.csv.gz".format(date_key), header=True, mode='overwrite', compression='gzip')

+-------------+-----+
|remove_street|count|
+-------------+-----+
|            1| 5715|
|            0|66450|
+-------------+-----+

Final_3 count before removing cases where Street_Name only appears once: 9384958
Final_3 count after removing cases where Street_Name only appears once: 9379243
+-----------------+-----+
|remove_city_state|count|
+-----------------+-----+
|                1|  571|
|                0|  813|
+-----------------+-----+

Final_4 count before removing cases where city-state appears less than 100 times: 9379243
Final_4 count after removing cases where city-state appears less than 100 times: 9378206
Final_4 count after removing records with postcode length less than 4: 9378206
final 5 account check : [Row(Account_No='83022571'), Row(Account_No='91335951'), Row(Account_No='81575795'), Row(Account_No='89766478'), Row(Account_No='96826547'), Row(Account_No='94306413'), Row(Account_No='83353661'), Row(Account_No='95682165'), Row(Account_No=''), Row(Account_No='')]
[R

# Phase 4
Adding Cust_Location to UAMS data
- To use area + postcode + city + state as mapping key instead of postcode
- To prepare unique list of area + postcode + city + state that do not have Urban/Rural or tagged as both Urban/Rural
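
To make the mapping key concrete, here is a small plain-Python sketch of how an area + postcode + city + state key can be built and how keys tagged with more than one location end up flagged as `URBAN/RURAL`. The names `make_ur_key`, `location_flags`, `VALID_LOCATIONS` and the `TAMAN CONTOH` rows are hypothetical; the notebook builds the key with `concat_ws(" ,", ...)` and aggregates with `collect_set`. Sorting in reverse order is this sketch's own way of getting a stable `URBAN/RURAL` label, not the notebook's.

```python
from collections import defaultdict

VALID_LOCATIONS = {"URBAN", "RURAL", "REMOTE"}


def make_ur_key(area, postcode, city, state):
    """Join the parts with ' ,'; a missing area becomes '', other missing parts are skipped."""
    parts = ["" if area is None else area, postcode, city, state]
    return " ,".join(p for p in parts if p is not None)


def location_flags(ref_rows):
    """Map each ur_key to its distinct locations joined by '/', e.g. 'URBAN/RURAL'."""
    seen = defaultdict(set)
    for row in ref_rows:
        if row["location"] is not None:
            key = make_ur_key(row["area"], row["postcode"], row["city"], row["state"])
            seen[key].add(row["location"])
    # Reverse sort so a key with both tags always reads 'URBAN/RURAL'
    return {key: "/".join(sorted(locs, reverse=True)) for key, locs in seen.items()}


# First row follows the lookup sample shown earlier; the TAMAN CONTOH rows are made up
ref_rows = [
    {"area": "KAMPUNG KAYU TIGA", "postcode": "06750", "city": "PENDANG", "state": "KEDAH", "location": "RURAL"},
    {"area": "TAMAN CONTOH", "postcode": "06750", "city": "PENDANG", "state": "KEDAH", "location": "URBAN"},
    {"area": "TAMAN CONTOH", "postcode": "06750", "city": "PENDANG", "state": "KEDAH", "location": "RURAL"},
]

flags = location_flags(ref_rows)
for key, flag in flags.items():
    status = "valid" if flag in VALID_LOCATIONS else "needs review"
    print(key, "->", flag, "(" + status + ")")
```

Keys whose flag is not exactly one of the valid locations, and addresses whose key has no match at all, are the ones that would go on the list for review.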

----
========================================= THIS IS THE START OF PHASE 4 ============================================
- taken from Glue Job: address_standardization-prod-uams_generation_final_6 (Pipeline 6)
- originally converted to PySpark in Zepp Qubole notebook: https://us.qubole.com/notebooks#recent?id=141821&type=my-notebooks&view=home

## Pipeline 6

In [6]:
## POSTCODE REFERENCE TABLE from EDW
# ref_table = pd.read_csv('POSTCODE_REF_TABLE.csv') ## Read from EDW bucket / ##ref_table = pd.read_csv('LKT_POSTCODE_DL_202202.csv') ## Read from EDW bucket / ## ref_table = wr.s3.read_csv(path = postcode_ref_path, compression = 'gzip', encoding = 'unicode_escape')
# Final_5 = wr.s3.read_csv(path = final_5_temp_path, dtype = {'OBJID':object, 'Account_No':object})
ref_table = spark.read.csv(postcode_ref_path, header=True) ## requires changing based on relevant month
Final_5 = spark.read.orc(UAMS_PySpark_save_path+"phase_3/{}/intermediate3.orc".format(date_key))

#revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
Final_5 = Final_5.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
Final_5 = Final_5.withColumn("Postcode", f.substring(f.col('Postcode'), 1, 5) )
Final_5 = Final_5.withColumn('Postcode', f.lpad(f.col('Postcode'), 5, '0') )

print('checking ref_table numbers:', ref_table.count()) # 108772
print('checking ref_table columns:', ref_table.columns)
print('checking final_5 columns:', Final_5.columns)
print(Final_5.select('Account_No').head(10))

## clean the ref_table's postcode & more cleaning
ref_table = ref_table.withColumn("Postcode", f.regexp_replace(f.col('postcode').cast('string'), r'\.0', '') )
ref_table = ref_table.withColumn("Postcode", f.substring(f.col('Postcode'), 1, 5) )
ref_table = ref_table.withColumn('Postcode', f.lpad(f.col('Postcode'), 5, '0') )
ref_table = ref_table.fillna('', subset=['area'])

## Replace WILAYAH PERSEKUTUAN to WILAYAH PERSEKUTUAN KUALA LUMPUR
ref_table = ref_table.withColumn('state', when(f.col('state') == 'WILAYAH PERSEKUTUAN', 'WILAYAH PERSEKUTUAN KUALA LUMPUR').otherwise(f.col('state')) )

ref_table = ref_table.withColumn('state', f.upper(f.trim(f.col('state'))) )
ref_table = ref_table.withColumn('city', f.upper(f.trim(f.col('city'))) )
ref_table = ref_table.withColumn('area', f.upper(f.trim(f.col('area'))) )
ref_table.select('state').distinct().show(20)  # show() prints the table itself and returns None, so it should not be wrapped in print()

## Adding ur_key for flagging urban/rural then select relevant columns for dedupe
ref_table = ref_table.withColumn("ur_key", f.concat_ws(" ,", f.col('area').cast('string'), f.col('postcode').cast('string'), f.col('city').cast('string'), f.col('state').cast('string')) )
ref_table2 = ref_table.select('ur_key', 'location')
print('ref_table2 count before dedupe on ur_key & location:', ref_table2.select('ur_key').count()) # 108772
ref_table2 = ref_table2.dropDuplicates()
print('ref_table2 count after dedupe on ur_key & location:', ref_table2.select('ur_key').count()) # 108621
print(ref_table2.head(2))
ref_table3 = ref_table2.drop_duplicates() # this step seems unnecessary
print('ref_table3 count:', ref_table3.select('ur_key').count()) # 108621

## Not sure what below code does, but doesn't look like it's used later on so comment out for now
# ORI CODE --> STRT_P1_dup = ref_table3.groupby('ur_key').filter(lambda x : x['ur_key'].shape[0]>1)
# ATTEMPT to convert to PySpark: STRT_P1_dup = ref_table3.groupBy('ur_key').count().filter(f.col('count') > 1)
# print('STRT_P1_dup count:', STRT_P1_dup.select('ur_key').count())
# z.show(STRT_P1_dup.head(100))

## Flag AREA+POSTCODE+CITY+STATE that tagged as both urban and rural
location_pivot = ref_table.groupBy('ur_key').agg(f.concat_ws('/', f.collect_set('location')).alias('Cust_Location'))
location_pivot = location_pivot.withColumn('Cust_Location', when(f.col('Cust_Location')=='RURAL/URBAN', 'URBAN/RURAL').otherwise(f.col('Cust_Location')) )
# z.show(location_pivot.head(100))
# location_pivot.select('Cust_Location').distinct().show()

## merge Final_5 to location_pivot
Final_6 = Final_5.join(location_pivot,on='ur_key',how='left')
print('Final_6 count:', Final_6.select('ur_key').count()) # 8640835


# #### Split UAMS data into 2 groups:
#     1. Addresses with only Urban, Rural, Remote -- good to go for UAMS ingestion
#     2. Addresses without Urban/Rural flag & Address with both Urban and Rural -- need to share with IFS

## Addresses with only Urban or Rural -- good to go for UAMS ingestion
valid_loc = Final_6.filter(f.col('Cust_Location').isNotNull())
valid_loc = valid_loc.withColumn('Location', f.col('Cust_Location')).filter(f.col('Location').isin('URBAN','RURAL','REMOTE'))
print('valid_loc count:', valid_loc.select('Location').count()) # 8145739

## Addresses without Urban/Rural flag & Address with both Urban and Rural -- need to share with IFS
invalid_loc = Final_6.filter( (f.col('Cust_Location').isNull()) | (~f.col('Cust_Location').isin('URBAN','RURAL', 'REMOTE')) )
invalid_loc = invalid_loc.fillna('', subset=['Area'])
print('Count of invalid_loc', invalid_loc.select('ur_key').count()) # 495096
print('Count of Unique UR Key WITHOUT valid location', invalid_loc.select(f.countDistinct('ur_key')).first()[0]) # 8257 (first()[0] returns the value; show() would return None)
print('-----------------------')


checking ref_table numbers: 108772
checking ref_table columns: ['postcode_id', 'postcode', 'region', 'state_code', 'state', 'city', 'area', 'location', 'tat_hours', 'remarks', 'record_insert_datetime', 'record_update_datetime', 'record_last_update_user', 'etl_job_date', 'etl_job_run_id']
checking final_5 columns: ['Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'lkt_state', 'lkt_postcode', 'State', 'ur_key']
[Row(Account_No='83022571'), Row(Account_No='91335951'), Row(Account_No='81575795'), Row(Account_No='89766478'), Row(Account_No='96826547'), Row(Account_No='94306413'), Row(Account_No='83353661'), Row(Account_No='95682165'), Row(Account_No=''), Row(Account_No='')]
+--------------------+
|               state|
+--------------------+
|          TERENGGANU|
|           PUTRAJAYA|
|WILAYAH PERSEKUTU...|
|               PERAK|
|           

In [12]:
### In early Nov, Fakhrul said the code below no longer needs to be run
# #Edit - fakhrul 15/6/22 same comment as below
# invalid_loc_list = invalid_loc.select(['Area','Postcode','City','State']).drop_duplicates()
# #Edit - fakhrul - 15/6/22 invalid loc we dont use it it seems. will change back if we do
# invalid_loc_list.coalesce(1).write.csv(UAMS_PySpark_save_path+'invalid_loc_list_{}.csv'.format(date_key), header=True, mode='overwrite') ## save it somewhere to be shared to IFS

# ### Update UAMS with updated lookup table from IFS starts here -- only do this if EDW lookup table not updated by IFS

# # Updated reference table from IFS team for address missing urban/rural
# ifs_updated = spark.read.csv(UAMS_PySpark_save_path+'uploaded/Updated_Invalid_UrbanRural_from_IFS.csv', header=True) ## Read from source bucket??

# ## clean columns then create ur_key
# ifs_updated = ifs_updated.withColumn("Area", when(f.col('Area') == '14-Jan', '14JAN').otherwise(f.col('Area')) )
# ifs_updated = ifs_updated.fillna('', subset=['Area'])

# ifs_updated = ifs_updated.withColumn("Postcode", f.regexp_replace(f.col('postcode').cast('string'), '\.0', '') )
# ifs_updated = ifs_updated.withColumn("Postcode", f.substring(f.col('Postcode'), 1, 5) )
# ifs_updated = ifs_updated.withColumn('Postcode', f.lpad(f.col('Postcode'), 5, '0') )

# print('IFS_Updated count:', ifs_updated.select('Area').count())

# ifs_updated = ifs_updated.withColumn("ur_key", f.concat_ws(" ,", f.col('Area').cast('string'), f.col('Postcode').cast('string'), f.col('City').cast('string'), f.col('State').cast('string')) )
# print(ifs_updated.head(2))

# ifs_updated = ifs_updated.select(['ur_key','Location']).withColumn('Location', f.upper(f.col('Location').cast('string')) )
# print(ifs_updated.columns)

# updated_invalid = invalid_loc.join(ifs_updated, on='ur_key', how='left')
# print('updated_invalid count:', updated_invalid.select('ur_key').count())
# updated_invalid.head(2)

# final_UR = invalid_loc.join(ifs_updated,on='ur_key',how='left')
# print('final_UR count:', final_UR.select('ur_key').count())
# final_UR.head(2)
# final_UR.select('Location').distinct().show()

## Appending all dataframe with fixed U/R tagging for UAMS
# final_UR = pd.concat([valid_loc,updated_invalid]) ## this step is to be used if IFS has re-standardized
final_UR = valid_loc
print('final_UR:', final_UR.count()) # 8145739
print('checking final_ur here :', final_UR.select('Account_No').head(10))

# keep UAMS data with cust location URBAN/RURAL/REMOTE only
final_UR_removed_null = final_UR.filter(f.col('Location').isin('URBAN', 'RURAL', 'REMOTE'))

## clean postcode column (make into string, remove float \.0, select only first 5 digits, lpad with '0')
# Final['Postcode']  = Final['Postcode'] .replace({r'[^\x00-\x7F]+':''}, regex=True, inplace=True)
final_UR_removed_null = final_UR_removed_null.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
final_UR_removed_null = final_UR_removed_null.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

## Handling all nan and decimals 
final_UR_removed_null = final_UR_removed_null.fillna('')
print('checking final_ur_removed_null here :', final_UR_removed_null.filter(f.col('Account_No') != '').select('Account_No').head(10))

## ensure these cols are string, remove float \.0, & replace nulls with blanks
final_UR_removed_null = final_UR_removed_null.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0', '') )
final_UR_removed_null = final_UR_removed_null.withColumn("OBJID", f.regexp_replace(f.col('OBJID').cast('string'), r'\.0', '') )
final_UR_removed_null = final_UR_removed_null.withColumn("Account_No", f.regexp_replace(f.col('Account_No'), 'nan|NAN|null', '') )
final_UR_removed_null = final_UR_removed_null.withColumn("OBJID", f.regexp_replace(f.col('OBJID'), 'nan|NAN|null', '') )

## select & rename columns
final_UR_removed_null = final_UR_removed_null.select(['Key','Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
       'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','Location'])
final_UR_removed_null = final_UR_removed_null.withColumnRenamed('Location', 'UR_Flag')  

# print(final_UR_removed_null.filter(f.col('Account_No').isNotNull()).count()) # 0
print('No of non-blank account_no in final_UR_removed_null', final_UR_removed_null.filter(f.col('Account_No') != '').count()) # 4063540 (less than half of the size)
print('checking final_UR_removed_null account no: ', final_UR_removed_null.filter(f.col('Account_No') != '').select('Account_No').head(10)) # this should not be null right?

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

## save intermediate table --> have to break it up coz it seems like pyspark takes a longgg time trying to run an Action cell with so many transformations
final_UR_removed_null.write.orc(UAMS_PySpark_save_path+"phase_4/{}/final_UR_removed_null.orc".format(date_key), mode='overwrite', compression='snappy')
final_UR_removed_null.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_4/{}/final_UR_removed_null.csv.gz".format(date_key), header=True, mode='overwrite', compression='gzip')

# final_UR_removed_null.to_csv('NEW_FULLFEED_UAMS_WITH_KEY_mar.csv',index=False) ## Save in pipeline bucket
# wr.s3.to_csv(df = final_UR_removed_null, path = final_removed_null_save_path + 'final_UR_removed_null_temp.csv')
# Final_3.to_csv('TEMP_NEW_FULLFEED_UAMS_WITH_KEY.csv',index=False)
# Final_3.head()


final_UR: 8842843
checking final_ur here : [Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No='')]
checking final_ur_removed_null here : [Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No=''), Row(Account_No='')]
No of non-blank account_no in final_UR_removed_null 4063540
checking final_UR_removed_null account no:  [Row(Account_No='86578100'), Row(Account_No='91750064'), Row(Account_No='89825660'), Row(Account_No='84399575'), Row(Account_No='98072161'), Row(Account_No='84320754'), Row(Account_No='98351724'), Row(Account_No='97235235'), Row(Account_No='83140280'), Row(Account_No='97248767')]
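
The cleaning steps in this cell strip a float-style `.0` suffix from ID columns and left-pad postcodes to five characters. As a rough illustration in plain Python (no Spark session needed), the sketch below shows why the choice of pattern matters: an unanchored `\.0` removes that pair of characters wherever it occurs, while `\.0$` only removes a trailing one. The function names are hypothetical and exist only for this example.

```python
import re

def strip_float_suffix(value: str) -> str:
    """Remove one trailing '.0' left behind when an integer ID was read as a float."""
    return re.sub(r"\.0$", "", value)

def strip_float_suffix_unanchored(value: str) -> str:
    """Same idea with the unanchored pattern: every '.0' in the string is removed."""
    return re.sub(r"\.0", "", value)

def clean_postcode(value: str) -> str:
    """Drop a trailing '.0', keep the first five characters, left-pad with '0'."""
    return strip_float_suffix(value)[:5].rjust(5, "0")

print(strip_float_suffix("86578100.0"))
print(strip_float_suffix_unanchored("1.05"))
print(clean_postcode("8000.0"))
```

For purely numeric account numbers both patterns give the same result; the anchored form is simply the safer default if a value ever contains `.0` in the middle.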


# Phase 5
- Removing duplicate address with different address ID

----
========================================= THIS IS THE START OF PHASE 5 ============================================
- taken from Glue Job: address_standardization-prod-uams_generation_final_7_backup (Pipeline 7)
- originally converted to PySpark in Zepp Qubole notebook: https://us.qubole.com/notebooks#recent?id=141821&type=my-notebooks&view=home

## Pipeline 7

In [16]:
# new_uams = Final_4
final_UR_removed_null = spark.read.orc(UAMS_PySpark_save_path+"phase_4/{}/final_UR_removed_null.orc".format(date_key))
final_UR_removed_null = final_UR_removed_null.select(['Key', 'Address_ID', 'Account_No', 'OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag'])
print('checking objid value after reading :', final_UR_removed_null.select('OBJID').head(5))

# revision - fakhrul - 20/6/22 - otherwise a postcode of 08000 will be read as 8000
## clean postcode column (cast to string, strip a trailing float '.0', keep only the first 5 characters, lpad with '0')
final_UR_removed_null = final_UR_removed_null.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0$', '') )
final_UR_removed_null = final_UR_removed_null.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

new_uams = final_UR_removed_null
del final_UR_removed_null
print('new uams without duplicate removal is :', new_uams.count()) # 8145739

## read in OLD UAMS
old_uams = spark.read.csv(old_fullfeed_path, header=True) # read in a version which has "Key"
#old_uams = pd.read_csv('Old_FULLFEED_UAMS_WITH_KEY.csv') # from source bucket
# old_uams = spark.read.csv(UAMS_PySpark_save_path+"uploaded/Old_Fullfeed_UAMS-20220818(1).csv.gz", header=True) # this looks like the latest version available on AWS as of 13/11/2022 # not sure why this version of Old UAMS Fullfeed DOESN'T have "Key" column in it... But I may have to recreate the Key column...
old_uams = old_uams.select('Key', 'Address_ID', 'Account_No', 'OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag') 
# old_uams = old_uams.withColumn("Key", f.concat_ws(" ,", "House_No", "Building_Name", "Street_Type_1", "Street_1_New", "Street_Type_2", "Street_2_New", "STD_CITY", "AREA", "POSTCODE", "STATE") )
# old_uams = old_uams.withColumn("Key", f.regexp_replace(f.upper(f.col("Key")), " ", "") )

## ensure these cols are strings & strip a trailing float '.0'
old_uams = old_uams.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0$', '') )
old_uams = old_uams.withColumn("OBJID", f.regexp_replace(f.col('OBJID').cast('string'), r'\.0$', '') )
print('checking objid value after reading in old uams :', old_uams.select('OBJID').head(10))
print('old_uams serviceable checking :')
old_uams.groupBy('Serviceable').count().orderBy('count').show() # show() prints the table itself and returns None
print('old_uams without duplicate removal is :', old_uams.select('Serviceable').count()) # 9381682
print('checking old uams info ',old_uams.columns)

## the pandas null counts below have not been ported to PySpark yet:
# print('new_uams sum of nulls:', new_uams.isna().sum())
# print('old_uams sum of nulls:', old_uams.isna().sum())

## fillna & deal with nulls
old_uams = old_uams.fillna('')
old_uams = old_uams.withColumn("Account_No", f.regexp_replace(f.col('Account_No'), 'nan|NAN|null', '') )

## clean postcode column (cast to string, strip a trailing float '.0', keep only the first 5 characters, lpad with '0')
old_uams = old_uams.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0$', '') )
old_uams = old_uams.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

## Create key to check for duplicates address
old_uams = old_uams.withColumn("duplicate_key", f.concat_ws(" ,", f.col('Account_No').cast('string'),  f.col('House_No').cast('string'),  f.col('Building_Name').cast('string'),  f.col('Street_Type').cast('string'),  f.col('Street_Name').cast('string'),  f.col('Area').cast('string'),  f.col('City').cast('string'),  f.col('Postcode').cast('string'),  f.col('State').cast('string')) )
old_uams = old_uams.withColumn("duplicate_key", f.regexp_replace(f.upper(f.trim(f.col('duplicate_key').cast('string'))), " ", "") )

# create a sequential index to prepare for the dedupe step next. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
# (a Window with no partitionBy pulls every row into a single partition, so this step is slow on large tables)
old_uams = old_uams.withColumn("index", f.row_number().over(Window.orderBy(f.monotonically_increasing_id())) )
## de-dupe on duplicate_key, & keep first based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['duplicate_key']).orderBy(f.col("index").asc())
old_uams_removed_dup = old_uams.withColumn('row', f.row_number().over(window)).filter(f.col('row') == 1).drop('row')
print('old_uams with duplicate removal is :', old_uams_removed_dup.select('duplicate_key').count()) # 9381639
print('old_uams_removed_dup account id checking: ', old_uams_removed_dup.head(2))

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

## fillna & deal with nulls
new_uams = new_uams.fillna('')
new_uams = new_uams.withColumn("Account_No", f.regexp_replace(f.col('Account_No'), 'nan|NAN|null', '') )
new_uams = new_uams.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0$', '') )

## clean postcode column (cast to string, strip a trailing float '.0', keep only the first 5 characters, lpad with '0')
new_uams = new_uams.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0$', '') )
new_uams = new_uams.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

## Create key to check for duplicates address
new_uams = new_uams.withColumn("duplicate_key", f.concat_ws(" ,", f.col('Account_No').cast('string'),  f.col('House_No').cast('string'),  f.col('Building_Name').cast('string'),  f.col('Street_Type').cast('string'),  f.col('Street_Name').cast('string'),  f.col('Area').cast('string'),  f.col('City').cast('string'),  f.col('Postcode').cast('string'),  f.col('State').cast('string')) )
new_uams = new_uams.withColumn("duplicate_key", f.regexp_replace(f.upper(f.trim(f.col('duplicate_key').cast('string'))), " ", "") )

# create a sequential index to prepare for the dedupe step next. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
new_uams = new_uams.withColumn("index", f.row_number().over(Window.orderBy(f.monotonically_increasing_id())) )
## de-dupe on duplicate_key, & keep first based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['duplicate_key']).orderBy(f.col("index").asc())
new_uams_removed_dup = new_uams.withColumn('row', f.row_number().over(window)).filter(f.col('row') == 1).drop('row')
print('new uams with duplicate removal is :', new_uams_removed_dup.select('duplicate_key').count()) # 8095903
print('new_uams account id checking: ', new_uams_removed_dup.head(2))

# ### Adding AddressID for new UAMS 
old_uams = old_uams_removed_dup
new_uams = new_uams_removed_dup

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

## save intermediate tables: Spark is lazy, so one action over this many chained transformations runs very slowly; writing here and re-reading in the next phase breaks up the work
new_uams.write.orc(UAMS_PySpark_save_path+"phase_5/{}/new_uams_intermediate1.orc".format(date_key), mode='overwrite', compression='snappy')
# new_uams.coalesce(1).write.csv(UAMS_PySpark_save_path+"new_uams_temp_7_backup_{}.csv.gz".format(date_key), header=True, mode='overwrite', compression='gzip')
old_uams.write.orc(UAMS_PySpark_save_path+"phase_5/{}/old_uams_intermediate1.orc".format(date_key), mode='overwrite', compression='snappy')
# old_uams.coalesce(1).write.csv(UAMS_PySpark_save_path+"old_uams_temp_7_backup_{}.csv.gz".format(date_key), header=True, mode='overwrite', compression='gzip')

#Revision - fakhrul - 27/6/22 the merging is too big, it killed the job. have to build another pipeline
# wr.s3.to_csv(df = new_uams, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_uams_temp_7_backup.csv')
# wr.s3.to_csv(df = old_uams, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7_backup.csv')

checking objid value after reading : [Row(OBJID=''), Row(OBJID=''), Row(OBJID=''), Row(OBJID=''), Row(OBJID='')]
new uams without duplicate removal is : 8842843
checking objid value after reading in old uams : [Row(OBJID=None), Row(OBJID=None), Row(OBJID=None), Row(OBJID=None), Row(OBJID=None), Row(OBJID='53445914'), Row(OBJID=None), Row(OBJID=None), Row(OBJID='16848414'), Row(OBJID=None)]
+--------------+-----+
|   Serviceable|count|
+--------------+-----+
| "   ""5-14-9"|    1|
| "    ""2-3-1"|    1|
| "     ""5-05"|    1|
| " 6220"" 42B"|    1|
| "    ""01-04"|    1|
| "    ""9-3-1"|    1|
|"""1062-12-22"|    1|
| " 6220"" 42C"|    1|
| "  ""10-5-02"|    1|
| "    ""5-2-3"|    1|
| "   ""2220-1"|    1|
| "  ""123-7-4"|    1|
| "   ""125-37"|    1|
| "    ""7-2-9"|    1|
| "    ""18-19"|    1|
| "  ""23 2-17"|    1|
| "  ""4-10-11"|    1|
| "   ""1A-3-1"|    1|
| "    ""1-12B"|    1|
| "  ""2C-16-7"|    1|
+--------------+-----+
only showing top 20 rows

old_uams serviceable checking
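
The cell above leaves the pandas `isna().sum()` checks unported. One possible way to get per-column null counts in Spark is to build one SQL aggregate per column and pass the list to `DataFrame.selectExpr`. The helper name `null_count_exprs` is hypothetical, and the Spark calls are shown only as comments because they need a live session. Note that this counts SQL NULLs, so it would have to run before `fillna('')` replaces them with blanks.

```python
def null_count_exprs(columns):
    """Build one Spark SQL aggregate per column that counts its NULL values."""
    return [
        "sum(CASE WHEN `{0}` IS NULL THEN 1 ELSE 0 END) AS `{0}`".format(name)
        for name in columns
    ]

exprs = null_count_exprs(["Account_No", "OBJID"])
for expr in exprs:
    print(expr)

# With a live session, a single pass over each table would then be (untested here):
# new_uams.selectExpr(*null_count_exprs(new_uams.columns)).show()
# old_uams.selectExpr(*null_count_exprs(old_uams.columns)).show()
```

Each call returns a one-row table with one count per column, which is the closest analogue to the pandas output.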

# Phase 6
- Adding AddressID for new UAMS
- Compare New AMS and Old AMS
- Adding Shamani's data to the delta delete

----
========================================= THIS IS THE START OF PHASE 6 ============================================
- taken from Glue Job: address_standardization-prod-uams_generation_final_7 (Pipeline 7)
- originally converted to PySpark in Zepp Qubole notebook: https://us.qubole.com/notebooks#recent?id=141821&type=my-notebooks&view=home

## Pipeline 7

In [17]:
## ============================================================ THIS IS THE START OF Pipeline 7 - FINAL ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_7
# ### Adding AddressID for new UAMS 
## ====================================

new_uams = spark.read.orc(UAMS_PySpark_save_path+"phase_5/{}/new_uams_intermediate1.orc".format(date_key))
# new_uams = wr.s3.read_csv('s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_uams_temp_7_backup.csv')
print('this is new uams cols', new_uams.columns)

old_uams = spark.read.orc(UAMS_PySpark_save_path+"phase_5/{}/old_uams_intermediate1.orc".format(date_key))
# old_uams = wr.s3.read_csv('s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7_backup.csv')
print('this is old uams cols', old_uams.columns)

## rename column (as spark does not add suffixes to duplicate columns after joining)
old_uams = old_uams.withColumnRenamed('Address_ID', 'Address_ID_old')
Merge_new_old_key = new_uams.join(old_uams.select('Key', 'Address_ID_old'), on ='Key', how = 'left')
print('Merge_new_old_key count', Merge_new_old_key.select('Key').count()) # 21008589
print('checking merge new old account no unique values :', Merge_new_old_key.groupBy('Account_No').count()) # prints only the DataFrame schema; add .show() to display the counts

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

## save intermediate tables: Spark is lazy, so one action over this many chained transformations runs very slowly; writing here and re-reading in the next step breaks up the work
Merge_new_old_key.write.orc(UAMS_PySpark_save_path+"phase_6/{}/merge_new_old_key_intermediate1.orc".format(date_key), mode='overwrite', compression='snappy')
Merge_new_old_key.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_6/{}/merge_new_old_key_intermediate1.csv.gz".format(date_key), header=True, mode='overwrite', compression='gzip')

#Revision - fakhrul - 27/6/22 the merging is too big, it killed the job. have to build another pipeline
# wr.s3.to_csv(df = Merge_new_old_key, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/merge_new_old_key_temp_7.csv')
#wr.s3.to_csv(df = old_uams, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7.csv')

this is new uams cols ['Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag', 'duplicate_key', 'index']
this is old uams cols ['Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag', 'duplicate_key', 'index']
Merge_new_old_key count 21574347
checking merge new old account no unique values : DataFrame[Account_No: string, count: bigint]
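
The merged table has 21,574,347 rows, which is more than `new_uams` had going in (8,842,843 before de-duplication). A left join can only grow like this when a `Key` appears more than once on the right-hand side, so repeated `Key` values in `old_uams` are a plausible explanation. The plain-Python sketch below shows the row-count arithmetic on made-up keys; the function name is hypothetical.

```python
from collections import Counter

def left_join_row_count(left_keys, right_keys):
    """Rows produced by a left join on one key: each left row is repeated once
    per matching right row, or kept once when there is no match."""
    right_counts = Counter(right_keys)
    return sum(max(right_counts[key], 1) for key in left_keys)

new_keys = ["A", "B", "C"]            # made-up keys standing in for new_uams
old_keys = ["A", "A", "A", "B"]       # made-up keys: 'A' repeats in old_uams
print(left_join_row_count(new_keys, old_keys))
```

Here three left rows become five because key `A` matches three times. Whether the repeats should be removed before the join depends on how the later de-dupe steps are meant to work.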


## Pipeline 8

In [19]:
## ============================================================ THIS IS THE START OF Pipeline 8 - BACKUP 1 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_8_backup
# according to Fakhrul, the order for pipeline 8 is final_8_backup -> final_8_backup_2 -> final_8_backup_3 -> final_8
# ----------------------------------
### Split new and old UAMS

## 13/11/2022: Amzar's pyspark method does not run the for loop that Fakhrul used
Merge_new_old_key = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/merge_new_old_key_intermediate1.orc".format(date_key)) # count = 21008589
New_address = Merge_new_old_key.filter(f.col('Address_ID_old').isNull()) 
Old_address = Merge_new_old_key.filter(f.col('Address_ID_old').isNotNull()) 
print('New Address count:', New_address.select('Address_ID_old').count()) # 3959358
print('Old Address count:', Old_address.select('Address_ID_old').count()) # 17049231

## save the files
New_address.write.orc(UAMS_PySpark_save_path+"phase_6/{}/new_address_intermediate1.orc".format(date_key), mode='overwrite', compression='snappy')
Old_address.write.orc(UAMS_PySpark_save_path+"phase_6/{}/old_address_intermediate1.orc".format(date_key), mode='overwrite', compression='snappy')

#Revision - fakhrul - 27/6/22 - dropped duplicate key and unnamed column and also address_ID_x to save on memory
# chunksize = 100000
# count = 1
# for Merge_new_old_key in wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/merge_new_old_key_temp_7.csv', usecols = ['Key','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag','Address_ID_y'], chunksize=chunksize):
#     New_address = Merge_new_old_key[Merge_new_old_key['Address_ID_y'].isnull()]
#     Old_address = Merge_new_old_key[Merge_new_old_key['Address_ID_y'].notnull()]
#     wr.s3.to_csv(df = New_address, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_backup_before_8_folder' + '/' + timestr + '/new_address_backup_before_8_' + str(count) + '.csv')
#     wr.s3.to_csv(df = Old_address, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_address_backup_before_8_folder' + '/' + timestr + '/old_address_backup_before_8_' + str(count) + '.csv')
#     count += 1

# print('memory after reading : ')
# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

New Address count: 4207663
Old Address count: 17366684


In [20]:
## ============================================================ THIS IS THE START OF Pipeline 8 - BACKUP 2 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_8_backup_2
# according to Fakhrul, the order for pipeline 8 is final_8_backup -> final_8_backup_2 -> final_8_backup_3 -> final_8
# new_address_path = args['new_address_path'] = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_backup_before_8_folder/new_address_backup_before_8_*.csv
## ================================================================================================================================

#Revision - fakhrul - 27/6/22 - dropped duplicate key and unnamed column and also address_ID_x to save on memory
#Merge_new_old_key = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/merge_new_old_key_temp_7.csv',
#usecols = ['Key','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag','Address_ID_y'],
#dtype = {'Postcode':object, 'Address_ID_y': 'float32', 'Account_No': 'float32', 'OBJID': 'float32'})

## read in old_uams to get latest Address_ID
old_uams = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/old_address_intermediate1.orc".format(date_key))
# old_uams = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7_backup.csv', usecols = ['Address_ID'], dtype = {'Address_ID': 'int32'})

#print('this is merge info :', Merge_new_old_key.info())
#print('this is columns of merge: ', Merge_new_old_key.columns)
#print('this is old uams info :', old_uams.info())
#print('this is columns of old_uams: ', old_uams.columns)

#print('memory after reading : ')
#usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
#print('[debug] memory usage is (Megabytes):')
#print(usage)

### Add Address_ID to New_AMS

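## first extract the last Address_ID currently available (max)
## NOTE: this file carries both 'Address_ID' (from new_uams) and 'Address_ID_old' (from the old fullfeed); confirm that 'Address_ID' is the intended source of the max
Last_Address_ID = old_uams.select(f.max(f.col('Address_ID').cast('integer'))).first()[0]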
print('Last_Address_ID:', Last_Address_ID, 'Last_Address_ID type:', type(Last_Address_ID)) # 12841512 

## read in New_address
New_address = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/new_address_intermediate1.orc".format(date_key))

# New_address = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_backup_before_8_folder/' + timestr + '/' + 'new_address_backup_before_8_*.csv')
#print('this is last address id max:', Last_Address_ID)
#
#New_address = Merge_new_old_key[Merge_new_old_key['Address_ID_y'].isnull()]
#Old_address = Merge_new_old_key[Merge_new_old_key['Address_ID_y'].notnull()]
#
#print('memory after not null and null : ')
#usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
#print('[debug] memory usage is (Megabytes):')
#print(usage)
#
#print('acc no of new: ', New_address[New_address.Account_No.isnull()])
#print('p flag of new: ', New_address[New_address.P_Flag.isnull()])
#print('acc no of old: ', Old_address[Old_address.Account_No.isnull()])
#print('p flag of old: ', Old_address[Old_address.P_Flag.isnull()])


#revision - fakhrul - 3/7/22 - regex to see if it makes sense or not
#New_address= New_address[New_address["House_No"]!= "-"]
#New_address= New_address[New_address["House_No"]!= "\*"]
#New_address= New_address[New_address["House_No"]!= "\&"]
#New_address= New_address[New_address["House_No"]!= "\+"]
#New_address= New_address[New_address["House_No"]!= "\*/"]
#New_address= New_address[New_address["House_No"]!= "\-..-"]

#New_address['House_No'] = New_address['House_No'].astype(str)
#New_address = New_address[~New_address['House_No'].str.contains('-', regex = True)]
#New_address = New_address[~New_address['House_No'].str.contains('\*', regex = True)]
#New_address = New_address[~New_address['House_No'].str.contains('\&', regex = True)]
#New_address = New_address[~New_address['House_No'].str.contains('\+', regex = True)]
#New_address = New_address[~New_address['House_No'].str.contains('\*/', regex = True)]
#New_address = New_address[~New_address['House_No'].str.contains('\-..-', regex = True)]
#New_address = New_address[~New_address['House_No'].str.contains('\:', regex = True)]
#New_address = New_address[~New_address['House_No'].str.contains('VALUE', regex = True)]
#New_address = New_address[~New_address['House_No'].str.contains('nan', regex = True)]
#New_address = New_address[New_address['Building_Name'] != New_address['State']]
#New_address = New_address[New_address['Building_Name'] != New_address['City']]
#New_address = New_address[New_address['Building_Name'] != 'KUALA TERENGGANU']

#revision - fakhrul 8/7/22 - testing this out 
#New_address['Key'] = New_address['Key'].astype(str)
#New_address = New_address[~New_address['Key'].str.contains("[`^_]", regex = True)]

print('checking new address here: ', New_address.count(), New_address.columns) # 3959358 
#print('checking old address here: ', Old_address.shape)

#wr.s3.to_csv(df = New_address, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_test_check.csv')

#print('checking memory usage here to see if its ridiculous or not : ')
#usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
#print('[debug] memory usage is (Megabytes):')
#print(usage)

#print('checking new address shape', New_address.shape)
#print('checking old address shape', Old_address.shape)

# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
New_address = New_address.withColumn("index", f.row_number().over(Window.orderBy(f.monotonically_increasing_id())) )
## de-dupe on Key, & keep LAST based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['Key']).orderBy(f.col("index").desc()) ## descending because we want to keep 'last'
New_address = New_address.withColumn('row', f.row_number().over(window)).filter(f.col('row') == 1).drop('row')
print('New_address count AFTER de-dupe on Key:', New_address.select('Key').count()) # 3459577

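## Generate new IDs for the new addresses
i = Last_Address_ID
print(i) # 12841512
j = i + 2 # 12841514
# New_address = New_address.reset_index()

## 'index' was assigned before the de-dupe above, so the new IDs are unique but not consecutive
New_address = New_address.withColumn('Address_ID_old', f.col('index') + j)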
# New_address['Address_ID_y'] = New_address.index + j
print('this is new address account no checking', New_address.select('Account_No').head(10))

#Old_address = Old_address.drop_duplicates(subset = ['Account_No', 'Address_ID_y'])
#print('this is old address account no checking', Old_address.Account_No.head(100))

## save the files
New_address.write.orc(UAMS_PySpark_save_path+"phase_6/{}/new_address_intermediate2.orc".format(date_key), mode='overwrite', compression='snappy')
# wr.s3.to_csv(df = New_address, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_backup_before_8_2.csv')
#wr.s3.to_csv(df = Old_address, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_address_backup_before_8.csv')

del old_uams

# print('checking memory usage here to see if its ridiculous or not : ')
# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

Last_Address_ID: 8018089 Last_Address_ID type: <class 'int'>
checking new address here:  4207663 ['Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag', 'duplicate_key', 'index', 'Address_ID_old']
New_address count AFTER de-dupe on Key: 3713748
8018089
this is new address account no checking [Row(Account_No='97561971'), Row(Account_No=''), Row(Account_No=''), Row(Account_No='97725472'), Row(Account_No=''), Row(Account_No='97438645'), Row(Account_No='98624835'), Row(Account_No='97515688'), Row(Account_No='97532218'), Row(Account_No='97522870')]
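
The ID assignment in this cell adds the `index` column to `j = Last_Address_ID + 2`. Because `row_number()` starts at 1 and the index is assigned before the de-dupe, the surviving index values have gaps, and so do the new IDs. The plain-Python sketch below mirrors that arithmetic on made-up index values and compares it with renumbering after the de-dupe. The second function is a variation for comparison, not the notebook's method, and both function names are hypothetical.

```python
def ids_from_existing_index(surviving_indexes, last_address_id):
    """Mirror of the cell's arithmetic: new ID = index + (last_address_id + 2)."""
    offset = last_address_id + 2
    return [index + offset for index in surviving_indexes]

def ids_renumbered(surviving_indexes, last_address_id):
    """Variation (an assumption, not the notebook's method): renumber the surviving
    rows 1..n first so the new IDs continue directly after last_address_id."""
    return [last_address_id + position
            for position, _ in enumerate(sorted(surviving_indexes), start=1)]

last_id = 8018089                 # value printed by the cell above
survivors = [1, 2, 5, 9]          # made-up index values left after the de-dupe

print(ids_from_existing_index(survivors, last_id))
print(ids_renumbered(survivors, last_id))
```

Gaps are harmless as long as the IDs only need to be unique; renumbering matters only if downstream consumers expect a dense sequence.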


In [21]:
## ============================================================ THIS IS THE START OF Pipeline 8 - BACKUP 3 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_8_backup_3
# according to Fakhrul, the order for pipeline 8 is final_8_backup -> final_8_backup_2 -> final_8_backup_3 -> final_8
# old_address_path = args['old_address_path'] = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_address_backup_before_8_folder/old_address_backup_before_8*.csv
## ================================================================================================================================

## read in Old_address
Old_address = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/old_address_intermediate1.orc".format(date_key))
# Old_address = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_address_backup_before_8_folder/' + timestr + '/' + 'old_address_backup_before_8*.csv')

print('Old_address count BEFORE de-dupe on Account_No & Address_ID_old:', Old_address.select('Key').count()) # 17049231

# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Old_address = Old_address.withColumn("index", f.row_number().over(Window.orderBy(f.monotonically_increasing_id())) )
## de-dupe on 'Account_No', 'Address_ID_old', & keep FIRST based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['Account_No', 'Address_ID_old']).orderBy(f.col("index").asc()) ## ascending because we want to keep 'first'
Old_address = Old_address.withColumn('row', f.row_number().over(window)).filter(f.col('row') == 1).drop('row')
print('Old_address count AFTER de-dupe on Account_No & Address_ID_old:', Old_address.select('Key').count()) # 4122207
print('this is old address account no checking', Old_address.filter(f.col('Account_No') != '').select('Account_No').head(10))

## save the files
Old_address.write.orc(UAMS_PySpark_save_path+"phase_6/{}/old_address_intermediate2.orc".format(date_key), mode='overwrite', compression='snappy')
#wr.s3.to_csv(df = New_address, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_backup_before_8_3.csv')
# wr.s3.to_csv(df = Old_address, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_address_backup_before_8_3.csv')

# print('checking memory usage here to see if its ridiculous or not : ')
# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Megabytes):')
# print(usage)

del Old_address

Old_address count BEFORE de-dupe on Account_No & Address_ID_old: 17366684
Old_address count AFTER de-dupe on Account_No & Address_ID_old: 4439602
this is old address account no checking [Row(Account_No='80000008'), Row(Account_No='80000011'), Row(Account_No='80000013'), Row(Account_No='80000015'), Row(Account_No='80000032'), Row(Account_No='80000033'), Row(Account_No='80000050'), Row(Account_No='80000053'), Row(Account_No='80000056'), Row(Account_No='80000060')]
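
The window de-dupe used in these cells (partition by the key columns, order by `index`, keep row 1) keeps the first occurrence when the order is ascending and the last when it is descending. The plain-Python sketch below reproduces that rule on a few made-up rows so the two variants can be compared without a Spark session; the `dedupe` helper is hypothetical.

```python
def dedupe(rows, key_fields, keep="first"):
    """Keep one row per key, choosing by the 'index' field.

    keep='first' corresponds to ordering the window by index ascending,
    keep='last' to ordering it by index descending.
    """
    chosen = {}
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        current = chosen.get(key)
        if current is None:
            chosen[key] = row
        elif keep == "first" and row["index"] < current["index"]:
            chosen[key] = row
        elif keep == "last" and row["index"] > current["index"]:
            chosen[key] = row
    return sorted(chosen.values(), key=lambda row: row["index"])

rows = [
    {"index": 1, "Key": "K1", "Account_No": "80000008"},
    {"index": 2, "Key": "K1", "Account_No": ""},
    {"index": 3, "Key": "K2", "Account_No": "80000011"},
]
print([row["index"] for row in dedupe(rows, ["Key"], keep="first")])
print([row["index"] for row in dedupe(rows, ["Key"], keep="last")])
```

In this made-up data, keeping the last row for `K1` retains the one with a blank `Account_No`, which shows why the choice of direction can change which account survives.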


In [6]:
## ============================================================ THIS IS THE START OF Pipeline 8 - FINAL ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_8
# according to Fakhrul, the order for pipeline 8 is final_8_backup -> final_8_backup_2 -> final_8_backup_3 -> final_8
# new_fullfeed_path = s3://astro-groupdata-prod-source/new_fullfeed_uams_with_key/
# old_uams_temp_save_path = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/
# new_address_temp_save_path = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/
## ================================================================================================================================

## read in the new_address & old_address
New_address = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/new_address_intermediate2.orc".format(date_key))
Old_address = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/old_address_intermediate2.orc".format(date_key))
# New_address = wr.s3.read_csv('s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_backup_before_8_2.csv')
# #Old_address = wr.s3.read_csv('s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_address_backup_before_8.csv')
# Old_address = wr.s3.read_csv('s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_address_backup_before_8_3.csv')

#revision - fakhrul - 13/8/22 - remove this below as we dont use it at all
#old_uams = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7.csv', usecols = ['Address_ID'], dtype = {'Address_ID': 'int'})

print('New_address shape', New_address.select('Key').count()) #  3713748
print('Old_address shape', Old_address.select('Key').count()) # 4439602

## Concat/union Old_address with New_address
print(Old_address.columns, New_address.columns)
New_AMS = Old_address.unionByName(New_address) # match columns by name rather than by position
# Frame = [Old_address,New_address]
# New_AMS = pd.concat(Frame)
print('checking new AMS shape', New_AMS.select('Key').count()) #  8153350

## select & rename columns
New_AMS = New_AMS.select(['Key', 'Address_ID_old', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
       'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag']) 
New_AMS = New_AMS.withColumnRenamed('Address_ID_old','Address_ID')

print('checking head of new uams fullfeed : ', New_AMS.head(10))
print('checking head of account no new uams fullfeed : ', New_AMS.select('Account_No').head(10))
print('checking number of rows in acc no of new uams fullfeed : ', New_AMS.select('Account_No').count())
print('checking nan account no in new uams :', New_AMS.filter(f.col('Account_No').isNull()).count()) # 7581784
print('checking empty account no in new uams :', New_AMS.filter(f.col('Account_No') == '').count()) # 0
# .show() prints the table itself and returns None, so it is called on its own line
print('checking OBJID of new fullfeed: ')
New_AMS.select('OBJID').distinct().show() # 4776402

## clean postcode column (make into string, remove float \.0, select only first 5 digits, lpad with '0')
New_AMS = New_AMS.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
New_AMS = New_AMS.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )
print('distinct Postcode lengths after cleaning:')
New_AMS.select(f.length(f.col('Postcode').cast('string'))).distinct().show()

## clean some other columns
New_AMS = New_AMS.withColumn("Address_ID", f.regexp_replace(f.col('Address_ID').cast('string'), r'\.0', '') )
New_AMS = New_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0', '') )

#revision - fakhrul - 5/8/22 - remove the weird double space entry
New_AMS = New_AMS.withColumn('Address_Type', f.when(f.col('Building_Name') == '  ', 'SDU').otherwise(f.col('Address_Type')) )
New_AMS = New_AMS.withColumn("Building_Name", f.regexp_replace(f.col('Building_Name').cast('string'), '  ', '') )

print('this is P1 numbers :', New_AMS.filter(f.col('P_Flag') == 'P1').select('Account_No').count() )
print('this is P2 numbers :', New_AMS.filter(f.col('P_Flag') == 'P2').select('Account_No').count() )

## note: rows with a null Serviceable are dropped here as well, because the negated condition evaluates to null
New_AMS = New_AMS.filter(~f.col("Serviceable").cast('string').contains('ERROR'))
print('Count of New_AMS',New_AMS.select("Serviceable").count())
print('Unique serviceable values in New_AMS:')
New_AMS.select('Serviceable').distinct().show()

## save files
New_AMS.write.orc(UAMS_PySpark_save_path+"phase_6/{}/NEW_FULLFEED_UAMS_WITH_KEY_{}.orc".format(date_key, date_key), mode='overwrite', compression='snappy')
New_AMS.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_6/{}/NEW_FULLFEED_UAMS_WITH_KEY_{}.csv.gz".format(date_key, date_key), mode='overwrite', header=True, compression='gzip')
New_address.write.orc(UAMS_PySpark_save_path+"new_address_temp_8_{}.orc".format(date_key), mode='overwrite', compression='snappy') # this file doesn't seem to be used later on anywhere

#wr.s3.to_csv(df = New_AMS, path = old_uams_temp_save_path + 'NEW_FULLFEED_UAMS_WITH_KEY_postcode_check_with_padding.csv')
# wr.s3.to_csv(df = New_AMS, path = new_fullfeed_path + 'NEW_FULLFEED_UAMS_WITH_KEY.csv')
#wr.s3.to_csv(df = New_AMS, path = new_address_temp_save_path + 'NEW_FULLFEED_UAMS_WITH_KEY.parquet')
#New_AMS.to_csv('NEW_FULLFEED_UAMS_WITH_KEY.csv') ## Save in pipeline bucket
#wr.s3.to_csv(df = old_uams, path = old_uams_temp_save_path + 'old_uams_temp_8.csv')
# wr.s3.to_csv(df = New_address, path = new_address_temp_save_path + 'new_address_temp_8.csv')


New_address shape 3713748
Old_address shape 4439602
['Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag', 'duplicate_key', 'index', 'Address_ID_old'] ['Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag', 'duplicate_key', 'index', 'Address_ID_old']
checking new AMS shape 8153350
checking head of new uams fullfeed :  [Row(Key='B2-18-03,KENWINGSTONSQUAREGARDENBLOKB2,PERSIARAN,BESTARI,,,CYBERJAYA,CYBER9,63000,SELANGOR', Address_ID='10000001.0', Account_No='', OBJID='', House_No='  B2-18-03', Building_Name='KENWINGSTON SQUARE GARDEN BLOK B2', Standard_Building_Name='NO', Street_Type='PERSIARAN', Street_Name='   BESTARI', Area='CYBER 9', City='CYBERJAYA', Postcode='63
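
The postcode clean-up used in the cell above (cast to string, strip `.0`, keep the first five characters, left-pad with `0`) can be checked on a few values without a Spark session. The sketch below is a plain-Python stand-in, not the Spark code itself: `clean_postcode` is a hypothetical helper name, and it assumes that Spark's `substring` and `lpad` behave like slicing and `rjust` for these inputs.

```python
import re

def clean_postcode(value):
    """Plain-Python stand-in for the regexp_replace + substring + lpad chain."""
    if value is None:          # Spark keeps nulls as nulls through these functions
        return None
    text = re.sub(r'\.0', '', str(value))   # same unanchored pattern as the notebook
    return text[:5].rjust(5, '0')           # first 5 characters, left-padded with '0'

print(clean_postcode('8000.0'))   # 08000
print(clean_postcode(63000))      # 63000
print(clean_postcode('10250'))    # 10250
```

Because the pattern is not anchored, it removes `.0` wherever it occurs (for example `'12.034'` becomes `'1234'` before padding). Anchoring it as `\.0$` would be one way to restrict it to a trailing float suffix, if that is the intent.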

## Pipeline 9

In [26]:
## ============================================================ THIS IS THE START OF Pipeline 9 - Backup 1 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_9_backup
# according to Fakhrul, the order for pipeline 9 is final_9_backup -> final_9_backup_2 -> final_9_backup_3 -> final_9_backup_4 -> final_9_backup_5 -> final_9_backup_6 -> final_9
## ================================================================================================================================

## read in old_uams
# old_uams = spark.read.csv(UAMS_PySpark_save_path+"uploaded/Old_Fullfeed_UAMS-20220818(1).csv.gz", header=True) # from Qubole Zep (but it's not the same file as the old_uams read in earlier...)
old_uams = spark.read.csv(old_fullfeed_path, header=True)
print(old_uams.columns)
old_uams = old_uams.select(['Address_ID','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag'])
#old_uams = wr.s3.read_csv(path = old_uams_temp_read_path, usecols = ['Key','Address_ID','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag','duplicate_key'], dtype = {'Postcode': object})
# old_uams = wr.s3.read_csv(path = 's3://astro-groupdata-prod-source/old_fullfeed_uams_with_key/Old_FUllfeed_UAMS_18_Aug (1).csv', usecols = ['Address_ID','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag'], dtype = {'Postcode': object})

#revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
## clean postcode column (make into string, remove float \.0, select only first 5 digits, lpad with '0')
old_uams = old_uams.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
old_uams = old_uams.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

## read in New_AMS
New_AMS = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/NEW_FULLFEED_UAMS_WITH_KEY_{}.orc".format(date_key, date_key))
New_AMS = New_AMS.select('Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name','Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City','Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag')
# New_AMS = wr.s3.read_csv(path = new_fullfeed_with_key_read_path, usecols = ['Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name','Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City','Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag'], dtype = {'Postcode':object})

#revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
## clean postcode column (make into string, remove float \.0, select only first 5 digits, lpad with '0')
New_AMS = New_AMS.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
New_AMS = New_AMS.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

#revision - fakhrul - 27/6/22 removed address_id_x and duplicate_key columns here as prev we removed it. we shall see if it works
#New_address = wr.s3.read_csv(path = new_address_temp_read_path, usecols = ['index','Key','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag','Address_ID_y'], dtype = {'Postcode':object})
#print('shape of new address is: ', New_address.shape)

# ###  Compare New AMS and Old AMS
#revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
#New_address['Postcode']= New_address['Postcode'].astype(str).apply(lambda x:x[0:5])
#New_address['Postcode'] = New_address['Postcode'].str.pad(width=5, side='left', fillchar='0')  

print(old_uams.head(5))
#print(New_address.head())

### ---- Creating combined key (looks more like I'm deleting based on Account_No)
Old_AMS = old_uams

## first do some cleaning in both Old_AMS & New_AMS
Old_AMS = Old_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), 'nan', '') )
Old_AMS = Old_AMS.fillna('', subset='Account_No')
Old_AMS = Old_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0', '') )
Old_AMS = Old_AMS.withColumn("Address_ID", f.regexp_replace(f.col('Address_ID').cast('string'), r'\.0', '') )

New_AMS = New_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), 'nan', '') )
New_AMS = New_AMS.fillna('', subset='Account_No')
New_AMS = New_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0', '') )
New_AMS = New_AMS.withColumn("Address_ID", f.regexp_replace(f.col('Address_ID').cast('string'), r'\.0', '') )

## rename columns before the join (because Spark doesn't add suffixes to duplicate columns after joins)
Old_AMS = Old_AMS.withColumnRenamed("Account_No", "Account_No_old")
Merge_Old_New_ONID = Old_AMS.select(['Address_ID','Account_No_old']).join(New_AMS, on ='Address_ID', how = 'inner')
print('Merge_Old_New_ONID count post JOIN', Merge_Old_New_ONID.select('Account_No').count()) # 4089257

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):')
# print(usage)

Updated_accounts = Merge_Old_New_ONID.filter(f.col('Account_No_old') != f.col('Account_No'))
print('Updated_accounts count post filter on Account_No_old != Account_No', Updated_accounts.select('Account_No').count()) # 644557

# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
# note: a Window with orderBy but no partitionBy moves all rows to a single partition
Updated_accounts = Updated_accounts.withColumn("index", f.row_number().over(Window.orderBy(f.monotonically_increasing_id())) )
## de-dupe on 'Address_ID', 'Account_No' & keep FIRST based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['Address_ID', 'Account_No']).orderBy(f.col("index").asc()) ## ascending because we want to keep 'first'
Updated_accounts = Updated_accounts.withColumn('row', f.row_number().over(window)).filter(f.col('row') == 1).drop('row')
print('Updated_accounts count AFTER de-dupe on Address_ID & Account_No:', Updated_accounts.select('Account_No').count()) # 644557

## select relevant columns & rename some
Updated_accounts = Updated_accounts.select(['Address_ID', 'Account_No_old', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
       'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag'])
Updated_accounts = Updated_accounts.withColumnRenamed('Account_No_old','Account_No')

print('final updated accounts count:', Updated_accounts.count()) # 644557
print('checking Serviceable values in updated_accounts:')
Updated_accounts.groupBy('Serviceable').count().orderBy('count', ascending=False).show()

## save files
Updated_accounts.write.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Updated_accounts.orc".format(date_key), mode='overwrite', compression='snappy')
# wr.s3.to_csv(df = Updated_accounts, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Updated_accounts_9_backup.csv')

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):')
# print(usage)

['_c0', 'Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag']
[Row(Address_ID='2556933.0', Account_No='80000006.0', OBJID=None, House_No='        15', Building_Name=None, Standard_Building_Name='NO', Street_Type='LORONG', Street_Name='BAGAN JERMAL', Area=None, City='GEORGE TOWN', Postcode='10250', Address_Type='SDU', Serviceable='TM|FTTH', P_Flag='P1', State='PULAU PINANG', UR_Flag='URBAN'), Row(Address_ID='1043943.0', Account_No='80000008.0', OBJID=None, House_No='        46', Building_Name=None, Standard_Building_Name='NO', Street_Type='JALAN', Street_Name='  SS 14/5B', Area='SS 14', City='SUBANG JAYA', Postcode='47500', Address_Type='SDU', Serviceable='TM|FTTH', P_Flag='P1', State='SELANGOR', UR_Flag='URBAN'), Row(Address_ID='1232373.0', Account_No='80000011.0', OBJID=None, House_No='        63', Building_Name=None, Stand
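
The de-duplication in this cell numbers the rows, then keeps the row with the lowest `index` inside each (`Address_ID`, `Account_No`) group, which mirrors pandas `drop_duplicates(keep='first')`. A small plain-Python illustration of that rule follows. `keep_first` and the sample rows are made up for the example, and it does not use Spark.

```python
def keep_first(rows, key_columns):
    """Keep the first row seen for each key, preserving input order."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept

rows = [
    {'index': 1, 'Address_ID': '100', 'Account_No': 'A1'},
    {'index': 2, 'Address_ID': '100', 'Account_No': 'A1'},   # duplicate key, dropped
    {'index': 3, 'Address_ID': '100', 'Account_No': 'A2'},
    {'index': 4, 'Address_ID': '200', 'Account_No': 'A1'},
]
deduped = keep_first(rows, ['Address_ID', 'Account_No'])
print([r['index'] for r in deduped])   # [1, 3, 4]
```

In Spark, which row counts as "first" depends on the ordering behind `monotonically_increasing_id()`. That ordering follows the current partition layout, so after a join it need not match the order the rows had in the source file.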

In [27]:
## ============================================================ THIS IS THE START OF Pipeline 9 - Backup 2 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_9_backup_2
# according to Fakhrul, the order for pipeline 9 is final_9_backup -> final_9_backup_2 -> final_9_backup_3 -> final_9_backup_4 -> final_9_backup_5 -> final_9_backup_6 -> final_9
# args = getResolvedOptions(sys.argv,
                        #   ['old_uams_temp_read_path', = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7.csv
                        #   'new_fullfeed_with_key_read_path', = s3://astro-groupdata-prod-source/new_fullfeed_uams_with_key/NEW_FULLFEED_UAMS_WITH_KEY.csv
                        #   'new_address_temp_read_path']) = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_temp_8.csv
## ================================================================================================================================

# ### ---- Creating combined key
Old_AMS = Old_AMS.withColumnRenamed('Account_No_old', 'Account_No')
Old_AMS = Old_AMS.withColumn('combined_Key', f.concat_ws(",", f.col('Account_No').cast('string'), f.col('Address_ID').cast('string')) )
New_AMS = New_AMS.withColumn('combined_Key', f.concat_ws(",", f.col('Account_No').cast('string'), f.col('Address_ID').cast('string')) )

## rename columns to smoothen the join
New_AMS = New_AMS.withColumnRenamed('Address_ID', 'Address_ID_new')
Merge_Old_New_AMS = Old_AMS.join(New_AMS.select('Address_ID_new', 'combined_Key'), on ='combined_Key', how = 'left')
print('Old_AMS count:', Old_AMS.count(), 'New_AMS count:', New_AMS.count(), 'Merge_Old_New_AMS (after joining) count:', Merge_Old_New_AMS.count()) 
# Old_AMS count: 9435243 New_AMS count: 7581784 Merge_Old_New_AMS (after joining) count: 9435243

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):')
# print(usage)

## filtering & dedupe
Removed_accounts = Merge_Old_New_AMS.filter(f.col('Address_ID_new').isNull())
print('removed_accounts (null Address ID new) count:', Removed_accounts.select("Address_ID").count()) # 5990543
Removed_accounts = Removed_accounts.filter(f.col('Account_No') != '')
print('removed_accounts after filtering out blank Account_No count:', Removed_accounts.select("Address_ID").count()) # 1493494

# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Removed_accounts = Removed_accounts.withColumn("index", f.row_number().over(Window.orderBy(f.monotonically_increasing_id())) )
## de-dupe on 'Account_No' & keep FIRST based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['Account_No']).orderBy(f.col("index").asc()) ## ascending because we want to keep 'first'
Removed_accounts = Removed_accounts.withColumn('row', f.row_number().over(window)).filter(f.col('row') == 1).drop('row')
print('removed_accounts after dedupe on Account_No, keep first count:', Removed_accounts.select("Address_ID").count())  # 1486736

## select & rename columns
Removed_accounts = Removed_accounts.select([ 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
       'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag'])

print('checking final removed accs count : ', Removed_accounts.count()) # 1486736

## save files
Removed_accounts.write.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Removed_accounts.orc".format(date_key), mode='overwrite', compression='snappy')
# wr.s3.to_csv(df = Removed_accounts, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Removed_accounts_9_backup_2.csv')

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):')
# print(usage)


Old_AMS count: 9361290 New_AMS count: 8153350 Merge_Old_New_AMS (after joining) count: 10095118
removed_accounts (null Address ID new) count: 4350136
removed_accounts after filtering out blank Account_No count: 1355526
removed_accounts after dedupe on Account_No, keep first count: 1355140
checking final removed accs count :  1355140
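
In the output above, the joined frame has more rows (10095118) than `Old_AMS` (9361290). A left join produces extra rows when a `combined_Key` occurs more than once on the right-hand side. The rows with no match, which are the ones kept as removed accounts, are not multiplied. The plain-Python sketch below illustrates this on made-up keys; `left_join` is a hypothetical helper, not a Spark API.

```python
def left_join(left_rows, right_rows, key):
    """Left join two lists of dicts on `key`; unmatched left rows get Address_ID_new=None."""
    matches = {}
    for r in right_rows:
        matches.setdefault(r[key], []).append(r)
    joined = []
    for l in left_rows:
        for r in matches.get(l[key], [None]):
            joined.append({**l, 'Address_ID_new': r['Address_ID_new'] if r else None})
    return joined

old = [{'combined_Key': 'A1,100'}, {'combined_Key': 'A2,200'}, {'combined_Key': 'A3,300'}]
new = [
    {'combined_Key': 'A1,100', 'Address_ID_new': '100'},
    {'combined_Key': 'A1,100', 'Address_ID_new': '100'},   # duplicate key on the right side
    {'combined_Key': 'A2,200', 'Address_ID_new': '200'},
]
merged = left_join(old, new, 'combined_Key')
removed = [row for row in merged if row['Address_ID_new'] is None]
print(len(old), len(merged), len(removed))   # 3 4 1
```

The duplicated key inflates the joined count from 3 to 4, while the single unmatched key still yields exactly one removed row.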


In [28]:
## ============================================================ THIS IS THE START OF Pipeline 9 - Backup 3 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_9_backup_3
# according to Fakhrul, the order for pipeline 9 is final_9_backup -> final_9_backup_2 -> final_9_backup_3 -> final_9_backup_4 -> final_9_backup_5 -> final_9_backup_6 -> final_9
# args = getResolvedOptions(sys.argv,
                        #   ['old_uams_temp_read_path', = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7.csv
                        #   'new_fullfeed_with_key_read_path', = s3://astro-groupdata-prod-source/new_fullfeed_uams_with_key/NEW_FULLFEED_UAMS_WITH_KEY.csv
                        #   'new_address_temp_read_path']) = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_temp_8.csv
## ================================================================================================================================

## rename columns for a smoother join
Old_AMS = Old_AMS.withColumnRenamed('Serviceable', 'Serviceable_old')
Inner_Old_New_AMS = Old_AMS.select(['combined_Key','Serviceable_old']).join(New_AMS, on ='combined_Key', how = 'inner')
print('Inner_Old_New_AMS (after join) count:', Inner_Old_New_AMS.select('Serviceable').count()) # 3444700

## filtering & dedupe
Updated_serviceability = Inner_Old_New_AMS.filter(f.col('Serviceable') != f.col('Serviceable_old'))
print('Updated_serviceability after filter for records where Serviceable != Serviceable_old count:', Updated_serviceability.select("Serviceable").count())  # 299059

# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Updated_serviceability = Updated_serviceability.withColumn("index", f.row_number().over(Window.orderBy(f.monotonically_increasing_id())) )
## de-dupe on combined_Key & keep FIRST based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['combined_Key']).orderBy(f.col("index").asc()) ## ascending because we want to keep 'first'
Updated_serviceability = Updated_serviceability.withColumn('row', f.row_number().over(window)).filter(f.col('row') == 1).drop('row')
print('Updated_serviceability after dedupe on combined_Key, keep first count:', Updated_serviceability.select("Serviceable").count())  # 299059

## rename & select columns (the Serviceable column that gets saved carries the value from Old_AMS, i.e. Serviceable_old)
Updated_serviceability = Updated_serviceability.withColumnRenamed('Address_ID_new', 'Address_ID')
Updated_serviceability = Updated_serviceability.select('Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
       'Postcode', 'Address_Type', 'Serviceable_old', 'P_Flag', 'State','UR_Flag')
Updated_serviceability = Updated_serviceability.withColumnRenamed('Serviceable_old', 'Serviceable')
print('updated serviceability checking :', Updated_serviceability.groupBy('Serviceable').count().orderBy('count', ascending=False).show())

## save files
Updated_serviceability.write.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Updated_serviceability.orc".format(date_key), mode='overwrite', compression='snappy')
# wr.s3.to_csv(df = Updated_serviceability, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Updated_serviceability_9_backup_3.csv')

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):')
# print(usage)

Inner_Old_New_AMS (after join) count: 5744982
Updated_serviceability after filter for records where Serviceable != Serviceable_old count: 564921
Updated_serviceability after dedupe on combined_Key, keep first count: 564173
+--------------------+------+
|         Serviceable| count|
+--------------------+------+
|             TM|FTTH|233866|
|             TM|VDSL|203343|
|          Maxis|FTTH| 27204|
|           ALLO|FTTH| 26141|
|   TM|FTTH,ALLO|FTTH| 23749|
|    TM|FTTH,CTS|FTTH| 20946|
|  TM|FTTH,Maxis|FTTH| 16737|
|            CTS|FTTH|  6982|
|  TM|VDSL,Maxis|FTTH|  3733|
|  TM|FTTH,Maxis|VDSL|   575|
|  TM|VDSL,Maxis|VDSL|   434|
|          Maxis|VDSL|   428|
|   TM|VDSL,ALLO|FTTH|    18|
|TM|FTTH,Maxis|FTT...|     8|
|TM|VDSL,Maxis|FTT...|     6|
|TM|VDSL,Maxis|FTT...|     1|
|TM|VDSL,Maxis|VDS...|     1|
|    TM|VDSL,CTS|FTTH|     1|
+--------------------+------+

updated serviceability checking : None
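
The filter `Serviceable != Serviceable_old` is a plain string comparison, so two values that list the same operators in a different order or letter case count as an update. Examples from this notebook's output are `Maxis|FTTH,TM|FTTH` versus `TM|FTTH,Maxis|FTTH`, and `maxis|VDSL` versus `Maxis|VDSL`. Whether that is intended is not stated in the notebook. If it is not, one option is to compare a normalised form. The sketch below shows the idea in plain Python, with `normalise_serviceable` as a hypothetical helper.

```python
def normalise_serviceable(value):
    """Upper-case, drop empty entries and sort the comma-separated operator list."""
    if value is None:
        return None
    parts = [p.strip().upper() for p in value.split(',')]
    return ','.join(sorted(p for p in parts if p))

print(normalise_serviceable('Maxis|FTTH,TM|FTTH'))   # MAXIS|FTTH,TM|FTTH
print(normalise_serviceable('TM|FTTH,Maxis|FTTH'))   # MAXIS|FTTH,TM|FTTH
print(normalise_serviceable(',TM|FTTH'))             # TM|FTTH
```

With such a normalisation applied to both columns before the comparison, only genuine changes in the set of operators would be flagged.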


In [29]:
## ============================================================ THIS IS THE START OF Pipeline 9 - Backup 4 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_9_backup_4
# according to Fakhrul, the order for pipeline 9 is final_9_backup -> final_9_backup_2 -> final_9_backup_3 -> final_9_backup_4 -> final_9_backup_5 -> final_9_backup_6 -> final_9
# args = getResolvedOptions(sys.argv,
                        #   ['old_uams_temp_read_path', = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7.csv
                        #   'new_fullfeed_with_key_read_path', = s3://astro-groupdata-prod-source/new_fullfeed_uams_with_key/NEW_FULLFEED_UAMS_WITH_KEY.csv
                        #   'new_address_temp_read_path']) = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_temp_8.csv
## ================================================================================================================================

print('unique serviceability of new : ', New_AMS.select('Serviceable').distinct().show())
print('unique serviceability of old : ', old_uams.select('Serviceable').distinct().show())

#revision - 22/8/22 - only for this run (should I replicate if it was only for the 22/8/22 run?)
New_AMS = New_AMS.withColumnRenamed('Address_ID_new','Address_ID')
## do a left anti-join (we want to remove from New_AMS every Address_ID that is already in Old_AMS)
New_address = New_AMS.join(Old_AMS, on='Address_ID', how='leftanti')
print('this is new address shape', New_address.select('Address_ID').count()) # 3492527

# ###  Compare New AMS and Old AMS
# revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
New_address = New_address.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
New_address = New_address.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

Old_AMS = Old_AMS.withColumn('combined_Key', f.concat_ws(",", f.col('Account_No').cast('string'), f.col('Address_ID').cast('string')) )
New_AMS = New_AMS.withColumn('combined_Key', f.concat_ws(",", f.col('Account_No').cast('string'), f.col('Address_ID').cast('string')) )

New_address = New_address.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0', '') )
New_address = New_address.withColumn("Address_ID", f.regexp_replace(f.col('Address_ID').cast('string'), r'\.0', '') )

New_address = New_address.select([ 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
       'Postcode','State', 'Address_Type', 'Serviceable', 'P_Flag', 'UR_Flag'])
# New_address = New_address.withColumnRenamed('Address_ID_old','Address_ID')
print('new address serviceability checking :')
New_address.groupBy('Serviceable').count().orderBy('count', ascending=False).show()

## save files
New_address.write.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_New_address.orc".format(date_key), mode='overwrite', compression='snappy')
# wr.s3.to_csv(df = New_address, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/New_address_9_backup_4.csv')

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):')
# print(usage)

+--------------------+
|         Serviceable|
+--------------------+
|TM|VDSL,ALLO|FTTH...|
|            ,TM|FTTH|
|    CTS|FTTH,TM|FTTH|
|            TM|VDSL,|
|maxis|FTTH,ALLO|FTTH|
|   ALLO|FTTH,TM|FTTH|
|             TM|VDSL|
|maxis|FTTH,ALLO|F...|
|TM|VDSL,maxis|FTT...|
|          maxis|VDSL|
|  maxis|VDSL,TM|FTTH|
|TM|VDSL,maxis|FTT...|
|TM|LOT 411 JALAN ...|
|           ALLO|FTTH|
|   TM|VDSL,ALLO|FTTH|
|    TM|VDSL,CTS|FTTH|
|             TM|FTTH|
|  Maxis|FTTH,TM|FTTH|
|          Maxis|FTTH|
|  TM|VDSL,maxis|FTTH|
+--------------------+
only showing top 20 rows

unique serviceability of new :  None
+-------------+
|  Serviceable|
+-------------+
|"   ""3-15-2"|
|"     ""3-04"|
|"     ""5-1A"|
|"  ""A-04-04"|
|" ""A1-02-02"|
|       KLUANG|
|"   ""26-4-5"|
|"  ""5-09-30"|
|"  ""7540-23"|
|" ""60-05-02"|
|"    ""2-1-9"|
|"    ""1-322"|
|"   ""40-2-3"|
|"     ""17-7"|
|"   ""7-1-27"|
|"   ""7545-4"|
|"     ""1-39"|
|"     ""3-20"|
|"    ""17-07"|
|"    ""06-07"|
+-------------+
o
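
The `Serviceable` values printed for `old_uams` (`"   ""3-15-2"`, `KLUANG`, and so on) look like house numbers and city names, which suggests that some rows of the old full-feed CSV are being split in the wrong place. One plausible cause, not confirmed here, is a quoted field containing doubled quotes (`""`), the convention Python's `csv` module uses for a literal quote inside a quoted field. The plain-Python sketch below only illustrates that convention and how a naive split shifts columns; the sample row is invented.

```python
import csv
import io

# One invented row: the house number itself contains quotes and a comma.
row = ['1001', '"3-15-2", BLOK A', 'KLUANG', 'TM|FTTH']

buffer = io.StringIO()
csv.writer(buffer).writerow(row)
line = buffer.getvalue().strip()
print(line)                      # 1001,"""3-15-2"", BLOK A",KLUANG,TM|FTTH

# A parser that understands doubled quotes recovers the four fields.
parsed = next(csv.reader(io.StringIO(line)))
print(parsed == row)             # True

# Splitting on every comma does not, so later columns shift to the right.
print(len(line.split(',')))      # 5
```

In Spark, the CSV reader's `quote` and `escape` options control this behaviour. Its default escape character is a backslash, so passing `escape='"'` to `spark.read.csv` is worth testing against this file before relying on the `old_uams` columns.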

In [30]:
## ============================================================ THIS IS THE START OF Pipeline 9 - Backup 5 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_9_backup_5
# according to Fakhrul, the order for pipeline 9 is final_9_backup -> final_9_backup_2 -> final_9_backup_3 -> final_9_backup_4 -> final_9_backup_5 -> final_9_backup_6 -> final_9
# args = getResolvedOptions(sys.argv,
                        #   ['old_uams_temp_read_path', = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7.csv
                        #   'new_fullfeed_with_key_read_path', = s3://astro-groupdata-prod-source/new_fullfeed_uams_with_key/NEW_FULLFEED_UAMS_WITH_KEY.csv
                        #   'new_address_temp_read_path']) = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_temp_8.csv
## ================================================================================================================================


## read in old_uams
# old_uams = spark.read.csv(UAMS_PySpark_save_path+"uploaded/Old_Fullfeed_UAMS-20220818(1).csv.gz", header=True) # from Qubole Zep (but it's not the same file as the old_uams read in earlier...)
old_uams = spark.read.csv(old_fullfeed_path, header=True)

print(old_uams.columns)
old_uams = old_uams.select(['Address_ID','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag'])
# old_uams = wr.s3.read_csv(path = 's3://astro-groupdata-prod-source/old_fullfeed_uams_with_key/Old_FUllfeed_UAMS_18_Aug (1).csv', usecols = ['Address_ID','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag'], dtype = {'Postcode': object})
#old_uams = wr.s3.read_csv(path = old_uams_temp_read_path, usecols = ['Key','Address_ID','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag','duplicate_key'], dtype = {'Postcode': object})

#revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
## clean postcode column (make into string, remove float \.0, select only first 5 digits, lpad with '0')
old_uams = old_uams.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
old_uams = old_uams.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

## read in New_AMS
# New_AMS = spark.read.orc(UAMS_PySpark_save_path+"NEW_FULLFEED_UAMS_WITH_KEY_{}.orc".format(date_key))
New_AMS = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/NEW_FULLFEED_UAMS_WITH_KEY_{}.orc".format(date_key, date_key))
New_AMS = New_AMS.select('Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name','Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City','Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag')
# New_AMS = wr.s3.read_csv(path = new_fullfeed_with_key_read_path, usecols = ['Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name','Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City','Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag'], dtype = {'Postcode':object})

#revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
## clean postcode column (make into string, remove float \.0, select only first 5 digits, lpad with '0')
New_AMS = New_AMS.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0', '') )
New_AMS = New_AMS.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

#revision - fakhrul - 27/6/22 removed address_id_x and duplicate_key columns here as prev we removed it. we shall see if it works
#New_address = wr.s3.read_csv(path = new_address_temp_read_path, usecols = ['index','Key','Account_No','OBJID','House_No','Building_Name','Standard_Building_Name','Street_Type','Street_Name','Area','City','Postcode','Address_Type','Serviceable','P_Flag','State','UR_Flag','Address_ID_y'], dtype = {'Postcode':object})
#print('shape of new address is: ', New_address.shape)

# ###  Compare New AMS and Old AMS
#revision - fakhrul - 20/6/22 - otherwise postcode of 08000 will be read as 8000
#New_address['Postcode']= New_address['Postcode'].astype(str).apply(lambda x:x[0:5])
#New_address['Postcode'] = New_address['Postcode'].str.pad(width=5, side='left', fillchar='0')  

print(old_uams.head(5))
#print(New_address.head())

### ---- Creating combined key
Old_AMS = old_uams

## first do some cleaning in both Old_AMS & New_AMS
Old_AMS = Old_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), 'nan', '') )
Old_AMS = Old_AMS.fillna('', subset='Account_No')
Old_AMS = Old_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0', '') )
Old_AMS = Old_AMS.withColumn("Address_ID", f.regexp_replace(f.col('Address_ID').cast('string'), r'\.0', '') )

New_AMS = New_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), 'nan', '') )
New_AMS = New_AMS.fillna('', subset='Account_No')
New_AMS = New_AMS.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0', '') )
New_AMS = New_AMS.withColumn("Address_ID", f.regexp_replace(f.col('Address_ID').cast('string'), r'\.0', '') )

Old_AMS = Old_AMS.withColumn('combined_Key', f.concat_ws(",", f.col('Account_No').cast('string'), f.col('Address_ID').cast('string')) )
New_AMS = New_AMS.withColumn('combined_Key', f.concat_ws(",", f.col('Account_No').cast('string'), f.col('Address_ID').cast('string')) )

## Revision 18 Aug 2022 --> when inner joining Old and New AMS on the combined Address_ID+Account_No --> if the UR_Flag value is different on the result of merging, it means UR_Flag is updated for these records
# rename columns for smoother join
Old_AMS = Old_AMS.withColumnRenamed('UR_Flag', 'UR_Flag_old')
Inner_Old_New_AMS = Old_AMS.select(['combined_Key','UR_Flag_old']).join(New_AMS, on ='combined_Key', how = 'inner')
print('Inner_Old_New_AMS (after join):', Inner_Old_New_AMS.select('UR_Flag').count()) # 3444700

Updated_Location = Inner_Old_New_AMS.filter(f.col('UR_Flag_old')!= f.col('UR_Flag'))
print('Updated_Location after filtering UR_Flag_old != UR_Flag:', Updated_Location.select('UR_Flag').count()) # 627367

## Revision 18 Aug 2022
# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Updated_Location = Updated_Location.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())) )
## de-dupe on combined_Key & keep FIRST based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['combined_Key']).orderBy(f.col("index").asc()) ## ascending because we want to keep 'first'
Updated_Location = Updated_Location.withColumn('row', f.row_number().over(window)).filter(col('row') == 1).drop('row')
print('Updated_Location after dedupe on combined_Key, keep first count:', Updated_Location.select("Serviceable").count())  # 627367

## Revision 18 Aug 2022
## select columns (for the UR_Flag, use the ones from New AMS)
Updated_Location = Updated_Location.select([ 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
       'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag'])

print('Updated_Location checking :')
Updated_Location.groupBy('Serviceable').count().orderBy('count', ascending=False).show()  # show() prints the table and returns None, so call it on its own line

## save files
Updated_Location.write.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Updated_location.orc".format(date_key), mode='overwrite', compression='snappy')
# wr.s3.to_csv(df = Updated_Location, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Updated_location_9_5.csv')

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):')

['_c0', 'Key', 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag']
[Row(Address_ID='2556933.0', Account_No='80000006.0', OBJID=None, House_No='        15', Building_Name=None, Standard_Building_Name='NO', Street_Type='LORONG', Street_Name='BAGAN JERMAL', Area=None, City='GEORGE TOWN', Postcode='10250', Address_Type='SDU', Serviceable='TM|FTTH', P_Flag='P1', State='PULAU PINANG', UR_Flag='URBAN'), Row(Address_ID='1043943.0', Account_No='80000008.0', OBJID=None, House_No='        46', Building_Name=None, Standard_Building_Name='NO', Street_Type='JALAN', Street_Name='  SS 14/5B', Area='SS 14', City='SUBANG JAYA', Postcode='47500', Address_Type='SDU', Serviceable='TM|FTTH', P_Flag='P1', State='SELANGOR', UR_Flag='URBAN'), Row(Address_ID='1232373.0', Account_No='80000011.0', OBJID=None, House_No='        63', Building_Name=None, Stand
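
The combined-key comparison in the cell above can be illustrated on plain Python dicts. This is only a sketch under assumptions: the helper names `clean_id`, `combined_key`, and `changed_flags` are hypothetical, the sample values are made up, and the notebook itself runs the equivalent logic on Spark DataFrames. Two simplifications are labeled in the comments: `None` is turned into an empty string before joining the key, and only the first old row per key is kept.

```python
import re


def clean_id(value):
    """Mimic the notebook's cleaning: None or 'nan' becomes '', a trailing '.0' is dropped."""
    text = "" if value is None else str(value)
    text = text.replace("nan", "")
    return re.sub(r"\.0$", "", text)


def combined_key(row):
    """Join Account_No and Address_ID with a comma, roughly like concat_ws(',').

    Simplification: None is replaced by '' here, whereas concat_ws skips nulls.
    """
    return ",".join([clean_id(row.get("Account_No")), clean_id(row.get("Address_ID"))])


def changed_flags(old_rows, new_rows, flag):
    """Inner-join on the combined key and return new rows whose flag differs.

    Rows where either flag is None are skipped, which matches how a Spark
    `!=` filter drops comparisons involving null.
    Simplification: only the first old row per key is used.
    """
    old_by_key = {}
    for row in old_rows:
        old_by_key.setdefault(combined_key(row), row)

    changed = []
    for row in new_rows:
        old = old_by_key.get(combined_key(row))
        if old is None:
            continue  # no match in the old data: not part of the inner join
        if old[flag] is None or row[flag] is None:
            continue  # null on either side never counts as "different"
        if old[flag] != row[flag]:
            changed.append(row)
    return changed
```

Calling `changed_flags(old_rows, new_rows, "UR_Flag")` returns the new-side rows, which is why the later column selection can take `UR_Flag` straight from New AMS.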

In [31]:
## ============================================================ THIS IS THE START OF Pipeline 9 - Backup 6 ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_9_backup_6
# according to Fakhrul, the order for pipeline 9 is final_9_backup -> final_9_backup_2 -> final_9_backup_3 -> final_9_backup_4 -> final_9_backup_5 -> final_9_backup_6 -> final_9
# args = getResolvedOptions(sys.argv,
                        #   ['old_uams_temp_read_path', = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/old_uams_temp_7.csv
                        #   'new_fullfeed_with_key_read_path', = s3://astro-groupdata-prod-source/new_fullfeed_uams_with_key/NEW_FULLFEED_UAMS_WITH_KEY.csv
                        #   'new_address_temp_read_path']) = s3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/new_address_temp_8.csv
## ================================================================================================================================

## Revision 18 Aug 2022 --> when inner joining Old and New AMS on the combined Address_ID+Account_No --> if the P_Flag value is different on the result of merging, it means P_Flag is updated for these records
Old_AMS_notnull = Old_AMS.filter(f.col('P_Flag').isNotNull())
print('checking old ams notnull count', Old_AMS_notnull.select("P_Flag").count()) # counts the filtered frame so the number matches the label; earlier recorded value: 9435243
# rename columns for smoother join
Old_AMS = Old_AMS.withColumnRenamed("P_Flag", "P_Flag_old")
Inner_Old_New_AMS = Old_AMS.select('combined_Key','P_Flag_old').join(New_AMS, on ='combined_Key', how = 'inner')
print('Inner_Old_New_AMS (after join):', Inner_Old_New_AMS.select('P_Flag').count()) # 3444700
Updated_P_Flag = Inner_Old_New_AMS.filter(f.col('P_Flag_old') != f.col('P_Flag'))
print('Updated_P_Flag after filtering P_Flag_old != P_Flag:', Updated_P_Flag.select('P_Flag').count()) # 95880

## Revision 18 Aug 2022
# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Updated_P_Flag = Updated_P_Flag.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())) )
## de-dupe on combined_Key & keep FIRST based on index. To do in Spark: https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first
window = Window.partitionBy(['combined_Key']).orderBy(f.col("index").asc()) ## ascending because we want to keep 'first'
Updated_P_Flag = Updated_P_Flag.withColumn('row', f.row_number().over(window)).filter(col('row') == 1).drop('row')
print('Updated_P_Flag after dedupe on combined_Key, keep first count:', Updated_P_Flag.select("Serviceable").count())  # 95880

## Revision 18 Aug 2022
## select columns (for the P_Flag, use the ones from New AMS)
Updated_P_Flag = Updated_P_Flag.select([ 'Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
       'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State','UR_Flag'])

print('updated p flag count is : ', Updated_P_Flag.select('Serviceable').count()) # 95880

## save files
Updated_P_Flag.write.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Updated_P_Flag.orc".format(date_key), mode='overwrite', compression='snappy')
# wr.s3.to_csv(df = Updated_P_Flag, path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Updated_P_Flag_9_6.csv')

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):') 

checking old ams notnull count 9361290
Inner_Old_New_AMS (after join): 5744982
Updated_P_Flag after filtering P_Flag_old != P_Flag: 17508
Updated_P_Flag after dedupe on combined_Key, keep first count: 17508
updated p flag count is :  17508
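
The index-then-dedupe step in the two cells above reproduces the pandas idiom of `reset_index` followed by `drop_duplicates(keep='first')`. The plain-Python sketch below shows the intended result on a list of dicts; `keep_first` is a hypothetical helper, not something defined in the notebook. Two points about the Spark version are worth keeping in mind: `Window.orderBy(...)` without `partitionBy` moves all rows into a single partition, and `monotonically_increasing_id()` guarantees increasing but not consecutive values, so which row counts as "first" follows Spark's row order after the join and may not be stable across runs.

```python
def keep_first(rows, key):
    """Assign a 1-based sequential index and keep the first row seen per key."""
    seen = set()
    kept = []
    for index, row in enumerate(rows, start=1):  # index plays the role of row_number()
        value = row[key]
        if value in seen:
            continue  # a later duplicate of a key that was already kept
        seen.add(value)
        kept.append({**row, "index": index})
    return kept
```

If a deterministic choice matters, ordering the window by a real business column instead of the generated index would make the kept row reproducible; that is a design option, not what the notebook currently does.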


### Pipeline 9 - FINAL

In [3]:
## ============================================================ THIS IS THE START OF Pipeline 9 - FINAL ============================================================
# taken from Glue Job: address_standardization-prod-uams_generation_final_9
# according to Fakhrul, the order for pipeline 9 is final_9_backup -> final_9_backup_2 -> final_9_backup_3 -> final_9_backup_4 -> final_9_backup_5 -> final_9_backup_6 -> final_9
# args = getResolvedOptions(sys.argv,
                        #   'uams_bucket_path', = s3://amsdatabucket/
                        #   'date', = 22_August_2022
                        #   'shamani_data_read_path', = s3://amsdatabucket/Sales Team Data/full_dump-2022-07-21.csv
                        #   'target_bucket_path']) = s3://astro-groupdata-prod-target/address_standardization/
## ================================================================================================================================

### Read all the files created in Pipeline 9 back in
Updated_serviceability = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Updated_serviceability.orc".format(date_key))
# Updated_serviceability = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Updated_serviceability_9_backup_3.csv')
print('updated servc count', Updated_serviceability.select('Serviceable').count()) # updated servc count 299059

Updated_accounts = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Updated_accounts.orc".format(date_key))
# Updated_accounts = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Updated_accounts_9_backup.csv')
print('updated accs count', Updated_accounts.select('Serviceable').count()) # updated accs count 644557

New_address = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_New_address.orc".format(date_key))
# New_address = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/New_address_9_backup_4.csv')
print('new address shape', New_address.select('Serviceable').count()) # new address shape 3492527

Removed_accounts = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Removed_accounts.orc".format(date_key))
# Removed_accounts = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Removed_accounts_9_backup_2.csv')
print('removed accs shape', Removed_accounts.select('Serviceable').count()) # removed accs shape 1486736

Updated_P_Flag = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Updated_P_Flag.orc".format(date_key))
# Updated_P_Flag = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Updated_P_Flag_9_6.csv')
print('updated p flag shape', Updated_P_Flag.select('Serviceable').count()) # updated p flag shape 95880
print('updated p flag columns', Updated_P_Flag.columns)
print('updated p flag unique')
Updated_P_Flag.select('P_Flag').distinct().show()  # show() prints the table and returns None

Updated_Location = spark.read.orc(UAMS_PySpark_save_path+"phase_6/{}/pipeline9_Updated_location.orc".format(date_key))
# Updated_Location = wr.s3.read_csv(path = 's3://astro-groupdata-prod-pipeline/address_standardization/uams_temp_final/Updated_location_9_5.csv')
print('updated location shape', Updated_Location.select('Serviceable').count()) # 627367

## rearrange the columns for all DFs to ensure Union step is clean
Updated_P_Flag = Updated_P_Flag.select('Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag')
Updated_Location = Updated_Location.select('Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag')
Updated_serviceability = Updated_serviceability.select('Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag')
Updated_accounts = Updated_accounts.select('Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag')
New_address = New_address.select('Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag')
    
## concat all the above DFs except removed_accounts
Delta = Updated_P_Flag.union(Updated_Location).union(Updated_serviceability).union(Updated_accounts).union(New_address)
# old code: Frame = [Updated_P_Flag, Updated_Location, Updated_serviceability, Updated_accounts, New_address]
# old code: Delta = pd.concat(Frame)
Delta = Delta.drop_duplicates()
print('this is checking updated serviceability, updated accounts and new address: ', Updated_serviceability.count(), Updated_accounts.count(), New_address.count()) # 299059 644557 3492527

Delta = Delta.fillna("")
Removed_accounts = Removed_accounts.fillna("")

Delta = Delta.select(['Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
'Postcode', 'State','Address_Type', 'Serviceable', 'P_Flag', 'UR_Flag'])
Removed_accounts = Removed_accounts.select(['Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City',
'Postcode', 'State','Address_Type', 'Serviceable', 'P_Flag', 'UR_Flag'])

Delta.select(f.length(f.col('Account_No').cast('string'))).distinct().show()  # show() returns None, so there is nothing to print()
Delta.select(f.length(f.col('Postcode').cast('string'))).distinct().show()

Delta = Delta.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0$', '') )  # raw string; strip only a trailing '.0'
Delta = Delta.withColumn("OBJID", f.regexp_replace(f.col('OBJID').cast('string'), r'\.0$', '') )
Delta = Delta.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0$', '') )
Delta = Delta.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

Removed_accounts = Removed_accounts.withColumn("Account_No", f.regexp_replace(f.col('Account_No').cast('string'), r'\.0$', '') )  # raw string; strip only a trailing '.0'
Removed_accounts = Removed_accounts.withColumn("OBJID", f.regexp_replace(f.col('OBJID').cast('string'), r'\.0$', '') )
Removed_accounts = Removed_accounts.withColumn("Postcode", f.regexp_replace(f.col('Postcode').cast('string'), r'\.0$', '') )
Removed_accounts = Removed_accounts.withColumn("Postcode", f.lpad(f.substring(f.col('Postcode'), 1, 5), 5, '0') )

Delta = Delta.withColumn('Address_Type', when(f.col('Building_Name')=='', 'SDU').otherwise('MDU') )
# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Delta = Delta.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())) )

## extra cleaning after finding some weird values in Serviceable column
Delta = Delta.withColumn('Serviceable', f.upper(f.col("Serviceable").cast('string')) )
Delta = Delta.withColumn( 'Serviceable', when((f.col('Serviceable').contains('|FTTH') | f.col('Serviceable').contains('|VDSL')), f.col('Serviceable')).otherwise(f.lit('TM|FTTH')) )  ## fix cases where there are no FTTH or VDSL in the name - they are all TM & I checked they are FTTH. In the future, TM is only laying FTTH, so this line of code can continue to be used

## add astrofibre FTTH into Serviceable column whenever it's TM|FTTH 
Delta = Delta.withColumn('Serviceable', when( f.col("Serviceable").contains('TM|FTTH'), f.concat(f.lit('AstroFibre|FTTH,'), f.col('Serviceable')) ).otherwise(f.col('Serviceable')) )
# Delta = Delta.withColumn('Serviceable', when( f.col("Serviceable").contains('TM|FTTH'), f.lit('AstroFibre|FTTH,'+ f.col('Serviceable')) ).otherwise(f.col('Serviceable')) )
# ORI_CODE --> add_astrofiber_delta= Delta[Delta['Serviceable'].str.contains('[Tt][Mm]\|FTTH')].index  # add_astrofiber_delta = list(add_astrofiber_delta)  # Delta.loc[add_astrofiber_delta,'Serviceable'] = 'AstroFibre|FTTH,' + Delta['Serviceable'].astype(str).str.upper()

## ## extra cleaning after finding some weird values in Serviceable column
Delta = Delta.withColumn('Serviceable', f.regexp_replace(f.col('Serviceable'), ',,', ',')) ## fix cases with double commas
Delta = Delta.withColumn( 'Serviceable', when( f.col('Serviceable').endswith(','), f.expr("substring(Serviceable, 1, length(Serviceable) - 1)") ).otherwise(f.col('Serviceable')) )  ## fix cases where the value ends in a comma ## a bit slow coz it uses f.expr

print('checking delta numbers here : ', Delta.select('Account_No').count())
print('checking account no of delta :', Delta.select('Account_No').head(10))
# print('checking number of null account number in delta: ', Delta.filter(f.col('Account_No').isNull()).count()) # 0
print('checking number of null account number in delta: ', Delta.select('Account_No').filter(f.col('Account_No')=='').count()) # 3056868

# create a sequential index as Zohreh did a pandas reset_index at this step. To do it in Spark: https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe
Removed_accounts = Removed_accounts.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())) )

## extra cleaning after finding some weird values in Serviceable column
Removed_accounts = Removed_accounts.withColumn('Serviceable', f.upper(f.col("Serviceable").cast('string')) )
Removed_accounts = Removed_accounts.withColumn( 'Serviceable', when((f.col('Serviceable').contains('|FTTH') | f.col('Serviceable').contains('|VDSL')), f.col('Serviceable')).otherwise(f.lit('TM|FTTH')) )  ## fix cases where there are no FTTH or VDSL in the name - they are all TM & I checked they are FTTH. In the future, TM is only laying FTTH, so this line of code can continue to be used

## add astrofibre FTTH into Serviceable column whenever it's TM|FTTH 
Removed_accounts = Removed_accounts.withColumn('Serviceable', when( f.col("Serviceable").contains('TM|FTTH'), f.concat(f.lit('AstroFibre|FTTH,'), f.col('Serviceable')) ).otherwise(f.col('Serviceable')) )
# Removed_accounts = Removed_accounts.withColumn('Serviceable', when( f.col("Serviceable").contains('TM|FTTH'), f.lit('AstroFibre|FTTH'+ f.col('Serviceable')) ).otherwise(f.col('Serviceable')) )
# ORI_CODE --> add_astrofiber_remove= Removed_accounts[Removed_accounts['Serviceable'].str.contains('TM\|FTTH')].index
# add_astrofiber_remove = list(add_astrofiber_remove)
# Removed_accounts.loc[add_astrofiber_remove,'Serviceable'] = 'AstroFibre|FTTH,' + Removed_accounts['Serviceable'].astype(str)

## extra cleaning after finding some weird values in Serviceable column
Removed_accounts = Removed_accounts.withColumn('Serviceable', f.regexp_replace(f.col('Serviceable'), ',,', ',')) ## fix cases with double commas
Removed_accounts = Removed_accounts.withColumn( 'Serviceable', when( f.col('Serviceable').endswith(','), f.expr("substring(Serviceable, 1, length(Serviceable) - 1)") ).otherwise(f.col('Serviceable')) )  ## fix cases where the value ends in a comma ## a bit slow coz it uses f.expr

print('checking removed_accounts count here: ', Removed_accounts.select('Account_No').count()) # 1486736

#Delta.to_csv('DELTA_ADD_UPDATE_UAMS_16Mar2022.csv') ## Save in UAMS bucket
#Removed_accounts.to_csv('DELTA_DELETE_UAMS_16Mar2022.csv')

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Bytes):')
# print(usage)

print('this is delta serviceable unique values : ')
Delta.groupBy('Serviceable').count().orderBy('count', ascending=False).show(30, False)  # show() prints the table and returns None
print('this is Removed_accounts serviceable unique values : ')
Removed_accounts.groupBy('Serviceable').count().orderBy('count', ascending=False).show(30, False)

#Delta.to_csv('DELTA_ADD_UPDATE_UAMS_16Mar2022.csv') ## Save in UAMS bucket
#wr.s3.to_csv(df = Delta, path = uams_bucket_path + 'DELTA_ADD_UPDATE_UAMS_' + str(date) + '.csv')

#Removed_accounts.to_csv('DELTA_DELETE_UAMS_16Mar2022.csv')
#wr.s3.to_csv(df = Removed_accounts, path = uams_bucket_path + 'DELTA_DELETE_UAMS_' + str(date) + '.csv') 

updated servc count 564173
updated accs count 2586838
new address shape 155399
removed accs shape 1355140
updated p flag shape 17508
updated p flag columns ['Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'Address_Type', 'Serviceable', 'P_Flag', 'State', 'UR_Flag']
+--------+
|  P_Flag|
+--------+
|CTS_file|
|        |
|      P1|
|      P2|
+--------+

updated p flag unique None
updated location shape 1059205
this is checking updated serviceability, updated accounts and new address:  564173 2586838 155399
+----------------------------------+
|length(CAST(Account_No AS STRING))|
+----------------------------------+
|                                 0|
|                                 8|
|                                 5|
+----------------------------------+

None
+--------------------------------+
|length(CAST(Postcode AS STRING))|
+--------------------------------+
|                
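
The `Serviceable` and `Postcode` cleaning rules applied to `Delta` and `Removed_accounts` in the cell above are easier to check on single values. The sketch below restates them as plain Python functions; `normalise_serviceable` and `normalise_postcode` are hypothetical names, and the functions mirror the order of the Spark column expressions rather than replace them.

```python
def normalise_serviceable(value):
    """Apply the notebook's Serviceable rules to one value, in the same order."""
    text = "" if value is None else str(value).upper()
    # Values with neither |FTTH nor |VDSL are treated as TM fibre.
    if "|FTTH" not in text and "|VDSL" not in text:
        text = "TM|FTTH"
    # Any value that includes TM|FTTH also gets the AstroFibre entry in front.
    if "TM|FTTH" in text:
        text = "AstroFibre|FTTH," + text
    # Collapse doubled commas, then drop a single trailing comma.
    text = text.replace(",,", ",")
    if text.endswith(","):
        text = text[:-1]
    return text


def normalise_postcode(value):
    """Drop '.0', keep the first five characters, left-pad with zeros."""
    text = "" if value is None else str(value)
    text = text.replace(".0", "")
    return text[:5].rjust(5, "0")
```

Because the upper-casing runs before the prefix is added, a value that already starts with `AstroFibre|FTTH,TM|FTTH` would receive a second prefix if these rules were applied twice, so under this reading the step is not idempotent.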

In [4]:
## Revision 17 May 2022
print('checking removed accounts info here----------------',Removed_accounts.select('Account_No').count(), Removed_accounts.columns) # 1486736
print('testing reading shamani data here')
#Shamani_data = pd.read_csv('prodDump.csv')
# Shamani = wr.s3.read_csv('s3://astro-datalake-prod-sandbox/amzar/BB/other_adhoc/uploaded/address_checking/Shamani_data/dump_20220915.csv', dtype=str) # Qubole read path
# Shamani = wr.s3.read_csv(path = shamani_data_read_path)

# # read in shamani data # added on 24/11/2022

# shamani_data = glueContext.create_dynamic_frame_from_options(

#     connection_type = 's3',
#     connection_options = {'paths' : [shamani_data_read_path]},
#     format = 'csv',
#     format_options = {'withHeader':True}

# ) # read in dynamic 

# Shamani = shamani_data.toDF() # convert DynamicFrame to Spark DF first
Shamani = spark.read.csv(shamani_data_read_path, header=True) # read in the data as pyspark DF
Shamani = Shamani.toPandas() # convert Spark DF to Pandas DF to allow for pandas code

#revision - zohreh - 2/8/22 - splitting objid-----------------------------------------------------------------------------------------------------------------------------------

s1 = Shamani.acc_nos.str.split(',', expand=True).stack().str.strip()
s2 = Shamani.obj_ids.str.split(',', expand=True).stack().str.strip()
Shamani_V1 = pd.concat([s1,s2], axis=1, keys=['acc_nos','obj_ids'])
print('Shamani_V1 shape:', Shamani_V1.shape)
Shamani_V1 = Shamani_V1.reset_index()
Shamani_V1 = Shamani_V1[['acc_nos','obj_ids']]

Shamani_V2= Shamani.set_index(Shamani.columns.drop('acc_nos').tolist()).acc_nos.str.split(',', expand=True).stack().reset_index().rename(columns={0:'acc_nos'}).loc[:, Shamani.columns]  # Index.drop takes labels only; the stray positional 1 was not an axis
Shamani_V2 = Shamani_V2.drop(['obj_ids','acc_nos'], axis=1)

S1 = Shamani_V2['Address_ID']

Shamani_V3 = pd.concat([S1,Shamani_V1], axis=1)

Shamani_Final = pd.merge(Shamani_V3, Shamani_V2, on="Address_ID", how="left")
print('Shamani_Final shape:', Shamani_Final.shape)

Shamani_Final = Shamani_Final[Shamani_Final['Address_ID'].notnull()]

Shamani_Final['Address_ID'] = Shamani_Final['Address_ID'].astype(int)

Shamani_Final = Shamani_Final.drop_duplicates()

Shamani_data = Shamani_Final
#------------------------------------------------------------------------------------------

Shamani_data['acc_nos'] = Shamani_data['acc_nos'].astype(str)
Shamani_data['acc_nos'] = Shamani_data['acc_nos'].str.replace(r'\.0$', '', regex=True)  # explicit regex=True: pandas 2.x treats the pattern as a literal by default

Shamani_data['acc_nos'] = Shamani_data['acc_nos'].apply(lambda x : "" if len(x)!= 8 else x)


Shamani_data['Standard_Building_Name'] = 'NO'
Shamani_data.loc[Shamani_data["Building_Name"].notnull(),"Standard_Building_Name"] = 'YES'


Shamani_data = Shamani_data.fillna("")

Shamani_data = Shamani_data.rename(columns={'acc_nos':'Account_No','state':'State'
                                            ,'obj_ids':'OBJID'})
Shamani_data['OBJID'] = Shamani_data['OBJID'].astype(str)
Shamani_data['OBJID']  = Shamani_data['OBJID'].str.replace(r'\.0$', '', regex=True)  # explicit regex=True: pandas 2.x treats the pattern as a literal by default

Shamani_data['Postcode'] = Shamani_data['Postcode'].map(str).apply(lambda x: x.zfill(5))  # left-pad with zeros to 5 characters; a further str.pad(width=5) would be a no-op

Shamani_data = Shamani_data[['Address_ID', 'Account_No', 'OBJID',
                             'House_No', 'Building_Name',
                             'Standard_Building_Name', 'Street_Type',
                             'Street_Name', 'Area', 'City','Postcode',
                             'State','Address_Type', 'Serviceable',
                             'P_Flag','UR_Flag']]


print('done reading shamani data here')

checking removed accounts info here---------------- 1355140 ['Address_ID', 'Account_No', 'OBJID', 'House_No', 'Building_Name', 'Standard_Building_Name', 'Street_Type', 'Street_Name', 'Area', 'City', 'Postcode', 'State', 'Address_Type', 'Serviceable', 'P_Flag', 'UR_Flag', 'index']
testing reading shamani data here
Shamani_V1 shape: (2264, 2)
Shamani_Final shape: (2280, 16)
done reading shamani data here
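
The `split` / `stack` / `concat` sequence in the cell above pairs each account number with the object ID at the same position in the comma-separated lists. A minimal plain-Python sketch of that pairing follows; `split_pairs` and `valid_account` are hypothetical names, and using `None` for a missing partner is an assumption that stands in for the NaN pandas would produce when the two lists have different lengths.

```python
from itertools import zip_longest


def split_pairs(acc_nos, obj_ids):
    """Pair the i-th account number with the i-th object ID from comma-separated strings."""
    accounts = [part.strip() for part in acc_nos.split(",")] if acc_nos else []
    objects = [part.strip() for part in obj_ids.split(",")] if obj_ids else []
    # zip_longest keeps unmatched entries, filling the missing side with None.
    return list(zip_longest(accounts, objects, fillvalue=None))


def valid_account(acc_no):
    """Keep only 8-character account numbers, as the notebook does; anything else becomes ''."""
    text = "" if acc_no is None else str(acc_no)
    return text if len(text) == 8 else ""
```

For example, `split_pairs("80000006, 80000008", "111,222")` yields one pair per account, which corresponds to one output row per account in `Shamani_V1`.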


# Phase 7
- The cell below is probably Phase 7, although the original local notebook from Zohreh suggests it may not be.
- At the end of the notebook, the newly generated UAMS must replace the old UAMS, so that it serves as the old UAMS in the next cycle.

In [5]:
## 15/11/2022: added line to convert Shamani's data to a PySpark DF
Shamani_data_spark = spark.createDataFrame(Shamani_data)

## Revision 17 May 2022
Removed_accounts = Removed_accounts.drop(*['index']).union(Shamani_data_spark) ## remove the Old_Inactive_GAPI for next cycle

print('this is to see unique serviceable values :')
Removed_accounts.groupBy('Serviceable').count().orderBy('count', ascending=False).show()  # show() prints the table and returns None

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage is (Bytes):')
# print(usage)

#revision - fakhrul - 4/8/22 - fillna because priyanka complained abt na
Removed_accounts = Removed_accounts.fillna('')
#Delta['Account_No'] = Delta['Account_No'].str.replace('\.0','', case = False)
Delta = Delta.fillna('')
Delta = Delta.withColumn('Account_No', when(f.col('Account_No') == 'nan', '').otherwise(f.col('Account_No')) )
Delta = Delta.withColumn('Address_Type', when(f.trim(f.col('Building_Name')) == '', 'SDU').otherwise('MDU') )  # blank building name -> SDU, otherwise MDU, matching the earlier Address_Type rule
Delta = Delta.withColumn('Building_Name', f.trim(f.upper(f.col('Building_Name'))) )

#Delta['Account_No'] = Delta['Account_No'].replace('NAN', '')
#Delta['Account_No'] = Delta['Account_No'].replace('NaN', '')
# Removed_accounts = Removed_accounts.replace(np.nan, '', regex=True)

#Delta['Account_No'] = pd.to_numeric(Delta['Account_No'])
#Delta = Delta.fillna('')

## Saving files

#Delta.to_csv('DELTA_ADD_UPDATE_UAMS_16Mar2022.csv') ## Save in UAMS bucket
# wr.s3.to_csv(df = Delta, path = uams_bucket_path + 'DELTA_ADD_UPDATE_UAMS_' + str(date) + '_URBAN_RURAL.csv', s3_additional_kwargs = {"ACL":"bucket-owner-full-control"})
Delta.write.orc(UAMS_PySpark_save_path+"phase_7/{}/DELTA_ADD_UPDATE_UAMS_URBAN_RURAL.orc".format(date_key), mode='overwrite', compression='snappy')
Delta.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_7/{}/DELTA_ADD_UPDATE_UAMS_URBAN_RURAL.csv".format(date_key), mode='overwrite', header=True)

#Revision - fakhrul 15/6/22 - send delta add update to target for hazim to check against edw data
# wr.s3.to_csv(df = Delta, path = target_bucket_path + 'uams_delta_add_update_full/' + 'DELTA_ADD_UPDATE_UAMS_' + timestr + '.csv')
Delta.write.orc(UAMS_PySpark_save_path+"phase_7/{}/DELTA_ADD_UPDATE_UAMS_{}.orc".format(date_key, date_key), mode='overwrite', compression='snappy')
Delta.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_7/{}/DELTA_ADD_UPDATE_UAMS_{}.csv".format(date_key, date_key), mode='overwrite', header=True)

#Removed_accounts.to_csv('DELTA_DELETE_UAMS_16Mar2022.csv')
# wr.s3.to_csv(df = Removed_accounts, path = uams_bucket_path + 'DELTA_DELETE_UAMS_' + str(date) + '_URBAN_RURAL.csv', s3_additional_kwargs = {"ACL":"bucket-owner-full-control"})
Removed_accounts.write.orc(UAMS_PySpark_save_path+"phase_7/{}/DELTA_DELETE_UAMS_URBAN_RURAL.orc".format(date_key), mode='overwrite', compression='snappy')
Removed_accounts.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_7/{}/DELTA_DELETE_UAMS_URBAN_RURAL.csv".format(date_key), mode='overwrite', header=True)

#Revision - 27/5/22 - fakhrul Send to target bucket as requirement for izham
# Acc_no_df = Removed_accounts[['Account_No']]
# wr.s3.to_csv(df = Acc_no_df, path = target_bucket_path + 'uams_delta_delete/' + 'DELTA_DELETE_UAMS_' + timestr + '.csv')
Acc_no_df = Removed_accounts.select('Account_No')
Acc_no_df.write.orc(UAMS_PySpark_save_path+"phase_7/{}/DELTA_DELETE_UAMS_acc.orc".format(date_key), mode='overwrite', compression='snappy')
Acc_no_df.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_7/{}/DELTA_DELETE_UAMS_acc.csv".format(date_key), mode='overwrite', header=True)

#Revision - 9/6/22 fakhrul - to save full for reading to update c360 pqp2 tm
# wr.s3.to_csv(df = Removed_accounts, path = 's3://astro-groupdata-prod-target/address_standardization/uams_delta_delete_full/' + 'DELTA_DELETE_UAMS_' + timestr + '.csv')
Removed_accounts.write.orc(UAMS_PySpark_save_path+"phase_7/{}/DELTA_DELETE_UAMS.orc".format(date_key), mode='overwrite', compression='snappy')
Removed_accounts.coalesce(1).write.csv(UAMS_PySpark_save_path+"phase_7/{}/DELTA_DELETE_UAMS.csv".format(date_key), mode='overwrite', header=True)

#revision - 27/6/22 - fakhrul - testing to see final output
# wr.s3.to_csv(df = Delta, path = 's3://astro-groupdata-prod-target/address_standardization/delta_add_update_uams_test.csv')

# usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
# print('[debug] memory usage after the merging isssss (Bytes):')
# print(usage)

+--------------------+-------+
|         Serviceable|  count|
+--------------------+-------+
|AstroFibre|FTTH,T...|1272898|
|             TM|VDSL|  24449|
|AstroFibre|FTTH,T...|  24233|
|AstroFibre|FTTH,T...|  10887|
|AstroFibre|FTTH,T...|  10109|
|          MAXIS|FTTH|   6491|
|            CTS|FTTH|   3452|
|     AstroFibre|FTTH|   1835|
|  TM|VDSL,MAXIS|FTTH|   1674|
|           ALLO|FTTH|   1016|
|                    |    155|
|AstroFibre|FTTH,T...|     70|
|   TM|VDSL,ALLO|FTTH|     56|
|  TM|VDSL,MAXIS|VDSL|     39|
|AstroFibre|FTTH,T...|     15|
|          MAXIS|VDSL|      9|
|          Maxis|FTTH|      7|
|TM|VDSL,MAXIS|FTT...|      3|
|AstroFibre|FTTH,M...|      2|
|    TM|VDSL,CTS|FTTH|      2|
+--------------------+-------+
only showing top 20 rows

this is to see unique serviceable values : None
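
The `union` calls in this notebook match columns by position, which is why every frame is re-selected into the same column order first. Spark also provides `DataFrame.unionByName`, which matches columns by name instead. The plain-Python sketch below shows the difference on lists of tuples; `union_by_position` and `union_by_name` are hypothetical helpers written for illustration and are not Spark APIs.

```python
def union_by_position(left_rows, right_rows):
    """Append rows as they are: the caller must guarantee identical column order."""
    return left_rows + right_rows


def union_by_name(left_cols, left_rows, right_cols, right_rows):
    """Reorder the right-hand rows to the left-hand column order before appending."""
    if set(left_cols) != set(right_cols):
        raise ValueError("both sides must have the same set of columns")
    order = [right_cols.index(name) for name in left_cols]
    reordered = [tuple(row[i] for i in order) for row in right_rows]
    return left_rows + reordered
```

With a positional union, a frame whose `State` and `Postcode` columns are swapped would be appended without error and with values in the wrong columns; matching by name avoids that class of mistake.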
