In [1]:
import polars as pl

## 1. Pipeline for test dataframe

In [2]:
# Load a single batch to prototype the pipeline on, then list its columns.
df = pl.read_parquet(source="../data/batches/batch1.parquet")
df.columns

['tin',
 'year',
 'reg_number',
 'kind',
 'category',
 'org_name',
 'org_short_name',
 'activity_code_main',
 'region_iso_code',
 'region_code',
 'region',
 'area',
 'settlement',
 'settlement_type',
 'oktmo',
 'lat',
 'lon',
 'start_date',
 'end_date',
 'revenue',
 'expenditure',
 'employees_count']

**Write to ClickHouse**: tin, reg_number, year, kind, category, activity_code_main.split('.')[0], region_code, settlement_type, lat, lon, revenue, expenditure, profitability=(revenue - expenditure)/revenue, employees_count

**Drop**: org_name, org_short_name, region_iso_code, region, area, settlement, oktmo, start_date, end_date

In [3]:
# Preview the first five rows of the freshly loaded batch.
df.head(n=5)

tin,year,reg_number,kind,category,org_name,org_short_name,activity_code_main,region_iso_code,region_code,region,area,settlement,settlement_type,oktmo,lat,lon,start_date,end_date,revenue,expenditure,employees_count
i64,i64,str,i64,i64,str,str,str,str,i64,str,str,str,str,str,str,str,str,str,str,str,str
1659060204,2018,"""1051637088788""",1,2,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""ЭКОСТРОЙ""""","""41.2""","""RU-UD""",18,"""Удмуртская республика""","""""","""Ижевск""","""г""","""94701000001""","""56.852737""","""53.21149""","""2017-07-10""","""2020-07-10""","""27439000.0""","""27954000.0""","""3"""
1659060204,2019,"""1051637088788""",1,2,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""ЭКОСТРОЙ""""","""41.2""","""RU-UD""",18,"""Удмуртская республика""","""""","""Ижевск""","""г""","""94701000001""","""56.852737""","""53.21149""","""2017-07-10""","""2020-07-10""","""0.0""","""0.0""","""0"""
1659060204,2020,"""1051637088788""",1,2,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""ЭКОСТРОЙ""""","""41.2""","""RU-UD""",18,"""Удмуртская республика""","""""","""Ижевск""","""г""","""94701000001""","""56.852737""","""53.21149""","""2017-07-10""","""2020-07-10""","""0.0""","""0.0""","""0"""
1659060204,2020,"""1051637088788""",1,1,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""ЭКОСТРОЙ""""","""41.2""","""RU-UD""",18,"""Удмуртская республика""","""""","""Ижевск""","""г""","""94701000001""","""56.852737""","""53.21149""","""2020-08-10""","""2021-08-10""","""0.0""","""0.0""","""0"""
1659060250,2018,"""1051637089041""",1,2,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""МЕТАЛЛИСТ""""","""26.60.1""","""RU-TA""",16,"""Республика Татарстан""","""""","""Казань""","""г""","""92701000001""","""55.794357""","""49.111496""","""2016-08-10""","""2021-06-10""","""6156000.0""","""6067000.0""","""1"""


Split activity code:

In [4]:
# Keep only the part of the activity code before the first "." (e.g. "41.2" -> "41").
df = df.with_columns(
    activity_code_main=pl.col("activity_code_main").str.split(".").list.first()
)

Cast columns to preferred types:

In [5]:
# Discard rows holding an empty string in any column that is converted to a
# numeric type below.
non_empty_columns = ("lat", "lon", "employees_count", "activity_code_main", "reg_number")
for column_name in non_empty_columns:
    df = df.filter(pl.col(column_name) != "")

# Target dtype for every column that is still stored as a string.
target_dtypes = {
    "revenue": pl.Float32,
    "expenditure": pl.Float32,
    "lat": pl.Float32,
    "lon": pl.Float32,
    "employees_count": pl.Int32,
    "activity_code_main": pl.Int32,
    "reg_number": pl.Int64,
}
df = df.with_columns(
    [pl.col(name).cast(dtype) for name, dtype in target_dtypes.items()]
)

Add profitability column:

In [6]:
# profitability = (revenue - expenditure) / revenue
# NOTE(review): rows with revenue == 0 divide by zero here, and the preview below
# shows no value for them - confirm how such rows should be stored downstream.
df = df.with_columns(
    profitability=(pl.col("revenue") - pl.col("expenditure")) / pl.col("revenue")
)
df.head()

tin,year,reg_number,kind,category,org_name,org_short_name,activity_code_main,region_iso_code,region_code,region,area,settlement,settlement_type,oktmo,lat,lon,start_date,end_date,revenue,expenditure,employees_count,profitability
i64,i64,i64,i64,i64,str,str,i32,str,i64,str,str,str,str,str,f32,f32,str,str,f32,f32,i32,f32
1659060204,2018,1051637088788,1,2,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""ЭКОСТРОЙ""""",41,"""RU-UD""",18,"""Удмуртская республика""","""""","""Ижевск""","""г""","""94701000001""",56.852737,53.211491,"""2017-07-10""","""2020-07-10""",27439000.0,27954000.0,3,-0.018769
1659060204,2019,1051637088788,1,2,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""ЭКОСТРОЙ""""",41,"""RU-UD""",18,"""Удмуртская республика""","""""","""Ижевск""","""г""","""94701000001""",56.852737,53.211491,"""2017-07-10""","""2020-07-10""",0.0,0.0,0,
1659060204,2020,1051637088788,1,2,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""ЭКОСТРОЙ""""",41,"""RU-UD""",18,"""Удмуртская республика""","""""","""Ижевск""","""г""","""94701000001""",56.852737,53.211491,"""2017-07-10""","""2020-07-10""",0.0,0.0,0,
1659060204,2020,1051637088788,1,1,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""ЭКОСТРОЙ""""",41,"""RU-UD""",18,"""Удмуртская республика""","""""","""Ижевск""","""г""","""94701000001""",56.852737,53.211491,"""2020-08-10""","""2021-08-10""",0.0,0.0,0,
1659060250,2018,1051637089041,1,2,"""ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТС…","""ООО ""МЕТАЛЛИСТ""""",26,"""RU-TA""",16,"""Республика Татарстан""","""""","""Казань""","""г""","""92701000001""",55.794357,49.111496,"""2016-08-10""","""2021-06-10""",6156000.0,6067000.0,1,0.014457


Drop columns:

In [9]:
# Remove the descriptive / address columns that are not written to the database.
df = df.drop(
    "org_name",
    "org_short_name",
    "region_iso_code",
    "region",
    "area",
    "settlement",
    "oktmo",
    "start_date",
    "end_date",
)

The dataframe should now have the structure presented in the Figma diagram:

In [10]:
# List the columns that remain after the drop, for comparison with the target table layout.
df.columns

['tin',
 'year',
 'reg_number',
 'kind',
 'category',
 'activity_code_main',
 'region_code',
 'settlement_type',
 'lat',
 'lon',
 'revenue',
 'expenditure',
 'employees_count',
 'profitability']

## 2. Process all parquet files using the pipeline

In [11]:
from clickhouse_driver import Client
from dotenv import load_dotenv
import os

# Read CLICKHOUSE_* settings from a local .env file (if present) into the environment.
load_dotenv()
# Directory the processed batches are written to later in the notebook.
os.makedirs("../data/batches_msp/", exist_ok=True)

host = os.getenv("CLICKHOUSE_HOST")
user = os.getenv("CLICKHOUSE_USER")
password = os.getenv("CLICKHOUSE_PASSWORD")
# `or` (rather than a getenv default) also covers a variable that is set but empty.
port = int(os.getenv("CLICKHOUSE_PORT") or 9440)
# The CA bundle location can be overridden; the default is the path used so far.
ca_certs = os.getenv(
    "CLICKHOUSE_CA_CERTS",
    "/usr/local/share/ca-certificates/Yandex/RootCA.crt",
)

# Fail fast with a readable message instead of handing None to the driver.
missing_settings = [
    name
    for name, value in (
        ("CLICKHOUSE_HOST", host),
        ("CLICKHOUSE_USER", user),
        ("CLICKHOUSE_PASSWORD", password),
    )
    if value is None
]
if missing_settings:
    raise RuntimeError(
        "Missing ClickHouse settings: " + ", ".join(missing_settings)
        + " (set them in the environment or in a .env file)"
    )

client = Client(
    host=host,
    user=user,
    password=password,
    port=port,
    secure=True,
    verify=True,
    ca_certs=ca_certs,
)
# WARNING: the table is dropped and recreated on every run of this cell,
# so any rows already uploaded to db1.MSP are lost.
client.execute("""
    DROP TABLE IF EXISTS db1.MSP
""")
# Column names and types mirror the dataframe produced by the pipeline in section 1.
client.execute("""
    CREATE TABLE IF NOT EXISTS db1.MSP (
    tin Int64,
    year Int64,
    reg_number Int64,
    kind Int64,
    category Int64, 
    activity_code_main Int32,
    region_code Int64,
    settlement_type String,
    lat Float32,
    lon Float32,
    revenue Float32,
    expenditure Float32,
    employees_count Int32,
    profitability Float32
    ) ENGINE = MergeTree()
    ORDER BY (year, reg_number, tin)
""")

[]

In [14]:
def prepare_msp_batch(raw: pl.DataFrame) -> pl.DataFrame:
    """Apply the pipeline from section 1 to one raw batch.

    Steps, in order:
      1. keep only the part of ``activity_code_main`` before the first ".";
      2. drop rows with an empty string in any column that is cast to a number;
      3. cast the string columns to their numeric types;
      4. add ``profitability = (revenue - expenditure) / revenue``;
      5. drop the descriptive columns that are not uploaded.

    Args:
        raw: A batch as read from ``../data/batches`` (numeric fields still strings).

    Returns:
        A new dataframe with the 14 columns of the ``db1.MSP`` table.
    """
    non_empty_columns = ["lat", "lon", "employees_count", "activity_code_main", "reg_number"]
    unused_columns = [
        "org_name",
        "org_short_name",
        "region_iso_code",
        "region",
        "area",
        "settlement",
        "oktmo",
        "start_date",
        "end_date",
    ]

    frame = raw.with_columns(
        pl.col("activity_code_main").str.split(".").list.get(0).alias("activity_code_main")
    )
    for column in non_empty_columns:
        frame = frame.filter(pl.col(column) != "")
    frame = frame.with_columns(
        pl.col("revenue").cast(pl.Float32),
        pl.col("expenditure").cast(pl.Float32),
        pl.col("lat").cast(pl.Float32),
        pl.col("lon").cast(pl.Float32),
        pl.col("employees_count").cast(pl.Int32),
        pl.col("activity_code_main").cast(pl.Int32),
        pl.col("reg_number").cast(pl.Int64),
    )
    # NOTE(review): revenue == 0 divides by zero here; confirm how such rows
    # should be stored in the database.
    frame = frame.with_columns(
        ((pl.col("revenue") - pl.col("expenditure")) / pl.col("revenue")).alias(
        "profitability")
    )
    return frame.drop(unused_columns)


for file in os.listdir("../data/batches"):
    # os.listdir returns every directory entry; skip anything that is not a
    # parquet batch (hidden files, sub-directories, ...) instead of crashing on it.
    if not file.endswith(".parquet"):
        continue

    df = pl.read_parquet(f"../data/batches/{file}")
    print(f"File: {file}")

    df = prepare_msp_batch(df)
    df.write_parquet(f"../data/batches_msp/{file}")

File: batch8.parquet
File: batch4.parquet
File: batch2.parquet
File: batch7.parquet
File: batch3.parquet
File: batch6.parquet
File: batch1.parquet
File: batch0.parquet
File: batch5.parquet


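To upload a batch to the ClickHouse database, run the command below from the directory that contains `data/` (requires `clickhouse-client`). The example uploads `batch0.parquet`; repeat it for every file in `data/batches_msp/`: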
```bash
echo "insert into MSP from infile 'data/batches_msp/batch0.parquet' FORMAT Parquet;" | clickhouse-client --host clickhouse-example-host \
                  --secure \
                  --user admin \
                  --database db1 \
                  --port 9440 \
                  --ask-password

``` 