__Libraries__

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

In [2]:
from geopy.geocoders import Nominatim
import geopy
from geopy.extra.rate_limiter import RateLimiter
geopy.geocoders.options.default_user_agent = "BDBA"
from tqdm import tqdm

__Open Environment__

In [3]:
sc = SparkContext(appName="SparkforEachRDDapp")
spark = SparkSession(sc)

__Read Data__

In [4]:
vehic= spark.read.json('vehicles.json')
vehic.show()

+-------------+-------------+------------------+-------------------+----------+---------------+
|accelerometer| datetime_utc|          latitude|          longitude|vehicle_id|   vehicle_type|
+-------------+-------------+------------------+-------------------+----------+---------------+
|          166|1.588490942E9| 40.57007465863976|-3.6071938480844157|  9263 QVR|           taxi|
|          133|1.588489965E9| 40.44546287335486|-3.5125486436404882|  2895 YDK|            bus|
|          162|1.588490498E9| 40.30815660225567| -3.909501918887368|  6030 ZFZ|private_vehicle|
|            8|1.588490703E9| 40.47042101938655|-3.4346389045623913|  9731 WXJ|          truck|
|           56|1.588490768E9| 40.41651432933575| -3.916999450399277|  3719 XBS|            bus|
|           13|1.588490117E9| 40.42268107917671| -3.482537435543491|  8147 BPF|private_vehicle|
|          119|1.588490066E9| 40.55716134032274| -3.633252828413418|  5823 WMS|            bus|
|          164| 1.58848948E9| 40.4421168

In [5]:
roads= spark.read.json('roads.json')
roads.show()

+------------+---------+------------+--------------------+
|has_bus_lane|min_speed|number_lanes|           road_name|
+------------+---------+------------+--------------------+
|           0|       25|           1|                 A-3|
|           0|       50|           4|               AP-41|
|           0|       60|           1|Acceso Parque de ...|
|           1|       50|           3|Acceso a la Colon...|
|           1|       60|           1|    Acequia Quintano|
|           0|       25|           1|Aeropuerto T1 Sal...|
|           1|       60|           2|Aeropuerto T4 Lle...|
|           1|       25|           3|Aeropuerto T4 Lle...|
|           1|       25|           2|Anillo Verde Cicl...|
|           0|       25|           1|       Antigua M-111|
|           0|       50|           3|       Antigua M-506|
|           0|       60|           4|Autopista Radial R-2|
|           1|       25|           2|Autopista radial ...|
|           1|       25|           3|Autovía Villavici..

**Get Road Names**

In [6]:
def road_from_coord(lat, lon):
    coordinates= str(lat)+ ','+str(lon)
    #10 mins maximum timeout to prevent being blocked
    locator = Nominatim(timeout=10)
    rgeocode = RateLimiter(locator.reverse, min_delay_seconds=0.001)
    location = rgeocode(coordinates)
    return(location.raw['address']['road'])

In [7]:
road_names = []
for i in tqdm(range(vehic.count())):
    road_names.append(road_from_coord(vehic.collect()[i]['latitude'],vehic.collect()[i]['longitude']))

100%|██████████| 4646/4646 [38:42<00:00,  2.00it/s]


__Add them to the df__

In [8]:
row = Row("road_name")
rdd = spark.sparkContext.parallelize(road_names)
rf=rdd.map(row).toDF()

vehic=vehic.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
rf=rf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

vehic = vehic.join(rf, ["row_index"],how="inner").drop("row_index")

In [9]:
vehic.show()

+-------------+-------------+------------------+-------------------+----------+---------------+--------------------+
|accelerometer| datetime_utc|          latitude|          longitude|vehicle_id|   vehicle_type|           road_name|
+-------------+-------------+------------------+-------------------+----------+---------------+--------------------+
|          166|1.588490942E9| 40.57007465863976|-3.6071938480844157|  9263 QVR|           taxi|Avenida Puente Cu...|
|          133|1.588489965E9| 40.44546287335486|-3.5125486436404882|  2895 YDK|            bus|Plaza de los Plan...|
|          162|1.588490498E9| 40.30815660225567| -3.909501918887368|  6030 ZFZ|private_vehicle|Autovía del Suroeste|
|            8|1.588490703E9| 40.47042101938655|-3.4346389045623913|  9731 WXJ|          truck|Avenida de los Pr...|
|           56|1.588490768E9| 40.41651432933575| -3.916999450399277|  3719 XBS|            bus|Carretera de Boad...|
|           13|1.588490117E9| 40.42268107917671| -3.482537435543

**Join Both Feeds**

In [10]:
df=vehic.join(roads, ['road_name'], how="inner")

In [11]:
df.show()

+--------------------+-------------+-------------+------------------+-------------------+----------+---------------+------------+---------+------------+
|           road_name|accelerometer| datetime_utc|          latitude|          longitude|vehicle_id|   vehicle_type|has_bus_lane|min_speed|number_lanes|
+--------------------+-------------+-------------+------------------+-------------------+----------+---------------+------------+---------+------------+
|Avenida Puente Cu...|          166|1.588490942E9| 40.57007465863976|-3.6071938480844157|  9263 QVR|           taxi|           1|       50|           1|
|Plaza de los Plan...|          133|1.588489965E9| 40.44546287335486|-3.5125486436404882|  2895 YDK|            bus|           1|       50|           4|
|Autovía del Suroeste|          162|1.588490498E9| 40.30815660225567| -3.909501918887368|  6030 ZFZ|private_vehicle|           0|       60|           3|
|Avenida de los Pr...|            8|1.588490703E9| 40.47042101938655|-3.4346389045

**Define Traffic Location**

If accelerometer < min_speed and count(vehicle_id) > (50*number_lanes) we declare there is traffic

In [23]:
df.createOrReplaceTempView("df")
results = spark.sql("select road_name, count(vehicle_id) from df \
                    where accelerometer < min_speed \
                    group by road_name, number_lanes, has_bus_lane\
                    having count(vehicle_id) > (50*(number_lanes-has_bus_lane))")
results.show()

+--------------------+-----------------+
|           road_name|count(vehicle_id)|
+--------------------+-----------------+
|  Camino del Espinar|                3|
|Carretera Particu...|                2|
|Avenida de la Pes...|                1|
| Calle Enrique Casas|                1|
|Calle de Valdemor...|                1|
|                M-50|               28|
| Calle de la Romería|                1|
|   Avenida de Burgos|                3|
|               M-301|                6|
| Cañada de Matamuñoz|                4|
|Carretera camino ...|                3|
|   Avenida de Marsil|                1|
|Avenida de la Ber...|                2|
|                M-45|               10|
|      Calle Marañosa|                3|
|Calle de Extremadura|                1|
|      Calle Bulgaria|                1|
|   Avenida del Pardo|                3|
|   Avenida de Aragón|                1|
|Calle de Dionisio...|                1|
+--------------------+-----------------+
only showing top

__Download Results__

https://community.cloudera.com/t5/Support-Questions/How-to-save-all-the-output-of-pyspark-sql-query-into-a-text/td-p/204560

In [20]:
#results.coalesce(1).write.csv("results_6000.csv")
results.write.csv("td_results_6000.csv")

__Open Results__

In [27]:
import pandas as pd
myres = pd.read_csv("td_results_6000.csv", names=['road_name', 'number_vehicles'])

In [28]:
myres

Unnamed: 0,road_name,number_vehicles
0,Camino del Espinar,3
1,Carretera Particular de la Zarzuela,2
2,Avenida de la Pesadilla,1
3,Calle Enrique Casas,1
4,Calle de Valdemorillo,1
...,...,...
84,enlace con M-40,1
85,Calle XX,1
86,Camino del Esparragal,1
87,Calle de los Morales,1
