# Library Imports

In [1]:
import sys

from pyspark.sql.functions import col, expr, lit, to_timestamp
from pyspark.sql.types import StringType, StructField

# The project helpers live in ../utils, so extend the path before importing them.
sys.path.append("../utils")
from logging_functions import inject_logging
from spark_functions import create_SparkSession
from string_functions import to_snake_case



# Obtain the Spark session from the project helper; every load cell below
# reads its CSV through `spark`.
spark = create_SparkSession()

# Load Data
## Camera Traffic Counts

In [None]:
# Folder that holds the CSV exports; the other load cells reuse this name.
base_path = "../data"

# Read the camera counts with the first line as header (no schema is given,
# so the columns are loaded as read by the CSV reader's defaults).
camera_traffic = spark.read.option("header", True).csv(
    f"{base_path}/Camera_Traffic_Counts_20250125.csv"
)
# Rename every header to snake_case in one pass instead of chaining one
# withColumnRenamed call (and one extra projection) per column.
camera_traffic = camera_traffic.toDF(
    *[to_snake_case(name) for name in camera_traffic.columns]
)

camera_traffic.limit(10).show()

+--------------------+-------------+--------------------+------------------+----------+----------+-------------+------+----------------------------+-------------+-----------------------+-----------------------+-----+---+----+----+------+-----------+--------------------+
|           record_id|atd_device_id|           read_date| intersection_name| direction|  movement|heavy_vehicle|volume|speed_average_miles_per_hour|speed_std_dev|seconds_in_zone_average|seconds_in_zone_std_dev|month|day|year|hour|minute|day_of_week|bin_duration_seconds|
+--------------------+-------------+--------------------+------------------+----------+----------+-------------+------+----------------------------+-------------+-----------------------+-----------------------+-----+---+----+----+------+-----------+--------------------+
|f6d2caebed1dc902b...|         7047|03/11/2020 02:30:...|MANOR RD / 51ST ST|NORTHBOUND| LEFT TURN|        false|    14|                         8.5|         2.21|                 12.407| 

In [40]:
# show() truncates long cells, so pull a few rows into pandas to read one
# complete read_date value.
camera_read_dates = camera_traffic.limit(10).select("read_date").toPandas()
camera_read_dates.iloc[0]

read_date    03/11/2020 02:30:00 PM
Name: 0, dtype: object

## Radar Traffic Counts

In [50]:
# Read the radar counts with the first line as header.
radar_traffic = spark.read.option("header", True).csv(
    f"{base_path}/Radar_Traffic_Counts_20250125.csv"
)
# Rename every header to snake_case in one pass instead of chaining one
# withColumnRenamed call (and one extra projection) per column.
radar_traffic = radar_traffic.toDF(
    *[to_snake_case(name) for name in radar_traffic.columns]
)

# read_date is left as a string here: at least one row
# ("01/01/156489 12:00:00 AM") cannot be parsed with this format, and the
# failure aborts the whole job. Re-enable once such rows are filtered out.
# radar_traffic = radar_traffic.withColumn(
#     "read_date", to_timestamp(col("read_date"), "MM/dd/yyyy hh:mm:ss a")
# )
radar_traffic.limit(10).show()

+--------------------+-----------+-------+--------------------+--------------------+--------+------+---------+-----+-----+---+----+----+------+-----------+--------+---------+
|              row_id|detector_id|kits_id|           read_date|   intersection_name|    lane|volume|occupancy|speed|month|day|year|hour|minute|day_of_week|time_bin|direction|
+--------------------+-----------+-------+--------------------+--------------------+--------+------+---------+-----+-----+---+----+----+------+-----------+--------+---------+
|9794d3655e0572eb8...|         74|     19|01/24/2018 04:15:...|         KINNEYLAMAR|   Lane1|     4|        0|   18|    1| 23|2018|  22|    15|          2|   22:15|     None|
|1f51f4e6a68297c9f...|         92|     24|12/17/2017 01:45:...|    LOOP 360LAKEWOOD|  NB_out|   103|        6|   40|   12| 16|2017|  19|    45|          6|   19:45|       NB|
|9de27c4292306d305...|         10|      3|01/01/156489 12:0...|     LAMARSHOALCREEK|  SB_out|    20|        1|   33|    8|  3

In [None]:
# Expected layout of read_date: "MM/dd/yyyy hh:mm:ss AM|PM" with a 4-digit year.
# The pattern is handed to Column.rlike directly, so it is a plain regex with
# single backslashes — no extra escaping for a SQL string literal is needed.
regex_pattern = r"^\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2} [AP]M$"

# Add a boolean column marking the rows whose read_date follows that layout.
# Re-running the cell simply overwrites the same column.
radar_traffic = radar_traffic.withColumn(
    "is_valid_date", col("read_date").rlike(regex_pattern)
)

# Display the rows that do NOT match the expected layout.
radar_traffic.where(~col("is_valid_date")).show()

# Uncomment to inspect the flag next to every read_date value:
# radar_traffic.select("read_date", "is_valid_date").show(truncate=False)

+--------------------+-----------+-------+--------------------+-----------------+------+------+---------+-----+-----+---+----+----+------+-----------+--------+---------+-------------+
|              row_id|detector_id|kits_id|           read_date|intersection_name|  lane|volume|occupancy|speed|month|day|year|hour|minute|day_of_week|time_bin|direction|is_valid_date|
+--------------------+-----------+-------+--------------------+-----------------+------+------+---------+-----+-----+---+----+----+------+-----------+--------+---------+-------------+
|9de27c4292306d305...|         10|      3|01/01/156489 12:0...|  LAMARSHOALCREEK|SB_out|    20|        1|   33|    8|  3|2019|  23|    15|          6|   23:15|       SB|        false|
+--------------------+-----------+-------+--------------------+-----------------+------+------+---------+-----+-----+---+----+----+------+-----------+--------+---------+-------------+



In [88]:
# Collect the well-formed rows of detector 10 into pandas. Every column is
# still a string at this point, hence the string comparisons, and the sort on
# read_date is a lexicographic sort of the raw text, not a chronological one.
temp = (
    radar_traffic.where(col("is_valid_date") & (col("detector_id") == lit("10")))
    .sort("read_date")
    .toPandas()
)
# Narrow down to August 2019 using the separate month/year columns.
# The previous version ended in an unfinished `& (temp.hiour)` clause, which
# raised AttributeError and was nested inside the year comparison.
# NOTE(review): the rows shown for this filter carry read_date values in
# January 1970 although month/year say 8/2019 — worth checking how read_date
# was derived for this detector.
temp[(temp.month == "8") & (temp.year == "2019")]

Unnamed: 0,row_id,detector_id,kits_id,read_date,intersection_name,lane,volume,occupancy,speed,month,day,year,hour,minute,day_of_week,time_bin,direction,is_valid_date
9042,96a9ecd4f3762ef2738d3893f13a29a5,10,3,01/19/1970 02:38:11 AM,LAMARSHOALCREEK,SB_out,150,8,33,8,1,2019,15,31,4,15:30,SB,True
9043,e2da7f44124e7fa6d5d026ba53652a93,10,3,01/19/1970 02:38:12 AM,LAMARSHOALCREEK,SB_out,150,8,33,8,1,2019,15,45,4,15:45,SB,True
9044,4798e6b1afa66bc31b67145451879bd4,10,3,01/19/1970 02:38:13 AM,LAMARSHOALCREEK,SB_out,143,7,34,8,1,2019,16,0,4,16:00,SB,True
9045,abdca3d62f9eb286d4d256f9860fb038,10,3,01/19/1970 02:38:14 AM,LAMARSHOALCREEK,SB_out,177,9,34,8,1,2019,16,15,4,16:15,SB,True
9046,1b7d95a42ad061d3a0a9a1f57f152d66,10,3,01/19/1970 02:38:15 AM,LAMARSHOALCREEK,SB_out,139,7,0,8,1,2019,16,45,4,16:45,SB,True
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
11632,cc5947b5488e6a0f121710e8df42bb2f,10,3,01/19/1970 03:21:49 AM,LAMARSHOALCREEK,SB_out,43,2,32,8,31,2019,22,45,6,22:45,SB,True
11633,a0bc4043db39395fa08777909f838846,10,3,01/19/1970 03:21:50 AM,LAMARSHOALCREEK,SB_out,41,2,32,8,31,2019,23,0,6,23:00,SB,True
11634,357797d70d3c2c6af27fccc910152f51,10,3,01/19/1970 03:21:51 AM,LAMARSHOALCREEK,SB_out,38,2,31,8,31,2019,23,15,6,23:15,SB,True
11635,2cd984316fe89402cd45543733b5bc37,10,3,01/19/1970 03:21:52 AM,LAMARSHOALCREEK,SB_out,39,2,33,8,31,2019,23,30,6,23:30,SB,True


In [76]:
import re

# Sanity-check the date pattern with Python's own regex engine against a few
# known-good sample values; each line printed is "<value>: <matched?>".
regex_pattern = r"^\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2} [AP]M$"
test_data = [
    "01/24/2018 04:15:01 AM",
    "12/17/2017 01:45:00 AM",
    "04/03/2020 11:15:00 PM",
]

date_matcher = re.compile(regex_pattern)
for date in test_data:
    is_match = date_matcher.match(date) is not None
    print(f"{date}: {is_match}")

01/24/2018 04:15:01 AM: True
12/17/2017 01:45:00 AM: True
04/03/2020 11:15:00 PM: True


In [71]:
# Pull a handful of read_date values into pandas so one can be read in full
# (show() truncates the column).
temp = radar_traffic.select("read_date").limit(10).toPandas()
temp.iloc[1]["read_date"]

'12/17/2017 01:45:00 AM'

In [49]:
# Keep only the rows whose read_date does NOT look like
# "MM/dd/yyyy hh:mm:ss AM|PM" and print them untruncated.
# NOTE(review): the traceback saved with this cell is raised by timestamp
# parsing (Iso8601TimestampFormatter), not by rlike — presumably it comes from
# a run in which read_date was still converted with to_timestamp. Re-run on a
# fresh kernel to refresh the output.
date_format_ok = col("read_date").rlike(
    r"^\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2} [AP]M$"
)
invalid_rows = radar_traffic.filter(~date_format_ok)
invalid_rows.show(truncate=False)

Py4JJavaError: An error occurred while calling o771.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 2420) (Bravo-PC.mshome.net executor driver): org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to parse '01/01/156489 12:00:00 AM' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError(ExecutionErrors.scala:54)
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError$(ExecutionErrors.scala:48)
	at org.apache.spark.sql.errors.ExecutionErrors$.failToParseDateTimeInNewParserError(ExecutionErrors.scala:218)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:142)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:135)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:195)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: java.time.format.DateTimeParseException: Text '01/01/156489 12:00:00 AM' could not be parsed at index 6
	at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2106)
	at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1934)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:193)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4334)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4324)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4322)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4322)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor75.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to parse '01/01/156489 12:00:00 AM' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError(ExecutionErrors.scala:54)
	at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError$(ExecutionErrors.scala:48)
	at org.apache.spark.sql.errors.ExecutionErrors$.failToParseDateTimeInNewParserError(ExecutionErrors.scala:218)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:142)
	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:135)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:195)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more
Caused by: java.time.format.DateTimeParseException: Text '01/01/156489 12:00:00 AM' could not be parsed at index 6
	at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2106)
	at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1934)
	at org.apache.spark.sql.catalyst.util.Iso8601TimestampFormatter.parse(TimestampFormatter.scala:193)
	... 20 more


## Traffic Detectors

In [9]:
# Read the detector inventory with the first line as header.
traffic_detectors = spark.read.option("header", True).csv(
    f"{base_path}/Traffic_Detectors_20250125.csv"
)
# Rename every header to snake_case in one pass instead of chaining one
# withColumnRenamed call (and one extra projection) per column.
traffic_detectors = traffic_detectors.toDF(
    *[to_snake_case(name) for name in traffic_detectors.columns]
)
traffic_detectors.limit(10).show()

+-----------+-------------+---------------+------------------+-----------------+--------------------+---------------+---------+--------------------+--------------------+--------------+------------------------+-----------------+------------------+--------------------+
|detector_id|detector_type|detector_status|detector_direction|detector_movement|       location_name|atd_location_id|signal_id|        created_date|       modified_date|ip_comm_status|comm_status_datetime_utc|location_latitude|location_longitude|            location|
+-----------+-------------+---------------+------------------+-----------------+--------------------+---------------+---------+--------------------+--------------------+--------------+------------------------+-----------------+------------------+--------------------+
|        257|        VIDEO|             OK|        NORTHBOUND|        THRU ONLY| 12700 BLK DESSAU RD|   LOC16-004035|      820|                NULL|10/07/2024 09:31:...|          NULL|            

## Individual Address Files

In [None]:
# Read the Bluetooth Individual Address Files with the first line as header.
IAFs = spark.read.option("header", True).csv(
    f"{base_path}/Bluetooth_Travel_Sensors_-_Individual_Address_Files__IAFs_.csv"
)
# Rename every header to snake_case in one pass instead of chaining one
# withColumnRenamed call (and one extra projection) per column.
IAFs = IAFs.toDF(*[to_snake_case(name) for name in IAFs.columns])

# Convert the two read-time columns from "MM/dd/yyyy hh:mm:ss AM|PM" strings
# to timestamps. (A stray pasted date literal inside the first withColumn call
# previously made this cell a SyntaxError.)
timestamp_format = "MM/dd/yyyy hh:mm:ss a"
IAFs = IAFs.withColumn(
    "host_read_time", to_timestamp(col("host_read_time"), timestamp_format)
).withColumn(
    "field_device_read_time",
    to_timestamp(col("field_device_read_time"), timestamp_format),
)

IAFs.limit(10).show()

+--------------------+-------------------+----------------------+-----------------+--------------+
|           record_id|     host_read_time|field_device_read_time|reader_identifier|device_address|
+--------------------+-------------------+----------------------+-----------------+--------------+
|e60fa504f56592882...|2019-08-28 08:08:15|   2019-08-28 08:08:24|     lamar_morrow|          5444|
|6ccf9bc09f94adcf6...|2019-08-28 08:08:25|   2019-08-28 08:08:35|     lamar_morrow|         37197|
|8c92a42d206a0c867...|2019-08-28 08:08:36|   2019-08-28 08:08:46|     lamar_morrow|          7947|
|b141f5991a32e5874...|2019-08-28 08:09:08|   2019-08-28 08:09:18|     lamar_morrow|          1701|
|9a945c4a0553c1e4e...|2019-08-28 08:09:12|   2019-08-28 08:09:21|     lamar_morrow|           278|
|0c009feda4adb5440...|2019-08-28 08:09:19|   2019-08-28 08:09:29|     lamar_morrow|         37198|
|26ab5aaf898ed8da9...|2019-08-28 08:09:24|   2019-08-28 08:09:34|     lamar_morrow|         24651|
|d8c8c7546

In [24]:
# Scratch check: build a non-nullable string field named record_id and echo
# its repr.
StructField("record_id", StringType(), False)

StructField('record_id', StringType(), False)

In [17]:
# Total number of rows in the IAF table.
IAFs.count()

412529530

In [18]:
# Number of distinct record_id values; if it equals IAFs.count(), record_id
# is unique per row.
IAFs.select("record_id").distinct().count()

412529530

In [19]:
# Number of distinct device_address values.
IAFs.select("device_address").distinct().count()

11211525

In [20]:
# Number of distinct reader_identifier values.
IAFs.select("reader_identifier").distinct().count()

147

In [33]:
# Pull ten rows of the two read-time columns into pandas and display the
# first one in full.
# NOTE(review): the saved output shows raw "MM/dd/yyyy hh:mm:ss AM" strings,
# so it was presumably captured before the to_timestamp conversion in the
# load cell had been applied — re-run to refresh.
temp = IAFs.select("host_read_time", "field_device_read_time").limit(10).toPandas()
temp.iloc[0]

host_read_time            08/28/2019 08:08:15 AM
field_device_read_time    08/28/2019 08:08:24 AM
Name: 0, dtype: object

## Individual Traffic Match Files

In [15]:
# Read the Bluetooth Individual Traffic Match Files with the first line as header.
ITMF = spark.read.option("header", True).csv(
    f"{base_path}/Bluetooth_Travel_Sensors_-_Individual_Traffic_Match_Files__ITMF_.csv"
)
# Rename every header to snake_case in one pass instead of chaining one
# withColumnRenamed call (and one extra projection) per column.
ITMF = ITMF.toDF(*[to_snake_case(name) for name in ITMF.columns])
ITMF.limit(10).show()

+--------------------+--------------+------------------------+-----------------------------+-------------------+--------------------+--------------+-----------------+-------------------+-------------------+-----------+
|           record_id|device_address|origin_reader_identifier|destination_reader_identifier|travel_time_seconds|speed_miles_per_hour|match_validity|filter_identifier|         start_time|           end_time|day_of_week|
+--------------------+--------------+------------------------+-----------------------------+-------------------+--------------------+--------------+-----------------+-------------------+-------------------+-----------+
|cdac7d191483cacf2...|           706|              lamar_45th|                   lamar_38th|                 50|                  47|         valid|              125|2019-08-17T23:28:56|2019-08-17T23:29:46|   Saturday|
|6e2a448e185b9742c...|           707|              lamar_45th|                   lamar_38th|                 58|            

## Traffic Match Summary Records

In [13]:
# Read the Bluetooth Traffic Match Summary Records with the first line as header.
TMSR = spark.read.option("header", True).csv(
    f"{base_path}/Bluetooth_Travel_Sensors_-Traffic_Match_Summary_Records__TMSR__20250125.csv"
)
# Rename every header to snake_case in one pass instead of chaining one
# withColumnRenamed call (and one extra projection) per column.
TMSR = TMSR.toDF(*[to_snake_case(name) for name in TMSR.columns])
TMSR.limit(10).show()

+--------------------+------------------------+-----------------------------+--------------+-------------------+----------------+-------------------+------------------------+---------------------+--------------------+--------------------+---------------------------+-----------------+------------------------+--------------+------------------+
|           record_id|origin_reader_identifier|destination_reader_identifier|origin_roadway|origin_cross_street|origin_direction|destination_roadway|destination_cross_street|destination_direction|segment_length_miles|           timestamp|average_travel_time_seconds|average_speed_mph|summary_interval_minutes|number_samples|standard_deviation|
+--------------------+------------------------+-----------------------------+--------------+-------------------+----------------+-------------------+------------------------+---------------------+--------------------+--------------------+---------------------------+-----------------+------------------------+---

In [11]:
# Shut down the Spark session; cells that use `spark` cannot run after this
# until the session is created again.
spark.stop()