In [22]:
!pip install ipympl



In [1]:
from pyspark.sql import SparkSession

# Reuse the active Spark session when there is one; otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [2]:
import pyspark.sql.functions as F

# Columns of the stored records:
#   key           - partitioning key
#   value         - the data, in binary format; this is our JSON payload and
#                   needs to be cast to STRING
#   topic         - the topic we are subscribing to
#   partition     - presumably the topic partition the record was stored in
#   offset        - the offset value (per topic, partition, and consumer group)
#   timestamp     - the timestamp
#   timestampType - whether timestamp is created time or log append time
#                   (created time by default)

# Print the stored records, newest timestamp first.
(
    spark.read
    .parquet("output/UsefullAPIResults.parquet")
    .orderBy(F.desc("timestamp"))
    .show()
)

+----+--------------------+-----------------+---------+------+--------------------+-------------+
| key|               value|            topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------------+---------+------+--------------------+-------------+
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   299|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   298|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   297|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   296|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   295|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   294|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   293|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 6

In [3]:
# Register the stored parquet part files as a temporary view for Spark SQL.
parquet_parts = spark.read.parquet("output/UsefullAPIResults.parquet/*.parquet")
parquet_parts.createOrReplaceTempView("vw_UsefullAPIResults")

In [4]:
# Print the records from the view, highest offset first.
records_by_offset_query = "SELECT * FROM vw_UsefullAPIResults ORDER BY offset desc"
spark.sql(records_by_offset_query).show()

+----+--------------------+-----------------+---------+------+--------------------+-------------+
| key|               value|            topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------------+---------+------+--------------------+-------------+
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   299|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   298|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   297|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   296|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   295|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   294|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 63 6...|UsefullAPIResults|        2|   293|2022-08-05 09:01:...|            0|
|null|[7B 22 52 65 6

In [5]:
# Cast the binary key and value columns to text, then print up to 20 rows with
# each cell truncated to 50 characters.
decoded_records_query = "SELECT CAST(key AS STRING) key, CAST(value AS STRING) value, timestamp AS ts FROM vw_UsefullAPIResults"
spark.sql(decoded_records_query).show(n=20, truncate=50)

+----+--------------------------------------------------+-----------------------+
| key|                                             value|                     ts|
+----+--------------------------------------------------+-----------------------+
|null|{"RecordedAtTime": "2022-08-05T09:20:19.7140615...|2022-08-05 07:22:06.227|
|null|{"RecordedAtTime": "2022-08-05T09:11:58.706+02:...|2022-08-05 07:22:06.276|
|null|{"RecordedAtTime": "2022-08-05T09:20:13.079+02:...|2022-08-05 07:22:06.303|
|null|{"RecordedAtTime": "2022-08-05T09:21:53.072+02:...|2022-08-05 07:22:06.339|
|null|{"RecordedAtTime": "2022-08-05T08:59:05.8+02:00...|2022-08-05 07:22:06.394|
|null|{"RecordedAtTime": "2022-08-05T09:22:04+02:00",...|2022-08-05 07:22:06.423|
|null|{"RecordedAtTime": "2022-08-05T09:21:17+02:00",...|2022-08-05 07:22:07.004|
|null|{"RecordedAtTime": "2022-08-05T09:21:52.105+02:...|2022-08-05 07:22:07.173|
|null|{"RecordedAtTime": "2022-08-05T09:21:03.128+02:...|2022-08-05 07:22:07.249|
|null|{"Recorded

In [6]:
# DDL schema used to parse the JSON payload stored in the `value` column.
# The coordinates are declared as DOUBLE: a 32-bit FLOAT keeps only about seven
# significant digits, which rounds latitude/longitude values that carry six
# decimal places.
json_schema = """
STRUCT<
RecordedAtTime: STRING,
VehicleMode: STRING,
OperatorRef: STRING,
VehicleRef: STRING,
ArrivalStatus: STRING,
VehicleAtStop: STRING,
VehicleLatitude: DOUBLE,
VehicleLongitude: DOUBLE
>
"""

In [7]:
# Parse every payload with json_schema, spread the parsed fields out next to
# the record timestamp (ts), and keep only records whose offset is above 30.
parsed_records_query = f"""
SELECT ts, json.* FROM (
SELECT FROM_JSON(CAST(value AS STRING),'{json_schema}') AS json,
timestamp as ts
FROM vw_UsefullAPIResults
WHERE offset > 30
)
"""

# Collect the query result into a pandas DataFrame and display it.
df = spark.sql(parsed_records_query).toPandas()
df

Unnamed: 0,ts,RecordedAtTime,VehicleMode,OperatorRef,VehicleRef,ArrivalStatus,VehicleAtStop,VehicleLatitude,VehicleLongitude
0,2022-08-05 07:22:08.926,2022-08-05T09:22:03.707+02:00,BUS,160,2133,DELAYED,,63.284897,10.284815
1,2022-08-05 07:22:08.996,2022-08-05T09:19:42.033+02:00,,Unibuss,103081,"{""OnwardCall"":[{""StopPointRef"":{""value"":""NSR:Q...",true,59.895599,10.802183
2,2022-08-05 07:22:09.058,2022-08-05T09:18:46.47+02:00,BUS,80,811222,ON_TIME,,59.000759,9.748670
3,2022-08-05 07:22:09.090,2022-08-05T09:18:25.476+02:00,BUS,70,753259,ON_TIME,,0.000000,0.000000
4,2022-08-05 07:22:09.135,2022-08-05T09:21:10.145+02:00,,Unibuss,103082,"{""OnwardCall"":[{""StopPointRef"":{""value"":""NSR:Q...",false,59.909866,10.795000
...,...,...,...,...,...,...,...,...,...
264,2022-08-05 09:01:45.768,2022-08-05T11:00:49+02:00,BUS,SKY:Operator:45,3350453152,,false,60.355038,5.368106
265,2022-08-05 09:01:45.804,2022-08-05T07:56:54+02:00,BUS,SKY:Operator:45,3350453151,,false,60.355465,5.367959
266,2022-08-05 09:01:45.888,2022-08-05T07:07:04+02:00,BUS,OST:Operator:152,3620502516,,false,59.621181,11.204015
267,2022-08-05 09:01:45.927,2022-08-05T11:01:32+02:00,BUS,SKY:Operator:45,3350453154,,false,60.405590,5.328194


In [8]:
# ### Task 1: list the distinct vehicles whose arrival status is DELAYED
df_task1 = df[['ArrivalStatus','VehicleRef']]
is_delayed = df_task1['ArrivalStatus'] == 'DELAYED'
df_unique_delayes = df_task1.loc[is_delayed].drop_duplicates()
print("ALERT!!! THE FOLLOWING BUSSES ARE LATE! OMG!!!!", df_unique_delayes, sep="\n")

ALERT!!! THE FOLLOWING BUSSES ARE LATE! OMG!!!!
    ArrivalStatus      VehicleRef
0         DELAYED            2133
66        DELAYED            1696
69        DELAYED  551_x0020_3433
259       DELAYED            1698


In [9]:
# ### Task 3: count the moving vehicles per operator

In [10]:
# Keep one (OperatorRef, VehicleRef) row for every vehicle whose VehicleAtStop
# value is the string 'false'; the flag column is dropped once it has been used.
df_task3 = df[['VehicleAtStop','OperatorRef','VehicleRef']]
not_at_stop = df_task3['VehicleAtStop'] == 'false'
df_unique_moving_busses_per_operator = (
    df_task3.loc[not_at_stop]
    .drop_duplicates()
    .drop(columns='VehicleAtStop')
)


In [11]:
# Count the rows of the moving-vehicle table for each operator and print them.
vehicles_by_operator = df_unique_moving_busses_per_operator.groupby('OperatorRef')
df_count = vehicles_by_operator.count()
print("CURRENT MOVING VEIECHLES", df_count, sep="\n")

CURRENT MOVING VEIECHLES
                   VehicleRef
OperatorRef                  
OST:Operator:152            7
SKY:Operator:2438           5
SKY:Operator:31             2
SKY:Operator:44            13
SKY:Operator:45            23
Tide_sjø_AS                 3
Unibuss                    13


In [12]:
### Task 2: vehicle positions
# Rows with a latitude of exactly 0 or with any missing value are left out.
location_columns = ['VehicleRef','VehicleLatitude','VehicleLongitude','RecordedAtTime']
has_nonzero_latitude = df['VehicleLatitude'] != 0.000000
df_locations = df.loc[has_nonzero_latitude, location_columns].dropna()
df_locations

Unnamed: 0,VehicleRef,VehicleLatitude,VehicleLongitude,RecordedAtTime
0,2133,63.284897,10.284815,2022-08-05T09:22:03.707+02:00
1,103081,59.895599,10.802183,2022-08-05T09:19:42.033+02:00
2,811222,59.000759,9.748670,2022-08-05T09:18:46.47+02:00
4,103082,59.909866,10.795000,2022-08-05T09:21:10.145+02:00
6,103088,59.949318,10.729617,2022-08-05T09:21:10.146+02:00
...,...,...,...,...
263,3350453150,60.349895,5.279658,2022-08-05T11:01:39+02:00
264,3350453152,60.355038,5.368106,2022-08-05T11:00:49+02:00
265,3350453151,60.355465,5.367959,2022-08-05T07:56:54+02:00
266,3620502516,59.621181,11.204015,2022-08-05T07:07:04+02:00


In [13]:
!pip install geopandas
!pip install folium



In [14]:
import geopandas
import folium
import matplotlib.pyplot as plt
from folium import plugins

In [15]:
# Build one point per row (x = longitude, y = latitude) and attach the points
# to the location columns as the geometry of a GeoDataFrame.
attribute_columns = ['VehicleRef','VehicleLatitude','VehicleLongitude','RecordedAtTime']
geometry = geopandas.points_from_xy(
    x=df_locations.VehicleLongitude,
    y=df_locations.VehicleLatitude,
)
geo_df = geopandas.GeoDataFrame(df_locations[attribute_columns], geometry=geometry)
geo_df.head()


Unnamed: 0,VehicleRef,VehicleLatitude,VehicleLongitude,RecordedAtTime,geometry
0,2133,63.284897,10.284815,2022-08-05T09:22:03.707+02:00,POINT (10.28481 63.28490)
1,103081,59.895599,10.802183,2022-08-05T09:19:42.033+02:00,POINT (10.80218 59.89560)
2,811222,59.000759,9.74867,2022-08-05T09:18:46.47+02:00,POINT (9.74867 59.00076)
4,103082,59.909866,10.795,2022-08-05T09:21:10.145+02:00,POINT (10.79500 59.90987)
6,103088,59.949318,10.729617,2022-08-05T09:21:10.146+02:00,POINT (10.72962 59.94932)


In [16]:
# Turn every point into a [y, x] pair, i.e. [latitude, longitude].
geo_df_list = [
    [ys[0], xs[0]]
    for xs, ys in (point.xy for point in geo_df.geometry)
]

In [19]:
# Heat map of the collected [latitude, longitude] pairs on a dark base map,
# initially centred on [62, 10] at zoom level 5.
# The variable is named vehicle_heatmap rather than `map` so that the built-in
# map() function is not shadowed for the rest of the kernel session.
vehicle_heatmap = folium.Map(location=[62, 10], tiles='Cartodb dark_matter', zoom_start=5)
plugins.HeatMap(geo_df_list).add_to(vehicle_heatmap)
vehicle_heatmap

In [23]:
from IPython.display import Javascript

# Send a JavaScript snippet to the front end that calls
# IPython.notebook.execute_all_cells().
# NOTE(review): this cell is itself one of "all cells", so it presumably
# re-triggers itself on every run; confirm that is intended.
# NOTE(review): `IPython.notebook` looks like the classic Notebook JavaScript
# object; verify it exists in the front end this notebook is opened with.
run_all_cells_script = Javascript('IPython.notebook.execute_all_cells()')
display(run_all_cells_script)

<IPython.core.display.Javascript object>