In [None]:
# Attach Google Drive to the Colab VM so the dataset stored under MyDrive
# can be read through the local filesystem path below.
from google.colab import drive

drive.mount(mountpoint="/content/gdrive")

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

In [None]:
# Load the semicolon-delimited tweet export from the mounted Drive.
# The first line is the header and column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True, sep=';')
    .csv('/content/gdrive/MyDrive/st1800-231/twitterClimateData.csv')
)

In [None]:
# Peek at the first five rows as a list of Row objects.
df.take(5)

[Row(Unnamed: 0=0, id='1.21181e+18', author_id=7.59e+17, text='2020 is the year we #votethemout, the year we #climatestrike our hearts out, the year we #rebelforlife because without a liveable future nothing else matters. 2020 is the year we get shit done. (3/3)', retweets='15', permalink='https://twitter.com/Sphiamia/status/1211807074436431872', date='2019-12-31 00:31:35+00:00', formatted_date='Tue Dec 31 00:31:35 +0000 2019', favorites='46', mentions=None, hashtags='#votethemout #climatestrike #rebelforlife', geo=None, urls=None, search_hashtags='#climatestrike', location='California, USA'),
 Row(Unnamed: 0=1, id='1.21067e+18', author_id=22195472.0, text='Winter has not stopped this group of dedicated climate activists. They are an example to follow. #climatefriday #climatestrike #ClimateAction', retweets='9', permalink='https://twitter.com/StephDujarric/status/1210665747212591104', date='2019-12-27 20:56:21+00:00', formatted_date='Fri Dec 27 20:56:21 +0000 2019', favorites='35', men

In [None]:
# Print the inferred column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- Unnamed: 0: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- author_id: double (nullable = true)
 |-- text: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- date: string (nullable = true)
 |-- formatted_date: string (nullable = true)
 |-- favorites: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- urls: string (nullable = true)
 |-- search_hashtags: string (nullable = true)
 |-- location: string (nullable = true)



In [None]:
# Tokenization: turn the raw tweet `text` into an array column `tokens`
# (Tokenizer lowercases the string and splits it on whitespace).
from pyspark.ml.feature import Tokenizer

tokenization = Tokenizer().setInputCol('text').setOutputCol('tokens')
tokenized_df = tokenization.transform(df)

# Confirm the new array<string> column is present, then preview a few rows.
tokenized_df.printSchema()
tokenized_df.show(n=5)

root
 |-- Unnamed: 0: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- author_id: double (nullable = true)
 |-- text: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- date: string (nullable = true)
 |-- formatted_date: string (nullable = true)
 |-- favorites: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- urls: string (nullable = true)
 |-- search_hashtags: string (nullable = true)
 |-- location: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------+-----------+-------------+--------------------+--------+--------------------+--------------------+--------------------+---------+-------------------+--------------------+----+--------------------+---------------+---------------+--------------------+
|Unnamed: 0|         id|    author_id|             

In [None]:
# Remove stop words: filter the `tokens` array into `refined_tokens`
# using StopWordsRemover's default stop-word list.
from pyspark.ml.feature import StopWordsRemover

stopword_removal = StopWordsRemover(outputCol='refined_tokens', inputCol='tokens')
refined_df = stopword_removal.transform(tokenized_df)

# Show both columns side by side, untruncated, to compare before and after.
refined_df.select('tokens', 'refined_tokens').show(n=10, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tokens                                                                                                                                                                                                                                       

In [None]:
# Helpers for building column expressions and Python UDFs.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# NOTE(review): this wildcard import also binds Spark column functions such as
# `max`, `min`, `sum`, `abs` and `round`, which shadow the Python builtins of
# the same name for every cell executed after this one. Prefer explicit
# imports (or `import pyspark.sql.functions as F`) once all users are known.
from pyspark.sql.functions import *

In [None]:
# Explicit imports so this cell does not depend on a wildcard import elsewhere.
from pyspark.sql.functions import col, size, udf
from pyspark.sql.types import IntegerType

# Python UDF returning the length of an array column. It is no longer used
# below, but the name is kept (now null-safe) in case other cells call it.
len_udf = udf(lambda s: len(s) if s is not None else None, IntegerType())

# Count the tokens left after stop-word removal. The built-in `size` runs
# inside the JVM, avoiding the per-row Python serialization of a UDF, and it
# does not raise on a null array the way `len(None)` would.
refined_count_df = refined_df.withColumn("token_count", size(col('refined_tokens')))

In [None]:
from pyspark.ml.feature import CountVectorizer

# Bag-of-words: learn a vocabulary over `refined_tokens` and encode each row
# as a sparse term-count vector in `features`.
count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')

# Fit ONCE and reuse the fitted model for both the transform and the
# vocabulary lookup; fitting twice scanned the whole dataset a second time.
cv_model = count_vec.fit(refined_df)
cv_df = cv_model.transform(refined_df)
cv_df.select(['refined_tokens','features']).show(4,False)

# `bow` is the learned vocabulary (list of terms, index == feature position).
# Only a preview is printed: the full list can hold hundreds of thousands of
# terms and would swamp the notebook output.
bow = cv_model.vocabulary
print(f"Vocabulary size: {len(bow)}")
print(bow[:50])


In [None]:
from pyspark.ml.feature import HashingTF

# Size the hashing space from the CountVectorizer vocabulary so there are at
# least as many buckets as distinct terms. (`l` is kept for compatibility
# with any later cell that reads it.)
l = len(bow)

# HashingTF picks a term's column with a simple modulo of its hash, so the
# Spark docs advise a power-of-two numFeatures for an even spread. Round the
# vocabulary size up to the next power of two (minimum 1, since numFeatures
# must be > 0). The builtin `max` is deliberately avoided here because
# `from pyspark.sql.functions import *` may have shadowed it.
num_features = (1 << (l - 1).bit_length()) if l > 1 else 1

hashing_vec=HashingTF(inputCol='refined_tokens',outputCol='tf_features',numFeatures=num_features)

# Encode each row's tokens as a sparse term-frequency vector and preview.
hashing_df=hashing_vec.transform(refined_df)
hashing_df.show(4)

+----------+-----------+-------------+--------------------+--------+--------------------+--------------------+--------------------+---------+-------------------+--------------------+----+--------------------+---------------+---------------+--------------------+--------------------+--------------------+
|Unnamed: 0|         id|    author_id|                text|retweets|           permalink|                date|      formatted_date|favorites|           mentions|            hashtags| geo|                urls|search_hashtags|       location|              tokens|      refined_tokens|         tf_features|
+----------+-----------+-------------+--------------------+--------+--------------------+--------------------+--------------------+---------+-------------------+--------------------+----+--------------------+---------------+---------------+--------------------+--------------------+--------------------+
|         0|1.21181e+18|      7.59E17|2020 is the year ...|      15|https://twitter.c...

In [None]:
from pyspark.ml.feature import IDF

# Re-weight the hashed term frequencies by inverse document frequency.
tf_idf_vec = IDF(outputCol='tf_idf_features', inputCol='tf_features')

# Fitting computes the document frequencies over the whole dataset; the
# fitted model then scales each row's `tf_features` vector.
idf_model = tf_idf_vec.fit(hashing_df)
tf_idf_df = idf_model.transform(hashing_df)

# Preview four rows without truncating the column contents.
tf_idf_df.show(n=4, truncate=False)

+----------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------------------------------------------------------------+-------------------------+------------------------------+---------+-------------------+------------------------------------------------------------------------------------------------------------------------------+----+---------------------------------------------------------------------------------------------------------------------+---------------+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------