Skip to content

Commit

Permalink
Merge pull request #1 from mautrix/master
Browse files Browse the repository at this point in the history
Update to upstream
  • Loading branch information
bdvllrs committed Mar 5, 2023
2 parents 029303c + 1e760d3 commit 7deb7e1
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 35 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/python-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: "3.10"
python-version: "3.11"
- uses: isort/isort-action@master
with:
sortPaths: "./maufbapi ./mautrix_facebook"
- uses: psf/black@stable
with:
src: "./maufbapi ./mautrix_facebook"
version: "22.3.0"
version: "23.1.0"
- name: pre-commit
run: |
pip install pre-commit
Expand Down
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0
rev: v4.4.0
hooks:
- id: trailing-whitespace
exclude_types: [markdown]
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 22.3.0
rev: 23.1.0
hooks:
- id: black
language_version: python3
files: ^(maufbapi|mautrix_facebook)/.*\.pyi?$
- repo: https://github.com/PyCQA/isort
rev: 5.10.1
rev: 5.12.0
hooks:
- id: isort
files: ^(maufbapi|mautrix_facebook)/.*\.pyi?$
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pre-commit>=2.10.1,<3
isort>=5.10.1,<6
black>=22.3,<23
black>=23,<24
14 changes: 7 additions & 7 deletions maufbapi/mqtt/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
from yarl import URL
import paho.mqtt.client as pmc

from mautrix.util import background_task
from mautrix.util.logging import TraceLogger

from ..proxy import ProxyHandler
from ..state import AndroidState
from ..thrift import ThriftObject
from ..types import (
MarkReadRequest,
MessageSyncError,
MessageSyncPayload,
NTContext,
PHPOverride,
Expand Down Expand Up @@ -282,7 +282,7 @@ def _on_connect_handler(
self.connection_unauthorized_callback()
return

asyncio.create_task(self._post_connect())
background_task.create(self._post_connect())

def _on_disconnect_handler(self, client: MQTToTClient, _: Any, rc: int) -> None:
err_str = "Generic error." if rc == pmc.MQTT_ERR_NOMEM else pmc.error_string(rc)
Expand Down Expand Up @@ -354,7 +354,7 @@ def _sync_resume_queue_data(self) -> ResumeQueueRequest:

async def _post_connect(self) -> None:
self._opened_thread = None
self.log.debug("Re-creating sync queue after reconnect")
self.log.debug(f"Re-creating sync queue after reconnect (seq_id={self.seq_id})")
await self._dispatch(Connect())
await self.publish(
"/ls_req",
Expand Down Expand Up @@ -399,7 +399,7 @@ def _on_message_sync(self, payload: bytes) -> None:
return
self._update_seq_id(parsed)
if parsed.error:
asyncio.create_task(self._dispatch(parsed.error))
background_task.create(self._dispatch(parsed.error))
for item in parsed.items:
for event in item.get_parts():
self._outgoing_events.put_nowait(event)
Expand All @@ -412,12 +412,12 @@ def _on_typing_notification(self, payload: bytes) -> None:
except Exception:
self.log.debug("Failed to parse typing notification %s", payload, exc_info=True)
return
asyncio.create_task(self._dispatch(parsed))
background_task.create(self._dispatch(parsed))

def _on_presence(self, payload: bytes) -> None:
try:
presence = Presence.deserialize(json.loads(payload))
asyncio.create_task(self._dispatch(presence))
background_task.create(self._dispatch(presence))
except Exception:
self.log.debug("Failed to parse presence payload %s", payload, exc_info=True)
return
Expand Down Expand Up @@ -552,7 +552,7 @@ async def listen(self, seq_id: int, retry_limit: int = 5) -> None:
elif rc == pmc.MQTT_ERR_NO_CONN:
if connection_retries > retry_limit:
raise MQTTNotConnected(f"Connection failed {connection_retries} times")
if self.proxy_handler.update_proxy_url():
if self.proxy_handler.update_proxy_url("MQTT_ERR_NO_CONN"):
self.setup_proxy()
await self._dispatch(ProxyUpdate())
sleep = connection_retries * 2
Expand Down
14 changes: 9 additions & 5 deletions maufbapi/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import logging
import urllib.request

from yarl import URL


class ProxyHandler:
current_proxy_url: str | None = None
Expand All @@ -12,11 +14,13 @@ class ProxyHandler:
def __init__(self, api_url: str | None) -> None:
self.api_url = api_url

def get_proxy_url_from_api(self) -> str | None:
def get_proxy_url_from_api(self, reason: str | None = None) -> str | None:
assert self.api_url is not None

request = urllib.request.Request(self.api_url, method="GET")
self.log.debug("Requesting proxy from: %s", self.api_url)
api_url = str(URL(self.api_url).update_query({"reason": reason} if reason else {}))

request = urllib.request.Request(api_url, method="GET")
self.log.debug("Requesting proxy from: %s", api_url)

try:
with urllib.request.urlopen(request) as f:
Expand All @@ -28,12 +32,12 @@ def get_proxy_url_from_api(self) -> str | None:

return None

def update_proxy_url(self) -> bool:
def update_proxy_url(self, reason: str | None = None) -> bool:
old_proxy = self.current_proxy_url
new_proxy = None

if self.api_url is not None:
new_proxy = self.get_proxy_url_from_api()
new_proxy = self.get_proxy_url_from_api(reason)
else:
new_proxy = urllib.request.getproxies().get("http")

Expand Down
2 changes: 2 additions & 0 deletions maufbapi/types/graphql/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ class Thread(SerializableAttrs):
can_viewer_reply: bool
can_participants_claim_admin: bool

sync_sequence_id: Optional[str] = None

@property
def updated_timestamp(self) -> int:
return int(self.updated_time_precise)
Expand Down
1 change: 1 addition & 0 deletions mautrix_facebook/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def do_update(self, helper: ConfigUpdateHelper) -> None:
copy("bridge.max_startup_thread_sync_count")
copy("bridge.temporary_disconnect_notices")
copy("bridge.disable_bridge_notices")
copy("bridge.bridge_matrix_notices")
if "bridge.refresh_on_reconnection_fail" in self:
base["bridge.on_reconnection_fail.action"] = (
"refresh" if self["bridge.refresh_on_reconnection_fail"] else None
Expand Down
2 changes: 2 additions & 0 deletions mautrix_facebook/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ bridge:
temporary_disconnect_notices: false
# Disable bridge notices entirely
disable_bridge_notices: false
# Should Matrix m.notice-type messages be bridged to Facebook?
bridge_matrix_notices: true
on_reconnection_fail:
# What to do if a reconnection attempt fails? Options: reconnect, refresh, null
action: reconnect
Expand Down
14 changes: 10 additions & 4 deletions mautrix_facebook/portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
UserID,
VideoInfo,
)
from mautrix.util import ffmpeg, magic, variation_selector
from mautrix.util import background_task, ffmpeg, magic, variation_selector
from mautrix.util.formatter import parse_html
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus

Expand Down Expand Up @@ -1168,7 +1168,7 @@ async def _send_bridge_success(
event_type=event_type,
message_type=msgtype,
)
asyncio.create_task(self._send_message_status(event_id, err=None))
background_task.create(self._send_message_status(event_id, err=None))
await self._send_delivery_receipt(event_id)

async def _send_bridge_error(
Expand Down Expand Up @@ -1201,7 +1201,7 @@ async def _send_bridge_error(
body=f"\u26a0 Your {event_type_str} may not have been bridged: {str(err)}",
),
)
asyncio.create_task(self._send_message_status(event_id, err))
background_task.create(self._send_message_status(event_id, err))

async def _send_message_status(self, event_id: EventID, err: Exception | None) -> None:
if not self.config["bridge.message_status_events"]:
Expand Down Expand Up @@ -1290,8 +1290,14 @@ async def _make_dbm(self, sender: u.User, event_id: EventID) -> DBMessage:
async def _handle_matrix_text(
self, event_id: EventID, sender: u.User, message: TextMessageEventContent
) -> None:
if (
message.msgtype == MessageType.NOTICE
and not self.config["bridge.bridge_matrix_notices"]
):
return
converted = await matrix_to_facebook(message, self.mxid, self.log)
dbm = await self._make_dbm(sender, event_id)

resp = await sender.mqtt.send_message(
self.fbid,
self.fb_type != ThreadType.USER,
Expand Down Expand Up @@ -1592,7 +1598,7 @@ async def _handle_facebook_message(
timestamp = message.timestamp

def backfill_reactions(dbm: DBMessage | None):
asyncio.create_task(
background_task.create(
self._try_handle_graphql_reactions(
source, dbm or msg_id, message.message_reactions
)
Expand Down
34 changes: 24 additions & 10 deletions mautrix_facebook/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@

from maufbapi import AndroidAPI, AndroidMQTT, AndroidState, ProxyHandler
from maufbapi.http import InvalidAccessToken, ResponseError
from maufbapi.http.errors import GraphQLError, RateLimitExceeded
from maufbapi.mqtt import Connect, Disconnect, MQTTNotConnected, MQTTNotLoggedIn, ProxyUpdate
from maufbapi.types import graphql, mqtt as mqtt_t
from maufbapi.types.graphql.responses import Message, Thread, ThreadListResponse
from maufbapi.types.graphql.responses import Message, Thread
from mautrix.bridge import BaseUser, async_getter_lock
from mautrix.errors import MNotFound
from mautrix.types import (
Expand All @@ -44,6 +43,7 @@
TextMessageEventContent,
UserID,
)
from mautrix.util import background_task
from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
from mautrix.util.opt_prometheus import Gauge, Summary, async_time
from mautrix.util.simple_lock import SimpleLock
Expand Down Expand Up @@ -318,7 +318,7 @@ async def _load_session(self, is_startup: bool) -> bool:
self.is_connected = None
self.stop_listen()
self.stop_backfill_tasks()
asyncio.create_task(self.post_login(is_startup=is_startup))
background_task.create(self.post_login(is_startup=is_startup))
return True
return False

Expand Down Expand Up @@ -368,7 +368,9 @@ async def fetch_logged_in_user(
)
await asyncio.sleep(wait)
if refresh_proxy_on_failure:
self.proxy_handler.update_proxy_url()
self.proxy_handler.update_proxy_url(
f"{e.__class__.__name__} while trying to {action}"
)
await self.on_proxy_update()
except ResponseError:
if action != "restore session":
Expand Down Expand Up @@ -635,8 +637,20 @@ async def _sync_recent_threads(self, increment_total_backfilled_portals: bool =
# We need to get the sequence ID before we start the listener task.
resp = await self.client.fetch_thread_list()
self.seq_id = int(resp.sync_sequence_id)
thread_seq_ids = list(
{int(thread.sync_sequence_id) for thread in resp.nodes if thread.sync_sequence_id}
)
if len(thread_seq_ids) > 1 or (
len(thread_seq_ids) == 1 and thread_seq_ids[0] != self.seq_id
):
self.seq_id = max(*thread_seq_ids, self.seq_id)
self.log.warning(
f"Got more than one sequence ID in thread list: primary={resp.sync_sequence_id}, "
f"threads={thread_seq_ids}. Using highest value ({self.seq_id})"
)
if self.mqtt:
self.mqtt.seq_id = self.seq_id
self.log.debug(f"Got new seq_id {self.seq_id}")
await self.save_seq_id()
self.start_listen()

Expand Down Expand Up @@ -1004,7 +1018,7 @@ def _update_region_hint(self, region_hint: str) -> None:
self.log.debug(f"Got region hint {region_hint}")
if region_hint:
self.state.session.region_hint = region_hint
asyncio.create_task(self.save())
background_task.create(self.save())

async def _try_listen(self) -> None:
try:
Expand Down Expand Up @@ -1078,9 +1092,9 @@ async def _try_listen(self) -> None:
await asyncio.sleep(sleep_time)
self._prev_reconnect_fail_refresh = time.monotonic()
if action == "refresh":
asyncio.create_task(self.refresh())
background_task.create(self.refresh())
else:
asyncio.create_task(self.reconnect(fetch_user=True))
background_task.create(self.reconnect(fetch_user=True))
else:
self._disconnect_listener_after_error()
except Exception:
Expand Down Expand Up @@ -1154,7 +1168,7 @@ async def on_logged_in(self, state: AndroidState) -> None:
self.log.exception("Failed to fetch post-login info")
self.stop_listen()
self.stop_backfill_tasks()
asyncio.create_task(self.post_login(is_startup=True, from_login=True))
background_task.create(self.post_login(is_startup=True, from_login=True))

@async_time(METRIC_MESSAGE)
async def on_message(self, evt: mqtt_t.Message | mqtt_t.ExtendedMessage) -> None:
Expand Down Expand Up @@ -1219,7 +1233,7 @@ async def on_reaction(self, evt: mqtt_t.Reaction) -> None:
await portal.handle_facebook_reaction_add(self, puppet, evt.message_id, evt.reaction)

async def on_forced_fetch(self, evt: mqtt_t.ForcedFetch) -> None:
asyncio.create_task(self._try_on_forced_fetch(evt))
background_task.create(self._try_on_forced_fetch(evt))

async def _try_on_forced_fetch(self, evt: mqtt_t.ForcedFetch) -> None:
try:
Expand Down Expand Up @@ -1311,6 +1325,6 @@ async def on_message_sync_error(self, evt: mqtt_t.MessageSyncError) -> None:
def on_connection_not_authorized(self) -> None:
self.log.debug("Stopping listener and reloading session after MQTT not authorized error")
self.stop_listen()
asyncio.create_task(self.reload_session())
background_task.create(self.reload_session())

# endregion
5 changes: 3 additions & 2 deletions mautrix_facebook/web/segment_analytics.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from __future__ import annotations

import asyncio
import logging

from yarl import URL
import aiohttp

from mautrix.util import background_task

from .. import user as u

log = logging.getLogger("mau.web.public.analytics")
Expand All @@ -30,7 +31,7 @@ async def _track(user: u.User, event: str, properties: dict) -> None:

def track(user: u.User, event: str, properties: dict | None = None):
if segment_key:
asyncio.create_task(_track(user, event, properties or {}))
background_task.create(_track(user, event, properties or {}))


def init(key, user_id: str | None = None):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ asyncpg>=0.20,<0.28
ruamel.yaml>=0.15.94,<0.18
commonmark>=0.8,<0.10
python-magic>=0.4,<0.5
mautrix>=0.19.3,<0.20
mautrix>=0.19.4,<0.20
pycryptodome>=3,<4
paho-mqtt>=1.5,<2
zstandard

0 comments on commit 7deb7e1

Please sign in to comment.