<a href="https://colab.research.google.com/github/abelsare348/codes/blob/pyspark/Office/2024_09_23.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=353d7c61735ab4ebc8de75027783b55f24a97cb8a71d202fd45160274ff67732
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [3]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [5]:
spark=SparkSession.builder.appName('spark_json').getOrCreate()

In [6]:
anpr_df=spark.read.format("json") \
          .option("multiLine", True) \
          .option("header",True) \
          .option("inferschema",True) \
          .load("/content/20240825T0000_Axis_ANPR.json")

In [19]:
from pyspark.sql.functions import explode, col

# Explode the array column `Data Axis_ANPR`
df_exploded = anpr_df.withColumn("exploded_ANPR", explode(col("Data Axis_ANPR")))

# Access the nested fields in the exploded struct
anpr_df_flattened = df_exploded.select(
    col("exploded_ANPR.events.currentTimestampUs").alias("currentTimestampUs"),
    col("exploded_ANPR.events.vcount.direction.in").alias("direction_in"),
    col("exploded_ANPR.events.vcount.direction.out").alias("direction_out"),
    col("exploded_ANPR.events.vcount.direction.unk").alias("direction_unk"),
    col("exploded_ANPR.events.vcount.list.blocklist").alias("blocklist"),
    col("exploded_ANPR.events.vcount.list.allowlist").alias("allowlist"),
    col("exploded_ANPR.events.vcount.list.customlist").alias("customlist"),
    col("exploded_ANPR.events.vcount.list.nonelist").alias("nonelist"),
    col("exploded_ANPR.events.vcount.roi.roi1").alias("roi1"),
    col("exploded_ANPR.events.vcount.roi.roi2").alias("roi2"),
    col("exploded_ANPR.events.vcount.total").alias("total")
)

# Show the flattened DataFrame
anpr_df_flattened.show(truncate=False)

+------------------+------------+-------------+-------------+---------+---------+----------+--------+----+----+-----+
|currentTimestampUs|direction_in|direction_out|direction_unk|blocklist|allowlist|customlist|nonelist|roi1|roi2|total|
+------------------+------------+-------------+-------------+---------+---------+----------+--------+----+----+-----+
|1724536805978920  |0           |0            |0            |0        |0        |0         |0       |0   |0   |0    |
|1724536816287082  |0           |0            |0            |0        |0        |0         |0       |0   |0   |0    |
|1724536826685860  |0           |0            |0            |0        |0        |0         |0       |0   |0   |0    |
|1724536837306731  |0           |0            |0            |0        |0        |0         |0       |0   |0   |0    |
|1724536847957683  |0           |0            |0            |0        |0        |0         |0       |0   |0   |0    |
|1724536858725779  |0           |0            |0        

In [30]:
from pyspark.sql.functions import col, from_unixtime, to_utc_timestamp, from_utc_timestamp

In [33]:
anpr_df_final= anpr_df_flattened.withColumn("currentTimestampUs",
                                            from_unixtime(col("currentTimestampUs") / 1000000))

# Remove timezone from cam1_df_a
anpr_df_final = anpr_df_final.withColumn(
    "currentTimestampUs",
    from_utc_timestamp(col("currentTimestampUs"), "UTC+02:00")
)

In [34]:
anpr_df_final.show()

+-------------------+------------+-------------+-------------+---------+---------+----------+--------+----+----+-----+
| currentTimestampUs|direction_in|direction_out|direction_unk|blocklist|allowlist|customlist|nonelist|roi1|roi2|total|
+-------------------+------------+-------------+-------------+---------+---------+----------+--------+----+----+-----+
|2024-08-25 00:00:05|           0|            0|            0|        0|        0|         0|       0|   0|   0|    0|
|2024-08-25 00:00:16|           0|            0|            0|        0|        0|         0|       0|   0|   0|    0|
|2024-08-25 00:00:26|           0|            0|            0|        0|        0|         0|       0|   0|   0|    0|
|2024-08-25 00:00:37|           0|            0|            0|        0|        0|         0|       0|   0|   0|    0|
|2024-08-25 00:00:47|           0|            0|            0|        0|        0|         0|       0|   0|   0|    0|
|2024-08-25 00:00:58|           0|            0|

In [24]:
DBA_df=spark.read.format("json") \
          .option("multiLine", True) \
          .option("header",True) \
          .option("inferschema",True) \
          .load("/content/20240825T0000_DBA data.json")

In [25]:
from pyspark.sql.functions import explode, col

# Explode the outermost array `Data DBA data`
df_exploded_1 = DBA_df.withColumn("exploded_DBA_data", explode(col("Data DBA data")))

# Explode the inner array inside `exploded_DBA_data`
df_exploded_2 = df_exploded_1.withColumn("exploded_element", explode(col("exploded_DBA_data")))

# Select the fields from the exploded structure
df_flattened = df_exploded_2.select(
    col("exploded_element.id").alias("id"),
    col("exploded_element.resultType").alias("resultType"),
    col("exploded_element.timestamp").alias("timestamp"),
    col("exploded_element.locations").alias("locations"),
    col("exploded_element.values").alias("values")
)

# To further explode the `locations` array
df_locations_exploded = df_flattened.withColumn("exploded_locations", explode(col("locations")))

# Select the individual x, y, z values from the exploded locations
DBA_df_final = df_locations_exploded.select(
    col("id"),
    col("resultType"),
    col("timestamp"),
    col("exploded_locations.x").alias("location_x"),
    col("exploded_locations.y").alias("location_y"),
    col("exploded_locations.z").alias("location_z"),
    col("values").alias("values")
)

# Show the final flattened DataFrame
DBA_df_final.show(truncate=False)

+---+----------+-----------------------------------+-----------------+-----------------+-----------------+--------------------+
|id |resultType|timestamp                          |location_x       |location_y       |location_z       |values              |
+---+----------+-----------------------------------+-----------------+-----------------+-----------------+--------------------+
|1  |Event     |2024-08-25T00:00:43.536273587+02:00|4320424.244834278|565871.5884135148|4642458.985721508|[46.830772399902344]|
|1  |Event     |2024-08-25T00:00:43.536273535+02:00|4320424.244834278|565871.5884135148|4642458.985721508|[46.830772399902344]|
|1  |Event     |2024-08-25T00:00:51.063551509+02:00|4320424.244834278|565871.5884135148|4642458.985721508|[48.511775970458984]|
|1  |Event     |2024-08-25T00:00:51.175898729+02:00|4320424.244834278|565871.5884135148|4642458.985721508|[48.94055938720703] |
|1  |Event     |2024-08-25T00:00:51.288246159+02:00|4320424.244834278|565871.5884135148|4642458.98572150

In [13]:
DBC_df=spark.read.format("json") \
          .option("multiLine", True) \
          .option("header",True) \
          .option("inferschema",True) \
          .load("/content/20240825T0000_DBC data.json")

In [14]:
from pyspark.sql.functions import explode, col

# First explode the outer array `Data DBC data`
df_exploded_1 = DBC_df.withColumn("exploded_DBC_data", explode(col("Data DBC data")))

# Second explode the inner array within `Data DBC data`
df_exploded_2 = df_exploded_1.withColumn("exploded_element", explode(col("exploded_DBC_data")))

# Select the fields from the exploded struct
df_flattened = df_exploded_2.select(
    col("exploded_element.id").alias("id"),
    col("exploded_element.resultType").alias("resultType"),
    col("exploded_element.timestamp").alias("timestamp"),
    col("exploded_element.locations").alias("locations"),
    col("exploded_element.values").alias("values")
)

# Explode the `locations` array to access `x`, `y`, and `z`
df_locations_exploded = df_flattened.withColumn("exploded_locations", explode(col("locations")))

# Select the individual x, y, z values from the exploded locations
DBC_df_final = df_locations_exploded.select(
    col("id"),
    col("resultType"),
    col("timestamp"),
    col("exploded_locations.x").alias("location_x"),
    col("exploded_locations.y").alias("location_y"),
    col("exploded_locations.z").alias("location_z"),
    col("values").alias("values")
)

# Show the final flattened DataFrame
DBC_df_final.show(truncate=False)

+---+----------+-----------------------------------+-----------------+-----------------+-----------------+--------------------+
|id |resultType|timestamp                          |location_x       |location_y       |location_z       |values              |
+---+----------+-----------------------------------+-----------------+-----------------+-----------------+--------------------+
|2  |Event     |2024-08-24T23:59:34.966892963+02:00|4320424.244834278|565871.5884135148|4642458.985721508|[51.28324508666992] |
|2  |Event     |2024-08-24T23:59:35.079240601+02:00|4320424.244834278|565871.5884135148|4642458.985721508|[51.395042419433594]|
|2  |Event     |2024-08-24T23:59:35.191587873+02:00|4320424.244834278|565871.5884135148|4642458.985721508|[50.98021697998047] |
|2  |Event     |2024-08-24T23:59:35.303935353+02:00|4320424.244834278|565871.5884135148|4642458.985721508|[51.33732223510742] |
|2  |Event     |2024-08-24T23:59:35.416282729+02:00|4320424.244834278|565871.5884135148|4642458.98572150

In [35]:
anpr_df_final = anpr_df_final.withColumn(
    "currentTimestampUs",
    to_utc_timestamp(col("currentTimestampUs"), "UTC")
)

# Remove timezone from cam2_df_d
DBA_df_final = DBA_df_final.withColumn(
    "timestamp",
    to_utc_timestamp(col("timestamp"), "UTC")
)

In [39]:
from pyspark.sql.functions import col, abs, unix_timestamp,expr

# Convert the timestamps to unix timestamps (seconds since epoch) for comparison
df_a = anpr_df_final
df_d = DBA_df_final

# Perform a cross join between the two DataFrames (this is needed since we're doing an asof join)
cross_joined_df = df_a.join(df_d, df_a["currentTimestampUs"] >= df_d["timestamp"], "left_outer")

# Calculate the absolute difference between the timestamps
cross_joined_df = cross_joined_df.withColumn("time_diff",
                                             abs(col("currentTimestampUs") - col("timestamp")))

# Filter out rows where the time difference is greater than 3 hours (10800 seconds)
filtered_df = cross_joined_df.filter(expr("time_diff <= interval 10800 seconds"))

# Use windowing to get the nearest match within the 3-hour tolerance
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Window partitioned by events.currentTimestampUs and ordered by the time_diff
window_spec = Window.partitionBy("currentTimestampUs").orderBy(col("time_diff"))

# Add a row number and filter to keep only the first row (the nearest match)
final_df = filtered_df.withColumn("rn", row_number().over(window_spec)).filter(col("rn") == 1)

# Drop the auxiliary columns if no longer needed
final_df = final_df.drop("time_diff", "rn")

In [40]:
final_df.show()

+-------------------+------------+-------------+-------------+---------+---------+----------+--------+----+----+-----+---+----------+--------------------+-----------------+-----------------+-----------------+-------------------+
| currentTimestampUs|direction_in|direction_out|direction_unk|blocklist|allowlist|customlist|nonelist|roi1|roi2|total| id|resultType|           timestamp|       location_x|       location_y|       location_z|             values|
+-------------------+------------+-------------+-------------+---------+---------+----------+--------+----+----+-----+---+----------+--------------------+-----------------+-----------------+-----------------+-------------------+
|2024-08-25 00:00:05|           0|            0|            0|        0|        0|         0|       0|   0|   0|    0|  1|     Event|2024-08-24 22:00:...|4320424.244834278|565871.5884135148|4642458.985721508|[48.01514434814453]|
|2024-08-25 00:00:16|           0|            0|            0|        0|        0|  