In [7]:
# Подключаем библиотеки
import pandas as pd
import os
from deltalake import DeltaTable
from deltalake.writer import write_deltalake

In [8]:
# Путь до папки, где лежат все delta lake таблицы
PATH_TO_DELTA_TABLES = "output/delta_tables"
path_to_test_table = os.path.join(PATH_TO_DELTA_TABLES, 'test_table')

In [9]:
# Создадим рандомный датафрейм
df = pd.DataFrame({"x": [1, 2, 3]})
df

Unnamed: 0,x
0,1
1,2
2,3


In [10]:
# Запишем df в delta lake файл
write_deltalake(path_to_test_table, df)

DeltaError: Generic error: A table already exists at: C:/Users/vzelikova/learning/airflow/dags/third/output/delta_tables/test_table/

In [11]:
# Вытащим данные из delta lake файла
df1 = DeltaTable(path_to_test_table).to_pandas()
df1

Unnamed: 0,x
0,8
1,9
2,10
3,1
4,2
5,3


In [None]:
# Добавим (append-ом) новые данные в нашу тестовую delta lake таблицу
df2 = pd.DataFrame({"x": [8, 9, 10]})
write_deltalake(path_to_test_table, df2, mode="append")

In [None]:
DeltaTable(path_to_test_table).to_pandas()

Unnamed: 0,x
0,8
1,9
2,10
3,1
4,2
5,3


In [None]:
# При этом старая версия также доступна
DeltaTable(path_to_test_table, version=0).to_pandas()

Unnamed: 0,x
0,1
1,2
2,3


Теперь проверим на реальных данных

In [12]:
# Путь до входных файлов
INPUT_PATH = os.path.join("input", "data")
my_csv_file_path = os.path.join(INPUT_PATH, "requests_data_bad.csv")
# Путь до delta lake таблицы
csv_table_path = os.path.join(PATH_TO_DELTA_TABLES, "csv_table")

In [None]:
df = pd.read_csv(my_csv_file_path)
df.head()

Unnamed: 0,requester_id,accepter_id,accept_date
0,4,9,2024-04-05
1,18,26,2024-09-24
2,7,26,2024-06-02
3,17,3,2024-02-09
4,17,12,2024-07-02


In [None]:
write_deltalake(csv_table_path, df)

In [None]:
DeltaTable(csv_table_path).to_pandas()

Unnamed: 0,requester_id,accepter_id,accept_date
0,4,9,2024-04-05
1,18,26,2024-09-24
2,7,26,2024-06-02
3,17,3,2024-02-09
4,17,12,2024-07-02
...,...,...,...
195,30,25,2024-04-06
196,9,24,2024-08-02
197,13,6,2024-04-04
198,11,21,2024-10-13


# Тестим duckdb

In [14]:
import duckdb

In [15]:
# Переменные
my_csv_file_path
errors_file_path = os.path.join(INPUT_PATH, 'errors.txt')

In [16]:
# Создаем in-memory соединение
con = duckdb.connect()

In [17]:
# Создаем таблицу для данных из csv файла
query = f"""
    CREATE TABLE tmp_request_accepted AS
    SELECT * FROM '{my_csv_file_path}';
    """
con.sql(query)


In [18]:
# Посмотрим на таблицу
con.sql(
    "SELECT * FROM tmp_request_accepted"
).show()

┌──────────────┬─────────────┬─────────────┐
│ requester_id │ accepter_id │ accept_date │
│    int64     │    int64    │    date     │
├──────────────┼─────────────┼─────────────┤
│            4 │           9 │ 2024-04-05  │
│           18 │          26 │ 2024-09-24  │
│            7 │          26 │ 2024-06-02  │
│           17 │           3 │ 2024-02-09  │
│           17 │          12 │ 2024-07-02  │
│           30 │           9 │ 2024-08-19  │
│           11 │           5 │ 2024-01-22  │
│           28 │          21 │ 2024-01-17  │
│            3 │          22 │ 2024-04-03  │
│           14 │          22 │ 2024-04-19  │
│            · │           · │     ·       │
│            · │           · │     ·       │
│            · │           · │     ·       │
│           17 │          19 │ 2024-07-20  │
│            9 │          15 │ 2024-10-23  │
│           11 │          12 │ 2024-04-14  │
│            2 │           9 │ 2024-06-16  │
│           19 │          25 │ 2024-10-21  │
│         

Создадим таблицу для ошибочных id (error_ids)

In [19]:
# Это мне не надо, но оставлю тут на всякий
# Сначала считаем txt в dataframe
errors = pd.read_csv(errors_file_path, header=None)
errors

Unnamed: 0,0
0,6
1,17
2,19
3,22
4,12


In [20]:
# Создаем структуру таблицы
con.sql(
    """
    CREATE TABLE error_ids (
        error_id integer
    )
    """
)

In [21]:
# Вставляем значения из файла errors.txt
query = f"""
    INSERT INTO error_ids
    SELECT * FROM read_csv('{errors_file_path}', header = false);
    """
con.sql(query)

In [22]:
# Посмотрим на полученную таблицу
con.sql("SELECT * FROM error_ids").show()

┌──────────┐
│ error_id │
│  int32   │
├──────────┤
│        6 │
│       17 │
│       19 │
│       22 │
│       12 │
└──────────┘



Выполняем скрипт для очистки данных

In [25]:
# Переменные
sql_scripts_path = os.path.join('input', 'sql')
data_cleansing_script_path = os.path.join(sql_scripts_path, 'data_cleansing.sql')

In [None]:
# Читаем файл с sql скриптом
with open(data_cleansing_script_path) as data:
    query = data.read()

In [27]:
# Выполняем запрос
con.sql(query)

BinderException: Binder Error: Referenced column "ctid" not found in FROM clause!
Candidate bindings: "accept_date", "accepter_id"

In [71]:
# Закрытие соединения
con.close()