In [1]:
import pathlib
import os
import zipfile

import pandas as pd
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import coalesce, desc, first, last, lead, lower, lpad, row_number, max, min, upper, year
from pyspark.sql.types import StructField, StructType, ByteType, DateType, FloatType, IntegerType, ShortType, StringType
import tqdm

In [2]:
pd.set_option("display.max_columns", None)

# Spark Configuration

In [3]:
conf = SparkConf()
conf.setMaster("local[*]").setAppName("Dataset Maker")

<pyspark.conf.SparkConf at 0x7fe218fc2fe0>

In [4]:
sc = SparkContext(conf=conf)

23/11/13 11:45:44 WARN Utils: Your hostname, asus-notebook resolves to a loopback address: 127.0.1.1; using 192.168.1.186 instead (on interface wlp3s0)
23/11/13 11:45:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/13 11:45:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
sc.uiWebUrl

'http://192.168.1.186:4040'

In [6]:
session = SparkSession(sc)

# Small&Meduim Business Data

In [7]:
rsmp_schema = StructType([
    StructField("kind", ByteType(), False),
    StructField("category", ByteType(), False),
    StructField("reestr_date", DateType(), False),
    StructField("data_date", DateType(), False),
    StructField("ind_tin", StringType(), True),
    StructField("ind_number", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("patronymic", StringType(), True),
    StructField("org_name", StringType(), True),
    StructField("org_short_name", StringType(), True),
    StructField("org_tin", StringType(), True),
    StructField("org_number", StringType(), True),
    StructField("region_code", ByteType(), True),
    StructField("region_name", StringType(), True),
    StructField("region_type", StringType(), True),
    StructField("district_name", StringType(), True),
    StructField("district_type", StringType(), True),
    StructField("city_name", StringType(), True),    
    StructField("city_type", StringType(), True),
    StructField("settlement_name", StringType(), True),
    StructField("settlement_type", StringType(), True),
    StructField("activity_code_main", StringType(), False),
    StructField("activity_codes_additional", StringType(), True),
    StructField("total", ShortType(), True), 
    StructField("file_id", StringType(), True), 
])
rsmp_path = pathlib.Path("rsmp/csv")
rsmp_csv_files = [str(fn) for fn in rsmp_path.glob("data-*.csv")]

In [8]:
rsmp = session.read.options(
    header=True, dateFormat="dd.MM.yyyy", escape='"'
).schema(rsmp_schema).csv(rsmp_csv_files)
rsmp.printSchema()

                                                                                

root
 |-- kind: byte (nullable = true)
 |-- category: byte (nullable = true)
 |-- reestr_date: date (nullable = true)
 |-- data_date: date (nullable = true)
 |-- ind_tin: string (nullable = true)
 |-- ind_number: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- patronymic: string (nullable = true)
 |-- org_name: string (nullable = true)
 |-- org_short_name: string (nullable = true)
 |-- org_tin: string (nullable = true)
 |-- org_number: string (nullable = true)
 |-- region_code: byte (nullable = true)
 |-- region_name: string (nullable = true)
 |-- region_type: string (nullable = true)
 |-- district_name: string (nullable = true)
 |-- district_type: string (nullable = true)
 |-- city_name: string (nullable = true)
 |-- city_type: string (nullable = true)
 |-- settlement_name: string (nullable = true)
 |-- settlement_type: string (nullable = true)
 |-- activity_code_main: string (nullable = true)
 |-- activity_codes_additio

In [9]:
initial_count = rsmp.count()
initial_count

                                                                                

5485042

In [10]:
cols_to_check_for_duplicates = [
    "kind", "category", "tin", "reg_number",
    "first_name", "last_name", "patronymic",
    "org_name", "org_short_name",
    "region_name",
    "district_name", "city_name", "settlement_name",
    "activity_code_main"
]
cols_to_select = [
    "kind",
    "category",
    "tin",
    "reg_number",
    "first_name",
    "last_name",
    "patronymic",
    "org_name",
    "org_short_name",
    "region_code",
    "region_name",
    "region_type",
    "district_name",
    "district_type",
    "city_name",
    "city_type",
    "settlement_name",
    "settlement_type",
    "activity_code_main",
    "start_date",
    "end_date",
]
cols_to_uppercase = [
    "first_name", "last_name", "patronymic",
    "org_name", "org_short_name",
    "region_name", "region_type",
    "district_name", "district_type",
    "city_name", "city_type",
    "settlement_name", "settlement_type",
]
excluded_regions = [
    "Крым",
    "Севастополь",
    "Донецкая",
    "Луганская",
    "Запорожская",
    "Херсонская"
]
excluded_regions_condition = (
    "not ("
    + " or ".join(f"region_name ilike '%{region.upper()}%'" for region in excluded_regions)
    + ")"
)
w_for_row_number = (
    Window
    .partitionBy(cols_to_check_for_duplicates)
    .orderBy("data_date")
)
w_for_end_date = w_for_row_number.rowsBetween(0, Window.unboundedFollowing)
w_for_reg_number = (
    Window
    .partitionBy(["tin"])
    .orderBy("data_date")
    .rowsBetween(0, Window.unboundedFollowing)
)

rsmp_table = (
    rsmp
    .filter(excluded_regions_condition)
    .withColumns({
        colname: upper(colname)
        for colname in cols_to_uppercase
    })
    .withColumns({
        "ind_tin": lpad("ind_tin", 12, "0"),
        "org_tin": lpad("org_tin", 10, "0"),
    })
    .withColumns({
        "tin": coalesce("ind_tin", "org_tin"),
        "reg_number": coalesce("ind_number", "org_number"),
    })
    .withColumn("reg_number", first("reg_number", ignorenulls=True).over(w_for_reg_number))
    .withColumn("row_number", row_number().over(w_for_row_number))
    .withColumn("end_date", last("data_date").over(w_for_end_date))
    .filter("row_number = 1")
    .withColumnRenamed("data_date", "start_date")
    .select(*cols_to_select)
    .cache()
)

In [11]:
count_after = rsmp_table.count()
count_after

                                                                                

160847

In [12]:
rsmp_table.limit(10).toPandas()

Unnamed: 0,kind,category,tin,reg_number,first_name,last_name,patronymic,org_name,org_short_name,region_code,region_name,region_type,district_name,district_type,city_name,city_type,settlement_name,settlement_type,activity_code_main,start_date,end_date
0,0,0,6910020514,1116910001669.0,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""ЦЕНТ...","ООО ""ЦЮУ ""ПРИОРИТЕТ""",69,ТВЕРСКАЯ,ОБЛАСТЬ,,,КИМРЫ,ГОРОД,,,69.1,2020-11-10,2020-11-10
1,1,1,273080245,1100280033897.0,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""АВАН...","ООО ""АВАНТАЖ""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,,,УФА,ГОРОД,,,69.1,2017-04-10,2023-07-10
2,1,1,276115295,1080276003026.0,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""АКЦЕНТ""","ООО ""АКЦЕНТ""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,,,УФА,ГОРОД,,,69.1,2016-12-10,2023-07-10
3,1,1,276909812,1150280080884.0,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""ИМПЕ...","ООО ""ИМПЕРИЯ НЕДВИЖИМОСТИ""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,,,УФА,ГОРОД,,,69.1,2016-08-10,2021-11-10
4,1,1,276918454,1160280132836.0,,,,ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ЮРИДИ...,"ООО ЮК ""БЕЛЫЙ ДОМ""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,,,УФА,ГОРОД,,,69.1,2018-07-10,2023-07-10
5,1,1,276918454,1160280132836.0,,,,ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ЮРИДИ...,"ООО ЮКЦ ""АКТИС""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,,,УФА,ГОРОД,,,69.1,2016-12-10,2018-06-10
6,1,1,277070791,1050204485781.0,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""ЦЕНТ...","ООО ""ЦЕНТР КАДРОВОГО КОНСАЛТИНГА ""ВАШИ КАДРЫ""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,,,УФА,ГОРОД,,,69.1,2019-08-10,2023-07-10
7,1,1,277107233,1090280035867.0,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""ЦЕНТ...","ООО ""ЦЕНТР ЮРИДИЧЕСКОЙ ПОМОЩИ""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,,,УФА,ГОРОД,,,69.1,2016-08-10,2020-08-10
8,1,1,277107233,1090280035867.0,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""ЦЕНТ...","ООО ""ЦЕНТР ЮРИДИЧЕСКОЙ ПОМОЩИ""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,УФИМСКИЙ,РАЙОН,,,БУЛГАКОВО,СЕЛО,69.1,2020-09-10,2023-07-10
9,1,1,277126959,,,,,"ОБЩЕСТВО С ОГРАНИЧЕННОЙ ОТВЕТСТВЕННОСТЬЮ ""СКОР...","ООО ""СКОРАЯ ЮРИДИЧЕСКАЯ ПОМОЩЬ""",2,БАШКОРТОСТАН,РЕСПУБЛИКА,,,УФА,ГОРОД,,,69.1,2016-08-10,2019-04-10


In [13]:
rsmp_table.toPandas().to_csv(
    "rsmp/csv/data.csv", index=False, na_rep="NA", float_format="%.0f"
)

                                                                                

In [14]:
target_tins = rsmp_table.filter("kind == 1").select("tin")
target_tins.count()

                                                                                

79635

# Number of Employees

In [15]:
staff_schema = StructType([
    StructField("org_tin", StringType(), False),
    StructField("employees_count", IntegerType(), True),
    StructField("data_date", DateType(), True),
    StructField("doc_date", DateType(), True),
    StructField("file_id", StringType(), True),
])
staff_path = pathlib.Path("sshr/csv")
staff_csv_files = [str(fn) for fn in staff_path.glob("data-*.csv")]

staff = session.read.options(
    header=True, dateFormat="dd.MM.yyyy"
).schema(staff_schema).csv(staff_csv_files)
staff.printSchema()

root
 |-- org_tin: string (nullable = true)
 |-- employees_count: integer (nullable = true)
 |-- data_date: date (nullable = true)
 |-- doc_date: date (nullable = true)
 |-- file_id: string (nullable = true)



In [17]:
window = Window.partitionBy("tin", "data_date").orderBy(desc("doc_date"))

staff_table = (
    staff
    .withColumnRenamed("org_tin", "tin")
    .join(target_tins, on="tin", how="leftsemi")
    .withColumn("row_number", row_number().over(window))
    .filter("row_number = 1")
    .select("tin", year("data_date").alias("year"), "employees_count")
    .orderBy("tin", "year")
    .cache()
)

In [18]:
staff_table.count()

                                                                                

172875

In [19]:
staff_table.limit(10).toPandas()

Unnamed: 0,tin,year,employees_count
0,101013292,2021,1
1,104015040,2018,2
2,104015040,2019,2
3,104015040,2020,2
4,104015040,2021,2
5,104015040,2022,2
6,105006257,2018,24
7,105006257,2020,1
8,105036692,2018,2
9,105036692,2019,2


In [20]:
staff_table.toPandas().to_csv(
    "sshr/csv/data.csv", index=False, na_rep="NA", float_format="%.0f")

                                                                                

In [21]:
revexp_schema = StructType([
    StructField("org_tin", StringType(), False),
    StructField("revenue", FloatType(), True),
    StructField("expediture", FloatType(), True),
    StructField("data_date", DateType(), True),
    StructField("doc_date", DateType(), True),
    StructField("file_id", StringType(), True),
])
revexp_path = pathlib.Path("revexp/csv")
revexp_csv_files = [str(fn) for fn in revexp_path.glob("data-*.csv")]

revexp = session.read.options(
    header=True, dateFormat="dd.MM.yyyy").schema(revexp_schema).csv(revexp_csv_files)
revexp.printSchema()

root
 |-- org_tin: string (nullable = true)
 |-- revenue: float (nullable = true)
 |-- expediture: float (nullable = true)
 |-- data_date: date (nullable = true)
 |-- doc_date: date (nullable = true)
 |-- file_id: string (nullable = true)



In [22]:
window = Window.partitionBy("tin", "data_date").orderBy(desc("doc_date"))

revexp_table = (
    revexp
    .withColumnRenamed("org_tin", "tin")
    .join(target_tins, on="tin", how="leftsemi")
    .withColumn("row_number", row_number().over(window))
    .filter("row_number == 1")
    .select("tin", year("data_date").alias("year"), "revenue", "expediture")
    .orderBy("tin", "year")
    .cache()
)

In [23]:
revexp_table.count()

                                                                                

161560

In [24]:
revexp_table.limit(10).toPandas()

Unnamed: 0,tin,year,revenue,expediture
0,104015040,2018,864000.0,719000.0
1,104015040,2019,424000.0,657000.0
2,104015040,2020,0.0,0.0
3,104015040,2021,0.0,171000.0
4,104015040,2022,0.0,136000.0
5,105006257,2018,3560000.0,3215000.0
6,105006257,2019,1260000.0,1260000.0
7,105006257,2020,0.0,0.0
8,105036692,2018,276000.0,264000.0
9,105036692,2019,300000.0,271000.0


In [25]:
revexp_table.toPandas().to_csv(
    "revexp/csv/data.csv", index=False, na_rep="NA", float_format="%.0f")

                                                                                