<a href="https://colab.research.google.com/github/Sharkvault/AvscBuilder/blob/main/AvscBuilder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Avsc Builder
The AvscBuilder project is used to dynamically generate Avro schemas from table metadata. Typically, we take a CSV file that describes the columns of a table and build a schema from it.


In [67]:
# import everything
from logging import Logger, FileHandler
from pathlib import Path
from re import compile, escape
from typing import Union, List, Tuple
from datetime import datetime
from hashlib import md5
from json import dumps
from pandas import read_csv, DataFrame

In [37]:
# Create a logger
# Loggers should be obtained through getLogger rather than by instantiating
# Logger directly: only registry-created loggers take part in the logger
# hierarchy, so handlers/levels configured on the root logger apply to them.
from logging import getLogger

logger = getLogger("me")

In [68]:
# Create file finder

def find_files(
    root_dir:Union[str,Path],
    regex_file_name_pattern:str,
    file_format:str="csv"
    ) -> List[Path]:
  """
    Recursively collect the files below a directory whose names match a
    pattern and end with a given file extension.

    Args:
      root_dir: top level directory to check for metadata files
      regex_file_name_pattern: regular expression that has to occur in the
        file name (somewhere before the extension)
      file_format: file extension without the leading dot; treated as
        literal text, not as a regular expression

    Raises:
      FileNotFoundError:  root_dir does not exist
      NotADirectoryError:  root_dir is not a directory

    Returns:
      List of paths of the matching files

  """
  root_dir = Path(root_dir)
  if not root_dir.exists():
    err_msg = f"The root directory {str(root_dir)} does not exist"
    logger.error(err_msg)
    raise FileNotFoundError(err_msg)
  if not root_dir.is_dir():
    err_msg = f"{root_dir} is not a directoy"
    logger.error(err_msg)
    raise NotADirectoryError(err_msg)

  # The extension is literal text, so it is escaped before being embedded in
  # the regex; otherwise metacharacters in it (e.g. "+") change the match.
  pattern = compile(rf"{regex_file_name_pattern}.*\.{escape(file_format)}$")
  return [
      candidate
      for candidate in root_dir.rglob(f"*.{file_format}")
      # rglob also yields directories whose name matches; keep real files only
      if candidate.is_file() and pattern.search(candidate.name)
  ]



In [69]:
def write_file(
    path:Union[Path,str],
    file_name:str,
    content:str,
    logger:Logger):
  """
    Write content to a file inside a folder, creating the folder (including
    missing parent folders) and the file when they do not exist yet.
    An existing file is overwritten.

    Args:
      path: Folder for the file
      file_name: Name of the file
      content: Content to be written to the file
      logger: Logger object for logging purposes

    Raises:
      FileNotFoundError:  The target exists but is not a regular file
      NotADirectoryError:  Path exists but is not a directory

  """
  path = Path(path)
  if not path.exists():
    # The message belongs in the log; mkdir's first positional argument is the
    # permission mode, so passing the text there raises a TypeError.
    logger.info(f"The path {str(path)} will be created")
    path.mkdir(parents=True, exist_ok=True)
  if not path.is_dir():
    err_msg = f"{path} is not a directoy"
    logger.error(err_msg)
    raise NotADirectoryError(err_msg)
  # contains the full path to the file, file name included
  file_path = path.joinpath(file_name)
  # A missing file is created by open(..., 'w'); only an existing non-file
  # (e.g. a directory with that name) is an error.
  if file_path.exists() and not file_path.is_file():
    err_msg = f"{file_path} is not a file"
    logger.error(err_msg)
    raise FileNotFoundError(err_msg)
  with open(file_path,'w') as file:
    file.write(content)

In [40]:
# Lookup table from DB2 column data types to the avro type names used
# by this notebook.
db2_to_avro = {
    'decimal': 'decimal',
    'numeric': 'decimal',
    'char': 'string',
    'int': 'int',
    'boolean': 'boolean',
    'date': 'date',
    'timestamp': 'timestamp',
    'clob':'string',
    'varchar':'string'
}

def map_to_avro(df, db_type):
    """
    Add an 'avrotype' column to df by translating its 'datatype' column.

    Args:
      df: Dataframe with a 'datatype' column; it is modified in place
      db_type: Name of the source database whose type mapping is used
        (currently only "db2")

    Raises:
      ValueError: db_type has no registered type mapping

    Returns:
      The same dataframe with the 'avrotype' column added. Data types
      without an entry in the mapping become NaN.
    """
    # One mapping per supported source database, selected by db_type.
    factory = {"db2": db2_to_avro}
    try:
        type_mapping = factory[db_type]
    except KeyError:
        raise ValueError(
            f"Unsupported db_type {db_type!r}, expected one of {sorted(factory)}"
        ) from None
    df['avrotype'] = df['datatype'].map(type_mapping)
    return df


In [61]:

def build_avsc_file(df, prefix:str="") -> Tuple[str,str]:
  """
    Building the avro schema file content based on the df

    Every row of the dataframe becomes one nullable field of the record.
    'decimal' columns are emitted as bytes/decimal(38, 18), 'int' columns as
    bytes/decimal(38, 0) and every other avro type as string.

    Args
      df: Dataframe for the table; needs the columns 'avrotype',
        'fieldname' and 'tablename'
      prefix: Prefix for the table name (e.g. rw); when empty the record
        is named after the table only

    Returns
      str: Avroschema as json string
      str: Name of the table defined in the dataframe (taken from the
        last row; None for an empty dataframe)
  """
  avro_fields = []
  avro_table_name = None
  for _, row in df.iterrows():
    avro_data_type = row['avrotype']
    if avro_data_type == 'decimal':
      avro_type = {
          'type': 'bytes',
          'logicalType': 'decimal',
          'precision': 38,
          'scale': 18
      }
    elif avro_data_type == 'int':
      avro_type = {
          'type': 'bytes',
          'logicalType': 'decimal',
          'precision': 38,
          'scale': 0
      }
    else:
      avro_type = 'string'

    avro_fields.append({
        'name': row['fieldname'],
        'type': [avro_type, 'null']
    })
    avro_table_name = row['tablename']

  # Without a prefix the record is named after the table only, so the name
  # does not start with a dangling underscore.
  record_name = f"{prefix}_{avro_table_name}" if prefix else f"{avro_table_name}"
  # Avro attribute names must be exactly "type", "namespace", "name" and
  # "fields"; padded keys are not recognised by schema parsers.
  avro_schema = {
      "type": "record",
      "namespace": "com.company.com",
      "name": record_name,
      "fields": avro_fields
  }
  return dumps(avro_schema), avro_table_name


In [70]:
class AvscBuilder:
  """
    Builds an avro schema (.avsc) file from one csv file.

    The csv is read into a dataframe, its column names are lowercased, an
    'avrotype' column is derived from the 'datatype' column, the schema json
    is generated and finally written to the current working directory.
    Every step returns the builder itself so the steps can be chained.
  """

  def __init__(self, settings:dict, file_path:Path,logger:Logger):
    """
      Args:
        settings: Settings for the builder (stored, currently not read)
        file_path: Path of the csv file to read
        logger: Logger used by all steps of this builder
    """
    self.settings = settings
    self.logger = logger
    self.file_path = file_path
    # Filled by the build steps
    self.df:Union[DataFrame,None] = None
    self.avsc = None
    self.table_name = None

  def build(self):
    """
      Run all steps: read csv, normalise columns, map types, build the
      schema and write the file.

      Returns:
        The builder itself
    """
    return (
        self._create_df()
        ._lower_column_names()
        ._add_avro_column()
        ._construct_avsc_content()
        ._write_avsc_file()
    )

  def _create_df(self):
    """Read the csv file at file_path into self.df."""
    self.logger.info(f"Reading file: {self.file_path}")
    self.df = read_csv(self.file_path)
    return self

  def _lower_column_names(self):
    """Lowercase all column names of self.df."""
    self.df.columns = self.df.columns.str.lower()
    return self

  def _add_avro_column(self):
    """Add the 'avrotype' column; unmapped data types fall back to string."""
    self.df = map_to_avro(df=self.df,db_type="db2")
    # Assign the result back instead of an inplace fillna on the selected
    # column: the inplace form is chained assignment and does not update the
    # dataframe when pandas Copy-on-Write is active.
    self.df["avrotype"] = self.df["avrotype"].fillna("string")
    return self

  def _construct_avsc_content(self):
    """Generate the schema json and remember the table name."""
    self.avsc, self.table_name = build_avsc_file(self.df)
    return self

  def _build_table_name(self) -> str:
    """
      Build the output file name from the table name, the md5 hex digest of
      the schema content and the current local time (YYYYmmddHHMMSS).
    """
    current_time = datetime.now().strftime("%Y%m%d%H%M%S")
    md5_content = md5(self.avsc.encode()).hexdigest()
    return f"{self.table_name}_{md5_content}_{current_time}.avsc"

  def _write_avsc_file(self):
    """Write the schema to the current working directory."""
    # Use the logger handed to this builder, not a module-level one
    self.logger.info(self.avsc)
    write_file(
        path="./",
        file_name=self._build_table_name(),
        content=self.avsc,
        logger=self.logger
        )
    return self


# Build one avsc file for every csv file below the current working directory
# whose name matches the pattern "test".
csv_files = find_files(
    root_dir="./",
    regex_file_name_pattern="test",
    file_format="csv",
)
for file_path in csv_files:
  builder = AvscBuilder(settings={}, file_path=file_path, logger=logger)
  builder.build()