In [2]:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import (
    from_json, col, split, trim, regexp_replace, when, regexp_extract, size, length
)

# Build the Spark session. The settings are kept in one table so they are easy
# to scan and adjust; they are applied in the order listed.
_spark_settings = (
    ("spark.driver.memory", "4g"),          # author's note: safe for an 8 GB system
    ("spark.executor.memory", "4g"),
    ("spark.sql.shuffle.partitions", "8"),  # author's note: reduce overhead
)

_builder = SparkSession.builder.appName("FlattenAndCleanJSON")
for _key, _value in _spark_settings:
    _builder = _builder.config(_key, _value)

spark = _builder.getOrCreate()

# Only surface errors in the log output.
spark.sparkContext.setLogLevel("ERROR")

# Load the raw transaction records from the JSON file.
df = spark.read.json("/home/agileox/Project/payn_project/data/cc_sample_transaction.json")


def _all_string_schema(*field_names):
    """Return a StructType in which every named field is a nullable string."""
    return StructType([StructField(name, StringType(), True) for name in field_names])


# Schema of the address object (it arrives as a JSON string inside personal_detail).
address_schema = _all_string_schema("street", "city", "state", "zip")

# Schema of personal_detail. "address" is declared as a plain string here
# because it holds nested JSON text that is parsed in a later step.
personal_schema = _all_string_schema(
    "person_name",
    "gender",
    "address",
    "lat",
    "long",
    "city_pop",
    "job",
    "dob",
)

# Level 1: replace the personal_detail JSON string with a parsed struct.
_parsed_personal = from_json(col("personal_detail"), personal_schema)
df_level1 = df.withColumn("personal_detail", _parsed_personal)

# Level 2: the struct's "address" field is itself JSON text; parse it into a
# new top-level "address" struct column.
_parsed_address = from_json(col("personal_detail.address"), address_schema)
df_level2 = df_level1.withColumn("address", _parsed_address)

# Flatten into a single wide table: transaction columns first, then the
# personal_detail fields, then the address fields.

# Transaction-level columns are passed through unchanged.
_transaction_cols = [
    "Unnamed: 0", "amt", "category", "cc_bic", "cc_num", "is_fraud",
    "merch_eff_time", "merch_last_update_time", "merch_lat", "merch_long",
    "merch_zipcode", "merchant", "trans_date_trans_time", "trans_num",
]

# person_name is exposed as raw_person_name; the remaining personal fields
# keep their own names.
_personal_cols = [col("personal_detail.person_name").alias("raw_person_name")] + [
    col(f"personal_detail.{field}").alias(field)
    for field in ("gender", "lat", "long", "city_pop", "job", "dob")
]

# Address fields get an "address_" prefix.
_address_cols = [
    col(f"address.{part}").alias(f"address_{part}")
    for part in ("street", "city", "state", "zip")
]

df_flat = df_level2.select(*_transaction_cols, *_personal_cols, *_address_cols)

# ---------------- Name Cleaning Pipeline ----------------

# 1) Normalize '/' to '@' so it is treated like the other separator characters
df_names = df_flat.withColumn("raw_name", regexp_replace(col("raw_person_name"), "/", "@"))

# 2) Cleanup: unify separators to commas, remove noise, trim.
#    Order matters: noise tokens are removed first (which can leave several
#    commas in a row), then separator runs and whitespace are collapsed, and
#    finally the edges are stripped.
df_cleaned = (
    df_names
    # Any run of '@', '|' or '!' becomes a single comma.
    .withColumn("name", regexp_replace(col("raw_name"), r"[@|!]+", ","))
    # Drop the literal noise tokens when they appear as whole words.
    .withColumn("name", regexp_replace(col("name"), r"\bNOOOO\b", ""))
    .withColumn("name", regexp_replace(col("name"), r"\beeeee\b", ""))
    # Collapse a run of two OR MORE commas (optionally separated by whitespace)
    # into one. The previous pattern ",\s*," only consumed pairs, so "a,,,b"
    # was reduced to "a,,b" instead of "a,b".
    .withColumn("name", regexp_replace(col("name"), r",(?:\s*,)+", ","))
    # Collapse any whitespace run into a single space.
    .withColumn("name", regexp_replace(col("name"), r"\s+", " "))
    # Strip whitespace AND commas from both ends in one pass. Trimming spaces
    # before removing edge commas left a leading space for inputs such as
    # ", John", which later produced an empty first token when splitting.
    .withColumn("name", regexp_replace(col("name"), r"^[\s,]+|[\s,]+$", ""))
)

# 3) Break the cleaned name into tokens wherever commas and/or spaces occur
tokens = split(col("name"), r"[ ,]+")
first_tok = tokens.getItem(0)
second_tok = tokens.getItem(1)

# second_tok is kept only when a non-empty second token exists; otherwise the
# column is left null (when() without otherwise()).
_has_second_token = (size(tokens) >= 2) & (length(second_tok) > 0)

df_split = (
    df_cleaned
    .withColumn("first_tok", first_tok)
    .withColumn("second_tok", when(_has_second_token, second_tok))
)

# 4) Final first_name and last_name
# With two tokens available they are used directly. Otherwise the name is
# treated as two capitalized words glued together: the first capitalized word
# is the first name and the capitalized word right after it is the last name.
_glued_first = regexp_extract(col("name"), r"^([A-Z][a-z]+)", 1)
_glued_last = regexp_extract(col("name"), r"^[A-Z][a-z]+([A-Z][a-z]+)", 1)
_two_tokens = col("second_tok").isNotNull()

df_final = (
    df_split
    .withColumn("first_name", when(_two_tokens, col("first_tok")).otherwise(_glued_first))
    .withColumn("last_name", when(_two_tokens, col("second_tok")).otherwise(_glued_last))
    # The helper token columns are not part of the final table.
    .drop("first_tok", "second_tok")
)

# ---------------- Show Unified Table ----------------
# Print the first five rows of the flattened, cleaned table without
# truncating long cell values.
df_final.show(n=5, truncate=False)

# ---- Optional Pandas conversion ----
# To inspect a small sample in pandas instead, uncomment:
# pdf = df_final.limit(100).toPandas()
# print(pdf.head())


                                                                                

+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+----------------------------------+---------------------+--------------------------------+--------------------+------+-------+---------+--------+---------------------------------+----------+----------------------------+--------------+-------------+-----------+--------------------+--------------+----------+---------+
|Unnamed: 0|amt   |category     |cc_bic     |cc_num          |is_fraud|merch_eff_time  |merch_last_update_time|merch_lat         |merch_long |merch_zipcode|merchant                          |trans_date_trans_time|trans_num                       |raw_person_name     |gender|lat    |long     |city_pop|job                              |dob       |address_street              |address_city  |address_state|address_zip|raw_name            |name          |first_name|last_name|
+----------+------+-------------+-----------+-------