In [5]:
%pip install --upgrade ipykernel matplotlib numpy openpyxl pandas pip plotly_express polars PyCap python-dotenv pyspark[pandas_on_spark] seaborn setuptools tabulate
# %pip install pandas==1.5.3
# %pip install distutils
%pip list


Collecting setuptools
  Using cached setuptools-69.2.0-py3-none-any.whl.metadata (6.3 kB)
Using cached setuptools-69.2.0-py3-none-any.whl (821 kB)
Installing collected packages: setuptools
Successfully installed setuptools-69.2.0
Package            VersionNote: you may need to restart the kernel to use updated packages.

------------------ -----------
asttokens          2.4.1
certifi            2024.2.2
charset-normalizer 3.3.2
colorama           0.4.6
comm               0.2.2
contourpy          1.2.0
cycler             0.12.1
debugpy            1.8.1
decorator          5.1.1
et-xmlfile         1.1.0
executing          2.0.1
fonttools          4.50.0
idna               3.6
ipykernel          6.29.3
ipython            8.22.2
jedi               0.19.1
jupyter_client     8.6.1
jupyter_core       5.7.2
kiwisolver         1.4.5
matplotlib         3.8.3
matplotlib-inline  0.1.6
nest-asyncio       1.6.0
numpy              1.26.4
openpyxl           3.1.2
packaging          24.0
pandas        

In [None]:
import polars as pl
str_file = "fix_Peka40STR2023.txt"
address_cols = ["NoKPKIR", "Alamat", "Poskod", "Bandar", "Negeri"]

df = pl.scan_csv(str_file, separator="|", infer_schema_length=10000000)
df1 = df.select(address_cols).head(20).collect()

# for item in address_cols[2:]:
#     # print(item)
#     df1.with_columns(pl.col("Alamat").str.replace_all(pl.col(item).cast(pl.String), ""))

print(df1.to_pandas().to_markdown(tablefmt = "pretty"))

In [None]:
df1.to_dicts()

In [None]:
pl.scan_csv("fix_Peka40STR2023.txt", separator="|").columns

In [10]:
# Import necessaery packages
from pyspark.sql import SparkSession
from pyspark.sql import functions as spark_func
import pandas as pd
from itertools import chain

# Some settings
str_file = "fix_Peka40STR2023.txt"
address_cols = ["NoKPKIR", "Alamat", "Poskod", "Bandar", "Negeri", "state",]

# Create SparkSession
spark = SparkSession.builder.appName("ReplacePostalCode").getOrCreate()

# Read the csv file
df = spark.read.options(delimiter="|", header=True).csv(str_file)

# Select the important columns for address, then cast string to postcode, capitalize address, city, state, create a column of state
address_df = df.select(*address_cols[0:5])\
               .withColumn("Poskod", df["Poskod"].cast("string"))\
               .withColumn("state", spark_func.upper("Negeri"))
address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], "^#", ""))

# To capitalize all the address related information
for column in address_cols[1:]:
    address_df = address_df.withColumn(column, spark_func.upper(spark_func.trim(column)))
               
# Create list of state
# ['W.PERSEKUTUAN (KL)', 'JOHOR', 'KEDAH', 'PERAK', 'PERLIS', 'PULAU PINANG', 'W.PERSEKUTUAN (LABUAN)', 
#  'SELANGOR', 'TERENGGANU', 'NEGERI SEMBILAN', 'KELANTAN', 'SARAWAK', 'W.PERSEKUTUAN (PUTRAJAYA)', 'PAHANG', 'MELAKA', 'SABAH']
state_df = address_df.groupBy("Negeri").count()
state_list = [item["Negeri"] for item in state_df.sort("Negeri").toLocalIterator()]

# Create another state column by changing federal states' name
state_change_dict = {"W.PERSEKUTUAN (KL)":"KUALA LUMPUR",
                     "W.PERSEKUTUAN (LABUAN)":"LABUAN",
                     "W.PERSEKUTUAN (PUTRAJAYA)":"PUTRAJAYA"}

# Change the state name
for key in state_change_dict:
    address_df = address_df.withColumn("state", spark_func.when(spark_func.col("state") == key, state_change_dict[key])\
                                       .otherwise(spark_func.col("state")))

for item in reversed(address_cols[2:]):
    # To replace all poskod, city and state in the address
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], address_df[item], ""))
    # To remove all those end with ,
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], ",\\s*$", ""))
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], ",\\s*", ", "))
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], ",\\s*,\\s*", ", "))
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], "\\s*,", ","))
    address_df = address_df.withColumn("Alamat", spark_func.trim(spark_func.regexp_replace(address_df["Alamat"], "\\s+", " ")))
    
address_df = address_df.withColumn("address", spark_func.when(spark_func.col("Alamat") != "", spark_func.concat_ws(" ", *address_cols[1:4], "state")))
address_df1 = address_df.filter(spark_func.col("address").isNotNull())
# address_df1.dropDuplicates(["address"])
address_df1.select(address_cols[0], "address").show(100, truncate = False)

+------------+---------------------------------------------------------------------------------------------------------+
|NoKPKIR     |address                                                                                                  |
+------------+---------------------------------------------------------------------------------------------------------+
|000101010049|LOT 8091, KAMPUNG SENGKANG BATU 18 TANGKAK 84800 BUKIT GAMBIR JOHOR                                      |
|000101010057|NO 21 A PARIT AMAL LORONG HAJI SUHAIMI JALAN SRI MUDA 84000 MUAR JOHOR                                   |
|000101010073|NO34 TINGKAT 2 BLOCK B TAMAN DESA RAKYAT PERDANA 81700 PASIR GUDANG JOHOR                                |
|000101010081|NO 13, JALAN TERKUKUR, LARKIN JAYA 80350 JOHOR BAHRU JOHOR                                               |
|000101010102|BLOK A4, 02-10 JALAN SEROJA INDAH 2 TAMAN SEROJA 81200 JOHOR BAHRU JOHOR                                 |
|000101010110|NO 31 JALAN SIERRA

In [16]:
# for value in reversed(address_cols[2:]):
    # print(item)
    # expr(f"regexp_replace(Alamat, '{col_name}$', '')"))
    # spark_func.expr(spark_func.trim(address_df["Alamat"]), address_df[item], ""))
    # address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(spark_func.trim(address_df["Alamat"]), f'\\b{item}\\b$', ''))
    # address_df = address_df.withColumn("Alamat", spark_func.expr(f"CASE WHEN split(Alamat, ' ')[-1] = '{value}' THEN regexp_replace(Alamat, ' {value}$', '') ELSE Alamat END"))

# df1 = address_df1.dropDuplicates(["address"]).select(address_cols[0], "address")
# df1.show(truncate = False)
address_df1.select(address_cols[0], "address")\
    .write.format("csv").mode('overwrite').options(header='True', delimiter='|')\
        .save("str_address")

Py4JJavaError: An error occurred while calling o1377.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


In [43]:
def clean_address(df, columns_to_loop, column_need_to_be_clean,):
    for value in columns_to_loop:
        df = df.withColumn("words", spark_func.split(column_need_to_be_clean, "\\s+"))\
               .withColumn("last", spark_func.expr("words[size(words) - 1]"))\
               .withColumn("words", spark_func.expr("slice(words, 1, size(words) - 1)"))
        df = df.withColumn("last", spark_func.when(df[value]==df["last"], "").otherwise(df["last"]))
        df = df.withColumn(column_need_to_be_clean, spark_func.trim(spark_func.concat_ws(" ", df["words"], df["last"])))
        df = df.withColumn(column_need_to_be_clean, spark_func.regexp_replace(df[column_need_to_be_clean], ",\\s*$", ""))
        df = df.withColumn(column_need_to_be_clean, spark_func.regexp_replace(df[column_need_to_be_clean], ",\\s*", ", "))
        df = df.withColumn(column_need_to_be_clean, spark_func.regexp_replace(df[column_need_to_be_clean], ",\\s*,\\s*", ", "))
        df = df.withColumn(column_need_to_be_clean, spark_func.regexp_replace(df[column_need_to_be_clean], "\\s*,", ","))
        df = df.withColumn(column_need_to_be_clean, spark_func.trim(spark_func.regexp_replace(df[column_need_to_be_clean], "\\s+", " ")))
        return df.drop("words", "last")
clean_address(address_df, reversed(address_cols[2:]), "Alamat").show(truncate = False)

+------------+-------------------------------------------------------------------------+------+------------+------+-----+
|NoKPKIR     |Alamat                                                                   |Poskod|Bandar      |Negeri|state|
+------------+-------------------------------------------------------------------------+------+------------+------+-----+
|000101010049|LOT 8091, KAMPUNG SENGKANG BATU 18 BUKIT GAMBIR TANGKAK                  |84800 |BUKIT GAMBIR|JOHOR |JOHOR|
|000101010057|NO 21 A PARIT AMAL LORONG HAJI SUHAIMI JALAN SRI MUDA                    |84000 |MUAR        |JOHOR |JOHOR|
|000101010073|NO34 TINGKAT 2 BLOCK B TAMAN DESA RAKYAT PERDANA PASIR GUDANG            |81700 |PASIR GUDANG|JOHOR |JOHOR|
|000101010081|NO 13, JALAN TERKUKUR, LARKIN JAYA                                       |80350 |JOHOR BAHRU |JOHOR |JOHOR|
|000101010102|BLOK A4, 02-10 JALAN SEROJA INDAH 2 TAMAN SEROJA                         |81200 |JOHOR BAHRU |JOHOR |JOHOR|
|000101010110|NO 31 JALA