# iFLYTEK Chinese Semantic Checker

This notebook checks the semantics of Chinese language units. It is based on iFLYTEK's text-correction API.

Author: Zexin Xu, Zilu Zhang

In [1]:
import pandas as pd

## iFLYTEK API

This snippet involves the iFLYTEK API. `API_key` and `Secret_key` have been removed for security reasons; please use your own API key and secret key. For more details on using the iFLYTEK API, please refer to [iFLYTEK API](https://www.xfyun.cn/doc/nlp/textCorrection/API.html#%E8%BF%94%E5%9B%9E%E7%BB%93%E6%9E%9C). If you are an English-speaking user, there is a language switch button in the top right corner of that page.

In [1]:
# -*- coding:utf-8 -*-
from datetime import datetime
from wsgiref.handlers import format_date_time
from time import mktime
import hashlib
import base64
import hmac
from urllib.parse import urlencode
import json
import requests


class AssembleHeaderException(Exception):
    """Error raised when a request URL cannot be split into host and path.

    The text passed to the constructor is kept on ``message`` and is also
    available through the standard ``args`` / ``str()`` machinery.
    """

    def __init__(self, msg):
        super().__init__(msg)
        self.message = msg


class Url:
    """Plain holder for the three pieces of a split request URL.

    Attributes are stored exactly as given: ``schema`` (e.g. ``"https://"``),
    ``host`` and ``path``.
    """

    def __init__(self, host, path, schema):
        self.schema = schema
        self.path = path
        self.host = host


class WebsocketDemo:
    """Send one piece of text to the iFLYTEK endpoint and return its result.

    Despite the class name, the request is a plain HTTPS POST made with
    ``requests``; no websocket is opened. The request URL is signed with
    HMAC-SHA256 using the caller's API key and secret.
    """

    def __init__(self,APPId,APISecret,APIKey,Text):
        """Store the credentials and the text to be checked.

        Args:
            APPId: Application id placed in the request header and body.
            APISecret: Secret used as the HMAC key when signing the request.
            APIKey: Key embedded in the authorization string.
            Text: The text (a ``str``) to submit.
        """
        self.appid = APPId
        self.apisecret = APISecret
        self.apikey = APIKey
        self.text = Text
        self.url = 'https://api.xf-yun.com/v1/private/s9a87e3ec'

    # calculate sha256 and encode to base64
    def sha256base64(self,data):
        """Return the base64 text of the SHA-256 digest of ``data`` (bytes)."""
        sha256 = hashlib.sha256()
        sha256.update(data)
        digest = base64.b64encode(sha256.digest()).decode(encoding='utf-8')
        return digest


    def parse_url(self,requset_url):
        """Split ``requset_url`` into schema, host and path.

        Args:
            requset_url: Full URL such as ``"https://host/some/path"``.

        Returns:
            A ``Url`` whose ``schema`` includes the trailing ``"://"``, whose
            ``host`` is the text up to the first ``"/"`` after the schema and
            whose ``path`` is everything from that ``"/"`` on.

        Raises:
            AssembleHeaderException: If the URL has no ``"://"``, an empty
                host, or no path component.
        """
        # str.find returns -1 on a miss (str.index would raise ValueError and
        # bypass the validation below).
        stidx = requset_url.find("://")
        if stidx < 0:
            raise AssembleHeaderException("invalid request url:" + requset_url)
        host = requset_url[stidx + 3:]
        schema = requset_url[:stidx + 3]
        edidx = host.find("/")
        if edidx <= 0:
            # -1: no path at all; 0: empty host.
            raise AssembleHeaderException("invalid request url:" + requset_url)
        path = host[edidx:]
        host = host[:edidx]
        u = Url(host, path, schema)
        return u


    # build the signed auth request url
    def assemble_ws_auth_url(self,requset_url, method="POST", api_key="", api_secret=""):
        """Return ``requset_url`` with signed authentication query parameters.

        A signature string is built from the host, the current date and the
        request line, signed with HMAC-SHA256 using ``api_secret`` and
        base64-encoded. That signature is embedded, together with ``api_key``,
        in an authorization string which is itself base64-encoded and appended
        to the URL along with ``host`` and ``date``.

        Args:
            requset_url: Endpoint URL to sign.
            method: HTTP method written into the signed request line.
            api_key: Key placed in the authorization string.
            api_secret: Secret used as the HMAC key.

        Returns:
            The URL followed by ``?host=...&date=...&authorization=...``.
        """
        u = self.parse_url(requset_url)
        host = u.host
        path = u.path
        # Local wall-clock time -> epoch seconds -> HTTP date string.
        now = datetime.now()
        date = format_date_time(mktime(now.timetuple()))
        signature_origin = "host: {}\ndate: {}\n{} {} HTTP/1.1".format(host, date, method, path)
        signature_sha = hmac.new(api_secret.encode('utf-8'), signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
        authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % (
            api_key, "hmac-sha256", "host date request-line", signature_sha)
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
        values = {
            "host": host,
            "date": date,
            "authorization": authorization
        }

        return requset_url + "?" + urlencode(values)


    def get_body(self):
        """Build the JSON-serialisable request body for ``self.text``.

        The text is UTF-8 encoded and then base64-encoded before being placed
        in ``payload.input.text``.
        """
        body =  {
            "header": {
                "app_id": self.appid,
                "status": 3,
                #"uid":"your_uid"
            },
            "parameter": {
                "s9a87e3ec": {
                    #"res_id":"your_res_id",
                    "result": {
                        "encoding": "utf8",
                        "compress": "raw",
                        "format": "json"
                    }
                }
            },
            "payload": {
                "input": {
                    "encoding": "utf8",
                    "compress": "raw",
                    "format": "plain",
                    "status": 3,
                    "text": base64.b64encode(self.text.encode("utf-8")).decode('utf-8')
                }
            }
        }
        return body

    def get_result(self, timeout=30):
        """POST the request and return the decoded result.

        Args:
            timeout: Seconds passed to ``requests.post`` so that a stalled
                connection raises instead of blocking indefinitely.

        Returns:
            The object obtained by base64-decoding ``payload.result.text``
            from the response and parsing it as JSON.

        Raises:
            KeyError: If the response JSON has no ``payload.result.text``;
                the message includes the response body to aid diagnosis.
        """
        request_url = self.assemble_ws_auth_url(self.url, "POST", self.apikey, self.apisecret)
        headers = {'content-type': "application/json", 'host':'api.xf-yun.com', 'app_id':self.appid}
        body = self.get_body()
        response = requests.post(request_url, data = json.dumps(body), headers = headers,
                                 timeout = timeout)
        tempResult = json.loads(response.content.decode())
        try:
            encoded_text = tempResult['payload']['result']['text']
        except KeyError as exc:
            # Surface what the service actually sent back instead of a bare key name.
            raise KeyError("response has no payload.result.text: "
                           + json.dumps(tempResult, ensure_ascii=False)) from exc
        return json.loads(base64.b64decode(encoded_text).decode())


In [None]:
# NOTE: Replace this empty placeholder with your own DataFrame and adjust the
# column names accordingly. The cells below read a 'sentences' column (the text
# sent to the API) and a 'ground_truth_label' column (copied into the scored
# result) from it.
tunit_df = pd.DataFrame()

In [None]:
def get_result(result):
    """Turn one category of API findings into a cell value.

    Returns ``pd.NA`` when ``result`` is empty; otherwise prints and returns
    the ``str()`` rendering of ``result``.
    """
    if not len(result):
        return pd.NA
    rendered = str(result)
    print(rendered)
    return rendered
    
def generate_result(df, app_id=None, api_secret=None, api_key=None):
    """Query the API for every row of ``df`` and record the findings in place.

    For each row, ``row['sentences']`` is submitted through ``WebsocketDemo``
    and each error category of the response is written to its own column of
    ``df`` (via the module-level ``get_result`` helper, which yields ``pd.NA``
    for an empty category). Rows are written one at a time, so results already
    obtained stay in ``df`` if a later request fails.

    Args:
        df: DataFrame with a 'sentences' column; modified in place.
        app_id: iFLYTEK application id. If omitted, taken from the
            ``XFYUN_APP_ID`` environment variable, else ``""``.
        api_secret: API secret. If omitted, taken from ``XFYUN_API_SECRET``,
            else ``""``.
        api_key: API key. If omitted, taken from ``XFYUN_API_KEY``, else ``""``.

    Returns:
        None.
    """
    import os  # local import: keeps credentials out of the notebook source

    if app_id is None:
        app_id = os.environ.get("XFYUN_APP_ID", "")
    if api_secret is None:
        api_secret = os.environ.get("XFYUN_API_SECRET", "")
    if api_key is None:
        api_key = os.environ.get("XFYUN_API_KEY", "")

    # Response key -> output column, in the order the columns are created.
    category_columns = (
        ("pol", "政治术语纠错"),
        ("char", "别字纠错"),
        ("word", "别词纠错"),
        ("redund", "语法纠错-冗余"),
        ("miss", "语法纠错-缺失"),
        ("order", "语法纠错-乱序"),
        ("dapei", "搭配纠错"),
        ("punc", "标点纠错"),
        ("idm", "成语纠错"),
        ("org", "机构名纠错"),
        ("leader", "领导人职称纠错"),
        ("number", "数字纠错"),
        ("addr", "地名纠错"),
        ("name", "全文人名纠错"),
        ("grammar_pc", "句式杂糅/语义重复"),
    )

    # Count rows by position: index labels may be non-integer or non-contiguous.
    for done, (i, row) in enumerate(df.iterrows()):
        demo = WebsocketDemo(app_id, api_secret, api_key, row['sentences'])
        result = demo.get_result()
        for key, column in category_columns:
            df.loc[i, column] = get_result(result[key])
        if done % 100 == 0:
            print(done, "iters done...")

# Work on a copy: generate_result writes its columns into the frame it is given,
# so tunit_df itself is left unchanged.
# NOTE(review): the annotated frame is not written to disk in this cell, while a
# later cell reads 'kedaxunfei/tunit_df_result.csv' — presumably it was saved
# separately; confirm.
kd_tunit_df = tunit_df.copy()
generate_result(kd_tunit_df)

## Result processing

In [11]:
# Load previously saved result tables from the 'kedaxunfei' directory.
# NOTE(review): kd_sen_result is loaded but not used in the visible part of this
# notebook — confirm whether it is still needed.
kd_tunit_result = pd.read_csv('kedaxunfei/tunit_df_result.csv', encoding='utf-8')
kd_sen_result = pd.read_csv('kedaxunfei/sen_df_result.csv', encoding='utf-8')

# Process the result to combine all types of error message into label
def result_processing(df, tru_df):
    """Collapse the per-category error columns of ``df`` into one label.

    Adds three columns to ``df`` in place:

    * ``'纠错数'`` — number of error-category columns that are non-null in the
      row. Only the error-category columns are counted, so nulls in other
      columns (for example the input text) do not distort the count, and
      calling the function again gives the same result.
    * ``'prediction_label'`` — ``True`` when the row has no reported error,
      ``False`` otherwise.
    * ``'ground_truth_label'`` — copied from ``tru_df['ground_truth_label']``
      (assigned by index label, so both frames should share an index).

    Args:
        df: Result table holding the error-category columns; modified in place.
        tru_df: DataFrame providing a 'ground_truth_label' column.

    Returns:
        None.
    """
    error_columns = [
        "政治术语纠错", "别字纠错", "别词纠错",
        "语法纠错-冗余", "语法纠错-缺失", "语法纠错-乱序",
        "搭配纠错", "标点纠错", "成语纠错", "机构名纠错",
        "领导人职称纠错", "数字纠错", "地名纠错", "全文人名纠错",
        "句式杂糅/语义重复",
    ]
    # Ignore categories that are absent from this table rather than failing.
    present = [column for column in error_columns if column in df.columns]

    df['纠错数'] = df[present].notna().sum(axis='columns')
    df['prediction_label'] = df['纠错数'] == 0
    df['ground_truth_label'] = tru_df['ground_truth_label']

# Pass your ground-truth DataFrame as the second argument; its
# 'ground_truth_label' column is copied into the result table.
result_processing(kd_tunit_result, tunit_df)
# Save the labelled table without the index; 'utf-8-sig' prepends a byte-order mark.
kd_tunit_result.to_csv('kedaxunfei/tunit_df_result_mod.csv', index=False, encoding='utf-8-sig')
