# Challenge 2
## Objective

Create a PySpark script that produces a single output file from the three CSV files (station.csv, trip.csv and weather.csv), containing all trip data plus:

- Latitude/longitude of the start and end stations.
- A boolean column named "long_trip" that is `true` for trips longer than 10 minutes.
- The weather condition on the day of the trip (the `events` column of the weather table).
- A column named "age_range" with value 1 for people aged 0-16, 2 for 17-25, 3 for 26-50 and 4 for over 50.
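The boundary cases of the two derived columns are easy to get wrong (a trip of exactly 600 seconds, a person who is exactly 50). The following is a minimal pure-Python sketch of the rules as stated above. It assumes `tripduration` is in seconds, as in trip.csv, and that age is a whole number of years. The function names are hypothetical; they only serve as a reference for checking the Spark output:

```python
from typing import Optional


def long_trip(tripduration_s: float) -> bool:
    """True only for trips strictly longer than 10 minutes (600 seconds)."""
    return tripduration_s > 600


def age_range(age: Optional[int]) -> Optional[int]:
    """Map an age in years to 1 (0-16), 2 (17-25), 3 (26-50) or 4 (over 50)."""
    if age is None:
        return None  # unknown birth year -> no bucket
    if age <= 16:
        return 1
    if age <= 25:
        return 2
    if age <= 50:
        return 3
    return 4


print(long_trip(600), long_trip(985.935))  # False True
print([age_range(a) for a in (16, 17, 25, 26, 50, 51)])  # [1, 2, 2, 3, 3, 4]
```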

 
 
 

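```python
import subprocess
import sys

# Install PySpark 2.4.5 into the running interpreter; unlike the IPython-only `!pip`, this also works in a .py script.
try:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pyspark==2.4.5", "--quiet"])
except subprocess.CalledProcessError:
    print("pip install failed; assuming pyspark is already available.")
```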

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd  # needed by DataFrame.toPandas() for previews
import os
```

Identify the current directory and the directory that holds the CSV files.

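```python
dirpath = os.getcwd()
# The CSV files are expected in a data/ folder one level above the working directory.
dataPath = os.path.join(dirpath, "..", "data")
```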

## Creating the Spark session

```python
spark = (
    SparkSession.builder
    .appName("Teste de Data Engineer - Fabio Kfouri")
    .getOrCreate()
)
```

```python
# Display the session details (notebook only).
spark
```

## Creating DataFrames from the files with Spark

```python
# station.csv DataFrame (header row, comma-separated, column types inferred)
dfs = spark.read.csv(dataPath + "/station.csv", header=True, sep=",", inferSchema=True)
```

```python
# trip.csv DataFrame (header row, comma-separated, column types inferred)
dft = spark.read.csv(dataPath + "/trip.csv", header=True, sep=",", inferSchema=True)
```

```python
# weather.csv DataFrame; without inferSchema every column is read as a string
dfw = spark.read.csv(dataPath + "/weather.csv", header=True, sep=",")
```

## Inspecting the schemas

```python
print("Dataframe Station")
dfs.printSchema()
```

```
Dataframe Station
root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- install_date: string (nullable = true)
 |-- install_dockcount: integer (nullable = true)
 |-- modification_date: string (nullable = true)
 |-- current_dockcount: integer (nullable = true)
 |-- decommission_date: string (nullable = true)
```
```python
print("Dataframe Trip")
dft.printSchema()
```

```
Dataframe Trip
root
 |-- trip_id: integer (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- tripduration: double (nullable = true)
 |-- from_station_name: string (nullable = true)
 |-- to_station_name: string (nullable = true)
 |-- from_station_id: string (nullable = true)
 |-- to_station_id: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthyear: string (nullable = true)
```
```python
print("Dataframe Weather")
dfw.printSchema()
```

```
Dataframe Weather
root
 |-- Date: string (nullable = true)
 |-- Max_Temperature_F: string (nullable = true)
 |-- Mean_Temperature_F: string (nullable = true)
 |-- Min_TemperatureF: string (nullable = true)
 |-- Max_Dew_Point_F: string (nullable = true)
 |-- MeanDew_Point_F: string (nullable = true)
 |-- Min_Dewpoint_F: string (nullable = true)
 |-- Max_Humidity: string (nullable = true)
 |-- Mean_Humidity: string (nullable = true)
 |-- Min_Humidity: string (nullable = true)
 |-- Max_Sea_Level_Pressure_In: string (nullable = true)
 |-- Mean_Sea_Level_Pressure_In: string (nullable = true)
 |-- Min_Sea_Level_Pressure_In: string (nullable = true)
 |-- Max_Visibility_Miles: string (nullable = true)
 |-- Mean_Visibility_Miles: string (nullable = true)
 |-- Min_Visibility_Miles: string (nullable = true)
 |-- Max_Wind_Speed_MPH: string (nullable = true)
 |-- Mean_Wind_Speed_MPH: string (nullable = true)
 |-- Max_Gust_Speed_MPH: string (nullable = true)
 |-- Precipitation_In: string (nullable =
```

## Fixing and converting DataFrame columns

```python
# Derive `tripdate` (MM/dd/yyyy string, the join key with weather) from the raw
# starttime string first, then convert starttime/stoptime from string to timestamp.
dft = dft.withColumn('tripdate', F.date_format(F.to_timestamp('starttime', 'MM/dd/yyyy HH:mm'), 'MM/dd/yyyy'))\
         .withColumn('starttime', F.to_timestamp('starttime', 'MM/dd/yyyy HH:mm'))\
         .withColumn('stoptime', F.to_timestamp('stoptime', 'MM/dd/yyyy HH:mm'))
```

```python
print("Dataframe Trip transformed.")
dft.printSchema()
```

```
Dataframe Trip transformed.
root
 |-- trip_id: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- tripduration: double (nullable = true)
 |-- from_station_name: string (nullable = true)
 |-- to_station_name: string (nullable = true)
 |-- from_station_id: string (nullable = true)
 |-- to_station_id: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- tripdate: string (nullable = true)
```
```python
# Convert the Date column from string to date and format it back to a string.
# This normalizes month and day to two digits (e.g. 1/2/2020 becomes 01/02/2020).
dfw = dfw.withColumn('Date', F.date_format(F.to_date('Date', 'MM/dd/yyyy'), 'MM/dd/yyyy'))
```
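The query later compares `trip.tripdate` with `weather.Date` as plain strings. Both sides must therefore use the same zero-padded `MM/dd/yyyy` layout; otherwise `1/2/2020` and `01/02/2020` would not match. Below is a minimal pure-Python sketch of the same normalization using `datetime`. The helper name `normalize_date` is hypothetical, and the trip value is an example taken from the sample output. In the notebook, Spark does this with `to_date`/`to_timestamp` followed by `date_format`:

```python
from datetime import datetime


def normalize_date(value: str, fmt: str = "%m/%d/%Y") -> str:
    """Parse a date string and re-emit it as zero-padded MM/DD/YYYY."""
    return datetime.strptime(value, fmt).strftime("%m/%d/%Y")


# Weather side: dates may come without zero padding.
print(normalize_date("1/2/2020"))  # 01/02/2020

# Trip side: keep only the date part of the start time.
print(normalize_date("10/13/2014 10:31", fmt="%m/%d/%Y %H:%M"))  # 10/13/2014
```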

```python
print("Dataframe Weather transformed.")
dfw.printSchema()
```

```
Dataframe Weather transformed.
root
 |-- Date: string (nullable = true)
 |-- Max_Temperature_F: string (nullable = true)
 |-- Mean_Temperature_F: string (nullable = true)
 |-- Min_TemperatureF: string (nullable = true)
 |-- Max_Dew_Point_F: string (nullable = true)
 |-- MeanDew_Point_F: string (nullable = true)
 |-- Min_Dewpoint_F: string (nullable = true)
 |-- Max_Humidity: string (nullable = true)
 |-- Mean_Humidity: string (nullable = true)
 |-- Min_Humidity: string (nullable = true)
 |-- Max_Sea_Level_Pressure_In: string (nullable = true)
 |-- Mean_Sea_Level_Pressure_In: string (nullable = true)
 |-- Min_Sea_Level_Pressure_In: string (nullable = true)
 |-- Max_Visibility_Miles: string (nullable = true)
 |-- Mean_Visibility_Miles: string (nullable = true)
 |-- Min_Visibility_Miles: string (nullable = true)
 |-- Max_Wind_Speed_MPH: string (nullable = true)
 |-- Mean_Wind_Speed_MPH: string (nullable = true)
 |-- Max_Gust_Speed_MPH: string (nullable = true)
 |-- Precipitation_In: strin
```

## Creating temporary views for the DataFrames

```python
# Register each DataFrame as a temporary view so it can be queried with Spark SQL.
dfs.createOrReplaceTempView("station")
dft.createOrReplaceTempView("trip")
dfw.createOrReplaceTempView("weather")
```

## Building the query
This query, based on the ANSI SQL standard, was written to meet the initial requirements of this challenge.
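The query enriches each trip through equality joins against the station and weather tables. A join emits one output row per matching pair. If a lookup key (a `station_id` or a weather `Date`) appears twice, every trip that matches it is therefore duplicated. Below is a minimal pure-Python sketch of that behaviour with made-up rows; the data and the `join_trips` helper are hypothetical. The sketch uses left-join semantics, so a trip with no weather row is kept with `weatherDay` set to `None`. An inner join would drop it instead:

```python
from collections import defaultdict


def join_trips(trips, weather):
    """Left-join trips to weather on the date key, one output row per match."""
    by_date = defaultdict(list)
    for row in weather:
        by_date[row["Date"]].append(row)
    out = []
    for trip in trips:
        matches = by_date.get(trip["tripdate"]) or [None]  # keep unmatched trips
        for w in matches:
            out.append({**trip, "weatherDay": w["Events"] if w else None})
    return out


trips = [{"trip_id": 1, "tripdate": "10/13/2014"}, {"trip_id": 2, "tripdate": "10/14/2014"}]
weather = [
    {"Date": "10/13/2014", "Events": "Rain"},
    {"Date": "10/13/2014", "Events": "Rain"},  # repeated key
]
rows = join_trips(trips, weather)
print([r["trip_id"] for r in rows])  # [1, 1, 2]
```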

```python
# Join each trip with its start/end station coordinates and that day's weather.
# LEFT JOINs keep every trip even when a station or weather row is missing.
dfOut = spark.sql("""
WITH local AS (
    SELECT station_id, lat, long FROM station
),
meteorology AS (
    SELECT Date, Events FROM weather
)
SELECT t.trip_id, t.bikeid, -- ids
       /********** RELATED TO TRIP TIME *************/
       t.starttime, t.stoptime, t.tripduration,
       -- boolean: true for trips longer than 10 minutes (600 s)
       CASE WHEN t.tripduration > 600 THEN true ELSE false END AS long_trip,
       /********** RELATED TO TRIP ORIGIN & DESTINATION *************/
       t.from_station_id, t.from_station_name, lf.lat AS from_lat, lf.long AS from_long, -- origin
       t.to_station_id, t.to_station_name, lt.lat AS to_lat, lt.long AS to_long, -- destination
       /********** RELATED TO USER *************/
       t.usertype, t.gender, t.birthyear,
       -- age is taken relative to the current year, so results depend on when the job runs
       CASE WHEN YEAR(current_date()) - t.birthyear <= 16 THEN 1
            WHEN YEAR(current_date()) - t.birthyear <= 25 THEN 2
            WHEN YEAR(current_date()) - t.birthyear <= 50 THEN 3
            WHEN YEAR(current_date()) - t.birthyear > 50 THEN 4
            ELSE null
       END AS age_range,
       /********** RELATED TO WEATHER *************/
       met.Events AS weatherDay
FROM trip t
LEFT JOIN local lf ON t.from_station_id = lf.station_id
LEFT JOIN local lt ON t.to_station_id = lt.station_id
LEFT JOIN meteorology met ON t.tripdate = met.Date
ORDER BY t.starttime
""")
```

Checking the query output:

```python
dfOut.limit(5).toPandas()
```

| | trip_id | bikeid | starttime | stoptime | tripduration | long_trip | from_station_id | from_station_name | from_lat | from_long | to_station_id | to_station_name | to_lat | to_long | usertype | gender | birthyear | age_range | weatherDay |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 431 | SEA00298 | 2014-10-13 10:31:00 | 2014-10-13 10:48:00 | 985.935 | True | CBD-06 | 2nd Ave & Spring St | 47.60595 | -122.335768 | PS-04 | Occidental Park / Occidental Ave S & S Washing... | 47.600757 | -122.332946 | Member | Male | 1960 | 4 | Rain |
| 1 | 431 | SEA00298 | 2014-10-13 10:31:00 | 2014-10-13 10:48:00 | 985.935 | True | CBD-06 | 2nd Ave & Spring St | 47.60595 | -122.335768 | PS-04 | Occidental Park / Occidental Ave S & S Washing... | 47.600757 | -122.332946 | Member | Male | 1960 | 4 | Rain |
| 2 | 432 | SEA00195 | 2014-10-13 10:32:00 | 2014-10-13 10:48:00 | 926.375 | True | CBD-06 | 2nd Ave & Spring St | 47.60595 | -122.335768 | PS-04 | Occidental Park / Occidental Ave S & S Washing... | 47.600757 | -122.332946 | Member | Male | 1970 | 3 | Rain |
| 3 | 432 | SEA00195 | 2014-10-13 10:32:00 | 2014-10-13 10:48:00 | 926.375 | True | CBD-06 | 2nd Ave & Spring St | 47.60595 | -122.335768 | PS-04 | Occidental Park / Occidental Ave S & S Washing... | 47.600757 | -122.332946 | Member | Male | 1970 | 3 | Rain |
| 4 | 433 | SEA00486 | 2014-10-13 10:33:00 | 2014-10-13 10:48:00 | 883.831 | True | CBD-06 | 2nd Ave & Spring St | 47.60595 | -122.335768 | PS-04 | Occidental Park / Occidental Ave S & S Washing... | 47.600757 | -122.332946 | Member | Female | 1988 | 3 | Rain |

In this sample, trips 431 and 432 each appear twice. Either trip.csv contains repeated rows or one of the lookup tables (station, weather) has repeated keys. This is worth checking before exporting.

Schema of the resulting DataFrame:

```python
print("Dataframe OUT created.")
dfOut.printSchema()
```

```
Dataframe OUT created.
root
 |-- trip_id: integer (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- tripduration: double (nullable = true)
 |-- long_trip: boolean (nullable = false)
 |-- from_station_id: string (nullable = true)
 |-- from_station_name: string (nullable = true)
 |-- from_lat: double (nullable = true)
 |-- from_long: double (nullable = true)
 |-- to_station_id: string (nullable = true)
 |-- to_station_name: string (nullable = true)
 |-- to_lat: double (nullable = true)
 |-- to_long: double (nullable = true)
 |-- usertype: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthyear: string (nullable = true)
 |-- age_range: integer (nullable = true)
 |-- weatherDay: string (nullable = true)
```
## Exporting the file
The output is written in a columnar format (Parquet). A columnar layout lets readers scan only the columns they need, which makes reads faster and cheaper to process, and its built-in compression produces smaller files than CSV.
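Even with `coalesce(1)`, Spark writes a directory (`output/trips`) rather than a single file named `trips`. That directory contains one `part-*.parquet` file plus marker files such as `_SUCCESS`. If a downstream step needs the path of that single file, a minimal sketch for locating it follows. It assumes Spark's default `part-` file naming, and the helper name `find_single_part_file` is hypothetical:

```python
import glob
import os


def find_single_part_file(output_dir: str) -> str:
    """Return the only part-*.parquet file Spark wrote into output_dir."""
    parts = glob.glob(os.path.join(output_dir, "part-*.parquet"))
    if len(parts) != 1:
        raise ValueError(f"expected 1 part file in {output_dir}, found {len(parts)}")
    return parts[0]


# Example usage, assuming the notebook's output path:
# path = find_single_part_file(os.path.join(os.getcwd(), "output", "trips"))
```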

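```python
# coalesce(1) merges all partitions so Spark writes a single part file;
# mode("overwrite") lets the cell be re-run without failing on an existing path.
dfOut.coalesce(1).write.mode("overwrite").parquet(dirpath + "/output/trips")
```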

```python
print("The file was created.")
```

```
The file was created.
```
