## Congestion Zone Latent Dates

### Notebook 2/7

### By Gabriel del Valle 
### NYC DATA SCIENCE ACADEMY
### 07/21/24

#### Fill missing rows for each unique combination of street per datetime, 2016 - 2019, OpenNYC Automated Traffic Counts dataset, Manhattan below 60th street.


This notebook was built on Microsoft Azure Synapse Analytics.

It uses PySpark, the Python API for Apache Spark, and runs on an Apache Spark
pool that provides the distributed computing.


### Do you need to run this notebook? 

While this process was useful for investigating data density, and is included to keep the public code complete, its output is only strictly necessary for imputing missing data, which forms the basis for only one of the three animated maps. If you would like access to the imputed data, feel free to reach out on LinkedIn:




## Configuration

I used the default recommended configuration for the Spark pool:
- 12 vCores
- 2 executors
- 3 to 10 nodes

For the notebook, also default configurations:
- Executor size: Small (4 vCores, 28GB memory)


## The primary approach to computing new rows for each unique combination of street and datetime interval down to 15 minutes, between 2016 and 2019:

#### 1. Create a dataset of total time intervals to 15 minute precision from 2016 - 2019

#### 2. Check whether each street is already represented at each time interval in initial_data; if not, merge the interval with the street and add it to a new dataset: missing_dates

#### 3. Anticipate the October Pattern. Although it is not described in any supplementary information for the OpenNYC Automated Traffic Counts dataset, October has a much higher density of valid rows per datetime than any other month, apparently because this is when New York City concentrated its data collection. The large ratio of missing data found after computing the missing dates is what prompted the search for this pattern.
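
The first two steps can be sketched on a toy scale in plain Python before running them on the Spark pool. This is only an illustration: the street names and the single recorded row are made up, and Python sets stand in for the Spark joins used later in the notebook.

```python
from itertools import product

# Step 1: every 15-minute interval of one (hypothetical) day.
intervals = [(2016, 10, 1, hour, minute)
             for hour, minute in product(range(24), (0, 15, 30, 45))]

# Made-up stand-in for initial_data: (street, year, month, day, hour, minute).
streets = ["BROADWAY", "CANAL STREET"]
recorded = {("BROADWAY", 2016, 10, 1, 8, 15)}

# Step 2: every street/interval combination that has no recorded row.
full_grid = {(street, *interval) for street in streets for interval in intervals}
missing = full_grid - recorded

print(len(intervals), len(full_grid), len(missing))
```

The set difference plays the role of an anti-join: it keeps only the grid entries that have no counterpart in the recorded data.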


In [1]:
%%pyspark

#file_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/czone_traffic.csv"

initial_data = spark.read.load('abfss://sparkczone@sparkczone.dfs.core.windows.net/czone_traffic.csv', format='csv', header=True)

StatementMeta(computeSmall, 10, 2, Finished, Available)

In [2]:
initial_data = initial_data.drop('NumericValue')
initial_data = initial_data[initial_data['Yr'] <= 2019]

initial_data = initial_data.withColumnRenamed("Yr", "year") \
                           .withColumnRenamed("M", "month") \
                           .withColumnRenamed("D", "day") \
                           .withColumnRenamed("HH", "hour") \
                           .withColumnRenamed("MM", "minute")

display(initial_data.limit(10))

StatementMeta(computeSmall, 10, 3, Finished, Available)


In [3]:
import calendar

StatementMeta(computeSmall, 10, 4, Finished, Available)

In [22]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, explode, lit
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, ArrayType

StatementMeta(computeSmall, 10, 23, Finished, Available)

In [5]:
years = [2016, 2017, 2018, 2019]
months = [i for i in range(1, 13)]
hours = [i for i in range(24)]  # 0-23, so the midnight hour is included
minutes = [0, 15, 30, 45]

years_df = spark.createDataFrame([(year,) for year in years], ["year"])
months_df = spark.createDataFrame([(month,) for month in months], ["month"])

year_month_df = years_df.crossJoin(months_df)

StatementMeta(computeSmall, 10, 6, Finished, Available)

In [6]:
def days_array(year, month):
    mlen = calendar.monthrange(year, month)[1]
    return [i for i in range(1, mlen+1)]

StatementMeta(computeSmall, 10, 7, Finished, Available)
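
A quick check of the helper in plain Python, outside Spark, suggests that it handles leap years, which matters because 2016 falls inside the date range. The function is restated here so the snippet runs on its own.

```python
import calendar

def days_array(year, month):
    # calendar.monthrange returns (weekday of the 1st, number of days in the month)
    mlen = calendar.monthrange(year, month)[1]
    return list(range(1, mlen + 1))

print(len(days_array(2016, 2)))  # February in a leap year
print(len(days_array(2017, 2)))  # February in a common year
```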

In [7]:
days_array_udf = udf(days_array, ArrayType(IntegerType()))


days_df = year_month_df.withColumn("days", days_array_udf("year", "month"))
#days_array_udf() yields a single array column per (year, month) row;
#explode() is used below to turn each array into one row per day

# Use explode to expand the days array into separate rows
days_df = days_df.withColumn("day", explode("days"))

# Drop the original 'days' array column if no longer needed
days_df = days_df.drop("days")

StatementMeta(computeSmall, 10, 8, Finished, Available)

In [8]:
days_df = days_df.orderBy("year", "month", "day")
#display(days_df)

StatementMeta(computeSmall, 10, 9, Finished, Available)

In [9]:
hours_df = spark.createDataFrame([(hour,) for hour in hours], ["hour"])

days_hours_df = days_df.crossJoin(hours_df).orderBy("year", "month", "day", "hour")

#display(days_hours_df)

StatementMeta(computeSmall, 10, 10, Finished, Available)

In [10]:
minutes_df = spark.createDataFrame([(minute,) for minute in minutes], ["minute"])

full_dates = days_hours_df.crossJoin(minutes_df).orderBy("year", "month", "day", "hour", "minute")

StatementMeta(computeSmall, 10, 11, Finished, Available)
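
As a sanity check, the expected number of rows in full_dates can be worked out independently with the standard library. This sketch assumes the grid should run from 2016-01-01 00:00 up to, but not including, 2020-01-01 00:00.

```python
from datetime import datetime, timedelta

start = datetime(2016, 1, 1)
end = datetime(2020, 1, 1)   # exclusive upper bound
step = timedelta(minutes=15)

# Dividing one timedelta by another with // gives a whole number of steps.
expected_intervals = (end - start) // step
print(expected_intervals)
```

The result can be compared with the "Total unique dates" count reported later in the notebook.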

### The dataframe full_dates now contains every time interval between 2016 and 2019, down to the 15 minute interval.

### We can compare it to the dates per street in our traffic data, initial_data, by creating an additional dataset, full_streets_df, which contains all combinations of date and street.

### We will then compile a dataset of the missing dates per street, called missing_dates.

In [11]:
streets_df = initial_data.select("street").distinct()
full_streets_df = full_dates.crossJoin(streets_df)

StatementMeta(computeSmall, 10, 12, Finished, Available)

In [12]:
missing_dates = full_streets_df.join(
    initial_data,
    on=["street", "year", "month", "day", "hour", "minute"],
    how="left_anti"
)

StatementMeta(computeSmall, 10, 13, Finished, Available)

Total unique dates:  140256

Total unique streets:  224

Intervals per street:  31417344

Missing data len:  30815985

Initial data len:  1565977

In [74]:
full_dates_count = full_dates.count()
print("Total unique dates: ", full_dates_count)

streets_df_count = streets_df.count()
print("Total unique streets: ", streets_df_count)

full_streets_df_count = full_streets_df.count()
print("Intervals per street: ", full_streets_df_count)

missing_dates_count = missing_dates.count()
print("Missing data len: ", missing_dates_count)

initial_data_count = initial_data.count()
print("Initial data len: ", initial_data_count)

StatementMeta(computeSmall, 9, 62, Finished, Available)

Total unique dates:  140256
Total unique streets:  224
Intervals per street:  31417344
Missing data len:  30815985
Initial data len:  1565977


#### Summary

Missing data len + initial data len = 32,381,962

This is greater than Intervals per street (31,417,344), which indicates that initial_data often contains multiple rows per street per datetime.

Diff = 964,618
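
The same arithmetic can be laid out explicitly from the counts printed above. The variable names are only illustrative.

```python
grid_rows = 31_417_344      # "Intervals per street": dates x streets
missing_rows = 30_815_985   # "Missing data len"
initial_rows = 1_565_977    # "Initial data len"

# Grid cells (street + datetime) that have at least one recorded row.
covered = grid_rows - missing_rows
# Recorded rows beyond one per covered cell: duplicates per street and
# datetime, or rows that fall outside the grid.
surplus = initial_rows - covered
missing_share = missing_rows / grid_rows

print(covered, surplus, round(missing_share * 100, 1))
```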


##### Next task is to expand missing_dates with columns so that it can be merged with initial_data. Also, convert both to datetime format

In [13]:
initial_data.columns

StatementMeta(computeSmall, 10, 14, Finished, Available)

['RequestID',
 'Boro',
 'year',
 'month',
 'day',
 'hour',
 'minute',
 'Vol',
 'SegmentID',
 'WktGeom',
 'street',
 'fromSt',
 'toSt',
 'Direction']

In [16]:
missing_datesCol = missing_dates.withColumn("RequestID", lit(None)) \
                             .withColumn("Boro", lit("Manhattan")) \
                             .withColumn("Vol", lit(None)) \
                             .withColumn("SegmentID", lit(None)) \
                             .withColumn("WktGeom", lit(None)) \
                             .withColumn("fromSt", lit(None)) \
                             .withColumn("toSt", lit(None)) \
                             .withColumn("Direction", lit(None))

missing_datesCol = missing_datesCol.select(
['RequestID',
 'Boro',
 'year',
 'month',
 'day',
 'hour',
 'minute',
 'Vol',
 'SegmentID',
 'WktGeom',
 'street',
 'fromSt',
 'toSt',
 'Direction'])

missing_datesCol.columns

StatementMeta(computeSmall, 10, 17, Finished, Available)

['RequestID',
 'Boro',
 'year',
 'month',
 'day',
 'hour',
 'minute',
 'Vol',
 'SegmentID',
 'WktGeom',
 'street',
 'fromSt',
 'toSt',
 'Direction']

In [19]:
czone_fulldata = initial_data.unionByName(missing_datesCol)

StatementMeta(computeSmall, 10, 20, Finished, Available)

Result of the following (time intensive) computation:

Combined data len:  32381962

This matches the sum of initial data len and missing data len, showing the union was successful.


In [21]:
combined_count = czone_fulldata.count()
print("Combined data len: ", combined_count)

StatementMeta(computeSmall, 10, 22, Finished, Cancelled)

#### Now czone_fulldata contains both the initial_data (which itself contained no NA rows) and a much larger set of NA rows marking every time interval per street for which no recording was made. The missing data is now explicit and ready for imputation.


## The next step is to examine the density of the dataset across different dates.

#### With the ratio of missing data to recorded data being so high, the validity of our data would be severely limited unless we could find a pattern that lets us zoom in on specific times where the data is much more concentrated.

#### I used rows per datetime across various time frames to look for such patterns, and discovered that October accounts for by far the largest share of recordings across all years (roughly 46% of all rows). The other 11 months are correspondingly sparse.

#### This is a very convenient discovery for many reasons, especially given that a Synapse notebook was needed to compute the missing data:

#### Focusing on October lets us shrink the dataset to a size that is workable on a MacBook.

In [26]:
monthly_rows = initial_data.groupBy("month").agg(F.count("*").alias("count"))

monthly_rows = monthly_rows.orderBy("count", ascending = False)

monthly_rows.show()

StatementMeta(computeSmall, 10, 27, Finished, Available)

+-----+------+
|month| count|
+-----+------+
|   10|714603|
|   11|220248|
|    6|178142|
|    1|136288|
|    2| 96666|
|    3| 71141|
|    5| 43887|
|    4| 38303|
|   12| 30243|
|    9| 23889|
|    7| 10263|
|    8|  2304|
+-----+------+
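
To put a number on the pattern, October's share of all recorded rows can be computed from the table above. The counts are copied from that output; the dictionary name is only illustrative.

```python
# Recorded rows per month, copied from the monthly_rows output.
recorded = {10: 714603, 11: 220248, 6: 178142, 1: 136288, 2: 96666, 3: 71141,
            5: 43887, 4: 38303, 12: 30243, 9: 23889, 7: 10263, 8: 2304}

total = sum(recorded.values())
october_share = recorded[10] / total

print(total, round(october_share * 100, 1))
```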



The count from above can now be put in context with comparison to the missing data per month.

In [28]:
monthly_mrows = missing_dates.groupBy('month').agg(F.count("*").alias("Mcount"))

monthly_rows2 = monthly_rows.join(
    monthly_mrows, 
    monthly_rows.month == monthly_mrows.month, 
    "inner"  
).orderBy("Mcount", ascending = True)


monthly_rows2.show()

StatementMeta(computeSmall, 10, 29, Finished, Available)

+-----+------+-----+-------+
|month| count|month| Mcount|
+-----+------+-----+-------+
|    7| 10263|    7|2661780|
|    6|178142|    6|2501692|
|    9| 23889|    9|2563015|
|    5| 43887|    5|2644125|
|    1|136288|    1|2633786|
|   10|714603|   10|2417062|
|    3| 71141|    3|2644629|
|   12| 30243|   12|2651257|
|    8|  2304|    8|2664840|
|   11|220248|   11|2480853|
|    2| 96666|    2|2394551|
|    4| 38303|    4|2558395|
+-----+------+-----+-------+



Knowing of the October Pattern, we can prepare the dataset for export by:

- Creating an October and a NonOctober version of the data
- Sorting the data

Further alterations and explorations may be more easily done in plain Python once the datasets are a workable size, including:

- Adding a datetime column
- Displaying the density of data vs missing data

These operations will follow in the next notebook, which returns to Python.
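
As a preview of the first follow-up step, a datetime value can be assembled from the five component columns with the standard library. This is a sketch only: the sample row is made up, and the values are cast with int() on the assumption that they arrive as strings after a CSV round trip.

```python
from datetime import datetime

# Hypothetical row, shaped like one record of the exported CSV.
row = {"year": "2018", "month": "10", "day": "3", "hour": "7", "minute": "45"}

parts = [int(row[key]) for key in ("year", "month", "day", "hour", "minute")]
timestamp = datetime(*parts)

print(timestamp.isoformat())
```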

In [31]:
czone_October = czone_fulldata.filter(czone_fulldata['month'] == 10)
czone_notOctober = czone_fulldata.filter(czone_fulldata['month'] != 10)

StatementMeta(computeSmall, 10, 32, Finished, Available)

In [32]:
sorted_October = czone_October.orderBy('year', 'month', 'day', 'hour', 'minute')
sorted_notOctober = czone_notOctober.orderBy('year', 'month', 'day', 'hour', 'minute')

StatementMeta(computeSmall, 10, 33, Finished, Available)
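
One thing worth checking before relying on this ordering is the column types. A CSV read without schema inference yields string columns, and strings sort lexicographically rather than numerically. The plain-Python sketch below shows the difference on a few day values; whether it applies here depends on the types czone_fulldata ends up with, which czone_fulldata.printSchema() would show.

```python
days_as_strings = ["1", "2", "10", "21", "3"]

# Character-by-character comparison puts "10" before "2".
lexicographic = sorted(days_as_strings)
# Converting to int for the comparison restores calendar order.
numeric = sorted(days_as_strings, key=int)

print(lexicographic)
print(numeric)
```

If the columns do turn out to be strings, casting them before sorting, for example with F.col("day").cast("int"), would be one way to get numeric order in Spark.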

In [34]:
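#collect each dataset into a single partition so that it is written as one file;
#repartition() shuffles rows, so the sort is reapplied within that partition

single_October = sorted_October.repartition(1).sortWithinPartitions('year', 'month', 'day', 'hour', 'minute')
single_notOctober = sorted_notOctober.repartition(1).sortWithinPartitions('year', 'month', 'day', 'hour', 'minute')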

StatementMeta(computeSmall, 10, 35, Finished, Available)

In [35]:
october_csv_path = "czone_October.csv"
not_october_csv_path = "czone_notOctober.csv"

single_October.write.format("csv").option("header", "true").mode("overwrite").save(october_csv_path)
single_notOctober.write.format("csv").option("header", "true").mode("overwrite").save(not_october_csv_path)

StatementMeta(computeSmall, 10, 36, Finished, Available)
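
Note that Spark's writer saves each result as a folder of part files rather than as a single CSV file, so czone_October.csv above names a directory. The sketch below shows one way to pick out the data file once such a folder has been copied to a local machine. The helper name find_part_file and the local copy are assumptions for illustration; the demonstration builds a throwaway folder with a made-up part file name so that it runs anywhere.

```python
import tempfile
from pathlib import Path

def find_part_file(output_dir):
    """Return the single part-*.csv file inside a Spark CSV output folder."""
    parts = sorted(Path(output_dir).glob("part-*.csv"))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    return parts[0]

# Demonstration on a temporary folder that mimics the layout.
with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp) / "czone_October.csv"
    out_dir.mkdir()
    (out_dir / "_SUCCESS").touch()
    (out_dir / "part-00000-example.csv").write_text("year,month\n2016,10\n")
    found_name = find_part_file(out_dir).name
    print(found_name)
```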