Skip to content

Commit

Permalink
improved handling of escaped strings, testing
Browse files Browse the repository at this point in the history
  • Loading branch information
N-Coder committed Apr 10, 2020
1 parent 3cafe53 commit e44e4b3
Show file tree
Hide file tree
Showing 13 changed files with 586 additions and 220 deletions.
4 changes: 2 additions & 2 deletions doc/event-cmp.rst
Expand Up @@ -70,7 +70,7 @@ attributes.
>>> str(e)
'<floating Event>'
>>> e.serialize() # doctest: +ELLIPSIS
'BEGIN:VEVENT\r\nUID:...@....org\r\nDTSTAMP:2020...T...Z\r\nEND:VEVENT'
'BEGIN:VEVENT\r\nUID:...@...org\r\nDTSTAMP:2020...T...Z\r\nEND:VEVENT'
>>> import attr, pprint
>>> pprint.pprint(attr.asdict(e)) # doctest: +ELLIPSIS
{'_timespan': {'begin_time': None,
Expand All @@ -84,7 +84,7 @@ attributes.
'created': None,
'description': None,
'dtstamp': datetime.datetime(2020, ..., tzinfo=tzutc()),
'extra': [],
'extra': {'data': [], 'name': 'VEVENT'},
'extra_params': {},
'geo': None,
'last_modified': None,
Expand Down
3 changes: 2 additions & 1 deletion ics/__init__.py
Expand Up @@ -6,7 +6,8 @@ def load_converters():
from ics.converter.value import AttributeValueConverter
from ics.valuetype.base import ValueConverter
from ics.valuetype.datetime import DateConverter, DatetimeConverter, DurationConverter, PeriodConverter, TimeConverter, UTCOffsetConverter
from ics.valuetype.generic import BinaryConverter, BooleanConverter, CalendarUserAddressConverter, FloatConverter, IntegerConverter, RecurConverter, TextConverter, URIConverter
from ics.valuetype.generic import BinaryConverter, BooleanConverter, CalendarUserAddressConverter, FloatConverter, IntegerConverter, RecurConverter, URIConverter
from ics.valuetype.text import TextConverter
from ics.valuetype.special import GeoConverter


Expand Down
18 changes: 9 additions & 9 deletions ics/converter/value.py
Expand Up @@ -35,7 +35,7 @@ def ics_name(self) -> str:
name = self.attribute.name.upper().replace("_", "-").strip("-")
return name

def __parse_value(self, line: "ContentLine", value: str, context: ContextDict) -> Tuple[ExtraParams, ValueConverter]:
def __prepare_params(self, line: "ContentLine") -> Tuple[ExtraParams, ValueConverter]:
params = copy_extra_params(line.params)
value_type = params.pop("VALUE", None)
if value_type:
Expand All @@ -48,19 +48,19 @@ def __parse_value(self, line: "ContentLine", value: str, context: ContextDict) -
raise ValueError("can't convert %s with %s" % (line, self))
else:
converter = self.value_converters[0]
parsed = converter.parse(value, params, context) # might modify params and context
return params, parsed
return params, converter

# TODO make storing/writing extra values/params configurably optional, but warn when information is lost

def populate(self, component: "Component", item: ContainerItem, context: ContextDict) -> bool:
assert isinstance(item, ContentLine)
self._check_component(component, context)
if self.is_multi_value:
params = None
for value in item.value_list:
params, converter = self.__prepare_params(item)
for value in converter.split_value_list(item.value):
context[(self, "current_value_count")] += 1
params, parsed = self.__parse_value(item, value, context)
params = copy_extra_params(params)
parsed = converter.parse(value, params, context) # might modify params and context
params["__merge_next"] = True # type: ignore
self.set_or_append_extra_params(component, params)
self.set_or_append_value(component, parsed)
Expand All @@ -70,7 +70,8 @@ def populate(self, component: "Component", item: ContainerItem, context: Context
if context[(self, "current_value_count")] > 0:
raise ValueError("attribute %s can only be set once, second occurrence is %s" % (self.ics_name, item))
context[(self, "current_value_count")] += 1
params, parsed = self.__parse_value(item, item.value, context)
params, converter = self.__prepare_params(item)
parsed = converter.parse(item.value, params, context) # might modify params and context
self.set_or_append_extra_params(component, params)
self.set_or_append_value(component, parsed)
return True
Expand Down Expand Up @@ -129,8 +130,7 @@ def __serialize_multi(self, component: "Component", output: "Container", context
current_values.append(serialized)

if not merge_next:
cl = ContentLine(name=self.ics_name, params=params)
cl.value_list = current_values
cl = ContentLine(name=self.ics_name, params=params, value=converter.join_value_list(current_values))
output.append(cl)
current_params = None
current_values = []
Expand Down
187 changes: 125 additions & 62 deletions ics/grammar/__init__.py
@@ -1,15 +1,15 @@
import collections
import functools
import re
from pathlib import Path
from typing import List
import warnings
from typing import Generator, List, MutableSequence

import attr
import importlib_resources # type: ignore
import tatsu # type: ignore
from tatsu.exceptions import FailedToken # type: ignore

from ics.types import ContainerItem, ExtraParams, RuntimeAttrValidation, copy_extra_params
from ics.utils import limit_str_length
from ics.utils import limit_str_length, next_after_str_escape, validate_truthy

GRAMMAR = tatsu.compile(importlib_resources.read_text(__name__, "contentline.ebnf"))

Expand All @@ -35,28 +35,39 @@ class ContentLine(RuntimeAttrValidation):
value: str = attr.ib(default="")

# TODO store value type for jCal and line number for error messages
# TODO ensure (parameter) value escaping and name normalization

def serialize(self):
    """Return this content line rendered as a single string.

    The text is the concatenation of every fragment produced by
    ``serialize_iter()`` called with its default arguments.
    """
    return "".join(self.serialize_iter())

def serialize_iter(self, newline=False):
    """Yield the text fragments of this content line in output order.

    The fragments are: the name, then for every parameter ``;``, the
    parameter name, ``=`` and its comma-separated values, and finally
    ``:`` followed by the raw value.

    :param newline: when true, a trailing ``"\\r\\n"`` is yielded as well.
    """
    yield self.name
    for pname in self.params:
        yield ";"
        yield pname
        yield "="
        for nr, pval in enumerate(self.params[pname]):
            if nr > 0:
                yield ","
            if re.search("[:;,]", pval):
                # Property parameter values that contain the COLON, SEMICOLON, or COMMA character separators
                # MUST be specified as quoted-string text values.
                # TODO The DQUOTE character is used as a delimiter for parameter values that contain
                # restricted characters or URI text.
                # TODO Property parameter values that are not in quoted-strings are case-insensitive.
                yield '"%s"' % escape_param(pval)
            else:
                yield escape_param(pval)
    yield ":"
    # the value is emitted verbatim; no escaping is applied to it here
    yield self.value
    if newline:
        yield "\r\n"

def __getitem__(self, item):
    """Return the entry stored in ``self.params`` under the key ``item``."""
    stored = self.params[item]
    return stored

def __setitem__(self, item, values):
    """Store ``values`` as the parameter ``item``.

    :param item: the parameter name used as key in ``self.params``.
    :param values: an iterable of parameter values; it is copied into a
        new list, so later changes to the argument do not affect the line.
    """
    # NOTE(review): a bare str is iterable too and would be split into
    # single characters here - callers should pass a list or tuple.
    self.params[item] = list(values)

@property
def value_list(self) -> List[str]:
return re.split("(?<!\\\\),", self.value)

@value_list.setter
def value_list(self, list: List[str]):
self.value = ",".join(list)

@classmethod
def parse(cls, line):
"""Parse a single iCalendar-formatted line into a ContentLine"""
Expand All @@ -76,7 +87,7 @@ def interpret_ast(cls, ast):
params = ExtraParams(dict())
for param_ast in ast.get('params', []):
param_name = ''.join(param_ast["name"])
param_values = [''.join(x) for x in param_ast["values_"]]
param_values = [unescape_param(''.join(x)) for x in param_ast["values_"]]
params[param_name] = param_values
return cls(name, params, value)

Expand All @@ -88,7 +99,16 @@ def __str__(self):
return "%s%s='%s'" % (self.name, self.params or "", limit_str_length(self.value))


class Container(List[ContainerItem]):
def _wrap_list_func(list_func):
    """Adapt ``list_func`` so that it runs against ``self.data``.

    The returned function takes ``self`` plus arbitrary arguments and calls
    ``list_func`` with ``self.data`` in place of ``self``. Its metadata
    (name, docstring, ...) is copied from ``list_func``.
    """

    def delegate(self, *args, **kwargs):
        backing = self.data
        return list_func(backing, *args, **kwargs)

    return functools.wraps(list_func)(delegate)


@attr.s(repr=False)
class Container(MutableSequence[ContainerItem]):
"""Represents an iCalendar object.
Contains a list of ContentLines or Containers.
Expand All @@ -98,97 +118,146 @@ class Container(List[ContainerItem]):
items: Containers or ContentLines
"""

def __init__(self, name: str, *items: ContainerItem):
self.check_items(*items)
super(Container, self).__init__(items)
self.name = name.upper()
name: str = attr.ib(converter=str.upper, validator=validate_truthy) # type:ignore
data: List[ContainerItem] = attr.ib(converter=list, default=[],
validator=lambda inst, attr, value: inst.check_items(*value))

def __str__(self):
    """Return ``NAME[item, item, ...]`` using ``str()`` of every item in ``self.data``."""
    return "%s[%s]" % (self.name, ", ".join(str(cl) for cl in self.data))

def __repr__(self):
    """Return ``ClassName('NAME', [items...])`` built from the name and ``repr(self.data)``."""
    return "%s(%r, %s)" % (type(self).__name__, self.name, repr(self.data))

def serialize(self):
    """Return this container rendered as a single string.

    The text is the concatenation of every fragment produced by
    ``serialize_iter()`` called with its default arguments.
    """
    return "".join(self.serialize_iter())

def serialize_iter(self, newline=False):
    """Yield the text fragments of this container in output order.

    Emits ``BEGIN:<name>`` and a line break, then the fragments of every
    contained item (each terminated by a line break), then ``END:<name>``.

    :param newline: when true, a trailing ``"\\r\\n"`` is yielded after
        the ``END`` line as well.
    """
    yield "BEGIN:"
    yield self.name
    yield "\r\n"
    for line in self:
        # nested items always end their own line, so END starts on a fresh one
        yield from line.serialize_iter(newline=True)
    yield "END:"
    yield self.name
    if newline:
        yield "\r\n"

@classmethod
def parse(cls, name, tokenized_lines):
    """Build a container from the lines that follow a ``BEGIN:<name>`` line.

    Lines are taken from ``tokenized_lines`` until the matching ``END`` line
    is found. A ``BEGIN`` line starts a nested container, which is parsed
    recursively from the same ``tokenized_lines`` object; every other line
    is stored as-is.

    :param name: the value of the ``BEGIN`` line that opened this container.
    :param tokenized_lines: iterable of objects with ``name`` and ``value``
        attributes. NOTE(review): nesting relies on this being a one-shot
        iterator shared with the recursive call - confirm against callers.
    :raises ParseError: if the ``END`` value does not match ``name``
        (compared case-insensitively) or no ``END`` line is found.
    """
    items = []
    if not name.isupper():
        warnings.warn("Container 'BEGIN:%s' is not all-uppercase" % name)
    for line in tokenized_lines:
        if line.name == 'BEGIN':
            items.append(cls.parse(line.value, tokenized_lines))
        elif line.name == 'END':
            if line.value.upper() != name.upper():
                raise ParseError(
                    "Expected END:{}, got END:{}".format(name, line.value))
            # check the END line itself; the BEGIN name was already checked above
            if not line.value.isupper():
                warnings.warn("Container 'END:%s' is not all-uppercase" % line.value)
            break
        else:
            items.append(line)
    else:  # if break was not called
        raise ParseError("Missing END:{}".format(name))
    return cls(name, items)

def clone(self, items=None, deep=False):
    """Makes a copy of itself

    :param items: contents of the copy; defaults to ``self.data``.
    :param deep: when true, every item is replaced by ``item.clone()``
        instead of being shared with the original.
    :return: the result of ``attr.evolve(self, data=items)``.
    """
    if items is None:
        items = self.data
    if deep:
        # lazily cloned here; consumed when the copy is constructed
        items = (item.clone() for item in items)
    return attr.evolve(self, data=items)

@staticmethod
def check_items(*items):
    """Check that every argument is a ``ContentLine`` or a ``Container``.

    Each item is handed to ``ics.utils.check_is_instance`` together with a
    label: ``"item"`` when exactly one item is given, ``"item <nr>"``
    otherwise. NOTE(review): presumably that helper raises for a wrong
    type - confirm in ``ics.utils``.
    """
    # imported locally, as in the original code
    from ics.utils import check_is_instance
    if len(items) == 1:
        check_is_instance("item", items[0], (ContentLine, Container))
    else:
        for nr, item in enumerate(items):
            check_is_instance("item %s" % nr, item, (ContentLine, Container))

def __setitem__(self, index, value):  # index might be slice and value might be iterable
    """Assign ``value`` at ``index`` in ``self.data`` and re-validate the container.

    The assignment is delegated to the underlying list, so slice
    assignment works as it does for ``list``.
    """
    self.data.__setitem__(index, value)
    # NOTE: validation runs after the assignment, so ``self.data`` has
    # already been modified if a validator rejects the new content.
    attr.validate(self)

def insert(self, index, value):
    """Insert ``value`` before ``index`` in ``self.data`` after checking it with ``check_items``."""
    self.check_items(value)
    self.data.insert(index, value)

def append(self, value):
    """Append ``value`` to ``self.data`` after checking it with ``check_items``."""
    self.check_items(value)
    self.data.append(value)

def extend(self, values):
self.check_items(*values)
super(Container, self).extend(values)

def __add__(self, values):
container = type(self)(self.name)
container.extend(self)
container.extend(values)
return container
self.data.extend(values)
attr.validate(self)

def __iadd__(self, values):
self.extend(values)
return self
def __getitem__(self, i):
    """Return the item at ``i``, or a copy holding the selected items when ``i`` is a slice.

    For a slice the result is ``attr.evolve(self, data=...)`` with the
    sliced list as the new data.
    """
    selected = self.data[i]
    if not isinstance(i, slice):
        return selected
    return attr.evolve(self, data=selected)

__contains__ = _wrap_list_func(list.__contains__)
__delitem__ = _wrap_list_func(list.__delitem__)
__iter__ = _wrap_list_func(list.__iter__)
__len__ = _wrap_list_func(list.__len__)
__reversed__ = _wrap_list_func(list.__reversed__)
clear = _wrap_list_func(list.clear)
count = _wrap_list_func(list.count)
index = _wrap_list_func(list.index)
pop = _wrap_list_func(list.pop)
remove = _wrap_list_func(list.remove)
reverse = _wrap_list_func(list.reverse)


def escape_param(string: str) -> str:
    """Encode a parameter value using caret escapes.

    A double quote becomes ``^'``, a caret becomes ``^^``, a line feed
    becomes ``^n`` and carriage returns are dropped. All other characters
    are kept unchanged.
    """
    caret_table = str.maketrans({
        "\"": "^'",
        "^": "^^",
        "\n": "^n",
        "\r": "",
    })
    return string.translate(caret_table)


def unescape_param(string: str) -> str:
    """Decode a caret-escaped parameter value by joining the output of ``unescape_param_iter``."""
    pieces = unescape_param_iter(string)
    return "".join(pieces)


def unescape_param_iter(string: str) -> Generator[str, None, None]:
    """Yield the decoded pieces of a caret-escaped parameter value.

    ``^n`` is decoded to a line feed, ``^^`` to a caret and ``^'`` to a
    double quote. A caret followed by any other character is passed
    through unchanged, as two separate pieces. Characters that are not
    part of an escape are yielded one by one.

    The character after a caret is fetched with ``next_after_str_escape``.
    NOTE(review): presumably that helper raises when the string ends
    right after the caret - confirm in ``ics.utils``.
    """
    decoded = {"n": "\n", "^": "^", "'": "\""}
    chars = iter(string)
    for char in chars:
        if char != "^":
            yield char
            continue
        # shares the iterator with the loop, so the follower is consumed here
        follower = next_after_str_escape(chars, full_str=string)
        if follower in decoded:
            yield decoded[follower]
        else:
            yield char
            yield follower


def unfold_lines(physical_lines):
    """Yield logical lines, joining folded continuation lines.

    Trailing carriage returns are removed from every physical line and
    lines that are empty or contain only whitespace are skipped. A line
    starting with a space or a tab continues the previous logical line;
    its first character is dropped and the rest is appended.

    This is a generator, so the argument check below only runs once
    iteration starts.

    :param physical_lines: iterable of strings.
    :raises ParseError: if ``physical_lines`` is not an iterable.
    """
    if not isinstance(physical_lines, collections.abc.Iterable):
        raise ParseError('Parameter `physical_lines` must be an iterable')
    current_line = ''
    for line in physical_lines:
        line = line.rstrip('\r')
        if not line.strip():
            continue
        elif not current_line:
            current_line = line
        elif line[0] in (' ', '\t'):
            # continuation: append exactly once, without the fold marker
            current_line += line[1:]
        else:
            yield current_line
            current_line = line
    if current_line:
        yield current_line

Expand Down Expand Up @@ -216,9 +285,3 @@ def lines_to_container(lines):

def string_to_container(txt):
    """Split ``txt`` into lines and pass them to ``lines_to_container``."""
    physical_lines = txt.splitlines()
    return lines_to_container(physical_lines)


def calendar_string_to_containers(string):
if not isinstance(string, str):
raise TypeError("Expecting a string")
return string_to_container(string)

0 comments on commit e44e4b3

Please sign in to comment.