In [1]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [2]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
spark = SparkSession.builder \
    .appName("JSON Processing") \
    .master("spark://spark-master:7077") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

In [4]:
df1 = spark.read.json('data/sample.json')

In [5]:
df1.show()

+---+-------------+-----+
|age|         city| name|
+---+-------------+-----+
| 30|     New York| John|
| 25|San Francisco|Alice|
| 35|      Chicago|  Bob|
+---+-------------+-----+



In [6]:
df = spark.read.json('data/cc_sample_transaction.json')

In [7]:
df.limit(5).show(truncate=True)

+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+--------------------+--------------------+---------------------+--------------------+
|Unnamed: 0|   amt|     category|     cc_bic|          cc_num|is_fraud|  merch_eff_time|merch_last_update_time|         merch_lat| merch_long|merch_zipcode|            merchant|     personal_detail|trans_date_trans_time|           trans_num|
+----------+------+-------------+-----------+----------------+--------+----------------+----------------------+------------------+-----------+-------------+--------------------+--------------------+---------------------+--------------------+
|         0|  4.97|     misc_net|CITIUS33CHI|2703186189652095|       0|1325376018798532|         1325376018666|         36.011293| -82.048315|        28705|fraud_Rippin, Kub...|{"person_name":"J...|  2019-01-01 00:00:18|0b242abb623afc578...|
|         1|107.23|  grocery_pos

In [8]:
# Define the schema for the main JSON structure
main_schema = StructType([
    StructField("person_name", StringType()),
    StructField("gender", StringType()),
    StructField("address", StringType()), 
    StructField("lat", StringType()),
    StructField("long", StringType()),
    StructField("city_pop", StringType()),
    StructField("job", StringType()),
    StructField("dob", StringType())
])

# Define schema for the nested address
address_schema = StructType([
    StructField("street", StringType()),
    StructField("city", StringType()),
    StructField("state", StringType()),
    StructField("zip", StringType())
])

In [9]:
df_parsed = df.withColumn("personal_detail_parsed", F.from_json(F.col("personal_detail"), main_schema))

In [10]:
df_address = df_parsed.withColumn("address_parsed", F.from_json(F.col("personal_detail_parsed.address"), address_schema))

In [11]:
# Extract all fields
df_final = df_address.select(
    "*",
    F.col("personal_detail_parsed.person_name").alias("person_name"),
    F.col("personal_detail_parsed.gender").alias("gender"),
    F.col("address_parsed.street").alias("street"),
    F.col("address_parsed.city").alias("city"),
    F.col("address_parsed.state").alias("state"),
    F.col("address_parsed.zip").alias("zip"),
    F.col("personal_detail_parsed.lat").alias("latitude"),
    F.col("personal_detail_parsed.long").alias("longitude"),
    F.col("personal_detail_parsed.city_pop").alias("city_population"),
    F.col("personal_detail_parsed.job").alias("job"),
    F.col("personal_detail_parsed.dob").alias("dob")
).drop("personal_detail", "personal_detail_parsed", "address_parsed")

In [12]:
df_final.limit(1).show()

+----------+----+--------+-----------+----------------+--------+----------------+----------------------+---------+----------+-------------+--------------------+---------------------+--------------------+--------------------+------+--------------+--------------+-----+-----+--------+---------+---------------+--------------------+----------+
|Unnamed: 0| amt|category|     cc_bic|          cc_num|is_fraud|  merch_eff_time|merch_last_update_time|merch_lat|merch_long|merch_zipcode|            merchant|trans_date_trans_time|           trans_num|         person_name|gender|        street|          city|state|  zip|latitude|longitude|city_population|                 job|       dob|
+----------+----+--------+-----------+----------------+--------+----------------+----------------------+---------+----------+-------------+--------------------+---------------------+--------------------+--------------------+------+--------------+--------------+-----+-----+--------+---------+---------------+----------

In [None]:
# not working solution, since the timestamp columns are not of equal length

dt_cols = ["merch_eff_time", "merch_last_update_time"]
for col_name in dt_cols:
    df_dt_converted = df_final.withColumn(col_name, F.date_format(
        F.from_utc_timestamp(
            (F.col(col_name) / 1000).cast(TimestampType()), 
            "Asia/Kuala_Lumpur"
        ),
        "yyyy-MM-dd HH:mm:ss"
    ))

In [None]:
df_dt_converted.select(["merch_eff_time", "merch_last_update_time"]).limit(2).show()

In [13]:
# working solution

df_dt_converted = df_final.withColumn("merch_last_update_time", F.date_format(
        F.from_utc_timestamp(
            (F.col("merch_last_update_time") / 1000).cast(TimestampType()), 
            "Asia/Kuala_Lumpur"
        ),
        "yyyy-MM-dd HH:mm:ss"
    ))\
    .withColumn("merch_eff_time", F.date_format(
        F.from_utc_timestamp(
            (F.col("merch_eff_time") / 1000000).cast(TimestampType()), 
            "Asia/Kuala_Lumpur"
        ),
        "yyyy-MM-dd HH:mm:ss"
    ))\
    .withColumn("trans_date_trans_time", F.date_format(
        F.from_utc_timestamp(
            (F.col("trans_date_trans_time")).cast(TimestampType()), 
            "Asia/Kuala_Lumpur"
        ),
        "yyyy-MM-dd HH:mm:ss"
    ))

In [14]:
df_dt_converted.select(["merch_eff_time", "merch_last_update_time", "trans_date_trans_time"]).limit(10).show(truncate=False)

+-------------------+----------------------+---------------------+
|merch_eff_time     |merch_last_update_time|trans_date_trans_time|
+-------------------+----------------------+---------------------+
|2012-01-01 08:00:18|2012-01-01 08:00:18   |2019-01-01 08:00:18  |
|2012-01-01 08:00:44|1974-03-15 07:30:04   |2019-01-01 08:00:44  |
|2012-01-01 08:00:51|2012-01-01 08:00:51   |2019-01-01 08:00:51  |
|2012-01-01 08:01:16|2012-01-01 08:01:16   |2019-01-01 08:01:16  |
|2012-01-01 08:03:06|1974-03-15 07:30:18   |2019-01-01 08:03:06  |
|2012-01-01 08:04:08|2012-01-01 08:04:08   |2019-01-01 08:04:08  |
|2012-01-01 08:04:42|2012-01-01 08:04:42   |2019-01-01 08:04:42  |
|2012-01-01 08:05:08|2012-01-01 08:05:08   |2019-01-01 08:05:08  |
|2012-01-01 08:05:18|2012-01-01 08:05:18   |2019-01-01 08:05:18  |
|2012-01-01 08:06:01|2012-01-01 08:06:01   |2019-01-01 08:06:01  |
+-------------------+----------------------+---------------------+



In [19]:
# name derivation

df_dt_converted.select(["person_name"]).limit(20).show(truncate=False)

+-------------------------+
|person_name              |
+-------------------------+
|Jennifer,Banks,eeeee     |
|Stephanie,Gill,eeeee     |
|Edward@Sanchez           |
|Jeremy/White, !          |
|Tyler@Garcia             |
|Jennifer,Conner,eeeee    |
|Kelsey, , Richards NOOOO |
|Steven, Williams         |
|Heather, , Chase NOOOO   |
|Melissa@Aguilar          |
|Eddie|Mendez!!!          |
|Theresa@Blackwell        |
|Charles|Robles!!!        |
|Jack@Hill                |
|Christopher@Castaneda    |
|Ronald@Carson            |
|Lisa, Mendez             |
|Nathan,Thomas,eeeee      |
|Justin, Gay              |
|Kenneth, , Robinson NOOOO|
+-------------------------+



In [33]:
# Normalize delimiters and remove unwanted characters
df_normalized = df_dt_converted.withColumn(
    "person_name",
    F.regexp_replace("person_name", r"[@/|!]+", ",")  # Replace '@', '/', '|', and '!' with ','
)

# Split the column into an array
df_split = df_normalized.withColumn(
    "name_parts",
    F.split("person_name", ",")
)

# Extract first name, last name, and other parts
df_final = df_split.withColumn("first_name", F.trim(df_split["name_parts"][0])) \
                   .withColumn("last_name", F.trim(df_split["name_parts"][1])) \
                   .withColumn("other_parts", F.trim(df_split["name_parts"][2]))

In [34]:
df_final.select(["first_name", "last_name", "other_parts"]).limit(30).show(truncate=False)

+-----------+---------+--------------+
|first_name |last_name|other_parts   |
+-----------+---------+--------------+
|Jennifer   |Banks    |eeeee         |
|Stephanie  |Gill     |eeeee         |
|Edward     |Sanchez  |NULL          |
|Jeremy     |White    |              |
|Tyler      |Garcia   |NULL          |
|Jennifer   |Conner   |eeeee         |
|Kelsey     |         |Richards NOOOO|
|Steven     |Williams |NULL          |
|Heather    |         |Chase NOOOO   |
|Melissa    |Aguilar  |NULL          |
|Eddie      |Mendez   |              |
|Theresa    |Blackwell|NULL          |
|Charles    |Robles   |              |
|Jack       |Hill     |NULL          |
|Christopher|Castaneda|NULL          |
|Ronald     |Carson   |NULL          |
|Lisa       |Mendez   |NULL          |
|Nathan     |Thomas   |eeeee         |
|Justin     |Gay      |NULL          |
|Kenneth    |         |Robinson NOOOO|
+-----------+---------+--------------+
only showing top 20 rows



In [35]:
# some last name is in the other parts. extract it and put in last name column

In [44]:
df_final = df_final.withColumn(
    "extracted_last_name",
    F.when(
        F.col("last_name").isNull() | (F.trim(F.col("last_name")) == ""),
        F.split(F.col("other_parts"), " ")[0]
    ).otherwise(F.col("last_name"))
)

df_final = df_final.drop("last_name").withColumnRenamed("extracted_last_name", "last_name")

In [45]:
df_final.select(["first_name", "last_name", "other_parts"]).limit(30).show(truncate=False)

+-----------+---------+--------------+
|first_name |last_name|other_parts   |
+-----------+---------+--------------+
|Jennifer   |Banks    |eeeee         |
|Stephanie  |Gill     |eeeee         |
|Edward     |Sanchez  |NULL          |
|Jeremy     |White    |              |
|Tyler      |Garcia   |NULL          |
|Jennifer   |Conner   |eeeee         |
|Kelsey     |Richards |Richards NOOOO|
|Steven     |Williams |NULL          |
|Heather    |Chase    |Chase NOOOO   |
|Melissa    |Aguilar  |NULL          |
|Eddie      |Mendez   |              |
|Theresa    |Blackwell|NULL          |
|Charles    |Robles   |              |
|Jack       |Hill     |NULL          |
|Christopher|Castaneda|NULL          |
|Ronald     |Carson   |NULL          |
|Lisa       |Mendez   |NULL          |
|Nathan     |Thomas   |eeeee         |
|Justin     |Gay      |NULL          |
|Kenneth    |Robinson |Robinson NOOOO|
+-----------+---------+--------------+
only showing top 20 rows



In [None]:
df

In [16]:
# spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [46]:
# df_pd = df_dt_converted.toPandas()