In [3]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz


Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:14 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:15 http://ppa.launchpad.net/gra

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [5]:
!pip install -q findspark

In [6]:
import findspark
findspark.init() 

In [7]:
!pip install pyspark



In [8]:
!pip install sparknlp 



In [9]:
import pyspark
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

from pyspark.sql import SparkSession 

import pandas as pd




import re 
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString,CountVectorizer 

from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline ,PipelineModel #Build a pipeline

from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp import DocumentAssembler


import os
import gc





In [10]:
from pyspark.sql import SparkSession #Import the spark session
from pyspark import SparkContext #Create a spark context
from pyspark.sql import SQLContext #Create an SQL context

import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("dialect classification")\
    .master("local[*]")\
    .config("spark.executor.memory", "12g").config("spark.driver.memory", "12g")\
    .config("spark.memory.offHeap.enabled",True).config("spark.memory.offHeap.size","16g")\
    .config('spark.executor.cores', '3').config('spark.cores.max', '3')\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.2.3").getOrCreate()

In [11]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)


In [12]:
import pandas as pd
temp_pandas_df  = pd.read_csv('/content/drive/MyDrive/tweets_dialect_dataset.csv')
# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

spark_df=pandas_to_spark(temp_pandas_df)

In [13]:
spark_df.groupBy("dialect").count().orderBy("count").show()

+-------+-----+
|dialect|count|
+-------+-----+
|     TN| 9246|
|     YE| 9927|
|     MA|11539|
|     SD|14434|
|     IQ|15497|
|     DZ|16183|
|     SY|16242|
|     OM|19116|
|     BH|26292|
|     AE|26296|
|     SA|26832|
|     LB|27617|
|     JO|27921|
|     QA|31069|
|     LY|36499|
|     KW|42109|
|     PL|43742|
|     EG|57636|
+-------+-----+



In [14]:
indexer = StringIndexer(inputCol="dialect", outputCol="dialect_encoded") 
spark_df = indexer.fit(spark_df).transform(spark_df) 
spark_df.show()

+--------------------+-------+---------------+
|               tweet|dialect|dialect_encoded|
+--------------------+-------+---------------+
|@Nw8ieJUwaCAAreT ...|     IQ|           13.0|
|@7zNqXP0yrODdRjK ...|     IQ|           13.0|
|@KanaanRema مبين ...|     IQ|           13.0|
|@HAIDER76128900 ي...|     IQ|           13.0|
|@hmo2406 وين هل ا...|     IQ|           13.0|
|@Badi9595 @Kanaan...|     IQ|           13.0|
|@SarahNadhum90 @n...|     IQ|           13.0|
|@KanaanRema @Badi...|     IQ|           13.0|
|@SalahAlarbawi يم...|     IQ|           13.0|
|@Eng_alow91 @cb4L...|     IQ|           13.0|
|@kamal1277New وال...|     IQ|           13.0|
|@cb4LwpWrS1hT5lb ...|     IQ|           13.0|
|@QSHRXxV36EfuNXV ...|     IQ|           13.0|
|@3Obeidi ههههه عد...|     IQ|           13.0|
|@0b9lxe0ZNEUlnQm ...|     IQ|           13.0|
|@aaddssfr يسعد مس...|     IQ|           13.0|
|@ha___m___ed مااخ...|     IQ|           13.0|
|@kamal1277New اتر...|     IQ|           13.0|
|@sfer661 يأك

In [15]:
spark_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()

+-----+-------+---------------+
|tweet|dialect|dialect_encoded|
+-----+-------+---------------+
|    0|      0|              0|
+-----+-------+---------------+



In [16]:
spark_df.select("tweet").show(30,truncate= False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                                                                                                                                                               |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|@Nw8ieJUwaCAAreT لكن بالنهاية .. ينتفض .. يغير .                                                                                                                                                                                                    |
|@7zNqXP0yrO

In [17]:
def Clean_tweets(data):
     
    data = data.withColumn('tweet', F.regexp_replace('tweet', "(?:\@|http?\://|https?\://|www)\S+", '')) 
    data = data.withColumn('tweet', F.regexp_replace('tweet', '@\w+', '')) 
    data = data.withColumn('tweet', F.regexp_replace('tweet', '#', ''))
    data = data.withColumn('tweet', F.regexp_replace('tweet', 'RT', ''))

    data = data.withColumn('tweet', F.regexp_replace('tweet', "\s*[A-Za-z]+\b", '')) 
    data = data.withColumn('tweet', F.regexp_replace('tweet', '[0-9]+', '')) 


    data = data.withColumn('tweet', F.regexp_replace('tweet', '&amp;', ''))
    data = data.withColumn('tweet', F.regexp_replace('tweet', '&quot;', ''))
    data = data.withColumn('tweet', F.regexp_replace('tweet', '&gt;', ''))
    data = data.withColumn('tweet', F.regexp_replace('tweet', '&lt;', ''))

    data = data.withColumn('tweet', F.regexp_replace('tweet', '-', ''))
    return data



In [18]:
Clean_df = Clean_tweets(spark_df)

In [19]:
Clean_df.select("tweet").show(100,truncate= False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tweet                                                                                                                                                                                                                           |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| لكن بالنهاية .. ينتفض .. يغير .                                                                                                                                                                                                |
| يعني هذا محسوب على البشر .. حيونه ووحشيه .. وتطلبون من الغرب يحترمكم ويؤمن بدينكم ولاينعتك

In [20]:
document_assembler = DocumentAssembler() \
    .setInputCol("tweet") \
    .setOutputCol("document")

In [21]:
tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")

stop_words = StopWordsCleaner.pretrained("stopwords_ar", "ar") \
        .setInputCols(["token"]) \
        .setOutputCol("cleanTokens")\
        .setCaseSensitive(False)
        
normalizer = Normalizer() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("normalized")\
    .setLowercase(True)

stopwords_ar download started this may take some time.
Approximate size to download 1.9 KB
[OK!]


In [22]:

finisher = Finisher() \
    .setInputCols(["normalized"]) \
    .setOutputCols(["token_features"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)

hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures")

idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

embeddings = WordEmbeddingsModel.pretrained("arabic_w2v_cc_300d", "ar") \
        .setInputCols(["document", "token"]) \
        .setOutputCol("embeddings")


arabic_w2v_cc_300d download started this may take some time.
Approximate size to download 1.1 GB
[OK!]


In [23]:
PreProcess_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            stop_words, 
            normalizer,
            finisher,
            hashingTF,
            idf,
            embeddings
])

In [24]:
PreProcessing_pipeline = PreProcess_pipeline.fit(Clean_df)

In [25]:
PreProcessed_dataset= PreProcessing_pipeline.transform(Clean_df)

In [26]:
PreProcessed_dataset.select('token_features' ,'embeddings' , 'features').show(truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [30]:
PreProcessing_pipeline.save('/content/drive/MyDrive/PreProcessing_Pipeline_')

In [36]:
train_dataset,test_dataset  = PreProcessed_dataset.select('tweet','features' , 'embeddings' , 'dialect' , 'dialect_encoded').randomSplit([0.7, 0.3],seed = 46)

In [30]:
train_dataset.write.format('parquet').save('train_dataset.parquet')
test_dataset.write.format('parquet').save('test_dataset.parquet')