In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions  import *
from pyspark.sql.types import StructField, StructType, StructType, IntegerType, FloatType, DateType, StringType
from pyspark.sql import Row

from dotenv import load_dotenv
import sys
import os
import datetime
# Добавляем корневую директорию проекта в sys.path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '../..')))

load_dotenv()


spark = (SparkSession.builder
         .appName("cbr_currency_etl")
         .getOrCreate())

spark

In [None]:
from consultant.extractors.base import get_cbr_currency_rate_json


currency_list = list(get_cbr_currency_rate_json()['Valute'].values())
rows = [Row(**currency) for currency in currency_list]

raw_currency = spark.createDataFrame(rows)

raw_currency.show(100, truncate=False)



['/user/b.kustov/', 'raw_data/', 'cbr/', 'cbr_currency_2025-01-16']
path='/user/b.kustov/raw_data/cbr/cbr_currency_2025-01-16'
hdfs://172.17.0.23//user/b.kustov/raw_data/cbr/cbr_currency_2025-01-16


In [21]:
df = raw_currency \
    .withColumn('VunitRate', col('Value') / col('Nominal')) \
    .withColumn('ValCursDate', lit(datetime.date.today() - datetime.timedelta(days=1))) \
    .select(['CharCode', 'Nominal', 'Value', 'VunitRate', 'ValCursDate'])

new_df = df.select(
    col("CharCode").alias("code_iso"),
    col("Nominal").cast("int").alias("nominal"),
    col("Value").cast("float").alias("rate"),
    col("VunitRate").cast("float").alias("unit_rate"),
    col("ValCursDate").cast("date").alias("on_date")
)


new_df.printSchema()
new_df.show(100, truncate=False)


root
 |-- code_iso: string (nullable = true)
 |-- nominal: integer (nullable = true)
 |-- rate: float (nullable = true)
 |-- unit_rate: float (nullable = true)
 |-- on_date: date (nullable = false)

+--------+-------+--------+----------+----------+
|code_iso|nominal|rate    |unit_rate |on_date   |
+--------+-------+--------+----------+----------+
|AUD     |1      |63.6894 |63.6894   |2025-01-15|
|AZN     |1      |60.4752 |60.4752   |2025-01-15|
|GBP     |1      |125.4358|125.4358  |2025-01-15|
|AMD     |100    |25.8253 |0.258253  |2025-01-15|
|BYN     |1      |29.5926 |29.5926   |2025-01-15|
|BGN     |1      |53.8526 |53.8526   |2025-01-15|
|BRL     |1      |16.946  |16.946    |2025-01-15|
|HUF     |100    |25.7767 |0.257767  |2025-01-15|
|VND     |10000  |42.2417 |0.00422417|2025-01-15|
|HKD     |1      |13.2263 |13.2263   |2025-01-15|
|GEL     |1      |36.1388 |36.1388   |2025-01-15|
|DKK     |1      |14.1175 |14.1175   |2025-01-15|
|AED     |1      |27.994  |27.994    |2025-01-15|
|

In [22]:
# Writing to GreenPlum

from consultant.tools.basic import get_jdbc_url_for_gp, get_properies_for_gp

# import importlib
# import consultant.tools.basic as module

# importlib.reload(module)

print(get_properies_for_gp())
print(get_jdbc_url_for_gp())

new_df.write.jdbc(
    url=get_jdbc_url_for_gp(),
    table='b_kustov.currency_rate',
    properties=get_properies_for_gp(),
    mode='append'
)

{'user': 'wave12_user_a5', 'password': 'pass', 'driver': 'org.postgresql.Driver'}
jdbc:postgresql://172.17.1.32:5432/wave12_team_a


In [None]:
# Writing to hdfs

from consultant.tools.basic import get_hdfs_url

path = [os.getenv('INPUT_DIR'), 'cbr/']
hdfs_url = get_hdfs_url('cbr_currency', True, *path)


raw_currency \
    .write.mode('overwrite') \
    .json(hdfs_url)

In [5]:
spark.stop()