In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark import SparkFiles
import pandas as pd

# Configurações de Display 
pd.set_option('display.max_columns', None)
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

spark = SparkSession.builder.getOrCreate()

In [0]:
%run /estudos/libs/bibliotecas

In [0]:
dfUBER_ORIGINAL = (
    spark.createDataFrame(pd.read_csv('https://raw.githubusercontent.com/dbaassists/Arquivos_Base_PY/main/uber/trips_data.csv?token=GHSAT0AAAAAACHGS3UTVWRCAQPV4RASFW2CZIYNWHQ'
                                       ,sep=','
                                       ,encoding='utf-8'
                                       ,header=0)
                              )
)

In [0]:
numero_linhas = dfUBER_ORIGINAL.count()

numero_colunas = len(dfUBER_ORIGINAL.dtypes)

print(f"Qtd Linhas: {numero_linhas}" )
print(f"Qtd Colunas: {numero_colunas}" )


In [0]:
dfUBER_ORIGINAL=(dfUBER_ORIGINAL.withColumn("Begin Trip Lat", lit("0"))
.withColumn("Begin Trip Lng", lit("0"))
.withColumn("Begin Trip Address", lit("Origem"))
.withColumn("Dropoff Lat", lit("0"))
.withColumn("Dropoff Lng", lit("0"))
.withColumn("Dropoff Address", lit("Destino")))

In [0]:
pathArquivo = "/Storage/dados_viagens_uber/"
cria_diretorio(pathArquivo)

In [0]:
arquivoUBER_ATUALIZADO = pathArquivo + 'dados_viagens_uber.csv'
arquivoUBER_ATUALIZADO
(
    dfUBER_ORIGINAL
      .write.option("header", "true").option('delimiter',';')
      .format("csv")
      .mode("overwrite")
      .save(arquivoUBER_ATUALIZADO)
)

In [0]:
dbutils.fs.ls(pathArquivo)

In [0]:
print(arquivoUBER_ATUALIZADO)
dfUBER = spark.read.csv(arquivoUBER_ATUALIZADO
                            ,header=True
                            ,sep=';'
                            ,encoding='utf-8')

display(dfUBER)

In [0]:
dfUBER.printSchema()

In [0]:
dfUBER = (dfUBER.withColumnRenamed('City','NomeCidade')
                .withColumnRenamed('Product Type','NomeProduto')
                .withColumnRenamed('Trip or Order Status','StatusViagem')
                .withColumnRenamed('Request Time','DthSolicitacaoViagem')
                .withColumnRenamed('Begin Trip Time','DthInicioViagem')
                .withColumnRenamed('Begin Trip Lat','LatitudeInicioViagem')
                .withColumnRenamed('Begin Trip Lng','LongitudeInicioViagem')
                .withColumnRenamed('Begin Trip Address','EnderecoInicioViagem')
                .withColumnRenamed('Dropoff Time','DthFimViagem')
                .withColumnRenamed('Dropoff Lat','LatitudeDestinoViagem')
                .withColumnRenamed('Dropoff Lng','LongitudeDestinoViagem')
                .withColumnRenamed('Dropoff Address','EnderecoDestinoViagem')
                .withColumnRenamed('Distance (miles)','DistanciaPercorridaEmMilhas')
                .withColumnRenamed('Fare Amount','ValorPago')
                .withColumnRenamed('Fare Currency','Moeda')
                )

In [0]:
dfUBER = (dfUBER.withColumn("DthSolicitacaoViagem", expr("substr(DthSolicitacaoViagem, 1, 19)"))
                .withColumn("DthInicioViagem", expr("substr(DthInicioViagem, 1, 19)"))
                .withColumn("DthFimViagem", expr("substr(DthFimViagem, 1, 19)"))
            )

In [0]:
dfUBER = (dfUBER.withColumn("DthSolicitacaoViagem", col("DthSolicitacaoViagem").cast("timestamp"))
                .withColumn("DthInicioViagem", col("DthInicioViagem").cast("timestamp"))
                .withColumn("DthFimViagem", col("DthFimViagem").cast("timestamp"))
                .withColumn("DistanciaPercorridaEmMilhas", col("DistanciaPercorridaEmMilhas").cast("float"))
                .withColumn("ValorPago", col("ValorPago").cast("float"))
            )

# 09 - Expandindo a visão de Datas

In [0]:
dfUBER = (dfUBER.withColumn("AnoViagem" , year(dfUBER.DthSolicitacaoViagem))
                .withColumn("MesViagem" , month(dfUBER.DthSolicitacaoViagem))
                .withColumn("DiaViagem" , dayofmonth(dfUBER.DthSolicitacaoViagem))
                .withColumn("DiaViagemExtenso" , date_format(dfUBER.DthSolicitacaoViagem,"EEEE")))
                

In [0]:
dfUBER = dfUBER.withColumn('TempoEsperaViagem'
                          , round((to_timestamp(col('DthInicioViagem')).cast("long") -
                           to_timestamp(col('DthSolicitacaoViagem')).cast("long")) /60))

In [0]:
display(dfUBER)

In [0]:
dfUBER = (dfUBER.withColumn("DiaDaSemana", when(col("DiaViagemExtenso")=="Sunday","0 - Domingo")
                  .when(col("DiaViagemExtenso")=="Monday","1 - Segunda-Feira")
                  .when(col("DiaViagemExtenso")=="Tuesday","2 - Terça-Feira")
                  .when(col("DiaViagemExtenso")=="Wednesday","3 - Quarta-Feira")
                  .when(col("DiaViagemExtenso")=="Thursday","4 - Quinta-Feira")
                  .when(col("DiaViagemExtenso")=="Friday","4 - Sexta-Feira")
                  .when(col("DiaViagemExtenso")=="Saturday","6 - Sábado")))

# 10 - TRATAMENTO NAS COLUNAS

In [0]:
dfUBER = (dfUBER.withColumn("DistanciaPercorridaEmMilhas", col("DistanciaPercorridaEmMilhas") * 1.60934))

In [0]:
dfUBER.select(col("StatusViagem")).distinct().show()

In [0]:
dfUBER = (dfUBER.withColumn("StatusViagem" , when(col("StatusViagem")== "UNFULFILLED","Descartada")
                                            .when(col("StatusViagem")== "COMPLETED","Concluida")
                                            .when(col("StatusViagem")== "CANCELED","Cancelada")
                                            .when(col("StatusViagem")== "DRIVER_CANCELED","Cancelada pelo Motorista")
            )
            )

In [0]:
dfUBER.select(col("StatusViagem")).distinct().show()

In [0]:
dfUBER.select(col("NomeProduto")).distinct().show()

In [0]:
dfUBER = (dfUBER.withColumn("NomeProduto" , when((col("NomeProduto")== "UberX") | (col("NomeProduto")== "uberX") ,"Uber X")
                                            .when((col("NomeProduto")== "Comfort Planet") | (col("NomeProduto")== "Comfort"),"Uber Confort")
                                            .when(col("NomeProduto")== "VIP","Uber VIP")
                                            .when(col("NomeProduto")== "Flash Moto","Uber Flash Moto")
                                            .when(col("NomeProduto")== "Flash","Uber Flash")
                                            .when(col("NomeProduto")== "Black","Uber Black")
                                            .otherwise("N/I")
            )
            )

In [0]:
dfUBER.select(col("NomeProduto")).distinct().show()

# 11 - ETAPA DE ELIMINAÇÃO DE REGISTROS

In [0]:
dfUBER = dfUBER.select("*").where("StatusViagem != 'Descartada'")
#display(dfUBER.filter(dfUBER.StatusViagem != 'Descartada'))
#display(dfUBER.select("*").where("StatusViagem != 'Descartada'"))

In [0]:
dfDadosUBER = dfUBER.select('NomeCidade'
                            ,'NomeProduto'
                            ,'StatusViagem'
                            ,'DthSolicitacaoViagem'
                            ,'DthInicioViagem'
                            ,'DthFimViagem'
                            ,'DistanciaPercorridaEmMilhas'
                            ,'ValorPago'
                            ,'AnoViagem'
                            ,'MesViagem'
                            ,'DiaViagem'
                            ,'DiaViagemExtenso'
                            ,'DiaDaSemana'
                            ,'TempoEsperaViagem'
                            ,lit(1).alias('QtdViagem'))


# 12 - ANALISANDO OS DADOS

In [0]:
dfDadosUBER.printSchema()

In [0]:
display(spark.sql("""
          SELECT A.NomeProduto
          , A.AnoViagem
          , A.StatusViagem
          , Sum(A.ValorPago) ValorPago
          , round((Sum(A.ValorPago) / B.QtdViagem),3) MediaGasto
          , SUM(A.QtdViagem) QtdViagem
          , round((SUM(A.QtdViagem) / ( select SUM(QtdViagem) FROM {df} )),3) Media 
          FROM {df} A
          INNER JOIN (
              
              SELECT  NomeProduto , AnoViagem, StatusViagem, Sum(A.QtdViagem) QtdViagem FROM {df} A
              GROUP BY NomeProduto , AnoViagem, StatusViagem
          ) B
          ON A.NomeProduto = B.NomeProduto
          AND A.AnoViagem = B.AnoViagem
          AND A.StatusViagem = B.StatusViagem
          WHERE A.AnoViagem = 2021
          GROUP BY A.NomeProduto
          , A.AnoViagem
          , A.StatusViagem
          , B.QtdViagem
          """,df=dfDadosUBER))

In [0]:
display(spark.sql("""
          SELECT 
          AnoViagem
          , Sum(ValorPago) ValorPago
          , round(Sum(ValorPago) / (SUM(QtdViagem)),3) MediaGasto
          , SUM(QtdViagem) QtdViagem
          , round((SUM(QtdViagem) / ( select SUM(QtdViagem) FROM {df} )),3) Media 
          FROM {df} 
          GROUP BY 
          AnoViagem
          ORDER BY AnoViagem
          """,df=dfDadosUBER))

In [0]:
display(spark.sql("""
          SELECT 
          DiaDaSemana
          , StatusViagem
          , Sum(ValorPago) ValorPago
          , round(Sum(ValorPago) / (SUM(QtdViagem)),3) MediaGasto
          , SUM(QtdViagem) QtdViagem
          , round((SUM(QtdViagem) / ( select SUM(QtdViagem) FROM {df} )),3) Media 
          FROM {df} 
          GROUP BY 
          DiaDaSemana
          , StatusViagem
          ORDER BY DiaDaSemana
          
          """,df=dfDadosUBER))

In [0]:
display(spark.sql("""
          SELECT 
          AnoViagem
          ,TempoEsperaViagem TempoEsperaViagem
          FROM {df} 
          WHERE StatusViagem = 'Concluida'
          --group by AnoViagem
          ORDER BY AnoViagem
          """,df=dfDadosUBER))

In [0]:
display(spark.sql("""
          SELECT 
          AnoViagem
          ,round(sum(ValorPago) / sum(DistanciaPercorridaEmMilhas),2) PagPorKM
          FROM {df} 
          WHERE StatusViagem = 'Concluida'
          group by AnoViagem
          ORDER BY AnoViagem
          """,df=dfDadosUBER))

In [0]:
display(spark.sql("""
          SELECT 
          AnoViagem
          ,100 TotalViagens
          ,(sum(case when StatusViagem = 'Concluida' then 1 else 0 end) /count(1)) * 100 ViagensConcluidas
          ,(sum(case when StatusViagem != 'Concluida' then 1 else 0 end) /count(1)) * 100 ViagensCanceladas
          FROM {df} 
          --WHERE StatusViagem = 'Concluida'
          group by AnoViagem
          ORDER BY AnoViagem
          """,df=dfDadosUBER))