In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Introdução

### Instalando as bibliotecas necessárias para realizar as tarefas solicitadas e introduzindo o ambiente de trabalho para se trabalhar com Big Data

In [2]:
# importando bibliotecas necessárias
import numpy as np
import pandas as pd
import datetime
import os
import folium
from dateutil import parser

In [3]:
# instalando PysPark para facilitar processamento em larga escala
!pip install pyspark

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.driver.memory", "15g") \
    .appName('NYCcase') \
    .getOrCreate()

In [5]:
# introduzindo o ambiente de trabalho
from pyspark import SparkContext 

In [6]:
# para usar sql com o objetivo de processamento em larga escala
from pyspark.sql.functions import *
from pyspark.sql.functions import col, countDistinct, unix_timestamp, udf, lag, to_date
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType, IntegerType, DateType

# 2.Começando o processo de ETL
### Começando o processo de extração e transformação da base de dados fornecida

In [7]:
# Carregando os datasets que serão utilizados
nyctaxi_2009 = spark.read.option("multiline",True).json(r'/kaggle/input/nyc-trips-json/data-sample_data-nyctaxi-trips-2009-json_corrigido.json')
nyctaxi_2010 = spark.read.option("multiline",True).json(r'/kaggle/input/nyc-trips-json/data-sample_data-nyctaxi-trips-2010-json_corrigido.json')
nyctaxi_2011 = spark.read.option("multiline",True).json(r'/kaggle/input/nyc-trips-json/data-sample_data-nyctaxi-trips-2011-json_corrigido.json')
nyctaxi_2012 = spark.read.option("multiline",True).json(r'/kaggle/input/nyc-trips-json/data-sample_data-nyctaxi-trips-2012-json_corrigido.json')

In [8]:
# Unindo todos os dataframes criados na célula anterior em um só e
# inspecionando o número de linhas para checar se está tudo certo
nyctaxi_All_DF = nyctaxi_2009.union(nyctaxi_2010.union(nyctaxi_2011.union(nyctaxi_2012)))
print (nyctaxi_All_DF.count())

In [9]:
# Checando os tipos de dados de cada coluna
types_data = [t for t in nyctaxi_All_DF.schema.fields]
types_data

## Respondendo as questões

### Com o que temos até o momento, podemos resolver as primeiras 2 questões
 #### 1. Qual a distância média percorrida por viagens com no máximo 2 passageiros?

In [10]:
# Utilizando a função avg e comandos de Spark SQL
from pyspark.sql.functions import avg
nyctaxi_All_DF.createOrReplaceTempView('taxi_trips')
media_trips_form = spark.sql("SELECT avg(trip_distance) as MEDIA FROM taxi_trips WHERE passenger_count <= 2").show()

#### 2. Quais os 3 maiores vendors em quantidade total de dinheiro arrecadado?

In [11]:
# Utilizando comandos de Spark SQL 
spark.sql('SELECT sum(total_amount) as TOTAL from taxi_trips GROUP BY vendor_id ORDER BY TOTAL DESC LIMIT 3').show()

#### Realizando algumas conversões, como as datas de String para TimestampType com o objetivo de pré-processamento para responder as próximas questões 

In [12]:
udf_myconverter = udf(parser.parse, TimestampType())
nyctaxi_All_DF = nyctaxi_All_DF.withColumn('new_dropoff_datetime', udf_myconverter('dropoff_datetime'))
nyctaxi_All_DF = nyctaxi_All_DF.withColumn('new_pickup_datetime', udf_myconverter('pickup_datetime'))

In [13]:
udf_myconverter = udf(parser.parse, DateType())
nyctaxi_All_DF = nyctaxi_All_DF.withColumn('trip_date', udf_myconverter('dropoff_datetime'))

In [14]:
# Criando uma coluna de duração da corrida a partir da subtração do fim da viagem (dropoff_time) em formato datetime 
# pelo seu início (pickup_time)
nyctaxi_All_DF = nyctaxi_All_DF.withColumn("duration_trip_time", 
    (unix_timestamp(nyctaxi_All_DF.new_dropoff_datetime) - unix_timestamp(nyctaxi_All_DF.new_pickup_datetime))
)

In [15]:
# Criando novo DataFrame unindo o DataFrame geral com 4 colunas adicionais 
nyctaxi_All_DF.select(year('trip_date').alias('year'), 
                      month('trip_date').alias('month'),
                      dayofmonth('trip_date').alias('day'),
                      dayofweek('trip_date').alias('dayofweek')).show()

In [16]:
# Checando criação das colunas das duas linhas anteriores e seus respectivos tipos
nyctaxi_All_DF.dtypes

In [17]:
# removendo colunas que não serão utilizadas nas próximas análises com o objetivo de poupar espaço
nyctaxi_All_DF = nyctaxi_All_DF.drop('fare_amount', 'store_and_fwd_flag', 'surcharge', 'tolls_amount', 'rate_code')
nyctaxi_All_DF.columns

 ### Convertendo DataFrames de PySpark para Pandas com a finalidade de realizar análises exploratórias

In [18]:
df_plot = nyctaxi_All_DF.select("trip_date","duration_trip_time","pickup_longitude","pickup_latitude","dropoff_latitude","dropoff_longitude","payment_type","tip_amount",year("trip_date").alias('year'), 
month("trip_date").alias('month'),dayofmonth("trip_date").alias('day'),dayofweek("trip_date").alias('dayofweek'))
pdf_df_plot = df_plot.toPandas()
print(pdf_df_plot.head())

In [19]:
# transformando tipo da coluna trip_date para datetime
pdf_df_plot['trip_date'] = pd.to_datetime(pdf_df_plot['trip_date'], format='%Y-%m-%d')
print(pdf_df_plot.dtypes)

# Quesitos bônus
## Qual o tempo médio das corridas nos dias de sábado e domingo

In [20]:
pdf_df_plot[(pdf_df_plot.dayofweek == 6) | (pdf_df_plot.dayofweek == 7)]["duration_trip_time"].mean()

## Faça um gráfico de série temporal contando a quantidade de gorjetas de cada dia, nos
## últimos 3 meses de 2012.

In [28]:
# importando bibliotecas necessárias para criação de gráficos
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

In [21]:
# selecionando features necessárias
tips_total = pdf_df_plot[["trip_date","tip_amount"]]

In [22]:
tips_total = tips_total.groupby('trip_date')['tip_amount'].sum().reset_index()

In [23]:
# criando index
tips_total = tips_total.set_index('trip_date')

In [25]:
#checando criação de index
tips_total.info()

In [26]:
# selecionando os últimos 3 meses de 2012 para análise, considerando que as amostras acabam no mês 10.
serie_temporal = tips_total['tip_amount'].resample('B').mean()
serie_temporal= serie_temporal['2012-8-1':]

In [29]:
# criando o gráfico de série temporal
serie_temporal.plot(figsize=(15, 6)).set_title("Quantidade de gorjetas diárias pelos 3 últimos meses de 2012")
plt.show()