<a href="https://colab.research.google.com/github/ekarla/AED2-2022/blob/main/PI3_BGD_2022_02_ELENKARLA_LEIDE_MARINA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

 <img src="https://drive.google.com/uc?id=18x0Fa9XWHlnH5OWkZ-UMrJVQCdsEmYQw" width="800"/>

## Big Data - Projeto de Implementação 3 [2022/02] <br/>
### Dupla: Elenkarla Silva / Leide Marina

In [None]:
# Download train.csv from the project's GitHub repository into the notebook's
# working directory (-q suppresses wget's progress output).
!wget -q https://raw.githubusercontent.com/ekarla/ProjetoBigData/main/train.csv

In [None]:
# Google Cloud location used for the Dataproc cluster.
GCP_REGION = "us-central1"
GCP_ZONE = "us-central1-c"

# Google Cloud project identifiers.
PROJECT_ID = "ufam-bgd-2022-02-lmsf-evs"
PROJECT_NUMBER = "1076753341179"
PROJECT_NAME = "ufam-bgd-2022-02-lmsf-evs"

# Name of the Dataproc cluster created (and later deleted) by this notebook.
CLUSTER_NAME = "leide-elenkarla"

# Cloud Storage bucket that holds the input data; BUCKET_URL includes the "gs://" scheme.
BUCKET_NAME = "lmsf-ufam-bucket-1"
BUCKET_URL = f"gs://{BUCKET_NAME}"

# Local file names of the PySpark script and of the input dataset.
NOME_ARQUIVO_SCRIPT = "clusteringtext.py"
NOME_ARQUIVO_ENTRADA = "train.csv"
# BUCKET_URL already starts with "gs://", so the scheme must not be prepended a
# second time (that would produce the invalid "gs://gs://..." URI).
CAMINHO_ARQUIVO_ENTRADA = f"{BUCKET_URL}/{NOME_ARQUIVO_ENTRADA}"

In [None]:
import sys

# Google Cloud authentication is only requested when the notebook is running
# inside Colab (i.e. the google.colab package has already been loaded).
running_in_colab = "google.colab" in sys.modules
if running_in_colab:
    from google.colab import auth

    auth.authenticate_user()

In [None]:
# Bucket creation is left disabled (presumably the bucket already exists — confirm);
# uncomment the next line to create it in GCP_REGION under PROJECT_ID.
#!gsutil mb -c standard -l $GCP_REGION -p $PROJECT_ID $BUCKET_URL
# Copy the downloaded input file into the root of the bucket.
!gsutil cp $NOME_ARQUIVO_ENTRADA $BUCKET_URL/

In [None]:
# Create the Dataproc cluster: n1-standard-1 master and 5 n1-standard-1 workers,
# boot disk size 32 each, image 1.4-ubuntu18 with the ANACONDA and JUPYTER
# optional components, using BUCKET_NAME as the cluster's bucket.
!gcloud dataproc clusters create $CLUSTER_NAME --project=$PROJECT_NAME --region=$GCP_REGION --subnet default --zone=$GCP_ZONE --master-machine-type n1-standard-1 \
 --master-boot-disk-size 32 \
 --num-workers 5 \
 --worker-machine-type n1-standard-1 \
 --worker-boot-disk-size 32 \
 --image-version 1.4-ubuntu18 \
 --optional-components ANACONDA,JUPYTER \
 --bucket=$BUCKET_NAME


In [None]:
%%writefile $NOME_ARQUIVO_SCRIPT

from pyspark import *
from pyspark.sql import *
from pyspark.sql.functions import split, col
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark import SparkContext
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF
from pyspark.ml.feature import HashingTF
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Session used by the rest of the script; getOrCreate() returns the active
# session when one already exists instead of building a second one.
spark = (
    SparkSession.builder
    .appName('LaboratorioPBD')
    .getOrCreate()
)

def dataframe_token(df, tipo):
  """Tokenize one text column of ``df`` and strip stop words from the tokens.

  Args:
    df: DataFrame containing at least the columns "uid" and ``tipo``.
    tipo: Name of the text column to process.

  Returns:
    DataFrame with "uid", ``tipo``, "tokens" (Tokenizer output) and "words"
    (the tokens after StopWordsRemover).
  """
  # Set the JVM default locale to en-US before the transformers are built
  # (presumably a workaround for StopWordsRemover rejecting the cluster's
  # default locale — confirm).
  jvm_locale = SparkContext.getOrCreate()._jvm.java.util.Locale
  jvm_locale.setDefault(jvm_locale.forLanguageTag("en-US"))

  selected = df.select("uid", tipo)
  tokenized = Tokenizer(inputCol=tipo, outputCol="tokens").transform(selected)
  return StopWordsRemover(inputCol="tokens", outputCol="words").transform(tokenized)

def cria_contador_de_ocorrencia(dataframe, vocab_size=100):
  """Add term-count vectors computed from the "words" column.

  Args:
    dataframe: DataFrame with a "words" column holding arrays of tokens.
    vocab_size: Maximum number of vocabulary terms kept by CountVectorizer.
      Defaults to 100, the value that used to be hard-coded.

  Returns:
    ``dataframe`` with an extra "rawFeatures" column of term counts.
  """
  vectorizer = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize=vocab_size)
  # The vocabulary is learned from the same DataFrame that is then transformed.
  return vectorizer.fit(dataframe).transform(dataframe)

def calcula_valores_TF_IDF(dataframe):
  """Turn the term counts in "rawFeatures" into IDF-weighted "features".

  Args:
    dataframe: DataFrame with a "rawFeatures" vector column.

  Returns:
    ``dataframe`` with an extra "features" column produced by the fitted IDF model.
  """
  idf_estimator = IDF(inputCol="rawFeatures", outputCol="features")
  # The IDF weights are fitted on the same DataFrame they are applied to.
  return idf_estimator.fit(dataframe).transform(dataframe)

# NOTE: the input location is fixed here; this script does not read command-line
# arguments, so any path passed at job submission is ignored.
INPUT_PATH = "gs://lmsf-ufam-bucket-1/train.csv"
NUM_CLUSTERS = 2
KMEANS_SEED = 42   # fixed seed so repeated runs yield the same clusters
PREVIEW_ROWS = 5

rawdata = spark.read.load(INPUT_PATH, format="csv", header=True)

# Split each tweet on '#': item 0 is the text before the first '#', item 1 is the
# segment between the first and the second '#'. Missing values are replaced by
# empty strings so the tokenizer always receives a string.
tweet_parts = split(col("tweet"), "#")
rawdata = (
  rawdata
  .withColumn("texto", tweet_parts.getItem(0))
  .withColumn("hashtag", tweet_parts.getItem(1))
  .fillna({"tweet": "", "texto": "", "hashtag": ""})
  .withColumn("uid", monotonically_increasing_id())
)

kmeans = KMeans(featuresCol="features", k=NUM_CLUSTERS, seed=KMEANS_SEED)
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='features',
                                metricName='silhouette', distanceMeasure='cosine')

# (model label, source column): v1 clusters the plain text, v2 the hashtag
# segment and v3 the full tweet.
experiments = [("v1", "texto"), ("v2", "hashtag"), ("v3", "tweet")]

silhouettes = {}
for label, column in experiments:
  # Same pipeline for every column: tokens -> term counts -> TF-IDF -> KMeans.
  features = calcula_valores_TF_IDF(cria_contador_de_ocorrencia(dataframe_token(rawdata, column)))
  predictions = kmeans.fit(features).transform(features)
  # show() prints to the job's driver output; the previous toPandas() result
  # was discarded, so nothing was displayed when the script ran as a job.
  predictions.select("uid", column, "prediction").show(PREVIEW_ROWS, truncate=False)
  silhouettes[label] = evaluator.evaluate(predictions)

# The evaluator is configured with the cosine distance, so the label says so.
for label, _ in experiments:
  print("Silhouette with cosine distance model " + label + " = " + str(silhouettes[label]))


In [None]:
# Submit the generated PySpark script to the cluster. Everything after "--" is
# passed to the script as command-line arguments.
# NOTE(review): the script above never reads its arguments (it loads a
# hard-coded path), so CAMINHO_ARQUIVO_ENTRADA is currently unused by the job.
!gcloud dataproc jobs submit pyspark $NOME_ARQUIVO_SCRIPT --cluster=$CLUSTER_NAME --region=$GCP_REGION --project=$PROJECT_NAME -- $CAMINHO_ARQUIVO_ENTRADA

 LINK DA PÁGINA DE EXECUÇÃO <br>
 http://leide-elenkarla-m:8088/proxy/application_1674944375475_0003/


In [None]:
##########    DELETING THE CLUSTER ##################
# --quiet disables gcloud's interactive prompts, so the
# "Do you want to continue (Y/n)?" confirmation no longer blocks this cell.

!gcloud dataproc clusters delete $CLUSTER_NAME --region=$GCP_REGION --project=$PROJECT_NAME --quiet

The cluster 'leide-elenkarla' and all attached disks will be deleted.

Do you want to continue (Y/n)?  