In [23]:
import requests
from bs4 import BeautifulSoup as bs
import datetime
import polars as pl
import pandas as pd
import re


class CSVReader():
    """Read the semicolon-separated CSV export and join it with the RTF-derived table.

    The methods are stateless; the class only groups the helpers used to load
    the CSV data, clean the frame parsed from the RTF trade register and merge
    the two so that the RTF comments end up next to the CSV deals.
    """

    # An RTF unicode escape as it appears in the parsed text, e.g. \u233? :
    # backslash, "u", a decimal code point, then the "?" placeholder character.
    _RTF_UNICODE_ESCAPE = re.compile(r"\\u(\d+)\?")

    def read_csv_data(self, filename: str) -> "pl.DataFrame":
        """Load the CSV export at ``filename`` into a polars DataFrame.

        The first five lines of the file are skipped, the sixth line is used
        as the header and the two lines following the header are skipped too.
        The file is decoded as ISO-8859-1.

        Args:
            filename: Path of the semicolon-separated text file.

        Returns:
            The parsed data as a ``pl.DataFrame``.
        """
        # The header used to be pre-read here with the platform default
        # encoding into a variable nobody used; polars already takes the
        # header from the file, so that extra (and encoding-fragile) pass is gone.
        return pl.read_csv(
            filename,
            skip_rows=5,
            separator=';',
            skip_rows_after_header=2,
            encoding="ISO-8859-1",
            ignore_errors=True,  # errors cause no actual issues
        )


    @staticmethod
    def _decode_rtf_unicode(text):
        r"""Replace every ``\uN?`` escape in ``text`` with the character of code point N.

        Non-string values (for example ``None`` for a null cell) are returned
        unchanged. An escape whose number is not a valid code point is left
        in place instead of raising.
        """
        if not isinstance(text, str):
            return text

        def _to_char(match):
            try:
                return chr(int(match.group(1)))
            except (ValueError, OverflowError):
                # Number outside the unicode range: keep the original escape text.
                return match.group(0)

        return CSVReader._RTF_UNICODE_ESCAPE.sub(_to_char, text)


    def replace_unicode_chars(self, df: "pl.DataFrame", col_name: str) -> "pl.Series":
        r"""Return column ``col_name`` of ``df`` with RTF ``\uN?`` escapes decoded.

        Every escape in every value is decoded (not only the first one), and
        null values are passed through untouched.

        Args:
            df: Frame holding the column to clean.
            col_name: Name of the column to decode.

        Returns:
            A new ``pl.Series`` named ``col_name`` with the same dtype as the
            source column; ``df`` itself is not modified.
        """
        column = df[col_name]
        decoded = [self._decode_rtf_unicode(value) for value in column]
        # Keep the source dtype so an all-null column does not change type.
        return pl.Series(col_name, decoded, dtype=column.dtype)


    def strip_strings(self, x: str) -> str:
        """Return ``x`` without surrounding whitespace; non-strings are returned as-is."""
        return x.strip() if isinstance(x, str) else x


    def rtf_data_processing(self, df: "pl.DataFrame") -> "pl.DataFrame":
        """Normalise the order-year column of the RTF-derived frame.

        Adds the column ``is estimated year order`` ("Yes" when the order year
        contains a parenthesised part, otherwise "No"), then removes the
        parentheses from ``Year(s) Weapon of Order`` and casts it to ``Int64``.

        Args:
            df: Frame with a string column ``Year(s) Weapon of Order``.

        Returns:
            A new frame with the flag column added and the year column as integers.
        """
        # The flag must be computed first: the second step removes the
        # parentheses the flag is derived from.
        df = df.with_columns([
            pl.when(pl.col('Year(s) Weapon of Order').str.contains(r'\(.*\)'))
            .then(pl.lit("Yes"))
            .otherwise(pl.lit("No"))
            .alias("is estimated year order")
        ])

        df = df.with_columns([
            pl.col('Year(s) Weapon of Order')
            .str.replace_all(r"[()]", "")
            .alias('Year(s) Weapon of Order')
            .cast(pl.Int64)
        ])

        return df


    def joined_table(self, df_rtf, csv_df):
        """Left-join the CSV deals with the cleaned RTF frame and keep the report columns.

        The CSV frame is the left side, so every CSV row is kept. Rows are
        matched on seller/supplier, buyer/recipient, designation and order year.

        Args:
            df_rtf: Frame parsed from the RTF trade register.
            csv_df: Frame returned by ``read_csv_data``.

        Returns:
            A frame with the CSV deal columns plus ``No. Comments`` from the RTF side.
        """
        processed_df = self.rtf_data_processing(df_rtf)
        # remove possible spaces for the designations
        csv_df = csv_df.with_columns(csv_df['Designation'].str.strip_chars())

        # Decode RTF unicode escapes in the text columns used for matching and output.
        processed_df = processed_df.with_columns(self.replace_unicode_chars(df_rtf, "No. Designation"))
        processed_df = processed_df.with_columns(self.replace_unicode_chars(df_rtf, "No. Comments"))

        joined_df = csv_df.join(
            processed_df,
            left_on=['Seller', 'Buyer', 'Designation', 'Order date'],
            right_on=["Supplier", "Recipient", "No. Designation", 'Year(s) Weapon of Order'],
            how="left",
        )
        joined_df = joined_df.select(
            ['Deal ID', 'Seller', 'Buyer', 'Designation', 'Description', 'Armament category', 'Order date',
            'Order date is estimate', 'Numbers delivered', 'Numbers delivered is estimate', 'Delivery year',
            'Delivery year is estimate', 'Status', 'SIPRI estimate', 'TIV deal unit', 'TIV delivery values',
            'Local production', 'No. Comments'])

        return joined_df

In [24]:
def readFile(fileToRead):
    """Return the lines of the text file at ``fileToRead`` as a list of strings.

    The file is opened in text mode with the default encoding and each line
    keeps its trailing newline. The ``with`` block closes the file on exit.
    """
    with open(fileToRead, 'r') as handle:
        lines = list(handle)

    return lines


# def read_file_binary(file_to_read):
#     with open(file_to_read, 'rb') as file:
#         contents = file.read().decode()
#     return contents.split("\r\n")


# Text that immediately precedes the generation date in the RTF header
dateGatheredString = 'Information generated:\\b0  '
date = None
# Parser state: 'Date' until the generation date is found, 'Data' afterwards
searchFor = 'Date'
# Most recent recipient country; reused by entries that leave the country empty
country = None

rows = []

# TODO - This needs to find the file in the folder as a normal rtf with no special name
rtfLines = readFile('Trade-Register-1950-2021-downloaded.rtf')
for line in rtfLines:
    if searchFor == 'Date':
        # Header line looks like:
        # 'SIPRI Arms Transfers Database\par \b Information generated:\b0  10 March 2023\par \par }'
        if dateGatheredString in line:
            date = line.split(dateGatheredString)[1].split("\\par")[0]
            # The table headings are hard coded further down, so once the date
            # is known the remaining lines are scanned for data only.
            searchFor = 'Data'
        continue

    if searchFor != 'Data':
        continue

    # Two kinds of data line exist (backslashes are doubled because \ is the escape character):
    #  * a supplier line starting with '{\b', e.g.
    #    '{\b Albania}\par{\b R:} Burkina Faso\tab (12)\tab PM-43 120mm\tab mortar\tab (2011)\tab 2011\tab 12\tab Probably second-hand\par...'
    #  * a continuation line starting with '\par{\b', which carries more recipients
    #    for the supplier read from an earlier line, e.g.
    #    '\par{\b     } Iran\tab (413)\tab BMP-2\tab IFV\tab 1991\tab 1993-2001\tab (413)\tab 1500 ordered but ...'
    # NOTE: a continuation line seen before any supplier line leaves `supplier` unbound.
    if line.startswith('{\\b'):
        supplier = line.split('}\\par')[0].split('{\\b ')[1]
        recipients = line.split('}\\par{\\b R:} ')[1].split('{\\b     } ')
    elif line.startswith('\\par{\\b'):
        recipients = line.split('{\\b     } ')[1:]
    else:
        # Anything else is formatting noise and carries no table row
        continue

    for recipient in recipients:
        if recipient.startswith('\\tab\\tab'):
            # Empty country field: the entry belongs to the previous country
            countryData = recipient.split('\\tab\\tab')[1].split('\\tab')
        else:
            # The first field is the recipient country, the rest are the deal details
            country = recipient.split('\\tab')[0]
            countryData = recipient.split('\\tab')[1:]

        # Seven detail fields are expected; the last one is cut at the first '\par'
        row = [supplier, country, *countryData[:6], countryData[6].split('\\par')[0]]  # type: ignore
        rows.append([element.strip() for element in row])


# Column names are hard coded because extracting them from the RTF is a bit of a pain
df = pl.DataFrame(
    rows,
    schema=["Supplier", "Recipient", "Ordered", "No. Designation", "Weapon Description",
            "Year(s) Weapon of Order", "Year Delivery", "Of Delivered", "No. Comments"],
    # `rows` holds one list per table row; stating the orientation explicitly
    # avoids relying on polars inferring it from the shape of the data.
    orient="row",
)
df.write_csv("processed_rtf.csv")
# TODO Set the type of the columns

# Distinct outcomes of "does the order year contain a '-'" across all rows
contains_dash = df["Year(s) Weapon of Order"].str.contains('-').unique()

print(f"contains dashes: {contains_dash}")
missing_count = df['No. Comments'].is_null().sum()
print(f"missing comments: {missing_count}")

reader = CSVReader()

# Join the CSV export with the RTF-derived rows and write the result to disk
csv_df = reader.read_csv_data("../csvReader/data.txt")
joinedDF = reader.joined_table(df, csv_df)
joinedDF.write_csv("joined_data_test.csv")

shape: (1,)
Series: 'Year(s) Weapon of Order' [bool]
[
	false
] dash == True
missing comments: 0


  df = pl.DataFrame(rows, schema=["Supplier", "Recipient", "Ordered", "No. Designation", "Weapon Description",
