In [None]:
# ! pip install -q pyspark==3.1.2 spark-nlp

In [1]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp

In [2]:
# import pandas as pd
# pd.set_option('display.max_columns', None)
# # pd.set_option('display.max_rows', None)
# pd.set_option('display.max_colwidth', None)

In [3]:

spark = sparknlp.start()


print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

spark

Spark NLP version 3.3.4
Apache Spark version: 3.1.3


In [None]:
import pyspark.sql.functions as F

df = spark.read\
                .option("header", "True")\
                .csv("all_filtered_final.csv")\
                .withColumnRenamed("description", "text")

df.show()

In [5]:
df.printSchema()

root
 |-- code: string (nullable = true)
 |-- text: string (nullable = true)
 |-- y: string (nullable = true)



In [6]:
documentAssembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")\
    #.setCleanupMode("shrink")

sentenceDetector = SentenceDetector()\
      .setInputCols(['document'])\
      .setOutputCol('sentences')

tokenizer = Tokenizer() \
  .setInputCols(["sentences"]) \
  .setOutputCol("token")\

normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("token_normalized")\
    #.setCleanupPatterns(["[\\.|,|:|-|;|!|_|\\?]+"])

stop_words = StopWordsCleaner.load("pretrained/stopwords_ar_ar_2.5.4_2.4_1594742440256") \
      .setInputCols(["token_normalized"]) \
      .setOutputCol("cleanTokens")


lemmatizer = LemmatizerModel.load("pretrained/lemma_ar_2.7.0_2.4_1606572966993") \
      .setInputCols(["cleanTokens"]) \
      .setOutputCol("lemma")

# embeddings = WordEmbeddingsModel.load("pretrained/arabic_w2v_cc_300d_ar_2.7.0_2.4_1607168354606") \
#       .setInputCols(["document", "lemma"]) \
#       .setOutputCol("embeddings")

embeddings = AlbertEmbeddings.load("pretrained/albert_embeddings_albert_xlarge_arabic_ar_3.4.2_3.0_1649954299286") \
    .setInputCols(["document", "lemma"]) \
    .setOutputCol("embeddings")
    

In [7]:
nlpPipeline = Pipeline(stages=[
    documentAssembler,
    sentenceDetector,
    tokenizer,
    normalizer,
    stop_words,
    lemmatizer,
    #pos,
    embeddings
])
empty_df = spark.createDataFrame([['']]).toDF("text")

pipelineModel = nlpPipeline.fit(empty_df)
result = pipelineModel.transform(df)


In [8]:
# result.select('lemma.result').head()

In [9]:
# result

In [10]:
# result.select(F.explode(F.arrays_zip(result.lemma.result, result.embeddings.embeddings)).alias("cols")) \
#                   .select(F.expr("cols['0']").alias("token"),
#                           F.expr("cols['1']").alias("embeddings")).show()

In [11]:
# result_df = result.select(F.explode(F.arrays_zip(result.lemma.result, result.embeddings.embeddings)).alias("cols")) \
#                   .select(F.expr("cols['0']").alias("token"),
#                           F.expr("cols['1']").alias("embeddings"))

In [12]:
# result_df.head()

In [13]:
# x = result_df.toPandas()

In [14]:
# list(x.sample(frac = True))

In [15]:
# result.select('lemma.embeddings', 'lemma').toPandas()

In [16]:
final_to_run = result.select('embeddings.embeddings','y' )

In [17]:
# final_to_run.show()

In [18]:
# final_to_run.printSchema()


In [19]:
from pyspark.sql.functions import flatten
final_to_run = final_to_run.select(flatten(final_to_run.embeddings),final_to_run.y)

In [20]:
final_to_run= final_to_run.withColumnRenamed("flatten(embeddings)","embeddings")

In [21]:
final_to_run.printSchema()


root
 |-- embeddings: array (nullable = true)
 |    |-- element: float (containsNull = false)
 |-- y: string (nullable = true)



In [22]:
length = len(final_to_run.select('embeddings').take(1)[0][0])
length

88064

In [23]:
#final_to_run = final_to_run.select([final_to_run.embeddings[i] for i in range(length)])
final_to_run = final_to_run.select([final_to_run.embeddings[i] for i in range(length)])

In [24]:
# final_to_run.count()

In [25]:
# len(final_to_run.columns)

In [26]:
# final_to_run.show()

In [None]:
final_to_run.write.format("csv").save("alldata_Albertembeddings")

# merging the output to one file

In [26]:
import pandas as pd
import glob
import os

In [28]:
files = os.path.join("alldata_embeddings.csv", "*.csv")
files = glob.glob(files)
files

['alldata_embeddings.csv\\part-00000-3a2f20a1-c41c-4ffa-9d42-812a16d5d80f-c000.csv',
 'alldata_embeddings.csv\\part-00001-3a2f20a1-c41c-4ffa-9d42-812a16d5d80f-c000.csv',
 'alldata_embeddings.csv\\part-00002-3a2f20a1-c41c-4ffa-9d42-812a16d5d80f-c000.csv']

In [29]:
df = pd.concat(map(pd.read_csv, files), ignore_index=True)

In [34]:
df.shape

(10501, 69584)

In [31]:
col = []
for i in range(df.shape[1]):
    col.append(str(i))
    
df.columns = col

In [32]:
df.head(1)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,69574,69575,69576,69577,69578,69579,69580,69581,69582,69583
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,,,,,,,,,,


In [33]:
df = df.fillna(0)

# Adding y column

In [35]:
out = pd.read_csv('all_filtered_final.csv')
out = out['y']
df['y'] = out

In [36]:
df.head(1)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,69575,69576,69577,69578,69579,69580,69581,69582,69583,y
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,تكفل


In [37]:
df.shape

(10501, 69585)

# save the final embeddings (ready to be trained)

In [38]:
df.to_csv('all_final_embeddings.csv', index = False)