Imports

In [2]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType, FloatType, IntegerType, DateType, BooleanType
from pyspark.sql.functions import col, explode, when, abs, regexp_replace, struct, concat, lit

StatementMeta(, 4cdf2036-d8ba-44f4-be50-946360510d86, 3, Finished, Available, Finished)

Retrieve bronze data (vainu_data.json and vainu_data_financial.json) from the lakehouse

In [None]:
lakehouse_path_general = "your_lakehouse_path_here"  # Replace with your actual lakehouse path
lakehouse_path_fina = "your_lakehouse_path_here"  # Replace with your actual lakehouse path

df_raw_general = spark.read.text(lakehouse_path_general)
df_raw_fina = spark.read.text(lakehouse_path_fina)


# Convert DataFrame to JSON string
raw_json_general_str = "\n".join(df_raw_general.toPandas()["value"])
raw_json_fina_str = "\n".join(df_raw_fina.toPandas()["value"])

# Convert JSON string to Python dictionary
data_general = json.loads(raw_json_general_str)
data_fina = json.loads(raw_json_fina_str)

StatementMeta(, 4cdf2036-d8ba-44f4-be50-946360510d86, 4, Finished, Available, Finished)

Form the general dataframe

In [4]:
# Get relevant columns for general dataset

general_data = [{"company_name": c.get("company_name", ""),
                 "business_id": c.get("business_id", ""),
                 "foundation_date": c.get("foundation_date",""),
                 "description": c.get("description",""),
                 "staff_number": c.get("staff_number",""),
                 "staff_number_estimate": c.get("staff_number_estimate",""),
                 "career_page_job_count": c.get("career_page_job_count",""),
                 "city": c.get("city", ""),
                 "lat": c.get("lat", ""),
                 "lng": c.get("lng", ""),
                 "status": c.get("status", ""),
                 "lng": c.get("lng", ""),
                 "organization_size_indicators": c.get("organization_size_indicators","")} for c in data_general]

# Manually define schema

schema_general = StructType([
    StructField("company_name", StringType(), True),
    StructField("business_id", StringType(), True),
    StructField("foundation_date", StringType(), True),
    StructField("description", StringType(), True),
    StructField("staff_number", StringType(), True),
    StructField("staff_number_estimate", StringType(), True),
    StructField("career_page_job_count", StringType(), True),
    StructField("city", StringType(), True),
    StructField("lat", StringType(), True),
    StructField("lng", StringType(), True),
    StructField("status", StringType(), True),
    StructField(
        "organization_size_indicators",
        ArrayType(StructType([
            StructField("indicator_type", StringType(), True),
            StructField("size_classes", StringType(), True),
            StructField("value_range", StructType([
                StructField("lower_limit", StringType(), True),
                StructField("upper_limit", StringType(), True)
            ]), True)
        ])),
        True
    )
])


# Create dataframe
df_general = spark.createDataFrame(general_data, schema=schema_general)

# Explode JSON structured column
df_general = (
        df_general
        .withColumn("size", explode(col("organization_size_indicators")))
        )

# Making a separate dataframe to investigate limits for sizew classes
df_org = df_general.select(
    col("size.indicator_type").alias("size_indicator"),
    col("size.size_classes").alias("size_class"),
    col("size.value_range.lower_limit").alias("lower"),
    col("size.value_range.upper_limit").alias("upper")
)

# df_org.printSchema()

# Making a SQL query to define size classes
df_org.createOrReplaceTempView("companies")
sql = spark.sql(
    """SELECT DISTINCT size_class,
    lower,
    upper
    FROM companies
    WHERE size_indicator = "company_size_indicator_model"
    """
    )

sql.show()


# Selecting only relevant columns for the general dataframe
df_general = df_general.select(
    col("business_id"),
    col("company_name"),
    col("foundation_date"),
    col("description"),
    col("staff_number"),
    col("staff_number_estimate"),
    col("career_page_job_count"),
    col("city"),
    col("lat"),
    col("lng"),
    col("size.indicator_type").alias("size_indicator"),
    col("size.size_classes").alias("size_class")
)


# Filtering the dataframe to only include the company size indicator model based on employee count
df_general = df_general.filter(col("size_indicator") == "company_size_indicator_model")


# New select with changes
df_general = df_general.select(
    col("business_id"),
    col("company_name"),
    col("foundation_date"),
    col("description"),
    col("staff_number"),
    col("staff_number_estimate"),
    col("career_page_job_count"),
    col("city"),
    col("lat"),
    col("lng"),
    col("size_class").alias("company_size")
)


# Make a new column based on staff number information
df_general = df_general.withColumn(
    "staff_number_final",
    when(col("staff_number").isNotNull(), col("staff_number"))
    .otherwise(col("staff_number_estimate"))
)

# Select only staff number
df_general = df_general.select(
    col("business_id"),
    col("company_name"),
    col("foundation_date"),
    col("description"),
    col("staff_number_final").alias("staff_number"),
    col("career_page_job_count"),
    col("city"),
    col("lat"),
    col("lng"),
    col("company_size")
)

# Create a coordinates column
df_general = df_general.withColumn("coordinates", concat(lit("("), col("lat"), lit(", "), col("lng"), lit(")")))

# Define types
df_general = df_general.withColumn("foundation_date", col("foundation_date").cast(DateType())) \
                    .withColumn("staff_number", col("staff_number").cast(IntegerType())) \
                    .withColumn("career_page_job_count", col("career_page_job_count").cast(IntegerType()))

# Select relevant tables
df_general = df_general.select(
    col("business_id"),
    col("company_name"),
    col("foundation_date"),
    col("description"),
    col("staff_number"),
    col("career_page_job_count"),
    col("city"),
    col("coordinates"),
    col("company_size")
)



# Remove [] from company_size colum
df_general = df_general.withColumn("company_size", regexp_replace(col("company_size"), "[\\[\\]]", ""))

df_general.printSchema()
df_general.show()

# Save to lakehouse
df_general.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("vainu_general_silver")

StatementMeta(, 4cdf2036-d8ba-44f4-be50-946360510d86, 5, Finished, Available, Finished)

+------------+-----+-----+
|  size_class|lower|upper|
+------------+-----+-----+
|     [large]|  201| 1000|
|     [small]|   11|   50|
|     [micro]|    1|   10|
|    [medium]|   51|  200|
|[enterprise]| 1000| NULL|
+------------+-----+-----+

root
 |-- business_id: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- foundation_date: date (nullable = true)
 |-- description: string (nullable = true)
 |-- staff_number: integer (nullable = true)
 |-- career_page_job_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- company_size: string (nullable = true)

+-----------+------------------+---------------+--------------------+------------+---------------------+--------+--------------------+------------+
|business_id|      company_name|foundation_date|         description|staff_number|career_page_job_count|    city|         coordinates|company_size|
+-----------+------------------+---------------+---------

In [5]:
# df_general.createOrReplaceTempView("gene")
# sql = spark.sql(
#     """SELECT COUNT (business_id)
#     FROM gene
#     WHERE status != "active"
#     """
#     )

# sql.show()

StatementMeta(, 4cdf2036-d8ba-44f4-be50-946360510d86, 6, Finished, Available, Finished)

Form the financial dataframe

In [6]:
# Get relevant data for financial dataset

fina_data = [{"business_id": c.get("business_id", ""),
            "financial_statements": c.get("financial_statements")} for c in data_fina]


# Manually define the schema
schema_fina = StructType([
    StructField("business_id", StringType(), True),
    StructField("financial_statements", ArrayType(MapType(StringType(), StringType())), True)
])

# Create dataframe
df_fina = spark.createDataFrame(fina_data, schema=schema_fina)

# df_fina.show()

df_fina = (
    df_fina
    .withColumn("fina", explode(col("financial_statements")))
    )

df_fina = df_fina.select(
    col("business_id"),
    col("fina.year").alias("year"),
    col("fina.turn_over_eur").alias("turn_over_eur"),
    col("fina.profit").alias("profit"),
    col("fina.employee_salary_local").alias("employee_salary_local"),
    col("fina.net_income_local").alias("net_income_local")
)


df_fina = df_fina.withColumn("year", col("year").cast(IntegerType())) \
                .withColumn("turn_over_eur", col("turn_over_eur").cast(IntegerType())) \
                .withColumn("profit", col("profit").cast(FloatType())) \
                .withColumn("employee_salary_local", col("employee_salary_local").cast(IntegerType())) \
                .withColumn("net_income_local", col("net_income_local").cast(IntegerType()))
                       



df_fina = df_fina.select(
    col("business_id"),
    col("year"),
    col("turn_over_eur").alias("turn_over"),
    col("profit").alias("profit_margin"),
    col("employee_salary_local").alias("employee_salary"),
    col("net_income_local").alias("net_income")
)


# Filter out rows where turnover AND profit are NULL values
df_fina = df_fina.filter((col("turn_over").isNotNull()) | (col("profit_margin").isNotNull()))

# Change percentage values to decimal values
df_fina = df_fina.withColumn("profit_margin_deci", col("profit_margin") / 100).withColumn("profit_margin_deci", col("profit_margin_deci").cast(FloatType()))

# Taking absolute value from employee salaries and turnover
df_fina = df_fina.withColumn("employee_salary", abs(col("employee_salary")))
df_fina = df_fina.withColumn("turn_over", abs(col("turn_over")))

# Show result
df_fina.printSchema()
df_fina.show(n = 200,truncate=False)

# Save to lakehouse
df_fina.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("vainu_financial_silver")

StatementMeta(, 4cdf2036-d8ba-44f4-be50-946360510d86, 7, Finished, Available, Finished)

root
 |-- business_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- turn_over: integer (nullable = true)
 |-- profit_margin: float (nullable = true)
 |-- employee_salary: integer (nullable = true)
 |-- net_income: integer (nullable = true)
 |-- profit_margin_deci: float (nullable = true)

+-----------+----+----------+-------------+---------------+----------+------------------+
|business_id|year|turn_over |profit_margin|employee_salary|net_income|profit_margin_deci|
+-----------+----+----------+-------------+---------------+----------+------------------+
|27323889   |2023|4618500   |18.67        |1618582        |2762511   |0.1867            |
|27323889   |2022|1627000   |63.33        |987258         |3687381   |0.6333            |
|27323889   |2021|1630144   |61.44        |832948         |3755521   |0.61439997        |
|27323889   |2020|1006848   |56.84        |626166         |2625158   |0.5684            |
|27323889   |2019|1111440   |-28.05       |630517         

test filtering for fina

In [7]:
df_fina.createOrReplaceTempView("fina")
sql = spark.sql(
    """SELECT COUNT (business_id)
    FROM fina
    """
    )

sql.show()

StatementMeta(, 4cdf2036-d8ba-44f4-be50-946360510d86, 8, Finished, Available, Finished)

+------------------+
|count(business_id)|
+------------------+
|             71412|
+------------------+



Form the industry dataframe

In [8]:
# Get relevant data for industry dataset

industry_data = [{"business_id": c.get("business_id", ""),
                 "official_industries": c.get("official_industries"),
                 "vainu_custom_industry": c.get("vainu_custom_industry")} for c in data_general]


# Manually define the schema
schema_industry = StructType([
    StructField("business_id", StringType(), True),
    StructField("official_industries", ArrayType(MapType(StringType(), StringType())), True),
    StructField("vainu_custom_industry", ArrayType(MapType(StringType(), StringType())), True)
])

df_industry = spark.createDataFrame(industry_data, schema=schema_industry)

df_industry = (
    df_industry
    .withColumn("off_ind", explode(col("official_industries")))
    .withColumn("vainu_ind", explode(col("vainu_custom_industry")))
)

df_industry = df_industry.select(
    col("business_id"),
    col("off_ind.code_2_level_descriptions").alias("industry"),
    col("off_ind.code_2_level").alias("industry_code_2"),
    col("off_ind.code_1_level").alias("industry_code_1"),
    col("vainu_ind.confidence_class").alias("vainu_class"),
    col("vainu_ind.value").alias("vainu_industry"),
    col("vainu_ind.confidence_score").alias("vainu_score"),
    col("vainu_ind.confidence_thresholds").alias("vainu_threshold"),
)

df_industry = df_industry.filter(col("vainu_class") == "high")

df_industry = df_industry.select(
    col("business_id"),
    col("industry").alias("industry_official"),
    col("industry_code_2"),
    col("industry_code_1"),
    col("vainu_industry")
)


df_industry = df_industry.withColumn("industry_code_2", col("industry_code_2").cast(IntegerType()))

df_industry = df_industry.withColumn("industry_official", regexp_replace(col("industry_official"), r"^\{en=|\}$", ""))

df_industry.printSchema()
# Show result
df_industry.show(n = 50,truncate=False)


# Save to lakehouse
df_industry.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("vainu_industry_silver")


StatementMeta(, 4cdf2036-d8ba-44f4-be50-946360510d86, 9, Finished, Available, Finished)

root
 |-- business_id: string (nullable = true)
 |-- industry_official: string (nullable = true)
 |-- industry_code_2: integer (nullable = true)
 |-- industry_code_1: string (nullable = true)
 |-- vainu_industry: string (nullable = true)

+-----------+---------------------------------------------------------------------------+---------------+---------------+----------------------------------+
|business_id|industry_official                                                          |industry_code_2|industry_code_1|vainu_industry                    |
+-----------+---------------------------------------------------------------------------+---------------+---------------+----------------------------------+
|27323889   |Wholesale trade, except of motor vehicles and motorcycles                  |46             |G              |B2C                               |
|27323889   |Wholesale trade, except of motor vehicles and motorcycles                  |46             |G              |B2B         

sql querying

In [9]:

# Check categories
# df_industry.createOrReplaceTempView("industrytable")
# sql = spark.sql(
#     """SELECT DISTINCT industry
#     FROM industrytable
#     """
#     )

# sql.show(truncate=False)

StatementMeta(, 4cdf2036-d8ba-44f4-be50-946360510d86, 10, Finished, Available, Finished)