# Objective

Use Big Data tools to process and visualize a large volume of GPS data.

# GTFS Big Data Set

Kaggle hosts a dataset of SPTrans GTFS files from several days compiled into a single archive. It is available at the following link:

 https://www.kaggle.com/joaofb/so-paulo-bus-system

We will use this compilation to apply the Big Data concepts that follow.


# Installation


In [142]:
!pip3 install pyspark
!pip3 install dask
!pip3 install "dask[dataframe]"
!pip3 install pyproj
!pip3 install datashader
!pip3 install holoviews



# Downloading the GTFS Big Data Set

Reference: https://medium.com/analytics-vidhya/how-to-fetch-kaggle-datasets-into-google-colab-ea682569851a

In [None]:
# Mount Google Drive in Google Colab

from google.colab import drive
drive.flush_and_unmount()
drive.mount('/content/gdrive')

Drive not mounted, so nothing to flush and unmount.
Mounted at /content/gdrive


In [None]:
# Set the environment variable with the path to the Kaggle configuration directory
import os
os.environ['KAGGLE_CONFIG_DIR'] = "/content/gdrive/My Drive/big_data/"
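
The Kaggle command-line tool looks for an API token file named `kaggle.json` in the directory given by `KAGGLE_CONFIG_DIR`. A minimal sketch of a pre-flight check is shown below; the helper names `kaggle_token_path` and `has_kaggle_token` are hypothetical and not part of the Kaggle package, and the `~/.kaggle` fallback is assumed to be the default location.

```python
import os
from pathlib import Path


def kaggle_token_path(default_dir="~/.kaggle"):
    """Return the path where kaggle.json is expected (hypothetical helper)."""
    # Fall back to the assumed default directory when the variable is unset.
    config_dir = os.environ.get("KAGGLE_CONFIG_DIR", default_dir)
    return Path(config_dir).expanduser() / "kaggle.json"


def has_kaggle_token():
    """Return True if the token file exists at the expected location."""
    return kaggle_token_path().is_file()
```

Calling `has_kaggle_token()` before the download cell gives an early, readable failure if the token was not uploaded to the Drive folder.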


In [None]:
%cd /content/gdrive/MyDrive/big_data/

/content/gdrive/MyDrive/big_data


In [None]:
!kaggle datasets download -d joaofb/so-paulo-bus-system

Downloading so-paulo-bus-system.zip to /content/gdrive/My Drive/big_data
100% 697M/699M [00:06<00:00, 131MB/s]
100% 699M/699M [00:06<00:00, 107MB/s]


In [None]:
# Unzip the downloaded archive, then delete the zip file
!unzip \*.zip  && rm *.zip

Archive:  so-paulo-bus-system.zip
  inflating: bus_position.csv        
  inflating: frequencies.csv         
  inflating: overview.csv            
  inflating: passengers.csv          
  inflating: routes.csv              
  inflating: shapes.csv              
  inflating: stop_times.csv          
  inflating: stops.csv               
  inflating: trips.csv               
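
The same extract-then-delete step can also be done without shell commands. The following is a rough sketch using only the standard library `zipfile` module; the function name `extract_and_remove` is an assumption made for illustration, and unlike the shell line above it handles one archive per call.

```python
import zipfile
from pathlib import Path


def extract_and_remove(zip_path, target_dir="."):
    """Extract every member of zip_path into target_dir, then delete the archive."""
    zip_path = Path(zip_path)
    with zipfile.ZipFile(zip_path) as archive:
        names = archive.namelist()       # e.g. bus_position.csv, stops.csv, ...
        archive.extractall(target_dir)
    zip_path.unlink()                    # only reached if extraction succeeded
    return names
```

To process every archive in a folder, the function could be called once for each path returned by `Path(folder).glob("*.zip")`.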


# Transform with PySpark

Reference: http://www.filipyoo.com/plot-visualization-Hadoop-large-dataset-with-python-datashader/

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("GTFS with Dask").getOrCreate()

In [None]:
# The bus_position file holds about 2 GB of data compiled from 28/04/2020 to 06/05/2020.
dados = spark.read.csv('/content/gdrive/MyDrive/big_data/bus_position.csv', header=True, inferSchema=True)

In [None]:
# Display the schema
dados.printSchema()

root
 |-- index: integer (nullable = true)
 |-- hr: string (nullable = true)
 |-- c: string (nullable = true)
 |-- cl: integer (nullable = true)
 |-- lt0: string (nullable = true)
 |-- lt1: string (nullable = true)
 |-- qv: integer (nullable = true)
 |-- p: integer (nullable = true)
 |-- a: integer (nullable = true)
 |-- ta: timestamp (nullable = true)
 |-- py: double (nullable = true)
 |-- px: double (nullable = true)
 |-- id: integer (nullable = true)



In [None]:
dados.head(5)

[Row(index=0, hr='11:13', c='3063-10', cl=33021, lt0='TERM. SÃO MATEUS', lt1='GUAIANAZES', qv=2, p=31034, a=1, ta=datetime.datetime(2020, 4, 28, 14, 12, 17), py=-23.61408, px=-46.4765, id=1),
 Row(index=0, hr='11:13', c='3063-10', cl=33021, lt0='TERM. SÃO MATEUS', lt1='GUAIANAZES', qv=2, p=31487, a=1, ta=datetime.datetime(2020, 4, 28, 14, 11, 30), py=-23.552765, px=-46.405838, id=2),
 Row(index=1, hr='11:13', c='2712-10', cl=926, lt0='SHOP. METRÔ ITAQUERA', lt1='JD. SÃO NICOLAU', qv=3, p=45150, a=1, ta=datetime.datetime(2020, 4, 28, 14, 12, 41), py=-23.530886, px=-46.47826, id=3),
 Row(index=1, hr='11:13', c='2712-10', cl=926, lt0='SHOP. METRÔ ITAQUERA', lt1='JD. SÃO NICOLAU', qv=3, p=45261, a=1, ta=datetime.datetime(2020, 4, 28, 14, 13, 1), py=-23.527357, px=-46.479187, id=4),
 Row(index=1, hr='11:13', c='2712-10', cl=926, lt0='SHOP. METRÔ ITAQUERA', lt1='JD. SÃO NICOLAU', qv=3, p=45728, a=1, ta=datetime.datetime(2020, 4, 28, 14, 12, 26), py=-23.527357, px=-46.479187, id=5)]

In [None]:
dados.show()

+-----+-----+-------+-----+--------------------+--------------------+---+-----+---+-------------------+----------+----------+---+
|index|   hr|      c|   cl|                 lt0|                 lt1| qv|    p|  a|                 ta|        py|        px| id|
+-----+-----+-------+-----+--------------------+--------------------+---+-----+---+-------------------+----------+----------+---+
|    0|11:13|3063-10|33021|    TERM. SÃO MATEUS|          GUAIANAZES|  2|31034|  1|2020-04-28 14:12:17| -23.61408|  -46.4765|  1|
|    0|11:13|3063-10|33021|    TERM. SÃO MATEUS|          GUAIANAZES|  2|31487|  1|2020-04-28 14:11:30|-23.552765|-46.405838|  2|
|    1|11:13|2712-10|  926|SHOP. METRÔ ITAQUERA|     JD. SÃO NICOLAU|  3|45150|  1|2020-04-28 14:12:41|-23.530886| -46.47826|  3|
|    1|11:13|2712-10|  926|SHOP. METRÔ ITAQUERA|     JD. SÃO NICOLAU|  3|45261|  1|2020-04-28 14:13:01|-23.527357|-46.479187|  4|
|    1|11:13|2712-10|  926|SHOP. METRÔ ITAQUERA|     JD. SÃO NICOLAU|  3|45728|  1|2020-04

In [None]:
# Write the timestamp and coordinate columns of the DataFrame to Parquet
dados.select(['ta', 'py', 'px']).write.parquet("/content/gdrive/MyDrive/big_data/bus_position.parquet")

# Big Data DataFrame with Dask

Dask is the Big Data equivalent of pandas in Python.


References:
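
A Dask DataFrame is split into many smaller partitions, and a result is obtained by combining partial results from each one. The sketch below illustrates that idea in plain Python for a mean; it is a conceptual illustration only, not Dask's implementation, and the names `partial_sum_count` and `chunked_mean` are hypothetical.

```python
def partial_sum_count(chunk):
    """Reduce one partition to a (sum, count) pair."""
    return sum(chunk), len(chunk)


def chunked_mean(chunks):
    """Combine per-partition (sum, count) pairs into a global mean."""
    total, count = 0.0, 0
    for chunk in chunks:
        partial_total, partial_count = partial_sum_count(chunk)
        total += partial_total
        count += partial_count
    if count == 0:
        raise ValueError("no values to average")
    return total / count


# Latitude values from the sample rows, split into two illustrative partitions.
partitions = [[-23.61408, -23.552765], [-23.530886, -23.527357, -23.527357]]
mean_latitude = chunked_mean(partitions)
```

Because each partition is reduced independently, only one partition has to be in memory at a time, which is what makes this approach workable for files larger than RAM.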

https://examples.pyviz.org/nyc_taxi/nyc_taxi.html

https://cgcooke.github.io/Blog/datashader/visualisation/pubg/2020/05/31/Visualising-PUBG-Deaths-With-Datashader.html

In [None]:
# Load the Parquet data into a Dask DataFrame
import dask.dataframe as dd
import numpy as np

df = dd.read_parquet("/content/gdrive/MyDrive/big_data/bus_position.parquet")


In [None]:
df.head()

                    ta          py          px
0  2020-04-28 14:12:17  -23.61408   -46.4765
1  2020-04-28 14:11:30  -23.552765  -46.405838
2  2020-04-28 14:12:41  -23.530886  -46.47826
3  2020-04-28 14:13:01  -23.527357  -46.479187
4  2020-04-28 14:12:26  -23.527357  -46.479187


In [None]:
df['hour'] = df['ta'].dt.hour
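
The new `hour` column holds the hour-of-day component of each timestamp. As a small illustration, the same computation can be written with the standard library; `hour_of_day` and `records_per_hour` are hypothetical helper names, and the timestamp format is assumed to match the rows shown above.

```python
from collections import Counter
from datetime import datetime


def hour_of_day(timestamp):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string and return its hour (0-23)."""
    return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").hour


def records_per_hour(timestamps):
    """Count how many records fall into each hour of the day."""
    return Counter(hour_of_day(ts) for ts in timestamps)
```

A per-hour count like this is one way to check how evenly the GPS records are distributed across the day before plotting them.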

In [None]:
df.head()

                    ta          py          px          hour
0  2020-04-28 14:12:17  -23.61408   -46.4765    14
1  2020-04-28 14:11:30  -23.552765  -46.405838  14
2  2020-04-28 14:12:41  -23.530886  -46.47826   14
3  2020-04-28 14:13:01  -23.527357  -46.479187  14
4  2020-04-28 14:12:26  -23.527357  -46.479187  14


In [None]:
df.dtypes

ta      datetime64[ns]
py             float64
px             float64
hour             int64
dtype: object

# Visualization with Datashader
Datashader is a highly efficient library for visualizing large amounts of data.
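
Much of that efficiency comes from aggregating points into a fixed grid of pixels before anything is drawn, so the cost of rendering depends on the image size rather than on the number of points. The following is a conceptual sketch of a per-pixel count in plain Python; it is not Datashader's actual implementation, and the function name `count_grid` is hypothetical.

```python
def count_grid(points, x_range, y_range, width, height):
    """Count how many (x, y) points fall into each cell of a width x height grid.

    Row 0 corresponds to the lowest y values, column 0 to the lowest x values.
    """
    x_min, x_max = x_range
    y_min, y_max = y_range
    grid = [[0] * width for _ in range(height)]
    for x, y in points:
        if not (x_min <= x <= x_max and y_min <= y <= y_max):
            continue  # point lies outside the viewport
        # Scale to a cell index; clamp so the upper edge lands in the last cell.
        col = min(int((x - x_min) / (x_max - x_min) * width), width - 1)
        row = min(int((y - y_min) / (y_max - y_min) * height), height - 1)
        grid[row][col] += 1
    return grid
```

Each cell count would then be mapped to a color, which is the role a colormap such as `Hot` plays in the plot.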

In [149]:
import datashader as ds
import holoviews as hv
import holoviews.operation.datashader as hd
from datashader.colors import Hot

hv.extension('bokeh')

# Rasterize the GPS points (longitude on x, latitude on y), counting the records that fall in each pixel
shaded = hd.datashade(hv.Points(df, ['px', 'py']), cmap=Hot, aggregator=ds.count('hour'))
# Spread isolated pixels so sparse areas stay visible, then set the plot options
hd.dynspread(shaded, threshold=0.5, max_px=4).opts(bgcolor='black', xaxis=None, yaxis=None, width=900, height=700)

Output hidden; open in https://colab.research.google.com to view.
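
The plot uses raw longitude and latitude in degrees. Overlaying it on a map tile layer would require projecting the coordinates to Web Mercator first, since that is the projection HoloViews tile sources use. Below is a minimal sketch of the standard spherical Web Mercator formula using only the `math` module; the function name `lonlat_to_web_mercator` is an assumption for illustration, and `pyproj` (installed above) or `datashader.utils.lnglat_to_meters` could be used for the same purpose.

```python
import math

EARTH_RADIUS_M = 6378137.0  # WGS84 semi-major axis, used by Web Mercator


def lonlat_to_web_mercator(lon, lat):
    """Convert longitude/latitude in degrees to Web Mercator metres (x, y)."""
    x = EARTH_RADIUS_M * math.radians(lon)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat) / 2))
    return x, y


# First sample row of the dataset: px is the longitude, py is the latitude.
x, y = lonlat_to_web_mercator(-46.4765, -23.61408)
```

With projected columns in place of `px` and `py`, the rasterized points and a tile layer would share the same coordinate system.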