In [1]:
import os
import re
import time
import sqlite3
import requests
import pandas as pd
from persiantools.jdatetime import JalaliDate
from IPython.display import display, HTML
import plotly.graph_objects as go

In [2]:
# Constants

# Local artefacts: the SQLite store and the CSV of annotated events
DB_FILE = 'market_data.db'
EVENTS_FILE = "events.csv"

# Every feed lives under the same API root and follows one of two URL layouts:
# stock-index history (slug is a percent-encoded instrument name) or the
# indicator summary table (slug is a plain ASCII instrument id).
_TGJU_API = 'https://api.tgju.org/v1'
_INDEX_HISTORY_URL = _TGJU_API + '/stocks/instrument/history-data/{slug}?order_dir=asc&market=index'
_INDICATOR_TABLE_URL = _TGJU_API + '/market/indicator/summary-table-data/{slug}?lang=fa&order_dir=asc&convert_to_ad=1&_='

_INDEX_SLUGS = {
    'bourse': '%D8%B4-%DA%A9%D9%84-%D8%A8%D9%88%D8%B1%D8%B3',
    'fara-bourse': '%D8%B4-%DA%A9%D9%84-%D9%81%D8%B1%D8%A7%D8%A8%D9%88%D8%B1%D8%B3',
}

_INDICATOR_SLUGS = {
    'Gold': 'geram18',
    'Dollar': 'price_dollar_rl',
    'Coin': 'sekee',
    'Nim-Coin': 'nim',
    'Coin-Gerami': 'gerami',
    'Bitcoin': 'crypto-bitcoin',
    'Rob-Coin': 'rob',
}

# Market key -> full endpoint URL (index feeds first, then indicators)
MARKET_URLS = {
    **{key: _INDEX_HISTORY_URL.format(slug=slug) for key, slug in _INDEX_SLUGS.items()},
    **{key: _INDICATOR_TABLE_URL.format(slug=slug) for key, slug in _INDICATOR_SLUGS.items()},
}

# Market key -> Persian display name used as the chart legend entry
MARKET_NAMES = {
    'bourse': 'بورس',
    'fara-bourse': 'فرابورس',
    'Gold': 'طلا',
    'Dollar': 'دلار',
    'Coin': 'سکه امامی',
    'Nim-Coin': 'نیم سکه',
    'Coin-Gerami': 'سکه گرمی',
    'Bitcoin': 'بیت کوین',
    'Rob-Coin': 'ربع سکه',
}

# Persian display name -> line colour of that market's trace
COLORS = {
    'بیت کوین': 'violet',
    'طلا': 'gold',
    'دلار': 'green',
    'سکه امامی': 'blue',
    'نیم سکه': 'purple',
    'ربع سکه': 'cyan',
    'سکه گرمی': 'pink',
    'بورس': 'lime',
    'فرابورس': 'salmon',
}

In [3]:
# Utility function to clean raw values
def clean_value(value):
    """Normalise one raw cell from the API payload.

    Args:
        value: A cell taken from a response row; usually a string that may
            contain HTML markup, but any JSON-decoded scalar is accepted.

    Returns:
        * ``None`` for the placeholder ``'-'`` and for any falsy value.
        * Non-string values unchanged.
        * For a ``low``/``high`` span: the digits (commas removed) as a
          string, prefixed with ``'-'`` when the span class is ``low``.
        * For a number followed by the "میلیون" (million) currency span:
          the amount scaled to whole units as an ``int``.
        * Otherwise the input string with surrounding whitespace stripped.
    """
    if value == '-' or not value:
        return None

    # Numbers that arrive already decoded have no markup to strip, and
    # handing them to the regex engine would raise TypeError.
    if not isinstance(value, str):
        return value

    # Extract value from "low" or "high" spans
    match = re.search(r'<span class="(?:low|high)" dir="ltr">([\d%,]+)<', value)
    if match:
        number = match.group(1).replace(',', '')
        return f'-{number}' if 'class="low"' in value else number

    # Handle "میلیون" values
    match = re.search(r'([\d.,]+)\s*<span class="currency-type">میلیون</span>', value)
    if match:
        try:
            number = float(match.group(1).replace(',', ''))
        except ValueError:
            # The character class also matches runs such as "." or "1.2.3";
            # treat those as ordinary text instead of crashing.
            return value.strip()
        # round() rather than int(): a float product can land just below
        # the intended integer and truncation would drop a whole unit.
        return round(number * 1_000_000)

    return value.strip()

# Fetch raw data from a given URL
def fetch_data(url, timeout=30):
    """Download a JSON document and return the contents of its ``data`` key.

    Args:
        url: Endpoint to request with HTTP GET.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            whole notebook run on a single unresponsive endpoint.

    Returns:
        The value stored under ``data`` in the decoded JSON object, or an
        empty list when the key is absent or holds a null/empty value.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()  # Raise HTTP errors
    payload = response.json()
    # `or []` also covers an explicit `"data": null` in the payload
    return payload.get('data') or []

# Process data into a DataFrame based on market type
def process_market_data(market_name, url):
    """Download one market's history and reduce it to the stored columns.

    Args:
        market_name: Key of the market; ``'bourse'`` and ``'fara-bourse'``
            use the four-column index layout, every other name uses the
            eight-column indicator layout.
        url: Endpoint passed to ``fetch_data``.

    Returns:
        A DataFrame with the columns ``closing`` (float), ``persian_date``,
        ``year_month`` (first seven characters of ``persian_date``) and
        ``market_type``. Rows without a usable closing price or date are
        dropped. An empty DataFrame is returned when the endpoint yields no
        rows or the rows lack the required fields.
    """
    raw_data = fetch_data(url)
    if not raw_data:
        print(f"No data available for {market_name}")
        return pd.DataFrame()

    # Handle specific market structures
    if market_name in ['bourse', 'fara-bourse']:
        columns = ['persian_date', 'closing', 'lowest', 'highest']
    else:
        columns = ['opening', 'lowest', 'highest', 'closing', 'change_amount',
                   'change_percentage', 'gregorian_date', 'persian_date']

    processed_data = [[clean_value(item) for item in row] for row in raw_data]

    # Size the frame from the widest row (not just the first one) and trim
    # surplus fields, so an unexpected extra column cannot abort the load.
    width = min(len(columns), max(len(row) for row in processed_data))
    df = pd.DataFrame([row[:width] for row in processed_data], columns=columns[:width])

    missing = [name for name in ('closing', 'persian_date') if name not in df.columns]
    if missing:
        print(f"Unexpected row layout for {market_name}: missing {missing}")
        return pd.DataFrame()

    # Standardize and clean DataFrame
    # errors='coerce' turns an unparseable cell into NaN instead of failing
    # the whole market on a single bad value.
    closing_text = df['closing'].astype(str).str.replace(',', '', regex=False)
    df['closing'] = pd.to_numeric(closing_text, errors='coerce')

    # Drop incomplete rows rather than blank-filling them: an empty string in
    # `closing` would turn the numeric column into mixed text once stored.
    usable = df['closing'].notna() & df['persian_date'].notna()
    if not usable.all():
        print(f"Skipped {int((~usable).sum())} incomplete rows for {market_name}")
    df = df[usable].copy()

    df['year_month'] = df['persian_date'].astype(str).str[:7]
    df['market_type'] = market_name
    return df[['closing', 'persian_date', 'year_month', 'market_type']].reset_index(drop=True)

# Update the database with new data
def update_database(df, conn, table_name='market_data'):
    """Append to ``table_name`` the rows of ``df`` that are not stored yet.

    A row counts as already stored when it matches an existing row on every
    column the frame and the table have in common.

    Args:
        df: Rows to store.
        conn: Open SQLite connection.
        table_name: Destination table; created on first insert.

    Returns:
        None. A one-line summary is printed.
    """
    # An empty frame has no columns to compare on; merging it would raise.
    if df.empty:
        print("No new records to insert.")
        return

    # Duplicates inside the batch itself would otherwise all be inserted
    candidates = df.drop_duplicates()

    # Check for the table explicitly instead of swallowing every read error:
    # a failed read must not be mistaken for "nothing stored yet", which
    # would re-insert the whole batch.
    table_exists = not pd.read_sql_query(
        "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
        conn,
        params=(table_name,),
    ).empty

    if table_exists:
        quoted_name = '"' + table_name.replace('"', '""') + '"'
        existing_data = pd.read_sql_query(f"SELECT * FROM {quoted_name}", conn)
    else:
        existing_data = pd.DataFrame()

    if not existing_data.empty:
        shared_columns = [col for col in candidates.columns if col in existing_data.columns]
        # A left join is sufficient: only rows of the batch can be new.
        merged = candidates.merge(
            existing_data[shared_columns].drop_duplicates(),
            how='left',
            on=shared_columns,
            indicator=True,
        )
        new_data = merged.query('_merge == "left_only"').drop(columns='_merge')
    else:
        new_data = candidates

    if not new_data.empty:
        new_data.to_sql(table_name, conn, if_exists='append', index=False)
        print(f"Inserted {len(new_data)} new records.")
    else:
        print("No new records to insert.")


def convert_persian_to_jalali(persian_date):
    """Build a ``JalaliDate`` from a ``'year/month/day'`` string."""
    # Exactly three slash-separated integer fields are expected
    year, month, day = (int(field) for field in persian_date.split('/'))
    jalali = JalaliDate(year, month, day)
    return jalali

def convert_persian_to_weekday(persian_date):
    """Return the weekday name (``'%A'`` format) of a ``'year/month/day'`` string.

    The date is parsed with ``convert_persian_to_jalali`` first; the previous
    body formatted an undefined name and ignored its argument, so every call
    raised NameError.
    """
    return convert_persian_to_jalali(persian_date).strftime('%A')


# Set custom height for the output cell
def set_custom_output_height(height=800):
    """Emit a <style> tag that fixes `.output` to `height` pixels with vertical scrolling."""
    css_rule = f'.output {{ height: {height}px; overflow-y: scroll; }}'
    style_tag = HTML(f'<style>{css_rule}</style>')
    display(style_tag)

# Generate hover text, marker colors, and sizes based on event matches
def generate_event_annotations(monthly_data, events_df):
    """Build per-point hover text and marker styling for one market series.

    Args:
        monthly_data: Mapping whose ``'year_month'`` entry lists the x values
            of the series.
        events_df: DataFrame with a ``Date`` column (compared for equality
            with the ``year_month`` values) and an ``Event`` column.

    Returns:
        Three lists aligned with ``monthly_data['year_month']``:
        hover texts (bold event names joined with ``<br>``, or ``""``),
        marker colours (``'yellow'`` for event points, transparent otherwise)
        and marker sizes (``2`` for event points, ``0`` otherwise).
    """
    # Index the events once instead of rescanning the frame for every month.
    # Rows without a date or a name are skipped, and names are coerced to
    # str so a numeric or missing cell cannot break the join below.
    named_events = events_df.dropna(subset=['Date', 'Event'])
    events_by_date = {}
    for event_date, event_name in zip(named_events['Date'], named_events['Event']):
        events_by_date.setdefault(event_date, []).append(str(event_name))

    hover_texts, marker_colors, marker_sizes = [], [], []
    for date in monthly_data['year_month']:
        event_names = events_by_date.get(date)
        if event_names:
            # Combine all matching event names into one hover text
            hover_texts.append(f"<b>{'<br>'.join(event_names)}</b>")
            marker_colors.append('yellow')  # Highlight event points
            marker_sizes.append(2)  # Visible marker for events
        else:
            hover_texts.append("")  # No event for this date
            marker_colors.append('rgba(0,0,0,0)')  # Transparent for non-events
            marker_sizes.append(0)  # Invisible marker size
    return hover_texts, marker_colors, marker_sizes

# Create a reusable function to generate the chart
def create_market_chart(monthly_data_dict, events_df, y_field, title, yaxis_title, colors, height=800):
    """Draw one line per market, show the figure and return it.

    Args:
        monthly_data_dict: Mapping of trace name -> mapping that provides
            ``'year_month'`` (x values) and ``y_field`` (y values).
        events_df: Events table forwarded to ``generate_event_annotations``.
        y_field: Key of the series to plot on the y axis.
        title: Figure title.
        yaxis_title: Label of the y axis.
        colors: Mapping of trace name -> line colour; missing names are
            drawn in black.
        height: Figure height in pixels.

    Returns:
        The ``go.Figure`` that was displayed.
    """
    fig = go.Figure()

    # One trace per market, in the mapping's iteration order
    for name in monthly_data_dict:
        series = monthly_data_dict[name]
        hover_texts, marker_colors, marker_sizes = generate_event_annotations(series, events_df)

        # Markers are only visible on event points (size/colour per point)
        event_markers = {
            'color': marker_colors,
            'size': marker_sizes,
            'line': {'width': 1, 'color': "darkred"},
        }
        market_line = {'color': colors.get(name, "black"), 'width': 2}

        trace = go.Scatter(
            x=series['year_month'],
            y=series[y_field],
            mode='lines+markers',
            name=name,
            hoverinfo="text",  # hover shows the custom text only
            text=hover_texts,
            marker=event_markers,
            line=market_line,
        )
        fig.add_trace(trace)

    # Chart settings
    layout_options = {
        'title': title,
        'xaxis_title': "Date (Month to Month)",
        'yaxis_title': yaxis_title,
        'xaxis': {'type': "category", 'tickangle': -90},
        'template': 'plotly_white',
        'showlegend': True,
        'height': height,
    }
    fig.update_layout(**layout_options)

    fig.show()
    return fig

In [None]:
# Main Workflow: download every configured market and store the new rows
conn = sqlite3.connect(DB_FILE)
try:
    all_data_frames = [
        process_market_data(market_name, url)
        for market_name, url in MARKET_URLS.items()
    ]

    # Combine all market data into a single DataFrame
    final_df = pd.concat(all_data_frames, ignore_index=True)

    # Update database
    update_database(final_df, conn)
    conn.commit()
finally:
    # Always release the handle, also when a download or insert fails; the
    # following cell opens its own connection, so leaving this one open
    # would leak it.
    conn.close()

print("Database update completed.")

In [None]:
# Open the SQLite store and read every stored row, ordered by month
conn = sqlite3.connect(database=DB_FILE)
df = pd.read_sql_query(sql="SELECT * FROM market_data order by year_month", con=conn)

In [None]:
# Enrich every row with a JalaliDate object and the name of its weekday
df['persian_date_jalali'] = df['persian_date'].apply(convert_persian_to_jalali)
df['week_day'] = df['persian_date_jalali'].apply(lambda jalali: jalali.strftime('%A'))

# Keep a handle on the unfiltered frame before narrowing the window
all_data_df = df

# Only rows strictly after 1395/01/01 take part in the analysis
analysis_start = JalaliDate(1395, 1, 1)
df = df[df['persian_date_jalali'] > analysis_start]

monthly_data_dict = {}

for market_type in df['market_type'].unique():
    market_rows = df[df['market_type'] == market_type]

    # Monthly mean of the closing price, then its ratio to the first month
    monthly_avg = market_rows.groupby('year_month')['closing'].mean()
    ratio_to_first = monthly_avg / monthly_avg.iloc[0]
    percent_change = (ratio_to_first - 1) * 100

    # Store the series under the market's display name
    display_name = MARKET_NAMES.get(str(market_type), 'Unknown')
    monthly_data_dict[display_name] = {
        'year_month': monthly_avg.index.tolist(),
        'price_change_in_percentage': percent_change.tolist(),
        'price_change': ratio_to_first.tolist(),
    }

# Re-order the markets by their month lists (list comparison, earliest first)
monthly_data_dict = dict(
    sorted(monthly_data_dict.items(), key=lambda item: item[1]['year_month'])
)

In [None]:
# Overview table: one row per market, one column per stored series
pd.DataFrame(monthly_data_dict).transpose()

In [None]:
# Load the event list that is used to annotate the charts
events_df = pd.read_csv(filepath_or_buffer=EVENTS_FILE)

In [None]:
# Set output cell height
set_custom_output_height(height=800)

# Inputs shared by both charts
chart_inputs = dict(
    monthly_data_dict=monthly_data_dict,
    events_df=events_df,
    colors=COLORS,
)

# Chart 1: change relative to each market's first month, in percent;
# this is the figure that gets exported to HTML
fig = create_market_chart(
    y_field='price_change_in_percentage',
    title="Percentage Change by Market",
    yaxis_title="Percentage Change",
    **chart_inputs,
)
fig.write_html('interactive_chart.html')

# Chart 2: the same series expressed as a ratio to the first month
fig = create_market_chart(
    y_field='price_change',
    title="Price Change by Market",
    yaxis_title="Price Change",
    **chart_inputs,
)

In [None]:
# Function to get the start date of the Persian year for a given Persian date
def get_persian_year_start(persian_date):
    """Return ``JalaliDate(year, 1, 1)`` for the year of a ``'year/month/day'`` string."""
    # All three fields are parsed so malformed input still raises here
    year, _month, _day = (int(field) for field in persian_date.split('/'))
    first_day_of_year = JalaliDate(year, 1, 1)
    return first_day_of_year

# Function to calculate the number of weeks since the beginning of the Persian year
def get_jalali_week_number(persian_date):
    """Return the 1-based index of the 7-day block, counted from day one of the year.

    Blocks are measured from the first day of the Persian year, so days
    0-6 after the year start give 1, days 7-13 give 2, and so on.
    """
    year, month, day = (int(field) for field in persian_date.split('/'))

    # Days elapsed between the first day of the year and the given date
    elapsed = JalaliDate(year, month, day) - get_persian_year_start(persian_date)

    completed_weeks, _ = divmod(elapsed.days, 7)
    return completed_weeks + 1

# Function to calculate sequential week numbers, handling week changes
def calculate_sequential_week_numbers(df, date_column):
    """Return a copy of ``df`` with ``jalali_week`` and ``week_num`` columns added.

    ``jalali_week`` is ``get_jalali_week_number`` applied to ``date_column``.
    ``week_num`` starts at 1 and grows by one each time ``jalali_week``
    differs from the value in the previous row, so the numbering follows the
    row order of ``df``.
    """
    result = df.copy()  # the caller's frame stays untouched

    result['jalali_week'] = result[date_column].apply(get_jalali_week_number)

    # A row opens a new run when its week differs from the row above it;
    # the first row always does (there is nothing above it to equal).
    weeks = result['jalali_week']
    starts_new_run = weeks.ne(weeks.shift())

    # The running count of run starts is the sequential week number
    result['week_num'] = starts_new_run.cumsum().astype(int)
    return result

df = calculate_sequential_week_numbers(df, 'persian_date')

# Dense rank of the closing price inside each week_num group (1 = highest)
closing_by_week = df.groupby('week_num')['closing']
df['weekly_rank'] = closing_by_week.rank(ascending=False, method='dense').astype(int)

# Sum the ranks per weekday and market, smallest total first, and export
rank_totals = df.groupby(['week_day', 'market_type'])['weekly_rank'].sum().reset_index()
weekly_rankings = rank_totals.sort_values(by='weekly_rank')
weekly_rankings.to_csv('weekly_rankings.csv')

In [None]:
# Close the SQLite connection currently bound to `conn`
conn.close()