In [None]:
%pip install --upgrade geopy gspread ipykernel matplotlib numpy openpyxl openai pandas pip plotly_express polars PyCap pygsheets python-dotenv pyspark seaborn setuptools scikit-learn streamlit tabulate tabula-py xlsx2csv
# %pip install pandas==1.5.3 [pandas_on_spark]
# %pip install distutils
# %pip install pyarrow, pandasai
%pip list

In [None]:
# Import necessary packages
from pyspark.sql import SparkSession
from pyspark.sql import functions as spark_func
import pandas as pd
from itertools import chain

# Some settings
# str_file is the pipe-delimited input read below. address_cols lists the ID
# column, the four raw address columns and the derived "state" column.
str_file = "fix_Peka40STR2023.txt"
address_cols = ["NoKPKIR", "Alamat", "Poskod", "Bandar", "Negeri", "state",]

# Create (or reuse) a SparkSession with a local master
spark = SparkSession.builder.master("local[*]").appName("ReplacePostalCode").getOrCreate()

# Read the pipe-delimited file, taking the column names from its header row
df = spark.read.options(delimiter="|", header=True).csv(str_file)

# Keep the first five address_cols (NoKPKIR .. Negeri), cast Poskod to string
# and add "state" as an upper-cased copy of Negeri
address_df = df.select(*address_cols[0:5])\
               .withColumn("Poskod", df["Poskod"].cast("string"))\
               .withColumn("state", spark_func.upper("Negeri"))
# address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], "^#", ""))

# Trim and upper-case every address-related column (everything except NoKPKIR)
for column in address_cols[1:]:
    address_df = address_df.withColumn(column, spark_func.upper(spark_func.trim(column)))

# Collect the distinct Negeri values, sorted, into a Python list.
# The author's note lists these values (presumably what the data contains):
# ['W.PERSEKUTUAN (KL)', 'JOHOR', 'KEDAH', 'PERAK', 'PERLIS', 'PULAU PINANG', 'W.PERSEKUTUAN (LABUAN)', 
#  'SELANGOR', 'TERENGGANU', 'NEGERI SEMBILAN', 'KELANTAN', 'SARAWAK', 'W.PERSEKUTUAN (PUTRAJAYA)', 'PAHANG', 'MELAKA', 'SABAH']
# NOTE(review): state_list is not referenced again in the cells visible here.
state_df = address_df.groupBy("Negeri").count()
state_list = [item["Negeri"] for item in state_df.sort("Negeri").toLocalIterator()]

# Mapping from the federal-territory labels used in Negeri to the names
# stored in the "state" column
state_change_dict = {"W.PERSEKUTUAN (KL)":"KUALA LUMPUR",
                     "W.PERSEKUTUAN (LABUAN)":"LABUAN",
                     "W.PERSEKUTUAN (PUTRAJAYA)":"PUTRAJAYA"}

# Apply the mapping: rows whose "state" equals a key get the mapped name,
# every other row keeps its current value
for key in state_change_dict:
    address_df = address_df.withColumn("state", spark_func.when(spark_func.col("state") == key, state_change_dict[key])\
                                       .otherwise(spark_func.col("state")))

# Iterates state, Negeri, Bandar, Poskod (address_cols[2:] reversed)
for item in reversed(address_cols[2:]):
    # Remove the row's own value of this column from Alamat.
    # NOTE(review): the column value is passed as the regex *pattern*, so regex
    # metacharacters in the data (e.g. the ".", "(" and ")" in
    # "W.PERSEKUTUAN (KL)") are treated as regex syntax, and every match in
    # Alamat is removed, not only one at the end - confirm this is intended.
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], address_df[item], ""))
    # Drop a trailing comma (plus any whitespace after it)
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], ",\\s*$", ""))
    # Make every comma be followed by exactly one space
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], ",\\s*", ", "))
    # Collapse two consecutive commas into one
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], ",\\s*,\\s*", ", "))
    # Remove whitespace in front of a comma
    address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(address_df["Alamat"], "\\s*,", ","))
    # Collapse whitespace runs to a single space and trim both ends
    address_df = address_df.withColumn("Alamat", spark_func.trim(spark_func.regexp_replace(address_df["Alamat"], "\\s+", " ")))

# Build "address" as "Alamat Poskod Bandar state" for rows whose Alamat is not
# the empty string; there is no otherwise() branch, so other rows get null
address_df = address_df.withColumn("address", spark_func.when(spark_func.col("Alamat") != "", spark_func.concat_ws(" ", *address_cols[1:4], "state")))
# Keep only the rows for which an address was built
address_df1 = address_df.filter(spark_func.col("address").isNotNull())
# address_df1.dropDuplicates(["address"])
# Preview up to 100 rows of (NoKPKIR, address) without truncating long values
address_df1.select(address_cols[0], "address").show(100, truncate = False)

# for value in reversed(address_cols[2:]):
    # print(item)
    # expr(f"regexp_replace(Alamat, '{col_name}$', '')"))
    # spark_func.expr(spark_func.trim(address_df["Alamat"]), address_df[item], ""))
    # address_df = address_df.withColumn("Alamat", spark_func.regexp_replace(spark_func.trim(address_df["Alamat"]), f'\\b{item}\\b$', ''))
    # address_df = address_df.withColumn("Alamat", spark_func.expr(f"CASE WHEN split(Alamat, ' ')[-1] = '{value}' THEN regexp_replace(Alamat, ' {value}$', '') ELSE Alamat END"))

# df1 = address_df1.dropDuplicates(["address"]).select(address_cols[0], "address")
# df1.show(truncate = False)
# Write (NoKPKIR, address) to the "str_address" output path as pipe-delimited
# CSV with a header, coalesced to one partition, overwriting existing output
address_df1.select(address_cols[0], "address")\
    .coalesce(1).write.format("csv").mode('overwrite').options(header='True', delimiter='|')\
        .save("str_address")

In [None]:
def clean_address(df, columns_to_loop, column_need_to_be_clean):
    """Strip trailing tokens that duplicate other columns from an address column.

    For each column name in ``columns_to_loop`` (processed in the order
    given), the last whitespace-separated word of ``column_need_to_be_clean``
    is removed when it equals that row's value in the column. After every
    pass the comma/whitespace punctuation of the cleaned column is normalised.

    Args:
        df: Spark DataFrame containing ``column_need_to_be_clean`` and every
            column named in ``columns_to_loop``.
        columns_to_loop: Iterable of column names to compare against the last
            word of the address (any iterable, e.g. ``reversed(list)``).
        column_need_to_be_clean: Name of the string column to clean.

    Returns:
        The DataFrame with the cleaned column; the helper columns "words"
        and "last" used during processing are dropped.
    """
    for value in columns_to_loop:
        # Split the address into words; keep the last word apart from the rest
        df = df.withColumn("words", spark_func.split(column_need_to_be_clean, "\\s+"))\
               .withColumn("last", spark_func.expr("words[size(words) - 1]"))\
               .withColumn("words", spark_func.expr("slice(words, 1, size(words) - 1)"))
        # Blank the last word when it merely repeats this column's value
        df = df.withColumn("last", spark_func.when(df[value]==df["last"], "").otherwise(df["last"]))
        df = df.withColumn(column_need_to_be_clean, spark_func.trim(spark_func.concat_ws(" ", df["words"], df["last"])))
        # Drop a trailing comma (plus any whitespace after it)
        df = df.withColumn(column_need_to_be_clean, spark_func.regexp_replace(df[column_need_to_be_clean], ",\\s*$", ""))
        # Make every comma be followed by exactly one space
        df = df.withColumn(column_need_to_be_clean, spark_func.regexp_replace(df[column_need_to_be_clean], ",\\s*", ", "))
        # Collapse two consecutive commas into one
        df = df.withColumn(column_need_to_be_clean, spark_func.regexp_replace(df[column_need_to_be_clean], ",\\s*,\\s*", ", "))
        # Remove whitespace in front of a comma
        df = df.withColumn(column_need_to_be_clean, spark_func.regexp_replace(df[column_need_to_be_clean], "\\s*,", ","))
        # Collapse whitespace runs to a single space and trim both ends
        df = df.withColumn(column_need_to_be_clean, spark_func.trim(spark_func.regexp_replace(df[column_need_to_be_clean], "\\s+", " ")))

    # Return only after *all* columns were processed. The return used to sit
    # inside the loop, which stopped after the first column and returned None
    # for an empty iterable.
    return df.drop("words", "last")
# Run clean_address on the "Alamat" column of address_df, passing
# address_cols[2:] in reverse order, and display the result untruncated
clean_address(address_df, reversed(address_cols[2:]), "Alamat").show(truncate = False)

In [None]:
# Import necessary packages
from pyspark.sql import SparkSession
from pyspark.sql import functions as spark_func
import pandas as pd
from itertools import chain

# Some settings
# NOTE(review): str_file and address_cols are re-declared here but not used
# in this cell.
str_file = "fix_Peka40STR2023.txt"
address_cols = ["NoKPKIR", "Alamat", "Poskod", "Bandar", "Negeri", "state",]

# Create (or reuse) the SparkSession; no master is set explicitly here
spark = SparkSession.builder.appName("ReplacePostalCode").getOrCreate()

# Read the pipe-delimited CSV output stored under "str_address" (the path the
# address-cleaning cell saves to), using the header row for column names
df = spark.read.options(delimiter="|", header=True).csv("str_address")
# Switch to the pandas API on Spark so later cells can use len()/iloc/to_excel
df = df.pandas_api()

In [None]:
# with pd.ExcelWriter("str_address.xlsx", mode="w") as writer:
#     for num in range(0, len(df), 1000):
#         df.iloc[num:num+1000].to_excel(writer, sheet_name = f"sheet{num/1000}", index = False)

# Export df in consecutive 1000-row partitions, one workbook per partition.
# The partition number is computed once with integer division so the file
# name and the sheet name agree ("sheet0.xlsx" / "sheet0"). The file name
# previously used true division and produced names such as "sheet0.0.xlsx".
for num in range(0, len(df), 1000):
    partition_no = num // 1000
    df.iloc[num:num+1000].to_excel(f"str_partition/sheet{partition_no}.xlsx", sheet_name = f"sheet{partition_no}", index = False)

In [None]:
import pandas as pd
import sqlite3

# Load the pipe-delimited address export.
# `error='coerce'` is not a pandas.read_csv parameter, so the original call
# raised TypeError. Malformed rows are now reported with a warning and skipped.
df = pd.read_csv("str_address.csv", sep="|", on_bad_lines="warn")

# Create a connection to a new SQLite database
conn = sqlite3.connect('str.db')
try:
    # Write the DataFrame to table "str", replacing any existing table.
    # The row index is stored under the column name "id", which is the name
    # the other cells of this notebook query ("SELECT id, ... FROM str").
    df.to_sql('str', conn, if_exists='replace', index=True, index_label="id")
finally:
    # Close the connection even when the write fails
    conn.close()

In [None]:
import pandas as pd
import sqlite3

# df = pd.read_csv("str_address.csv", sep="|")

# Open the full database (source) and the slimmed-down copy (destination)
conn = sqlite3.connect(r'str_address/str.db')
str_db = sqlite3.connect(r"str_address/str_simple.db")

# conn.cursor().execute("ALTER TABLE raw DROP COLUMN NoKPKIR")
# conn.commit()
# pd.read_sql("ALTER TABLE str ADD COLUMN point CHAR", conn)
# pd.read_sql("SELECT name FROM sqlite_master WHERE type='table'", conn).name
try:
    # Stream the three columns in 10,000-row chunks so the whole table never
    # has to fit in memory at once.
    # NOTE(review): if_exists="append" means re-running this cell appends the
    # same rows again; delete str_simple.db first when re-running.
    for chunk in pd.read_sql("SELECT id, NoKPKIR, address FROM str", con=conn, chunksize=10000):
        chunk.to_sql("str", con = str_db, index=False, if_exists="append")
finally:
    # Both connections were previously left open (the close was commented out)
    str_db.close()
    conn.close()

In [None]:
import os
import pandas as pd

def read_file(path:str, method:str) -> pd.DataFrame:
    """Load every CSV or Excel file of a directory into one de-duplicated frame.

    Args:
        path: Directory to scan (its sub-directories are not visited).
        method: "csv" reads every file whose name ends in "csv" as
            pipe-delimited text; "excel" reads every sheet of every file
            whose name ends in "xlsx".

    Returns:
        The files concatenated in sorted file-name order with exact duplicate
        rows removed (first occurrence kept). An empty DataFrame is returned
        when the directory holds no matching file.

    Raises:
        ValueError: If ``method`` is neither "csv" nor "excel" (previously the
            function silently returned None).
    """
    if method == "csv":
        suffix = "csv"
    elif method == "excel":
        suffix = "xlsx"
    else:
        raise ValueError(f"Unsupported method {method!r}; expected 'csv' or 'excel'")

    # os.listdir returns names in arbitrary order; sorting makes the row order
    # (and therefore which duplicate is kept) reproducible between runs.
    file_paths = [os.path.join(path, name)
                  for name in sorted(os.listdir(path))
                  if name.endswith(suffix)]

    # Collect the frames first and concatenate once, instead of growing a
    # DataFrame inside the loop.
    frames = []
    for file_path in file_paths:
        if method == "csv":
            frames.append(pd.read_csv(file_path, sep="|"))
        else:
            # sheet_name=None returns {sheet name: DataFrame} for all sheets,
            # so each workbook is opened only once.
            frames.extend(pd.read_excel(file_path, sheet_name=None).values())

    if not frames:
        # pd.concat raises on an empty list; an empty result is more useful
        return pd.DataFrame()

    return pd.concat(frames, ignore_index=True).drop_duplicates()
    
def convert_point_to_lat_lon(df:pd.DataFrame) -> pd.DataFrame:
    """Split a "(lat, lon)" text column into numeric Latitude/Longitude columns.

    The "point" column is read as text, surrounding whitespace and
    parentheses are removed, and the first two comma-separated parts become
    "Latitude" and "Longitude".

    Args:
        df: Frame with a "point" column holding values such as
            "(3.5, 101.25)". The frame itself is not modified.

    Returns:
        A deep copy of ``df`` with float columns "Latitude" and "Longitude"
        added (or overwritten). Rows whose point is missing or cannot be
        parsed get NaN in both columns instead of raising.

    Raises:
        KeyError: If ``df`` has no "point" column.
    """
    # Work on a copy so the caller's data is left untouched
    out = df.copy(deep = True)

    # Split the *stripped* text. The previous version computed a stripped
    # "temp" column but then split the raw "point" values, which left the
    # parentheses inside the results.
    parts = (out["point"].astype(str)
                         .str.strip()
                         .str.strip("()")
                         .str.split(",", expand=True)
                         # guarantees columns 0 and 1 exist even when no row
                         # contains a comma; extra parts are ignored
                         .reindex(columns=[0, 1]))

    for position, name in enumerate(("Latitude", "Longitude")):
        # errors="coerce" turns missing/unparseable parts into NaN
        out[name] = pd.to_numeric(parts[position].astype(str).str.strip(), errors="coerce")

    return out

def convert_id_to_kpir(df:pd.DataFrame, 
                       id_columns:str = "id",
                       database_file:str = 'str_address/str.db',
                       columns:tuple = ("Latitude", "Longitude",)) -> pd.DataFrame:
    """Look up rows of the SQLite "str" table by id and attach columns of ``df``.

    Args:
        df: Frame holding the ids to look up plus the columns to attach.
        id_columns: Name of the column in ``df`` that holds the ids.
        database_file: Path of the SQLite database containing table "str".
        columns: Names of the ``df`` columns merged onto the database rows.

    Returns:
        Outer merge of the matching "str" rows (key "id") with
        ``df[[id_columns, *columns]]`` (key ``id_columns``).

    Raises:
        ValueError: If an id cannot be converted to int (e.g. it is missing).
    """
    import sqlite3

    # Render the ids as plain integers. Interpolating the repr of a tuple
    # broke the SQL for a single id ("(1,)") and for NumPy scalar reprs
    # ("np.int64(1)"). Every value goes through int(), so building the list
    # as text cannot inject SQL, and it avoids SQLite's bound-parameter limit.
    id_sql = ", ".join(str(int(value)) for value in df.loc[:, id_columns].astype(int).unique())

    connection = sqlite3.connect(database_file)
    try:
        matched = pd.read_sql(f"SELECT * FROM str WHERE id IN ({id_sql})", con = connection)
    finally:
        # The connection was previously never closed
        connection.close()

    # The database rows are the left side, so their key is "id"; the key of
    # ``df`` is id_columns (the two were swapped before, which only worked
    # when id_columns was "id").
    return matched.merge(df.loc[:, [id_columns, *columns]],
                         how = "outer", left_on = "id", right_on = id_columns)

# previous_csv = read_file("str_partition", "csv") # 319760
# previous_csv_no_point = previous_csv.query("point.isnull()") # 36292
# previous_csv_point = convert_point_to_lat_lon(previous_csv.query("point.notnull()")) # 283468
# str_partition_complete = read_file("str_partition/completed", "excel") # Done
# str_partition_with_id = str_partition_complete.query("id.notnull()") # 25966 Done
# str_partition_with_id.loc[:,"ic"] = str_partition_with_id.loc[:,"id"].apply(lambda x: len(str(x))) # Done
# str_partition_with_ic = pd.concat([str_partition_complete.query("NoKPKIR.notnull()"), 
#                                    str_partition_complete.query("ic.notnull()")\
#                                                          .drop(columns = "NoKPKIR")\
#                                                          .rename(columns = {"ic":"NoKPKIR"}),
#                                    str_partition_with_id.query("ic == 14")\
#                                                         .drop(columns = ["NoKPKIR", "ic"])\
#                                                         .rename(columns = {"id":"NoKPKIR"}), 
#                                    pd.read_sql(f"SELECT * FROM str WHERE id IN{tuple(str_partition_with_id.query("ic == 9").loc[:,"id"].astype(int).unique())}", 
#                                                con=conn)\
#                                      .merge(str_partition_with_id.loc[str_partition_with_id.loc[:,"ic"] == 9,
#                                                                       ("id", "Latitude", "Longitude")], 
#                                             how = "outer", on = "id")],
#                                   ignore_index=True).drop_duplicates(subset=["NoKPKIR", "address"]) # 283463

# read_file("str_google", "excel").info() -> no point/lat/lon

# completed_id =  convert_id_to_kpir(convert_point_to_lat_lon(read_file("completed/id", "excel"))) # 10385
# completed_kpir = read_file("completed", "excel") # 66923
# completed_kpir_lat_lon = completed_kpir.query("Latitude.notnull()") #16457
# completed_kpir_point = completed_kpir.query("Latitude.isnull()") # 50466
# completed_kpir_full = pd.concat([convert_point_to_lat_lon(completed_kpir_point).drop(columns="temp"), 
#                                  completed_kpir_lat_lon], ignore_index=True)
# completed_kpir_fuck = completed_kpir_full.query("Latitude.isnull()")
# for index, row in completed_kpir_fuck.iterrows():
#     # Split the content
#     temp_str = row["point"].split(",")
#     try:
#         completed_kpir_fuck.loc[index, "Latitude"] = float(temp_str[0].replace("(", ""))
#         completed_kpir_fuck.loc[index, "Longitude"] = float(temp_str[1])
#     except:
#         continue
# final_df = pd.concat([completed_kpir_full, completed_kpir_fuck], ignore_index=True)\
#              .sort_values("NoKPKIR")\
#                 .drop_duplicates(subset=["NoKPKIR", "address"])
# final_df.to_csv("test_geo.csv", index = False, sep="|") # 66923

# new_str = read_file("new_str", "excel") -> no point/lat/lon
# new_str_complete_point = read_file("new_str/complete", "excel")
# new_str_complete_point

In [None]:
# new_str_complete_point = read_file("new_str/complete", "excel")

# for index, row in new_str_complete_point.iterrows():
#     # Loop through for some times if hte method above failed
#     temp_str = row["point"].split(",")
#     try:
#         new_str_complete_point.loc[index, "Latitude"] = float(temp_str[0].replace("(", ""))
#         new_str_complete_point.loc[index, "Longitude"] = float(temp_str[1])
#     except:
#         continue

# # new_str_complete_point.to_parquet("test_geo2.parquet")
# Display final_df as the cell output.
# NOTE(review): in the cells visible here final_df is only assigned inside
# commented-out code, so this line depends on a value already present in the
# kernel - confirm where it is built before relying on "Run All".
final_df

In [None]:
import polars as pl
# pl.from_pandas(final_df2)
# for column in final_df.columns:
#     try:
#         final_df.loc[:,column] = pd.to_numeric(final_df2.loc[:,column], downcast="integer")
#     except:
#         continue
# final_df = final_df.loc[:,("NoKPKIR", "address", "Latitude", "Longitude")]
# final_df.loc[:,"NoKPKIR"] = final_df.loc[:,"NoKPKIR"].astype(int)
# kpir_issue = pl.from_pandas(new_str_complete_point.query("NoKPKIR.notnull()"))\
#     .select(pl.col("NoKPKIR").cast(pl.Int64), "address", "Latitude", "Longitude")\
#     .to_pandas()


# final_df = pd.concat([convert_id_to_kpir(new_str_complete_point.query("NoKPKIR.isnull()")),
#                       kpir_issue], ignore_index=True).drop(columns = "id")

# final_df2 = pd.concat([final_df.drop_duplicates(subset = ["NoKPKIR", "address"]),
#                        .drop_duplicates(subset=["NoKPKIR", "address"]),
#                        pd.read_csv("test_geo.csv", sep = "|").drop_duplicates(subset=["NoKPKIR", "address"]),
#                        pd.read_parquet("Final Geo Data.parquet")], ignore_index=True)


# test_geo = pd.read_parquet("test_geo.parquet").loc[:,("NoKPKIR", "Latitude", "Longitude")]
# little_wife = pd.read_parquet("Final Geo Data.parquet").loc[:,("NoKPKIR", "Latitude", "Longitude")]
# csv_df = pl.read_csv("test_geo.csv", separator ="|", ignore_errors=True)\
#            .select(pl.col("NoKPKIR").cast(pl.Int64), "Latitude", "Longitude").to_pandas()

# final_df2 = pd.concat([final_df.loc[:,("NoKPKIR", "Latitude", "Longitude")],
#                        test_geo, little_wife, csv_df], ignore_index=True)
# Keep one row per NoKPKIR and save the result as "2024_07_02.parquet".
# NOTE(review): in the cells visible here final_df2 is only assigned inside
# commented-out code, so this line depends on a value already present in the
# kernel - confirm where it is built.
final_df2.drop_duplicates(subset="NoKPKIR").to_parquet("2024_07_02.parquet")

In [None]:
# Load "2024_07_03.parquet" with polars and display it as the cell output.
# `pl` is not imported in this cell; it comes from an `import polars as pl`
# in another cell.
pl.read_parquet("2024_07_03.parquet")

In [None]:
import polars as pl
# Left-join the columns of "2024_07_03.parquet" (minus "__index_level_0__")
# onto the rows of the pipe-delimited STR file, matching on NoKPKIR, and
# write the result to "2024_07_02.parquet" through pyarrow.
str_rows = pl.read_csv("fix_Peka40STR2023.txt", separator="|", ignore_errors = True)
geo_rows = pl.read_parquet("2024_07_03.parquet").drop("__index_level_0__")
str_rows.join(geo_rows, on= "NoKPKIR", how = "left").write_parquet("2024_07_02.parquet", use_pyarrow=True)

In [None]:
# import polars as pl
# df = pl.read_parquet("2024_07_02.parquet")
# df.filter(pl.col("Latitude").is_not_null())
# df.group_by("Negeri").count()

In [1]:
import pandas as pd
import sqlite3

# Read NoKPKIR/address from table "str".
# The database is addressed with the same relative path the other cells of
# this notebook use ("str_address/str.db") instead of a user-specific
# absolute path, so the cell also runs on other machines.
source_conn = sqlite3.connect("str_address/str.db")
try:
    address_df = pd.read_sql("SELECT NoKPKIR, address FROM str", con = source_conn)
finally:
    # The connection was previously never closed
    source_conn.close()

original_df = pd.read_parquet("2024_07_02.parquet")

# Outer-merge on NoKPKIR and store the result in table "str" of str_half.db,
# writing the row index as column "id".
# NOTE(review): if_exists="append" means re-running this cell appends the
# same rows again.
target_conn = sqlite3.connect("str_half.db")
try:
    address_df.merge(original_df,
                     how = "outer",
                     on = "NoKPKIR").to_sql("str",
                                            con = target_conn,
                                            index_label="id",
                                            if_exists="append")
finally:
    target_conn.close()

In [None]:
# with pd.ExcelWriter("str_address/siswa.xlsx", engine = "openpyxl", mode = "w") as writer:
#     for num in range(0, 500000, 10000):
#         df.iloc[num:num+10000].to_excel(writer, sheet_name = f"sheet{int(num/10000)}", index = False)

# with pd.ExcelWriter("str_address/google.xlsx", engine = "openpyxl", mode = "w") as writer:
#     for num in range(500000, len(df), 10000):
#         df.iloc[num:num+10000].to_excel(writer, sheet_name = f"sheet{int(num/10000)}", index = False)

# Export df in consecutive 10,000-row slices, one workbook per slice, named
# after the slice number
for start_row in range(0, len(df), 10000):
    workbook_path = f"str_google/google/sheet{int(start_row/10000)}.xlsx"
    df.iloc[start_row:start_row+10000].to_excel(workbook_path, index= False)