# Imports used in the project.

In [39]:
from dataclasses import dataclass
import pathlib
import base64
from typing import *
import tensorflow as tf
import keras as k

import io
import keras
from PIL import Image
import xmltodict


# Data Sets.
- first set : [Playing Cards Labelized Dataset](https://www.kaggle.com/hugopaigneau/playing-cards-dataset).
- second set : [Playing Cards](https://www.kaggle.com/vdntdesai11/playing-cards).

## Loading.

### Playing Cards

#### Classes

In [41]:
Paths: Dict[str, str] = {
  'resources': 'resources/',
  'cards': 'resources/cards/',
  'yolo': 'resources/cards/yolo-labeled/',
  'xml': 'resources/cards/xml-labeled/',
  'record': 'resources/cards/cards'
}
Classes = {
  'As': 1, 'Ac': 13, 'Ad': 26, 'Ah': 39,
  'Ks': 2, 'Kc': 14, 'Kd': 27, 'Kh': 40,
  'Qs': 3, 'Qc': 15, 'Qd': 28, 'Qh': 41,
  'Js': 4, 'Jc': 16, 'Jd': 29, 'Jh': 42,
  '10s': 5, '10c': 17, '10d': 30, '10h': 43,
  '9s': 6, '9c': 18, '9d': 31, '9h': 44,
  '8s': 7, '8c': 19, '8d': 32, '8h': 45,
  '7s': 8, '7c': 20, '7d': 33, '7h': 46,
  '6s': 9, '6c': 21, '6d': 34, '6h': 47,
  '5s': 10, '5c': 22, '5d': 35, '5h': 48,
  '4s': 11, '4c': 23, '4d': 36, '4h': 49,
  '3s': 12, '3c': 24, '3d': 37, '3h': 50,
  '2s': 25, '2c': 38, '2d': 51, '2h': 52,
}

Xml = Dict[str, 'Xml']
class FileReader(object):
  @classmethod
  def read(cls, filepath: str) -> bytes:
    with tf.io.gfile.GFile(filepath, 'rb') as file: return file.read()
class YoloReader(FileReader):
  pass
class XmlReader(FileReader):
  @classmethod
  def parse(cls, xmlpath: str) -> Xml:
    return xmltodict.parse(super().read(xmlpath))
class ImageReader(FileReader):
  @classmethod
  def encoded_file(cls, imagepath: str) -> bytes:
    raw = super().read(imagepath)
    return base64.b85encode(raw).decode('utf-8')

  @classmethod
  def encoded(cls, image: Image.Image) -> bytes:
    with io.BytesIO() as buffer:
      image.save(buffer, image.format)
      return base64.b85encode(buffer.getvalue())

  @classmethod
  def decoded(cls, imageraw: str) -> Image.Image:
    return Image.open(io.BytesIO(base64.b85decode(imageraw)))

  @classmethod
  def read(cls, imagepath: str) -> Image.Image:
    return Image.open(io.BytesIO(super().read(imagepath)))

  @classmethod
  def read_resized(cls, imagepath: str, size: Tuple[int, int] = (400, 400)) -> Image.Image:
    image = cls.read(imagepath)
    new = image
    new = new.resize(size, Image.ANTIALIAS)
    new.format = image.format
    return new

class FeatureProvider(object):
  @classmethod
  def i64(cls, value) -> tf.train.Feature:
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
  @classmethod
  def i64_list(cls, value) -> tf.train.Feature:
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))
  @classmethod
  def u8(cls, value) -> tf.train.Feature:
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
  @classmethod
  def u8_list(cls, value) -> tf.train.Feature:
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))
  @classmethod
  def f32(cls, value) -> tf.train.Feature:
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))
  @classmethod
  def f32_list(cls, value) -> tf.train.Feature:
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))
class RecordProvider(FeatureProvider):
  @classmethod
  def record(cls, features: Dict[str, tf.train.Feature]) -> tf.train.Example:
    return tf.train.Example(features=tf.train.Features(feature=features))
class XMlRecordProvider(RecordProvider):
  @classmethod
  def to_record(cls, image: Image.Image, xml: Xml) -> tf.train.Example:
    return cls.record(cls._parse(image, xml))

  @classmethod
  def _parse(cls, image: Image.Image, xml: Xml):
    filename = xml['annotation']['filename']
    (width, height) = map(int, (xml['annotation']['size']['width'], xml['annotation']['size']['height']))
    xmins = []
    xmaxs = []
    ymins = []
    ymaxs = []
    texts = []
    labels = []
    for card in xml['annotation']['object']:
      xmins.append(int(card['bndbox']['xmin']) / width)
      xmaxs.append(int(card['bndbox']['xmax']) / width)
      ymins.append(int(card['bndbox']['ymin']) / height)
      ymaxs.append(int(card['bndbox']['ymax']) / height)
      texts.append(card['name'].encode('utf8'))
      labels.append(Classes[card['name']])

    return {
      'name': cls.u8(filename.encode('utf8')),
      'encoded': cls.u8(ImageReader.encoded(image)),
      'format': cls.u8(image.format.encode('utf8')),
      'height': cls.i64(image.height),
      'width': cls.i64(image.width),
      'cards/bbox/xmin': cls.f32_list(xmins),
      'cards/bbox/xmax': cls.f32_list(xmaxs),
      'cards/bbox/ymin': cls.f32_list(ymins),
      'cards/bbox/ymax': cls.f32_list(ymaxs),
      'cards/class/text': cls.u8_list(texts),
      'cards/class/label': cls.i64_list(labels),
    }

class FeatureWriter(object):
  @staticmethod
  def write(records: Iterable[tf.train.Example], target: str):
    with tf.io.TFRecordWriter(f"{target}.record") as writer:
      for record in records: writer.write(record.SerializeToString())

  @staticmethod
  def combine_write(record_iterators: Iterable[Iterable[tf.train.Example]], target: str, total: int = None):
    with tf.io.TFRecordWriter(f"{target}.record") as writer:
      for (index, record) in enumerate((record for iterator in record_iterators for record in iterator), start=1):
        if (index % 100) == 0: print(f"{index}{f'/{total}' if total else ''} records written.")
        writer.write(record.SerializeToString())
class FeatureReader(object):
  @classmethod
  def read(cls, source: str) -> tf.data.TFRecordDataset:
    return tf.data.TFRecordDataset(f"{source}.record")

xml_directory = pathlib.Path(Paths['xml'])
image_count = len(list(xml_directory.glob('*.jpg')))
images = map(ImageReader.read, xml_directory.glob('*.jpg'))
xmls = map(XmlReader.parse, xml_directory.glob('*.xml'))

xml_count = 2 * image_count
xml_records = map(lambda data: XMlRecordProvider.to_record(*data), zip(images, xmls))
FeatureWriter.combine_write((xml_records, xml_records), Paths['record'], total=xml_count)

100/12000 records written.
200/12000 records written.
300/12000 records written.
400/12000 records written.
500/12000 records written.
600/12000 records written.
700/12000 records written.


KeyboardInterrupt: 

#### Example

In [None]:
# print(f"TensorFlow version: {tf.__version__}")
# print(f"|> Training set row example <|")
# print(dfs['train'].head())
#
# print(f"|> Test set row example <|")
# print(dfs['test'].head())
# print(len(dfs['test']))
#
# print(f"|> Dataset classes <|")
# print(dfs['train']['class'].unique())
# print(f"len(dfs['train'])")
#
# print("|> Example paths <|")
# for cardpath in cardpaths.take(5): print(cardpath.numpy())

#### Features

## Augmenting.

### Read saved data from record

In [None]:
test_data = tf.data.TFRecordDataset(Paths['record'])

for data in test_data.take(1):
  example = tf.train.Example()
  example.ParseFromString(data.numpy())
  bytes_ = example.features.feature['image/encoded'].bytes_list.value[0]
  Image.open(io.BytesIO(bytes_)).save('temp.png')
  print(example)

## Saving.

## Loading augmented.

# Model.

## Creating.

In [None]:

model: keras.Sequential = keras.Sequential([
  tf.keras.layers.Rescaling(1. / 255),
  tf.keras.layers.Conv2D(32, 3, activation='relu'),
  tf.keras.layers.MaxPooling2D(),
  tf.keras.layers.Conv2D(32, 3, activation='relu'),
  tf.keras.layers.MaxPooling2D(),
  tf.keras.layers.Conv2D(32, 3, activation='relu'),
  tf.keras.layers.MaxPooling2D(),
  tf.keras.layers.Flatten(),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dense(len(Classes))
])

## Compiling

In [None]:
model.compile(
  optimizer='adam',
  loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
  metrics=['accuracy']
)

## Training.

In [None]:
model.fit(
  dfs['train']
)

## Validating.

## Wniosking.