# *Siglo XXI*
##Challenge Data engineer 
###Azure

Contexto

El siguiente challenge está pensado para que el desarrollador utilice las herramientas que crea necesarias para llevarlo a cabo. Para el desarrollo sobre las herramientas de azure, se debe utilizar la evaluación gratuita de 30 días que micrsoft ofrece con cualquier cuenta de email.

Parte 1: Extracción y Transformación de Datos con PySpark

Extracción de Datos desde una API:

- Utilizar Spark para extraer datos de una API pública de tu elección.
- Aplica transformaciones necesarias para preparar los datos para su posterior análisis (ej: procesar archivos json a tablas).
- Consolidar un archivo único con el resultado de la extracción.

Para realizar este examen se decidió utilizar la API de CoinGecko que brinda información sobre criptomonedas.

Fuente: https://www.coingecko.com/api/documentation


###**ETL realizado con pySpark**

In [0]:
%run "/sigloXXI/Librerias"

In [0]:
# Se crea una sesión Spark
spark = SparkSession.builder.appName("CryptoData").getOrCreate()

# URL de la API de CoinGecko para obtener el precio de Bitcoin en dólares estadounidenses.
url = "https://api.coingecko.com/api/v3/simple/price"

# Lista de IDs de las criptomonedas que deseas consultar (puedes agregar más o cambiar las que necesites).
crypto_ids = [
    #"bitcoin",
    "ethereum",
    "ripple",
    "litecoin",
    "cardano",
    # Se pueden agregar más monedas aca
]
# Formatea los IDs en una cadena separada por comas.
ids_str = ",".join(crypto_ids)
params = {
    "ids": ids_str,
    "vs_currencies": "usd",
    "include_market_cap": "true",
    "include_24hr_vol": "true",
    "include_24hr_change": "true"
}

# Realizar la solicitud GET a la API
response = requests.get(url, params=params)

# Comprobar si la solicitud se realizó con éxito
if response.status_code == 200:
    # La solicitud fue exitosa, analiza los datos JSON
    data = response.json()
    crypto_data = []
    current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    for crypto_id in crypto_ids:
        price_usd = data[crypto_id]['usd']
        last_price = data[crypto_id]['usd_market_cap']
        crypto_data.append({'Crypto ID': crypto_id, 'Price USD': price_usd, 'usd_market_cap': last_price, 'Current Date':current_date})

    # Crea un DataFrame a partir de los datos
    df = spark.createDataFrame(crypto_data)

    # Mostrar el DataFrame
    df.show()
else:
    # La solicitud no fue exitosa. Puedes manejar el error según tus necesidades.
    print(f"Error en la solicitud. Código de estado: {response.status_code}")

# Detener la sesión Spark cuando hayas terminado
#spark.stop()

+---------+-------------------+---------+--------------------+
|Crypto ID|       Current Date|Price USD|      usd_market_cap|
+---------+-------------------+---------+--------------------+
| ethereum|2023-11-02 04:39:32|  1855.61|2.232391994633327...|
|   ripple|2023-11-02 04:39:32| 0.609428|3.269064577399247...|
| litecoin|2023-11-02 04:39:32|    70.28|  5.18617978795294E9|
|  cardano|2023-11-02 04:39:32| 0.308644|1.080036653545249...|
+---------+-------------------+---------+--------------------+



### **Persistensia de Datos en DataLakeGen2**

Parte 2: Integración de Servicios en Azure 

Escribe los datos procesados en la Parte 1.1 en un almacenamiento de datos persistente en 
Azure (por ejemplo, Azure Data Lake Storage, Azure Blob Storage, etc.).

In [0]:
# Configuracion de la conexión 
configs = {"fs.azure.account.auth.type":"OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<CLIENT_ID>",
           "fs.azure.account.oauth2.client.secret": "<SECRET>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<ENDPOINT>/oauth2/token"}
dbutils.fs.mount(
    source = "abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/",
    mount_point = "/mnt/<STORAGE_ACCOUNT>/<CONTAINER>",
    extra_configs = configs
)

True

In [0]:
# Se calcula la fecha y hora actual
date_file = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Ruta destino mas fecha y hora actual
ruta_target = "/mnt/<STORAGE_ACCOUNT>/<CONTAINER>" + date_file

# Escribe el DataFrame en el almacenamiento de datos en Azure
df.write.csv(ruta_target, header=True, mode="overwrite")

print("Datos procesados almacenados en Azure Data Lake Storage")

Datos procesados almacenados en Azure Data Lake Storage
