In [155]:
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import os

In [190]:
import chardet

import re


In [188]:
file_path = r'D:\Documents\Work\Neoflex\DE Проектное задание\neoflex_project\airflow_files\md_currency_d.csv'

In [182]:
with open(file_path, 'rb') as f:
    result = chardet.detect(f.read())
    print(result)

{'encoding': 'Windows-1252', 'confidence': 0.73, 'language': ''}


In [183]:
with open(file_path, 'rb') as f:
            result = chardet.detect(f.read())
            encoding = result['encoding']
print(encoding)

Windows-1252


In [185]:
data = pd.read_csv(file_path, sep=";", encoding=encoding)

In [186]:
data

Unnamed: 0,CURRENCY_RK,DATA_ACTUAL_DATE,DATA_ACTUAL_END_DATE,CURRENCY_CODE,CODE_ISO_CHAR
0,4586704,2011-09-06,2050-12-31,0.0,NON
1,50,2017-05-11,2050-12-31,356.0,INR
2,51,2017-05-11,2050-12-31,484.0,MXN
3,52,2017-05-11,2050-12-31,434.0,LYD
4,53,2017-05-11,2050-12-31,422.0,LBR
5,54,2017-05-11,2050-12-31,504.0,MAD
6,55,2017-05-11,2050-12-31,410.0,KRW
7,56,2017-05-11,2050-12-31,12.0,DZD
8,57,2017-05-11,2050-12-31,417.0,KGS
9,58,2017-05-11,2050-12-31,100.0,BGN


In [187]:
def prepare_dataset(file_path, sep=";"):
    """
    Подготавливает датасет для загрузки в БД. 
    """
    try:

        with open(file_path, 'rb') as f:
            result = chardet.detect(f.read())
            encoding = result['encoding']

        data = pd.read_csv(file_path, sep=sep, encoding=encoding)
        data.columns = [col.lower().strip().replace(';', '') for col in data.columns]  # Приведение имен столбцов к нижнему регистру
        data.drop_duplicates(inplace=True)

        # Функция для очистки строк
        def clean_string(s):
            if pd.isnull(s) or s == '':
                return 'N/A'  # Замена пустых строк на 'N/A'
            cleaned = re.sub(r'[^\x20-\x7Eа-яА-ЯёЁ]+', 'N/A', str(s))  # Замена нечитаемых знаков на 'N/A'
            return cleaned.strip()

        for col in data.columns:
            if data[col].dtype == np.int64:
                data[col] = data[col].astype(int)
            elif data[col].dtype == np.float64:
                data[col] = data[col].astype(float)
            elif data[col].dtype == object:
                data[col] = data[col].apply(clean_string)  # Применяем очистку к строковым данным

        for col in data.columns:
            if "date" in col:
                data[col] = pd.to_datetime(data[col], errors='coerce')

        for col in data.columns:
            if data[col].isnull().any():
                if data[col].dtype in [int, float]:
                    data[col].fillna(0, inplace=True)
                elif "date" in col:
                    data[col].fillna(pd.Timestamp("1900-01-01").date(), inplace=True)
                else:
                    data[col].fillna("N/A", inplace=True)
        return data

    except Exception as e:
        raise RuntimeError(f"Ошибка при подготовке датасета из файла {file_path}: {e}")

In [191]:
df = prepare_dataset(file_path)

In [192]:
df

Unnamed: 0,currency_rk,data_actual_date,data_actual_end_date,currency_code,code_iso_char
0,4586704,2011-09-06,2050-12-31,0.0,NON
1,50,2017-05-11,2050-12-31,356.0,INR
2,51,2017-05-11,2050-12-31,484.0,MXN
3,52,2017-05-11,2050-12-31,434.0,LYD
4,53,2017-05-11,2050-12-31,422.0,LBR
5,54,2017-05-11,2050-12-31,504.0,MAD
6,55,2017-05-11,2050-12-31,410.0,KRW
7,56,2017-05-11,2050-12-31,12.0,DZD
8,57,2017-05-11,2050-12-31,417.0,KGS
9,58,2017-05-11,2050-12-31,100.0,BGN


In [193]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50 entries, 0 to 49
Data columns (total 5 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   currency_rk           50 non-null     int32         
 1   data_actual_date      50 non-null     datetime64[ns]
 2   data_actual_end_date  50 non-null     datetime64[ns]
 3   currency_code         50 non-null     float64       
 4   code_iso_char         50 non-null     object        
dtypes: datetime64[ns](2), float64(1), int32(1), object(1)
memory usage: 1.9+ KB
