# Analisando Dados do Uber

Dataset: https://github.com/fivethirtyeight/uber-tlc-foil-response

In [1]:
from pandas import read_csv

In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


# Build the Spark entry points idempotently. getOrCreate() reuses a session /
# context that is already alive, so re-running this cell does not fail with
# "Cannot run multiple SparkContexts at once".
conf = SparkConf().setAppName('Inicial').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Low-level context used by the RDD cells below (sc.textFile, sc.version).
sc = spark.sparkContext

In [3]:
# Apache Spark version reported by the active SparkContext
sc.version

'3.0.1'

In [4]:
# Load the CSV into a pandas object (path is relative to the working directory)
uberFile = read_csv("uber.csv")

In [5]:
# Check which type read_csv returned
type(uberFile)

pandas.core.frame.DataFrame

In [6]:
# Preview the first 10 rows
uberFile.head(10)

Unnamed: 0,dispatching_base_number,date,active_vehicles,trips
0,B02512,1/1/2015,190,1132
1,B02765,1/1/2015,225,1765
2,B02764,1/1/2015,3427,29421
3,B02682,1/1/2015,945,7679
4,B02617,1/1/2015,1228,9537
5,B02598,1/1/2015,870,6903
6,B02598,1/2/2015,785,4768
7,B02617,1/2/2015,1137,7065
8,B02512,1/2/2015,175,875
9,B02682,1/2/2015,890,5506


In [7]:
from pyspark.sql import SQLContext
# Convert the pandas DataFrame into a Spark DataFrame.
# The SparkSession is used directly: SQLContext is deprecated as of Spark 3.0
# and SparkSession.createDataFrame is the supported entry point.
uberDF = spark.createDataFrame(uberFile)

In [8]:
# Check the type of the converted object
type(uberDF)

pyspark.sql.dataframe.DataFrame

In [9]:
# Create the RDD from the CSV file (one element per line of text)
uberRDD = sc.textFile("uber.csv")

In [10]:
# Check the type of the object returned by sc.textFile
type(uberRDD)

pyspark.rdd.RDD

In [11]:
# Number of lines read from the file; the header line is included in the total
uberRDD.count()

355

In [12]:
# First element of the RDD, i.e. the CSV header line
uberRDD.first()

'dispatching_base_number,date,active_vehicles,trips'

In [13]:
# Split every line of the file into its fields, using "," as the separator
uberLinhas = uberRDD.map(lambda linha: linha.split(","))

In [14]:
# Check the type of the RDD produced by map()
type(uberLinhas)

pyspark.rdd.PipelinedRDD

In [24]:
# First record after the split; this is still the header row, now as a list of column names
uberLinhas.first()

['dispatching_base_number', 'date', 'active_vehicles', 'trips']

In [25]:
# Number of distinct values in the first column.
# NOTE: the header row is still part of uberLinhas at this point, so the
# column name itself is counted as one of the values.
(
    uberLinhas
    .map(lambda registro: registro[0])
    .distinct()
    .count()
)

7

In [26]:
# List the distinct values of the first column.
# NOTE: the header row is still part of uberLinhas at this point, so the
# column name appears among the values.
(
    uberLinhas
    .map(lambda registro: registro[0])
    .distinct()
    .collect()
)

['dispatching_base_number',
 'B02512',
 'B02765',
 'B02764',
 'B02682',
 'B02617',
 'B02598']

In [27]:
# Count the records whose dispatching base (first column) is B02617.
# The first field is compared explicitly; the previous list-membership test
# ("B02617" in registro) would also accept the value in any other column.
uberLinhas.filter(lambda registro: registro[0] == "B02617").count()

59

In [28]:
# Keep only the records whose dispatching base (first column) is B02617.
# The first field is compared explicitly instead of testing list membership,
# so a matching value in another column cannot select a record.
b02617_RDD = uberLinhas.filter(lambda registro: registro[0] == "B02617")

In [30]:
# First record of the B02617 subset
b02617_RDD.first()

['B02617', '1/1/2015', '1228', '9537']

In [19]:
# Number of B02617 records with more than 15000 trips.
# Fields are strings after the split, so the trips column (index 3) is
# converted to int before the comparison.
(
    b02617_RDD
    .filter(lambda registro: int(registro[3]) > 15000)
    .count()
)

6

In [20]:
# List the B02617 records with more than 15000 trips.
# Fields are strings after the split, so the trips column (index 3) is
# converted to int before the comparison.
(
    b02617_RDD
    .filter(lambda registro: int(registro[3]) > 15000)
    .collect()
)

[['B02617', '1/31/2015', '1394', '15756'],
 ['B02617', '2/6/2015', '1526', '15417'],
 ['B02617', '2/13/2015', '1590', '16996'],
 ['B02617', '2/14/2015', '1486', '16999'],
 ['B02617', '2/20/2015', '1574', '16856'],
 ['B02617', '2/21/2015', '1443', '16098']]

In [32]:
# Re-read the CSV and drop the header before splitting each line into fields.
# The header is identified by comparing against the file's actual first line
# rather than by a substring test ("base" not in linha), which would also
# discard any data line that happened to contain the text "base".
# Note: first() is an action, so a small Spark job runs when this cell executes.
linhasCsv = sc.textFile("uber.csv")
cabecalho = linhasCsv.first()
uberRDD2 = (
    linhasCsv
    .filter(lambda linha: linha != cabecalho)
    .map(lambda linha: linha.split(","))
)

In [22]:
# Total trips per dispatching base: build (base, trips) pairs and add up
# the trips of every pair that shares the same base.
(
    uberRDD2
    .map(lambda registro: (registro[0], int(registro[3])))
    .reduceByKey(lambda parcial, viagens: parcial + viagens)
    .collect()
)

[('B02512', 93786),
 ('B02765', 193670),
 ('B02764', 1914449),
 ('B02682', 662509),
 ('B02617', 725025),
 ('B02598', 540791)]

In [37]:
# Up to 10 bases with the highest trip totals, largest first.
# takeOrdered sorts ascending, so the total is negated in the key to get
# descending order.
(
    uberRDD2
    .map(lambda registro: (registro[0], int(registro[3])))
    .reduceByKey(lambda parcial, viagens: parcial + viagens)
    .takeOrdered(10, key=lambda par: -par[1])
)

[('B02764', 1914449),
 ('B02617', 725025),
 ('B02682', 662509),
 ('B02598', 540791),
 ('B02765', 193670),
 ('B02512', 93786)]

# Fim