## Bloque para cargar las distintas credenciales

In [1]:
# IMPORT KEYS USING .env FILE
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.0.0-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.0


In [5]:
import os

# Example from https://pypi.org/project/python-dotenv/
from dotenv import load_dotenv
load_dotenv()

# OR, the same with increased verbosity
load_dotenv(verbose=True)

# Print variable FOO
#print(os.environ.get('NEWSAPI_KEY'))

True

## Bloque para obtener JSON de NewsAPI

In [7]:
# Obtención de datos
import requests

# para replicar el GET de ejempl:
# GET https://newsapi.org/v2/everything?q=Apple&from=2023-05-26&sortBy=popularity&apiKey=API_KEY

# Por defecto, la versión gratuita nos trae noticias del último mes (o últimos 30 días)
query = {'q': 'data engineer',
         'p': 1,
         'apiKey': os.environ.get('NEWSAPI_KEY')}

response = requests.get('https://newsapi.org/v2/everything', params=query)
#print(response.json())
#print(response.encoding)
#print(response.content.decode('utf-8'))

In [8]:
# Convertimos a JSON

import json

#convert string to  object
json_object = json.loads(response.content.decode('utf-8'))

#check new data type
print(type(json_object))

<class 'dict'>


## Cargamos Pyspark

In [9]:
# Cargamos PYSPARK

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .config("spark.jars", "/jars/postgresql-42.2.27.jre7.jar") \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

## Función principal para obtención de páginas de artículos (extract)

In [14]:
# Función para obtener páginas de articulos

def generateDataframeApiWeather(topic, pages, language = 'en'):

    df_list = []
    pages_temp = 0

    # first query to get number of results and decide number of pages
    query =  {  'q': topic,
                'language': language,
                'apiKey': os.environ.get('NEWSAPI_KEY')}

    response = requests.get('https://newsapi.org/v2/everything', params=query)

    #convert string to  object
    json_object = json.loads(response.content.decode('utf-8'))
    print("La API encontró {num} artículos sobre el tema consultado".format(num = json_object['totalResults']))

    if(json_object['totalResults'] == 0):
        raise Exception("No se encontraron resultados sobre el tema consultado! :( ")

    elif(json_object['totalResults'] <= 100):
        print("Se ignora el parámetro de cantidad de páginas...")

        df = spark.createDataFrame(data=json_object['articles']) \
            .withColumn("source_name", F.col("source").getItem("name")).drop(F.col("source")) \
            .withColumn("publishedAt", F.to_timestamp("publishedAt")) \
            #.withColumn("idrecord", F.lit(0))

        df_list.append(df)
        return df_list

    elif(json_object['totalResults'] > 100):
        
        # Agregamos los primeros 100 artículos
        df = spark.createDataFrame(data=json_object['articles']) \
                .withColumn("source_name", F.col("source").getItem("name")) \
                .withColumn("source_id", F.col("source").getItem("id")) \
                .drop(F.col("source")) \
                .withColumn("publishedAt", F.to_timestamp("publishedAt"))

        df_list.append(df)

        pages_temp = int(json_object['totalResults']/100) + (0 if(json_object['totalResults']%100==0) else 1)

        print("Se encontraron {pages_total} páginas en total... obteniendo artículos para {pags} páginas...".format(pags = pages, pages_total = pages_temp))
        # Corrección de num de páginas solicitadas (depende de la cantidad de resultados obtenidos)
        if (pages_temp < pages):
            pages = pages_temp


        for p in range(2, pages+1):
            query = {'q': topic,
                    'page': p+1,
                    'language': language,
                    'apiKey': os.environ.get('NEWSAPI_KEY')}

            response = requests.get('https://newsapi.org/v2/everything', params=query)

            #convert string to  object
            json_object = json.loads(response.content.decode('utf-8'))

            df = spark.createDataFrame(data=json_object['articles']) \
                    .withColumn("source_name", F.col("source").getItem("name")) \
                    .withColumn("source_id", F.col("source").getItem("id")) \
                    .drop(F.col("source")) \
                    .withColumn("publishedAt", F.to_timestamp("publishedAt"))

            df_list.append(df)

        return df_list
    else:
        raise("Ocurrió un error inesperado!")

In [15]:
from functools import reduce

# traemos 5 páginas de 100 articulos cada una sobre "Ingenieria de Datos" (en inglés)
try:
    df_articles = generateDataframeApiWeather('data engineer', 2)
    df_complete = reduce(DataFrame.unionAll, df_articles)

    df_complete.printSchema()
    df_complete.show()
except Exception as e:
    print(e)

La API encontró 2028 artículos sobre el tema consultado
Se encontraron 21 páginas en total... obteniendo artículos para 2 páginas...
root
 |-- author: string (nullable = true)
 |-- content: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publishedAt: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- urlToImage: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- source_id: string (nullable = true)

+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------------+----------------+
|              author|             content|         description|        publishedAt|               title|                 url|          urlToImage|        source_name|       source_id|
+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+-

## Conexión a db en redshift y carga de datos

In [None]:
df_complete.write.format("jdbc")\
    .option("url", "jdbc:postgresql://{host}:{port}/{database}".format(
        host=os.environ.get('REDSHIFT_HOST'),
        port=os.environ.get('REDSHIFT_PORT'),
        database=os.environ.get('REDSHIFT_DATABASE'))) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "i_zapata1989_coderhouse.df_news_api_dw") \
    .option("user", os.environ.get('REDSHIFT_USER')) \
    .option("password", os.environ.get('REDSHIFT_PASS')) \
    .mode('append') \
    .save()