# Camada Prata - Transformação - Combinação de dados de terremotos e instalações

In [0]:
### importes necessarios


from geopy.distance import geodesic

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, IntegerType



In [0]:
### Recuperando dados dos terremotos da camada prata
# Caminho do arquivo Delta Lake
file_path = "/mnt/dados/camada_prata_terremotos"

# Ler o arquivo Delta Lake
df_terremotos = spark.read.format("delta").load(file_path)

# Visualizar as primeiras 5 linhas do DataFrame
df_terremotos.printSchema()
df_terremotos.show(5)

root
 |-- id: string (nullable = true)
 |-- prop_mag: double (nullable = true)
 |-- prop_url: string (nullable = true)
 |-- data: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- pais: string (nullable = true)

+----------+--------+--------------------+----------+--------+---------+------------+
|        id|prop_mag|            prop_url|      data|latitude|longitude|        pais|
+----------+--------+--------------------+----------+--------+---------+------------+
|at00sthakw|     6.5|https://earthquak...|2025-03-21|     7.2|    -82.3|Desconhecido|
|us10001ldx|     6.2|https://earthquak...|2015-03-10|  6.7757| -72.9875|    Colombia|
|us10001pn4|     5.1|https://earthquak...|2015-03-22|   6.804|  -73.147|    Colombia|
|us10001rcr|     5.5|https://earthquak...|2015-03-27| -1.2012| -77.5836|     Ecuador|
|us10002azw|     5.1|https://earthquak...|2015-05-20| -3.0826| -77.5501|        Perú|
+----------+--------+--------------------

In [0]:
### Recuperando dados das instalações da camada prata
# Caminho do arquivo Delta Lake
file_path = "/mnt/dados/camada_prata_instalacoes"

# Ler o arquivo Delta Lake
df_instalacoes = spark.read.format("delta").load(file_path)

# Visualizar as primeiras 5 linhas do DataFrame
df_instalacoes.printSchema()
df_instalacoes.show(2)

root
 |-- codigo: string (nullable = true)
 |-- instalacao: string (nullable = true)
 |-- operador: string (nullable = true)
 |-- latitude_instalacao: double (nullable = true)
 |-- longitude_instalacao: double (nullable = true)

+------+--------------------+--------+-------------------+--------------------+
|codigo|          instalacao|operador|latitude_instalacao|longitude_instalacao|
+------+--------------------+--------+-------------------+--------------------+
|   COV|    Terminal Coveñas|  Ocensa|  9.409136280745889|  -75.70015297027714|
|   LAG|Estacion La Granjita|  Ocensa|  8.449501151686118|  -75.50994443266501|
+------+--------------------+--------+-------------------+--------------------+
only showing top 2 rows



Criando funções para medir distância e impacto dos terremotos nas instalações

In [0]:
### Função para medir a distância
def calcular_distância(latitude_ponto_1, longitude_ponto_1, latitude_ponto_2, longitude_ponto_2):
    return geodesic((latitude_ponto_1, longitude_ponto_1), (latitude_ponto_2, longitude_ponto_2)).kilometers

    return distância

In [0]:
### Função para medir o impacto
def calcular_impacto(distancia, magnitude):
    impacto = 0
    if distancia <= 100 and magnitude >= 5:
        impacto = 1
    if distancia <= 200 and magnitude >= 6:
        impacto = 1
    if distancia <= 300 and magnitude >= 6.5:
        impacto = 1
    if distancia <= 400 and magnitude >= 7:
        impacto = 1
    if distancia <= 600 and magnitude >= 7.5:
        impacto = 1
    if distancia >= 600 and magnitude >= 8:
        impacto = 1

    return impacto


    

In [0]:
### Transformar estas funções em UDF (User Defined Functions):

calcular_distância_udf = udf(calcular_distância, DoubleType())
calcular_impacto_udf = udf(calcular_impacto, IntegerType())


Mesclando os dataframes

In [0]:
df_cross = df_instalacoes.crossJoin(df_terremotos)
df_cross.printSchema()

root
 |-- codigo: string (nullable = true)
 |-- instalacao: string (nullable = true)
 |-- operador: string (nullable = true)
 |-- latitude_instalacao: double (nullable = true)
 |-- longitude_instalacao: double (nullable = true)
 |-- id: string (nullable = true)
 |-- prop_mag: double (nullable = true)
 |-- prop_url: string (nullable = true)
 |-- data: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- pais: string (nullable = true)



In [0]:
### Calculando as ditâncias entre epicentro e instalação, para cada par terremoto x instalação
df_cross = df_cross.withColumn("distancia", calcular_distância_udf(df_cross["latitude"], df_cross["longitude"], df_cross["latitude_instalacao"], df_cross["longitude_instalacao"]))
df_cross.printSchema()
df_cross.head(3)

root
 |-- codigo: string (nullable = true)
 |-- instalacao: string (nullable = true)
 |-- operador: string (nullable = true)
 |-- latitude_instalacao: double (nullable = true)
 |-- longitude_instalacao: double (nullable = true)
 |-- id: string (nullable = true)
 |-- prop_mag: double (nullable = true)
 |-- prop_url: string (nullable = true)
 |-- data: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- pais: string (nullable = true)
 |-- distancia: double (nullable = true)

Out[8]: [Row(codigo='COV', instalacao='Terminal Coveñas', operador='Ocensa', latitude_instalacao=9.409136280745889, longitude_instalacao=-75.70015297027714, id='at00sthakw', prop_mag=6.5, prop_url='https://earthquake.usgs.gov/earthquakes/eventpage/at00sthakw', data=datetime.date(2025, 3, 21), latitude=7.2, longitude=-82.3, pais='Desconhecido', distancia=766.9409381249803),
 Row(codigo='LAG', instalacao='Estacion La Granjita', operador='Ocensa', latitude_instala

In [0]:
### Estimando se houve ou não impacto do terremoto na instação, considerando a distância e a magnitude, para cada par terremoto x instalação
df_cross = df_cross.withColumn("impacto", calcular_impacto_udf(df_cross["distancia"], df_cross["prop_mag"]))
df_impacto = df_cross.filter(df_cross["impacto"] == 1) ### Eliminando as linhas sem impacto.
df_impacto.printSchema()
df_impacto.head(3)

root
 |-- codigo: string (nullable = true)
 |-- instalacao: string (nullable = true)
 |-- operador: string (nullable = true)
 |-- latitude_instalacao: double (nullable = true)
 |-- longitude_instalacao: double (nullable = true)
 |-- id: string (nullable = true)
 |-- prop_mag: double (nullable = true)
 |-- prop_url: string (nullable = true)
 |-- data: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- pais: string (nullable = true)
 |-- distancia: double (nullable = true)
 |-- impacto: integer (nullable = true)

Out[9]: [Row(codigo='CHI', instalacao='Estacion Chiquillo', operador='Ocensa', latitude_instalacao=6.849724465710438, longitude_instalacao=-74.48568700509468, id='us10001ldx', prop_mag=6.2, prop_url='https://earthquake.usgs.gov/earthquakes/eventpage/us10001ldx', data=datetime.date(2015, 3, 10), latitude=6.7757, longitude=-72.9875, pais='Colombia', distancia=165.8097685759375, impacto=1),
 Row(codigo='VAS', instalacao='Est

In [0]:
df_eventos_impacto = df_impacto.select(
    "codigo",
    "id",
    "distancia"
)
df_eventos_impacto = df_eventos_impacto.withColumnRenamed("codigo", "instalacoes_cod").withColumnRenamed("id", "terremotos_id") 


df_eventos_impacto.describe().show()

+-------+---------------+-------------+------------------+
|summary|instalacoes_cod|terremotos_id|         distancia|
+-------+---------------+-------------+------------------+
|  count|             37|           37|                37|
|   mean|           null|         null|116.74146564663337|
| stddev|           null|         null|57.846364904525714|
|    min|            CAU|   us10001ldx|16.639204068357735|
|    max|            VAS|   usp000hf0b|230.89478697278986|
+-------+---------------+-------------+------------------+



Agora vamos persistir nossos dados finais.

In [0]:
# Caminho para salvar o DataFrame refinado (camada prata)
file_path_silver = "/mnt/dados/camada_prata_impacto"

# Removendo o diretório existente
dbutils.fs.rm(file_path_silver, recurse=True)

# Salvando o DataFrame refinado no formato Delta Lake (camada prata)
df_eventos_impacto.write.format("delta").mode("overwrite").save(file_path_silver)