In [1]:
!pip install kaggle
from google.colab import files
files.upload()



Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"keithdang1610","key":"c04fa7a1ec52edbffccbb57713ed3e55"}'}

In [2]:
import os
os.environ['KAGGLE_CONFIG_DIR'] = "/root/.kaggle"
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 /root/.kaggle/kaggle.json

In [3]:
!kaggle datasets download -d rzykov/uk-corporate-data-company-house-2023

Dataset URL: https://www.kaggle.com/datasets/rzykov/uk-corporate-data-company-house-2023
License(s): MIT
Downloading uk-corporate-data-company-house-2023.zip to /content
 99% 1.10G/1.11G [00:11<00:00, 99.5MB/s]
100% 1.11G/1.11G [00:11<00:00, 102MB/s] 


In [4]:
!ls

sample_data  uk-corporate-data-company-house-2023.zip


In [5]:
!unzip uk-corporate-data-company-house-2023.zip

Archive:  uk-corporate-data-company-house-2023.zip
  inflating: corporate_uk/companies.csv  
  inflating: corporate_uk/companies_sic_codes.csv  
  inflating: corporate_uk/filings.csv  
  inflating: corporate_uk/officers_and_owners.csv  


In [6]:
!pip install dask[complete] modin[all] ray

Collecting ray
  Downloading ray-2.39.0-cp310-cp310-manylinux2014_x86_64.whl.metadata (17 kB)
Collecting modin[all]
  Downloading modin-0.32.0-py3-none-any.whl.metadata (17 kB)
Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.7 kB)
Collecting distributed>=2.22.0 (from modin[all])
  Downloading distributed-2024.11.2-py3-none-any.whl.metadata (3.3 kB)
Collecting modin-spreadsheet>=0.1.0 (from modin[all])
  Downloading modin_spreadsheet-0.1.2-py2.py3-none-any.whl.metadata (587 bytes)
Collecting dataframe-api-compat>=0.2.7 (from modin[all])
  Downloading dataframe_api_compat-0.2.7-py3-none-any.whl.metadata (1.6 kB)
INFO: pip is looking at multiple versions of distributed to determine which version is compatible with other requirements. This could take a while.
Collecting distributed>=2.22.0 (from modin[all])
  Downloading distributed-2024.11.1-py3-none-any.whl.metadata (3.3 kB)
  Downloading distribut

In [7]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd


In [8]:
import time

# Pandas timing
start = time.time()
df_pandas = pd.read_csv('corporate_uk/companies.csv',delimiter=';')
end = time.time()
print(f"Pandas time: {end - start} seconds")

# Dask timing
start = time.time()
df_dask = dd.read_csv('corporate_uk/companies.csv',delimiter=';')
end = time.time()
print(f"Dask time: {end - start} seconds")

# Modin timing
start = time.time()
df_modin = mpd.read_csv('corporate_uk/companies.csv',delimiter=';')
end = time.time()
print(f"Modin time: {end - start} seconds")




Pandas time: 78.55128645896912 seconds
Dask time: 0.09124350547790527 seconds


2024-11-21 08:37:39,422	INFO worker.py:1819 -- Started a local Ray instance.


Modin time: 87.59892702102661 seconds


In [9]:
import ray

ray.init(ignore_reinit_error=True)

# Using Ray directly
import ray.data
# Ray timing
start = time.time()
df_ray = ray.data.read_csv('corporate_uk/companies.csv',delimiter=';')
end = time.time()
print(f"Ray time: {end - start} seconds")

2024-11-21 08:38:57,777	INFO worker.py:1652 -- Calling ray.init() again after it has already been called.


Ray time: 4.118912220001221 seconds


## Clean column names

In [10]:
df_pandas.columns = df_pandas.columns.str.replace('[^A-Za-z0-9_]', '', regex=True).str.strip()

# For Dask, Modin, and Ray (this step should work similarly with Dask/Modin/Ray DataFrames)
df_dask.columns = df_dask.columns.str.replace('[^A-Za-z0-9_]', '', regex=True).str.strip()
df_modin.columns = df_modin.columns.str.replace('[^A-Za-z0-9_]', '', regex=True).str.strip()

## Create utility file

In [19]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re

##############
# Utilities  #
##############

def read_config_file(filepath):
  # Reads a YAML configuration file and returns the parsed content.
    if not os.path.exists(filepath):
        logging.error(f"Config file not found: {filepath}")
        raise FileNotFoundError(f"Config file not found: {filepath}")
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(f"Error reading YAML file: {exc}")
            raise

def replacer(string, char):
  # Replaces consecutive occurrences of a character in a string with a single instance.
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

##################
# Data Validation #
##################

def col_header_val(df, table_config):
    logging.info("Starting column header validation.")

    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x, '_'), list(df.columns)))

    expected_col = list(map(lambda x: x.lower(), table_config['columns']))
    expected_col.sort()

    df.columns = list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)

    if len(df.columns) == len(expected_col) and list(expected_col) == list(df.columns):
        logging.info("Column name and length validation passed.")
        print("Column name and column length validation passed.")
        return 1
    else:
        logging.warning("Column name and column length validation failed.")
        print("Column name and column length validation failed.")

        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        missing_YAML_file = list(set(expected_col).difference(df.columns))

        print("Following File columns are not in the YAML file:", mismatched_columns_file)
        print("Following YAML columns are not in the uploaded file:", missing_YAML_file)

        logging.error(f"File columns not in YAML: {mismatched_columns_file}")
        logging.error(f"YAML columns missing in file: {missing_YAML_file}")

        return 0

####################
# File Processing  #
####################

def read_large_file(filepath, sep=',', chunksize=10**6):
    logging.info(f"Reading file: {filepath}")
    try:
        chunk_list = []
        for chunk in pd.read_csv(filepath, sep=sep, chunksize=chunksize):
            chunk_list.append(chunk)
        logging.info(f"File read successfully: {filepath}")
        return pd.concat(chunk_list, axis=0)
    except Exception as e:
        logging.error(f"Error reading file {filepath}: {e}")
        raise

def write_file(df, filepath, sep='|', compression='gzip'):
    logging.info(f"Writing file to {filepath}")
    try:
        df.to_csv(filepath, sep=sep, index=False, compression=compression)
        logging.info(f"File written successfully: {filepath}")
    except Exception as e:
        logging.error(f"Error writing file {filepath}: {e}")
        raise

####################
# Summary Function #
####################

def generate_file_summary(df, filepath):
    total_rows, total_columns = df.shape
    file_size = os.path.getsize(filepath)

    summary = {
        "Total Rows": total_rows,
        "Total Columns": total_columns,
        "File Size (MB)": file_size / (1024 ** 2)
    }

    print(f"Summary: {summary}")
    logging.info(f"File Summary: {summary}")
    return summary


Writing utility.py


## Write yaml file

In [16]:
df_pandas.columns

Index(['company_number', 'company_type', 'office_address',
       'incorporation_date', 'jurisdiction', 'company_status', 'account_type',
       'company_name', 'sic_codes', 'date_of_cessation',
       'next_accounts_overdue', 'confirmation_statement_overdue', 'owners',
       'officers', 'average_number_employees_during_period', 'current_assets',
       'last_accounts_period_end', 'company_url'],
      dtype='object')

In [37]:
%%writefile file.yaml
file_type: csv
dataset_name: uk-corporate-data-company-house-2023
file_name: companies
table_name: companies_tab
inbound_delimiter: ";"
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - company_number
    - company_type
    - office_address
    - incorporation_date
    - jurisdiction
    - company_status
    - account_type
    - company_name
    - sic_codes
    - date_of_cessation
    - next_accounts_overdue
    - confirmation_statement_overdue
    - owners
    - places
    - average_number_employees_during_period
    - current_assets
    - last_accounts_period_end
    - city_name


Overwriting file.yaml


In [32]:
!ls

corporate_uk  __pycache__  schema.yaml				     utility.py
file.yaml     sample_data  uk-corporate-data-company-house-2023.zip


In [20]:
!pip install pyyaml



In [21]:
!pip install python-utils



In [38]:
import utility as util
config_data = util.read_config_file("file.yaml")
config_data

{'file_type': 'csv',
 'dataset_name': 'uk-corporate-data-company-house-2023',
 'file_name': 'companies',
 'table_name': 'companies_tab',
 'inbound_delimiter': ';',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['company_number',
  'company_type',
  'office_address',
  'incorporation_date',
  'jurisdiction',
  'company_status',
  'account_type',
  'company_name',
  'sic_codes',
  'date_of_cessation',
  'next_accounts_overdue',
  'confirmation_statement_overdue',
  'owners',
  'places',
  'average_number_employees_during_period',
  'current_assets',
  'last_accounts_period_end',
  'city_name']}

In [34]:
config_data['inbound_delimiter']

';'

In [35]:
#read file using config file
file_type = config_data['file_type']
source_file = "corporate_uk/" + config_data['file_name'] + f'.{file_type}'
df = pd.read_csv(source_file, delimiter=config_data['inbound_delimiter'])
df.head()



Unnamed: 0,company_number,company_type,office_address,incorporation_date,jurisdiction,company_status,account_type,company_name,sic_codes,date_of_cessation,next_accounts_overdue,confirmation_statement_overdue,owners,officers,average_number_employees_during_period,current_assets,last_accounts_period_end,company_url
0,13511884,Private limited company,"CM20 1YS, England, Harlow, Essex, C/O Aacsl Ac...",2021-07-15,England/Wales,Dissolved,Micro Entity,Nocik Health Ltd,"[86210, 86900]",2024-06-18,,,1.0,1.0,0.0,17602.0,2023-12-31,https://corpsignals.com/companies/uk/nocik-hea...
1,13511885,Private limited company,"RH2 7JN, England, Reigate, Surrey, Chart House...",2021-07-15,England/Wales,Active,Micro Entity,Jfs Group Limited,[64203],,True,,1.0,1.0,0.0,,2022-07-31,https://corpsignals.com/companies/uk/jfs-group...
2,13511887,Private limited company,"RM8 1PS, England, Dagenham, 31 Lamberhurst Road",2021-07-15,England/Wales,Active,Micro Entity,Samara Trading Ltd,"[49320, 49410, 51102]",,,,1.0,1.0,1.0,1307.0,2023-03-31,https://corpsignals.com/companies/uk/samara-tr...
3,13511889,Private limited company,"DA12 2RX, England, Gravesend, 11 Norfolk Road",2021-07-15,England/Wales,Active,Micro Entity,Quantum Brooks Logistics Ltd,"[49410, 50200, 51210, 52101]",,,,2.0,2.0,2.0,236.0,2023-07-31,https://corpsignals.com/companies/uk/quantum-b...
4,13511892,Private limited company,"UB10 0NX, England, Hillingdon, 1 Agincourt Vil...",2021-07-15,England/Wales,Active,Total Exemption Full,Pbkn Ltd,[56101],,,,1.0,1.0,5.0,37863.0,2023-07-31,https://corpsignals.com/companies/uk/pbkn-ltd-...


## validate

In [39]:
util.col_header_val(df,config_data)

ERROR:root:File columns not in YAML: ['officers', 'company_url']
ERROR:root:YAML columns missing in file: ['places', 'city_name']


Column name and column length validation failed.
Following File columns are not in the YAML file: ['officers', 'company_url']
Following YAML columns are not in the uploaded file: ['places', 'city_name']


0

In [40]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['company_number', 'company_type', 'office_address',
       'incorporation_date', 'jurisdiction', 'company_status', 'account_type',
       'company_name', 'sic_codes', 'date_of_cessation',
       'next_accounts_overdue', 'confirmation_statement_overdue', 'owners',
       'officers', 'average_number_employees_during_period', 'current_assets',
       'last_accounts_period_end', 'company_url'],
      dtype='object')
columns of YAML are: ['company_number', 'company_type', 'office_address', 'incorporation_date', 'jurisdiction', 'company_status', 'account_type', 'company_name', 'sic_codes', 'date_of_cessation', 'next_accounts_overdue', 'confirmation_statement_overdue', 'owners', 'places', 'average_number_employees_during_period', 'current_assets', 'last_accounts_period_end', 'city_name']


In [41]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")

ERROR:root:File columns not in YAML: ['officers', 'company_url']
ERROR:root:YAML columns missing in file: ['places', 'city_name']


Column name and column length validation failed.
Following File columns are not in the YAML file: ['officers', 'company_url']
Following YAML columns are not in the uploaded file: ['places', 'city_name']
validation failed


In [42]:
import csv
import gzip

In [44]:
from dask import dataframe as dd
df = dd.read_csv('corporate_uk/companies.csv',delimiter='\t')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('companies.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True)

['/content/companies.csv.gz/00.part',
 '/content/companies.csv.gz/01.part',
 '/content/companies.csv.gz/02.part',
 '/content/companies.csv.gz/03.part',
 '/content/companies.csv.gz/04.part',
 '/content/companies.csv.gz/05.part',
 '/content/companies.csv.gz/06.part',
 '/content/companies.csv.gz/07.part',
 '/content/companies.csv.gz/08.part',
 '/content/companies.csv.gz/09.part',
 '/content/companies.csv.gz/10.part',
 '/content/companies.csv.gz/11.part',
 '/content/companies.csv.gz/12.part',
 '/content/companies.csv.gz/13.part',
 '/content/companies.csv.gz/14.part',
 '/content/companies.csv.gz/15.part',
 '/content/companies.csv.gz/16.part',
 '/content/companies.csv.gz/17.part',
 '/content/companies.csv.gz/18.part',
 '/content/companies.csv.gz/19.part',
 '/content/companies.csv.gz/20.part',
 '/content/companies.csv.gz/21.part',
 '/content/companies.csv.gz/22.part']

In [45]:

# Get file summary
file_size = os.path.getsize('companies.csv.gz')
num_rows = len(df)
num_cols = len(df.columns)

# Print file summary
print("File summary:")
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")
print(f"File size: {file_size} bytes")

File summary:
Number of rows: 5428900
Number of columns: 1
File size: 4096 bytes


In [46]:
!ls

companies.csv.gz  file.yaml    sample_data  uk-corporate-data-company-house-2023.zip
corporate_uk	  __pycache__  schema.yaml  utility.py
