<a href="https://colab.research.google.com/github/RcrvzM/DM_PROYECTO-FINAL/blob/main/Proyecciones_economicas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [87]:
# Instalar Java y Spark
!apt-get install openjdk-11-jdk -y
!wget -q https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark

# Configurar variables de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

# Iniciar Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PIB_Mundial").getOrCreate()

spark.version


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jre
  x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jdk
  openjdk-11-jre x11-utils
0 upgraded, 10 newly installed, 0 to remove and 34 not upgraded.
Need to get 5,366 kB of archives.
After this operation, 15.2 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-core all 2.37-2build1 [1,041 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-extra all 2.37-2build1 [2,041 kB]
Get:3 http://archive.ubuntu.com/ubuntu jam

'3.3.2'

In [None]:
#Fin de instalacion de Spark

In [None]:
#Incio de descarga de fuentes de datos

In [103]:
import requests, json

# API del Banco Mundial - PIB
url_pib = "http://api.worldbank.org/v2/country/all/indicator/NY.GDP.MKTP.CD?format=json&per_page=20000"
r_pib = requests.get(url_pib)
data_pib = r_pib.json()[1]

# Guardar como JSONL
with open("/content/pib.json", "w") as f:
    for record in data_pib:
        f.write(json.dumps(record) + "\n")


In [104]:
# API del Banco Mundial - Tasa de empleo (% de población)
url_empleo = "http://api.worldbank.org/v2/country/all/indicator/SL.EMP.TOTL.SP.ZS?format=json&per_page=20000"
r_empleo = requests.get(url_empleo)
data_empleo = r_empleo.json()[1]

# Guardar como JSONL
with open("/content/empleo.json", "w") as f:
    for record in data_empleo:
        f.write(json.dumps(record) + "\n")


In [130]:
import requests

# API SDMX del FMI – Tipo de cambio (ENDE_XDC_USD_RATE)
url_fx = "http://dataservices.imf.org/REST/SDMX_XML.svc/CompactData/IFS/A..ENDE_XDC_USD_RATE"

# Descargar archivo
r_fx = requests.get(url_fx)

# Guardar en disco (sin procesamiento aún)
with open("/content/fx.xml", "wb") as f:
    f.write(r_fx.content)


In [None]:
# Fin de descarga de fuentes de datos

In [None]:
# Inicio de validaciones de fuentes de datos

In [106]:
from pyspark.sql.functions import col

# Cargar archivos JSONL
df_pib = spark.read.json("/content/pib.json")
df_empleo = spark.read.json("/content/empleo.json")

# Función de validación de estructura
def validar_dataset(df, nombre, columnas):
    print(f"\n📊 Validando: {nombre}")
    errores = {}
    total = df.count()
    print(f"Total filas: {total}")

    for columna in columnas:
        nulos = df.filter(col(columna).isNull()).count()
        errores[columna] = nulos
        print(f"Nulos en '{columna}': {nulos}")

    return errores

# Validar PIB
validar_dataset(df_pib, "PIB", ["country.value", "date", "value"])

# Validar Empleo
validar_dataset(df_empleo, "Empleo", ["country.value", "date", "value"])



📊 Validando: PIB
Total filas: 17290
Nulos en 'country.value': 0
Nulos en 'date': 0
Nulos en 'value': 2983

📊 Validando: Empleo
Total filas: 17290
Nulos en 'country.value': 0
Nulos en 'date': 0
Nulos en 'value': 9309


{'country.value': 0, 'date': 0, 'value': 9309}

In [134]:
import xml.etree.ElementTree as ET

# Parsear el XML sin namespace
tree = ET.parse("/content/fx.xml")
root = tree.getroot()

# Extraer nodos <Series>
series_nodos = root.findall(".//{http://dataservices.imf.org/compact/IFS}Series")

print(f"📌 Total Series encontradas: {len(series_nodos)}")

# Extraer observaciones
data = []

for serie in series_nodos:
    pais = serie.attrib.get("REF_AREA")
    for obs in serie.findall("{http://dataservices.imf.org/compact/IFS}Obs"):
        anio = obs.attrib.get("TIME_PERIOD")
        valor = obs.attrib.get("OBS_VALUE")
        if pais and anio and valor:
            data.append((pais, int(anio), float(valor)))

# Ver resumen
print(f"✅ Total registros válidos: {len(data)}")
print("Ejemplo:", data[:5])


📌 Total Series encontradas: 226
✅ Total registros válidos: 13939
Ejemplo: [('NG', 1950, 0.714286), ('NG', 1951, 0.714286), ('NG', 1952, 0.714286), ('NG', 1953, 0.714286), ('NG', 1954, 0.714286)]


In [None]:
# Fin de validaciones de fuentes de datos

In [None]:
#Cargar SPARK

In [136]:
df_pib = spark.read.json("/content/pib.json")

# Filtrar registros útiles
df_pib_limpio = df_pib.filter(
    col("country.value").isNotNull() &
    col("date").isNotNull() &
    col("value").isNotNull()
)

# Renombrar columnas y tipos
df_pib_limpio = df_pib_limpio.select(
    col("country.value").alias("pais"),
    col("date").cast("int").alias("anio"),
    col("value").cast("double").alias("pib_usd")
)

df_pib_limpio.show(5)


+--------------------+----+-------------------+
|                pais|anio|            pib_usd|
+--------------------+----+-------------------+
|Africa Eastern an...|2023|1.24547247167595E12|
|Africa Eastern an...|2022|1.19142317624296E12|
|Africa Eastern an...|2021|1.08574517885097E12|
|Africa Eastern an...|2020|9.33391782089617E11|
|Africa Eastern an...|2019|1.00972117405491E12|
+--------------------+----+-------------------+
only showing top 5 rows



In [137]:
df_empleo = spark.read.json("/content/empleo.json")

# Filtrar registros válidos
df_empleo_limpio = df_empleo.filter(
    col("country.value").isNotNull() &
    col("date").isNotNull() &
    col("value").isNotNull()
)

# Renombrar columnas y tipos
df_empleo_limpio = df_empleo_limpio.select(
    col("country.value").alias("pais"),
    col("date").cast("int").alias("anio"),
    col("value").cast("double").alias("tasa_empleo")
)

df_empleo_limpio.show(5)


+--------------------+----+----------------+
|                pais|anio|     tasa_empleo|
+--------------------+----+----------------+
|Africa Eastern an...|2024|63.8048907252938|
|Africa Eastern an...|2023|63.8966859801838|
|Africa Eastern an...|2022|61.6567788664629|
|Africa Eastern an...|2021|61.0661154052549|
|Africa Eastern an...|2020|60.8607715076156|
+--------------------+----+----------------+
only showing top 5 rows



In [139]:
import csv

# Guardar la lista como archivo CSV
with open("/content/fx.csv", "w", newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["pais", "anio", "tipo_cambio"])
    writer.writerows(data)


In [140]:
df_fx_limpio = spark.read.option("header", True).option("inferSchema", True).csv("/content/fx.csv")
df_fx_limpio.show(5)
df_fx_limpio.printSchema()


+----+----+-----------+
|pais|anio|tipo_cambio|
+----+----+-----------+
|  NG|1950|   0.714286|
|  NG|1951|   0.714286|
|  NG|1952|   0.714286|
|  NG|1953|   0.714286|
|  NG|1954|   0.714286|
+----+----+-----------+
only showing top 5 rows

root
 |-- pais: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- tipo_cambio: double (nullable = true)



In [None]:
# Descarga de Paises / Normalizacion

In [142]:
url_alt = "https://raw.githubusercontent.com/datasets/country-codes/master/data/country-codes.csv"
r = requests.get(url_alt)

with open("/content/paises_codigos.csv", "wb") as f:
    f.write(r.content)

df_codigos = spark.read.option("header", True).option("inferSchema", True).csv("/content/paises_codigos.csv")

# Verificar columnas disponibles
df_codigos.columns[:10]


['FIFA',
 'Dial',
 'ISO3166-1-Alpha-3',
 'MARC',
 'is_independent',
 'ISO3166-1-numeric',
 'GAUL',
 'FIPS',
 'WMO',
 'ISO3166-1-Alpha-2']

In [143]:
# Normalizar código y nombre
df_codigos = df_codigos.select(
    col("ISO3166-1-Alpha-2").alias("pais_codigo"),
    col("official_name_en").alias("pais")
).filter(col("pais").isNotNull() & col("pais_codigo").isNotNull())

df_codigos.show(5)


+-----------+--------------+
|pais_codigo|          pais|
+-----------+--------------+
|         AF|   Afghanistan|
|         AX| Åland Islands|
|         AL|       Albania|
|         DZ|       Algeria|
|         AS|American Samoa|
+-----------+--------------+
only showing top 5 rows



In [144]:
df_fx_final = df_fx_limpio.join(df_codigos, df_fx_limpio.pais == df_codigos.pais_codigo, "left") \
                           .select("pais", "anio", "tipo_cambio")

df_fx_final.show(5)


AnalysisException: Reference 'pais' is ambiguous, could be: pais, pais.