In [96]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, lower, regexp_replace, split, trim
from decimal import Decimal
from termcolor import colored
import pyspark
import pyspark.sql.functions as F

In [97]:
def clean_text(text):
    print("Then: ", text)
    text = lower(text)
    text = trim(text)
    text = regexp_replace(text, "’", "'")
    text = regexp_replace(text, '"', "")
    text = regexp_replace(text, "[\s]+[\n]+", "")
    text = regexp_replace(text, "\t", "")
    text = regexp_replace(text, "\s", " ")
    print("Now: ", text)
    return text

In [98]:
# Datos de aplicación de Spark
appName = "es_kaq_dataset"
master = "spark://mynorxico:7077"

spark = SparkSession.builder\
    .appName(appName)\
    .master(master)\
    .getOrCreate()

In [99]:
# Folder de los datasets
file_path = "../Desktop/raw_sentences/"
a = '""'

## Dataset Español-Kaqchikel de Nuevo Testamento

In [100]:
cak_file_name = "Cakchiquel-NT.xml"
es_file_name = "Spanish.xml"

# Creación de dataframes
df_es = spark.read.format('xml')\
    .options(rowTag="seg")\
    .load(file_path+es_file_name)
df_cak = spark.read.format('xml')\
    .options(rowTag="seg")\
    .load(file_path+cak_file_name)
(df_es.schema, df_cak.schema)

(StructType(List(StructField(_id,StringType,true),StructField(_type,StringType,true),StructField(content,StringType,true))),
 StructType(List(StructField(_id,StringType,true),StructField(_type,StringType,true),StructField(content,StringType,true))))

In [101]:
#Primeros dos versos de cada dataframe
df_es.show(2)
df_cak.show(2)

+---------+-----+--------------------+
|      _id|_type|             content|
+---------+-----+--------------------+
|b.GEN.1.1|verse|
						En el prin...|
|b.GEN.1.2|verse|
						Y la tierr...|
+---------+-----+--------------------+
only showing top 2 rows

+---------+-----+--------------------+
|      _id|_type|             content|
+---------+-----+--------------------+
|b.MAT.1.1|verse|
						Re wuj ri ...|
|b.MAT.1.2|verse|
						Ri Abraham...|
+---------+-----+--------------------+
only showing top 2 rows



In [102]:
# Inner join para hacer match de dataframe de español y dataframe de kaqchikel
cak = df_cak.alias('cak')
es = df_es.alias('es')
df_new_testament = es.join(cak, es._id==cak._id)

In [103]:
print("#Kaqchikel verses: ", cak.count())
print("#Spanish verses: ", es.count())

print("#cak_es new testament verses: ", df_new_testament.count())

#Kaqchikel verses:  7852
#Spanish verses:  31100
#cak_es new testament verses:  7851


## Dataset Español-Kaqchikel de Historia y Leyes

In [104]:
history_filename = "es_kaq_dataset.txt"

In [105]:
df_history_law = spark.read.format("csv")\
    .option("header", "true")\
    .option("delimiter", "|")\
    .load(f"{file_path}{history_filename}")

In [106]:
df_history_law.count()

3098

## Concatenando Ambos Dataframes

In [107]:
es_kaq_dataset_xml_filename = "es_cak_dataset.xml"

# Uniendo ambos datasets
df_cak_es = df_history_law.select("es", "cak").union(df_new_testament.select("es.content", "cak.content"))
df_cak_es = df_cak_es.select(clean_text(col("es")).alias("es"),clean_text(col("cak")).alias("cak"))
df_cak_es.show()

## Barajando datasets
shuffled_df = df_cak_es.orderBy(F.rand(seed=2)).distinct()
nrows = shuffled_df.count()

## Separación en subconjuntos de datos
df_train_cak_es = shuffled_df.limit(int(round(0.8*nrows, 0)))
df_valid_cak_es = shuffled_df.subtract(df_train_cak_es).limit(int(round(0.1*nrows, 0)))

df_test_cak_es = shuffled_df.subtract(df_train_cak_es).subtract(df_valid_cak_es)



Then:  Column<'es'>
Now:  Column<'regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(trim(lower(es)), ’, ', 1), ", , 1), [\s]+[
]+, , 1), 	, , 1), \s,  , 1)'>
Then:  Column<'cak'>
Now:  Column<'regexp_replace(regexp_replace(regexp_replace(regexp_replace(regexp_replace(trim(lower(cak)), ’, ', 1), ", , 1), [\s]+[
]+, , 1), 	, , 1), \s,  , 1)'>
+--------------------+--------------------+
|                  es|                 cak|
+--------------------+--------------------+
|            abogado.|solonel mak/to'onel.|
|profesional del d...|ri etamanel rik'i...|
|            abortar.|           tzaqonïk.|
|provocar la muert...|rukamik jun ne'y ...|
|        absolutorio.|majun rumak/kuyun...|
| efecto de absolver.|rub'anik jun kuyu...|
|           absolver.|          kuyuj mak.|
|perdón o dar por ...|rukuyik rumak jun...|
|         posiciones.|         ch'ob'onïk.|
|cuando una de las...|toq jun chi ke ri...|
|         abstención.|choj k'ojlem/q'i'...|
|    falta de ac

In [108]:
df_cak_es.select("*").write\
    .format("com.databricks.spark.xml")\
    .option("rootTag", "data")\
    .option("rowTag", "sentence")\
    .mode("overwrite")\
    .save(f"{file_path}{es_kaq_dataset_xml_filename}")


## Generación de archivos de entrada para BPE

In [109]:
final_es_file = 'final_es_file.txt'
df_cak_es.select('es')\
    .repartition(1)\
    .write.format("text")\
    .mode("overwrite")\
    .option("header", "true")\
    .save(f"{file_path}{final_es_file}")



In [110]:
final_cak_file = 'final_cak_file.txt'

df_cak_es.select('cak')\
    .repartition(1)\
    .write.format("text")\
    .mode("overwrite")\
    .option("header", "true")\
    .save(f"{file_path}{final_cak_file}")

In [111]:
from os import listdir
from os.path import isfile, join
es_corpus_path = file_path+final_es_file+"/" + next(x for x in listdir(f"{file_path}{final_es_file}") if x.startswith("part"))
cak_corpus_path = file_path+final_cak_file+"/" +next(x for x in listdir(f"{file_path}{final_cak_file}") if x.startswith("part"))

In [112]:
from subword_nmt.learn_bpe import learn_bpe
import codecs
# Generación de archivo con subpalabras
print(es_corpus_path)
print(cak_corpus_path)
learn_bpe(codecs.open(es_corpus_path, encoding='utf-8'), codecs.open(file_path+"final_es_file.codes", 'w', encoding='utf-8'), 1000)
learn_bpe(codecs.open(cak_corpus_path, encoding='utf-8'), codecs.open(file_path+"final_cak_file.codes", 'w', encoding='utf-8'), 1000)

../Desktop/raw_sentences/final_es_file.txt/part-00000-783c8ed5-c2a3-412f-b70a-064e209c9f19-c000.txt
../Desktop/raw_sentences/final_cak_file.txt/part-00000-45a737d6-1c8b-4212-884a-ce4d7dc07f79-c000.txt


In [114]:
from subword_nmt.apply_bpe import BPE

# Codificación de archivos de entrenamiento con subpalabras
print(file_path+"final_es_file.codes")
es_codes_file = codecs.open(file_path+"final_es_file.codes", encoding='utf-8')
cak_codes_file = codecs.open(file_path+"final_cak_file.codes", encoding='utf-8')

print("es_corpus path: ", es_corpus_path)
es_corpus = codecs.open(es_corpus_path, encoding='utf-8')
cak_corpus = codecs.open(cak_corpus_path, encoding='utf-8')

es_bpe_corpus = codecs.open(file_path+"es_corpus.bpe", 'w', encoding='utf-8')
cak_bpe_corpus = codecs.open(file_path+"cak_corpus.bpe", 'w', encoding='utf-8')

es_bpe = BPE(es_codes_file)
cak_bpe = BPE(cak_codes_file)

n_es_corpus_lines = 0
n_cak_corpus_lines = 0

for line in es_corpus:
    es_bpe_corpus.write(es_bpe.process_line(line, None))
    n_es_corpus_lines = n_es_corpus_lines + 1
for line in cak_corpus:
    cak_bpe_corpus.write(cak_bpe.process_line(line, None))
    n_cak_corpus_lines = n_cak_corpus_lines + 1
print("#es corpus lines: ", n_es_corpus_lines)
print("#cak corpus lines: ", n_cak_corpus_lines)

../Desktop/raw_sentences/final_es_file.codes
es_corpus path:  ../Desktop/raw_sentences/final_es_file.txt/part-00000-783c8ed5-c2a3-412f-b70a-064e209c9f19-c000.txt
#es corpus lines:  10949
#cak corpus lines:  10949


In [37]:
from inspect import getmembers, isfunction
from subword_nmt import apply_bpe

## Generalización para generación de archivos de entrada al modelo
1. Se genera un archivo con las oraciones crudas
2. Se ingresa al modelo de BPE para entrenamiento y generación de vocabularios
3. Se pasa cada una de las líneas por el modelo BPE para generar los archivos de entrada al modelo

In [125]:

def df_preprocessing(df, alias, output_dir):
    df_text_dir = f"{output_dir}{alias}_dir"
    codes_file_path = f"{output_dir}{alias}.codes"
    bpe_path = f"{output_dir}{alias}.bpe"
    
    print("raw text dir: ", df_text_dir)
    print("codes file: ", codes_file_path)
    print("bpe file: ", bpe_path)
    
    print("Sample df rows: ")
    df.select('*').show()
    # Escribir df a archivo
    df.select('*')\
        .repartition(1)\
        .write.format('text')\
        .mode('overwrite')\
        .option('header', 'true')\
        .save(df_text_dir)
    text_file_path = df_text_dir+"/"+next(x for x in listdir(f"{df_text_dir}") if x.startswith("part"))
    learn_bpe(codecs.open(text_file_path, encoding='utf-8'), codecs.open(codes_file_path, "w", encoding='utf-8'), 1000)
    
    # Codificación de corpus a subpalabras
    codes_file = codecs.open(codes_file_path, encoding='utf-8')
    corpus_file = codecs.open(text_file_path, encoding='utf-8')
    bpe_corpus_file = codecs.open(bpe_path, 'w', encoding='utf-8')
    
    bpe = BPE(codes_file)
    n_lines = 0
    for line in corpus_file:
        n_lines += 1
        bpe_corpus_file.write(bpe.process_line(line, None))
    return n_lines

In [133]:
# Corpus de entrenamiento
df_preprocessing(df_train_cak_es.select('es'), 'es_train', '../Desktop/raw_sentences/preprocessed/')
df_preprocessing(df_train_cak_es.select('cak'), 'cak_train', '../Desktop/raw_sentences/preprocessed/')
# Corpus de validación
df_preprocessing(df_valid_cak_es.select('es'), 'es_valid', '../Desktop/raw_sentences/preprocessed/')
df_preprocessing(df_valid_cak_es.select('cak'), 'cak_valid', '../Desktop/raw_sentences/preprocessed/')
# Corpus de test
df_preprocessing(df_test_cak_es.select('es'), 'es_test', '../Desktop/raw_sentences/preprocessed/')
df_preprocessing(df_test_cak_es.select('cak'), 'cak_test', '../Desktop/raw_sentences/preprocessed/')

# Corpus general
df_preprocessing(df_cak_es.select('es'), 'es_general', '../Desktop/raw_sentences/preprocessed/')
df_preprocessing(df_cak_es.select('cak'), 'cak_general', '../Desktop/raw_sentences/preprocessed/')

raw text dir:  ../Desktop/raw_sentences/preprocessed/es_train_dir
codes file:  ../Desktop/raw_sentences/preprocessed/es_train.codes
bpe file:  ../Desktop/raw_sentences/preprocessed/es_train.bpe
Sample df rows: 
+--------------------+
|                  es|
+--------------------+
| efecto de absolver.|
|apoyo a la formac...|
|ausencia de culpa...|
|interés superior ...|
|          juramento.|
|es la unidad bási...|
|se le declara reb...|
|quebrantar, viola...|
|entre algunas per...|
|la información re...|
|listado de viajes...|
| y les dijo: --es...|
| recibidnos. a na...|
| porque cada árbo...|
| de cierto os dig...|
| les dijo jesús: ...|
| jesús recorría t...|
| por esto el mund...|
| asimismo, un bue...|
| sucedió que, est...|
+--------------------+
only showing top 20 rows

raw text dir:  ../Desktop/raw_sentences/preprocessed/cak_train_dir
codes file:  ../Desktop/raw_sentences/preprocessed/cak_train.codes
bpe file:  ../Desktop/raw_sentences/preprocessed/cak_train.bpe
Sample df rows

10949

In [136]:
!onmt-build-vocab --size 50000 --save_vocab ../Desktop/raw_sentences/preprocessed/es_vocab ../Desktop/raw_sentences/preprocessed/es_general.bpe

!onmt-build-vocab --size 50000 --save_vocab ../Desktop/raw_sentences/preprocessed/cak_vocab ../Desktop/raw_sentences/preprocessed/cak_general.bpe

2021-05-07 02:06:03.317203: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-05-07 02:06:04.294752: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-05-07 02:06:04.295480: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-05-07 02:06:04.335349: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-05-07 02:06:04.335409: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (mynorxico): /proc/driver/nvidia/version does not exist
2021-05-07 02:06:04.335676: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performa