In [None]:
!pip install elephas

In [None]:
# Install a headless Java 8 JDK; -qq plus the redirect to /dev/null silence apt's output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Spark 3.0.3 is a retired release: the dlcdn mirror only hosts current versions,
# so fetch the tarball from the permanent Apache archive instead.
!wget https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
# Unpack the downloaded Spark archive (tar extracts into the current working directory).
!tar xf /content/spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
import os

# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.3-bin-hadoop2.7",
})

In [None]:
import findspark
findspark.init()
from pyspark.sql import Row
from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) a Spark session running in local mode on this machine.
ss = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Handle to the underlying SparkContext of that session.
SpContext = ss.sparkContext

In [None]:
import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
from nltk.tokenize import word_tokenize
# Resources needed by word_tokenize and stopwords.words. Recent NLTK releases
# load the tokenizer model from 'punkt_tab' rather than the pickled 'punkt';
# request both so the notebook works on old and new NLTK versions alike.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so files under it can be read by path.
drive.mount('/content/drive')

In [None]:
# Location of the dataset CSV on the mounted Drive. The folder name is written
# with a plain space: a shell-style '\ ' is not a valid Python escape sequence
# and would put a literal backslash into the path.
PATH = '/content/drive/MyDrive/BigData/dataset proyecto/myfile.csv'
# header=True takes the column names from the first line of the file.
df = ss.read.csv(PATH, header=True)

In [None]:
# Keep only the four columns that the cleaning step (limpiar) reads.
df = df.select('id', 'labels', 'tweet_text', 'labels_str')
# Peek at the first three rows.
df.take(3)

In [None]:
# English stop words plus the two apostrophe characters the tokenizer can emit.
_stopwords = set(stopwords.words("english") + ['\'', '’'])

def limpiar(row):
  """Return a copy of `row` whose `tweet_text` has been cleaned.

  Cleaning steps, in order:
    1. drop space-separated words that start with 'http' (links);
    2. tokenize with NLTK's word_tokenize;
    3. strip non-ASCII characters from every token and lower-case it;
    4. keep only purely alphabetic tokens that are not stop words.

  Tokens are lower-cased *before* the stop-word lookup, because the entries
  in `_stopwords` are lower-case; otherwise capitalized stop words such as
  "The" would slip through.

  A missing (None) `tweet_text` is treated as an empty string, so the row is
  kept with an empty cleaned text instead of raising.

  Args:
    row: a Row with fields 'id', 'labels', 'tweet_text' and 'labels_str'.

  Returns:
    A new Row with the same four fields; only 'tweet_text' is changed.
  """
  text = row['tweet_text'] or ''
  # Remove links before tokenizing so URLs are not split into stray tokens.
  without_links = ' '.join(word for word in text.split(' ') if not word.startswith('http'))
  tokens = word_tokenize(without_links)
  # Dropping non-ASCII characters may leave empty tokens; isalpha() rejects those below.
  normalized = (token.encode('ascii', 'ignore').decode('ascii').lower() for token in tokens)
  cleaned = [token for token in normalized if token.isalpha() and token not in _stopwords]
  return Row(id=row['id'], labels=row['labels'], tweet_text=' '.join(cleaned), labels_str=row['labels_str'])

In [None]:
# Apply limpiar to every row through the RDD API; df_limpio is an RDD of Row
# objects (not a DataFrame, despite the name).
df_limpio = df.rdd.map(limpiar)
# Peek at the first two cleaned rows.
df_limpio.take(2)

In [None]:
def input_texts(row):
  """Turn a cleaned row into a (tweet_text, binary_label) Row.

  `row['labels']` arrives as text (the CSV is read without a schema), e.g.
  "[4, 1, 3]". The integer votes are parsed out of that text and the most
  frequent vote wins. Voting over the raw string would count characters such
  as ',' or ' ' and could crash on int(',').

  Binarization: a winning vote of 0 yields label "0"; any positive winning
  vote yields "1". When there is no usable vote (missing/empty labels) or no
  single most frequent vote (a tie), the label falls back to "1".

  Args:
    row: a Row with at least the fields 'tweet_text' and 'labels'.

  Returns:
    Row(tweet_text, label) where label is the string "0" or "1".
  """
  import re
  from collections import Counter

  # str() also copes with None and with an already-parsed list of ints.
  votes = [int(v) for v in re.findall(r'\d+', str(row['labels']))]
  label = 1  # fallback for missing votes and for ties
  if votes:
    ranked = Counter(votes).most_common(2)
    has_unique_winner = len(ranked) == 1 or ranked[0][1] > ranked[1][1]
    if has_unique_winner and ranked[0][0] == 0:
      label = 0
  return Row(row['tweet_text'], str(label))

In [None]:
# Reduce every cleaned row to a (text, label) pair.
rdd_rows = df_limpio.map(input_texts)
columns = ["tweets","labels"]
# The Rows produced by input_texts are positional, so name the columns explicitly.
rdd_df = ss.createDataFrame(rdd_rows).toDF(*columns)
rdd_df.show()

In [None]:
# Bring the cleaned tweet texts to the driver as a plain Python list of strings.
tweetValues = [record[0] for record in rdd_df.select('tweets').collect()]

In [None]:
from keras.preprocessing.text import Tokenizer

# Vocabulary cap passed to the tokenizer (only the most frequent words are kept).
MAX_NB_WORDS = 100
# Maximum number of tokens kept per tweet when sequences are padded/truncated.
MAX_SEQUENCE_LENGTH = 5
# Size of each word-embedding vector.
EMBEDDING_DIM = 100
# The backslash in the filter set is written as '\\': a bare '\]' is an invalid
# escape sequence in a Python string literal. The resulting characters are the same.
tokenizer = Tokenizer(num_words=MAX_NB_WORDS, filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~', lower=True)
tokenizer.fit_on_texts(tweetValues)
word_index = tokenizer.word_index
print('Found %s unique tokens.' % len(word_index))

In [None]:
from keras.preprocessing.sequence import pad_sequences

# Encode every tweet as a list of word indices, then pad/truncate each list
# to MAX_SEQUENCE_LENGTH so the result is a rectangular array.
X = pad_sequences(
    tokenizer.texts_to_sequences(tweetValues),
    maxlen=MAX_SEQUENCE_LENGTH,
)
print('Shape of data tensor:', X.shape)

In [None]:
import pandas as pd
import numpy as np

# Collect the label strings on the driver and one-hot encode them
# (one indicator column per distinct label value).
Y = pd.get_dummies([record[0] for record in rdd_df.select('labels').collect()])
print('Shape of label tensor:', Y.shape)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the samples for testing; random_state pins the shuffle so the
# split is the same on every run.
X_train, X_test, Y_train, Y_test = train_test_split(X,Y, test_size = 0.10, random_state = 42)
# Report the shapes of the training and test partitions.
print(X_train.shape,Y_train.shape)
print(X_test.shape,Y_test.shape)

In [None]:
from keras.models import Sequential
from keras.layers import Dense, Embedding, LSTM, SpatialDropout1D
from keras.callbacks import EarlyStopping

# Embedding -> spatial dropout -> LSTM -> 2-way softmax classifier.
model = Sequential([
    Embedding(MAX_NB_WORDS, EMBEDDING_DIM, input_length=X.shape[1]),
    SpatialDropout1D(0.2),
    LSTM(100, dropout=0.2, recurrent_dropout=0.2),
    Dense(2, activation='softmax'),
])
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

In [None]:
!pip install systemml

In [None]:
from systemml.mllearn import Keras2DML
import math

epochs = 5
batch_size = 64
# Take the sample count from the training split instead of hard-coding it, so
# the iteration budget stays correct if the dataset or the split ratio changes.
samples = X_train.shape[0]
# Total mini-batch iterations = epochs * batches per epoch.
max_iter = int(epochs*math.ceil(samples/batch_size))
# input_shape is (channels, height, width). Each training row is one padded
# token sequence, so describe it as 1 x 1 x sequence_length rather than as a
# 28x28 image. NOTE(review): confirm this layout against the Keras2DML docs.
sysml_model = Keras2DML(ss, model, input_shape=(1, 1, X_train.shape[1]), weights='weights_dir', batch_size=batch_size, max_iter=max_iter, test_interval=0, display=10)
sysml_model.fit(X_train, Y_train)