# <font color='blue'>Data Science Academy</font>
# <font color='blue'>Big Data Real-Time Analytics com Python e Spark</font>

# <font color='blue'>Capítulo 7</font>

### *********** Atenção: *********** 
Utilize Java JDK 11 e Apache Spark 2.4.2

*Caso receba mensagem de erro "name 'sc' is not defined", interrompa o pyspark e apague o diretório metastore_db no mesmo diretório onde está este Jupyter notebook*

# Mini-Projeto 1 - Analisando Dados do Uber com Spark 

Dataset: https://github.com/fivethirtyeight/uber-tlc-foil-response

Esse conjunto de dados contém dados de mais de 4,5 milhões de captações Uber na cidade de Nova York de abril a setembro de 2014 e 14,3 milhões de captações Uber de janeiro a junho de 2015. Dados em nível de viagem sobre 10 outras empresas de veículos de aluguel (FHV) bem como dados agregados para 329 empresas de FHV, também estão incluídos. Todos os arquivos foram recebidos em 3 de agosto, 15 de setembro e 22 de setembro de 2015.

1- Quantos são e quais são as bases de carros do Uber (onde os carros ficam esperando passageiros)?

2- Qual o total de veículos que passaram pela base B02617?

3- Qual o total de corridas por base? Apresente de forma decrescente.

In [1]:
from pandas import read_csv

In [2]:
# Criando um objeto Pandas
uberFile = read_csv("data/uber.csv")

In [3]:
type(uberFile)

pandas.core.frame.DataFrame

In [4]:
# Visualizando as primeiras linhas
uberFile.head(10)

Unnamed: 0,dispatching_base_number,date,active_vehicles,trips
0,B02512,1/1/2015,190,1132
1,B02765,1/1/2015,225,1765
2,B02764,1/1/2015,3427,29421
3,B02682,1/1/2015,945,7679
4,B02617,1/1/2015,1228,9537
5,B02598,1/1/2015,870,6903
6,B02598,1/2/2015,785,4768
7,B02617,1/2/2015,1137,7065
8,B02512,1/2/2015,175,875
9,B02682,1/2/2015,890,5506


In [5]:
# Tranformando o dataframe (Pandas) em um Dataframe (Spark)
uberDF = sqlContext.createDataFrame(uberFile)

In [6]:
type(uberDF)

pyspark.sql.dataframe.DataFrame

In [23]:
# Criando o RDD a partir do arquivo csv
uberRDD = sc.textFile("data/uber.csv")

In [24]:
type(uberRDD)

pyspark.rdd.RDD

In [25]:
# Total de registros
uberRDD.count()

355

In [10]:
# Primeiro registro
uberRDD.first()

'dispatching_base_number,date,active_vehicles,trips'

In [11]:
# Dividindo o arquivo em colunas, separadas pelo caracter ",""
uberLinhas = uberRDD.map(lambda line: line.split(","))

In [12]:
type(uberLinhas)

pyspark.rdd.PipelinedRDD

In [26]:
# Número de bases de carros do Uber
uberLinhas.map(lambda linha: linha[0]).distinct().count() -1

6

In [28]:
# Bases de carros do Uber
uberLinhas.map(lambda linha: linha[0]).distinct().collect() 

['dispatching_base_number',
 'B02765',
 'B02682',
 'B02598',
 'B02512',
 'B02764',
 'B02617']

In [15]:
# Total de veículos que passaram pela base B02617
uberLinhas.filter(lambda linha: "B02617" in linha).count()

59

In [16]:
# Gravando os dados dos veículos da base B02617 em um novo RDD
b02617_RDD = uberLinhas.filter(lambda linha: "B02617" in linha)

In [17]:
# Total de dias em que o número de corridas foi superior a 16.000
b02617_RDD.filter(lambda linha: int(linha[3]) > 16000).count()

4

In [18]:
# Dias em que o total de corridas foi superior a 16.000
b02617_RDD.filter(lambda linha: int(linha[3]) > 16000).collect()

[['B02617', '2/13/2015', '1590', '16996'],
 ['B02617', '2/14/2015', '1486', '16999'],
 ['B02617', '2/20/2015', '1574', '16856'],
 ['B02617', '2/21/2015', '1443', '16098']]

In [19]:
# Criando um novo RDD
uberRDD2 = sc.textFile("data/uber.csv").filter(lambda line: "base" not in line).map(lambda line:line.split(","))

In [20]:
# Aplicando redução para calcular o total por base
uberRDD2.map(lambda kp: (kp[0], int(kp[3])) ).reduceByKey(lambda k,v: k + v).collect()

[('B02765', 193670),
 ('B02682', 662509),
 ('B02598', 540791),
 ('B02512', 93786),
 ('B02764', 1914449),
 ('B02617', 725025)]

In [21]:
# Aplicando redução para calcular o total por base, em ordem decrescente
uberRDD2.map(lambda kp: (kp[0], int(kp[3])) ).reduceByKey(lambda k,v: k + v).takeOrdered(10, key = lambda x: -x[1])

[('B02764', 1914449),
 ('B02617', 725025),
 ('B02682', 662509),
 ('B02598', 540791),
 ('B02765', 193670),
 ('B02512', 93786)]

# Fim

### Obrigado - Data Science Academy - <a href="http://facebook.com/dsacademybr">facebook.com/dsacademybr</a>