In [52]:
import sys

In [2]:
cd /Users/annabramslow/Documents/Company2Vec

/Users/annabramslow/Documents/Company2Vec


In [4]:
from dataclasses import dataclass, field
from pathlib import Path
from typing import List

import dask.dataframe as dd
import pandas as pd

#from ..decorators import save_parquet
#from ..ops import sort_partitions
#from ..serialize import DATA_ROOT
#from .base import FIELD_TYPE, TokenSource
#from src.decorators import save_parquet
#from src.ops import sort_partitions
#from base import FIELD_TYPE, TokenSource
#from data_processing.helpers import enrich_with_asof_values

In [5]:
def dd_enrich_with_asof_values(
    df: dd.DataFrame, 
    df_registrations: dd.DataFrame, 
    values=('Industry', 'CompanyType', 'Address', 'Status'), 
    date_col_df='FromDate', 
    date_col_registrations='FromDate'
) -> dd.DataFrame:
    """
    Add, for every row of ``df``, the most recent registration value known at
    that row's date (a backward as-of join per ``CVR``).

    Args:
        df: Frame to enrich. Must contain ``CVR`` and ``date_col_df``.
        df_registrations: Long-format registrations with the columns ``CVR``,
            ``ChangeType``, ``NewValue`` and ``date_col_registrations``.
        values: ``ChangeType`` values to attach; each becomes a new column
            named after the value and filled from ``NewValue``.
        date_col_df: Name of the date column in ``df``.
        date_col_registrations: Name of the date column in ``df_registrations``.

    Returns:
        A new frame sorted by ``date_col_df`` with one extra column per entry
        in ``values``. ``date_col_df`` is returned as datetime. The frame
        passed in as ``df`` is not modified.

    Notes:
        * 'Status' registrations without a date are treated as valid from
          2000-01-01.
        * Registrations whose date is missing or unparseable are ignored,
          because null keys cannot take part in an as-of join.
        * Null/unparseable dates in ``df`` itself are not handled here.
    """

    # Convert the left-hand date column once, up front. assign() builds a new
    # frame, so the caller's object is left untouched (item assignment inside
    # the loop used to rewrite the caller's frame on the first iteration).
    df = df.assign(**{date_col_df: dd.to_datetime(df[date_col_df], errors='coerce')})

    for value in values:
        # Registrations of the current change type only
        df_value = df_registrations.loc[df_registrations['ChangeType'] == value].reset_index(drop=True)

        dates = df_value[date_col_registrations]

        # Special handling for 'Status': undated entries count from 2000-01-01
        if value == 'Status':
            dates = dates.fillna('2000-01-01')

        df_value = df_value.assign(
            **{date_col_registrations: dd.to_datetime(dates, errors='coerce')}
        )

        # Rows that still have no usable date can never be matched as-of and
        # would make merge_asof fail on null keys.
        df_value = df_value.dropna(subset=[date_col_registrations])

        # Keep the join keys plus the value, named so both sides line up
        df_value = df_value[['CVR', date_col_registrations, 'NewValue']].rename(columns={
            date_col_registrations: date_col_df,  # Align the date column names
            'NewValue': value  # Column is named after the change type
        })

        # Backward as-of join: latest registration at or before the row's date
        df = dd.merge_asof(
            df.sort_values(date_col_df),
            df_value.sort_values(date_col_df),
            on=date_col_df,
            by='CVR',
            direction='backward'
        )

    return df


In [6]:
def convert_currency(
    df: dd.DataFrame,
    lookup_table: dd.DataFrame,
    amount_cols=['amount'],
    currency_col='currency',
    date_col='PublicationDate',
) -> dd.DataFrame:
    """
    Multiply the amount columns of all non-'DKK' rows by a monthly rate.

    Rows whose currency is 'DKK' pass through untouched. Every other row is
    matched against ``lookup_table`` on (currency, year, month), where year
    and month are taken from ``date_col``, and its amounts are multiplied by
    the matched ``rate``. The match is a left join, so a row without a
    matching rate keeps its place but ends up with NaN amounts.

    Args:
        df (dd.DataFrame): Frame holding the amounts; ``date_col`` must be a
            datetime column.
        lookup_table (dd.DataFrame): Rates with the columns ``from_currency``,
            ``year``, ``month`` and ``rate``.
        amount_cols (list): Columns of ``df`` to multiply by the rate.
        currency_col (str): Column of ``df`` holding the currency code.
        date_col (str): Datetime column of ``df`` that selects the rate month.

    Returns:
        dd.DataFrame: The 'DKK' rows followed by the converted rows.
    """

    currency = df[currency_col]
    foreign = df[currency != 'DKK']
    domestic = df[currency == 'DKK']  # returned exactly as they came in

    # Join keys: calendar year and month of each foreign-currency row
    foreign = foreign.assign(year=foreign[date_col].dt.year)
    foreign = foreign.assign(month=foreign[date_col].dt.month)

    # Left join keeps every foreign row even when no rate is available
    with_rates = dd.merge(
        foreign,
        lookup_table,
        how='left',
        left_on=[currency_col, 'year', 'month'],
        right_on=['from_currency', 'year', 'month'],
    )

    for amount_col in amount_cols:
        with_rates[amount_col] = with_rates[amount_col] * with_rates['rate']

    # Remove the helper/join columns so both parts share the input's layout
    converted = with_rates.drop(columns=['rate', 'from_currency', 'year', 'month'])

    return dd.concat([domestic, converted])

In [7]:
# Root folder of the raw tables, located under the current user's home directory
DATA_ROOT = Path.home().joinpath(
    "Library", "CloudStorage", "Dropbox", "DTU", "Virk2Vec", "Tables"
)

# One sub-folder per input table
path_financials = DATA_ROOT.joinpath("Financials")
path_registrations = DATA_ROOT.joinpath("Registrations")
path_currency = DATA_ROOT.joinpath("Currency")

In [14]:
# Collect the CSV files found directly inside each input folder.
def _list_csv_files(folder: Path) -> List[Path]:
    """Return the ``.csv`` files directly inside ``folder``, sorted by path.

    ``Path.iterdir`` yields entries in arbitrary order; sorting keeps the
    file order (and thereby the order in which the files are read) the same
    from run to run.
    """
    return sorted(
        entry for entry in folder.iterdir()
        if entry.is_file() and entry.suffix == '.csv'
    )


financials_csv = _list_csv_files(path_financials)
registrations_csv = _list_csv_files(path_registrations)
currency_csv = _list_csv_files(path_currency)


In [9]:
# Columns read from the registration CSVs
columns_registrations = ["CVR", "FromDate", "ChangeType", "NewValue"]

# Columns read from the annual-report (financials) CSVs
columns_annualreport = [
    "CVR", "PublicationDate",
    "ProfitLoss", "Equity", "Assets", "LiabilitiesAndEquity",
]

# Columns read from the currency-rate CSVs
columns_currency = ["year", "month", "from_currency", "rate"]

In [10]:
# Lazily read the registration CSVs; dates stay as text here and are parsed later
ddf_registrations = dd.read_csv(
    registrations_csv,
    usecols=columns_registrations,
    dtype=dict(CVR=int, FromDate=str, ChangeType=str, NewValue=str),
    assume_missing=True,
    on_bad_lines="error",
    blocksize="256MB",
)

In [26]:
# Lazily read the annual-report CSVs; amounts are floats so missing values fit
# NOTE(review): "Currency" has a dtype entry but is not listed in
# columns_annualreport, so it is not loaded - confirm whether it should be.
ddf_annualreport = dd.read_csv(
    financials_csv,
    usecols=columns_annualreport,
    dtype=dict(
        CVR=int,
        PublicationDate=str,
        Currency=str,
        ProfitLoss=float,
        Equity=float,
        Assets=float,
        LiabilitiesAndEquity=float,
    ),
    assume_missing=True,
    on_bad_lines="error",
    blocksize="256MB",
)

In [27]:
# Lazily read the monthly currency-rate lookup table
ddf_currency = dd.read_csv(
    currency_csv,
    usecols=columns_currency,
    dtype=dict(year=int, month=int, from_currency=str, rate=float),
    assume_missing=True,
    on_bad_lines="error",
    blocksize="256MB",
)

In [16]:
# Evaluate the lazy Dask frame to inspect the currency lookup table
ddf_currency.compute()

Unnamed: 0,year,month,from_currency,rate
0,2013,1,EUR,7.461371
1,2013,2,EUR,7.459788
2,2013,3,EUR,7.455241
3,2013,4,EUR,7.452721
4,2013,5,EUR,7.453578
...,...,...,...,...
787,2023,8,NOK,0.653141
788,2023,9,NOK,0.650722
789,2023,10,NOK,0.643006
790,2023,11,NOK,0.632077


In [17]:
# Column names of the three loaded frames, one Index per line
print(
    ddf_annualreport.columns,
    ddf_registrations.columns,
    ddf_currency.columns,
    sep="\n",
)

Index(['CVR', 'PublicationDate', 'ProfitLoss', 'Assets', 'Equity',
       'LiabilitiesAndEquity'],
      dtype='object')
Index(['CVR', 'FromDate', 'ChangeType', 'NewValue'], dtype='object')
Index(['year', 'month', 'from_currency', 'rate'], dtype='object')


In [18]:
# Row counts of the annual-report, registration and currency frames (one per line)
print(
    ddf_annualreport.shape[0].compute(),
    ddf_registrations.shape[0].compute(),
    ddf_currency.shape[0].compute(),
    sep="\n",
)

2531733
4858471
792


In [None]:
# TODO: convert non-DKK amounts to DKK with convert_currency (not applied yet)

In [19]:
# Attach to each annual report the registration values valid at its publication date
ddf = dd_enrich_with_asof_values(
    df=ddf_annualreport,
    df_registrations=ddf_registrations,
    values=['Industry', 'CompanyType', 'Address', 'Status'],
    date_col_df='PublicationDate',
    date_col_registrations='FromDate',
)

In [20]:
# Preview the first rows of the enriched annual-report frame
print(ddf.head())

  PublicationDate       CVR  ProfitLoss     Assets     Equity  \
0      2013-01-01  31476836    426447.0  1137021.0   614372.0   
1      2013-01-01  33160186   -138981.0   130682.0   -58981.0   
2      2013-01-01  32146651     40309.0   297813.0   137137.0   
3      2013-01-01  32827306    100208.0   847037.0    64665.0   
4      2013-01-01  27193838   -248638.0  1809089.0 -1480356.0   

   LiabilitiesAndEquity Industry CompanyType Address                 Status  
0             1137021.0   711290         APS    7500                 NORMAL  
1              130682.0   563000         APS    2100  UNDER TVANGSOPLØSNING  
2              297813.0   464100         APS    8700                 NORMAL  
3              847037.0   561010         APS    7400                 NORMAL  
4             1809089.0   476430         APS    2150                 NORMAL  


In [21]:
# Number of rows in the enriched frame
print(len(ddf))

2531733


In [24]:
# Keep the enriched frame's column names, in order, for building the rename mapping
columns = ddf.columns

In [23]:
# Output names, listed in the same order as the enriched frame's columns
# because they are paired with them by position.
output_columns = [
    "START_DATE",
    "CVR",
    "PROFIT_LOSS",
    "ASSETS",
    "EQUITY",  # was misspelled "EQUTY"
    "LIABILITIES_AND_EQUITY",
    "INDUSTRY",
    "COMPANY_TYPE",
    "MUNICIPALITY",
    "STATUS"
]

In [25]:
# Pair source and output column names by position; both sequences must be in the same order
dict(zip(columns, output_columns))

{'PublicationDate': 'START_DATE',
 'CVR': 'CVR',
 'ProfitLoss': 'PROFIT_LOSS',
 'Assets': 'ASSETS',
 'Equity': 'EQUTY',
 'LiabilitiesAndEquity': 'LIABILITIES_AND_EQUITY',
 'Industry': 'INDUSTRY',
 'CompanyType': 'COMPANY_TYPE',
 'Address': 'MUNICIPALITY',
 'Status': 'STATUS'}

In [None]:
def parsed(self) -> dd.DataFrame:
    """Load the annual-report and registration CSVs and build the enriched table.

    Reads the CSV files in the ``Financials``, ``Registrations`` and
    ``Currency`` sub-folders of ``self.input_csv``, attaches to every annual
    report the Industry / CompanyType / Address / Status registration valid at
    its publication date, renames the columns to the output names and keeps
    only rows with ``START_DATE >= self.earliest_start``. When
    ``self.downsample`` is truthy the result is passed through
    ``self.downsample_persons``.

    NOTE(review): persisting the result as parquet is presumably done by a
    decorator that is not applied in this draft - confirm.

    Returns:
        dd.DataFrame: The enriched, renamed and date-filtered annual reports.
    """

    columns_registrations = [
        "CVR",
        "FromDate",
        "ChangeType",
        "NewValue"
    ]

    columns_annualreport = [
        "CVR",
        "PublicationDate",
        "ProfitLoss",
        "Equity",
        "Assets",
        "LiabilitiesAndEquity"
    ]

    columns_currency = [
        "year",
        "month",
        "from_currency",
        "rate"
    ]

    # Explicit source -> output names. Renaming by name instead of zipping two
    # lists by position keeps the mapping right if the column order changes.
    output_names = {
        "PublicationDate": "START_DATE",
        "CVR": "CVR",
        "ProfitLoss": "PROFIT_LOSS",
        "Assets": "ASSETS",
        "Equity": "EQUITY",  # was misspelled "EQUTY"
        "LiabilitiesAndEquity": "LIABILITIES_AND_EQUITY",
        "Industry": "INDUSTRY",
        "CompanyType": "COMPANY_TYPE",
        "Address": "MUNICIPALITY",
        "Status": "STATUS",
    }

    def list_csv_files(folder):
        # iterdir() yields entries in arbitrary order; sort for a stable read order
        return sorted(
            entry for entry in folder.iterdir()
            if entry.is_file() and entry.suffix == '.csv'
        )

    # Locate the input files
    financials_csv = list_csv_files(self.input_csv / "Financials")
    registrations_csv = list_csv_files(self.input_csv / "Registrations")
    currency_csv = list_csv_files(self.input_csv / "Currency")

    # Load data (lazily); dates stay as text and are parsed during enrichment
    ddf_registrations = dd.read_csv(
        registrations_csv,
        usecols=columns_registrations,
        on_bad_lines="error",
        assume_missing=True,
        dtype={
            "CVR": int,
            "FromDate": str,
            "ChangeType": str,
            "NewValue": str
        },
        blocksize="256MB"
    )

    ddf_annualreport = dd.read_csv(
        financials_csv,
        usecols=columns_annualreport,
        on_bad_lines="error",
        assume_missing=True,
        dtype={
            "CVR": int,
            "PublicationDate": str,
            "ProfitLoss": float,
            "Equity": float,
            "Assets": float,
            "LiabilitiesAndEquity": float,
        },
        blocksize="256MB"
    )

    # TODO: currency conversion is not applied yet. The rate table is loaded
    # here so the step can be added; the annual reports would also need a
    # currency column for that.
    ddf_currency = dd.read_csv(
        currency_csv,
        usecols=columns_currency,
        on_bad_lines="error",
        assume_missing=True,
        dtype={
            "year": int,
            "month": int,
            "from_currency": str,
            "rate": float
        },
        blocksize="256MB"
    )

    # Enrich the annual reports with as-of values from the registrations
    ddf = dd_enrich_with_asof_values(
        ddf_annualreport,
        ddf_registrations,
        values=['Industry', 'CompanyType', 'Address', 'Status'],
        date_col_df='PublicationDate',
        date_col_registrations='FromDate'
    )

    # Work in progress: the draft stopped here. The earlier tail dropped and
    # converted PERSON_ID / C_ADIAG / D_INDDTO, columns that do not exist in
    # this frame, so it has been replaced by the rename and date filter below.
    # The comparison is False for missing dates, so those rows are removed too.
    ddf = (
        ddf.rename(columns=output_names)
        .loc[lambda x: x.START_DATE >= self.earliest_start]
    )

    if self.downsample:
        ddf = self.downsample_persons(ddf)

    assert isinstance(ddf, dd.DataFrame)

    return ddf