In [1]:
# https://github.com/johanna23cct/integrated-CA2-MSc-2023094.git

In [2]:
#git remote add origin https://github.com/johanna23cct/integrated-CA2-MSc-2023094.git
#git branch -M main
#git push -u origin main

In [3]:
pip install Matplotlib

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install bokeh

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [5]:
pip install skforecast --user

Note: you may need to restart the kernel to use updated packages.


In [6]:
pip install nltk

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [7]:
# Data manipulation
# ==============================================================================
import numpy as np
import pandas as pd
import nltk
nltk.download('punkt')
nltk.download('stopwords')
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem import PorterStemmer, SnowballStemmer
from nltk.corpus import stopwords
from nltk import download

from textblob import TextBlob

# Plots
# ==============================================================================
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')
plt.rcParams['lines.linewidth'] = 1.5
%matplotlib inline

# Modeling and Forecasting
# ==============================================================================
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import Lasso
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline

from skforecast.ForecasterAutoreg import ForecasterAutoreg
from skforecast.ForecasterAutoregCustom import ForecasterAutoregCustom
#from skforecast.ForecasterAutoregMultiOutput import ForecasterAutoregMultiOutput
from skforecast.model_selection import grid_search_forecaster
from skforecast.model_selection import backtesting_forecaster

from joblib import dump, load

# Spark
# import SparkSession library 
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, concat_ws, regexp_replace
# NOTE: the `sum` imported on the next line is pyspark.sql.functions.sum; from
# here on the bare name `sum` no longer refers to Python's built-in sum().
from pyspark.sql.functions import split, size, length, broadcast, sum
from pyspark.sql.types import DoubleType, StructType, StructField, ArrayType
from pyspark.sql.types import StringType, IntegerType, TimestampType
from pyspark.sql.types import *
from pyspark.ml.feature import StopWordsRemover
import pyspark.sql.types as typ
import pyspark.sql.functions as fn
import pyspark.sql.functions as F


# Warnings configuration
# ==============================================================================
import warnings
# warnings.filterwarnings('ignore')

[nltk_data] Downloading package punkt to /home/hduser/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/hduser/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [8]:
sc

In [9]:
# SparkContext 
sc.master

'local[*]'

In [10]:
# Creating a SparkSession

In [11]:
spark = SparkSession.builder.appName("data_projectTweets").getOrCreate()

In [12]:
#schema = StructType().add("Ids","integer").add("Date","string").add("Flag", "string").add("User",'string').add("Text", "string")

In [13]:
# ****************************************************************************************************************
# *                                  Start with the Data                                                         *
# ****************************************************************************************************************

In [14]:
# Reading the dataset was a big headache and kept collapsing.
# I was loading the file from my local Hadoop installation.
# df = spark.read.csv('home/hduser/Documnets/CA2', header+True, inferSchema=True)

In [15]:
#Move the dataset file to the HDFS path from my terminal:
#First check:  $ hadoop fs -ls /user1   (run from /home/hduser/Documents/CA2/)
#Next move :   $ hadoop fs -put ./ProjectTweets.csv /user1
#Check again : $ hadoop fs -ls /user1
#I will read directly from the HDFS path, to cover one of the five V's (Velocity)


#Following you can see the two ways to load the dataset:
#    (command)+(file://)+(/path/)+(filename)
#I will read directly from the HDFS path, to cover one of the five V's (Velocity)

Tweets_path = "/user1/ProjectTweets.csv"
#df = spark.read.csv("file:///home/hduser/Documents/CA2/ProjectTweets.csv", header=True, inferSchema = True)

In [16]:
Tweets_path

'/user1/ProjectTweets.csv'

In [17]:
#spark_df = spark.read.csv("file:///home/hduser/Documents/CA2/ProjectTweets.csv", header=True, inferSchema = True)

In [18]:
# Read the header-less CSV through the Tweets_path variable defined above
# instead of repeating the path literal, so the location lives in one place.
spark_df = spark.read.csv(Tweets_path, header=False, inferSchema=True)

                                                                                

In [19]:
column_names = ['_c0', 'Ids', 'Date', 'Flag', 'User', 'Text']

In [20]:
# Map each positional column name (_c0, _c1, ...) to the label stored at the
# same position in column_names.
for position in range(len(column_names)):
    spark_df = spark_df.withColumnRenamed(f"_c{position}", column_names[position])

In [21]:
spark_df.show()

+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       Ids|                Date|    Flag|           User|                Text|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|  5|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|  7|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|Mon Apr 06 22:20:...|NO_QUERY|2Hood4Hollywood|@Tatiana_K nop

In [22]:
# display the total number of rows data
total_rows = spark_df.count()


                                                                                

In [23]:
# Render the row count with thousands separators, then print it.
formatted_total_rows = format(total_rows, ",")
print("Total Rows:", formatted_total_rows)

Total Rows: 1,600,000


In [24]:
# Drop unnecessary column  ************************************************************

In [25]:
column_to_drop = ['Flag', 'Unnamed: 0']


In [26]:
spark_df = spark_df.drop('Flag')

In [27]:
spark_df.show()

+---+----------+--------------------+---------------+--------------------+
|_c0|       Ids|                Date|           User|                Text|
+---+----------+--------------------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|
|  5|1467811372|Mon Apr 06 22:20:...|       joy_wolf|@Kwesidei not the...|
|  6|1467811592|Mon Apr 06 22:20:...|        mybirch|         Need a hug |
|  7|1467811594|Mon Apr 06 22:20:...|           coZZ|@LOLTrish hey  lo...|
|  8|1467811795|Mon Apr 06 22:20:...|2Hood4Hollywood|@Tatiana_K nope t...|
|  9|1467812025|Mon Apr 06 22:20:...|        mimismo|@twittera que me ...|
| 10|1467812416|Mon Apr 0

In [28]:
# Creating a UDF to apply VADER sentiment analysis to a Text column

In [29]:
def analyze_sentiment(Text):
    """Return the VADER compound sentiment score for one tweet.

    Parameters
    ----------
    Text : str or None
        Tweet text to score. None stands for a null cell.

    Returns
    -------
    float or None
        The 'compound' entry of ``polarity_scores(Text)``, or None when
        ``Text`` is None so a null row yields a null score instead of
        failing the whole job.
    """
    if Text is None:
        return None
    # Build the analyzer once and reuse it on later calls; constructing a new
    # SentimentIntensityAnalyzer for every row is repeated, avoidable work.
    sid = getattr(analyze_sentiment, "_sid", None)
    if sid is None:
        sid = SentimentIntensityAnalyzer()
        analyze_sentiment._sid = sid
    sentiment = sid.polarity_scores(Text)
    return sentiment['compound']

In [30]:
#register the UDF

In [31]:
sentiment_udf = udf(analyze_sentiment, DoubleType())

In [32]:
#Apply sentiment

In [33]:
spark_df = spark_df.withColumn("sentiment", sentiment_udf(spark_df["Text"]))

In [34]:
# show() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line to the output.
spark_df.show()

[Stage 7:>                                                          (0 + 1) / 1]

+---+----------+--------------------+---------------+--------------------+---------+
|_c0|       Ids|                Date|           User|                Text|sentiment|
+---+----------+--------------------+---------------+--------------------+---------+
|  0|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|  -0.0173|
|  1|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|    -0.75|
|  2|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|   0.4939|
|  3|1467811184|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|    -0.25|
|  4|1467811193|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|  -0.6597|
|  5|1467811372|Mon Apr 06 22:20:...|       joy_wolf|@Kwesidei not the...|      0.0|
|  6|1467811592|Mon Apr 06 22:20:...|        mybirch|         Need a hug |   0.4767|
|  7|1467811594|Mon Apr 06 22:20:...|           coZZ|@LOLTrish hey  lo...|    0.745|
|  8|1467811795|Mon Apr 06 22:20:...|2Hood4Hollywood|@Tatiana_K n

                                                                                

In [35]:
# 'Flag' still appears here because column_names is only the Python list that
# was built for renaming; printing it does not inspect the DataFrame's columns.
print(column_names[:6])

['_c0', 'Ids', 'Date', 'Flag', 'User', 'Text']


In [36]:
#to show

spark_df.drop('Flag').show()

[Stage 8:>                                                          (0 + 1) / 1]

+---+----------+--------------------+---------------+--------------------+---------+
|_c0|       Ids|                Date|           User|                Text|sentiment|
+---+----------+--------------------+---------------+--------------------+---------+
|  0|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|  -0.0173|
|  1|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|    -0.75|
|  2|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|   0.4939|
|  3|1467811184|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|    -0.25|
|  4|1467811193|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|  -0.6597|
|  5|1467811372|Mon Apr 06 22:20:...|       joy_wolf|@Kwesidei not the...|      0.0|
|  6|1467811592|Mon Apr 06 22:20:...|        mybirch|         Need a hug |   0.4767|
|  7|1467811594|Mon Apr 06 22:20:...|           coZZ|@LOLTrish hey  lo...|    0.745|
|  8|1467811795|Mon Apr 06 22:20:...|2Hood4Hollywood|@Tatiana_K n

[Stage 8:>                                                          (0 + 1) / 1]                                                                                

In [37]:
# show() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line to the output.
spark_df.show()

[Stage 9:>                                                          (0 + 1) / 1]

+---+----------+--------------------+---------------+--------------------+---------+
|_c0|       Ids|                Date|           User|                Text|sentiment|
+---+----------+--------------------+---------------+--------------------+---------+
|  0|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|  -0.0173|
|  1|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|    -0.75|
|  2|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|   0.4939|
|  3|1467811184|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|    -0.25|
|  4|1467811193|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|  -0.6597|
|  5|1467811372|Mon Apr 06 22:20:...|       joy_wolf|@Kwesidei not the...|      0.0|
|  6|1467811592|Mon Apr 06 22:20:...|        mybirch|         Need a hug |   0.4767|
|  7|1467811594|Mon Apr 06 22:20:...|           coZZ|@LOLTrish hey  lo...|    0.745|
|  8|1467811795|Mon Apr 06 22:20:...|2Hood4Hollywood|@Tatiana_K n

                                                                                

In [38]:
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%>

In [39]:
# Creating my Dataframe (i did it in line 27 )

In [40]:
#        Basic Feature Extraction

In [41]:
Tweets = spark_df

In [42]:
Tweets = Tweets.withColumn("word_count", size(split(Tweets["Text"], " ")))

In [43]:
# Number of Words  

In [44]:
#The tutorial code didn't work because it is pandas code: TypeError: 'Column' object is not callable

#Tweets['word_count'] = Tweets['Text'].apply(lambda x: len(str(x).split("")))
#Tweets[['Text','word_count']].head()

In [45]:
Tweets.select("Text", "word_count").show()

+--------------------+----------+
|                Text|word_count|
+--------------------+----------+
|@switchfoot http:...|        20|
|is upset that he ...|        22|
|@Kenichan I dived...|        19|
|my whole body fee...|        11|
|@nationwideclass ...|        22|
|@Kwesidei not the...|         6|
|         Need a hug |         4|
|@LOLTrish hey  lo...|        24|
|@Tatiana_K nope t...|         7|
|@twittera que me ...|         6|
|spring break in p...|         8|
|I just re-pierced...|         6|
|@caregiving I cou...|        21|
|@octolinz16 It it...|        16|
|@smarrison i woul...|        23|
|@iamjazzyfizzle I...|        20|
|Hollis' death sce...|        19|
|about to file taxes |         5|
|@LettyA ahh ive a...|        12|
|@FakerPattyPattz ...|        13|
+--------------------+----------+
only showing top 20 rows



In [46]:
# Number of Characters

In [47]:
#The tutorial code didn't work because it is pandas code: TypeError: 'Column' object is not callable
#Tweets['char_count'] = Tweets['Text'].str.len()
## this also includes spaces
#Tweets[['Text','char_count']].head()

In [48]:
Tweets = Tweets.withColumn("char_count", length(Tweets["Text"]))

In [49]:
Tweets.select("Text", "char_count").show()

+--------------------+----------+
|                Text|char_count|
+--------------------+----------+
|@switchfoot http:...|       115|
|is upset that he ...|       111|
|@Kenichan I dived...|        89|
|my whole body fee...|        47|
|@nationwideclass ...|       111|
|@Kwesidei not the...|        29|
|         Need a hug |        11|
|@LOLTrish hey  lo...|        99|
|@Tatiana_K nope t...|        36|
|@twittera que me ...|        25|
|spring break in p...|        43|
|I just re-pierced...|        26|
|@caregiving I cou...|        94|
|@octolinz16 It it...|        77|
|@smarrison i woul...|       117|
|@iamjazzyfizzle I...|       103|
|Hollis' death sce...|        93|
|about to file taxes |        20|
|@LettyA ahh ive a...|        64|
|@FakerPattyPattz ...|        79|
+--------------------+----------+
only showing top 20 rows



In [50]:
#Basic Pre-processing

In [51]:
# Lower-case the tweet text in place of the original Text column.
# NOTE(review): concat_ws("") receives a single column here, so there is
# nothing to join; lower(col("Text")) alone looks sufficient — confirm how
# null text should be handled before simplifying.
Tweets = Tweets.withColumn("Text", concat_ws("", lower(col("Text"))))

In [52]:
#Removing Punctuation 

In [53]:
Tweets = Tweets.withColumn("Text", regexp_replace(col("Text"), r'[^\w\s]', ''))

In [54]:
Tweets.select("Text").show()

+--------------------+
|                Text|
+--------------------+
|switchfoot httptw...|
|is upset that he ...|
|kenichan i dived ...|
|my whole body fee...|
|nationwideclass n...|
|kwesidei not the ...|
|         need a hug |
|loltrish hey  lon...|
|tatiana_k nope th...|
|twittera que me m...|
|spring break in p...|
|i just repierced ...|
|caregiving i coul...|
|octolinz16 it it ...|
|smarrison i would...|
|iamjazzyfizzle i ...|
|hollis death scen...|
|about to file taxes |
|lettya ahh ive al...|
|fakerpattypattz o...|
+--------------------+
only showing top 20 rows



In [55]:
# The new columns do not appear here because they were added to Tweets
# (each withColumn result was assigned to Tweets); spark_df was not reassigned.
spark_df.show()

[Stage 13:>                                                         (0 + 1) / 1]

+---+----------+--------------------+---------------+--------------------+---------+
|_c0|       Ids|                Date|           User|                Text|sentiment|
+---+----------+--------------------+---------------+--------------------+---------+
|  0|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|  -0.0173|
|  1|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|    -0.75|
|  2|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|   0.4939|
|  3|1467811184|Mon Apr 06 22:19:...|        ElleCTF|my whole body fee...|    -0.25|
|  4|1467811193|Mon Apr 06 22:19:...|         Karoli|@nationwideclass ...|  -0.6597|
|  5|1467811372|Mon Apr 06 22:20:...|       joy_wolf|@Kwesidei not the...|      0.0|
|  6|1467811592|Mon Apr 06 22:20:...|        mybirch|         Need a hug |   0.4767|
|  7|1467811594|Mon Apr 06 22:20:...|           coZZ|@LOLTrish hey  lo...|    0.745|
|  8|1467811795|Mon Apr 06 22:20:...|2Hood4Hollywood|@Tatiana_K n

                                                                                

In [56]:
#Count stop Word

In [57]:
# NOTE(review): a session was already created above with the app name
# "data_projectTweets"; getOrCreate() presumably hands back that same session,
# so this cell is likely redundant and the new app name may not take effect — confirm.
spark = SparkSession.builder.appName("StopwordsCount").getOrCreate()

In [58]:
# NOTE: this rebinds Tweets to spark_df, so the columns added to Tweets above
# (word_count, char_count and the cleaned Text) are not carried into the
# steps below; nothing is re-read from disk here.
Tweets = spark_df

In [59]:
# Small hand-written stop-word list consulted by count_stopwords.
# NOTE: this assignment rebinds the name `stopwords`, hiding the
# nltk.corpus.stopwords module imported in the first code cell.
stopwords = ["is", "the", "I"]

In [60]:
def count_stopwords(text):
    """Count the whitespace-separated tokens of ``text`` found in ``stopwords``.

    Matching is exact and case-sensitive against the module-level
    ``stopwords`` list.

    Parameters
    ----------
    text : str or None
        Tweet text. None stands for a null cell.

    Returns
    -------
    int or None
        Number of matching tokens, or None when ``text`` is None so a null
        row yields a null count instead of raising AttributeError.
    """
    if text is None:
        return None
    words = text.split()
    return len([word for word in words if word in stopwords])

In [61]:
count_stopwords_udf = udf(count_stopwords, IntegerType())

In [62]:
Tweets = Tweets.withColumn("Stopwords", count_stopwords_udf(col("Text")))

In [63]:
Tweets.select("Text", "Sentiment", "Stopwords").show()

[Stage 14:>                                                         (0 + 1) / 1]

+--------------------+---------+---------+
|                Text|Sentiment|Stopwords|
+--------------------+---------+---------+
|@switchfoot http:...|  -0.0173|        0|
|is upset that he ...|    -0.75|        1|
|@Kenichan I dived...|   0.4939|        2|
|my whole body fee...|    -0.25|        0|
|@nationwideclass ...|  -0.6597|        1|
|@Kwesidei not the...|      0.0|        1|
|         Need a hug |   0.4767|        0|
|@LOLTrish hey  lo...|    0.745|        0|
|@Tatiana_K nope t...|      0.0|        0|
|@twittera que me ...|      0.0|        0|
|spring break in p...|      0.0|        0|
|I just re-pierced...|      0.0|        1|
|@caregiving I cou...|  -0.5994|        3|
|@octolinz16 It it...|  -0.1027|        1|
|@smarrison i woul...|   0.3724|        1|
|@iamjazzyfizzle I...|   0.4545|        4|
|Hollis' death sce...|  -0.9081|        1|
|about to file taxes |      0.0|        0|
|@LettyA ahh ive a...|   0.6988|        1|
|@FakerPattyPattz ...|   0.1779|        1|
+----------

                                                                                

In [69]:
#Number  of Special characters 

In [70]:
spark = SparkSession.builder.appName("SpecialCharactersCount").getOrCreate()

In [71]:
Tweets = spark_df

In [72]:
Special_Characters = ["@", "#"]

In [73]:
def count_Special_Characters(text):
    """Count occurrences in ``text`` of every entry of ``Special_Characters``.

    Parameters
    ----------
    text : str or None
        Tweet text. None stands for a null cell.

    Returns
    -------
    int or None
        Total number of occurrences across all entries of the module-level
        ``Special_Characters`` list, or None when ``text`` is None so a null
        row yields a null count instead of raising AttributeError.
    """
    if text is None:
        return None
    count = 0
    # An explicit loop is kept on purpose: the bare name `sum` is rebound to
    # pyspark.sql.functions.sum by this notebook's import cell.
    for char in Special_Characters:
        count += text.count(char)
    return count

In [74]:
count_Special_Characters_udf = udf(count_Special_Characters, IntegerType())

In [75]:
Tweets = Tweets.withColumn("Special_Characters_count", count_Special_Characters_udf(col("Text")))

In [76]:
selected_columns = Tweets.select("Text", "Special_Characters_count")

In [77]:
selected_columns.show()

+--------------------+------------------------+
|                Text|Special_Characters_count|
+--------------------+------------------------+
|@switchfoot http:...|                       1|
|is upset that he ...|                       0|
|@Kenichan I dived...|                       1|
|my whole body fee...|                       0|
|@nationwideclass ...|                       1|
|@Kwesidei not the...|                       1|
|         Need a hug |                       0|
|@LOLTrish hey  lo...|                       1|
|@Tatiana_K nope t...|                       1|
|@twittera que me ...|                       1|
|spring break in p...|                       0|
|I just re-pierced...|                       0|
|@caregiving I cou...|                       1|
|@octolinz16 It it...|                       1|
|@smarrison i woul...|                       1|
|@iamjazzyfizzle I...|                       2|
|Hollis' death sce...|                       0|
|about to file taxes |                  

In [123]:
# Averages Word ** Spark session **

In [124]:
#######################################################

In [141]:
spark = SparkSession.builder.appName("AverageWordLength").getOrCreate()

In [142]:
# define 

In [143]:
def avg_word(sentence):
    """Return the mean length of the whitespace-separated words in ``sentence``.

    Parameters
    ----------
    sentence : str or None
        Tweet text. None stands for a null cell.

    Returns
    -------
    float or None
        Total characters in all words divided by the number of words. None
        when ``sentence`` is None or contains no words, which previously
        raised AttributeError / ZeroDivisionError.
    """
    if sentence is None:
        return None
    words = sentence.split()
    if not words:
        return None
    # len("".join(words)) equals the summed word lengths. It deliberately
    # avoids the bare name `sum`, which this notebook's import cell rebinds to
    # pyspark.sql.functions.sum (a column aggregate, not the built-in).
    return len("".join(words)) / len(words)

In [144]:
avg_word_udf = udf(avg_word, DoubleType())

In [145]:
Tweets = Tweets.withColumn("average_word_length", avg_word_udf(col("Text")))

In [146]:
#selected_columns = Tweets.select("Text", "average_word_length")

In [147]:
#selected_columns.show()

In [148]:
Tweets.show() 

Py4JJavaError: An error occurred while calling o388.showString.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)

The currently active SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.lang.Thread.run(Thread.java:750)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1522)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:102)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues(FileFormat.scala:131)
	at org.apache.spark.sql.execution.datasources.FileFormat.buildReaderWithPartitionValues$(FileFormat.scala:122)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:177)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:426)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:417)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:504)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.python.EvalPythonExec.doExecute(EvalPythonExec.scala:88)
	at org.apache.spark.sql.execution.python.EvalPythonExec.doExecute$(EvalPythonExec.scala:87)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.doExecute(BatchEvalPythonExec.scala:34)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:454)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:497)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:50)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:326)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:445)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
	at sun.reflect.GeneratedMethodAccessor74.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [119]:
# Stops the Spark session. NOTE(review): this cell carries execution count 119
# while the failing Tweets.show() cell carries 148, so the "Cannot call methods
# on a stopped SparkContext" error in this notebook presumably comes from
# running this cell first — confirm, and keep it as the very last cell.
spark.stop()

In [None]:
spark = SparkSession.builder.appName("AverageWordLength").getOrCreate()

In [None]:
Tweets_df = Tweets

In [None]:
# NOTE(review): Tweets is built from a Spark DataFrame in the cells above, so
# passing it to createDataFrame looks unnecessary and may raise — confirm the
# intent (the previous cell already binds Tweets_df to Tweets).
Tweets_df = spark.createDataFrame(Tweets)

In [None]:
def avg_word(sentence):
    """Return the mean length of the whitespace-separated words in ``sentence``.

    Parameters
    ----------
    sentence : str or None
        Tweet text. None stands for a null cell.

    Returns
    -------
    float or None
        Total characters in all words divided by the number of words. None
        when ``sentence`` is None or contains no words, which previously
        raised AttributeError / ZeroDivisionError.
    """
    if sentence is None:
        return None
    words = sentence.split()
    if not words:
        return None
    # len("".join(words)) equals the summed word lengths. It deliberately
    # avoids the bare name `sum`, which this notebook's import cell rebinds to
    # pyspark.sql.functions.sum (a column aggregate, not the built-in).
    return len("".join(words)) / len(words)

In [None]:
avg_word_udf = udf(avg_word, DoubleType())

In [None]:
# NOTE(review): no column named "_6" is created anywhere in the visible cells;
# the tweet text column is called "Text" elsewhere — verify the column name.
Tweets_df  = Tweets_df .withColumn("average_word_length", avg_word_udf(col("_6")))

In [None]:
Tweets_df.show()

In [None]:
#Schema for the  DataFrame
#schema = StructType([StructField("sentence", StringType(), True)])

In [None]:
#  Crate a DataFrame

In [None]:
# NOTE: this rebinds Tweets to a plain Python list holding a single 1-tuple, so
# any later cell that calls DataFrame methods on Tweets will fail unless Tweets
# is first reassigned to a DataFrame.
Tweets = [("Text",)]
#df = spark.createDataFrame(Tweets, schema)

In [None]:
#Split the sentence

In [None]:
#df =df.withColumn("words", split(col("sentence"), " "))
#df =df.withColumn("words_lengths", size(col("words")))

In [None]:
# Calcul the average word length

In [None]:
#avg_word_length = df.select(sum(col("word_lengths")).alias("total_length")).collect()[0]["total_length"] / df.count()

In [None]:
#print(f"Average word length: {avg_word_length}")

In [None]:
################################################################

In [None]:
spark = SparkSession.builder.appName("AverageWordLength").getOrCreate()

In [None]:
def avg_word(sentence):
    """Return the mean length of the whitespace-separated words in ``sentence``.

    Parameters
    ----------
    sentence : str or None
        Tweet text. None stands for a null cell.

    Returns
    -------
    float or None
        Total characters in all words divided by the number of words. None
        when ``sentence`` is None or contains no words, which previously
        raised AttributeError / ZeroDivisionError.
    """
    if sentence is None:
        return None
    words = sentence.split()
    if not words:
        return None
    # len("".join(words)) equals the summed word lengths. It deliberately
    # avoids the bare name `sum`, which this notebook's import cell rebinds to
    # pyspark.sql.functions.sum (a column aggregate, not the built-in).
    return len("".join(words)) / len(words)

In [None]:
avg_word_udf = udf(avg_word, DoubleType())

In [None]:
spark_df = spark_df.withColumn("average_word_length", avg_word_udf(col("Text")))

In [None]:
# Spark DataFrames support neither pandas-style column assignment nor
# Column.apply(); add the column through the registered UDF instead.
spark_df = spark_df.withColumn("avg_word", avg_word_udf(col("Text")))

In [None]:
avg_word_udf = udf(lambda sentence: avg_word(sentence), DoubleType())

In [None]:
Tweets = Tweets.withColumn("average_word_length", avg_word_udf(col("Text")))

In [None]:
Tweets.show()

In [None]:
def avg_word(sentence):
    """Return the mean length of the whitespace-separated words in ``sentence``.

    Parameters
    ----------
    sentence : str or None
        Tweet text. None stands for a null cell.

    Returns
    -------
    float or None
        Total characters in all words divided by the number of words. None
        when ``sentence`` is None or contains no words, which previously
        raised AttributeError / ZeroDivisionError.
    """
    if sentence is None:
        return None
    words = sentence.split()
    if not words:
        return None
    # len("".join(words)) equals the summed word lengths. It deliberately
    # avoids the bare name `sum`, which this notebook's import cell rebinds to
    # pyspark.sql.functions.sum (a column aggregate, not the built-in).
    return len("".join(words)) / len(words)

In [None]:
avg_word_udf = udf(avg_word, DoubleType())

In [None]:
Tweets = Tweets.withColumn("average_word_length", avg_word_udf(col("Text")))

In [None]:
# Spark DataFrames support neither pandas-style column assignment nor
# Column.apply(); add the column through the registered UDF instead.
Tweets = Tweets.withColumn("avg_word", avg_word_udf(col("Text")))

In [None]:
Tweets.select("Text").show()  #.show() Error   #it isn't show the "avg_word_udf"

In [None]:
#spark_df.stop()

In [None]:
#Tweets["avg_word"]= Tweets["Text"].apply(lambda x : avg_word(x))

In [None]:
#Tweets[['Text', 'avg_word']].head()

In [86]:
#*****************************************************************************************************************
#*                                       T O K E N I Z A T I O N                                                 *
#*****************************************************************************************************************

In [104]:
# UDF for Tokenization

In [105]:
def tokenize_Tweets(Text):
    """Split one tweet into word tokens using TextBlob.

    Parameters
    ----------
    Text : str or None
        Tweet text. None stands for a null cell.

    Returns
    -------
    list of str or None
        Each entry of ``TextBlob(Text).words`` converted to ``str``, or None
        when ``Text`` is None so a null row yields a null token list instead
        of failing inside TextBlob.
    """
    if Text is None:
        return None
    blob = TextBlob(Text)
    return [str(word) for word in blob.words]

In [106]:
tokenize_udf = udf(tokenize_Tweets, ArrayType(StringType()))

In [107]:
#Apply the UDF

In [91]:
spark_df = spark_df.withColumn('tokenized_Tweets', tokenize_udf(spark_df['Text']))

In [112]:
# Display the raw text next to its token list.
spark_df.select('Text', 'tokenized_Tweets').show()

[Stage 18:>                                                         (0 + 1) / 1]

+--------------------+--------------------+
|                Text|    tokenized_Tweets|
+--------------------+--------------------+
|@switchfoot http:...|[switchfoot, http...|
|is upset that he ...|[is, upset, that,...|
|@Kenichan I dived...|[Kenichan, I, div...|
|my whole body fee...|[my, whole, body,...|
|@nationwideclass ...|[nationwideclass,...|
|@Kwesidei not the...|[Kwesidei, not, t...|
|         Need a hug |      [Need, a, hug]|
|@LOLTrish hey  lo...|[LOLTrish, hey, l...|
|@Tatiana_K nope t...|[Tatiana_K, nope,...|
|@twittera que me ...|[twittera, que, m...|
|spring break in p...|[spring, break, i...|
|I just re-pierced...|[I, just, re-pier...|
|@caregiving I cou...|[caregiving, I, c...|
|@octolinz16 It it...|[octolinz16, It, ...|
|@smarrison i woul...|[smarrison, i, wo...|
|@iamjazzyfizzle I...|[iamjazzyfizzle, ...|
|Hollis' death sce...|[Hollis, death, s...|
|about to file taxes |[about, to, file,...|
|@LettyA ahh ive a...|[LettyA, ahh, ive...|
|@FakerPattyPattz ...|[FakerPatt

                                                                                

In [111]:
#selected_columns = Tweets.select("Text", "tokenized_Tweets")


In [None]:
#****************************************************************************************************************
#*                                        S T E M M I N G                                                       *
#****************************************************************************************************************

In [None]:
st = PorterStemmer()

In [None]:
# Define a UDF for stemming

In [None]:
def stem_text(text):
    """Stem every token of ``text`` with the English Snowball stemmer.

    Parameters
    ----------
    text : str or None
        Tweet text. None stands for a null cell.

    Returns
    -------
    str or None
        The stemmed tokens joined by single spaces, or None when ``text`` is
        None.
    """
    if text is None:
        return None
    stemmer = SnowballStemmer("english")
    # Tokenize the function's own parameter; the previous body read an
    # undefined name `Text` and raised NameError.
    words = nltk.word_tokenize(text)
    # The list is named stemmed_words to match the name joined below; the
    # previous body assigned `stemmer_words` and then joined an undefined name.
    stemmed_words = [stemmer.stem(word) for word in words]
    return " ".join(stemmed_words)

In [None]:
# Register the UDF

In [None]:
# Wrap stem_text as a string-returning Spark UDF. It is bound to stem_udf
# because that is the name the stemming cell further down calls; strem_udf is
# kept as an alias so code using the original spelling keeps working.
stem_udf = udf(stem_text, StringType())
strem_udf = stem_udf

In [None]:
#Create a Spark session

In [None]:
spark = SparkSession.builder.appName("StemmingExample").getOrCreate()


In [None]:
#DataFrame 

In [None]:
Tweets_stemmed = spark_df.withColumn("Stemmed_Text", stem_udf(spark_df["Text"]))

In [None]:
# Show the resulting DataFrame

In [None]:
Tweets_stemmed.show(truncate=False)

In [None]:
#Data Procesising in panda

In [None]:
#df = df.withColumn("Text", lower(df["text"]))
#df = df.withColumn("Text", regexp_replace(df["text"], "[^a-z0-9\\s]", ""))

In [None]:
# NOTE(review): `df` is never assigned in the visible cells (every earlier
# assignment to it is commented out), so this cell presumably fails with
# NameError on a fresh kernel — confirm which DataFrame was intended.
# Lower-case the text column, then remove every character that is not a-z,
# 0-9 or whitespace.
df = df.withColumn("Text", lower(df["text"]))
df = df.withColumn("Text", regexp_replace(df["text"], "[^a-z0-9\\s]", ""))

In [None]:
# NOTE(review): rows_df is not defined in any visible cell — confirm where it
# should come from before relying on this cell.
rows_df.shape

In [None]:
# Some exercises that didn't work for me

In [None]:
#header = tweets.first()

#tweets = tweets.filter(lambda row: row != header) 
#tweets = tweets.map(lambda row: [int(elem) for elem in row.split(',')])

In [None]:
#tweets = sc.textFile("Tweets_path ")
#Header = tweets.first()

#tweets = tweets.filter(lambda row: row != header) 
#tweets = tweets.map(lambda row: [int(elem) for elem in row.split(',')])

In [None]:
# creating the schema for my DataFrame
# There was an error because I hadn't imported pyspark.sql.types as typ,
# but once I did, the command worked, great!

In [None]:
#fields = [
#    *[
#        typ.StructField(h[1:-1], typ.IntegerType(), True)
#        for h in header.split(',')
#    ]
#]
#schema = typ.StructType(fields)

In [None]:
# First put the Titles to see better

In [None]:
#column_names  = ['_c0','Ids', 'Date', 'Flag', 'User', 'Text']    
#full_df = pd.read_csv('file:///home/hduser/Documents/CA2/ProjectTweets.csv', header=None, names=column_names)

In [None]:
# rename the file whit  row's title  ******* ******* ****** ******* ****** ********

In [None]:
#spark_df = "Title_pro_Tweets.csv"
#Check if the file already Exists
#if not os.path.exists("Title_pro_Tweets.csv"):
    #perform DataFrame operaction and save the file
#   spark_df.write.csv("file:///home/hduser/Documents/CA2/Title_pro_Tweets.csv", header=True)
#else:
#    print("file already exists. No need to save it again.")

In [None]:
#schema = StructType().add("_c0","integer").add("Ids","integer").add("Date","string").add("Flag", "string").add("User",'string').add("Text", "string")

In [None]:
#Schema for the  DataFrame
#schema = StructType([StructField("sentence", StringType(), True)]