<a href="https://colab.research.google.com/github/felipecampelo/projetoETLPySparkAPI/blob/main/TestePython_FelipeCampelo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**Teste de Python**

🧑 *Candidato: Felipe Souto Campelo*

📲 *Telefone: (81) 99811-8727*

📧 *E-mail: felipesoutocampelo@gmail.com*

🏢 *Empresa: Dataside*

📅 *Data: 06/01/2023*

##📜 **Arquivos Gerados**: 

↪ *CSV da Tabela 1 com o nome CSVTabela1*

↪ *CSV da Tabela 2 com o nome CSVTabela2*

↪ *Arquivo ipynb com o nome TestePython_FelipeCampelo.ipynb*


```
OBS: Com a filtragem para apenas os estados de São Paulo, o número de cidades com os dados desejados caiu significativamente.
 Se não tivesse sido feita a filtragem, teríamos dados de 38 cidades, uma a menos do que o planejado, tendo em vista que a API não encontrou a cidade de Potim.
```

```
OBS2: Para afirmativa de que VaiChover, utilizei uma condição de que se a probabilidade de chuva for igual ou maior a 50%, então VaiChover = 'Sim'. Caso contrário, VaiChover = 'não'.
```

####✅ Instalando Bibliotecas Necessárias

In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
pip install jupyterlab

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [4]:
pip install unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


####✅ Importando Bibliotecas e Iniciando a SparkSession

In [5]:
import findspark
findspark.init()

import pandas as pd
import requests
import json
import unidecode
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .config('spark.ui.port', '4050') \
      .getOrCreate()
    
sc = spark.sparkContext

####✅ Buscando Cidades do Vale do Paraíba

In [6]:
# Buscar cidades do Vale do Paraíba fazendo a requisição HTTP
res = None
url = 'https://servicodados.ibge.gov.br/api/v1/localidades/mesorregioes/3513/municipios'

try:
  res = requests.get(url)
    
except Exception as e:
  print(e)
if res != None and res.status_code == 200:
  json_data = json.loads(res.text)
  print(json_data)

# Criar data frame com as cidades
df = spark.read.json(sc.parallelize([json_data]))
df = df.withColumnRenamed("regiao-imediata", "regiao_imediata")
df = df.withColumnRenamed("id", "CodigoDaCidade")
df = df.withColumnRenamed("nome", "Cidade")

df = df.withColumn("microrregiao_id", df.microrregiao.getItem("id")) \
       .withColumn("microrregiao_nome", df.microrregiao.getItem("nome")) \
       .withColumn("microrregiao_mesorregiao_id", df.microrregiao.mesorregiao.getItem("id")) \
       .withColumn("microrregiao_mesorregiao_nome", df.microrregiao.mesorregiao.getItem("nome")) \
       .withColumn("microrregiao_mesorregiao_UF_id", df.microrregiao.mesorregiao.UF.getItem("id")) \
       .withColumn("microrregiao_mesorregiao_UF_sigla", df.microrregiao.mesorregiao.UF.getItem("sigla")) \
       .withColumn("microrregiao_mesorregiao_UF_nome", df.microrregiao.mesorregiao.UF.getItem("nome")) \
       .withColumn("microrregiao_mesorregiao_UF_regiao_id", df.microrregiao.mesorregiao.UF.regiao.getItem("id")) \
       .withColumn("microrregiao_mesorregiao_UF_regiao_sigla", df.microrregiao.mesorregiao.UF.regiao.getItem("sigla")) \
       .withColumn("Regiao", df.microrregiao.mesorregiao.UF.regiao.getItem("nome")) \
       .withColumn("regiao_imediata_id", df.regiao_imediata.getItem("id")) \
       .withColumn("regiao_imediata_nome", df.regiao_imediata.getItem("nome")) \
       .withColumn("regiao_imediata_regiao_intermediaria_id", df['regiao_imediata']["regiao-intermediaria"].getItem("id")) \
       .withColumn("regiao_imediata_regiao_intermediaria_nome", df['regiao_imediata']["regiao-intermediaria"].getItem("nome")) \
       .withColumn("regiao_imediata_regiao_intermediaria_UF_id", df['regiao_imediata']["regiao-intermediaria"]["UF"].getItem("id")) \
       .withColumn("regiao_imediata_regiao_intermediaria_UF_sigla", df['regiao_imediata']["regiao-intermediaria"]["UF"].getItem("sigla")) \
       .withColumn("regiao_imediata_regiao_intermediaria_UF_nome", df['regiao_imediata']["regiao-intermediaria"]["UF"].getItem("nome")) \
       .withColumn("regiao_imediata_regiao_intermediaria_UF_regiao_id", df['regiao_imediata']["regiao-intermediaria"]["UF"]["regiao"].getItem("id")) \
       .withColumn("regiao_imediata_regiao_intermediaria_UF_regiao_id", df['regiao_imediata']["regiao-intermediaria"]["UF"]["regiao"].getItem("sigla")) \
       .withColumn("regiao_imediata_regiao_intermediaria_UF_regiao_id", df['regiao_imediata']["regiao-intermediaria"]["UF"]["regiao"].getItem("nome")) \
       .drop("microrregiao") \
       .drop("regiao_imediata")

df = df.replace('Guaratinguetá', 'Guaratingueta') 
df = df.replace('Igaratá', 'Igarata') 

df.show(100)

[{'id': 3502507, 'nome': 'Aparecida', 'microrregiao': {'id': 35051, 'nome': 'Guaratinguetá', 'mesorregiao': {'id': 3513, 'nome': 'Vale do Paraíba Paulista', 'UF': {'id': 35, 'sigla': 'SP', 'nome': 'São Paulo', 'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}}, 'regiao-imediata': {'id': 350052, 'nome': 'Guaratinguetá', 'regiao-intermediaria': {'id': 3511, 'nome': 'São José dos Campos', 'UF': {'id': 35, 'sigla': 'SP', 'nome': 'São Paulo', 'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}}}, {'id': 3503158, 'nome': 'Arapeí', 'microrregiao': {'id': 35052, 'nome': 'Bananal', 'mesorregiao': {'id': 3513, 'nome': 'Vale do Paraíba Paulista', 'UF': {'id': 35, 'sigla': 'SP', 'nome': 'São Paulo', 'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}}, 'regiao-imediata': {'id': 350053, 'nome': 'Cruzeiro', 'regiao-intermediaria': {'id': 3511, 'nome': 'São José dos Campos', 'UF': {'id': 35, 'sigla': 'SP', 'nome': 'São Paulo', 'regiao': {'id': 3, 'sigla': 'SE', 'nome': 'Sudeste'}}}}}, {

####✅ Criando uma Temp View com as Cidades do Vale do Paraíba

In [7]:
# Criar view com as cidades
df.createOrReplaceTempView("Cidades")

# Run SQL Query
spark.sql("SELECT * from Cidades").show(100, truncate=False)

+--------------+-----------------------+---------------+--------------------+---------------------------+-----------------------------+------------------------------+---------------------------------+--------------------------------+-------------------------------------+----------------------------------------+-------+------------------+---------------------------------------+---------------------------------------+-----------------------------------------+------------------------------------------+---------------------------------------------+--------------------------------------------+-------------------------------------------------+
|CodigoDaCidade|Cidade                 |microrregiao_id|microrregiao_nome   |microrregiao_mesorregiao_id|microrregiao_mesorregiao_nome|microrregiao_mesorregiao_UF_id|microrregiao_mesorregiao_UF_sigla|microrregiao_mesorregiao_UF_nome|microrregiao_mesorregiao_UF_regiao_id|microrregiao_mesorregiao_UF_regiao_sigla|Regiao |regiao_imediata_id|regiao_imediata

####✅ Buscando Dados do Tempo para as Cidades Usando Duas APIs

⏩ WeatherStack

⏩ VisualCrossing

In [16]:
# Consumindo a API da WeatherStack e criando o DataFrame
# coding: utf-8
data_collect = df.collect()
primeiraConsulta = 1

for row in data_collect:
  params = {
    'access_key': '0147e7d0ebf5f4ea847542a297f34a34',
    'query': row['Cidade']
  }
  try:
    api_result = requests.get('http://api.weatherstack.com/current', params)
    api_response = api_result.json()
    print(api_response)
    print(row['Cidade'])
  except Exception as e:
    print(e)

  if api_response.get('success') == None:
    # Criar data frame com as cidades
    dfAux = spark.read.json(sc.parallelize([api_response]))

    dfAux = dfAux.withColumn("request_type", dfAux['request'].getItem("type")) \
                .withColumn("request_query", dfAux['request'].getItem("query")) \
                .withColumn("request_language", dfAux['request'].getItem("language")) \
                .withColumn("request_unit", dfAux['request'].getItem("unit")) \
                .withColumn("location_name", dfAux.location.getItem("name")) \
                .withColumn("Pais", dfAux.location.getItem("country")) \
                .withColumn("location_region", dfAux.location.getItem("region")) \
                .withColumn("location_lat", dfAux.location.getItem("lat")) \
                .withColumn("location_lon", dfAux.location.getItem("lon")) \
                .withColumn("location_timezone_id", dfAux.location.getItem("timezone_id")) \
                .withColumn("location_localtime", dfAux.location.getItem("localtime")) \
                .withColumn("location_localtime_epoch", dfAux.location.getItem("localtime_epoch")) \
                .withColumn("location_utc_offset", dfAux.location.getItem("utc_offset")) \
                .withColumn("location_observation_time", dfAux.current.getItem("observation_time")) \
                .withColumn("TemperaturaMedia", dfAux.current.getItem("temperature")) \
                .withColumn("current_weather_code", dfAux.current.getItem("weather_code")) \
                .withColumn("current_weather_icons", dfAux.current.getItem("weather_icons")) \
                .withColumn("CondicaoDoTempo", dfAux.current.getItem("weather_descriptions")) \
                .withColumn("VelocidadeMaximaDoVento", dfAux.current.getItem("wind_speed")) \
                .withColumn("current_wind_degree", dfAux.current.getItem("wind_degree")) \
                .withColumn("current_wind_dir", dfAux.current.getItem("wind_dir")) \
                .withColumn("current_pressure", dfAux.current.getItem("pressure")) \
                .withColumn("current_precip", dfAux.current.getItem("precip")) \
                .withColumn("current_humidity", dfAux.current.getItem("humidity")) \
                .withColumn("current_cloudcover", dfAux.current.getItem("cloudcover")) \
                .withColumn("current_feelslike", dfAux.current.getItem("feelslike")) \
                .withColumn("current_uv_index", dfAux.current.getItem("uv_index")) \
                .withColumn("current_visibility", dfAux.current.getItem("visibility")) \
                .withColumn("current_is_day", dfAux.current.getItem("is_day")) \
                .drop("request") \
                .drop("location") \
                .drop("current")
  else:
    emp_RDD = spark.sparkContext.emptyRDD()
    schema = StructType([StructField('request_type', StringType(), True), StructField('request_query', StringType(), True), StructField('request_language', StringType(), True), StructField('request_unit', StringType(), True), StructField('location_name', StringType(), True), StructField('location_country', StringType(), True), StructField('location_region', StringType(), True), StructField('location_lat', StringType(), True), StructField('location_lon', StringType(), True), StructField('location_timezone_id', StringType(), True), StructField('location_localtime', StringType(), True), StructField('location_localtime_epoch', LongType(), True), StructField('location_utc_offset', StringType(), True), StructField('location_observation_time', StringType(), True), StructField('current_temperature', LongType(), True), StructField('current_weather_code', LongType(), True), StructField('current_weather_icons', ArrayType(StringType(), True), True), StructField('current_weather_descriptions', ArrayType(StringType(), True), True), StructField('current_wind_speed', LongType(), True), StructField('current_wind_degree', LongType(), True), StructField('current_wind_dir', StringType(), True), StructField('current_pressure', LongType(), True), StructField('current_precip', DoubleType(), True), StructField('current_humidity', LongType(), True), StructField('current_cloudcover', LongType(), True), StructField('current_feelslike', LongType(), True), StructField('current_uv_index', LongType(), True), StructField('current_visibility', LongType(), True), StructField('current_is_day', StringType(), True)])

    dfAux = spark.createDataFrame(data = emp_RDD, schema = schema)
    dfAux.show()
  
  if primeiraConsulta == 1:
    df2 = dfAux
  else:
    df2 = df2.union(dfAux)
  
  primeiraConsulta = 0

# Como a API retornou cidades de outros Países/Estados. Com o filtro de mostrar apenas no estado de São Paulo, a quantidade de cidades com dados diminuiu.
# Se houvesse um plano pago para a API, poderíamos filtrar diretamente na query da requisição.
# A cidade de Potim não foi encontradas pela API.
df2 = df2.filter((df2['location_region'] == 'Sao Paulo') | (df2['location_region'] == 'São Paulo'))
df2 = df2.replace('Natividade Da Serra', 'Natividade da Serra') 
df2 = df2.replace('Sao Luiz Do Paraitinga', 'São Luiz do Paraitinga')

df2.show(1000)

{'request': {'type': 'City', 'query': 'Aparecida, Brazil', 'language': 'en', 'unit': 'm'}, 'location': {'name': 'Aparecida', 'country': 'Brazil', 'region': 'Sao Paulo', 'lat': '-22.833', 'lon': '-45.233', 'timezone_id': 'America/Sao_Paulo', 'localtime': '2023-01-06 15:03', 'localtime_epoch': 1673017380, 'utc_offset': '-3.0'}, 'current': {'observation_time': '06:03 PM', 'temperature': 20, 'weather_code': 296, 'weather_icons': ['https://cdn.worldweatheronline.com/images/wsymbols01_png_64/wsymbol_0017_cloudy_with_light_rain.png'], 'weather_descriptions': ['Light Rain, Mist'], 'wind_speed': 13, 'wind_degree': 230, 'wind_dir': 'SW', 'pressure': 1018, 'precip': 1.3, 'humidity': 100, 'cloudcover': 100, 'feelslike': 20, 'uv_index': 4, 'visibility': 3, 'is_day': 'yes'}}
Aparecida
{'request': {'type': 'City', 'query': 'Arapeí, Brasil', 'language': 'en', 'unit': 'm'}, 'location': {'name': 'Arapeí', 'country': 'Brasil', 'region': 'Sao Paulo', 'lat': '-22.683', 'lon': '-44.450', 'timezone_id': 'Ame

38

In [18]:
# Consumindo a API da Visual Crossing e criando o DataFrame com as Informações Complementares
df2_collect = df2.collect()
primeiraConsulta = 1

for row in df2_collect:
  lat = row['location_lat']
  lon = row['location_lon']

  try:
    # Indicamos a query de lat/lon dessa forma -> lat%2Clong
    api_result = requests.get(f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{lat}%2C{lon}?unitGroup=metric&include=days%2Ccurrent&key=Q5LYDGFHV3GX5MAPHVUYD443E&contentType=json')
    api_response = api_result.json()
    print(api_response)
    print(row['nome'])
  except Exception as e:
    print(e)

  for i in range(15): # São 15 dias de dados
    dfAux = spark.read.json(sc.parallelize([json.dumps(api_response)]))
    
    # if dfAux['days']["precipprob"][i] >= 50.0:
    #   VaiChover = 'Sim'
    # else:
    #   VaiChover = 'Não'

    dfAux = dfAux.withColumn("Latitude", dfAux['latitude']) \
                    .withColumn("Longitude", dfAux['longitude']) \
                    .withColumn("TemperaturaMaxima", dfAux['days']["tempmax"][i]) \
                    .withColumn("TemperaturaMinima", dfAux['days']["tempmin"][i]) \
                    .withColumn("ChanceDeChuva", dfAux['days']["precipprob"][i]) \
                    .withColumn("NascerDoSol", dfAux['days']["sunrise"][i]) \
                    .withColumn("PorDoSol", dfAux['days']["sunset"][i]) \
                    .withColumn("Data", dfAux['days']["datetime"][i]) \
                    .drop("currentConditions") \
                    .drop("address") \
                    .drop("days") \
                    .drop("queryCost") \
                    .drop("resolvedAddress") \
                    .drop("stations") \
                    .drop("timezone") \
                    .drop("tzoffset") 

    dfAux = dfAux.withColumn('VaiChover', F.when(F.col('ChanceDeChuva') >= 50.0, 'Sim').otherwise('Não'))

    if primeiraConsulta == 1:
      df3 = dfAux
    else:
      df3 = df3.union(dfAux)

    primeiraConsulta = 0

df3.show(1000)
df3.count()

{'queryCost': 1, 'latitude': -22.833, 'longitude': -45.233, 'resolvedAddress': '-22.833,-45.233', 'address': '-22.833,-45.233', 'timezone': 'America/Sao_Paulo', 'tzoffset': -3.0, 'days': [{'datetime': '2023-01-06', 'datetimeEpoch': 1672974000, 'tempmax': 21.8, 'tempmin': 15.7, 'temp': 17.6, 'feelslikemax': 21.8, 'feelslikemin': 15.7, 'feelslike': 17.6, 'dew': 17.3, 'humidity': 98.3, 'precip': 17.3, 'precipprob': 100.0, 'precipcover': 83.33, 'preciptype': ['rain'], 'snow': 0.0, 'snowdepth': 0.0, 'windgust': 12.6, 'windspeed': 14.5, 'winddir': 234.4, 'pressure': 1017.3, 'cloudcover': 96.1, 'visibility': 5.6, 'solarradiation': 120.3, 'solarenergy': 10.3, 'uvindex': 6.0, 'severerisk': 10.0, 'sunrise': '05:22:34', 'sunriseEpoch': 1672993354, 'sunset': '18:50:47', 'sunsetEpoch': 1673041847, 'moonphase': 0.5, 'conditions': 'Rain, Overcast', 'description': 'Cloudy skies throughout the day with a chance of rain throughout the day.', 'icon': 'rain', 'stations': ['SBSJ', 'SBGW', 'SBTA', 'remote',

330

####✅ Criando Temp Views com os Dados do Tempo

In [23]:
# Criar view com os Dados do Tempo para as duas APIs
df2.createOrReplaceTempView("Tempo_WeatherStack")
df3.createOrReplaceTempView("Tempo_VisualCrossing")

# Run SQL Query
spark.sql("SELECT * from Tempo_WeatherStack").show(100, truncate=False)
spark.sql("SELECT * from Tempo_VisualCrossing").show(100, truncate=False)

+------------+------------------------------+----------------+------------+----------------------+------+---------------+------------+------------+--------------------+------------------+------------------------+-------------------+-------------------------+----------------+--------------------+-----------------------------------------------------------------------------------------------------+--------------------+-----------------------+-------------------+----------------+----------------+--------------+----------------+------------------+-----------------+----------------+------------------+--------------+
|request_type|request_query                 |request_language|request_unit|location_name         |Pais  |location_region|location_lat|location_lon|location_timezone_id|location_localtime|location_localtime_epoch|location_utc_offset|location_observation_time|TemperaturaMedia|current_weather_code|current_weather_icons                                                                 

####✅ Utilizando o Spark SQL para as Consultas Desejadas


⏩ Tabela 1

- [X] Cidade


- [X] CodigoDaCidade


- [X]  Data


- [X]  Regiao


- [X]  Pais


- [X]  Latitude


- [X]  Longitude


- [X]  TemperaturaMaxima


- [X]  TemperaturaMinima


- [X]  TemperaturaMedia (temperatura)


- [X]  VaiChover


- [X]  ChanceDeChuva


- [X]  CondicaoDoTempo


- [X]  NascerDoSol


- [X]  PorDoSol


- [X]  VelocidadeMaximaDoVento (wind_speed)

⏩ Tabela 2

- [X]  Cidade


- [X]  QtdDiasVaiChover


- [X]  QtdDiasNaoVaiChover


- [X]  TotalDiasMapeados


In [24]:
# Printando as tabelas para facilitar a visualização das colunas
spark.sql("SELECT * from Tempo_WeatherStack").show(1, truncate=False)
print(spark.sql("SELECT * from Tempo_WeatherStack").count())

spark.sql("SELECT * from Tempo_VisualCrossing").show(1, truncate=False)
print(spark.sql("SELECT * from Tempo_VisualCrossing").count())

spark.sql("SELECT * from Cidades").show(1, truncate=False)
spark.sql("SELECT * from Cidades").count()

+------------+-----------------+----------------+------------+-------------+------+---------------+------------+------------+--------------------+------------------+------------------------+-------------------+-------------------------+----------------+--------------------+-----------------------------------------------------------------------------------------------------+------------------+-----------------------+-------------------+----------------+----------------+--------------+----------------+------------------+-----------------+----------------+------------------+--------------+
|request_type|request_query    |request_language|request_unit|location_name|Pais  |location_region|location_lat|location_lon|location_timezone_id|location_localtime|location_localtime_epoch|location_utc_offset|location_observation_time|TemperaturaMedia|current_weather_code|current_weather_icons                                                                                |CondicaoDoTempo   |VelocidadeM

39

In [25]:
# Com as três TempViews, é possível obter TODOS os dados desejáveis
# Foi considerado um VaiChover = Sim para probabilidades de chuva igual ou superior a 50.0%
# Tabela 1:
tabela1 = spark.sql("SELECT c.Cidade, c.CodigoDaCidade, v.Data, c.Regiao, w.Pais, v.Latitude, v.Longitude, v.TemperaturaMaxima, v.TemperaturaMinima, w.TemperaturaMedia, v.VaiChover, v.ChanceDeChuva, w.CondicaoDoTempo, v.NascerDoSol, v.PorDoSol, w.VelocidadeMaximaDoVento \
                     FROM Cidades c \
                     JOIN Tempo_WeatherStack w \
                     ON c.Cidade = w.location_name \
                     JOIN Tempo_VisualCrossing v \
                     ON w.location_lat = v.Latitude AND w.location_lon = v.Longitude")

# TempView
tabela1.createOrReplaceTempView("Tabela1")
tabela1.count()

330

In [26]:
# Tabela 2:
tabela2 = spark.sql("SELECT t1.Cidade, COUNT(case when VaiChover = 'Sim' then 1 else null end) AS QtdDiasVaiChover, COUNT(case when VaiChover = 'Não' then 1 else null end) AS QtdDiasNaoVaiChover, COUNT(VaiChover) AS TotalDiasMapeados \
                     FROM Tabela1 t1 \
                     GROUP BY t1.Cidade")

# TempView
tabela2.createOrReplaceTempView("Tabela2")
tabela2.count()

22

####✅ Exportando os dados em CSVs

In [34]:
# Exportando CSVs
tabela1.toPandas().to_csv('CSVTabela1.csv', index = False)
tabela2.toPandas().to_csv('CSVTabela2.csv', index = False)