In [8]:
import requests
import pandas as pd
import json
import re
import polars as pl
import Enum_data as ed
pl.Config.set_tbl_rows(1000)
from persiantools import characters, digits
import sqlite3

import general_functions as gf

In [9]:
def get_table(url: str, table: "int | list[int]") -> list:
    """Download a report page and return the cells of the requested table(s).

    The page is expected to embed its data as a JavaScript assignment of the
    form ``var datasource = {...};``. That JSON object is extracted with a
    regular expression, parsed, and the cells are read from the first sheet
    (``sheets[0]``).

    Args:
        url: Address of the report page to download.
        table: Index of one table inside the first sheet, or a list/tuple of
            indexes whose cells are concatenated in the order given.

    Returns:
        A flat list of
        ``(columnSequence, rowSequence, value, periodEndToDate)`` tuples.

    Raises:
        ValueError: If the response does not contain the ``var datasource``
            payload (previously this surfaced as an ``UnboundLocalError``).
    """
    # NOTE: a browser-like header dict (including a hard-coded session cookie)
    # used to be built here but was never passed to requests. It was removed
    # rather than sent, so the request itself is unchanged.
    # A timeout keeps a stalled connection from hanging the kernel forever.
    response = requests.get(url, timeout=30)

    match = re.search(r"var datasource = (.*?});", response.text)
    if match is None:
        raise ValueError(f"no 'var datasource' payload found in response from {url!r}")

    tables = json.loads(match.group(1))['sheets'][0]['tables']

    # Normalise the single-index form so one code path serves both cases.
    indexes = list(table) if isinstance(table, (list, tuple)) else [table]
    return [
        (cell['columnSequence'], cell['rowSequence'], cell['value'], cell['periodEndToDate'])
        for index in indexes
        for cell in tables[index]['cells']
    ]



In [None]:
def create_dict_dataframes(url: str, date: int, report_type: str) -> dict:
    """Fetch a report table and split it into "this year" / "last year" frames.

    Cells are fetched with ``get_table`` for the table index(es) registered in
    ``ed.tabels[report_type]``. For each period-end date found in the cells, a
    frame is pivoted (one line per ``row``, one column per ``col``) and its
    first row is scanned for date-like tokens (tokens containing ``/``) to
    decide which result slot the frame belongs to.

    Args:
        url: Report page address, forwarded to ``get_table``.
        date: Target date as an integer; compared with the header token after
            its ``/`` separators are removed.
        report_type: Key used to look up ``ed.tabels`` and ``ed.cols``.

    Returns:
        A dict with keys ``'report_this_year'`` and ``'report_last_year'``.
        Each value is the first three columns of the matching pivoted frame,
        or ``None`` when no frame was assigned to that slot.
    """
    all_data ={'report_this_year': pl.DataFrame(),
           'report_last_year': pl.DataFrame()}

    cells_tuples = get_table(url, ed.tabels[report_type].value)
    # Distinct non-empty period-end markers (last tuple element), in sorted order.
    dates = sorted(list(set([i[-1] for i in cells_tuples if i[-1] != ''])))
    # With more than two periods, the first one in sort order is discarded.
    if len(dates) > 2:
        dates = dates[1:]
    for date_ in dates:
        # Keep cells of this period plus cells with no period marker
        # (presumably the label/description cells — TODO confirm).
        filtered_cells = [(i[0], i[1], i[2]) for i in cells_tuples if i[-1] == '' or i[-1] == (date_)]
        df = pl.from_records(filtered_cells, schema=["col", "row", "value"], orient="row")
        df = df.pivot(values="value", on="col", index="row").sort("row")
        # Normalise the text of column '1' with persiantools' ar_to_fa
        # (Arabic -> Persian characters, per its name).
        df = df.with_columns(pl.col('1').map_elements(characters.ar_to_fa, return_dtype=pl.String))
        for i, col in enumerate(df.columns):
            # NOTE(review): `cols` (and the loop index `i`) are not used below.
            cols = ed.cols[report_type].value
            value = df[col][0]        # value from first row
            if(isinstance(value, str)):
                for val in value.split():
                    if'/' in val:
                        num = int(val.replace('/', ''))
                        if num == date :
                            all_data['report_this_year'] = df[df.columns[:3]]
                        else:
                            # NOTE(review): `df` is reassigned here while the outer loop
                            # still walks the column names captured before the drop; a
                            # later iteration that reaches the dropped name would look up
                            # a column that no longer exists — confirm this cannot happen
                            # with real data.
                            if len(df.columns) > 5:
                                df = df.drop(df.columns[2])
                            all_data['report_last_year'] = df[df.columns[:3]]
    # Slots that never received a frame are reported as None rather than an empty frame.
    for key, value in all_data.items():
        if(value.is_empty()):
            all_data[key] = None
    return all_data


In [65]:
def create_Incoeme_Statment_dataframe(symbol: str, url: str, date: int, period: int, publish: int) -> dict:
    """Build one-row income-statement frames for the current and previous period.

    The per-period frames come from ``create_dict_dataframes``. Each one is
    reshaped into a single row whose columns are the English labels in
    ``col_name``, prefixed with ``Symbol``, ``Date``, ``Period`` and
    ``Publish`` columns.

    Args:
        symbol: Ticker symbol written to the ``Symbol`` column.
        url: Report page address, forwarded to ``create_dict_dataframes``.
        date: Date of the current period as an integer, forwarded to
            ``create_dict_dataframes``.
        period: Value written to the ``Period`` column.
        publish: Value written to the ``Publish`` column.

    Returns:
        A dict with the same keys as ``create_dict_dataframes`` returns. Each
        value is a one-row ``pl.DataFrame``, or ``None`` when that period was
        not available.
    """
    all_data = create_dict_dataframes(url, date, 'Incoeme_Statment')
    # NOTE(review): 14 labels are listed; columns beyond that keep their
    # generated names after the rename below — confirm no label is missing.
    col_name = [" Sales",
                "Cost of Goods Sold (COGS)",
                "Gross Profit (Loss)",
                "General and Administrative Expenses (G&A)",
                "Net Other Operating Income (Expenses)",
                "Operating Profit (Loss)",
                "Finance Costs ",
                "Net Other Non-Operating Income (Expenses)",
                "Profit (Loss) from Continuing Operations Before Tax",
                "Income Tax Expense",
                "Net Profit (Loss) from Continuing Operations",
                "Profit (Loss) from Discontinued Operations, Net of Tax",
                "Net Profit (Loss)",
                "Earnings per Share (EPS) - After Tax"
                ]
    for key, data in all_data.items():
        # create_dict_dataframes stores None for a period it could not find;
        # keep that entry as None instead of failing on `None.rename`.
        if data is None:
            continue

        # Give the columns positional names ('0', '1', '2', ...), then call the first one 'row'.
        data = data.rename({c: str(i) for i, c in enumerate(data.columns)})
        data = data.rename({'0': 'row'})
        # The first row of the third column holds the header text; its first
        # token containing '/' is taken as the period date of this table.
        table_date = [d for d in str(data[data.columns[2]][0]).split() if '/' in d][0]
        # Keep the first row and everything from the fourth row on, then renumber.
        data = pl.concat([data[0], data[3:]])
        data = data.with_columns(pl.int_range(0, pl.len()).alias("row"))
        data = data.filter(pl.col("2") != "")
        data = create_standard_df(data)
        # Turn the row-per-item layout into one column per item.
        data = data.transpose(include_header=True)
        data = data.drop(['column','column_0'])
        # zip() stops at the shorter sequence, so only the leading columns are relabelled.
        data = data.rename(dict(zip(data.columns, col_name)))
        data = data.insert_column(0, pl.lit(symbol).alias("Symbol"))
        data = data.insert_column(1, pl.lit(int(digits.fa_to_en(table_date).replace('/', ''))).alias("Date"))
        data = data.insert_column(2, pl.lit(period).alias("Period"))
        data = data.insert_column(3, pl.lit(publish).alias("Publish"))
        # Only the last row of the transposed frame is kept as the result.
        data = data[-1]
        all_data[key] = data
    return all_data
def merge_rows_with_sum(frame: pl.DataFrame, first: int, count: int, name: str) -> pl.DataFrame:
    """Replace a run of rows with a single row holding the sum of column "2".

    The merged span is ``first`` through ``first + count`` inclusive, i.e.
    ``count + 1`` rows. Column "2" is cast to Int64 with ``strict=False``
    before summing, and the total is stored back as a string.

    Args:
        frame: Frame with columns "row", "1" and "2".
        first: Value of "row" where the merged span starts.
        count: Offset from ``first`` to the last merged row.
        name: Label written to column "1" of the replacement row.

    Returns:
        The rows before the span, the replacement row and the rows after the
        span, stacked vertically with "row" renumbered from 0.
    """
    last = first + count
    row_id = pl.col("row")

    # Total of column "2" over the rows that are about to be collapsed.
    span = frame.filter(row_id.is_in(list(range(first, last + 1))))
    total = span.select(pl.col("2").cast(pl.Int64, strict=False).sum()).item()

    replacement = pl.DataFrame({"row": [None], "1": [name], "2": [str(total)]})
    pieces = [
        frame.filter(row_id < first),
        replacement,
        frame.filter(row_id > last),
    ]
    stacked = pl.concat(pieces, how="vertical")
    # "row" is rebuilt as 0..n-1 so later steps can address rows by position.
    return stacked.with_columns(pl.int_range(0, pl.len()).alias("row"))

def create_standard_df(data: pl.DataFrame) -> pl.DataFrame:
    """Bring an income-statement frame to the fixed row layout used downstream.

    Two row groups are collapsed into summed rows with ``merge_rows_with_sum``.
    Rows numbered 13 to 17 are then removed, "row" is renumbered from 0, and
    the final row is dropped.

    Args:
        data: Frame with columns "row", "1" and "2".

    Returns:
        The reshaped frame.
    """
    # (first row, offset to last row, label) for each group to collapse, applied in order.
    merge_steps = (
        (5, 2, 'خالص سایر درامدها (هزینه ها) ی عملیاتی'),
        (10, 1, 'مالیات'),
    )
    for first, count, label in merge_steps:
        data = merge_rows_with_sum(data, first, count, label)

    kept = data.filter(~pl.col("row").is_in(list(range(13, 18))))
    renumbered = kept.with_columns(pl.int_range(0, pl.len()).alias("row"))
    return renumbered[:-1]



In [66]:
# NOTE: symbol, url, date, period and publish are assigned in a cell further
# down this notebook; on a fresh kernel that cell has to run before this one.
create_Incoeme_Statment_dataframe(symbol, url, date, period, publish)

{'report_this_year': shape: (1, 19)
 ┌────────┬──────────┬────────┬──────────┬───┬──────────────┬─────────────┬─────────────┬───────────┐
 │ Symbol ┆ Date     ┆ Period ┆ Publish  ┆ … ┆ Profit       ┆ Net Profit  ┆ Earnings    ┆ column_15 │
 │ ---    ┆ ---      ┆ ---    ┆ ---      ┆   ┆ (Loss) from  ┆ (Loss)      ┆ per Share   ┆ ---       │
 │ str    ┆ i32      ┆ i32    ┆ i32      ┆   ┆ Discontinue… ┆ ---         ┆ (EPS) -     ┆ str       │
 │        ┆          ┆        ┆          ┆   ┆ ---          ┆ str         ┆ Aft…        ┆           │
 │        ┆          ┆        ┆          ┆   ┆ str          ┆             ┆ ---         ┆           │
 │        ┆          ┆        ┆          ┆   ┆              ┆             ┆ str         ┆           │
 ╞════════╪══════════╪════════╪══════════╪═══╪══════════════╪═════════════╪═════════════╪═══════════╡
 │ فخوز   ┆ 14030930 ┆ 9      ┆ 14031030 ┆ … ┆ 0            ┆ 184         ┆ 183         ┆ 190000000 │
 └────────┴──────────┴────────┴──────────┴───┴

In [22]:
# Run parameters: ticker symbol, fiscal-year end and report type.
# NOTE(review): the integer dates look like YYYYMMDD in the Persian calendar — confirm.
#---------------------------------------
symbol = "فخوز"
financial_year = 14031230
report_type = ed.names.Incoeme_Statment.value
#----------------------------------------

# Sheet / table indexes registered for this report type in Enum_data.
# NOTE(review): `table` is not referenced by the other cells visible here.
sheet_num = ed.sheets[report_type].value
table = ed.tabels[report_type].value
parse_date = gf.parse_date_persian(financial_year)
# `reports` is indexed by date below; each entry provides 'period', 'publish' and 'url'.
reports = gf.get_results(symbol, parse_date,report_type , sheet_num)
#reports
# Report date to process; it must be one of the keys of `reports`.
date = 14030930
period = int(reports[date]['period'])
publish = int(reports[date]['publish'])
url = reports[date]['url']


In [28]:
# Print the dict returned for the parameters assigned in the cell above.
# NOTE(review): the function as defined in this notebook returns a dict, so an
# output of `None` would come from a different definition — re-run to confirm.
print(create_Incoeme_Statment_dataframe(symbol, url, date, period, publish))

None
