<a href="https://colab.research.google.com/github/Mat-O-Lab/CSVToCSVW/blob/main/csv_parser.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [72]:
#-*- coding: UTF-8 -*-
#@title Code - Run Once To Start { vertical-output: true, display-mode: "form" }
from numpy.core.numeric import NaN
import ipywidgets as widgets
from IPython.display import display, clear_output
import matplotlib.pyplot as plt
import pandas as pd 
import io
import sys
import ast, re
import base64
import json
from dateutil.parser import parse
from contextlib import redirect_stderr
from csv import Sniffer
import chardet
from urllib.request import urlopen
from urllib.parse import urlparse, unquote
from urllib.error import URLError

%matplotlib notebook

!pip install Owlready2
from owlready2 import *

# Owlready2 can fail on the first load of an OWL file whose imports are written
# in Turtle; loading a second time succeeds, so the load is retried exactly once.
try:
  mseo=get_ontology("https://purl.matolab.org/mseo/mid").load()
except Exception:
  # Exception instead of a bare except, so that a keyboard/kernel interrupt
  # during the download is not swallowed and followed by a second download
  mseo=get_ontology("https://purl.matolab.org/mseo/mid").load()

# ontologies that CSV_Annotator.get_unit searches for unit symbols
cco_mu=get_ontology("http://www.ontologyrepository.com/CommonCoreOntologies/Mid/UnitsOfMeasureOntology/").load()
qudt=get_ontology('http://www.qudt.org/qudt/owl/1.0.0/unit.owl').load()

class CSV_Annotator():
  def __init__(self, csv_url=''):
    """Store the CSV location and prepare the constants used for serialisation.

    csv_url: location of the csv file; it is only stored on the instance here.
    """
    # transliteration table applied by make_id before an identifier is built
    self.umlaute_dict = dict([
        ('\u00e4', 'ae'),  # a umlaut
        ('\u00f6', 'oe'),  # o umlaut
        ('\u00fc', 'ue'),  # u umlaut
        ('\u00c4', 'Ae'),  # A umlaut
        ('\u00d6', 'Oe'),  # O umlaut
        ('\u00dc', 'Ue'),  # U umlaut
        ('\u00df', 'ss'),  # sharp s
    ])
    # prefixes used in the generated document, on top of the CSVW base context
    prefix_map = {
        "cco": "http://www.ontologyrepository.com/CommonCoreOntologies/",
        "mseo": mseo.base_iri,
        "label": "http://www.w3.org/2000/01/rdf-schema#label",
        "xsd": "http://www.w3.org/2001/XMLSchema#",
    }
    self.json_ld_context = ["http://www.w3.org/ns/csvw", prefix_map]
    self.csv_url = csv_url
  def open_file(self,uri=''):
      try:
          uri_parsed = urlparse(uri)    
      except:
          print('not an uri - if local file add file:// as prefix')
          return None
      else:
        filename=unquote(uri_parsed.path).split('/')[-1]
        if uri_parsed.scheme in ['https', 'http']:
          filedata = urlopen(uri).read()
        elif uri_parsed.scheme == 'file':
          filedata=open(unquote(uri_parsed.path), 'rb').read()
        else:
          print('unknown scheme {}'.format(uri_parsed.scheme))
          return None
        return filedata, filename   
      
  def _create_initial_widgets(self):
    """Create the widgets of the dialog and connect the button handlers.

    Sets url_widget, uploader, clear_button, file_dialog, out, encoding,
    separator, settings and process_button on the instance; display_widgets
    arranges and shows them.
    """
    self.url_widget = widgets.Text(
        value='',
        placeholder='put ur url to a *-metadata.json here',
        description='Url:',
        disabled=False,
    )
    self.uploader = widgets.FileUpload(
        accept='',  # accepted file extension e.g. '.txt', '.pdf', 'image/*'; empty = no filter
        multiple=False,  # a single file per upload
        description='Upload',
    )
    self.clear_button = widgets.Button(description='Clear!', layout=widgets.Layout(width='100px'))
    self.file_dialog = widgets.HBox([widgets.Label(value="File:"), self.url_widget, self.uploader, self.clear_button])
    self.clear_button.on_click(self._on_clear)

    # output area; _on_process prints and displays into it
    self.out = widgets.Output()
    self.encoding = widgets.Dropdown(
        options=['auto', 'ISO-8859-1', 'utf-8', 'ascii', 'latin-1', 'cp273'],
        value='auto',
        description='Encoding:',
        disabled=False,
    )
    # The regular-expression separators are spelled with an explicit "\\s" so
    # that the literal contains no invalid escape sequence; the resulting
    # strings (backslash-s, real tab characters) are the same as before.
    self.separator = widgets.Dropdown(
        options=['auto', ',', ';', '\t', '|', "\\s+", "\\s+|\t+|\\s+\t+|\t+\\s+"],
        value='auto',
        description='separator:',
        disabled=False,
    )
    self.settings = widgets.HBox([self.encoding, self.separator])
    self.process_button = widgets.Button(description='Process!', layout=widgets.Layout(width='200px'))
    self.process_button.on_click(self._on_process)
  def _on_clear(self,button):
    """Click handler of the 'Clear!' button: reset the url field and the upload widget.

    button: the widget handed over by on_click; it is not used.
    """
    self.url_widget.value=''
    # forget the files currently held by the upload widget
    self.uploader.value.clear()
    # NOTE(review): _counter is a private ipywidgets attribute - presumably the
    # number shown on the upload button; confirm against the installed version
    self.uploader._counter = 0

  def _on_process(self,button):
    """Click handler of the 'Process!' button.

    Takes the csv from the url field (preferred) or from the upload widget,
    resolves encoding and separator, builds the metadata with process_file
    and shows a download link for the resulting json in the output area.

    The two drop-downs are only read: a value detected for 'auto' is kept in
    a local variable, so 'auto' stays selected for the next file and a
    detected value that is not among the drop-down options cannot be
    rejected by the widget.

    button: the widget handed over by on_click; it is not used.
    """
    with self.out:
      clear_output()
      uploads = self.uploader.value
      if not (self.url_widget.value or uploads):
        print('pls upload a file first or insert a url')
        return
      if self.url_widget.value:
        opened = self.open_file(self.url_widget.value)
        if opened is None:
          # open_file has already printed why nothing could be read
          return
        self.file_data, self.file_name = opened
      else:
        # only the first uploaded entry is used
        # NOTE(review): assumes the upload dict is keyed by file name
        # (ipywidgets 7 layout) - confirm against the installed version
        self.file_name, input_file = next(iter(uploads.items()))
        self.file_data = input_file['content']

      encoding = self.encoding.value
      if encoding == 'auto' or not encoding:
        encoding = self.get_encoding(self.file_data[:256])
        if not encoding:
          print('cant detect encoding, pls manualy select')
          return
      self.file_data = self.file_data.decode(encoding)

      separator = self.separator.value
      if separator == 'auto':
        try:
          separator = self.get_column_separator(self.file_data)
        except Exception:
          # without a separator the file cannot be parsed, so stop here
          print('cant find separator, pls manualy select')
          return

      metafile_name, result = self.process_file(self.file_name, self.file_data, separator, encoding)
      # the json is embedded into the link as a base64 data uri
      payload = base64.b64encode(result.encode()).decode()
      html_buttons = '''<html>
      <head>
      <meta name="viewport" content="width=device-width, initial-scale=1">
      </head>
      <body>
      <a download="{filename}" href="data:text/json;base64,{payload}" download>
      <button class="p-Widget jupyter-widgets jupyter-button widget-button mod-warning">Download File</button>
      </a>
      </body>
      </html>
      '''
      html_button = html_buttons.format(payload=payload,filename=metafile_name)
      display(widgets.HTML(html_button))

    
  def display_widgets(self):
    """Create the widgets and show them stacked: file row, settings row, process button, output area."""
    self._create_initial_widgets()
    stacked_rows = [self.file_dialog, self.settings, self.process_button, self.out]
    dialog_box = widgets.VBox(stacked_rows)
    display(dialog_box)

  def get_encoding(self,file_data):
    """Return the 'encoding' entry of the chardet detection result for *file_data*."""
    return chardet.detect(file_data)['encoding']

  def get_column_separator(self,file_data):
    #f.seek(0)
    #print(file_string) 
    sniffer = Sniffer()
    dialect = sniffer.sniff(file_data[:512])
    return dialect.delimiter

  def get_header_lenght(self,file_data, separator_string, encoding):
    string_stream = io.StringIO(file_data)
    #file.seek(0)
    f = io.StringIO()
    with redirect_stderr(f):
        df = pd.read_csv(string_stream,sep=separator_string,error_bad_lines=False,warn_bad_lines=True,header=None)
    f.seek(0)
    #without utf string code b' 
    warn_str=f.read()[2:-2]
    warnlist=warn_str.split('\\n')[:-1]
    #print(warnlist)
    # readout row index and column count in warnings
    line_numbers=[int(re.search('Skipping line (.+?):', line).group(1)) for line in warnlist]
    column_numbers=[int(line[-1]) for line in warnlist]
    column_numbersm1=column_numbers.copy()
    if not column_numbersm1:
      #no additional header
      return 0,0
    #pop lats element, so column_numbers is always lenght +1
    column_numbersm1.pop(-1)
    #assumes that the file ends with a uniform table with constant column count
    #determine changes in counted columns starting from the last line of file
    changed_column_count_line=[line_numbers[index+1] for index in reversed(range(len(column_numbersm1))) if column_numbersm1[index]!=column_numbers[index+1]]
    #print(changed_column_count_line)
    
    if changed_column_count_line:
      # additional header has ends in line before the last change of column count
      first_head_line=changed_column_count_line[0]-1
    elif line_numbers:
      first_head_line=line_numbers[0]-1
    else:
      first_head_line=0
    max_columns_additional_header=(max(column_numbers[:line_numbers.index(first_head_line+1)-1]))
    return first_head_line, max_columns_additional_header

  def get_num_header_rows_and_dataframe(self,file_data,separator_string, header_lenght, encoding):
    string_stream = io.StringIO(file_data)
    num_header_rows=1
    #decimal_delimiter='.'
    good_readout=False
    while not good_readout:
      string_stream.seek(0)
      table_data = pd.read_csv(string_stream,header=list(range(num_header_rows)),sep=separator_string,skiprows=header_lenght,encoding=encoding)    
      #test if all text values in first table row -> is a second header row
      all_text=all([self.get_value_type(value)=='TEXT' for column,value in table_data.iloc[0].items()])
      if all_text and num_header_rows<10:
        num_header_rows+=1
        continue
      else:
        good_readout=True
    return num_header_rows, table_data

  def get_unit(self,text):
    """Look *text* up as a unit in the loaded ontologies.

    Leading and trailing '[' / ']' characters are stripped first. The
    ontologies are searched in a fixed order (cco_mu, mseo, qudt) and the
    first hit is used.

    Returns a dict with the single key 'cco:uses_measurement_unit' holding
    '@id' and '@type' of the hit, or an empty dict if nothing was found.
    """
    symbol = text.strip('[]')
    lookups = (
        (cco_mu, 'alternative_label'),
        (cco_mu, 'SI_unit_symbol'),
        (mseo, 'alternative_label'),
        (mseo, 'SI_unit_symbol'),
        (qudt, 'symbol'),
        (qudt, 'abbreviation'),
        (qudt, 'ucumCode'),
    )
    matches = []
    for ontology, annotation in lookups:
      matches.extend(ontology.search(**{annotation: symbol}))
    if not matches:
      return {}
    best = matches[0]
    return {"cco:uses_measurement_unit": {"@id": str(best.iri), "@type": str(best.is_a)}}

  def is_date(self,string, fuzzy=False):
    """Return True if dateutil can parse *string* as a date, otherwise False.

    string: the text to test.
    fuzzy: passed on to dateutil's parse.
    """
    try:
      parse(string, fuzzy=fuzzy)
    except (ValueError, OverflowError):
      # parse reports unparsable text as ValueError and out-of-range
      # numbers as OverflowError; both mean "not a date" here
      return False
    return True

  def get_value_type(self,string):
      string=str(string)
      #remove spaces and replace , with . and
      string=string.strip().replace(',','.')
      if len(string) == 0: return 'BLANK'
      try:
          t=ast.literal_eval(string)
      except ValueError:
          return 'TEXT'
      except SyntaxError:
          if self.is_date(string):
            return 'DATE'
          else:
            return 'TEXT'
      else:
          if type(t) in [int, float, bool]:
            if type(t) is int:
                return 'INT'
            if t in set((True,False)):
                return 'BOOL'
            if type(t) is float:
                return 'FLOAT'
          else:
              return 'TEXT' 

  def describe_value(self,value_string):
    if pd.isna(value_string):
      return {}
    elif self.get_value_type(value_string)=='INT':
      return {'cco:has_integer_value': {'@value':value_string, '@type': 'xsd:integer'}}
    elif self.get_value_type(value_string)=='BOOL':
      return {'cco:has_bolean_value': {'@value':value_string, '@type': 'xsd:boolean'}}
    elif self.get_value_type(value_string)=='FLOAT':
      return {'cco:has_decimal_value': {'@value':value_string, '@type': 'xsd:decimal'}}
    elif self.get_value_type(value_string)=='DATE':
      return {'cco:has_datetime_value': {'@value':str(parse(value_string)), '@type': 'xsd:dateTime'}}
    else:
      # check if its a unit
      unit_dict=self.get_unit(value_string)
      if unit_dict:
        return unit_dict
      else:
        return {'cco:has_text_value': {'@value':value_string, '@type': 'xsd:string'}}

  def make_id(self,string,namespace=None):
    for k in self.umlaute_dict.keys():
          string = string.replace(k, self.umlaute_dict[k])
    if namespace:
      return namespace+':'+re.sub('[^A-ZÜÖÄa-z0-9]+', '', string.title().replace(" ", ""))
    else:
      return './'+re.sub('[^A-ZÜÖÄa-z0-9]+', '', string.title().replace(" ", ""))

  def get_additional_header(self,file_data,separator,encoding):
    """Read the lines in front of the data table into a DataFrame.

    The length and width of that additional header come from
    get_header_lenght. The first column is renamed to 'label'; rows with
    fewer than two non-missing values and columns without any value are
    dropped.

    Returns (header_data, header_lenght), or (None, 0) if the file has no
    additional header.
    """
    header_lenght, max_columns_additional_header = self.get_header_lenght(file_data, separator, encoding)
    if not header_lenght:
      return None, 0
    raw_header = pd.read_csv(
        io.StringIO(file_data),
        header=None,
        sep=separator,
        nrows=header_lenght,
        names=range(max_columns_additional_header),
        encoding=encoding,
        skip_blank_lines=False,  # blank lines are kept, so the index stays the row number in the file
    )
    header_data = (
        raw_header
        .rename(columns={0: 'label'})
        .dropna(thresh=2)
        .dropna(axis=1, how='all')
    )
    return header_data, header_lenght


  def serialize_header(self,header_data,file_namespace=None):
    """Turn every row of the additional header into one json-ld node.

    header_data: DataFrame as returned by get_additional_header, with a
      'label' column; its index is used as row index.
    file_namespace: optional namespace handed to make_id.

    Returns a list of dicts. Each carries '@id', 'label', '@type' and
    'mseo:has_row_index', merged with describe_value of every cell of the
    row; when two cells yield the same key, the later column wins.
    """
    info_line_iri = "cco:InformationLine"
    params = []
    for row, data in header_data.to_dict(orient='index').items():
      label = data['label']
      para_dict = {
          '@id': self.make_id(label+str(row),file_namespace),
          'label': label,
          '@type': info_line_iri,
          'mseo:has_row_index': {"@value": row,"@type": "xsd:integer"},
      }
      for value in data.values():
        para_dict.update(self.describe_value(value))
      params.append(para_dict)
    return params
    

  def process_file(self,file_name,file_data,separator,encoding):
    """Build the CSVW metadata document for one csv file.

    file_name: name of the csv file; written to 'url' and used to derive
      the name of the metadata file.
    file_data: the decoded csv text.
    separator: column separator of the file.
    encoding: encoding of the file; written to the dialect description.

    The document contains the json-ld context, the serialized additional
    header as 'notes' (if the file has one), the dialect, and one column
    description per table column. A unit is looked up from the last word of
    the title if the table has a single header row, otherwise from the
    second header row; header rows beyond the second are not evaluated.

    Returns (meta_file_name, result): the file name with its last extension
    replaced by '-metadata.json', and the document as json string.
    """
    metadata_csvw = {"@context": self.json_ld_context, "url": file_name}
    # read additional header lines and provide them as notes
    header_data, header_lenght = self.get_additional_header(file_data, separator, encoding)
    if header_lenght:
      metadata_csvw["notes"] = self.serialize_header(header_data, None)
    # read the table and determine the number of header rows used for the column description
    header_lines, table_data = self.get_num_header_rows_and_dataframe(file_data, separator, header_lenght, encoding)
    metadata_csvw["dialect"] = {
        "delimiter": separator,
        "skipRows": header_lenght,
        "headerRowCount": header_lines,
        "encoding": encoding,
    }
    column_json = []
    for column in table_data.columns:
      if header_lines == 1:
        title = column
        # there might be a unit string at the end of the title
        words = title.split(' ')
        unit_json = self.get_unit(words[-1]) if len(words) > 1 else {}
      else:
        # column is a tuple with one entry per header row; indexing instead
        # of two-element unpacking keeps this working for more than two rows
        title, unit_str = column[0], column[1]
        unit_json = self.get_unit(unit_str)
      column_json.append({'titles': title, '@id': self.make_id(title), "@type": "Column", **unit_json})
    metadata_csvw["tableSchema"] = {"columns": column_json}
    result = json.dumps(metadata_csvw, indent=4)
    # only the last extension is cut off, so 'a.b.csv' becomes 'a.b-metadata.json'
    meta_file_name = file_name.rsplit('.', 1)[0] + '-metadata.json'
    return meta_file_name, result



In [74]:
#@title Dialog - Run Cell to begin
# Create the annotator and show its widgets; processing starts from the
# 'Process!' button of the displayed dialog.
dialog = CSV_Annotator()
dialog.display_widgets()

VBox(children=(HBox(children=(Label(value='File:'), Text(value='', description='Url:', placeholder='put ur url…